[jira] [Issue Comment Deleted] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Michael Schiff (JIRA)

 [ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Schiff updated KAFKA-3323:
--
Comment: was deleted

(was: Re-opening so that I can attach the log-splitter patch)

> Negative offsets in Log Segment Index files due to Integer overflow when 
> compaction is enabled 
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.8.1.1, 0.8.2.1
> Reporter: Michael Schiff
> Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
>
> Once the offset index contains negative offset values, the binary search used 
> for position lookup is broken. This causes consumers of compacted topics to 
> skip large offset intervals when bootstrapping, so they never see the 
> messages in the skipped ranges.
> {code}
>   /**
>    * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
>    */
>   def append(offset: Long, position: Int) {
>     inLock(lock) {
>       require(!isFull, "Attempt to append to a full index (size = " + size + ").")
>       if (size.get == 0 || offset > lastOffset) {
>         debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
>         this.mmap.putInt((offset - baseOffset).toInt)
>         this.mmap.putInt(position)
>         this.size.incrementAndGet()
>         this.lastOffset = offset
>         require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
>       } else {
>         throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
>           .format(offset, entries, lastOffset, file.getAbsolutePath))
>       }
>     }
>   }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) fits in an Int without 
> overflow. If the LogSegment is from a compacted topic, this assumption may 
> not hold. The result is a silent integer overflow, which stores a negative 
> value in the index.
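
To make the overflow concrete, here is a minimal standalone sketch of the same Long-to-Int narrowing that append performs. The object name, relativeOffset and fitsInIndex are hypothetical helpers written for illustration only; they are not part of Kafka, and the guard is just one way such a check might look.

```scala
object RelativeOffsetOverflow {
  // Same narrowing as in OffsetIndex.append: the Long difference is truncated to 32 bits.
  def relativeOffset(offset: Long, baseOffset: Long): Int =
    (offset - baseOffset).toInt

  // Hypothetical guard (not in Kafka): true only when the difference
  // is representable as a non-negative Int.
  def fitsInIndex(offset: Long, baseOffset: Long): Boolean = {
    val delta = offset - baseOffset
    delta >= 0 && delta <= Int.MaxValue
  }

  def main(args: Array[String]): Unit = {
    val baseOffset = 0L
    val offset = Int.MaxValue.toLong + 1L       // one past the largest Int
    println(relativeOffset(offset, baseOffset)) // prints -2147483648
    println(fitsInIndex(offset, baseOffset))    // prints false
  }
}
```

Any segment whose last offset is more than Int.MaxValue above its baseOffset would hit this case, regardless of how few messages it actually holds.
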
> I believe that the issue is caused by the LogCleaner, specifically by the 
> groupings produced by 
> {code}
>   /**
>    * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
>    * We collect a group of such segments together into a single
>    * destination segment. This prevents segment sizes from shrinking too much.
>    *
>    * @param segments The log segments to group
>    * @param maxSize the maximum size in bytes for the total of all log data in a group
>    * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
>    *
>    * @return A list of grouped segments
>    */
>   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method groups only by size, without taking baseOffset and 
> groupMaxOffset into account, it can produce groups that, when cleaned into a 
> single segment, contain offsets that overflow. This is more likely for topics 
> with low key cardinality but high update volume, since cleaning can leave 
> very few records with very high offsets.
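
One possible direction, sketched under assumptions rather than taken from any actual patch: also close a group when the span from the group's first baseOffset to the candidate segment's last offset no longer fits in an Int. Seg is a hypothetical stand-in for LogSegment, its lastOffset field plays the role of groupMaxOffset, and the size limits are treated as inclusive here for simplicity.

```scala
import scala.collection.mutable.ListBuffer

object OffsetAwareGrouping {
  // Hypothetical stand-in for LogSegment; only the fields this sketch needs.
  final case class Seg(baseOffset: Long, lastOffset: Long, size: Int, indexSize: Int)

  // Groups consecutive segments, starting a new group when adding the next
  // segment would exceed the log size limit, the index size limit, or the
  // Int range of relative offsets measured from the group's first baseOffset.
  def group(segments: Seq[Seg], maxSize: Int, maxIndexSize: Int): List[Seq[Seg]] = {
    val groups = ListBuffer.empty[Seq[Seg]]
    var current = Vector.empty[Seg]
    var logBytes = 0L
    var indexBytes = 0L
    for (seg <- segments) {
      val fits = current.isEmpty || (
        logBytes + seg.size <= maxSize &&
        indexBytes + seg.indexSize <= maxIndexSize &&
        seg.lastOffset - current.head.baseOffset <= Int.MaxValue)
      if (!fits) {
        groups += current
        current = Vector.empty[Seg]
        logBytes = 0L
        indexBytes = 0L
      }
      current = current :+ seg
      logBytes += seg.size
      indexBytes += seg.indexSize
    }
    if (current.nonEmpty) groups += current
    groups.toList
  }
}
```

This sketch does not repair a single segment whose own offset range already exceeds Int.MaxValue; such a segment simply ends up in a group by itself and would still need to be split.
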



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Michael Schiff (JIRA)

 [ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Schiff updated KAFKA-3323:
--
Comment: was deleted

(was: Log splitter utility for those with production log files that are 
affected by this issue and cannot be dropped. The utility splits existing 
log segments into legal chunks.)



