[jira] [Updated] (KAFKA-4451) Recovering empty replica yields negative offsets in index of compact partitions
[ https://issues.apache.org/jira/browse/KAFKA-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Schiff updated KAFKA-4451:
----------------------------------
Fix Version/s: (was: 0.9.0.1)

> Recovering empty replica yields negative offsets in index of compact partitions
> --------------------------------------------------------------------------------
>
> Key: KAFKA-4451
> URL: https://issues.apache.org/jira/browse/KAFKA-4451
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.9.0.1
> Reporter: Michael Schiff
>
> When bringing up an empty broker, the partition for a compact topic is not split into multiple log files. All data is written into a single log file, causing offsets to overflow.
> A dump of the affected broker shortly after it started replicating:
> {code}
> michael.schiff@stats-kafka09:~$ sudo /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/attainment_event-0/.index | head -n 10
> Dumping /kafka/attainment_event-0/.index
> offset: 1022071124 position: 1037612
> offset: -1713432120 position: 1348740
> offset: -886291423 position: 2397130
> offset: -644750126 position: 3445630
> offset: -57889876 position: 4493972
> offset: 433950099 position: 5388461
> offset: 1071769472 position: 6436837
> offset: 1746859069 position: 7485367
> offset: 2090359736 position: 8533822
> ...
> {code}
> and the dump of the same log file from the leader of this partition:
> {code}
> michael.schiff@stats-kafka12:~$ sudo /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/attainment_event-0/.index
> [sudo] password for michael.schiff:
> Dumping /kafka/attainment_event-0/.index
> offset: 353690666 position: 262054
> offset: 633140428 position: 523785
> offset: 756537951 position: 785815
> {code}
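A minimal sketch of where the negative numbers in this dump come from: the index stores each offset as a 4-byte relative value, (offset - baseOffset).toInt, and DumpLogSegments prints the offset reconstructed from what was stored. The segment file names above are truncated, so the real base offset is not visible; the sketch below assumes a base offset of 0 and a hypothetical real offset, and is illustrative only.

{code}
object DumpArithmetic {
  // The cast performed by OffsetIndex.append when it writes an entry.
  def storedRelative(baseOffset: Long, offset: Long): Int =
    (offset - baseOffset).toInt

  def main(args: Array[String]): Unit = {
    // 2581535176 = 2^32 + (-1713432120): with base 0 it wraps to exactly
    // the second entry in the dump above.
    println(storedRelative(0L, 2581535176L)) // prints -1713432120
  }
}
{code}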
[jira] [Updated] (KAFKA-4451) Recovering empty replica yields negative offsets in index of compact partitions
[ https://issues.apache.org/jira/browse/KAFKA-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Schiff updated KAFKA-4451:
----------------------------------
Affects Version/s: 0.9.0.1
[jira] [Commented] (KAFKA-4451) Recovering empty replica yields negative offsets in index of compact partitions
[ https://issues.apache.org/jira/browse/KAFKA-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15700688#comment-15700688 ]

Michael Schiff commented on KAFKA-4451:
---------------------------------------
After some further exploration:

{code:title=ReplicaFetcherThread.scala:131}
replica.log.get.append(messageSet, assignOffsets = false)
{code}

{code:title=Log.scala:405}
// maybe roll the log if this segment is full
val segment = maybeRoll(messagesSize = validMessages.sizeInBytes, maxTimestampInMessages = appendInfo.maxTimestamp)
{code}

maybeRoll depends on AbstractIndex.isFull:

{code:title=AbstractIndex.scala:75}
/** The maximum number of entries this index can hold */
@volatile
private[this] var _maxEntries = mmap.limit / entrySize

/** The number of entries in this index */
@volatile
protected var _entries = mmap.position / entrySize

/** True iff there are no more slots available in this index */
def isFull: Boolean = _entries >= _maxEntries
{code}

It appears the logic for whether or not to roll the segment depends on the absolute number of entries in the index, not on the range of offsets stored in the index. This allows the index to be overflowed in the case of a heavily compacted topic. Protective logic for the same issue is present in LogCleaner.groupSegmentsBySize:

{code:title=LogCleaner.scala:546}
/**
 * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
 * We collect a group of such segments together into a single
 * destination segment. This prevents segment sizes from shrinking too much.
 *
 * @param segments The log segments to group
 * @param maxSize the maximum size in bytes for the total of all log data in a group
 * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
 *
 * @return A list of grouped segments
 */
private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]] = {
  var grouped = List[List[LogSegment]]()
  var segs = segments.toList
  while(segs.nonEmpty) {
    var group = List(segs.head)
    var logSize = segs.head.size
    var indexSize = segs.head.index.sizeInBytes
    var timeIndexSize = segs.head.timeIndex.sizeInBytes
    segs = segs.tail
    while(segs.nonEmpty &&
          logSize + segs.head.size <= maxSize &&
          indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
          timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
          segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
      group = segs.head :: group
      logSize += segs.head.size
      indexSize += segs.head.index.sizeInBytes
      timeIndexSize += segs.head.timeIndex.sizeInBytes
      segs = segs.tail
    }
    grouped ::= group.reverse
  }
  grouped.reverse
}
{code}

specifically:

{code:title=LogCleaner.scala:559}
segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
{code}
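Since the roll check and the index capacity check above count entries, it may help to see why an entry-count bound alone cannot prevent overflow. The sketch below uses hypothetical numbers (the default 10 MB index size and a sparse, heavily compacted offset stride are assumptions, not values from this ticket) to show that a segment's offset range can blow past Int.MaxValue long before the index runs out of slots.

{code}
object RollCheckGap {
  val entrySize    = 8                        // 4-byte relative offset + 4-byte position
  val maxIndexSize = 10 * 1024 * 1024         // assumed default segment index size (10 MB)
  val maxEntries   = maxIndexSize / entrySize // 1,310,720 slots before isFull is true

  def main(args: Array[String]): Unit = {
    // Hypothetical compacted topic: one surviving record per ~10,000 offsets.
    val offsetStride = 10000L
    val offsetRangeWhenFull = maxEntries * offsetStride
    // The offset range wraps past Int.MaxValue long before isFull fires.
    println(s"$offsetRangeWhenFull > ${Int.MaxValue}: ${offsetRangeWhenFull > Int.MaxValue}")
  }
}
{code}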
[jira] [Updated] (KAFKA-4451) Recovering empty replica yields negative offsets in index of compact partitions
[ https://issues.apache.org/jira/browse/KAFKA-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Schiff updated KAFKA-4451:
----------------------------------
Description: updated to read "the partition for a compact topic" (was: "the partition for compact topic"); otherwise identical to the description and broker dumps quoted above.
[jira] [Created] (KAFKA-4451) Recovering empty replica yields negative offsets in index of compact partitions
Michael Schiff created KAFKA-4451:
----------------------------------

Summary: Recovering empty replica yields negative offsets in index of compact partitions
Key: KAFKA-4451
URL: https://issues.apache.org/jira/browse/KAFKA-4451
Project: Kafka
Issue Type: Bug
Reporter: Michael Schiff
Fix For: 0.9.0.1
[jira] [Commented] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185889#comment-15185889 ]

Michael Schiff commented on KAFKA-3323:
---------------------------------------
[~gwenshap] [~ijuma] https://github.com/tubemogul/kafka-logsplitter
Thank you guys!

> Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
> ----------------------------------------------------------------------------------------------
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.8.1.1, 0.8.2.1
> Reporter: Michael Schiff
> Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
> Once the Offset Index has negative offset values, the binary search for position lookup is broken. This causes consumers of compact topics to skip large offset intervals when bootstrapping. This has serious implications for consumers of compact topics.
> {code}
> /**
>  * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
>  */
> def append(offset: Long, position: Int) {
>   inLock(lock) {
>     require(!isFull, "Attempt to append to a full index (size = " + size + ").")
>     if (size.get == 0 || offset > lastOffset) {
>       debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
>       this.mmap.putInt((offset - baseOffset).toInt)
>       this.mmap.putInt(position)
>       this.size.incrementAndGet()
>       this.lastOffset = offset
>       require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
>     } else {
>       throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
>         .format(offset, entries, lastOffset, file.getAbsolutePath))
>     }
>   }
> }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as an integer without overflow. If the LogSegment is from a compacted topic, this assumption may not be valid. The result is a quiet integer overflow, which stores a negative value into the index.
> I believe that the issue is caused by the LogCleaner. Specifically, by the groupings produced by
> {code}
> /**
>  * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
>  * We collect a group of such segments together into a single
>  * destination segment. This prevents segment sizes from shrinking too much.
>  *
>  * @param segments The log segments to group
>  * @param maxSize the maximum size in bytes for the total of all log data in a group
>  * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
>  *
>  * @return A list of grouped segments
>  */
> private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking baseOffset and groupMaxOffset into account, it will produce groups that when cleaned into a single segment, have offsets that overflow. This is more likely for topics with low key cardinality, but high update volume, as you could wind up with very few cleaned records, but with very high offsets.
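Since the description turns on the silent (offset - baseOffset).toInt cast, a small sketch may help; it mimics the two putInt calls in append with hypothetical offsets (not values from the attached dumps) and shows that the entry read back implies an offset exactly 2^32 lower than the one that went in.

{code}
import java.nio.ByteBuffer

object QuietOverflow {
  def main(args: Array[String]): Unit = {
    val baseOffset = 353690666L                            // hypothetical segment base
    val offset     = baseOffset + Int.MaxValue.toLong + 1L // first offset that no longer fits
    val mmap       = ByteBuffer.allocate(8)

    mmap.putInt((offset - baseOffset).toInt) // the cast in append(): wraps to Int.MinValue
    mmap.putInt(262054)                      // file position for this entry

    mmap.flip()
    val storedRelative = mmap.getInt()
    // The reconstructed offset is 2^32 lower than the real one.
    println(s"real=$offset reconstructed=${baseOffset + storedRelative}")
  }
}
{code}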
[jira] [Comment Edited] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185540#comment-15185540 ]

Michael Schiff edited comment on KAFKA-3323 at 3/8/16 7:33 PM:
---------------------------------------------------------------
[~ijuma] I reopened to submit the patch, but this didn't seem like the correct action, as this utility is currently just a standalone program, not a part of the Kafka distribution. Should I convert this utility into something that can be run like the other utilities (e.g. dump log segments)? Once everyone has upgraded, this tool will not be useful anymore, so I don't know that it belongs in the distribution.

was (Author: michael.schiff):
[~ijuma] I went to submit the patch, but this didn't seem like the correct action, as this utility is currently just a standalone program, not a part of the Kafka distribution. Should I convert this utility into something that can be run like the other utilities (e.g. dump log segments)? Once everyone has upgraded, this tool will not be useful anymore, so I don't know that it belongs in the distribution.
[jira] [Commented] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185540#comment-15185540 ]

Michael Schiff commented on KAFKA-3323:
---------------------------------------
[~ijuma] I went to submit the patch, but this didn't seem like the correct action, as this utility is currently just a standalone program, not a part of the Kafka distribution. Should I convert this utility into something that can be run like the other utilities (e.g. dump log segments)? Once everyone has upgraded, this tool will not be useful anymore, so I don't know that it belongs in the distribution.
[jira] [Updated] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Schiff updated KAFKA-3323:
----------------------------------
Status: Open (was: Patch Available)
[jira] [Issue Comment Deleted] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Schiff updated KAFKA-3323:
----------------------------------
Comment: was deleted

(was: Re-opening so that I can attach the log-splitter patch)
[jira] [Issue Comment Deleted] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Schiff updated KAFKA-3323:
----------------------------------
Comment: was deleted

(was: Log Splitter utility. For those with production log files that are affected by this issue, and cannot be dropped. This utility will split existing log segments into legal chunks)
[jira] [Updated] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Schiff updated KAFKA-3323:
----------------------------------
Affects Version/s: 0.8.2.1
Status: Patch Available (was: Reopened)

Log Splitter utility, for those with production log files that are affected by this issue and cannot be dropped. This utility will split existing log segments into legal chunks.
[jira] [Reopened] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Schiff reopened KAFKA-3323:
-----------------------------------
Re-opening so that I can attach the log-splitter patch
[jira] [Comment Edited] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185488#comment-15185488 ]

Michael Schiff edited comment on KAFKA-3323 at 3/8/16 6:52 PM:
---------------------------------------------------------------
[~ijuma] Yes, it is a duplicate. The accepted solution was the first one I mentioned in my comment: group segments to clean by segment size in bytes *and* by limiting the difference between the segment base offset and the max offset that goes into it.
This however does not address the already broken log segments that I (and others) may have in production. For this I have written a utility to split log segments into legal chunks. If this seems useful I can submit it as a patch.

was (Author: michael.schiff):
[~ijuma] Yes, it is a duplicate. The accepted solution was the first one I mentioned in my comment: group segments to clean, by segment size in bytes and by limiting the difference between the segment base offset and the max offset that goes into it.
This however does not address the already broken log segments that I (and others) may have in production. For this I have written a utility to split log segments into legal chunks. If this seems useful I can submit it as a patch.
[jira] [Commented] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185488#comment-15185488 ]

Michael Schiff commented on KAFKA-3323:
---------------------------------------
[~ijuma] Yes, it is a duplicate. The accepted solution was the first one I mentioned in my comment: group segments to clean, by segment size in bytes and by limiting the difference between the segment base offset and the max offset that goes into it.
This however does not address the already broken log segments that I (and others) may have in production. For this I have written a utility to split log segments into legal chunks. If this seems useful I can submit it as a patch.
[jira] [Updated] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Schiff updated KAFKA-3323:
----------------------------------
Description: updated to drop a now-redundant mid-description sentence ("This breaks the binary search used to lookup offset positions -> large intervals of offsets are skipped by consumers who are bootstrapping themselves on the topic."), which the opening paragraph already covers; otherwise identical to the description quoted above.
[jira] [Updated] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Schiff updated KAFKA-3323:
--
    Description:

Once the Offset Index has negative offset values, the binary search for position lookup is broken. This causes consumers of compact topics to skip large offset intervals when bootstrapping, which has serious implications for those consumers.

{code}
/**
 * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
 */
def append(offset: Long, position: Int) {
  inLock(lock) {
    require(!isFull, "Attempt to append to a full index (size = " + size + ").")
    if (size.get == 0 || offset > lastOffset) {
      debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
      this.mmap.putInt((offset - baseOffset).toInt)
      this.mmap.putInt(position)
      this.size.incrementAndGet()
      this.lastOffset = offset
      require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
    } else {
      throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
        .format(offset, entries, lastOffset, file.getAbsolutePath))
    }
  }
}
{code}

OffsetIndex.append assumes that (offset - baseOffset) can be represented as an integer without overflow. If the LogSegment is from a compacted topic, this assumption may not be valid. The result is a quiet integer overflow, which stores a negative value into the index. This breaks the binary search used to look up offset positions -> large intervals of offsets are skipped by consumers who are bootstrapping themselves on the topic.

I believe that the issue is caused by the LogCleaner. Specifically, by the groupings produced by

{code}
/**
 * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
 * We collect a group of such segments together into a single
 * destination segment. This prevents segment sizes from shrinking too much.
 *
 * @param segments The log segments to group
 * @param maxSize the maximum size in bytes for the total of all log data in a group
 * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
 *
 * @return A list of grouped segments
 */
private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
{code}

Since this method is only concerned with grouping by size, without taking baseOffset and groupMaxOffset into account, it will produce groups that, when cleaned into a single segment, have offsets that overflow. This is more likely for topics with low key cardinality but high update volume, as you could wind up with very few cleaned records, but with very high offsets.

  was:
{code}
/**
 * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
 */
def append(offset: Long, position: Int) {
  inLock(lock) {
    require(!isFull, "Attempt to append to a full index (size = " + size + ").")
    if (size.get == 0 || offset > lastOffset) {
      debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
      this.mmap.putInt((offset - baseOffset).toInt)
      this.mmap.putInt(position)
      this.size.incrementAndGet()
      this.lastOffset = offset
      require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
    } else {
      throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
        .format(offset, entries, lastOffset, file.getAbsolutePath))
    }
  }
}
{code}

OffsetIndex.append assumes that (offset - baseOffset) can be represented as an integer without overflow. If the LogSegment is from a compacted topic, this assumption may not be valid. The result is a quiet integer overflow, which stores a negative value into the index. This breaks the binary search used to look up offset positions -> large intervals of offsets are skipped by consumers who are bootstrapping themselves on the topic.

I believe that the issue is caused by the LogCleaner. Specifically, by the groupings produced by

{code}
/**
 * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
 * We collect a group of such segments together into a single
 * destination segment. This prevents segment sizes from shrinking too much.
 *
 * @param segments The log segments to group
 * @param maxSize the maximum size in bytes for the total of all log data in a group
 * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
 *
 * @return A list of grouped segments
 */
private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
{code}

Since this method is only concerned with grouping by size, without taking baseOffset and groupMaxOffset into account, it will produce groups that, when cleaned into a single segment, have offsets that overflow. This is more likely for topics with low key cardinality but high update volume, as you could wind up with very few cleaned records, but with very high offsets.
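A simplified sketch of why size-only grouping produces an overflowing destination segment. Seg below is a stand-in for LogSegment, not Kafka's actual class, and the numbers mimic a low-cardinality, high-update-volume compacted topic where each surviving segment is tiny but spans a huge offset range:

{code}
object GroupingDemo extends App {
  // Stand-in for LogSegment: only the fields relevant to the bug.
  case class Seg(baseOffset: Long, maxOffset: Long, sizeBytes: Int)

  // After cleaning, each segment is small in bytes but covers billions
  // of offsets (most records were compacted away).
  val segments = Seq(
    Seg(0L,          99L,          4096),
    Seg(3000000000L, 3000000050L,  4096),
    Seg(6856785000L, 6856785272L,  4096)
  )

  // Size-only grouping (the behavior described for groupSegmentsBySize)
  // happily puts all three into one group: together they are tiny.
  val group = segments

  // But the merged segment's offset span cannot be indexed in an Int.
  val span = group.last.maxOffset - group.head.baseOffset
  println(s"offset span of merged segment: $span")        // 6856785272
  println(s"fits in index Int? ${span <= Int.MaxValue}")  // false
}
{code}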
[jira] [Comment Edited] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15178187#comment-15178187 ]

Michael Schiff edited comment on KAFKA-3323 at 3/3/16 5:56 PM:
---

Regarding solutions:

One alternative is to group segments to clean not according to segment size in bytes, but by limiting the difference between the segment base offset and the max offset that goes into it. I see two issues with this. The first is that we could wind up creating segments that are actually pretty small, size-wise. The second issue is that, for people like me who already have production topics with log segments where (offset - baseOffset) > Int.MAX_VALUE, changing the grouping does not fix our issue. If you already have such log segments, they cannot be properly indexed as is.

The second solution is to make indexes use long-valued keys. This would mean changing the index format, forcing you to rebuild all of your existing indices. However, you can then continue grouping segments to clean the same way, ensuring good-sized segments. It also means that people with segments that are already affected are not forced to drop their data.

was (Author: michael.schiff):
Regarding solutions:

One alternative is to group segments to clean not according to segment size in bytes, but by limiting the difference between the segment base offset and the max offset that goes into it. I see two issues with this. The first is that we could wind up creating segments that are actually pretty small, size-wise. The second issue is that, for people like me who already have production topics with log segments where (offset - baseOffset) > Int.MAX_VALUE, changing the grouping does not fix our issue. If you already have such log segments, they cannot be properly indexed as is.

The second solution is to make indexes use long-valued keys. This would mean changing the index format, forcing you to rebuild all of your existing indices. However, you can then continue grouping segments to clean the same way, ensuring good-sized segments. It also means that people with segments that are already incorrect are not forced to drop their data.

> Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.8.1.1
> Reporter: Michael Schiff
> Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
> {code}
> /**
>  * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
>  */
> def append(offset: Long, position: Int) {
>   inLock(lock) {
>     require(!isFull, "Attempt to append to a full index (size = " + size + ").")
>     if (size.get == 0 || offset > lastOffset) {
>       debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
>       this.mmap.putInt((offset - baseOffset).toInt)
>       this.mmap.putInt(position)
>       this.size.incrementAndGet()
>       this.lastOffset = offset
>       require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
>     } else {
>       throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
>         .format(offset, entries, lastOffset, file.getAbsolutePath))
>     }
>   }
> }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as an integer without overflow. If the LogSegment is from a compacted topic, this assumption may not be valid. The result is a quiet integer overflow, which stores a negative value into the index. This breaks the binary search used to look up offset positions -> large intervals of offsets are skipped by consumers who are bootstrapping themselves on the topic.
> I believe that the issue is caused by the LogCleaner. Specifically, by the groupings produced by
> {code}
> /**
>  * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
>  * We collect a group of such segments together into a single
>  * destination segment. This prevents segment sizes from shrinking too much.
>  *
>  * @param segments The log segments to group
>  * @param maxSize the maximum size in bytes for the total of all log data in a group
>  * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
>  *
>  * @return A list of grouped segments
>  */
> private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking baseOffset and groupMaxOffset into account, it will produce groups that, when cleaned into a single segment, have offsets that overflow. This is more likely for topics with low key cardinality but high update volume, as you could wind up with very few cleaned records, but with very high offsets.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
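For scale, the second proposal (long-valued keys) amounts to widening each index entry from 8 bytes (Int relative offset plus Int position, per the append code above) to 12 bytes (Long offset plus Int position). A sketch of the two layouts using java.nio.ByteBuffer; this is illustrative, not Kafka's index implementation:

{code}
import java.nio.ByteBuffer

object IndexEntryLayouts extends App {
  // Current layout: 4-byte relative offset + 4-byte position = 8 bytes.
  def writeCurrent(buf: ByteBuffer, offset: Long, baseOffset: Long, position: Int): Unit = {
    buf.putInt((offset - baseOffset).toInt) // wraps past Int.MaxValue
    buf.putInt(position)
  }

  // Proposed layout: 8-byte long offset + 4-byte position = 12 bytes.
  // No overflow, but an incompatible on-disk format.
  def writeLongKeyed(buf: ByteBuffer, offset: Long, position: Int): Unit = {
    buf.putLong(offset)
    buf.putInt(position)
  }

  val cur = ByteBuffer.allocate(8)
  writeCurrent(cur, 6856785272L, 0L, 1307775)
  cur.flip()
  println(s"current layout stores offset as:    ${cur.getInt}")   // -1733149320

  val wide = ByteBuffer.allocate(12)
  writeLongKeyed(wide, 6856785272L, 1307775)
  wide.flip()
  println(s"long-keyed layout stores offset as: ${wide.getLong}") // 6856785272
}
{code}

The trade-off is exactly the one described in the comment: every existing index file would have to be rebuilt for the new format, but size-based grouping could stay as it is.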
[jira] [Commented] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15178187#comment-15178187 ]

Michael Schiff commented on KAFKA-3323:
---

Regarding solutions:

One alternative is to group segments to clean not according to segment size in bytes, but by limiting the difference between the segment base offset and the max offset that goes into it. I see two issues with this. The first is that we could wind up creating segments that are actually pretty small, size-wise. The second issue is that, for people like me who already have production topics with log segments where (offset - baseOffset) > Int.MAX_VALUE, changing the grouping does not fix our issue. If you already have such log segments, they cannot be properly indexed as is.

The second solution is to make indexes use long-valued keys. This would mean changing the index format, forcing you to rebuild all of your existing indices. However, you can then continue grouping segments to clean the same way, ensuring good-sized segments. It also means that people with segments that are already incorrect are not forced to drop their data.

> Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.8.1.1
> Reporter: Michael Schiff
> Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
> {code}
> /**
>  * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
>  */
> def append(offset: Long, position: Int) {
>   inLock(lock) {
>     require(!isFull, "Attempt to append to a full index (size = " + size + ").")
>     if (size.get == 0 || offset > lastOffset) {
>       debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
>       this.mmap.putInt((offset - baseOffset).toInt)
>       this.mmap.putInt(position)
>       this.size.incrementAndGet()
>       this.lastOffset = offset
>       require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
>     } else {
>       throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
>         .format(offset, entries, lastOffset, file.getAbsolutePath))
>     }
>   }
> }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as an integer without overflow. If the LogSegment is from a compacted topic, this assumption may not be valid. The result is a quiet integer overflow, which stores a negative value into the index. This breaks the binary search used to look up offset positions -> large intervals of offsets are skipped by consumers who are bootstrapping themselves on the topic.
> I believe that the issue is caused by the LogCleaner. Specifically, by the groupings produced by
> {code}
> /**
>  * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
>  * We collect a group of such segments together into a single
>  * destination segment. This prevents segment sizes from shrinking too much.
>  *
>  * @param segments The log segments to group
>  * @param maxSize the maximum size in bytes for the total of all log data in a group
>  * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
>  *
>  * @return A list of grouped segments
>  */
> private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking baseOffset and groupMaxOffset into account, it will produce groups that, when cleaned into a single segment, have offsets that overflow. This is more likely for topics with low key cardinality but high update volume, as you could wind up with very few cleaned records, but with very high offsets.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
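The "breaks the binary search" claim above is easy to demonstrate: the index's position lookup assumes entries are sorted by their stored key, and a wrapped negative key violates that invariant. A small sketch with java.util.Arrays.binarySearch; the -1733149320 value is the real one from the attached dump, while the other keys are made up:

{code}
import java.util.Arrays

object BrokenSearchDemo extends App {
  // A healthy index: keys strictly increase, so binary search works.
  val healthy = Array(100, 200, 300) // hypothetical keys
  println(Arrays.binarySearch(healthy, 200))           // 1: correct slot

  // After overflow, keys wrap negative mid-file and the array is no
  // longer sorted, so binarySearch's result is meaningless.
  val overflowed = Array(2000000000, -1733149320, -1500000000)
  println(Arrays.binarySearch(overflowed, 2000000000)) // -4: "not found",
  // even though the key sits at slot 0 -> whole offset intervals are skipped
}
{code}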
[jira] [Updated] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Schiff updated KAFKA-3323:
--
    Attachment: log_dump.txt
                index_dump.txt

Index and Log dump from the 0 segment of partition 0 of a compact topic. The index file is very small, and is quite obviously incorrect.

{code}
offset: -1733149320 position: 1307775
{code}

is the first incorrect entry. The corresponding entry in the Log dump shows an offset of {code}6856785272L{code}. This value overflows to the indexed offset:

{code}
scala> 6856785272L.toInt
res1: Int = -1733149320
{code}

> Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.8.1.1
> Reporter: Michael Schiff
> Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
> {code}
> /**
>  * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
>  */
> def append(offset: Long, position: Int) {
>   inLock(lock) {
>     require(!isFull, "Attempt to append to a full index (size = " + size + ").")
>     if (size.get == 0 || offset > lastOffset) {
>       debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
>       this.mmap.putInt((offset - baseOffset).toInt)
>       this.mmap.putInt(position)
>       this.size.incrementAndGet()
>       this.lastOffset = offset
>       require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
>     } else {
>       throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
>         .format(offset, entries, lastOffset, file.getAbsolutePath))
>     }
>   }
> }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as an integer without overflow. If the LogSegment is from a compacted topic, this assumption may not be valid. The result is a quiet integer overflow, which stores a negative value into the index. This breaks the binary search used to look up offset positions -> large intervals of offsets are skipped by consumers who are bootstrapping themselves on the topic.
> I believe that the issue is caused by the LogCleaner. Specifically, by the groupings produced by
> {code}
> /**
>  * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
>  * We collect a group of such segments together into a single
>  * destination segment. This prevents segment sizes from shrinking too much.
>  *
>  * @param segments The log segments to group
>  * @param maxSize the maximum size in bytes for the total of all log data in a group
>  * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
>  *
>  * @return A list of grouped segments
>  */
> private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking baseOffset and groupMaxOffset into account, it will produce groups that, when cleaned into a single segment, have offsets that overflow. This is more likely for topics with low key cardinality but high update volume, as you could wind up with very few cleaned records, but with very high offsets.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
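Given the 8-byte entry layout visible in append (a 4-byte relative offset followed by a 4-byte position), an affected index file can be detected by scanning for negative keys. A hypothetical detection helper, assuming direct read access to the file; the path is illustrative, and this ignores that a live, pre-allocated index may contain trailing zero entries (which simply fail the negative check):

{code}
import java.nio.ByteBuffer
import java.nio.file.{Files, Paths}

object FindOverflowedEntries extends App {
  // Read an OffsetIndex file as a sequence of (relativeOffset, position)
  // pairs and report any entry whose key has wrapped negative.
  val path  = Paths.get("/path/to/segment.index") // illustrative path
  val bytes = ByteBuffer.wrap(Files.readAllBytes(path))

  while (bytes.remaining() >= 8) {
    val relOffset = bytes.getInt() // 4-byte key: offset - baseOffset
    val position  = bytes.getInt() // 4-byte file position
    if (relOffset < 0)
      println(s"overflowed entry: offset $relOffset position $position")
  }
}
{code}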
[jira] [Created] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
Michael Schiff created KAFKA-3323:
---
Summary: Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
Key: KAFKA-3323
URL: https://issues.apache.org/jira/browse/KAFKA-3323
Project: Kafka
Issue Type: Bug
Components: log
Affects Versions: 0.8.1.1
Reporter: Michael Schiff
Assignee: Jay Kreps

{code}
/**
 * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
 */
def append(offset: Long, position: Int) {
  inLock(lock) {
    require(!isFull, "Attempt to append to a full index (size = " + size + ").")
    if (size.get == 0 || offset > lastOffset) {
      debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
      this.mmap.putInt((offset - baseOffset).toInt)
      this.mmap.putInt(position)
      this.size.incrementAndGet()
      this.lastOffset = offset
      require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
    } else {
      throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
        .format(offset, entries, lastOffset, file.getAbsolutePath))
    }
  }
}
{code}

OffsetIndex.append assumes that (offset - baseOffset) can be represented as an integer without overflow. If the LogSegment is from a compacted topic, this assumption may not be valid. The result is a quiet integer overflow, which stores a negative value into the index. This breaks the binary search used to look up offset positions -> large intervals of offsets are skipped by consumers who are bootstrapping themselves on the topic.

I believe that the issue is caused by the LogCleaner. Specifically, by the groupings produced by

{code}
/**
 * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
 * We collect a group of such segments together into a single
 * destination segment. This prevents segment sizes from shrinking too much.
 *
 * @param segments The log segments to group
 * @param maxSize the maximum size in bytes for the total of all log data in a group
 * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
 *
 * @return A list of grouped segments
 */
private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
{code}

Since this method is only concerned with grouping by size, without taking baseOffset and groupMaxOffset into account, it will produce groups that, when cleaned into a single segment, have offsets that overflow. This is more likely for topics with low key cardinality but high update volume, as you could wind up with very few cleaned records, but with very high offsets.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
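One possible guard against the quiet corruption described above is to validate the relative offset before it is narrowed to an Int, turning silent wraparound into a loud failure. A sketch of such a check, not the project's actual fix:

{code}
object SafeAppendSketch {
  // Returns the 32-bit relative offset, or fails loudly instead of
  // silently wrapping the way (offset - baseOffset).toInt does.
  def relativeOffset(baseOffset: Long, offset: Long): Int = {
    val delta = offset - baseOffset
    require(delta >= 0 && delta <= Int.MaxValue,
      s"offset $offset is too far from base offset $baseOffset: " +
      s"delta $delta does not fit in 4 bytes")
    delta.toInt
  }

  def main(args: Array[String]): Unit = {
    println(relativeOffset(0L, 353690666L))    // fine: 353690666
    try println(relativeOffset(0L, 6856785272L)) // would have stored -1733149320
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
{code}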