[ 
https://issues.apache.org/jira/browse/KAFKA-6975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513030#comment-16513030
 ] 

ASF GitHub Bot commented on KAFKA-6975:
---------------------------------------

hachikuji closed pull request #5229: KAFKA-6975: Fix replica fetching from 
non-batch-aligned log start offset
URL: https://github.com/apache/kafka/pull/5229
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 68faf00c079..9339d29202f 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import com.yammer.metrics.core.Gauge
 import kafka.api.LeaderAndIsr
 import kafka.api.Request
+import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.KafkaController
 import kafka.log.{LogAppendInfo, LogConfig}
 import kafka.metrics.KafkaMetricsGroup
@@ -30,7 +31,7 @@ import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import kafka.zk.AdminZkClient
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{NotEnoughReplicasException, 
NotLeaderForPartitionException, PolicyViolationException}
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException, 
NotEnoughReplicasException, NotLeaderForPartitionException, 
PolicyViolationException}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.record.MemoryRecords
@@ -187,6 +188,10 @@ class Partition(val topic: String,
 
   def getReplica(replicaId: Int = localBrokerId): Option[Replica] = 
Option(allReplicasMap.get(replicaId))
 
+  def getReplicaOrException(replicaId: Int = localBrokerId): Replica =
+    getReplica(replicaId).getOrElse(
+      throw new ReplicaNotAvailableException(s"Replica $replicaId is not 
available for partition $topicPartition"))
+
   def leaderReplicaIfLocal: Option[Replica] =
     leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica)
 
@@ -549,15 +554,41 @@ class Partition(val topic: String,
     laggingReplicas
   }
 
-  def appendRecordsToFutureReplica(records: MemoryRecords) {
-    
getReplica(Request.FutureLocalReplicaId).get.log.get.appendAsFollower(records)
+  private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, 
isFuture: Boolean): Unit = {
+      if (isFuture)
+        
getReplicaOrException(Request.FutureLocalReplicaId).log.get.appendAsFollower(records)
+      else {
+        // The read lock is needed to prevent the follower replica from being 
updated while ReplicaAlterDirThread
+        // is executing maybeDeleteAndSwapFutureReplica() to replace follower 
replica with the future replica.
+        inReadLock(leaderIsrUpdateLock) {
+           getReplicaOrException().log.get.appendAsFollower(records)
+        }
+      }
   }
 
-  def appendRecordsToFollower(records: MemoryRecords) {
-    // The read lock is needed to prevent the follower replica from being 
updated while ReplicaAlterDirThread
-    // is executing maybeDeleteAndSwapFutureReplica() to replace follower 
replica with the future replica.
-    inReadLock(leaderIsrUpdateLock) {
-      getReplica().get.log.get.appendAsFollower(records)
+  def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: 
Boolean) {
+    try {
+      doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
+    } catch {
+      case e: UnexpectedAppendOffsetException =>
+        val replica = if (isFuture) 
getReplicaOrException(Request.FutureLocalReplicaId) else getReplicaOrException()
+        val logEndOffset = replica.logEndOffset.messageOffset
+        if (logEndOffset == replica.logStartOffset &&
+            e.firstOffset < logEndOffset && e.lastOffset >= logEndOffset) {
+          // This may happen if the log start offset on the leader (or current 
replica) falls in
+          // the middle of the batch due to delete records request and the 
follower tries to
+          // fetch its first offset from the leader.
+          // We handle this case here instead of Log#append() because we will 
need to remove the
+          // segment that start with log start offset and create a new one 
with earlier offset
+          // (base offset of the batch), which will move recoveryPoint 
backwards, so we will need
+          // to checkpoint the new recovery point before we append
+          val replicaName = if (isFuture) "future replica" else "follower"
+          info(s"Unexpected offset in append to $topicPartition. First offset 
${e.firstOffset} is less than log start offset ${replica.logStartOffset}." +
+               s" Since this is the first record to be appended to the 
$replicaName's log, will start the log from offset ${e.firstOffset}.")
+          truncateFullyAndStartAt(e.firstOffset, isFuture)
+          doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
+        } else
+          throw e
     }
   }
 
diff --git a/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala 
b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
new file mode 100644
index 00000000000..f8daaa4a181
--- /dev/null
+++ b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the follower received records with non-monotonically increasing 
offsets
+ */
+class OffsetsOutOfOrderException(message: String) extends 
RuntimeException(message) {
+}
+
diff --git 
a/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala 
b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
new file mode 100644
index 00000000000..e719a93006d
--- /dev/null
+++ b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the follower or the future replica received records from the 
leader (or current
+ * replica) with first offset less than expected next offset. 
+ * @param firstOffset The first offset of the records to append
+ * @param lastOffset  The last offset of the records to append
+ */
+class UnexpectedAppendOffsetException(val message: String,
+                                      val firstOffset: Long,
+                                      val lastOffset: Long) extends 
RuntimeException(message) {
+}
diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index 1e9ae4df404..8b62918bc97 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic._
 import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, 
TimeUnit}
 
 import kafka.api.KAFKA_0_10_0_IV0
-import kafka.common.{InvalidOffsetException, KafkaException, LongRef}
+import kafka.common.{InvalidOffsetException, KafkaException, LongRef, 
UnexpectedAppendOffsetException, OffsetsOutOfOrderException}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, 
LogOffsetMetadata}
 import kafka.utils._
@@ -634,6 +634,8 @@ class Log(@volatile var dir: File,
    * @param assignOffsets Should the log assign offsets to this message set or 
blindly apply what it is given
    * @param leaderEpoch The partition's leader epoch which will be applied to 
messages when offsets are assigned on the leader
    * @throws KafkaStorageException If the append fails due to an I/O error.
+   * @throws OffsetsOutOfOrderException If out of order offsets found in 
'records'
+   * @throws UnexpectedAppendOffsetException If the first or last offset in 
append is less than next offset
    * @return Information about the appended messages including the first and 
last offset.
    */
   private def append(records: MemoryRecords, isFromClient: Boolean, 
assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
@@ -695,8 +697,24 @@ class Log(@volatile var dir: File,
           }
         } else {
           // we are taking the offsets we are given
-          if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < 
nextOffsetMetadata.messageOffset)
-            throw new IllegalArgumentException("Out of order offsets found in 
" + records.records.asScala.map(_.offset))
+          if (!appendInfo.offsetsMonotonic)
+            throw new OffsetsOutOfOrderException(s"Out of order offsets found 
in append to $topicPartition: " +
+                                                 
records.records.asScala.map(_.offset))
+
+          if (appendInfo.firstOffset < nextOffsetMetadata.messageOffset) {
+            // we may still be able to recover if the log is empty
+            // one example: fetching from log start offset on the leader which 
is not batch aligned,
+            // which may happen as a result of AdminClient#deleteRecords()
+            // appendInfo.firstOffset maybe either first offset or last offset 
of the first batch.
+            // get the actual first offset, which may require decompressing 
the data
+            val firstOffset = records.batches.asScala.head.baseOffset()
+            throw new UnexpectedAppendOffsetException(
+              s"Unexpected offset in append to $topicPartition. First offset 
or last offset of the first batch " +
+              s"${appendInfo.firstOffset} is less than the next offset 
${nextOffsetMetadata.messageOffset}. " +
+              s"First 10 offsets in append: 
${records.records.asScala.take(10).map(_.offset)}, last offset in" +
+              s" append: ${appendInfo.lastOffset}. Log start offset = 
$logStartOffset",
+              firstOffset, appendInfo.lastOffset)
+          }
         }
 
         // update the epoch cache with the epoch stamped onto the message by 
the leader
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala 
b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 48c83d43835..74ef3e848ed 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -103,7 +103,7 @@ class ReplicaAlterLogDirsThread(name: String,
         topicPartition, fetchOffset, futureReplica.logEndOffset.messageOffset))
 
     // Append the leader's messages to the log
-    partition.appendRecordsToFutureReplica(records)
+    partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
     futureReplica.highWatermark = new 
LogOffsetMetadata(partitionData.highWatermark)
     futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset)
 
