This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new ce4f4e1 KAFKA-6975; Fix replica fetching from non-batch-aligned log
start offset (#5235)
ce4f4e1 is described below
commit ce4f4e1ab9ab800f7ea962570cc29e5b00e12f23
Author: Anna Povzner <[email protected]>
AuthorDate: Fri Jun 15 13:26:32 2018 -0700
KAFKA-6975; Fix replica fetching from non-batch-aligned log start offset
(#5235)
It is possible that the log start offset may fall in the middle of a batch
after AdminClient#deleteRecords(). This will cause a follower starting from the
log start offset to fail fetching (all records). Use cases in which a follower
will start fetching from the log start offset include: 1) a new replica due to
partition re-assignment; 2) a new local replica created as a result of
AdminClient#AlterReplicaLogDirs(); 3) a broker that was down for some time while
AdminClient#deleteRecords() moved the log start [...]
Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]>,
Jason Gustafson <[email protected]>
---
core/src/main/scala/kafka/cluster/Partition.scala | 32 +++-
.../kafka/common/OffsetsOutOfOrderException.scala | 25 +++
.../common/UnexpectedAppendOffsetException.scala | 29 ++++
core/src/main/scala/kafka/log/Log.scala | 24 ++-
.../scala/kafka/server/ReplicaFetcherThread.scala | 3 +-
.../scala/unit/kafka/cluster/PartitionTest.scala | 173 +++++++++++++++++++++
core/src/test/scala/unit/kafka/log/LogTest.scala | 70 ++++++++-
7 files changed, 346 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index e038b58..55edd10 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import com.yammer.metrics.core.Gauge
import kafka.admin.AdminUtils
import kafka.api.LeaderAndIsr
+import kafka.common.UnexpectedAppendOffsetException
import kafka.controller.KafkaController
import kafka.log.{LogAppendInfo, LogConfig}
import kafka.metrics.KafkaMetricsGroup
@@ -29,7 +30,7 @@ import kafka.server._
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{NotEnoughReplicasException,
NotLeaderForPartitionException, PolicyViolationException}
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException,
NotEnoughReplicasException, NotLeaderForPartitionException,
PolicyViolationException}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.record.MemoryRecords
@@ -155,6 +156,10 @@ class Partition(val topic: String,
def getReplica(replicaId: Int = localBrokerId): Option[Replica] =
Option(assignedReplicaMap.get(replicaId))
+ def getReplicaOrException(replicaId: Int = localBrokerId): Replica =
+ getReplica(replicaId).getOrElse(
+ throw new ReplicaNotAvailableException(s"Replica $replicaId is not
available for partition $topicPartition"))
+
def leaderReplicaIfLocal: Option[Replica] =
leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica)
@@ -486,6 +491,31 @@ class Partition(val topic: String,
laggingReplicas
}
+ def appendRecordsToFollower(records: MemoryRecords) {
+ try {
+ getReplicaOrException().log.get.appendAsFollower(records)
+ } catch {
+ case e: UnexpectedAppendOffsetException =>
+ val replica = getReplicaOrException()
+ val logEndOffset = replica.logEndOffset.messageOffset
+ if (logEndOffset == replica.logStartOffset &&
+ e.firstOffset < logEndOffset && e.lastOffset >= logEndOffset) {
+ // This may happen if the log start offset on the leader (or current
replica) falls in
+ // the middle of the batch due to delete records request and the
follower tries to
+ // fetch its first offset from the leader.
+ // We handle this case here instead of Log#append() because we will
need to remove the
+ // segment that start with log start offset and create a new one
with earlier offset
+ // (base offset of the batch), which will move recoveryPoint
backwards, so we will need
+ // to checkpoint the new recovery point before we append
+ info(s"Unexpected offset in append to $topicPartition. First offset
${e.firstOffset} is less than log start offset ${replica.logStartOffset}." +
+ s" Since this is the first record to be appended to the
follower's log, will start the log from offset ${e.firstOffset}.")
+ logManager.truncateFullyAndStartAt(topicPartition, e.firstOffset)
+ replica.log.get.appendAsFollower(records)
+ } else
+ throw e
+ }
+ }
+
def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean,
requiredAcks: Int = 0): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal match {
diff --git a/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
new file mode 100644
index 0000000..f8daaa4
--- /dev/null
+++ b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the follower received records with non-monotonically increasing
offsets
+ */
+class OffsetsOutOfOrderException(message: String) extends
RuntimeException(message) {
+}
+
diff --git
a/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
new file mode 100644
index 0000000..e719a93
--- /dev/null
+++ b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the follower or the future replica received records from the
leader (or current
+ * replica) with first offset less than expected next offset.
+ * @param firstOffset The first offset of the records to append
+ * @param lastOffset The last offset of the records to append
+ */
+class UnexpectedAppendOffsetException(val message: String,
+ val firstOffset: Long,
+ val lastOffset: Long) extends
RuntimeException(message) {
+}
diff --git a/core/src/main/scala/kafka/log/Log.scala
b/core/src/main/scala/kafka/log/Log.scala
index caa7bf5..9157ee1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic._
import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap,
TimeUnit}
import kafka.api.KAFKA_0_10_0_IV0
-import kafka.common.{InvalidOffsetException, KafkaException, LongRef}
+import kafka.common.{InvalidOffsetException, KafkaException, LongRef,
UnexpectedAppendOffsetException, OffsetsOutOfOrderException}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel,
LogOffsetMetadata}
import kafka.utils._
@@ -618,6 +618,8 @@ class Log(@volatile var dir: File,
* @param assignOffsets Should the log assign offsets to this message set or
blindly apply what it is given
* @param leaderEpoch The partition's leader epoch which will be applied to
messages when offsets are assigned on the leader
* @throws KafkaStorageException If the append fails due to an I/O error.
+ * @throws OffsetsOutOfOrderException If out of order offsets found in
'records'
+ * @throws UnexpectedAppendOffsetException If the first or last offset in
append is less than next offset
* @return Information about the appended messages including the first and
last offset.
*/
private def append(records: MemoryRecords, isFromClient: Boolean,
assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
@@ -679,8 +681,24 @@ class Log(@volatile var dir: File,
}
} else {
// we are taking the offsets we are given
- if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset <
nextOffsetMetadata.messageOffset)
- throw new IllegalArgumentException("Out of order offsets found in
" + records.records.asScala.map(_.offset))
+ if (!appendInfo.offsetsMonotonic)
+ throw new OffsetsOutOfOrderException(s"Out of order offsets found
in append to $topicPartition: " +
+
records.records.asScala.map(_.offset))
+
+ if (appendInfo.firstOffset < nextOffsetMetadata.messageOffset) {
+ // we may still be able to recover if the log is empty
+ // one example: fetching from log start offset on the leader which
is not batch aligned,
+ // which may happen as a result of AdminClient#deleteRecords()
+ // appendInfo.firstOffset maybe either first offset or last offset
of the first batch.
+ // get the actual first offset, which may require decompressing
the data
+ val firstOffset = records.batches.asScala.head.baseOffset()
+ throw new UnexpectedAppendOffsetException(
+ s"Unexpected offset in append to $topicPartition. First offset
or last offset of the first batch " +
+ s"${appendInfo.firstOffset} is less than the next offset
${nextOffsetMetadata.messageOffset}. " +
+ s"First 10 offsets in append:
${records.records.asScala.take(10).map(_.offset)}, last offset in" +
+ s" append: ${appendInfo.lastOffset}. Log start offset =
$logStartOffset",
+ firstOffset, appendInfo.lastOffset)
+ }
}
// update the epoch cache with the epoch stamped onto the message by
the leader
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 2fb04486..af5763a 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -87,6 +87,7 @@ class ReplicaFetcherThread(name: String,
// process fetched data
def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long,
partitionData: PartitionData) {
val replica = replicaMgr.getReplicaOrException(topicPartition)
+ val partition = replicaMgr.getPartition(topicPartition).get
val records = partitionData.toRecords
maybeWarnIfOversizedRecords(records, topicPartition)
@@ -99,7 +100,7 @@ class ReplicaFetcherThread(name: String,
.format(replica.logEndOffset.messageOffset, topicPartition,
records.sizeInBytes, partitionData.highWatermark))
// Append the leader's messages to the log
- replica.log.get.appendAsFollower(records)
+ partition.appendRecordsToFollower(records)
if (logger.isTraceEnabled)
trace("Follower has replica log end offset %d after appending %d bytes
of messages for partition %s"
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
new file mode 100644
index 0000000..2798b5a
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.cluster
+
+import java.io.File
+import java.nio.ByteBuffer
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.common.UnexpectedAppendOffsetException
+import kafka.log.{LogConfig, LogManager, CleanerConfig}
+import kafka.server._
+import kafka.utils.{MockTime, TestUtils, MockScheduler}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.ReplicaNotAvailableException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.record._
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+import org.scalatest.Assertions.assertThrows
+import scala.collection.JavaConverters._
+
+class PartitionTest {
+
+ val brokerId = 101
+ val topicPartition = new TopicPartition("test-topic", 0)
+ val time = new MockTime()
+ val brokerTopicStats = new BrokerTopicStats
+ val metrics = new Metrics
+
+ var tmpDir: File = _
+ var logDir: File = _
+ var replicaManager: ReplicaManager = _
+ var logManager: LogManager = _
+ var logConfig: LogConfig = _
+
+ @Before
+ def setup(): Unit = {
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 512: java.lang.Integer)
+ logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
+ logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
+ logConfig = LogConfig(logProps)
+
+ tmpDir = TestUtils.tempDir()
+ logDir = TestUtils.randomPartitionLogDir(tmpDir)
+ logManager = TestUtils.createLogManager(
+ logDirs = Seq(logDir), defaultConfig = logConfig,
CleanerConfig(enableCleaner = false), time)
+ logManager.startup()
+
+ val brokerProps = TestUtils.createBrokerConfig(brokerId,
TestUtils.MockZkConnect)
+ brokerProps.put("log.dir", logDir.getAbsolutePath)
+ val brokerConfig = KafkaConfig.fromProps(brokerProps)
+ replicaManager = new ReplicaManager(
+ config = brokerConfig, metrics, time, zkUtils = null, new
MockScheduler(time),
+ logManager, new AtomicBoolean(false),
QuotaFactory.instantiate(brokerConfig, metrics, time, "").follower,
+ brokerTopicStats, new MetadataCache(brokerId), new
LogDirFailureChannel(brokerConfig.logDirs.size))
+ }
+
+ @After
+ def tearDown(): Unit = {
+ brokerTopicStats.close()
+ metrics.close()
+
+ logManager.shutdown()
+ Utils.delete(tmpDir)
+ logManager.liveLogDirs.foreach(Utils.delete)
+ replicaManager.shutdown(checkpointHW = false)
+ }
+
+ @Test
+ def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
+ val log = logManager.getOrCreateLog(topicPartition, logConfig)
+ val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
+ val partition = new Partition(topicPartition.topic,
topicPartition.partition, time, replicaManager)
+ partition.addReplicaIfNotExists(replica)
+ assertEquals(Some(replica), partition.getReplica(replica.brokerId))
+
+ val initialLogStartOffset = 5L
+ logManager.truncateFullyAndStartAt(topicPartition, initialLogStartOffset)
+ assertEquals(s"Log end offset after truncate fully and start at
$initialLogStartOffset:",
+ initialLogStartOffset, replica.logEndOffset.messageOffset)
+ assertEquals(s"Log start offset after truncate fully and start at
$initialLogStartOffset:",
+ initialLogStartOffset, replica.logStartOffset)
+
+ // verify that we cannot append records that do not contain log start
offset even if the log is empty
+ assertThrows[UnexpectedAppendOffsetException] {
+ // append one record with offset = 3
+ partition.appendRecordsToFollower(createRecords(List(new
SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 3L))
+ }
+ assertEquals(s"Log end offset should not change after failure to append",
initialLogStartOffset, replica.logEndOffset.messageOffset)
+
+ // verify that we can append records that contain log start offset, even
when first
+ // offset < log start offset if the log is empty
+ val newLogStartOffset = 4L
+ val records = createRecords(List(new SimpleRecord("k1".getBytes,
"v1".getBytes),
+ new SimpleRecord("k2".getBytes,
"v2".getBytes),
+ new SimpleRecord("k3".getBytes,
"v3".getBytes)),
+ baseOffset = newLogStartOffset)
+ partition.appendRecordsToFollower(records)
+ assertEquals(s"Log end offset after append of 3 records with base offset
$newLogStartOffset:", 7L, replica.logEndOffset.messageOffset)
+ assertEquals(s"Log start offset after append of 3 records with base offset
$newLogStartOffset:", newLogStartOffset, replica.logStartOffset)
+
+ // and we can append more records after that
+ partition.appendRecordsToFollower(createRecords(List(new
SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 7L))
+ assertEquals(s"Log end offset after append of 1 record at offset 7:", 8L,
replica.logEndOffset.messageOffset)
+ assertEquals(s"Log start offset not expected to change:",
newLogStartOffset, replica.logStartOffset)
+
+ // but we cannot append to offset < log start if the log is not empty
+ assertThrows[UnexpectedAppendOffsetException] {
+ val records2 = createRecords(List(new SimpleRecord("k1".getBytes,
"v1".getBytes),
+ new SimpleRecord("k2".getBytes,
"v2".getBytes)),
+ baseOffset = 3L)
+ partition.appendRecordsToFollower(records2)
+ }
+ assertEquals(s"Log end offset should not change after failure to append",
8L, replica.logEndOffset.messageOffset)
+
+ // we still can append to next offset
+ partition.appendRecordsToFollower(createRecords(List(new
SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 8L))
+ assertEquals(s"Log end offset after append of 1 record at offset 8:", 9L,
replica.logEndOffset.messageOffset)
+ assertEquals(s"Log start offset not expected to change:",
newLogStartOffset, replica.logStartOffset)
+ }
+
+ @Test
+ def testGetReplica(): Unit = {
+ val log = logManager.getOrCreateLog(topicPartition, logConfig)
+ val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
+ val partition = new
+ Partition(topicPartition.topic, topicPartition.partition, time,
replicaManager)
+
+ assertEquals(None, partition.getReplica(brokerId))
+ assertThrows[ReplicaNotAvailableException] {
+ partition.getReplicaOrException(brokerId)
+ }
+
+ partition.addReplicaIfNotExists(replica)
+ assertEquals(replica, partition.getReplicaOrException(brokerId))
+ }
+
+ @Test
+ def testAppendRecordsToFollowerWithNoReplicaThrowsException(): Unit = {
+ val partition = new Partition(topicPartition.topic,
topicPartition.partition, time, replicaManager)
+ assertThrows[ReplicaNotAvailableException] {
+ partition.appendRecordsToFollower(
+ createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)),
baseOffset = 0L))
+ }
+ }
+
+ def createRecords(records: Iterable[SimpleRecord], baseOffset: Long,
partitionLeaderEpoch: Int = 0): MemoryRecords = {
+ val buf =
ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
+ val builder = MemoryRecords.builder(
+ buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
TimestampType.LOG_APPEND_TIME,
+ baseOffset, time.milliseconds, partitionLeaderEpoch)
+ records.foreach(builder.append)
+ builder.build()
+ }
+
+}
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 3c8b01e..41f6ab7 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import java.util.Properties
import org.apache.kafka.common.errors._
-import kafka.common.KafkaException
+import kafka.common.{OffsetsOutOfOrderException,
UnexpectedAppendOffsetException, KafkaException}
import org.junit.Assert._
import org.junit.{After, Before, Test}
import kafka.utils._
@@ -39,6 +39,7 @@ import org.easymock.EasyMock
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import org.scalatest.Assertions.{assertThrows, intercept, withClue}
class LogTest {
@@ -1862,13 +1863,72 @@ class LogTest {
assertTrue("Message payload should be null.", !head.hasValue)
}
- @Test(expected = classOf[IllegalArgumentException])
+ @Test
def testAppendWithOutOfOrderOffsetsThrowsException() {
- val log = createLog(logDir, LogConfig())
+ val log = createLog(logDir, LogConfig(), brokerTopicStats =
brokerTopicStats)
+
+ val appendOffsets = Seq(0L, 1L, 3L, 2L, 4L)
+ val buffer = ByteBuffer.allocate(512)
+ for (offset <- appendOffsets) {
+ val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
CompressionType.NONE,
+ TimestampType.LOG_APPEND_TIME,
offset, mockTime.milliseconds(),
+ 1L, 0, 0, false, 0)
+ builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+ builder.close()
+ }
+ buffer.flip()
+ val memoryRecords = MemoryRecords.readableRecords(buffer)
+
+ assertThrows[OffsetsOutOfOrderException] {
+ log.appendAsFollower(memoryRecords)
+ }
+ }
+
+ @Test
+ def testAppendBelowExpectedOffsetThrowsException() {
+ val log = createLog(logDir, LogConfig(), brokerTopicStats =
brokerTopicStats)
val records = (0 until 2).map(id => new
SimpleRecord(id.toString.getBytes)).toArray
records.foreach(record =>
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record),
leaderEpoch = 0))
- val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new
SimpleRecord(1.toString.getBytes))
- log.appendAsFollower(invalidRecord)
+
+ val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0,
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
+ val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
+ for (magic <- magicVals; compression <- compressionTypes) {
+ val invalidRecord = MemoryRecords.withRecords(magic, compression, new
SimpleRecord(1.toString.getBytes))
+ withClue(s"Magic=$magic, compressionType=$compression") {
+ assertThrows[UnexpectedAppendOffsetException] {
+ log.appendAsFollower(invalidRecord)
+ }
+ }
+ }
+ }
+
+ @Test
+ def testAppendEmptyLogBelowLogStartOffsetThrowsException() {
+ createEmptyLogs(logDir, 7)
+ val log = createLog(logDir, LogConfig(), brokerTopicStats =
brokerTopicStats)
+ assertEquals(7L, log.logStartOffset)
+ assertEquals(7L, log.logEndOffset)
+
+ val firstOffset = 4L
+ val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0,
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
+ val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
+ for (magic <- magicVals; compression <- compressionTypes) {
+ val batch = TestUtils.records(List(new SimpleRecord("k1".getBytes,
"v1".getBytes),
+ new SimpleRecord("k2".getBytes,
"v2".getBytes),
+ new SimpleRecord("k3".getBytes,
"v3".getBytes)),
+ magicValue = magic, codec = compression,
+ baseOffset = firstOffset)
+
+ withClue(s"Magic=$magic, compressionType=$compression") {
+ val exception = intercept[UnexpectedAppendOffsetException] {
+ log.appendAsFollower(records = batch)
+ }
+ assertEquals(s"Magic=$magic, compressionType=$compression,
UnexpectedAppendOffsetException#firstOffset",
+ firstOffset, exception.firstOffset)
+ assertEquals(s"Magic=$magic, compressionType=$compression,
UnexpectedAppendOffsetException#lastOffset",
+ firstOffset + 2, exception.lastOffset)
+ }
+ }
}
@Test
--
To stop receiving notification emails like this one, please contact
[email protected].