[ https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710713#comment-16710713 ]
ASF GitHub Bot commented on KAFKA-6388: --------------------------------------- hachikuji closed pull request #5986: KAFKA-6388: Recover from rolling an empty segment that already exists URL: https://github.com/apache/kafka/pull/5986 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 688736c7d66..c448805e0e6 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1548,8 +1548,8 @@ class Log(@volatile var dir: File, in the header. */ appendInfo.firstOffset match { - case Some(firstOffset) => roll(firstOffset) - case None => roll(maxOffsetInMessages - Integer.MAX_VALUE) + case Some(firstOffset) => roll(Some(firstOffset)) + case None => roll(Some(maxOffsetInMessages - Integer.MAX_VALUE)) } } else { segment @@ -1562,22 +1562,45 @@ class Log(@volatile var dir: File, * * @return The newly rolled segment */ - def roll(expectedNextOffset: Long = 0): LogSegment = { + def roll(expectedNextOffset: Option[Long] = None): LogSegment = { maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") { val start = time.hiResClockMs() lock synchronized { checkIfMemoryMappedBufferClosed() - val newOffset = math.max(expectedNextOffset, logEndOffset) + val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset) val logFile = Log.logFile(dir, newOffset) - val offsetIdxFile = offsetIndexFile(dir, newOffset) - val timeIdxFile = timeIndexFile(dir, newOffset) - val txnIdxFile = transactionIndexFile(dir, newOffset) - for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) { - warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting 
it first") - Files.delete(file.toPath) - } - Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment()) + if (segments.containsKey(newOffset)) { + // segment with the same base offset already exists and loaded + if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) { + // We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an + // active segment of size zero because of one of the indexes is "full" (due to _maxEntries == 0). + warn(s"Trying to roll a new log segment with start offset $newOffset " + + s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " + + s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," + + s" size of offset index: ${activeSegment.offsetIndex.entries}.") + deleteSegment(activeSegment) + } else { + throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" + + s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. 
Existing " + + s"segment is ${segments.get(newOffset)}.") + } + } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) { + throw new KafkaException( + s"Trying to roll a new log segment for topic partition $topicPartition with " + + s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment") + } else { + val offsetIdxFile = offsetIndexFile(dir, newOffset) + val timeIdxFile = timeIndexFile(dir, newOffset) + val txnIdxFile = transactionIndexFile(dir, newOffset) + + for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) { + warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first") + Files.delete(file.toPath) + } + + Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment()) + } // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot // offset align with the new segment offset since this ensures we can recover the segment by beginning @@ -1594,10 +1617,7 @@ class Log(@volatile var dir: File, fileAlreadyExists = false, initFileSize = initFileSize, preallocate = config.preallocate) - val prev = addSegment(segment) - if (prev != null) - throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with " + - s"start offset $newOffset while it already exists.") + addSegment(segment) // We need to update the segment base offset and append position data of the metadata when log rolls. // The next offset should not change. 
updateLogEndOffset(nextOffsetMetadata.messageOffset) diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index ff5af6123e2..51477b627cb 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -1118,7 +1118,7 @@ class LogCleanerTest extends JUnitSuite { log.appendAsFollower(record1) val record2 = messageWithOffset("hello".getBytes, "hello".getBytes, 1) log.appendAsFollower(record2) - log.roll(Int.MaxValue/2) // starting a new log segment at offset Int.MaxValue/2 + log.roll(Some(Int.MaxValue/2)) // starting a new log segment at offset Int.MaxValue/2 val record3 = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue/2) log.appendAsFollower(record3) val record4 = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue.toLong + 1) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index fe2820cba1c..b4dc807a876 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -142,6 +142,52 @@ class LogTest { assertEquals("Appending an empty message set should not roll log even if sufficient time has passed.", numSegments, log.numberOfSegments) } + @Test + def testRollSegmentThatAlreadyExists() { + val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L) + + // create a log + val log = createLog(logDir, logConfig) + assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments) + + // roll active segment with the same base offset of size zero should recreate the segment + log.roll(Some(0L)) + assertEquals("Expect 1 segment after roll() empty segment with base offset.", 1, log.numberOfSegments) + + // should be able to append records to active segment + val records = TestUtils.records( + List(new SimpleRecord(mockTime.milliseconds, "k1".getBytes, 
"v1".getBytes)), + baseOffset = 0L, partitionLeaderEpoch = 0) + log.appendAsFollower(records) + assertEquals("Expect one segment.", 1, log.numberOfSegments) + assertEquals(0L, log.activeSegment.baseOffset) + + // make sure we can append more records + val records2 = TestUtils.records( + List(new SimpleRecord(mockTime.milliseconds + 10, "k2".getBytes, "v2".getBytes)), + baseOffset = 1L, partitionLeaderEpoch = 0) + log.appendAsFollower(records2) + + assertEquals("Expect two records in the log", 2, log.logEndOffset) + assertEquals(0, readLog(log, 0, 100, Some(1)).records.batches.iterator.next().lastOffset) + assertEquals(1, readLog(log, 1, 100, Some(2)).records.batches.iterator.next().lastOffset) + + // roll so that active segment is empty + log.roll() + assertEquals("Expect base offset of active segment to be LEO", 2L, log.activeSegment.baseOffset) + assertEquals("Expect two segments.", 2, log.numberOfSegments) + + // manually resize offset index to force roll of an empty active segment on next append + log.activeSegment.offsetIndex.resize(0) + val records3 = TestUtils.records( + List(new SimpleRecord(mockTime.milliseconds + 12, "k3".getBytes, "v3".getBytes)), + baseOffset = 2L, partitionLeaderEpoch = 0) + log.appendAsFollower(records3) + assertTrue(log.activeSegment.offsetIndex.maxEntries > 1) + assertEquals(2, readLog(log, 2, 100, Some(3)).records.batches.iterator.next().lastOffset) + assertEquals("Expect two segments.", 2, log.numberOfSegments) + } + @Test(expected = classOf[OutOfOrderSequenceException]) def testNonSequentialAppend(): Unit = { // create a log @@ -828,17 +874,17 @@ class LogTest { val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) val log = createLog(logDir, logConfig) log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0) - log.roll(1L) + log.roll(Some(1L)) assertEquals(Some(1L), log.latestProducerSnapshotOffset) assertEquals(Some(1L), log.oldestProducerSnapshotOffset) 
log.appendAsLeader(TestUtils.singletonRecords("b".getBytes), leaderEpoch = 0) - log.roll(2L) + log.roll(Some(2L)) assertEquals(Some(2L), log.latestProducerSnapshotOffset) assertEquals(Some(1L), log.oldestProducerSnapshotOffset) log.appendAsLeader(TestUtils.singletonRecords("c".getBytes), leaderEpoch = 0) - log.roll(3L) + log.roll(Some(3L)) assertEquals(Some(3L), log.latestProducerSnapshotOffset) // roll triggers a flush at the starting offset of the new segment, we should retain all snapshots @@ -1282,7 +1328,7 @@ class LogTest { val logConfig = LogTest.createLogConfig() val log = createLog(logDir, logConfig) log.closeHandlers() - log.roll(1) + log.roll(Some(1L)) } @Test ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Error while trying to roll a segment that already exists > -------------------------------------------------------- > > Key: KAFKA-6388 > URL: https://issues.apache.org/jira/browse/KAFKA-6388 > Project: Kafka > Issue Type: Bug > Components: log > Affects Versions: 1.0.0 > Reporter: David Hay > Priority: Blocker > > Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in > our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2). > After spending 30 min or more spewing log messages like this: > {noformat} > [2017-12-19 16:44:28,998] INFO Replica loaded for partition > screening.save.results.screening.save.results.processor.error-43 with initial > high watermark 0 (kafka.cluster.Replica) > {noformat} > Eventually, the replica thread throws the error below (also referenced in the > original issue). 
If I remove that partition from the data directory and > bounce the broker, it eventually rebalances (assuming it doesn't hit a > different partition with the same error). > {noformat} > [2017-12-19 15:16:24,227] WARN Newly rolled segment file > 00000000000000000002.log already exists; deleting it first (kafka.log.Log) > [2017-12-19 15:16:24,227] WARN Newly rolled segment file > 00000000000000000002.index already exists; deleting it first (kafka.log.Log) > [2017-12-19 15:16:24,227] WARN Newly rolled segment file > 00000000000000000002.timeindex already exists; deleting it first > (kafka.log.Log) > [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed > fetcher for partitions __consumer_offsets-20 > (kafka.server.ReplicaFetcherManager) > [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) > kafka.common.KafkaException: Error processing data for partition > sr.new.sr.new.processor.error-38 offset 2 > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169) > at > 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) > Caused by: kafka.common.KafkaException: Trying to roll a new log segment for > topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it > already exists. > at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338) > at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297) > at kafka.log.Log.maybeHandleIOException(Log.scala:1669) > at kafka.log.Log.roll(Log.scala:1297) > at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284) > at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710) > at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624) > at kafka.log.Log.maybeHandleIOException(Log.scala:1669) > at kafka.log.Log.append(Log.scala:624) > at kafka.log.Log.appendAsFollower(Log.scala:607) > at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102) > at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:184) > ... 13 more > [2017-12-19 15:16:24,302] INFO [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)