[ https://issues.apache.org/jira/browse/KAFKA-6975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512609#comment-16512609 ]

ASF GitHub Bot commented on KAFKA-6975:
---------------------------------------

hachikuji closed pull request #5133: KAFKA-6975: Fix fetching from non-batch-aligned log start offset
URL: https://github.com/apache/kafka/pull/5133

This pull request was merged from a forked repository. Because GitHub hides the
original diff once a foreign pull request is merged, it is reproduced below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index b9180a45378..55f870e96f7 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import com.yammer.metrics.core.Gauge
 import kafka.api.LeaderAndIsr
 import kafka.api.Request
+import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.KafkaController
 import kafka.log.{LogAppendInfo, LogConfig}
 import kafka.metrics.KafkaMetricsGroup
@@ -30,7 +31,7 @@ import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import kafka.zk.AdminZkClient
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException}
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException, NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.record.MemoryRecords
@@ -187,6 +188,10 @@ class Partition(val topic: String,
 
   def getReplica(replicaId: Int = localBrokerId): Option[Replica] = Option(allReplicasMap.get(replicaId))
 
+  def getReplicaOrException(replicaId: Int = localBrokerId): Replica =
+    getReplica(replicaId).getOrElse(
+      throw new ReplicaNotAvailableException(s"Replica $replicaId is not available for partition $topicPartition"))
+
   def leaderReplicaIfLocal: Option[Replica] =
     leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica)
 
@@ -545,15 +550,41 @@ class Partition(val topic: String,
     laggingReplicas
   }
 
-  def appendRecordsToFutureReplica(records: MemoryRecords) {
-    getReplica(Request.FutureLocalReplicaId).get.log.get.appendAsFollower(records)
+  private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = {
+      if (isFuture)
+        getReplicaOrException(Request.FutureLocalReplicaId).log.get.appendAsFollower(records)
+      else {
+        // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
+        // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
+        inReadLock(leaderIsrUpdateLock) {
+           getReplicaOrException().log.get.appendAsFollower(records)
+        }
+      }
   }
 
-  def appendRecordsToFollower(records: MemoryRecords) {
-    // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
-    // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
-    inReadLock(leaderIsrUpdateLock) {
-      getReplica().get.log.get.appendAsFollower(records)
+  def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean) {
+    try {
+      doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
+    } catch {
+      case e: UnexpectedAppendOffsetException =>
+        val replica = if (isFuture) getReplicaOrException(Request.FutureLocalReplicaId) else getReplicaOrException()
+        val logEndOffset = replica.logEndOffset.messageOffset
+        if (logEndOffset == replica.logStartOffset &&
+            e.firstOffset < logEndOffset && e.lastOffset >= logEndOffset) {
+          // This may happen if the log start offset on the leader (or current replica) falls in
+          // the middle of the batch due to delete records request and the follower tries to
+          // fetch its first offset from the leader.
+          // We handle this case here instead of Log#append() because we will need to remove the
+          // segment that start with log start offset and create a new one with earlier offset
+          // (base offset of the batch), which will move recoveryPoint backwards, so we will need
+          // to checkpoint the new recovery point before we append
+          val replicaName = if (isFuture) "future replica" else "follower"
+          info(s"Unexpected offset in append to $topicPartition. First offset ${e.firstOffset} is less than log start offset ${replica.logStartOffset}." +
+               s" Since this is the first record to be appended to the $replicaName's log, will start the log from offset ${e.firstOffset}.")
+          truncateFullyAndStartAt(e.firstOffset, isFuture)
+          doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
+        } else
+          throw e
     }
   }
 
diff --git a/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
new file mode 100644
index 00000000000..f8daaa4a181
--- /dev/null
+++ b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the follower received records with non-monotonically increasing offsets
+ */
+class OffsetsOutOfOrderException(message: String) extends RuntimeException(message) {
+}
+
diff --git a/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
new file mode 100644
index 00000000000..e719a93006d
--- /dev/null
+++ b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the follower or the future replica received records from the leader (or current
+ * replica) with first offset less than expected next offset. 
+ * @param firstOffset The first offset of the records to append
+ * @param lastOffset  The last offset of the records to append
+ */
+class UnexpectedAppendOffsetException(val message: String,
+                                      val firstOffset: Long,
+                                      val lastOffset: Long) extends RuntimeException(message) {
+}
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index c7d2a6e3b6e..c92beee0f34 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -29,7 +29,7 @@ import java.util.regex.Pattern
 
 import com.yammer.metrics.core.Gauge
 import kafka.api.KAFKA_0_10_0_IV0
-import kafka.common.{InvalidOffsetException, KafkaException, LogSegmentOffsetOverflowException, LongRef}
+import kafka.common.{InvalidOffsetException, KafkaException, LogSegmentOffsetOverflowException, LongRef, UnexpectedAppendOffsetException, OffsetsOutOfOrderException}
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
@@ -49,11 +49,11 @@ import scala.collection.{Seq, Set, mutable}
 
 object LogAppendInfo {
   val UnknownLogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
-    RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+    RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L)
 
   def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo =
     LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
-      RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+      RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L)
 }
 
 /**
@@ -72,6 +72,7 @@ object LogAppendInfo {
  * @param shallowCount The number of shallow messages
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
+ * @param lastOffsetOfFirstBatch The last offset of the first batch
  */
 case class LogAppendInfo(var firstOffset: Option[Long],
                          var lastOffset: Long,
@@ -84,12 +85,15 @@ case class LogAppendInfo(var firstOffset: Option[Long],
                          targetCodec: CompressionCodec,
                          shallowCount: Int,
                          validBytes: Int,
-                         offsetsMonotonic: Boolean) {
+                         offsetsMonotonic: Boolean,
+                         lastOffsetOfFirstBatch: Long) {
   /**
-   * Get the first offset if it exists, else get the last offset.
-   * @return The offset of first message if it exists; else offset of the last message.
+   * Get the first offset if it exists, else get the last offset of the first batch
+   * For magic versions 2 and newer, this method will return first offset. For magic versions
+   * older than 2, we use the last offset of the first batch as an approximation of the first
+   * offset to avoid decompressing the data.
    */
-  def firstOrLastOffset: Long = firstOffset.getOrElse(lastOffset)
+  def firstOrLastOffsetOfFirstBatch: Long = firstOffset.getOrElse(lastOffsetOfFirstBatch)
 
   /**
    * Get the (maximum) number of messages described by LogAppendInfo
@@ -736,6 +740,8 @@ class Log(@volatile var dir: File,
   * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
   * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
   * @throws KafkaStorageException If the append fails due to an I/O error.
+   * @throws OffsetsOutOfOrderException If out of order offsets found in 'records'
+   * @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset
   * @return Information about the appended messages including the first and last offset.
   */
  private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
@@ -798,9 +804,27 @@ class Log(@volatile var dir: File,
           }
         } else {
           // we are taking the offsets we are given
-          if (!appendInfo.offsetsMonotonic || appendInfo.firstOrLastOffset < nextOffsetMetadata.messageOffset)
-            throw new IllegalArgumentException(s"Out of order offsets found in append to $topicPartition: " +
-              records.records.asScala.map(_.offset))
+          if (!appendInfo.offsetsMonotonic)
+            throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
+                                                 records.records.asScala.map(_.offset))
+
+          if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
+            // we may still be able to recover if the log is empty
+            // one example: fetching from log start offset on the leader which is not batch aligned,
+            // which may happen as a result of AdminClient#deleteRecords()
+            val firstOffset = appendInfo.firstOffset match {
+              case Some(offset) => offset
+              case None => records.batches.asScala.head.baseOffset()
+            }
+
+            val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
+            throw new UnexpectedAppendOffsetException(
+              s"Unexpected offset in append to $topicPartition. $firstOrLast " +
+              s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
+              s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
+              s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
+              firstOffset, appendInfo.lastOffset)
+          }
         }
 
        // update the epoch cache with the epoch stamped onto the message by the leader
@@ -830,7 +854,7 @@ class Log(@volatile var dir: File,
         val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
 
         val logOffsetMetadata = LogOffsetMetadata(
-          messageOffset = appendInfo.firstOrLastOffset,
+          messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
           segmentBaseOffset = segment.baseOffset,
           relativePositionInSegment = segment.size)
 
@@ -970,6 +994,7 @@ class Log(@volatile var dir: File,
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
     var readFirstMessage = false
+    var lastOffsetOfFirstBatch = -1L
 
     for (batch <- records.batches.asScala) {
      // we only validate V2 and higher to avoid potential compatibility issues with older clients
@@ -986,6 +1011,7 @@ class Log(@volatile var dir: File,
       if (!readFirstMessage) {
         if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
           firstOffset = Some(batch.baseOffset)
+        lastOffsetOfFirstBatch = batch.lastOffset
         readFirstMessage = true
       }
 
@@ -1024,7 +1050,7 @@ class Log(@volatile var dir: File,
     // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
     LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
-      RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
+      RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
   }
 
   private def updateProducers(batch: RecordBatch,
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 5a505c3d377..e46473b69e9 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -98,8 +98,7 @@ class ReplicaAlterLogDirsThread(name: String,
       throw new IllegalStateException("Offset mismatch for the future replica %s: fetched offset = %d, log end offset = %d.".format(
         topicPartition, fetchOffset, futureReplica.logEndOffset.messageOffset))
 
-    // Append the leader's messages to the log
-    partition.appendRecordsToFutureReplica(records)
+    partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
     val futureReplicaHighWatermark = futureReplica.logEndOffset.messageOffset.min(partitionData.highWatermark)
     futureReplica.highWatermark = new LogOffsetMetadata(futureReplicaHighWatermark)
     futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset)
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index cf8d829f850..80940f61470 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -112,7 +112,7 @@ class ReplicaFetcherThread(name: String,
        .format(replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
 
     // Append the leader's messages to the log
-    partition.appendRecordsToFollower(records)
+    partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
 
     if (isTraceEnabled)
      trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 986fa4a366a..02baf66068b 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -811,6 +811,83 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
       assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset)
   }
 
+  @Test
+  def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(): Unit = {
+    val leaders = createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+    val followerIndex = if (leaders(0) != servers(0).config.brokerId) 0 else 1
+
+    def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: Long): Unit = {
+      TestUtils.waitUntilTrue(() => servers(followerIndex).replicaManager.getReplica(topicPartition) != None,
+                              "Expected follower to create replica for partition")
+
+      // wait until the follower discovers that log start offset moved beyond its HW
+      TestUtils.waitUntilTrue(() => {
+        servers(followerIndex).replicaManager.getReplica(topicPartition).get.logStartOffset == expectedStartOffset
+      }, s"Expected follower to discover new log start offset $expectedStartOffset")
+
+      TestUtils.waitUntilTrue(() => {
+        servers(followerIndex).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset == expectedEndOffset
+      }, s"Expected follower to catch up to log end offset $expectedEndOffset")
+    }
+
+    // we will produce to topic and delete records while one follower is down
+    killBroker(followerIndex)
+
+    client = AdminClient.create(createConfig)
+    sendRecords(producers.head, 100, topicPartition)
+
+    val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
+    result.all().get()
+
+    // start the stopped broker to verify that it will be able to fetch from new log start offset
+    restartDeadBrokers()
+
+    waitForFollowerLog(expectedStartOffset=3L, expectedEndOffset=100L)
+
+    // after the new replica caught up, all replicas should have same log start offset
+    for (i <- 0 until serverCount)
+      assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset)
+
+    // kill the same follower again, produce more records, and delete records beyond follower's LOE
+    killBroker(followerIndex)
+    sendRecords(producers.head, 100, topicPartition)
+    val result1 = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(117L)).asJava)
+    result1.all().get()
+    restartDeadBrokers()
+    waitForFollowerLog(expectedStartOffset=117L, expectedEndOffset=200L)
+  }
+
+  @Test
+  def testAlterLogDirsAfterDeleteRecords(): Unit = {
+    client = AdminClient.create(createConfig)
+    createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+    val expectedLEO = 100
+    sendRecords(producers.head, expectedLEO, topicPartition)
+
+    // delete records to move log start offset
+    val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
+    result.all().get()
+    // make sure we are in the expected state after delete records
+    for (i <- 0 until serverCount) {
+      assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset)
+      assertEquals(expectedLEO, servers(i).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset)
+    }
+
+    // we will create another dir just for one server
+    val futureLogDir = servers(0).config.logDirs(1)
+    val futureReplica = new TopicPartitionReplica(topic, 0, servers(0).config.brokerId)
+
+    // Verify that replica can be moved to the specified log directory
+    client.alterReplicaLogDirs(Map(futureReplica -> futureLogDir).asJava).all.get
+    TestUtils.waitUntilTrue(() => {
+      futureLogDir == servers(0).logManager.getLog(topicPartition).get.dir.getParent
+    }, "timed out waiting for replica movement")
+
+    // once replica moved, its LSO and LEO should match other replicas
+    assertEquals(3, servers(0).replicaManager.getReplica(topicPartition).get.logStartOffset)
+    assertEquals(expectedLEO, servers(0).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset)
+  }
+
   @Test
   def testOffsetsForTimesAfterDeleteRecords(): Unit = {
     createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
new file mode 100644
index 00000000000..fe5d578533b
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.cluster
+
+import java.io.File
+import java.nio.ByteBuffer
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.common.UnexpectedAppendOffsetException
+import kafka.log.{Log, LogConfig, LogManager, CleanerConfig}
+import kafka.server._
+import kafka.utils.{MockTime, TestUtils, MockScheduler}
+import kafka.utils.timer.MockTimer
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.ReplicaNotAvailableException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.record._
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+import org.scalatest.Assertions.assertThrows
+import scala.collection.JavaConverters._
+
+class PartitionTest {
+
+  val brokerId = 101
+  val topicPartition = new TopicPartition("test-topic", 0)
+  val time = new MockTime()
+  val brokerTopicStats = new BrokerTopicStats
+  val metrics = new Metrics
+
+  var tmpDir: File = _
+  var logDir: File = _
+  var replicaManager: ReplicaManager = _
+  var logManager: LogManager = _
+  var logConfig: LogConfig = _
+
+  @Before
+  def setup(): Unit = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 512: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
+    logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
+    logConfig = LogConfig(logProps)
+
+    tmpDir = TestUtils.tempDir()
+    logDir = TestUtils.randomPartitionLogDir(tmpDir)
+    logManager = TestUtils.createLogManager(
+      logDirs = Seq(logDir), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time)
+    logManager.startup()
+
+    val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
+    brokerProps.put("log.dir", logDir.getAbsolutePath)
+    val brokerConfig = KafkaConfig.fromProps(brokerProps)
+    replicaManager = new ReplicaManager(
+      config = brokerConfig, metrics, time, zkClient = null, new MockScheduler(time),
+      logManager, new AtomicBoolean(false), QuotaFactory.instantiate(brokerConfig, metrics, time, ""),
+      brokerTopicStats, new MetadataCache(brokerId), new LogDirFailureChannel(brokerConfig.logDirs.size))
+  }
+
+  @After
+  def tearDown(): Unit = {
+    brokerTopicStats.close()
+    metrics.close()
+
+    logManager.shutdown()
+    Utils.delete(tmpDir)
+    logManager.liveLogDirs.foreach(Utils.delete)
+    replicaManager.shutdown(checkpointHW = false)
+  }
+
+  @Test
+  def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, logConfig)
+    val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+    partition.addReplicaIfNotExists(replica)
+    assertEquals(Some(replica), partition.getReplica(replica.brokerId))
+
+    val initialLogStartOffset = 5L
+    partition.truncateFullyAndStartAt(initialLogStartOffset, isFuture = false)
+    assertEquals(s"Log end offset after truncate fully and start at $initialLogStartOffset:",
+                 initialLogStartOffset, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset after truncate fully and start at $initialLogStartOffset:",
+                 initialLogStartOffset, replica.logStartOffset)
+
+    // verify that we cannot append records that do not contain log start offset even if the log is empty
+    assertThrows[UnexpectedAppendOffsetException] {
+      // append one record with offset = 3
+      partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 3L), isFuture = false)
+    }
+    assertEquals(s"Log end offset should not change after failure to append", initialLogStartOffset, replica.logEndOffset.messageOffset)
+
+    // verify that we can append records that contain log start offset, even when first
+    // offset < log start offset if the log is empty
+    val newLogStartOffset = 4L
+    val records = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
+                                     new SimpleRecord("k2".getBytes, "v2".getBytes),
+                                     new SimpleRecord("k3".getBytes, "v3".getBytes)),
+                                baseOffset = newLogStartOffset)
+    partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
+    assertEquals(s"Log end offset after append of 3 records with base offset $newLogStartOffset:", 7L, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset after append of 3 records with base offset $newLogStartOffset:", newLogStartOffset, replica.logStartOffset)
+
+    // and we can append more records after that
+    partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 7L), isFuture = false)
+    assertEquals(s"Log end offset after append of 1 record at offset 7:", 8L, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset not expected to change:", newLogStartOffset, replica.logStartOffset)
+
+    // but we cannot append to offset < log start if the log is not empty
+    assertThrows[UnexpectedAppendOffsetException] {
+      val records2 = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
+                                        new SimpleRecord("k2".getBytes, "v2".getBytes)),
+                                   baseOffset = 3L)
+      partition.appendRecordsToFollowerOrFutureReplica(records2, isFuture = false)
+    }
+    assertEquals(s"Log end offset should not change after failure to append", 8L, replica.logEndOffset.messageOffset)
+
+    // we still can append to next offset
+    partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 8L), isFuture = false)
+    assertEquals(s"Log end offset after append of 1 record at offset 8:", 9L, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset not expected to change:", newLogStartOffset, replica.logStartOffset)
+  }
+
+  @Test
+  def testGetReplica(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, logConfig)
+    val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
+    val partition = new
+        Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+
+    assertEquals(None, partition.getReplica(brokerId))
+    assertThrows[ReplicaNotAvailableException] {
+      partition.getReplicaOrException(brokerId)
+    }
+
+    partition.addReplicaIfNotExists(replica)
+    assertEquals(replica, partition.getReplicaOrException(brokerId))
+  }
+
+  @Test
+  def testAppendRecordsToFollowerWithNoReplicaThrowsException(): Unit = {
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+    assertThrows[ReplicaNotAvailableException] {
+      partition.appendRecordsToFollowerOrFutureReplica(
+           createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 0L), isFuture = false)
+    }
+  }
+
+  def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
+    val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
+    val builder = MemoryRecords.builder(
+      buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.LOG_APPEND_TIME,
+      baseOffset, time.milliseconds, partitionLeaderEpoch)
+    records.foreach(builder.append)
+    builder.build()
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1171e5e00df..6c62e5e7b2e 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -22,7 +22,8 @@ import java.nio.ByteBuffer
 import java.nio.file.{Files, Paths}
 import java.util.Properties
 
-import kafka.common.KafkaException
+import org.apache.kafka.common.errors._
+import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException, KafkaException}
 import kafka.log.Log.DeleteDirSuffix
 import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache}
 import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel}
@@ -42,6 +43,7 @@ import org.junit.{After, Before, Test}
 import scala.collection.Iterable
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import org.scalatest.Assertions.{assertThrows, intercept, withClue}
 
 class LogTest {
   var config: KafkaConfig = null
@@ -1885,13 +1887,72 @@ class LogTest {
     assertTrue("Message payload should be null.", !head.hasValue)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test
   def testAppendWithOutOfOrderOffsetsThrowsException() {
     val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+
+    val appendOffsets = Seq(0L, 1L, 3L, 2L, 4L)
+    val buffer = ByteBuffer.allocate(512)
+    for (offset <- appendOffsets) {
+      val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
+                                          TimestampType.LOG_APPEND_TIME, offset, mockTime.milliseconds(),
+                                          1L, 0, 0, false, 0)
+      builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+      builder.close()
+    }
+    buffer.flip()
+    val memoryRecords = MemoryRecords.readableRecords(buffer)
+
+    assertThrows[OffsetsOutOfOrderException] {
+      log.appendAsFollower(memoryRecords)
+    }
+  }
+
+  @Test
+  def testAppendBelowExpectedOffsetThrowsException() {
+    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
     val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray
     records.foreach(record => log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), leaderEpoch = 0))
-    val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1.toString.getBytes))
-    log.appendAsFollower(invalidRecord)
+
+    val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
+    val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
+    for (magic <- magicVals; compression <- compressionTypes) {
+      val invalidRecord = MemoryRecords.withRecords(magic, compression, new SimpleRecord(1.toString.getBytes))
+      withClue(s"Magic=$magic, compressionType=$compression") {
+        assertThrows[UnexpectedAppendOffsetException] {
+          log.appendAsFollower(invalidRecord)
+        }
+      }
+    }
+  }
+
+  @Test
+  def testAppendEmptyLogBelowLogStartOffsetThrowsException() {
+    createEmptyLogs(logDir, 7)
+    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+    assertEquals(7L, log.logStartOffset)
+    assertEquals(7L, log.logEndOffset)
+
+    val firstOffset = 4L
+    val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
+    val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
+    for (magic <- magicVals; compression <- compressionTypes) {
+      val batch = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
+                                         new SimpleRecord("k2".getBytes, "v2".getBytes),
+                                         new SimpleRecord("k3".getBytes, "v3".getBytes)),
+                                    magicValue = magic, codec = compression,
+                                    baseOffset = firstOffset)
+
+      withClue(s"Magic=$magic, compressionType=$compression") {
+        val exception = intercept[UnexpectedAppendOffsetException] {
+          log.appendAsFollower(records = batch)
+        }
+        assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#firstOffset",
+                     firstOffset, exception.firstOffset)
+        assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#lastOffset",
+                     firstOffset + 2, exception.lastOffset)
+      }
+    }
   }
 
   @Test


 



> AdminClient.deleteRecords() may cause replicas unable to fetch from beginning
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-6975
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6975
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.1.0, 1.0.1
>            Reporter: Anna Povzner
>            Assignee: Anna Povzner
>            Priority: Blocker
>             Fix For: 2.0.0, 1.0.2, 1.1.1
>
>
> AdminClient.deleteRecords(beforeOffset(offset)) will set the log start offset to 
> the requested offset. If the requested offset is in the middle of a batch, the 
> replica will not be able to fetch from that offset (because it falls in the 
> middle of a batch). 
> One use-case where this could cause problems is replica re-assignment. Suppose 
> we have a topic partition with 3 initial replicas, and at some point the user 
> issues AdminClient.deleteRecords() for an offset that falls in the middle of a 
> batch. That offset now becomes the log start offset (LSO) for this topic 
> partition. Suppose that at some later time the user starts partition 
> re-assignment to 3 new replicas. The new replicas (followers) will start with 
> HW = 0 and try to fetch from offset 0, then get an "out of order offset" error 
> because 0 < LSO; the follower will reset its fetch offset to the leader's LSO 
> and fetch from there; the leader will respond with a batch whose base offset is 
> less than the LSO, which causes another "out of order offset" error on the 
> follower and stops the fetcher thread. The end result is that the new replicas 
> will not be able to start fetching until the LSO moves to an offset that is not 
> in the middle of a batch, so the re-assignment can be stuck for a very long 
> time. 
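For illustration, the failure sequence above can be reproduced with the AdminClient
roughly as in the sketch below. This is a condensed, hypothetical variant of the
testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords test added by the patch,
and it assumes that test's harness helpers (killBroker, sendRecords,
restartDeadBrokers, createConfig, producers, followerIndex, topicPartition); it is
not part of the patch itself.

import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.{AdminClient, RecordsToDelete}

// Take one follower down, then produce a batch of records to the partition.
killBroker(followerIndex)
sendRecords(producers.head, 100, topicPartition)

// Delete records before offset 3; if that offset falls in the middle of a batch,
// the leader's log start offset now points inside that batch.
val client = AdminClient.create(createConfig)
client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava).all().get()

// Restart the follower. Before this fix, its fetch from the new log start offset
// returned a batch whose base offset is below the expected next offset, the
// append was rejected, and the replica fetcher thread stopped.
restartDeadBrokers()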


