[ https://issues.apache.org/jira/browse/KAFKA-6975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514315#comment-16514315 ]
ASF GitHub Bot commented on KAFKA-6975: --------------------------------------- hachikuji closed pull request #5235: KAFKA-6975; Fix replica fetching from non-batch-aligned log start offset URL: https://github.com/apache/kafka/pull/5235 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index e038b5885b7..55edd10b5a3 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import com.yammer.metrics.core.Gauge import kafka.admin.AdminUtils import kafka.api.LeaderAndIsr +import kafka.common.UnexpectedAppendOffsetException import kafka.controller.KafkaController import kafka.log.{LogAppendInfo, LogConfig} import kafka.metrics.KafkaMetricsGroup @@ -29,7 +30,7 @@ import kafka.server._ import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException} +import org.apache.kafka.common.errors.{ReplicaNotAvailableException, NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.record.MemoryRecords @@ -155,6 +156,10 @@ class Partition(val topic: String, def getReplica(replicaId: Int = localBrokerId): Option[Replica] = Option(assignedReplicaMap.get(replicaId)) + def getReplicaOrException(replicaId: Int = localBrokerId): Replica = + getReplica(replicaId).getOrElse( + throw new ReplicaNotAvailableException(s"Replica $replicaId is not available for partition $topicPartition")) + def leaderReplicaIfLocal: Option[Replica] = leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica) @@ -486,6 +491,31 @@ class Partition(val topic: String, laggingReplicas } + def appendRecordsToFollower(records: MemoryRecords) { + try { + getReplicaOrException().log.get.appendAsFollower(records) + } catch { + case e: UnexpectedAppendOffsetException => + val replica = getReplicaOrException() + val logEndOffset = replica.logEndOffset.messageOffset + if (logEndOffset == replica.logStartOffset && + e.firstOffset < logEndOffset && e.lastOffset >= logEndOffset) { + // This may happen if the log start offset on the leader (or current replica) falls in + // the middle of the batch due to delete records request and the follower tries to + // fetch its first offset from the leader. + // We handle this case here instead of Log#append() because we will need to remove the + // segment that start with log start offset and create a new one with earlier offset + // (base offset of the batch), which will move recoveryPoint backwards, so we will need + // to checkpoint the new recovery point before we append + info(s"Unexpected offset in append to $topicPartition. First offset ${e.firstOffset} is less than log start offset ${replica.logStartOffset}." 
+ + s" Since this is the first record to be appended to the follower's log, will start the log from offset ${e.firstOffset}.") + logManager.truncateFullyAndStartAt(topicPartition, e.firstOffset) + replica.log.get.appendAsFollower(records) + } else + throw e + } + } + def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = { val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) { leaderReplicaIfLocal match { diff --git a/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala new file mode 100644 index 00000000000..f8daaa4a181 --- /dev/null +++ b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +/** + * Indicates the follower received records with non-monotonically increasing offsets + */ +class OffsetsOutOfOrderException(message: String) extends RuntimeException(message) { +} + diff --git a/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala new file mode 100644 index 00000000000..e719a93006d --- /dev/null +++ b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +/** + * Indicates the follower or the future replica received records from the leader (or current + * replica) with first offset less than expected next offset. 
+ * @param firstOffset The first offset of the records to append + * @param lastOffset The last offset of the records to append + */ +class UnexpectedAppendOffsetException(val message: String, + val firstOffset: Long, + val lastOffset: Long) extends RuntimeException(message) { +} diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index caa7bf5bfac..9157ee1fe1f 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -24,7 +24,7 @@ import java.util.concurrent.atomic._ import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, TimeUnit} import kafka.api.KAFKA_0_10_0_IV0 -import kafka.common.{InvalidOffsetException, KafkaException, LongRef} +import kafka.common.{InvalidOffsetException, KafkaException, LongRef, UnexpectedAppendOffsetException, OffsetsOutOfOrderException} import kafka.metrics.KafkaMetricsGroup import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata} import kafka.utils._ @@ -618,6 +618,8 @@ class Log(@volatile var dir: File, * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader * @throws KafkaStorageException If the append fails due to an I/O error. + * @throws OffsetsOutOfOrderException If out of order offsets found in 'records' + * @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset * @return Information about the appended messages including the first and last offset. */ private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = { @@ -679,8 +681,24 @@ class Log(@volatile var dir: File, } } else { // we are taking the offsets we are given - if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset) - throw new IllegalArgumentException("Out of order offsets found in " + records.records.asScala.map(_.offset)) + if (!appendInfo.offsetsMonotonic) + throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " + + records.records.asScala.map(_.offset)) + + if (appendInfo.firstOffset < nextOffsetMetadata.messageOffset) { + // we may still be able to recover if the log is empty + // one example: fetching from log start offset on the leader which is not batch aligned, + // which may happen as a result of AdminClient#deleteRecords() + // appendInfo.firstOffset maybe either first offset or last offset of the first batch. + // get the actual first offset, which may require decompressing the data + val firstOffset = records.batches.asScala.head.baseOffset() + throw new UnexpectedAppendOffsetException( + s"Unexpected offset in append to $topicPartition. First offset or last offset of the first batch " + + s"${appendInfo.firstOffset} is less than the next offset ${nextOffsetMetadata.messageOffset}. " + + s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" + + s" append: ${appendInfo.lastOffset}. 
Log start offset = $logStartOffset", + firstOffset, appendInfo.lastOffset) + } } // update the epoch cache with the epoch stamped onto the message by the leader diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 2fb04486a05..af5763a73d5 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -87,6 +87,7 @@ class ReplicaFetcherThread(name: String, // process fetched data def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) { val replica = replicaMgr.getReplicaOrException(topicPartition) + val partition = replicaMgr.getPartition(topicPartition).get val records = partitionData.toRecords maybeWarnIfOversizedRecords(records, topicPartition) @@ -99,7 +100,7 @@ class ReplicaFetcherThread(name: String, .format(replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark)) // Append the leader's messages to the log - replica.log.get.appendAsFollower(records) + partition.appendRecordsToFollower(records) if (logger.isTraceEnabled) trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s" diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala new file mode 100644 index 00000000000..2798b5abe3f --- /dev/null +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.cluster + +import java.io.File +import java.nio.ByteBuffer +import java.util.Properties +import java.util.concurrent.atomic.AtomicBoolean + +import kafka.common.UnexpectedAppendOffsetException +import kafka.log.{LogConfig, LogManager, CleanerConfig} +import kafka.server._ +import kafka.utils.{MockTime, TestUtils, MockScheduler} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.ReplicaNotAvailableException +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.record._ +import org.junit.{After, Before, Test} +import org.junit.Assert._ +import org.scalatest.Assertions.assertThrows +import scala.collection.JavaConverters._ + +class PartitionTest { + + val brokerId = 101 + val topicPartition = new TopicPartition("test-topic", 0) + val time = new MockTime() + val brokerTopicStats = new BrokerTopicStats + val metrics = new Metrics + + var tmpDir: File = _ + var logDir: File = _ + var replicaManager: ReplicaManager = _ + var logManager: LogManager = _ + var logConfig: LogConfig = _ + + @Before + def setup(): Unit = { + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 512: java.lang.Integer) + logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer) + logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer) + logConfig = LogConfig(logProps) + + tmpDir = TestUtils.tempDir() + logDir = TestUtils.randomPartitionLogDir(tmpDir) + logManager = TestUtils.createLogManager( + logDirs = Seq(logDir), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time) + logManager.startup() + + val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect) + brokerProps.put("log.dir", logDir.getAbsolutePath) + val brokerConfig = KafkaConfig.fromProps(brokerProps) + replicaManager = new ReplicaManager( + config = brokerConfig, metrics, time, zkUtils = null, new MockScheduler(time), + logManager, new AtomicBoolean(false), QuotaFactory.instantiate(brokerConfig, metrics, time, "").follower, + brokerTopicStats, new MetadataCache(brokerId), new LogDirFailureChannel(brokerConfig.logDirs.size)) + } + + @After + def tearDown(): Unit = { + brokerTopicStats.close() + metrics.close() + + logManager.shutdown() + Utils.delete(tmpDir) + logManager.liveLogDirs.foreach(Utils.delete) + replicaManager.shutdown(checkpointHW = false) + } + + @Test + def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = { + val log = logManager.getOrCreateLog(topicPartition, logConfig) + val replica = new Replica(brokerId, topicPartition, time, log = Some(log)) + val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager) + partition.addReplicaIfNotExists(replica) + assertEquals(Some(replica), partition.getReplica(replica.brokerId)) + + val initialLogStartOffset = 5L + logManager.truncateFullyAndStartAt(topicPartition, initialLogStartOffset) + assertEquals(s"Log end offset after truncate fully and start at $initialLogStartOffset:", + initialLogStartOffset, replica.logEndOffset.messageOffset) + assertEquals(s"Log start offset after truncate fully and start at $initialLogStartOffset:", + initialLogStartOffset, replica.logStartOffset) + + // verify that we cannot append records that do not contain log start offset even if the log is empty + assertThrows[UnexpectedAppendOffsetException] { + // append one record with offset = 3 + partition.appendRecordsToFollower(createRecords(List(new SimpleRecord("k1".getBytes, 
"v1".getBytes)), baseOffset = 3L)) + } + assertEquals(s"Log end offset should not change after failure to append", initialLogStartOffset, replica.logEndOffset.messageOffset) + + // verify that we can append records that contain log start offset, even when first + // offset < log start offset if the log is empty + val newLogStartOffset = 4L + val records = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes), + new SimpleRecord("k2".getBytes, "v2".getBytes), + new SimpleRecord("k3".getBytes, "v3".getBytes)), + baseOffset = newLogStartOffset) + partition.appendRecordsToFollower(records) + assertEquals(s"Log end offset after append of 3 records with base offset $newLogStartOffset:", 7L, replica.logEndOffset.messageOffset) + assertEquals(s"Log start offset after append of 3 records with base offset $newLogStartOffset:", newLogStartOffset, replica.logStartOffset) + + // and we can append more records after that + partition.appendRecordsToFollower(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 7L)) + assertEquals(s"Log end offset after append of 1 record at offset 7:", 8L, replica.logEndOffset.messageOffset) + assertEquals(s"Log start offset not expected to change:", newLogStartOffset, replica.logStartOffset) + + // but we cannot append to offset < log start if the log is not empty + assertThrows[UnexpectedAppendOffsetException] { + val records2 = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes), + new SimpleRecord("k2".getBytes, "v2".getBytes)), + baseOffset = 3L) + partition.appendRecordsToFollower(records2) + } + assertEquals(s"Log end offset should not change after failure to append", 8L, replica.logEndOffset.messageOffset) + + // we still can append to next offset + partition.appendRecordsToFollower(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 8L)) + assertEquals(s"Log end offset after append of 1 record at offset 8:", 9L, replica.logEndOffset.messageOffset) + assertEquals(s"Log start offset not expected to change:", newLogStartOffset, replica.logStartOffset) + } + + @Test + def testGetReplica(): Unit = { + val log = logManager.getOrCreateLog(topicPartition, logConfig) + val replica = new Replica(brokerId, topicPartition, time, log = Some(log)) + val partition = new + Partition(topicPartition.topic, topicPartition.partition, time, replicaManager) + + assertEquals(None, partition.getReplica(brokerId)) + assertThrows[ReplicaNotAvailableException] { + partition.getReplicaOrException(brokerId) + } + + partition.addReplicaIfNotExists(replica) + assertEquals(replica, partition.getReplicaOrException(brokerId)) + } + + @Test + def testAppendRecordsToFollowerWithNoReplicaThrowsException(): Unit = { + val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager) + assertThrows[ReplicaNotAvailableException] { + partition.appendRecordsToFollower( + createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 0L)) + } + } + + def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = { + val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) + val builder = MemoryRecords.builder( + buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, + baseOffset, time.milliseconds, partitionLeaderEpoch) + records.foreach(builder.append) + builder.build() + } + +} diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala index 3c8b01e4473..41f6ab79de5 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -22,7 +22,7 @@ import java.nio.ByteBuffer import java.util.Properties import org.apache.kafka.common.errors._ -import kafka.common.KafkaException +import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException, KafkaException} import org.junit.Assert._ import org.junit.{After, Before, Test} import kafka.utils._ @@ -39,6 +39,7 @@ import org.easymock.EasyMock import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, ListBuffer} +import org.scalatest.Assertions.{assertThrows, intercept, withClue} class LogTest { @@ -1862,13 +1863,72 @@ class LogTest { assertTrue("Message payload should be null.", !head.hasValue) } - @Test(expected = classOf[IllegalArgumentException]) + @Test def testAppendWithOutOfOrderOffsetsThrowsException() { - val log = createLog(logDir, LogConfig()) + val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) + + val appendOffsets = Seq(0L, 1L, 3L, 2L, 4L) + val buffer = ByteBuffer.allocate(512) + for (offset <- appendOffsets) { + val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, + TimestampType.LOG_APPEND_TIME, offset, mockTime.milliseconds(), + 1L, 0, 0, false, 0) + builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) + builder.close() + } + buffer.flip() + val memoryRecords = MemoryRecords.readableRecords(buffer) + + assertThrows[OffsetsOutOfOrderException] { + log.appendAsFollower(memoryRecords) + } + } + + @Test + def testAppendBelowExpectedOffsetThrowsException() { + val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray records.foreach(record => log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), leaderEpoch = 0)) - val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1.toString.getBytes)) - log.appendAsFollower(invalidRecord) + + val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2) + val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4) + for (magic <- magicVals; compression <- compressionTypes) { + val invalidRecord = MemoryRecords.withRecords(magic, compression, new SimpleRecord(1.toString.getBytes)) + withClue(s"Magic=$magic, compressionType=$compression") { + assertThrows[UnexpectedAppendOffsetException] { + log.appendAsFollower(invalidRecord) + } + } + } + } + + @Test + def testAppendEmptyLogBelowLogStartOffsetThrowsException() { + createEmptyLogs(logDir, 7) + val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) + assertEquals(7L, log.logStartOffset) + assertEquals(7L, log.logEndOffset) + + val firstOffset = 4L + val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2) + val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4) + for (magic <- magicVals; compression <- compressionTypes) { + val batch = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes), + new SimpleRecord("k2".getBytes, "v2".getBytes), + new SimpleRecord("k3".getBytes, "v3".getBytes)), + magicValue = magic, codec = compression, + baseOffset = firstOffset) + + withClue(s"Magic=$magic, compressionType=$compression") { + val exception = 
intercept[UnexpectedAppendOffsetException] { log.appendAsFollower(records = batch) } assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#firstOffset", firstOffset, exception.firstOffset) assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#lastOffset", firstOffset + 2, exception.lastOffset) } } } @Test
----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> AdminClient.deleteRecords() may cause replicas unable to fetch from beginning
> -----------------------------------------------------------------------------
>
> Key: KAFKA-6975
> URL: https://issues.apache.org/jira/browse/KAFKA-6975
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 1.1.0, 1.0.1
> Reporter: Anna Povzner
> Assignee: Anna Povzner
> Priority: Blocker
> Fix For: 2.0.0, 1.1.1
>
> AdminClient.deleteRecords(beforeOffset(offset)) sets the log start offset to the requested offset. If the requested offset falls in the middle of a batch, the replica will not be able to fetch from that offset (because it is in the middle of the batch).
> One use case where this could cause problems is replica re-assignment. Suppose we have a topic partition with 3 initial replicas, and at some point the user issues AdminClient.deleteRecords() for an offset that falls in the middle of a batch; that offset then becomes the log start offset for this topic partition. Suppose that at some later time the user starts partition re-assignment to 3 new replicas. The new replicas (followers) will start with HW = 0 and will try to fetch from 0, then get "out of order offset" because 0 < log start offset (LSO). The follower will reset its offset to the leader's LSO and fetch from there, but the leader will respond with a batch whose base offset is < LSO, which again causes "out of order offset" on the follower and stops the fetcher thread. The end result is that the new replicas cannot start fetching until the LSO moves to an offset that is not in the middle of a batch, so the re-assignment can be stuck for a very long time.
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
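
For context on the scenario described in the ticket, the failure starts from an AdminClient.deleteRecords() call whose target offset lands inside a record batch. Below is a minimal Scala sketch of such a call; the bootstrap server localhost:9092, the topic "test-topic", and the deletion offset 5 are illustrative assumptions, not values taken from this ticket.

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, RecordsToDelete}
import org.apache.kafka.common.TopicPartition

object DeleteRecordsMidBatchExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
    val admin = AdminClient.create(props)
    try {
      val tp = new TopicPartition("test-topic", 0) // assumed topic and partition
      // Ask the broker to delete all records before offset 5. Nothing requires 5 to be
      // the base offset of a batch, so the resulting log start offset may sit in the
      // middle of a (possibly compressed) batch on the leader.
      val result = admin.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(5L)))
      // Block until the broker reports the new low watermark (the new log start offset).
      val lowWatermark = result.lowWatermarks().get(tp).get().lowWatermark()
      println(s"New log start offset for $tp: $lowWatermark")
    } finally {
      admin.close()
    }
  }
}

Since the API does not require the deletion offset to be batch aligned, the leader's new log start offset can fall inside a batch after this call, which is exactly the case that Partition.appendRecordsToFollower() in the patch above recovers from on the follower side.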