[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Schiff updated KAFKA-3323:
----------------------------------
    Comment: was deleted (was: Re-opening so that I can attach the log-splitter patch)

> Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3323
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3323
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.8.1.1, 0.8.2.1
>            Reporter: Michael Schiff
>            Assignee: Jay Kreps
>         Attachments: index_dump.txt, log_dump.txt
>
> Once the Offset Index contains negative offset values, the binary search used for position lookup is broken. This causes consumers of compact topics to skip large offset intervals when bootstrapping, which has serious implications for those consumers.
> {code}
> /**
>  * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
>  */
> def append(offset: Long, position: Int) {
>   inLock(lock) {
>     require(!isFull, "Attempt to append to a full index (size = " + size + ").")
>     if (size.get == 0 || offset > lastOffset) {
>       debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
>       this.mmap.putInt((offset - baseOffset).toInt)
>       this.mmap.putInt(position)
>       this.size.incrementAndGet()
>       this.lastOffset = offset
>       require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
>     } else {
>       throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
>         .format(offset, entries, lastOffset, file.getAbsolutePath))
>     }
>   }
> }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as an integer without overflow. If the LogSegment comes from a compacted topic, this assumption may not hold.
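As a standalone illustration of the failure mode (a sketch, not Kafka code): once the delta between an appended offset and the segment's baseOffset exceeds Int.MaxValue, the `.toInt` cast wraps to a negative value with no error raised.

```scala
// Illustrative sketch of the silent overflow in OffsetIndex.append (not Kafka code).
object RelativeOffsetOverflow extends App {
  val baseOffset = 0L
  // Compaction can leave a surviving record whose offset is far past baseOffset.
  val offset = Int.MaxValue.toLong + 1000L

  // The index stores the relative offset as an Int: (offset - baseOffset).toInt
  val relative = (offset - baseOffset).toInt

  // The Long delta no longer fits in an Int, so the narrowing cast wraps silently.
  assert(offset - baseOffset > Int.MaxValue)
  assert(relative < 0)
  println(s"stored relative offset: $relative")
}
```

Because the cast is a plain JVM narrowing conversion, no exception is thrown; the corrupt value is only noticed later, when binary search over the index misbehaves.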
> The result is a quiet integer overflow, which stores a negative value into the index.
> I believe the issue is caused by the LogCleaner, specifically by the groupings produced by
> {code}
> /**
>  * Group the segments in a log into groups totaling less than a given size. The size is enforced separately for the log data and the index data.
>  * We collect a group of such segments together into a single destination segment. This prevents segment sizes from shrinking too much.
>  *
>  * @param segments The log segments to group
>  * @param maxSize the maximum size in bytes for the total of all log data in a group
>  * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
>  *
>  * @return A list of grouped segments
>  */
> private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking baseOffset and groupMaxOffset into account, it will produce groups that, when cleaned into a single segment, have an offset range that overflows. This is more likely for topics with low key cardinality but high update volume, since cleaning can leave very few surviving records whose offsets are nevertheless very high.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
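A guard along these lines (hypothetical, not a committed Kafka fix) sketches what an overflow-safe grouping check could look like: in addition to the size limits, a group would be closed once its offset span no longer fits in the Int-sized relative offsets the index stores.

```scala
object GroupingCheck extends App {
  // Hypothetical guard (illustrative only): a group may be cleaned into a single
  // segment only if every resulting relative offset fits in an Int.
  def offsetSpanFits(groupBaseOffset: Long, lastOffsetInGroup: Long): Boolean =
    lastOffsetInGroup - groupBaseOffset <= Int.MaxValue

  assert(offsetSpanFits(0L, Int.MaxValue.toLong))        // span exactly Int.MaxValue: safe
  assert(!offsetSpanFits(0L, Int.MaxValue.toLong + 1L))  // one past the limit: would overflow
  println("grouping guard checks passed")
}
```

The names `groupBaseOffset` and `lastOffsetInGroup` are assumptions for illustration; the point is only that the grouping decision must consult the offset range, not just the byte sizes.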