@@ -263,4 +263,4 @@ object ReplicaAlterLogDirsThread {
 
     override def toString = underlying.toString
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index fa45e7ff9e7..8f4a7569a0e 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -109,7 +109,7 @@ class ReplicaFetcherThread(name: String,
         .format(replica.logEndOffset.messageOffset, topicPartition, 
records.sizeInBytes, partitionData.highWatermark))
 
     // Append the leader's messages to the log
-    partition.appendRecordsToFollower(records)
+    partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
 
     if (isTraceEnabled)
       trace("Follower has replica log end offset %d after appending %d bytes 
of messages for partition %s"
diff --git 
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 867e03db852..5f7ae57f58b 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -799,6 +799,83 @@ class AdminClientIntegrationTest extends 
IntegrationTestHarness with Logging {
     client.close()
   }
 
+  @Test
+  def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(): Unit = {
+    val leaders = createTopic(topic, numPartitions = 1, replicationFactor = 
serverCount)
+    val followerIndex = if (leaders(0) != servers(0).config.brokerId) 0 else 1
+
+    def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: 
Long): Unit = {
+      TestUtils.waitUntilTrue(() => 
servers(followerIndex).replicaManager.getReplica(topicPartition) != None,
+                              "Expected follower to create replica for 
partition")
+
+      // wait until the follower discovers that log start offset moved beyond 
its HW
+      TestUtils.waitUntilTrue(() => {
+        
servers(followerIndex).replicaManager.getReplica(topicPartition).get.logStartOffset
 == expectedStartOffset
+      }, s"Expected follower to discover new log start offset 
$expectedStartOffset")
+
+      TestUtils.waitUntilTrue(() => {
+        
servers(followerIndex).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset
 == expectedEndOffset
+      }, s"Expected follower to catch up to log end offset $expectedEndOffset")
+    }
+
+    // we will produce to topic and delete records while one follower is down
+    killBroker(followerIndex)
+
+    client = AdminClient.create(createConfig)
+    sendRecords(producers.head, 100, topicPartition)
+
+    val result = client.deleteRecords(Map(topicPartition -> 
RecordsToDelete.beforeOffset(3L)).asJava)
+    result.all().get()
+
+    // start the stopped broker to verify that it will be able to fetch from 
new log start offset
+    restartDeadBrokers()
+
+    waitForFollowerLog(expectedStartOffset=3L, expectedEndOffset=100L)
+
+    // after the new replica caught up, all replicas should have same log 
start offset
+    for (i <- 0 until serverCount)
+      assertEquals(3, 
servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset)
+
+    // kill the same follower again, produce more records, and delete records 
beyond follower's LOE
+    killBroker(followerIndex)
+    sendRecords(producers.head, 100, topicPartition)
+    val result1 = client.deleteRecords(Map(topicPartition -> 
RecordsToDelete.beforeOffset(117L)).asJava)
+    result1.all().get()
+    restartDeadBrokers()
+    waitForFollowerLog(expectedStartOffset=117L, expectedEndOffset=200L)
+  }
+
+  @Test
+  def testAlterLogDirsAfterDeleteRecords(): Unit = {
+    client = AdminClient.create(createConfig)
+    createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+    val expectedLEO = 100
+    sendRecords(producers.head, expectedLEO, topicPartition)
+
+    // delete records to move log start offset
+    val result = client.deleteRecords(Map(topicPartition -> 
RecordsToDelete.beforeOffset(3L)).asJava)
+    result.all().get()
+    // make sure we are in the expected state after delete records
+    for (i <- 0 until serverCount) {
+      assertEquals(3, 
servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset)
+      assertEquals(expectedLEO, 
servers(i).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset)
+    }
+
+    // we will create another dir just for one server
+    val futureLogDir = servers(0).config.logDirs(1)
+    val futureReplica = new TopicPartitionReplica(topic, 0, 
servers(0).config.brokerId)
+
+    // Verify that replica can be moved to the specified log directory
+    client.alterReplicaLogDirs(Map(futureReplica -> 
futureLogDir).asJava).all.get
+    TestUtils.waitUntilTrue(() => {
+      futureLogDir == 
servers(0).logManager.getLog(topicPartition).get.dir.getParent
+    }, "timed out waiting for replica movement")
+
+    // once replica moved, its LSO and LEO should match other replicas
+    assertEquals(3, 
servers(0).replicaManager.getReplica(topicPartition).get.logStartOffset)
+    assertEquals(expectedLEO, 
servers(0).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset)
+  }
+
   @Test
   def testOffsetsForTimesAfterDeleteRecords(): Unit = {
     createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
new file mode 100644
index 00000000000..fe5d578533b
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.cluster
+
+import java.io.File
+import java.nio.ByteBuffer
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.common.UnexpectedAppendOffsetException
+import kafka.log.{Log, LogConfig, LogManager, CleanerConfig}
+import kafka.server._
+import kafka.utils.{MockTime, TestUtils, MockScheduler}
+import kafka.utils.timer.MockTimer
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.ReplicaNotAvailableException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.record._
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+import org.scalatest.Assertions.assertThrows
+import scala.collection.JavaConverters._
+
+class PartitionTest {
+
+  val brokerId = 101
+  val topicPartition = new TopicPartition("test-topic", 0)
+  val time = new MockTime()
+  val brokerTopicStats = new BrokerTopicStats
+  val metrics = new Metrics
+
+  var tmpDir: File = _
+  var logDir: File = _
+  var replicaManager: ReplicaManager = _
+  var logManager: LogManager = _
+  var logConfig: LogConfig = _
+
+  @Before
+  def setup(): Unit = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 512: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
+    logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
+    logConfig = LogConfig(logProps)
+
+    tmpDir = TestUtils.tempDir()
+    logDir = TestUtils.randomPartitionLogDir(tmpDir)
+    logManager = TestUtils.createLogManager(
+      logDirs = Seq(logDir), defaultConfig = logConfig, 
CleanerConfig(enableCleaner = false), time)
+    logManager.startup()
+
+    val brokerProps = TestUtils.createBrokerConfig(brokerId, 
TestUtils.MockZkConnect)
+    brokerProps.put("log.dir", logDir.getAbsolutePath)
+    val brokerConfig = KafkaConfig.fromProps(brokerProps)
+    replicaManager = new ReplicaManager(
+      config = brokerConfig, metrics, time, zkClient = null, new 
MockScheduler(time),
+      logManager, new AtomicBoolean(false), 
QuotaFactory.instantiate(brokerConfig, metrics, time, ""),
+      brokerTopicStats, new MetadataCache(brokerId), new 
LogDirFailureChannel(brokerConfig.logDirs.size))
+  }
+
+  @After
+  def tearDown(): Unit = {
+    brokerTopicStats.close()
+    metrics.close()
+
+    logManager.shutdown()
+    Utils.delete(tmpDir)
+    logManager.liveLogDirs.foreach(Utils.delete)
+    replicaManager.shutdown(checkpointHW = false)
+  }
+
+  @Test
+  def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, logConfig)
+    val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
+    val partition = new Partition(topicPartition.topic, 
topicPartition.partition, time, replicaManager)
+    partition.addReplicaIfNotExists(replica)
+    assertEquals(Some(replica), partition.getReplica(replica.brokerId))
+
+    val initialLogStartOffset = 5L
+    partition.truncateFullyAndStartAt(initialLogStartOffset, isFuture = false)
+    assertEquals(s"Log end offset after truncate fully and start at 
$initialLogStartOffset:",
+                 initialLogStartOffset, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset after truncate fully and start at 
$initialLogStartOffset:",
+                 initialLogStartOffset, replica.logStartOffset)
+
+    // verify that we cannot append records that do not contain log start 
offset even if the log is empty
+    assertThrows[UnexpectedAppendOffsetException] {
+      // append one record with offset = 3
+      partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new 
SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 3L), isFuture = false)
+    }
+    assertEquals(s"Log end offset should not change after failure to append", 
initialLogStartOffset, replica.logEndOffset.messageOffset)
+
+    // verify that we can append records that contain log start offset, even 
when first
+    // offset < log start offset if the log is empty
+    val newLogStartOffset = 4L
+    val records = createRecords(List(new SimpleRecord("k1".getBytes, 
"v1".getBytes),
+                                     new SimpleRecord("k2".getBytes, 
"v2".getBytes),
+                                     new SimpleRecord("k3".getBytes, 
"v3".getBytes)),
+                                baseOffset = newLogStartOffset)
+    partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
+    assertEquals(s"Log end offset after append of 3 records with base offset 
$newLogStartOffset:", 7L, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset after append of 3 records with base offset 
$newLogStartOffset:", newLogStartOffset, replica.logStartOffset)
+
+    // and we can append more records after that
+    partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new 
SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 7L), isFuture = false)
+    assertEquals(s"Log end offset after append of 1 record at offset 7:", 8L, 
replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset not expected to change:", 
newLogStartOffset, replica.logStartOffset)
+
+    // but we cannot append to offset < log start if the log is not empty
+    assertThrows[UnexpectedAppendOffsetException] {
+      val records2 = createRecords(List(new SimpleRecord("k1".getBytes, 
"v1".getBytes),
+                                        new SimpleRecord("k2".getBytes, 
"v2".getBytes)),
+                                   baseOffset = 3L)
+      partition.appendRecordsToFollowerOrFutureReplica(records2, isFuture = 
false)
+    }
+    assertEquals(s"Log end offset should not change after failure to append", 
8L, replica.logEndOffset.messageOffset)
+
+    // we still can append to next offset
+    partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new 
SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 8L), isFuture = false)
+    assertEquals(s"Log end offset after append of 1 record at offset 8:", 9L, 
replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset not expected to change:", 
newLogStartOffset, replica.logStartOffset)
+  }
+
+  @Test
+  def testGetReplica(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, logConfig)
+    val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
+    val partition = new
+        Partition(topicPartition.topic, topicPartition.partition, time, 
replicaManager)
+
+    assertEquals(None, partition.getReplica(brokerId))
+    assertThrows[ReplicaNotAvailableException] {
+      partition.getReplicaOrException(brokerId)
+    }
+
+    partition.addReplicaIfNotExists(replica)
+    assertEquals(replica, partition.getReplicaOrException(brokerId))
+  }
+
+  @Test
+  def testAppendRecordsToFollowerWithNoReplicaThrowsException(): Unit = {
+    val partition = new Partition(topicPartition.topic, 
topicPartition.partition, time, replicaManager)
+    assertThrows[ReplicaNotAvailableException] {
+      partition.appendRecordsToFollowerOrFutureReplica(
+           createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), 
baseOffset = 0L), isFuture = false)
+    }
+  }
+
+  def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, 
partitionLeaderEpoch: Int = 0): MemoryRecords = {
+    val buf = 
ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
+    val builder = MemoryRecords.builder(
+      buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, 
TimestampType.LOG_APPEND_TIME,
+      baseOffset, time.milliseconds, partitionLeaderEpoch)
+    records.foreach(builder.append)
+    builder.build()
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 352be918b8d..cf679f63513 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -23,7 +23,7 @@ import java.nio.file.Files
 import java.util.Properties
 
 import org.apache.kafka.common.errors._
-import kafka.common.KafkaException
+import kafka.common.{OffsetsOutOfOrderException, 
UnexpectedAppendOffsetException, KafkaException}
 import kafka.log.Log.DeleteDirSuffix
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -41,6 +41,7 @@ import org.easymock.EasyMock
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import org.scalatest.Assertions.{assertThrows, intercept, withClue}
 
 class LogTest {
 
@@ -1885,13 +1886,72 @@ class LogTest {
     assertTrue("Message payload should be null.", !head.hasValue)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test
   def testAppendWithOutOfOrderOffsetsThrowsException() {
-    val log = createLog(logDir, LogConfig())
+    val log = createLog(logDir, LogConfig(), brokerTopicStats = 
brokerTopicStats)
+
+    val appendOffsets = Seq(0L, 1L, 3L, 2L, 4L)
+    val buffer = ByteBuffer.allocate(512)
+    for (offset <- appendOffsets) {
+      val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, 
CompressionType.NONE,
+                                          TimestampType.LOG_APPEND_TIME, 
offset, mockTime.milliseconds(),
+                                          1L, 0, 0, false, 0)
+      builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+      builder.close()
+    }
+    buffer.flip()
+    val memoryRecords = MemoryRecords.readableRecords(buffer)
+
+    assertThrows[OffsetsOutOfOrderException] {
+      log.appendAsFollower(memoryRecords)
+    }
+  }
+
+  @Test
+  def testAppendBelowExpectedOffsetThrowsException() {
+    val log = createLog(logDir, LogConfig(), brokerTopicStats = 
brokerTopicStats)
     val records = (0 until 2).map(id => new 
SimpleRecord(id.toString.getBytes)).toArray
     records.foreach(record => 
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), 
leaderEpoch = 0))
-    val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new 
SimpleRecord(1.toString.getBytes))
-    log.appendAsFollower(invalidRecord)
+
+    val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, 
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
+    val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
+    for (magic <- magicVals; compression <- compressionTypes) {
+      val invalidRecord = MemoryRecords.withRecords(magic, compression, new 
SimpleRecord(1.toString.getBytes))
+      withClue(s"Magic=$magic, compressionType=$compression") {
+        assertThrows[UnexpectedAppendOffsetException] {
+          log.appendAsFollower(invalidRecord)
+        }
+      }
+    }
+  }
+
+  @Test
+  def testAppendEmptyLogBelowLogStartOffsetThrowsException() {
+    createEmptyLogs(logDir, 7)
+    val log = createLog(logDir, LogConfig(), brokerTopicStats = 
brokerTopicStats)
+    assertEquals(7L, log.logStartOffset)
+    assertEquals(7L, log.logEndOffset)
+
+    val firstOffset = 4L
+    val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, 
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
+    val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
+    for (magic <- magicVals; compression <- compressionTypes) {
+      val batch = TestUtils.records(List(new SimpleRecord("k1".getBytes, 
"v1".getBytes),
+                                         new SimpleRecord("k2".getBytes, 
"v2".getBytes),
+                                         new SimpleRecord("k3".getBytes, 
"v3".getBytes)),
+                                    magicValue = magic, codec = compression,
+                                    baseOffset = firstOffset)
+
+      withClue(s"Magic=$magic, compressionType=$compression") {
+        val exception = intercept[UnexpectedAppendOffsetException] {
+          log.appendAsFollower(records = batch)
+        }
+        assertEquals(s"Magic=$magic, compressionType=$compression, 
UnexpectedAppendOffsetException#firstOffset",
+                     firstOffset, exception.firstOffset)
+        assertEquals(s"Magic=$magic, compressionType=$compression, 
UnexpectedAppendOffsetException#lastOffset",
+                     firstOffset + 2, exception.lastOffset)
+      }
+    }
   }
 
   @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> AdminClient.deleteRecords() may cause replicas unable to fetch from beginning
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-6975
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6975
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.1.0, 1.0.1
>            Reporter: Anna Povzner
>            Assignee: Anna Povzner
>            Priority: Blocker
>             Fix For: 2.0.0
>
>
> AdminClient.deleteRecords(beforeOffset(offset)) will set log start offset to 
> the requested offset. If the requested offset is in the middle of the batch, 
> the replica will not be able to fetch from that offset (because it is in the 
> middle of the batch). 
> One use-case where this could cause problems is replica re-assignment. 
> Suppose we have a topic partition with 3 initial replicas, and at some point 
> the user issues  AdminClient.deleteRecords() for the offset that falls in the 
> middle of the batch. It now becomes log start offset for this topic 
> partition. Suppose at some later time, the user starts partition 
> re-assignment to 3 new replicas. The new replicas (followers) will start with 
> HW = 0, will try to fetch from 0, then get "out of order offset" because 0 < 
> log start offset (LSO); the follower will be able to reset offset to LSO of 
> the leader and fetch LSO; the leader will send a batch in response with base 
> offset <LSO, this will cause "out of order offset" on the follower which will 
> stop the fetcher thread. The end result is that the new replicas will not be 
> able to start fetching unless LSO moves to an offset that is not in the 
> middle of the batch, and the re-assignment will be stuck for a possibly a 
> very log time. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to