[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710713#comment-16710713
 ] 

ASF GitHub Bot commented on KAFKA-6388:
---------------------------------------

hachikuji closed pull request #5986: KAFKA-6388: Recover from rolling an empty 
segment that already exists
URL: https://github.com/apache/kafka/pull/5986
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index 688736c7d66..c448805e0e6 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1548,8 +1548,8 @@ class Log(@volatile var dir: File,
         in the header.
       */
       appendInfo.firstOffset match {
-        case Some(firstOffset) => roll(firstOffset)
-        case None => roll(maxOffsetInMessages - Integer.MAX_VALUE)
+        case Some(firstOffset) => roll(Some(firstOffset))
+        case None => roll(Some(maxOffsetInMessages - Integer.MAX_VALUE))
       }
     } else {
       segment
@@ -1562,22 +1562,45 @@ class Log(@volatile var dir: File,
    *
    * @return The newly rolled segment
    */
-  def roll(expectedNextOffset: Long = 0): LogSegment = {
+  def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
     maybeHandleIOException(s"Error while rolling log segment for 
$topicPartition in dir ${dir.getParent}") {
       val start = time.hiResClockMs()
       lock synchronized {
         checkIfMemoryMappedBufferClosed()
-        val newOffset = math.max(expectedNextOffset, logEndOffset)
+        val newOffset = math.max(expectedNextOffset.getOrElse(0L), 
logEndOffset)
         val logFile = Log.logFile(dir, newOffset)
-        val offsetIdxFile = offsetIndexFile(dir, newOffset)
-        val timeIdxFile = timeIndexFile(dir, newOffset)
-        val txnIdxFile = transactionIndexFile(dir, newOffset)
-        for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if 
file.exists) {
-          warn(s"Newly rolled segment file ${file.getAbsolutePath} already 
exists; deleting it first")
-          Files.delete(file.toPath)
-        }
 
-        
Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment())
+        if (segments.containsKey(newOffset)) {
+          // segment with the same base offset already exists and loaded
+          if (activeSegment.baseOffset == newOffset && activeSegment.size == 
0) {
+            // We have seen this happen (see KAFKA-6388) after shouldRoll() 
returns true for an
+            // active segment of size zero because of one of the indexes is 
"full" (due to _maxEntries == 0).
+            warn(s"Trying to roll a new log segment with start offset 
$newOffset " +
+                 s"=max(provided offset = $expectedNextOffset, LEO = 
$logEndOffset) while it already " +
+                 s"exists and is active with size 0. Size of time index: 
${activeSegment.timeIndex.entries}," +
+                 s" size of offset index: 
${activeSegment.offsetIndex.entries}.")
+            deleteSegment(activeSegment)
+          } else {
+            throw new KafkaException(s"Trying to roll a new log segment for 
topic partition $topicPartition with start offset $newOffset" +
+                                     s" =max(provided offset = 
$expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
+                                     s"segment is ${segments.get(newOffset)}.")
+          }
+        } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) {
+          throw new KafkaException(
+            s"Trying to roll a new log segment for topic partition 
$topicPartition with " +
+            s"start offset $newOffset =max(provided offset = 
$expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active 
segment $activeSegment")
+        } else {
+          val offsetIdxFile = offsetIndexFile(dir, newOffset)
+          val timeIdxFile = timeIndexFile(dir, newOffset)
+          val txnIdxFile = transactionIndexFile(dir, newOffset)
+
+          for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) 
if file.exists) {
+            warn(s"Newly rolled segment file ${file.getAbsolutePath} already 
exists; deleting it first")
+            Files.delete(file.toPath)
+          }
+
+          
Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment())
+        }
 
         // take a snapshot of the producer state to facilitate recovery. It is 
useful to have the snapshot
         // offset align with the new segment offset since this ensures we can 
recover the segment by beginning
@@ -1594,10 +1617,7 @@ class Log(@volatile var dir: File,
           fileAlreadyExists = false,
           initFileSize = initFileSize,
           preallocate = config.preallocate)
-        val prev = addSegment(segment)
-        if (prev != null)
-          throw new KafkaException(s"Trying to roll a new log segment for 
topic partition $topicPartition with " +
-            s"start offset $newOffset while it already exists.")
+        addSegment(segment)
         // We need to update the segment base offset and append position data 
of the metadata when log rolls.
         // The next offset should not change.
         updateLogEndOffset(nextOffsetMetadata.messageOffset)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index ff5af6123e2..51477b627cb 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1118,7 +1118,7 @@ class LogCleanerTest extends JUnitSuite {
     log.appendAsFollower(record1)
     val record2 = messageWithOffset("hello".getBytes, "hello".getBytes, 1)
     log.appendAsFollower(record2)
-    log.roll(Int.MaxValue/2) // starting a new log segment at offset 
Int.MaxValue/2
+    log.roll(Some(Int.MaxValue/2)) // starting a new log segment at offset 
Int.MaxValue/2
     val record3 = messageWithOffset("hello".getBytes, "hello".getBytes, 
Int.MaxValue/2)
     log.appendAsFollower(record3)
     val record4 = messageWithOffset("hello".getBytes, "hello".getBytes, 
Int.MaxValue.toLong + 1)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index fe2820cba1c..b4dc807a876 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -142,6 +142,52 @@ class LogTest {
     assertEquals("Appending an empty message set should not roll log even if 
sufficient time has passed.", numSegments, log.numberOfSegments)
   }
 
+  @Test
+  def testRollSegmentThatAlreadyExists() {
+    val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L)
+
+    // create a log
+    val log = createLog(logDir, logConfig)
+    assertEquals("Log begins with a single empty segment.", 1, 
log.numberOfSegments)
+
+    // roll active segment with the same base offset of size zero should 
recreate the segment
+    log.roll(Some(0L))
+    assertEquals("Expect 1 segment after roll() empty segment with base 
offset.", 1, log.numberOfSegments)
+
+    // should be able to append records to active segment
+    val records = TestUtils.records(
+      List(new SimpleRecord(mockTime.milliseconds, "k1".getBytes, 
"v1".getBytes)),
+      baseOffset = 0L, partitionLeaderEpoch = 0)
+    log.appendAsFollower(records)
+    assertEquals("Expect one segment.", 1, log.numberOfSegments)
+    assertEquals(0L, log.activeSegment.baseOffset)
+
+    // make sure we can append more records
+    val records2 = TestUtils.records(
+      List(new SimpleRecord(mockTime.milliseconds + 10, "k2".getBytes, 
"v2".getBytes)),
+      baseOffset = 1L, partitionLeaderEpoch = 0)
+    log.appendAsFollower(records2)
+
+    assertEquals("Expect two records in the log", 2, log.logEndOffset)
+    assertEquals(0, readLog(log, 0, 100, 
Some(1)).records.batches.iterator.next().lastOffset)
+    assertEquals(1, readLog(log, 1, 100, 
Some(2)).records.batches.iterator.next().lastOffset)
+
+    // roll so that active segment is empty
+    log.roll()
+    assertEquals("Expect base offset of active segment to be LEO", 2L, 
log.activeSegment.baseOffset)
+    assertEquals("Expect two segments.", 2, log.numberOfSegments)
+
+    // manually resize offset index to force roll of an empty active segment 
on next append
+    log.activeSegment.offsetIndex.resize(0)
+    val records3 = TestUtils.records(
+      List(new SimpleRecord(mockTime.milliseconds + 12, "k3".getBytes, 
"v3".getBytes)),
+      baseOffset = 2L, partitionLeaderEpoch = 0)
+    log.appendAsFollower(records3)
+    assertTrue(log.activeSegment.offsetIndex.maxEntries > 1)
+    assertEquals(2, readLog(log, 2, 100, 
Some(3)).records.batches.iterator.next().lastOffset)
+    assertEquals("Expect two segments.", 2, log.numberOfSegments)
+  }
+
   @Test(expected = classOf[OutOfOrderSequenceException])
   def testNonSequentialAppend(): Unit = {
     // create a log
@@ -828,17 +874,17 @@ class LogTest {
     val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 
0)
-    log.roll(1L)
+    log.roll(Some(1L))
     assertEquals(Some(1L), log.latestProducerSnapshotOffset)
     assertEquals(Some(1L), log.oldestProducerSnapshotOffset)
 
     log.appendAsLeader(TestUtils.singletonRecords("b".getBytes), leaderEpoch = 
0)
-    log.roll(2L)
+    log.roll(Some(2L))
     assertEquals(Some(2L), log.latestProducerSnapshotOffset)
     assertEquals(Some(1L), log.oldestProducerSnapshotOffset)
 
     log.appendAsLeader(TestUtils.singletonRecords("c".getBytes), leaderEpoch = 
0)
-    log.roll(3L)
+    log.roll(Some(3L))
     assertEquals(Some(3L), log.latestProducerSnapshotOffset)
 
     // roll triggers a flush at the starting offset of the new segment, we 
should retain all snapshots
@@ -1282,7 +1328,7 @@ class LogTest {
     val logConfig = LogTest.createLogConfig()
     val log = createLog(logDir,  logConfig)
     log.closeHandlers()
-    log.roll(1)
+    log.roll(Some(1L))
   }
 
   @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Error while trying to roll a segment that already exists
> --------------------------------------------------------
>
>                 Key: KAFKA-6388
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6388
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 1.0.0
>            Reporter: David Hay
>            Priority: Blocker
>
> Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in 
> our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2).
> After spending 30 min or more spewing log messages like this:
> {noformat}
> [2017-12-19 16:44:28,998] INFO Replica loaded for partition 
> screening.save.results.screening.save.results.processor.error-43 with initial 
> high watermark 0 (kafka.cluster.Replica)
> {noformat}
> Eventually, the replica thread throws the error below (also referenced in the 
> original issue).  If I remove that partition from the data directory and 
> bounce the broker, it eventually rebalances (assuming it doesn't hit a 
> different partition with the same error).
> {noformat}
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 00000000000000000002.log already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 00000000000000000002.index already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 00000000000000000002.timeindex already exists; deleting it first 
> (kafka.log.Log)
> [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed 
> fetcher for partitions __consumer_offsets-20 
> (kafka.server.ReplicaFetcherManager)
> [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> sr.new.sr.new.processor.error-38 offset 2
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172)
>         at scala.Option.foreach(Option.scala:257)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
>         at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
>         at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: kafka.common.KafkaException: Trying to roll a new log segment for 
> topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it 
> already exists.
>         at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338)
>         at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297)
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>         at kafka.log.Log.roll(Log.scala:1297)
>         at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284)
>         at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710)
>         at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>         at kafka.log.Log.append(Log.scala:624)
>         at kafka.log.Log.appendAsFollower(Log.scala:607)
>         at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102)
>         at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:184)
>         ... 13 more
> [2017-12-19 15:16:24,302] INFO [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to