[ https://issues.apache.org/jira/browse/KAFKA-6975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512609#comment-16512609 ]
ASF GitHub Bot commented on KAFKA-6975: --------------------------------------- hachikuji closed pull request #5133: KAFKA-6975: Fix fetching from non-batch-aligned log start offset URL: https://github.com/apache/kafka/pull/5133 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index b9180a45378..55f870e96f7 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import com.yammer.metrics.core.Gauge import kafka.api.LeaderAndIsr import kafka.api.Request +import kafka.common.UnexpectedAppendOffsetException import kafka.controller.KafkaController import kafka.log.{LogAppendInfo, LogConfig} import kafka.metrics.KafkaMetricsGroup @@ -30,7 +31,7 @@ import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ import kafka.zk.AdminZkClient import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException} +import org.apache.kafka.common.errors.{ReplicaNotAvailableException, NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.record.MemoryRecords @@ -187,6 +188,10 @@ class Partition(val topic: String, def getReplica(replicaId: Int = localBrokerId): Option[Replica] = Option(allReplicasMap.get(replicaId)) + def getReplicaOrException(replicaId: Int = localBrokerId): Replica = + getReplica(replicaId).getOrElse( + throw new ReplicaNotAvailableException(s"Replica $replicaId is not available for partition $topicPartition")) + def leaderReplicaIfLocal: Option[Replica] = leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica) @@ -545,15 +550,41 @@ class Partition(val topic: String, laggingReplicas } - def appendRecordsToFutureReplica(records: MemoryRecords) { - getReplica(Request.FutureLocalReplicaId).get.log.get.appendAsFollower(records) + private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = { + if (isFuture) + getReplicaOrException(Request.FutureLocalReplicaId).log.get.appendAsFollower(records) + else { + // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread + // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica. + inReadLock(leaderIsrUpdateLock) { + getReplicaOrException().log.get.appendAsFollower(records) + } + } } - def appendRecordsToFollower(records: MemoryRecords) { - // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread - // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica. 
- inReadLock(leaderIsrUpdateLock) { - getReplica().get.log.get.appendAsFollower(records) + def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean) { + try { + doAppendRecordsToFollowerOrFutureReplica(records, isFuture) + } catch { + case e: UnexpectedAppendOffsetException => + val replica = if (isFuture) getReplicaOrException(Request.FutureLocalReplicaId) else getReplicaOrException() + val logEndOffset = replica.logEndOffset.messageOffset + if (logEndOffset == replica.logStartOffset && + e.firstOffset < logEndOffset && e.lastOffset >= logEndOffset) { + // This may happen if the log start offset on the leader (or current replica) falls in + // the middle of the batch due to delete records request and the follower tries to + // fetch its first offset from the leader. + // We handle this case here instead of Log#append() because we will need to remove the + // segment that start with log start offset and create a new one with earlier offset + // (base offset of the batch), which will move recoveryPoint backwards, so we will need + // to checkpoint the new recovery point before we append + val replicaName = if (isFuture) "future replica" else "follower" + info(s"Unexpected offset in append to $topicPartition. First offset ${e.firstOffset} is less than log start offset ${replica.logStartOffset}." + + s" Since this is the first record to be appended to the $replicaName's log, will start the log from offset ${e.firstOffset}.") + truncateFullyAndStartAt(e.firstOffset, isFuture) + doAppendRecordsToFollowerOrFutureReplica(records, isFuture) + } else + throw e } } diff --git a/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala new file mode 100644 index 00000000000..f8daaa4a181 --- /dev/null +++ b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +/** + * Indicates the follower received records with non-monotonically increasing offsets + */ +class OffsetsOutOfOrderException(message: String) extends RuntimeException(message) { +} + diff --git a/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala new file mode 100644 index 00000000000..e719a93006d --- /dev/null +++ b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +/** + * Indicates the follower or the future replica received records from the leader (or current + * replica) with first offset less than expected next offset. + * @param firstOffset The first offset of the records to append + * @param lastOffset The last offset of the records to append + */ +class UnexpectedAppendOffsetException(val message: String, + val firstOffset: Long, + val lastOffset: Long) extends RuntimeException(message) { +} diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index c7d2a6e3b6e..c92beee0f34 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -29,7 +29,7 @@ import java.util.regex.Pattern import com.yammer.metrics.core.Gauge import kafka.api.KAFKA_0_10_0_IV0 -import kafka.common.{InvalidOffsetException, KafkaException, LogSegmentOffsetOverflowException, LongRef} +import kafka.common.{InvalidOffsetException, KafkaException, LogSegmentOffsetOverflowException, LongRef, UnexpectedAppendOffsetException, OffsetsOutOfOrderException} import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec} import kafka.metrics.KafkaMetricsGroup import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile} @@ -49,11 +49,11 @@ import scala.collection.{Seq, Set, mutable} object LogAppendInfo { val UnknownLogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L, - RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false) + RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L) def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset, - RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false) + RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L) } /** @@ -72,6 +72,7 @@ object LogAppendInfo { * @param shallowCount The number of shallow messages * @param validBytes The number of valid bytes * @param offsetsMonotonic Are the offsets in this message set monotonically increasing + * @param lastOffsetOfFirstBatch The last offset of the first batch */ case class LogAppendInfo(var firstOffset: Option[Long], var lastOffset: Long, @@ -84,12 +85,15 @@ case class LogAppendInfo(var firstOffset: Option[Long], targetCodec: CompressionCodec, shallowCount: Int, validBytes: Int, - offsetsMonotonic: Boolean) { + offsetsMonotonic: Boolean, + lastOffsetOfFirstBatch: Long) { /** - * Get the first offset if it exists, else get the last offset. - * @return The offset of first message if it exists; else offset of the last message. 
+ * Get the first offset if it exists, else get the last offset of the first batch + * For magic versions 2 and newer, this method will return first offset. For magic versions + * older than 2, we use the last offset of the first batch as an approximation of the first + * offset to avoid decompressing the data. */ - def firstOrLastOffset: Long = firstOffset.getOrElse(lastOffset) + def firstOrLastOffsetOfFirstBatch: Long = firstOffset.getOrElse(lastOffsetOfFirstBatch) /** * Get the (maximum) number of messages described by LogAppendInfo @@ -736,6 +740,8 @@ class Log(@volatile var dir: File, * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader * @throws KafkaStorageException If the append fails due to an I/O error. + * @throws OffsetsOutOfOrderException If out of order offsets found in 'records' + * @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset * @return Information about the appended messages including the first and last offset. */ private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = { @@ -798,9 +804,27 @@ class Log(@volatile var dir: File, } } else { // we are taking the offsets we are given - if (!appendInfo.offsetsMonotonic || appendInfo.firstOrLastOffset < nextOffsetMetadata.messageOffset) - throw new IllegalArgumentException(s"Out of order offsets found in append to $topicPartition: " + - records.records.asScala.map(_.offset)) + if (!appendInfo.offsetsMonotonic) + throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " + + records.records.asScala.map(_.offset)) + + if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) { + // we may still be able to recover if the log is empty + // one example: fetching from log start offset on the leader which is not batch aligned, + // which may happen as a result of AdminClient#deleteRecords() + val firstOffset = appendInfo.firstOffset match { + case Some(offset) => offset + case None => records.batches.asScala.head.baseOffset() + } + + val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch" + throw new UnexpectedAppendOffsetException( + s"Unexpected offset in append to $topicPartition. $firstOrLast " + + s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " + + s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" + + s" append: ${appendInfo.lastOffset}. 
Log start offset = $logStartOffset", + firstOffset, appendInfo.lastOffset) + } } // update the epoch cache with the epoch stamped onto the message by the leader @@ -830,7 +854,7 @@ class Log(@volatile var dir: File, val segment = maybeRoll(validRecords.sizeInBytes, appendInfo) val logOffsetMetadata = LogOffsetMetadata( - messageOffset = appendInfo.firstOrLastOffset, + messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch, segmentBaseOffset = segment.baseOffset, relativePositionInSegment = segment.size) @@ -970,6 +994,7 @@ class Log(@volatile var dir: File, var maxTimestamp = RecordBatch.NO_TIMESTAMP var offsetOfMaxTimestamp = -1L var readFirstMessage = false + var lastOffsetOfFirstBatch = -1L for (batch <- records.batches.asScala) { // we only validate V2 and higher to avoid potential compatibility issues with older clients @@ -986,6 +1011,7 @@ class Log(@volatile var dir: File, if (!readFirstMessage) { if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) firstOffset = Some(batch.baseOffset) + lastOffsetOfFirstBatch = batch.lastOffset readFirstMessage = true } @@ -1024,7 +1050,7 @@ class Log(@volatile var dir: File, // Apply broker-side compression if any val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec) LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset, - RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic) + RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch) } private def updateProducers(batch: RecordBatch, diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 5a505c3d377..e46473b69e9 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -98,8 +98,7 @@ class ReplicaAlterLogDirsThread(name: String, throw new IllegalStateException("Offset mismatch for the future replica %s: fetched offset = %d, log end offset = %d.".format( topicPartition, fetchOffset, futureReplica.logEndOffset.messageOffset)) - // Append the leader's messages to the log - partition.appendRecordsToFutureReplica(records) + partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true) val futureReplicaHighWatermark = futureReplica.logEndOffset.messageOffset.min(partitionData.highWatermark) futureReplica.highWatermark = new LogOffsetMetadata(futureReplicaHighWatermark) futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset) diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index cf8d829f850..80940f61470 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -112,7 +112,7 @@ class ReplicaFetcherThread(name: String, .format(replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark)) // Append the leader's messages to the log - partition.appendRecordsToFollower(records) + partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false) if (isTraceEnabled) trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s" diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 986fa4a366a..02baf66068b 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -811,6 +811,83 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset) } + @Test + def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(): Unit = { + val leaders = createTopic(topic, numPartitions = 1, replicationFactor = serverCount) + val followerIndex = if (leaders(0) != servers(0).config.brokerId) 0 else 1 + + def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: Long): Unit = { + TestUtils.waitUntilTrue(() => servers(followerIndex).replicaManager.getReplica(topicPartition) != None, + "Expected follower to create replica for partition") + + // wait until the follower discovers that log start offset moved beyond its HW + TestUtils.waitUntilTrue(() => { + servers(followerIndex).replicaManager.getReplica(topicPartition).get.logStartOffset == expectedStartOffset + }, s"Expected follower to discover new log start offset $expectedStartOffset") + + TestUtils.waitUntilTrue(() => { + servers(followerIndex).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset == expectedEndOffset + }, s"Expected follower to catch up to log end offset $expectedEndOffset") + } + + // we will produce to topic and delete records while one follower is down + killBroker(followerIndex) + + client = AdminClient.create(createConfig) + sendRecords(producers.head, 100, topicPartition) + + val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava) + result.all().get() + + // start the stopped broker to verify that it will be able to fetch from new log start offset + restartDeadBrokers() + + waitForFollowerLog(expectedStartOffset=3L, expectedEndOffset=100L) + + // after the new replica caught up, all replicas should have same log start offset + for (i <- 0 until serverCount) + assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset) + + // kill the same follower again, produce more records, and delete records beyond follower's LOE + killBroker(followerIndex) + sendRecords(producers.head, 100, topicPartition) + val result1 = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(117L)).asJava) + result1.all().get() + restartDeadBrokers() + waitForFollowerLog(expectedStartOffset=117L, expectedEndOffset=200L) + } + + @Test + def testAlterLogDirsAfterDeleteRecords(): Unit = { + client = AdminClient.create(createConfig) + createTopic(topic, numPartitions = 1, replicationFactor = serverCount) + val expectedLEO = 100 + sendRecords(producers.head, expectedLEO, topicPartition) + + // delete records to move log start offset + val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava) + result.all().get() + // make sure we are in the expected state after delete records + for (i <- 0 until serverCount) { + assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset) + assertEquals(expectedLEO, servers(i).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset) + } + + // we will create another dir just for one server + val futureLogDir = servers(0).config.logDirs(1) + val futureReplica = new TopicPartitionReplica(topic, 0, 
servers(0).config.brokerId) + + // Verify that replica can be moved to the specified log directory + client.alterReplicaLogDirs(Map(futureReplica -> futureLogDir).asJava).all.get + TestUtils.waitUntilTrue(() => { + futureLogDir == servers(0).logManager.getLog(topicPartition).get.dir.getParent + }, "timed out waiting for replica movement") + + // once replica moved, its LSO and LEO should match other replicas + assertEquals(3, servers(0).replicaManager.getReplica(topicPartition).get.logStartOffset) + assertEquals(expectedLEO, servers(0).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset) + } + @Test def testOffsetsForTimesAfterDeleteRecords(): Unit = { createTopic(topic, numPartitions = 2, replicationFactor = serverCount) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala new file mode 100644 index 00000000000..fe5d578533b --- /dev/null +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.cluster + +import java.io.File +import java.nio.ByteBuffer +import java.util.Properties +import java.util.concurrent.atomic.AtomicBoolean + +import kafka.common.UnexpectedAppendOffsetException +import kafka.log.{Log, LogConfig, LogManager, CleanerConfig} +import kafka.server._ +import kafka.utils.{MockTime, TestUtils, MockScheduler} +import kafka.utils.timer.MockTimer +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.ReplicaNotAvailableException +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.record._ +import org.junit.{After, Before, Test} +import org.junit.Assert._ +import org.scalatest.Assertions.assertThrows +import scala.collection.JavaConverters._ + +class PartitionTest { + + val brokerId = 101 + val topicPartition = new TopicPartition("test-topic", 0) + val time = new MockTime() + val brokerTopicStats = new BrokerTopicStats + val metrics = new Metrics + + var tmpDir: File = _ + var logDir: File = _ + var replicaManager: ReplicaManager = _ + var logManager: LogManager = _ + var logConfig: LogConfig = _ + + @Before + def setup(): Unit = { + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 512: java.lang.Integer) + logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer) + logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer) + logConfig = LogConfig(logProps) + + tmpDir = TestUtils.tempDir() + logDir = TestUtils.randomPartitionLogDir(tmpDir) + logManager = TestUtils.createLogManager( + logDirs = Seq(logDir), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time) + logManager.startup() + + val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect) + brokerProps.put("log.dir", logDir.getAbsolutePath) + val brokerConfig = KafkaConfig.fromProps(brokerProps) + replicaManager = new ReplicaManager( + config = brokerConfig, metrics, time, zkClient = null, new MockScheduler(time), + logManager, new AtomicBoolean(false), QuotaFactory.instantiate(brokerConfig, metrics, time, ""), + brokerTopicStats, new MetadataCache(brokerId), new LogDirFailureChannel(brokerConfig.logDirs.size)) + } + + @After + def tearDown(): Unit = { + brokerTopicStats.close() + metrics.close() + + logManager.shutdown() + Utils.delete(tmpDir) + logManager.liveLogDirs.foreach(Utils.delete) + replicaManager.shutdown(checkpointHW = false) + } + + @Test + def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = { + val log = logManager.getOrCreateLog(topicPartition, logConfig) + val replica = new Replica(brokerId, topicPartition, time, log = Some(log)) + val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager) + partition.addReplicaIfNotExists(replica) + assertEquals(Some(replica), partition.getReplica(replica.brokerId)) + + val initialLogStartOffset = 5L + partition.truncateFullyAndStartAt(initialLogStartOffset, isFuture = false) + assertEquals(s"Log end offset after truncate fully and start at $initialLogStartOffset:", + initialLogStartOffset, replica.logEndOffset.messageOffset) + assertEquals(s"Log start offset after truncate fully and start at $initialLogStartOffset:", + initialLogStartOffset, replica.logStartOffset) + + // verify that we cannot append records that do not contain log start offset even if the log is empty + assertThrows[UnexpectedAppendOffsetException] { + // append one record with offset = 3 + 
partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 3L), isFuture = false) + } + assertEquals(s"Log end offset should not change after failure to append", initialLogStartOffset, replica.logEndOffset.messageOffset) + + // verify that we can append records that contain log start offset, even when first + // offset < log start offset if the log is empty + val newLogStartOffset = 4L + val records = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes), + new SimpleRecord("k2".getBytes, "v2".getBytes), + new SimpleRecord("k3".getBytes, "v3".getBytes)), + baseOffset = newLogStartOffset) + partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false) + assertEquals(s"Log end offset after append of 3 records with base offset $newLogStartOffset:", 7L, replica.logEndOffset.messageOffset) + assertEquals(s"Log start offset after append of 3 records with base offset $newLogStartOffset:", newLogStartOffset, replica.logStartOffset) + + // and we can append more records after that + partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 7L), isFuture = false) + assertEquals(s"Log end offset after append of 1 record at offset 7:", 8L, replica.logEndOffset.messageOffset) + assertEquals(s"Log start offset not expected to change:", newLogStartOffset, replica.logStartOffset) + + // but we cannot append to offset < log start if the log is not empty + assertThrows[UnexpectedAppendOffsetException] { + val records2 = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes), + new SimpleRecord("k2".getBytes, "v2".getBytes)), + baseOffset = 3L) + partition.appendRecordsToFollowerOrFutureReplica(records2, isFuture = false) + } + assertEquals(s"Log end offset should not change after failure to append", 8L, replica.logEndOffset.messageOffset) + + // we still can append to next offset + partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 8L), isFuture = false) + assertEquals(s"Log end offset after append of 1 record at offset 8:", 9L, replica.logEndOffset.messageOffset) + assertEquals(s"Log start offset not expected to change:", newLogStartOffset, replica.logStartOffset) + } + + @Test + def testGetReplica(): Unit = { + val log = logManager.getOrCreateLog(topicPartition, logConfig) + val replica = new Replica(brokerId, topicPartition, time, log = Some(log)) + val partition = new + Partition(topicPartition.topic, topicPartition.partition, time, replicaManager) + + assertEquals(None, partition.getReplica(brokerId)) + assertThrows[ReplicaNotAvailableException] { + partition.getReplicaOrException(brokerId) + } + + partition.addReplicaIfNotExists(replica) + assertEquals(replica, partition.getReplicaOrException(brokerId)) + } + + @Test + def testAppendRecordsToFollowerWithNoReplicaThrowsException(): Unit = { + val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager) + assertThrows[ReplicaNotAvailableException] { + partition.appendRecordsToFollowerOrFutureReplica( + createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 0L), isFuture = false) + } + } + + def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = { + val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) + val builder = MemoryRecords.builder( + buf, 
RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, + baseOffset, time.milliseconds, partitionLeaderEpoch) + records.foreach(builder.append) + builder.build() + } + +} diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 1171e5e00df..6c62e5e7b2e 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -22,7 +22,8 @@ import java.nio.ByteBuffer import java.nio.file.{Files, Paths} import java.util.Properties -import kafka.common.KafkaException +import org.apache.kafka.common.errors._ +import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException, KafkaException} import kafka.log.Log.DeleteDirSuffix import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache} import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel} @@ -42,6 +43,7 @@ import org.junit.{After, Before, Test} import scala.collection.Iterable import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, ListBuffer} +import org.scalatest.Assertions.{assertThrows, intercept, withClue} class LogTest { var config: KafkaConfig = null @@ -1885,13 +1887,72 @@ class LogTest { assertTrue("Message payload should be null.", !head.hasValue) } - @Test(expected = classOf[IllegalArgumentException]) + @Test def testAppendWithOutOfOrderOffsetsThrowsException() { val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) + + val appendOffsets = Seq(0L, 1L, 3L, 2L, 4L) + val buffer = ByteBuffer.allocate(512) + for (offset <- appendOffsets) { + val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, + TimestampType.LOG_APPEND_TIME, offset, mockTime.milliseconds(), + 1L, 0, 0, false, 0) + builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) + builder.close() + } + buffer.flip() + val memoryRecords = MemoryRecords.readableRecords(buffer) + + assertThrows[OffsetsOutOfOrderException] { + log.appendAsFollower(memoryRecords) + } + } + + @Test + def testAppendBelowExpectedOffsetThrowsException() { + val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray records.foreach(record => log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), leaderEpoch = 0)) - val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1.toString.getBytes)) - log.appendAsFollower(invalidRecord) + + val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2) + val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4) + for (magic <- magicVals; compression <- compressionTypes) { + val invalidRecord = MemoryRecords.withRecords(magic, compression, new SimpleRecord(1.toString.getBytes)) + withClue(s"Magic=$magic, compressionType=$compression") { + assertThrows[UnexpectedAppendOffsetException] { + log.appendAsFollower(invalidRecord) + } + } + } + } + + @Test + def testAppendEmptyLogBelowLogStartOffsetThrowsException() { + createEmptyLogs(logDir, 7) + val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) + assertEquals(7L, log.logStartOffset) + assertEquals(7L, log.logEndOffset) + + val firstOffset = 4L + val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2) + val compressionTypes = 
Seq(CompressionType.NONE, CompressionType.LZ4) + for (magic <- magicVals; compression <- compressionTypes) { + val batch = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes), + new SimpleRecord("k2".getBytes, "v2".getBytes), + new SimpleRecord("k3".getBytes, "v3".getBytes)), + magicValue = magic, codec = compression, + baseOffset = firstOffset) + + withClue(s"Magic=$magic, compressionType=$compression") { + val exception = intercept[UnexpectedAppendOffsetException] { + log.appendAsFollower(records = batch) + } + assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#firstOffset", + firstOffset, exception.firstOffset) + assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#lastOffset", + firstOffset + 2, exception.lastOffset) + } + } } @Test

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> AdminClient.deleteRecords() may cause replicas unable to fetch from beginning
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-6975
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6975
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.1.0, 1.0.1
>            Reporter: Anna Povzner
>            Assignee: Anna Povzner
>            Priority: Blocker
>             Fix For: 2.0.0, 1.0.2, 1.1.1
>
>
> AdminClient.deleteRecords(beforeOffset(offset)) sets the log start offset to the requested offset. If the requested offset falls in the middle of a batch, replicas will not be able to fetch from that offset (because it is in the middle of the batch).
> One use case where this causes problems is replica re-assignment. Suppose we have a topic partition with 3 initial replicas, and at some point the user issues AdminClient.deleteRecords() for an offset that falls in the middle of a batch. That offset then becomes the log start offset (LSO) for the topic partition. Suppose the user later starts re-assignment of the partition to 3 new replicas. The new replicas (followers) start with HW = 0 and try to fetch from offset 0, which fails with an "out of order offset" error because 0 < LSO. The follower then resets its fetch offset to the leader's LSO and fetches from it, but the leader responds with a batch whose base offset is < LSO; this again causes an "out of order offset" error on the follower, which stops the fetcher thread. The end result is that the new replicas cannot start fetching until the LSO moves to an offset that is not in the middle of a batch, so the re-assignment may be stuck for a very long time.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
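For illustration, the following is a minimal sketch (not part of the patch or the original comment) of the deleteRecords call that triggers the scenario above, mirroring the usage in the PR's AdminClientIntegrationTest. The bootstrap address localhost:9092, the topic name my-topic, and the cutoff offset 3L are placeholder assumptions; the sketch further assumes offset 3 falls inside a multi-record batch on the leader.

import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, RecordsToDelete}
import org.apache.kafka.common.TopicPartition

object DeleteRecordsMidBatchSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker address
    val admin = AdminClient.create(props)
    try {
      val topicPartition = new TopicPartition("my-topic", 0) // placeholder topic/partition
      // Ask the brokers to delete everything before offset 3. If offset 3 sits in the
      // middle of a batch, the leader's log start offset now points inside that batch.
      admin.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava).all().get()
      // Before this fix, a follower that (re)starts fetching from the new log start offset
      // receives a batch whose base offset is below its expected next offset and stops its
      // fetcher thread; with the fix, Partition.appendRecordsToFollowerOrFutureReplica()
      // truncates the empty follower log to the batch's base offset and retries the append.
    } finally {
      admin.close()
    }
  }
}

The PR's testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords test exercises this same sequence with a follower that is down while the records are deleted, then verifies the restarted follower catches up from the new log start offset.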