[jira] [Commented] (KAFKA-6492) LogSegment.truncateTo() should always resize the index file
[ https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349179#comment-16349179 ] ASF GitHub Bot commented on KAFKA-6492: --- becketqin closed pull request #4498: KAFKA-6492: Fix log truncation to empty segment URL: https://github.com/apache/kafka/pull/4498 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 45c820bff8d..5970f42f6d9 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -28,7 +28,7 @@ import kafka.utils._ import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record.FileRecords.LogOffsetPosition import org.apache.kafka.common.record._ -import org.apache.kafka.common.utils.{Time} +import org.apache.kafka.common.utils.Time import scala.collection.JavaConverters._ import scala.math._ @@ -345,20 +345,23 @@ class LogSegment private[log] (val log: FileRecords, */ @nonthreadsafe def truncateTo(offset: Long): Int = { +// Do offset translation before truncating the index to avoid needless scanning +// in case we truncate the full index val mapping = translateOffset(offset) -if (mapping == null) - return 0 offsetIndex.truncateTo(offset) timeIndex.truncateTo(offset) txnIndex.truncateTo(offset) -// after truncation, reset and allocate more space for the (new currently active) index + +// After truncation, reset and allocate more space for the (new currently active) index offsetIndex.resize(offsetIndex.maxIndexSize) timeIndex.resize(timeIndex.maxIndexSize) -val bytesTruncated = log.truncateTo(mapping.position) -if(log.sizeInBytes == 0) { + +val bytesTruncated = if (mapping == 
null) 0 else log.truncateTo(mapping.position) +if (log.sizeInBytes == 0) { created = time.milliseconds rollingBasedTimestamp = None } + bytesSinceLastIndexEntry = 0 if (maxTimestampSoFar >= 0) loadLargestTimestamp() diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 469b3cca40e..c45ed0d2986 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -22,7 +22,7 @@ import kafka.utils.TestUtils import kafka.utils.TestUtils.checkEquals import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record._ -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.utils.{MockTime, Time, Utils} import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -36,13 +36,16 @@ class LogSegmentTest { var logDir: File = _ /* create a segment with the given base offset */ - def createSegment(offset: Long, indexIntervalBytes: Int = 10): LogSegment = { + def createSegment(offset: Long, +indexIntervalBytes: Int = 10, +maxSegmentMs: Int = Int.MaxValue, +time: Time = Time.SYSTEM): LogSegment = { val ms = FileRecords.open(Log.logFile(logDir, offset)) val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000) val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500) val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset)) -val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = Int.MaxValue, - maxSegmentBytes = Int.MaxValue, Time.SYSTEM) +val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = maxSegmentMs, + maxSegmentBytes = Int.MaxValue, time) segments += seg seg } @@ -157,6 +160,47 @@ class LogSegmentTest { } } + @Test + def testTruncateEmptySegment() { +// This tests the scenario in which the follower 
truncates to an empty segment. In this +// case we must ensure that the index is resized so that the log segment is not mistakenly +// rolled due to a full index + +val maxSegmentMs = 30 +val time = new MockTime +val seg = createSegment(0, maxSegmentMs = maxSegmentMs, time = time) +seg.close() + +val reopened = createSegment(0, maxSegmentMs = maxSegmentMs, time = time) +assertEquals(0, seg.timeIndex.sizeInBytes) +assertEquals(0, seg.offsetIndex.sizeInBytes) + +time.sleep(500) +reopened.truncateTo(57) +assertEquals(0, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP)) +
[jira] [Commented] (KAFKA-6492) LogSegment.truncateTo() should always resize the index file
[ https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347816#comment-16347816 ] Jason Gustafson commented on KAFKA-6492: [~becket_qin] Beat you to it. Let me know if the PR I submitted makes sense. > LogSemgent.truncateTo() should always resize the index file > --- > > Key: KAFKA-6492 > URL: https://issues.apache.org/jira/browse/KAFKA-6492 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.0.2, 0.10.1.1, 0.10.2.1, 1.0.0, 0.11.0.2 >Reporter: Jiangjie Qin >Priority: Major > Fix For: 1.1.0 > > > The bug is the following: > # Initially on a follower broker there are two segments 0 and segment 1. > Segment 0 is empty (maybe due to log compaction) > # log is truncated to 0. > # LogSemgent.Truncate() will not find a message to truncate in segment 0, so > it will skip resizing the index/timeindex files. > # When a new message is fetched, Log.maybeRoll() will try to roll a new > segment because the index file of segment 0 is already full (max size is 0) > # After creating the new segment 0, the replica fetcher thread finds that > there is already a segment 0 exists. So it just throws exception and dies. > The fix would be let the broker make sure the index files of active segments > are always resized properly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6492) LogSegment.truncateTo() should always resize the index file
[ https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347670#comment-16347670 ] Jiangjie Qin commented on KAFKA-6492: - [~hachikuji] You are right. This issue will affect earlier versions as well. I'll submit a PR. > LogSemgent.truncateTo() should always resize the index file > --- > > Key: KAFKA-6492 > URL: https://issues.apache.org/jira/browse/KAFKA-6492 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.0 >Reporter: Jiangjie Qin >Priority: Major > Fix For: 1.1.0 > > > The bug is the following: > # Initially on a follower broker there are two segments 0 and segment 1. > Segment 0 is empty (maybe due to log compaction) > # log is truncated to 0. > # LogSemgent.Truncate() will not find a message to truncate in segment 0, so > it will skip resizing the index/timeindex files. > # When a new message is fetched, Log.maybeRoll() will try to roll a new > segment because the index file of segment 0 is already full (max size is 0) > # After creating the new segment 0, the replica fetcher thread finds that > there is already a segment 0 exists. So it just throws exception and dies. > The fix would be let the broker make sure the index files of active segments > are always resized properly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6492) LogSegment.truncateTo() should always resize the index file
[ https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347612#comment-16347612 ] ASF GitHub Bot commented on KAFKA-6492: --- hachikuji opened a new pull request #4498: KAFKA-6492: Fix log truncation to empty segment URL: https://github.com/apache/kafka/pull/4498 This patch ensures that truncation to an empty segment forces resizing of the index file in order to prevent premature rolling. I have added unit tests which verify that appends are permitted following truncation to an empty segment. Without the fix, this test case reproduces the failure in which the rolled segment matches the current active segment. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > LogSemgent.truncateTo() should always resize the index file > --- > > Key: KAFKA-6492 > URL: https://issues.apache.org/jira/browse/KAFKA-6492 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.0 >Reporter: Jiangjie Qin >Priority: Major > Fix For: 1.1.0 > > > The bug is the following: > # Initially on a follower broker there are two segments 0 and segment 1. > Segment 0 is empty (maybe due to log compaction) > # log is truncated to 0. > # LogSemgent.Truncate() will not find a message to truncate in segment 0, so > it will skip resizing the index/timeindex files. > # When a new message is fetched, Log.maybeRoll() will try to roll a new > segment because the index file of segment 0 is already full (max size is 0) > # After creating the new segment 0, the replica fetcher thread finds that > there is already a segment 0 exists. 
So it just throws exception and dies. > The fix would be let the broker make sure the index files of active segments > are always resized properly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6492) LogSegment.truncateTo() should always resize the index file
[ https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347394#comment-16347394 ] Jason Gustafson commented on KAFKA-6492: [~becket_qin] Does this only affect 1.0.0? As far as I can tell, it can happen on older versions as well. I have seen recently an instance of the failure during log rolling on 0.10.1. It was a compacted topic, but I cannot confirm whether it is the specific truncation scenario you describe above (I don't have all the historic logs). > LogSemgent.truncateTo() should always resize the index file > --- > > Key: KAFKA-6492 > URL: https://issues.apache.org/jira/browse/KAFKA-6492 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.0 >Reporter: Jiangjie Qin >Priority: Major > Fix For: 1.1.0 > > > The bug is the following: > # Initially on a follower broker there are two segments 0 and segment 1. > Segment 0 is empty (maybe due to log compaction) > # log is truncated to 0. > # LogSemgent.Truncate() will not find a message to truncate in segment 0, so > it will skip resizing the index/timeindex files. > # When a new message is fetched, Log.maybeRoll() will try to roll a new > segment because the index file of segment 0 is already full (max size is 0) > # After creating the new segment 0, the replica fetcher thread finds that > there is already a segment 0 exists. So it just throws exception and dies. > The fix would be let the broker make sure the index files of active segments > are always resized properly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)