[jira] [Updated] (KAFKA-4451) Recovering empty replica yields negative offsets in index of compact partitions

2016-11-28 Thread Michael Schiff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Schiff updated KAFKA-4451:
--
Fix Version/s: (was: 0.9.0.1)

> Recovering empty replica yields negative offsets in index of compact 
> partitions
> ---
>
> Key: KAFKA-4451
> URL: https://issues.apache.org/jira/browse/KAFKA-4451
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Michael Schiff
>
> When bringing up an empty broker, the partition for a compact topic is not 
> split into multiple log files. All data is written into a single log file, 
> causing index offsets to overflow.
> A dump of the affected broker shortly after it started replicating:
> {code}
> michael.schiff@stats-kafka09:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments --files 
> /kafka/attainment_event-0/.index | head -n 10
> Dumping /kafka/attainment_event-0/.index
> offset: 1022071124 position: 1037612
> offset: -1713432120 position: 1348740
> offset: -886291423 position: 2397130
> offset: -644750126 position: 3445630
> offset: -57889876 position: 4493972
> offset: 433950099 position: 5388461
> offset: 1071769472 position: 6436837
> offset: 1746859069 position: 7485367
> offset: 2090359736 position: 8533822
> ...
> {code}
> and the dump of the same log file from the leader of this partition
> {code}
> michael.schiff@stats-kafka12:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments --files 
> /kafka/attainment_event-0/.index
> [sudo] password for michael.schiff:
> Dumping /kafka/attainment_event-0/.index
> offset: 353690666 position: 262054
> offset: 633140428 position: 523785
> offset: 756537951 position: 785815
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4451) Recovering empty replica yields negative offsets in index of compact partitions

2016-11-28 Thread Michael Schiff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Schiff updated KAFKA-4451:
--
Affects Version/s: 0.9.0.1

> Recovering empty replica yields negative offsets in index of compact 
> partitions
> ---
>
> Key: KAFKA-4451
> URL: https://issues.apache.org/jira/browse/KAFKA-4451
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Michael Schiff
>
> When bringing up an empty broker, the partition for a compact topic is not 
> split into multiple log files. All data is written into a single log file, 
> causing index offsets to overflow.
> A dump of the affected broker shortly after it started replicating:
> {code}
> michael.schiff@stats-kafka09:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments --files 
> /kafka/attainment_event-0/.index | head -n 10
> Dumping /kafka/attainment_event-0/.index
> offset: 1022071124 position: 1037612
> offset: -1713432120 position: 1348740
> offset: -886291423 position: 2397130
> offset: -644750126 position: 3445630
> offset: -57889876 position: 4493972
> offset: 433950099 position: 5388461
> offset: 1071769472 position: 6436837
> offset: 1746859069 position: 7485367
> offset: 2090359736 position: 8533822
> ...
> {code}
> and the dump of the same log file from the leader of this partition
> {code}
> michael.schiff@stats-kafka12:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments --files 
> /kafka/attainment_event-0/.index
> [sudo] password for michael.schiff:
> Dumping /kafka/attainment_event-0/.index
> offset: 353690666 position: 262054
> offset: 633140428 position: 523785
> offset: 756537951 position: 785815
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4451) Recovering empty replica yields negative offsets in index of compact partitions

2016-11-27 Thread Michael Schiff (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15700688#comment-15700688
 ] 

Michael Schiff commented on KAFKA-4451:
---

After some further exploration:


{code:title=ReplicaFetcherThread.scala:131}
  replica.log.get.append(messageSet, assignOffsets = false)
{code}

{code:title=Log.scala:405}
// maybe roll the log if this segment is full
val segment = maybeRoll(messagesSize = validMessages.sizeInBytes,
                        maxTimestampInMessages = appendInfo.maxTimestamp)
{code}

maybeRoll depends on AbstractIndex.isFull:
{code:title=AbstractIndex.scala:75}
  /**
   * The maximum number of entries this index can hold
   */
  @volatile
  private[this] var _maxEntries = mmap.limit / entrySize

  /** The number of entries in this index */
  @volatile
  protected var _entries = mmap.position / entrySize

  /**
   * True iff there are no more slots available in this index
   */
  def isFull: Boolean = _entries >= _maxEntries
{code}

It appears that the decision to roll the segment depends on the absolute 
number of entries in the index and not on the range of offsets stored in it. 
This allows the index's relative offsets to overflow in the case of a heavily 
compacted topic.
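
For illustration, a minimal sketch of the silent narrowing (hypothetical 
values, assuming a base offset of 0 for simplicity; OverflowDemo is not Kafka 
code, it only demonstrates the Long-to-Int wrap that produces negative entries 
like the ones in the dumps above):
{code}
object OverflowDemo extends App {
  // The offset index stores (offset - baseOffset) as a 4-byte Int. Once a
  // single segment spans more than Int.MaxValue offsets, the narrowing
  // conversion wraps around silently instead of failing.
  val baseOffset = 0L                        // hypothetical segment base offset
  val offset = 2581535176L                   // relative offset > Int.MaxValue
  val relative = (offset - baseOffset).toInt
  println(relative)                          // prints -1713432120
}
{code}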

Protective logic for the same issue is present in LogCleaner.groupSegmentsBySize
{code:title=LogCleaner.scala:546}
  /**
   * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
   * We collect a group of such segments together into a single
   * destination segment. This prevents segment sizes from shrinking too much.
   *
   * @param segments The log segments to group
   * @param maxSize the maximum size in bytes for the total of all log data in a group
   * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
   *
   * @return A list of grouped segments
   */
  private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]] = {
    var grouped = List[List[LogSegment]]()
    var segs = segments.toList
    while(segs.nonEmpty) {
      var group = List(segs.head)
      var logSize = segs.head.size
      var indexSize = segs.head.index.sizeInBytes
      var timeIndexSize = segs.head.timeIndex.sizeInBytes
      segs = segs.tail
      while(segs.nonEmpty &&
            logSize + segs.head.size <= maxSize &&
            indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
            timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
            segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
        group = segs.head :: group
        logSize += segs.head.size
        indexSize += segs.head.index.sizeInBytes
        timeIndexSize += segs.head.timeIndex.sizeInBytes
        segs = segs.tail
      }
      grouped ::= group.reverse
    }
    grouped.reverse
  }
{code}
Specifically:
{code:title=LogCleaner.scala:559}
segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
{code}
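
An analogous guard on the append path would have to consider the offset range 
as well as the entry count. A minimal sketch of the idea (hypothetical code, 
not the actual Kafka fix; OffsetIndexLike and shouldRoll are made-up names):
{code}
trait OffsetIndexLike {
  def isFull: Boolean     // out of index slots
  def baseOffset: Long    // base offset of the owning segment
}

// Hypothetical roll check: roll when the index is out of slots OR when the
// incoming offset can no longer be encoded as an Int relative to baseOffset.
def shouldRoll(index: OffsetIndexLike, nextOffset: Long): Boolean =
  index.isFull || nextOffset - index.baseOffset > Int.MaxValue
{code}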

> Recovering empty replica yields negative offsets in index of compact 
> partitions
> ---
>
> Key: KAFKA-4451
> URL: https://issues.apache.org/jira/browse/KAFKA-4451
> Project: Kafka
>  Issue Type: Bug
>Reporter: Michael Schiff
> Fix For: 0.9.0.1
>
>
> When bringing up an empty broker, the partition for a compact topic is not 
> split into multiple log files. All data is written into a single log file, 
> causing index offsets to overflow.
> A dump of the affected broker shortly after it started replicating:
> {code}
> michael.schiff@stats-kafka09:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments --files 
> /kafka/attainment_event-0/.index | head -n 10
> Dumping /kafka/attainment_event-0/.index
> offset: 1022071124 position: 1037612
> offset: -1713432120 position: 1348740
> offset: -886291423 position: 2397130
> offset: -644750126 position: 3445630
> offset: -57889876 position: 4493972
> offset: 433950099 position: 5388461
> offset: 1071769472 position: 6436837
> offset: 1746859069 position: 7485367
> offset: 2090359736 position: 8533822
> ...
> {code}
> and the dump of the same log file from the leader of this partition
> {code}
> michael.schiff@stats-kafka12:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments --files 
> /kafka/attainment_event-0/.index
> [sudo] password for michael.schiff:
> Dumping /kafka/attainment_event-0/.index
> offset: 353690666 position: 262054
> offset: 633140428 position: 523785
> offset: 756537951 position: 785815
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

[jira] [Updated] (KAFKA-4451) Recovering empty replica yields negative offsets in index of compact partitions

2016-11-27 Thread Michael Schiff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Schiff updated KAFKA-4451:
--
Description: 
When bringing up an empty broker, the partition for a compact topic is not 
split into multiple log files. All data is written into a single log file, 
causing index offsets to overflow.

A dump of the affected broker shortly after it started replicating:
{code}
michael.schiff@stats-kafka09:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
kafka.tools.DumpLogSegments --files 
/kafka/attainment_event-0/.index | head -n 10
Dumping /kafka/attainment_event-0/.index
offset: 1022071124 position: 1037612
offset: -1713432120 position: 1348740
offset: -886291423 position: 2397130
offset: -644750126 position: 3445630
offset: -57889876 position: 4493972
offset: 433950099 position: 5388461
offset: 1071769472 position: 6436837
offset: 1746859069 position: 7485367
offset: 2090359736 position: 8533822
...
{code}

and the dump of the same log file from the leader of this partition
{code}
michael.schiff@stats-kafka12:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
kafka.tools.DumpLogSegments --files 
/kafka/attainment_event-0/.index
[sudo] password for michael.schiff:
Dumping /kafka/attainment_event-0/.index
offset: 353690666 position: 262054
offset: 633140428 position: 523785
offset: 756537951 position: 785815
{code}

  was:
Bringing up an empty broker.  

the partition for compact topic is not split into multiple log files.  All data 
is written into a single log file, causing offsets to overflow.

A dump of the affected broker shortly after it started replicating:
{code}
michael.schiff@stats-kafka09:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
kafka.tools.DumpLogSegments --files 
/kafka/attainment_event-0/.index | head -n 10
Dumping /kafka/attainment_event-0/.index
offset: 1022071124 position: 1037612
offset: -1713432120 position: 1348740
offset: -886291423 position: 2397130
offset: -644750126 position: 3445630
offset: -57889876 position: 4493972
offset: 433950099 position: 5388461
offset: 1071769472 position: 6436837
offset: 1746859069 position: 7485367
offset: 2090359736 position: 8533822
...
{code}

and the dump of the same log file from the leader of this partition
{code}
michael.schiff@stats-kafka12:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
kafka.tools.DumpLogSegments --files 
/kafka/attainment_event-0/.index
[sudo] password for michael.schiff:
Dumping /kafka/attainment_event-0/.index
offset: 353690666 position: 262054
offset: 633140428 position: 523785
offset: 756537951 position: 785815
{code}


> Recovering empty replica yields negative offsets in index of compact 
> partitions
> ---
>
> Key: KAFKA-4451
> URL: https://issues.apache.org/jira/browse/KAFKA-4451
> Project: Kafka
>  Issue Type: Bug
>Reporter: Michael Schiff
> Fix For: 0.9.0.1
>
>
> When bringing up an empty broker, the partition for a compact topic is not 
> split into multiple log files. All data is written into a single log file, 
> causing index offsets to overflow.
> A dump of the affected broker shortly after it started replicating:
> {code}
> michael.schiff@stats-kafka09:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments --files 
> /kafka/attainment_event-0/.index | head -n 10
> Dumping /kafka/attainment_event-0/.index
> offset: 1022071124 position: 1037612
> offset: -1713432120 position: 1348740
> offset: -886291423 position: 2397130
> offset: -644750126 position: 3445630
> offset: -57889876 position: 4493972
> offset: 433950099 position: 5388461
> offset: 1071769472 position: 6436837
> offset: 1746859069 position: 7485367
> offset: 2090359736 position: 8533822
> ...
> {code}
> and the dump of the same log file from the leader of this partition
> {code}
> michael.schiff@stats-kafka12:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments --files 
> /kafka/attainment_event-0/.index
> [sudo] password for michael.schiff:
> Dumping /kafka/attainment_event-0/.index
> offset: 353690666 position: 262054
> offset: 633140428 position: 523785
> offset: 756537951 position: 785815
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4451) Recovering empty replica yields negative offsets in index of compact partitions

2016-11-27 Thread Michael Schiff (JIRA)
Michael Schiff created KAFKA-4451:
-

 Summary: Recovering empty replica yields negative offsets in index 
of compact partitions
 Key: KAFKA-4451
 URL: https://issues.apache.org/jira/browse/KAFKA-4451
 Project: Kafka
  Issue Type: Bug
Reporter: Michael Schiff
 Fix For: 0.9.0.1


When bringing up an empty broker, the partition for a compact topic is not 
split into multiple log files. All data is written into a single log file, 
causing index offsets to overflow.

A dump of the affected broker shortly after it started replicating:
{code}
michael.schiff@stats-kafka09:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
kafka.tools.DumpLogSegments --files 
/kafka/attainment_event-0/.index | head -n 10
Dumping /kafka/attainment_event-0/.index
offset: 1022071124 position: 1037612
offset: -1713432120 position: 1348740
offset: -886291423 position: 2397130
offset: -644750126 position: 3445630
offset: -57889876 position: 4493972
offset: 433950099 position: 5388461
offset: 1071769472 position: 6436837
offset: 1746859069 position: 7485367
offset: 2090359736 position: 8533822
...
{code}

and the dump of the same log file from the leader of this partition
{code}
michael.schiff@stats-kafka12:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
kafka.tools.DumpLogSegments --files 
/kafka/attainment_event-0/.index
[sudo] password for michael.schiff:
Dumping /kafka/attainment_event-0/.index
offset: 353690666 position: 262054
offset: 633140428 position: 523785
offset: 756537951 position: 785815
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Michael Schiff (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185889#comment-15185889
 ] 

Michael Schiff commented on KAFKA-3323:
---

[~gwenshap] [~ijuma] https://github.com/tubemogul/kafka-logsplitter
Thank you guys!

> Negative offsets in Log Segment Index files due to Integer overflow when 
> compaction is enabled 
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1.1, 0.8.2.1
>Reporter: Michael Schiff
>Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
>
> Once the Offset Index has negative offset values, the binary search for 
> position lookup is broken. This causes consumers of compact topics to skip 
> large offset intervals when bootstrapping.  This has serious implications for 
> consumers of compact topics.
> {code}
>  /**
>* Append an entry for the given offset/location pair to the index. This 
> entry must have a larger offset than all subsequent entries.
>*/
>   def append(offset: Long, position: Int) {
> inLock(lock) {
>   require(!isFull, "Attempt to append to a full index (size = " + size + 
> ").")
>   if (size.get == 0 || offset > lastOffset) {
> debug("Adding index entry %d => %d to %s.".format(offset, position, 
> file.getName))
> this.mmap.putInt((offset - baseOffset).toInt)
> this.mmap.putInt(position)
> this.size.incrementAndGet()
> this.lastOffset = offset
> require(entries * 8 == mmap.position, entries + " entries but file 
> position in index is " + mmap.position + ".")
>   } else {
> throw new InvalidOffsetException("Attempt to append an offset (%d) to 
> position %d no larger than the last offset appended (%d) to %s."
>   .format(offset, entries, lastOffset, file.getAbsolutePath))
>   }
> }
>   }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as 
> an integer without overflow. If the LogSegment is from a compacted topic, 
> this assumption may not be valid. The result is a quiet integer overflow, 
> which stores a negative value into the index.
> I believe that the issue is caused by the LogCleaner. Specifically, by the 
> groupings produced by 
> {code}
> /**
>* Group the segments in a log into groups totaling less than a given size. 
> the size is enforced separately for the log data and the index data.
>* We collect a group of such segments together into a single
>* destination segment. This prevents segment sizes from shrinking too much.
>*
>* @param segments The log segments to group
>* @param maxSize the maximum size in bytes for the total of all log data 
> in a group
>* @param maxIndexSize the maximum size in bytes for the total of all index 
> data in a group
>*
>* @return A list of grouped segments
>*/
>   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], 
> maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking 
> baseOffset and groupMaxOffset into account, it will produce groups that when 
> cleaned into a single segment, have offsets that overflow. This is more 
> likely for topics with low key cardinality, but high update volume, as you 
> could wind up with very few cleaned records, but with very high offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Michael Schiff (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185540#comment-15185540
 ] 

Michael Schiff edited comment on KAFKA-3323 at 3/8/16 7:33 PM:
---

[~ijuma] I reopened to submit the patch, but this didn't seem like the correct 
action, as this utility is currently just a standalone program, not a part of 
the Kafka distribution. Should I convert this utility into something that can 
be run like the other utilities (e.g. dump log segments)?  Once everyone has 
upgraded, this tool will not be useful anymore, so I don't know that it belongs 
in the distribution.


was (Author: michael.schiff):
[~ijuma] I went to submit the patch, but this didn't seem like the correct 
action, as this utility is currently just a standalone program, not a part of 
the Kafka distribution. Should I convert this utility into something that can 
be run like the other utilities (e.g. dump log segments)?  Once everyone has 
upgraded, this tool will not be useful anymore, so I don't know that it belongs 
in the distribution.

> Negative offsets in Log Segment Index files due to Integer overflow when 
> compaction is enabled 
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1.1, 0.8.2.1
>Reporter: Michael Schiff
>Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
>
> Once the Offset Index has negative offset values, the binary search for 
> position lookup is broken. This causes consumers of compact topics to skip 
> large offset intervals when bootstrapping.  This has serious implications for 
> consumers of compact topics.
> {code}
>  /**
>* Append an entry for the given offset/location pair to the index. This 
> entry must have a larger offset than all subsequent entries.
>*/
>   def append(offset: Long, position: Int) {
> inLock(lock) {
>   require(!isFull, "Attempt to append to a full index (size = " + size + 
> ").")
>   if (size.get == 0 || offset > lastOffset) {
> debug("Adding index entry %d => %d to %s.".format(offset, position, 
> file.getName))
> this.mmap.putInt((offset - baseOffset).toInt)
> this.mmap.putInt(position)
> this.size.incrementAndGet()
> this.lastOffset = offset
> require(entries * 8 == mmap.position, entries + " entries but file 
> position in index is " + mmap.position + ".")
>   } else {
> throw new InvalidOffsetException("Attempt to append an offset (%d) to 
> position %d no larger than the last offset appended (%d) to %s."
>   .format(offset, entries, lastOffset, file.getAbsolutePath))
>   }
> }
>   }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as 
> an integer without overflow. If the LogSegment is from a compacted topic, 
> this assumption may not be valid. The result is a quiet integer overflow, 
> which stores a negative value into the index.
> I believe that the issue is caused by the LogCleaner. Specifically, by the 
> groupings produced by 
> {code}
> /**
>* Group the segments in a log into groups totaling less than a given size. 
> the size is enforced separately for the log data and the index data.
>* We collect a group of such segments together into a single
>* destination segment. This prevents segment sizes from shrinking too much.
>*
>* @param segments The log segments to group
>* @param maxSize the maximum size in bytes for the total of all log data 
> in a group
>* @param maxIndexSize the maximum size in bytes for the total of all index 
> data in a group
>*
>* @return A list of grouped segments
>*/
>   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], 
> maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking 
> baseOffset and groupMaxOffset into account, it will produce groups that when 
> cleaned into a single segment, have offsets that overflow. This is more 
> likely for topics with low key cardinality, but high update volume, as you 
> could wind up with very few cleaned records, but with very high offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Michael Schiff (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185540#comment-15185540
 ] 

Michael Schiff commented on KAFKA-3323:
---

[~ijuma] I went to submit the patch, but this didn't seem like the correct 
action, as this utility is currently just a standalone program, not a part of 
the Kafka distribution. Should I convert this utility into something that can 
be run like the other utilities (e.g. dump log segments)?  Once everyone has 
upgraded, this tool will not be useful anymore, so I don't know that it belongs 
in the distribution.

> Negative offsets in Log Segment Index files due to Integer overflow when 
> compaction is enabled 
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1.1, 0.8.2.1
>Reporter: Michael Schiff
>Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
>
> Once the Offset Index has negative offset values, the binary search for 
> position lookup is broken. This causes consumers of compact topics to skip 
> large offset intervals when bootstrapping.  This has serious implications for 
> consumers of compact topics.
> {code}
>  /**
>* Append an entry for the given offset/location pair to the index. This 
> entry must have a larger offset than all subsequent entries.
>*/
>   def append(offset: Long, position: Int) {
> inLock(lock) {
>   require(!isFull, "Attempt to append to a full index (size = " + size + 
> ").")
>   if (size.get == 0 || offset > lastOffset) {
> debug("Adding index entry %d => %d to %s.".format(offset, position, 
> file.getName))
> this.mmap.putInt((offset - baseOffset).toInt)
> this.mmap.putInt(position)
> this.size.incrementAndGet()
> this.lastOffset = offset
> require(entries * 8 == mmap.position, entries + " entries but file 
> position in index is " + mmap.position + ".")
>   } else {
> throw new InvalidOffsetException("Attempt to append an offset (%d) to 
> position %d no larger than the last offset appended (%d) to %s."
>   .format(offset, entries, lastOffset, file.getAbsolutePath))
>   }
> }
>   }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as 
> an integer without overflow. If the LogSegment is from a compacted topic, 
> this assumption may not be valid. The result is a quiet integer overflow, 
> which stores a negative value into the index.
> I believe that the issue is caused by the LogCleaner. Specifically, by the 
> groupings produced by 
> {code}
> /**
>* Group the segments in a log into groups totaling less than a given size. 
> the size is enforced separately for the log data and the index data.
>* We collect a group of such segments together into a single
>* destination segment. This prevents segment sizes from shrinking too much.
>*
>* @param segments The log segments to group
>* @param maxSize the maximum size in bytes for the total of all log data 
> in a group
>* @param maxIndexSize the maximum size in bytes for the total of all index 
> data in a group
>*
>* @return A list of grouped segments
>*/
>   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], 
> maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking 
> baseOffset and groupMaxOffset into account, it will produce groups that when 
> cleaned into a single segment, have offsets that overflow. This is more 
> likely for topics with low key cardinality, but high update volume, as you 
> could wind up with very few cleaned records, but with very high offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Michael Schiff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Schiff updated KAFKA-3323:
--
Status: Open  (was: Patch Available)

> Negative offsets in Log Segment Index files due to Integer overflow when 
> compaction is enabled 
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.1, 0.8.1.1
>Reporter: Michael Schiff
>Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
>
> Once the Offset Index has negative offset values, the binary search for 
> position lookup is broken. This causes consumers of compact topics to skip 
> large offset intervals when bootstrapping.  This has serious implications for 
> consumers of compact topics.
> {code}
>  /**
>* Append an entry for the given offset/location pair to the index. This 
> entry must have a larger offset than all subsequent entries.
>*/
>   def append(offset: Long, position: Int) {
> inLock(lock) {
>   require(!isFull, "Attempt to append to a full index (size = " + size + 
> ").")
>   if (size.get == 0 || offset > lastOffset) {
> debug("Adding index entry %d => %d to %s.".format(offset, position, 
> file.getName))
> this.mmap.putInt((offset - baseOffset).toInt)
> this.mmap.putInt(position)
> this.size.incrementAndGet()
> this.lastOffset = offset
> require(entries * 8 == mmap.position, entries + " entries but file 
> position in index is " + mmap.position + ".")
>   } else {
> throw new InvalidOffsetException("Attempt to append an offset (%d) to 
> position %d no larger than the last offset appended (%d) to %s."
>   .format(offset, entries, lastOffset, file.getAbsolutePath))
>   }
> }
>   }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as 
> an integer without overflow. If the LogSegment is from a compacted topic, 
> this assumption may not be valid. The result is a quiet integer overflow, 
> which stores a negative value into the index.
> I believe that the issue is caused by the LogCleaner. Specifically, by the 
> groupings produced by 
> {code}
> /**
>* Group the segments in a log into groups totaling less than a given size. 
> the size is enforced separately for the log data and the index data.
>* We collect a group of such segments together into a single
>* destination segment. This prevents segment sizes from shrinking too much.
>*
>* @param segments The log segments to group
>* @param maxSize the maximum size in bytes for the total of all log data 
> in a group
>* @param maxIndexSize the maximum size in bytes for the total of all index 
> data in a group
>*
>* @return A list of grouped segments
>*/
>   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], 
> maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking 
> baseOffset and groupMaxOffset into account, it will produce groups that when 
> cleaned into a single segment, have offsets that overflow. This is more 
> likely for topics with low key cardinality, but high update volume, as you 
> could wind up with very few cleaned records, but with very high offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Michael Schiff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Schiff updated KAFKA-3323:
--
Comment: was deleted

(was: Re-opening so that I can attach the log-splitter patch)

> Negative offsets in Log Segment Index files due to Integer overflow when 
> compaction is enabled 
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1.1, 0.8.2.1
>Reporter: Michael Schiff
>Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
>
> Once the Offset Index has negative offset values, the binary search for 
> position lookup is broken. This causes consumers of compact topics to skip 
> large offset intervals when bootstrapping.  This has serious implications for 
> consumers of compact topics.
> {code}
>  /**
>* Append an entry for the given offset/location pair to the index. This 
> entry must have a larger offset than all subsequent entries.
>*/
>   def append(offset: Long, position: Int) {
> inLock(lock) {
>   require(!isFull, "Attempt to append to a full index (size = " + size + 
> ").")
>   if (size.get == 0 || offset > lastOffset) {
> debug("Adding index entry %d => %d to %s.".format(offset, position, 
> file.getName))
> this.mmap.putInt((offset - baseOffset).toInt)
> this.mmap.putInt(position)
> this.size.incrementAndGet()
> this.lastOffset = offset
> require(entries * 8 == mmap.position, entries + " entries but file 
> position in index is " + mmap.position + ".")
>   } else {
> throw new InvalidOffsetException("Attempt to append an offset (%d) to 
> position %d no larger than the last offset appended (%d) to %s."
>   .format(offset, entries, lastOffset, file.getAbsolutePath))
>   }
> }
>   }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as 
> an integer without overflow. If the LogSegment is from a compacted topic, 
> this assumption may not be valid. The result is a quiet integer overflow, 
> which stores a negative value into the index.
> I believe that the issue is caused by the LogCleaner. Specifically, by the 
> groupings produced by 
> {code}
> /**
>* Group the segments in a log into groups totaling less than a given size. 
> the size is enforced separately for the log data and the index data.
>* We collect a group of such segments together into a single
>* destination segment. This prevents segment sizes from shrinking too much.
>*
>* @param segments The log segments to group
>* @param maxSize the maximum size in bytes for the total of all log data 
> in a group
>* @param maxIndexSize the maximum size in bytes for the total of all index 
> data in a group
>*
>* @return A list of grouped segments
>*/
>   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], 
> maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking 
> baseOffset and groupMaxOffset into account, it will produce groups that when 
> cleaned into a single segment, have offsets that overflow. This is more 
> likely for topics with low key cardinality, but high update volume, as you 
> could wind up with very few cleaned records, but with very high offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Michael Schiff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Schiff updated KAFKA-3323:
--
Comment: was deleted

(was: Log Splitter utility.  For those with production log files that are 
affected by this issue, and cannot be dropped.  This utility will split 
existing log segments into legal chunks)

> Negative offsets in Log Segment Index files due to Integer overflow when 
> compaction is enabled 
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1.1, 0.8.2.1
>Reporter: Michael Schiff
>Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
>
> Once the Offset Index has negative offset values, the binary search for 
> position lookup is broken. This causes consumers of compact topics to skip 
> large offset intervals when bootstrapping.  This has serious implications for 
> consumers of compact topics.
> {code}
>  /**
>* Append an entry for the given offset/location pair to the index. This 
> entry must have a larger offset than all subsequent entries.
>*/
>   def append(offset: Long, position: Int) {
> inLock(lock) {
>   require(!isFull, "Attempt to append to a full index (size = " + size + 
> ").")
>   if (size.get == 0 || offset > lastOffset) {
> debug("Adding index entry %d => %d to %s.".format(offset, position, 
> file.getName))
> this.mmap.putInt((offset - baseOffset).toInt)
> this.mmap.putInt(position)
> this.size.incrementAndGet()
> this.lastOffset = offset
> require(entries * 8 == mmap.position, entries + " entries but file 
> position in index is " + mmap.position + ".")
>   } else {
> throw new InvalidOffsetException("Attempt to append an offset (%d) to 
> position %d no larger than the last offset appended (%d) to %s."
>   .format(offset, entries, lastOffset, file.getAbsolutePath))
>   }
> }
>   }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as 
> an integer without overflow. If the LogSegment is from a compacted topic, 
> this assumption may not be valid. The result is a quiet integer overflow, 
> which stores a negative value into the index.
> I believe that the issue is caused by the LogCleaner. Specifically, by the 
> groupings produced by 
> {code}
> /**
>* Group the segments in a log into groups totaling less than a given size. 
> the size is enforced separately for the log data and the index data.
>* We collect a group of such segments together into a single
>* destination segment. This prevents segment sizes from shrinking too much.
>*
>* @param segments The log segments to group
>* @param maxSize the maximum size in bytes for the total of all log data 
> in a group
>* @param maxIndexSize the maximum size in bytes for the total of all index 
> data in a group
>*
>* @return A list of grouped segments
>*/
>   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], 
> maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking 
> baseOffset and groupMaxOffset into account, it will produce groups that when 
> cleaned into a single segment, have offsets that overflow. This is more 
> likely for topics with low key cardinality, but high update volume, as you 
> could wind up with very few cleaned records, but with very high offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Michael Schiff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Schiff updated KAFKA-3323:
--
Affects Version/s: 0.8.2.1
   Status: Patch Available  (was: Reopened)

Log Splitter utility, for those with production log files that are affected by 
this issue and cannot be dropped. This utility will split existing log 
segments into legal chunks.

> Negative offsets in Log Segment Index files due to Integer overflow when 
> compaction is enabled 
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.1, 0.8.1.1
>Reporter: Michael Schiff
>Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
>
> Once the Offset Index has negative offset values, the binary search for 
> position lookup is broken. This causes consumers of compact topics to skip 
> large offset intervals when bootstrapping.  This has serious implications for 
> consumers of compact topics.
> {code}
>  /**
>* Append an entry for the given offset/location pair to the index. This 
> entry must have a larger offset than all subsequent entries.
>*/
>   def append(offset: Long, position: Int) {
> inLock(lock) {
>   require(!isFull, "Attempt to append to a full index (size = " + size + 
> ").")
>   if (size.get == 0 || offset > lastOffset) {
> debug("Adding index entry %d => %d to %s.".format(offset, position, 
> file.getName))
> this.mmap.putInt((offset - baseOffset).toInt)
> this.mmap.putInt(position)
> this.size.incrementAndGet()
> this.lastOffset = offset
> require(entries * 8 == mmap.position, entries + " entries but file 
> position in index is " + mmap.position + ".")
>   } else {
> throw new InvalidOffsetException("Attempt to append an offset (%d) to 
> position %d no larger than the last offset appended (%d) to %s."
>   .format(offset, entries, lastOffset, file.getAbsolutePath))
>   }
> }
>   }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as 
> an integer without overflow. If the LogSegment is from a compacted topic, 
> this assumption may not be valid. The result is a quiet integer overflow, 
> which stores a negative value into the index.
> I believe that the issue is caused by the LogCleaner. Specifically, by the 
> groupings produced by 
> {code}
> /**
>* Group the segments in a log into groups totaling less than a given size. 
> the size is enforced separately for the log data and the index data.
>* We collect a group of such segments together into a single
>* destination segment. This prevents segment sizes from shrinking too much.
>*
>* @param segments The log segments to group
>* @param maxSize the maximum size in bytes for the total of all log data 
> in a group
>* @param maxIndexSize the maximum size in bytes for the total of all index 
> data in a group
>*
>* @return A list of grouped segments
>*/
>   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], 
> maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking 
> baseOffset and groupMaxOffset into account, it will produce groups that when 
> cleaned into a single segment, have offsets that overflow. This is more 
> likely for topics with low key cardinality, but high update volume, as you 
> could wind up with very few cleaned records, but with very high offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Reopened] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Michael Schiff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Schiff reopened KAFKA-3323:
---

Re-opening so that I can attach the log-splitter patch

> Negative offsets in Log Segment Index files due to Integer overflow when 
> compaction is enabled 
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1.1
>Reporter: Michael Schiff
>Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
>
> Once the Offset Index has negative offset values, the binary search for 
> position lookup is broken. This causes consumers of compact topics to skip 
> large offset intervals when bootstrapping.  This has serious implications for 
> consumers of compact topics.
> {code}
>  /**
>* Append an entry for the given offset/location pair to the index. This 
> entry must have a larger offset than all subsequent entries.
>*/
>   def append(offset: Long, position: Int) {
> inLock(lock) {
>   require(!isFull, "Attempt to append to a full index (size = " + size + 
> ").")
>   if (size.get == 0 || offset > lastOffset) {
> debug("Adding index entry %d => %d to %s.".format(offset, position, 
> file.getName))
> this.mmap.putInt((offset - baseOffset).toInt)
> this.mmap.putInt(position)
> this.size.incrementAndGet()
> this.lastOffset = offset
> require(entries * 8 == mmap.position, entries + " entries but file 
> position in index is " + mmap.position + ".")
>   } else {
> throw new InvalidOffsetException("Attempt to append an offset (%d) to 
> position %d no larger than the last offset appended (%d) to %s."
>   .format(offset, entries, lastOffset, file.getAbsolutePath))
>   }
> }
>   }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as 
> an integer without overflow. If the LogSegment is from a compacted topic, 
> this assumption may not be valid. The result is a quiet integer overflow, 
> which stores a negative value into the index.
> I believe that the issue is caused by the LogCleaner. Specifically, by the 
> groupings produced by 
> {code}
> /**
>* Group the segments in a log into groups totaling less than a given size. 
> the size is enforced separately for the log data and the index data.
>* We collect a group of such segments together into a single
>* destination segment. This prevents segment sizes from shrinking too much.
>*
>* @param segments The log segments to group
>* @param maxSize the maximum size in bytes for the total of all log data 
> in a group
>* @param maxIndexSize the maximum size in bytes for the total of all index 
> data in a group
>*
>* @return A list of grouped segments
>*/
>   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], 
> maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking 
> baseOffset and groupMaxOffset into account, it will produce groups that when 
> cleaned into a single segment, have offsets that overflow. This is more 
> likely for topics with low key cardinality, but high update volume, as you 
> could wind up with very few cleaned records, but with very high offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Michael Schiff (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185488#comment-15185488
 ] 

Michael Schiff edited comment on KAFKA-3323 at 3/8/16 6:52 PM:
---

[~ijuma] Yes, it is a duplicate.  The accepted solution was the first one I 
mentioned in my comment: group segments to clean by segment size in bytes *and* 
by limiting the difference between the segment base offset and the max offset 
that goes into it.

This however does not address the already broken log segments that I (and 
others) may have in production.  For this I have written a utility to split log 
segments into legal chunks.  If this seems useful I can submit it as a patch.


was (Author: michael.schiff):
[~ijuma] Yes, it is a duplicate.  The accepted solution was the first one I 
mentioned in my comment: group segments to clean, by segment size in bytes and 
by limiting the difference between the segment base offset and the max offset 
that goes into it.

This however does not address the already broken log segments that I (and 
others) may have in production.  For this I have written a utility to split log 
segments into legal chunks.  If this seems useful I can submit it as a patch.

> Negative offsets in Log Segment Index files due to Integer overflow when 
> compaction is enabled 
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1.1
>Reporter: Michael Schiff
>Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
>
> Once the Offset Index has negative offset values, the binary search for 
> position lookup is broken. This causes consumers of compact topics to skip 
> large offset intervals when bootstrapping.  This has serious implications for 
> consumers of compact topics.
> {code}
>  /**
>* Append an entry for the given offset/location pair to the index. This 
> entry must have a larger offset than all subsequent entries.
>*/
>   def append(offset: Long, position: Int) {
> inLock(lock) {
>   require(!isFull, "Attempt to append to a full index (size = " + size + 
> ").")
>   if (size.get == 0 || offset > lastOffset) {
> debug("Adding index entry %d => %d to %s.".format(offset, position, 
> file.getName))
> this.mmap.putInt((offset - baseOffset).toInt)
> this.mmap.putInt(position)
> this.size.incrementAndGet()
> this.lastOffset = offset
> require(entries * 8 == mmap.position, entries + " entries but file 
> position in index is " + mmap.position + ".")
>   } else {
> throw new InvalidOffsetException("Attempt to append an offset (%d) to 
> position %d no larger than the last offset appended (%d) to %s."
>   .format(offset, entries, lastOffset, file.getAbsolutePath))
>   }
> }
>   }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as 
> an integer without overflow. If the LogSegment is from a compacted topic, 
> this assumption may not be valid. The result is a quiet integer overflow, 
> which stores a negative value into the index.
> I believe that the issue is caused by the LogCleaner. Specifically, by the 
> groupings produced by 
> {code}
> /**
>* Group the segments in a log into groups totaling less than a given size. 
> the size is enforced separately for the log data and the index data.
>* We collect a group of such segments together into a single
>* destination segment. This prevents segment sizes from shrinking too much.
>*
>* @param segments The log segments to group
>* @param maxSize the maximum size in bytes for the total of all log data 
> in a group
>* @param maxIndexSize the maximum size in bytes for the total of all index 
> data in a group
>*
>* @return A list of grouped segments
>*/
>   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], 
> maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking 
> baseOffset and groupMaxOffset into account, it will produce groups that when 
> cleaned into a single segment, have offsets that overflow. This is more 
> likely for topics with low key cardinality, but high update volume, as you 
> could wind up with very few cleaned records, but with very high offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Michael Schiff (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185488#comment-15185488
 ] 

Michael Schiff commented on KAFKA-3323:
---

[~ijuma] Yes, it is a duplicate.  The accepted solution was the first one I 
mentioned in my comment: group segments to clean, by segment size in bytes and 
by limiting the difference between the segment base offset and the max offset 
that goes into it.

This however does not address the already broken log segments that I (and 
others) may have in production.  For this I have written a utility to split log 
segments into legal chunks.  If this seems useful I can submit it as a patch.

> Negative offsets in Log Segment Index files due to Integer overflow when 
> compaction is enabled 
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1.1
>Reporter: Michael Schiff
>Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
>
> Once the Offset Index has negative offset values, the binary search for 
> position lookup is broken. This causes consumers of compact topics to skip 
> large offset intervals when bootstrapping.  This has serious implications for 
> consumers of compact topics.
> {code}
>  /**
>* Append an entry for the given offset/location pair to the index. This 
> entry must have a larger offset than all subsequent entries.
>*/
>   def append(offset: Long, position: Int) {
> inLock(lock) {
>   require(!isFull, "Attempt to append to a full index (size = " + size + 
> ").")
>   if (size.get == 0 || offset > lastOffset) {
> debug("Adding index entry %d => %d to %s.".format(offset, position, 
> file.getName))
> this.mmap.putInt((offset - baseOffset).toInt)
> this.mmap.putInt(position)
> this.size.incrementAndGet()
> this.lastOffset = offset
> require(entries * 8 == mmap.position, entries + " entries but file 
> position in index is " + mmap.position + ".")
>   } else {
> throw new InvalidOffsetException("Attempt to append an offset (%d) to 
> position %d no larger than the last offset appended (%d) to %s."
>   .format(offset, entries, lastOffset, file.getAbsolutePath))
>   }
> }
>   }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as 
> an integer without overflow. If the LogSegment is from a compacted topic, 
> this assumption may not be valid. The result is a quiet integer overflow, 
> which stores a negative value into the index.
> I believe that the issue is caused by the LogCleaner. Specifically, by the 
> groupings produced by 
> {code}
> /**
>* Group the segments in a log into groups totaling less than a given size. 
> the size is enforced separately for the log data and the index data.
>* We collect a group of such segments together into a single
>* destination segment. This prevents segment sizes from shrinking too much.
>*
>* @param segments The log segments to group
>* @param maxSize the maximum size in bytes for the total of all log data 
> in a group
>* @param maxIndexSize the maximum size in bytes for the total of all index 
> data in a group
>*
>* @return A list of grouped segments
>*/
>   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], 
> maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking 
> baseOffset and groupMaxOffset into account, it will produce groups that when 
> cleaned into a single segment, have offsets that overflow. This is more 
> likely for topics with low key cardinality, but high update volume, as you 
> could wind up with very few cleaned records, but with very high offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-03 Thread Michael Schiff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Schiff updated KAFKA-3323:
--
Description: 
Once the Offset Index has negative offset values, the binary search for 
position lookup is broken. This causes consumers of compact topics to skip 
large offset intervals when bootstrapping.  This has serious implications for 
consumers of compact topics.

{code}
  /**
   * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
   */
  def append(offset: Long, position: Int) {
    inLock(lock) {
      require(!isFull, "Attempt to append to a full index (size = " + size + ").")
      if (size.get == 0 || offset > lastOffset) {
        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
        this.mmap.putInt((offset - baseOffset).toInt)
        this.mmap.putInt(position)
        this.size.incrementAndGet()
        this.lastOffset = offset
        require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
      } else {
        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
          .format(offset, entries, lastOffset, file.getAbsolutePath))
      }
    }
  }
{code}

OffsetIndex.append assumes that (offset - baseOffset) can be represented as an 
integer without overflow. If the LogSegment is from a compacted topic, this 
assumption may not be valid. The result is a quiet integer overflow, which 
stores a negative value into the index.
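
A one-line demonstration of the narrowing behaviour (on the JVM, a Long-to-Int 
conversion keeps only the low 32 bits rather than throwing):
{code}
// A relative offset just past Int.MaxValue wraps to a negative number
// instead of failing:
val relative = (Int.MaxValue.toLong + 1).toInt  // == Int.MinValue (-2147483648)
{code}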

I believe that the issue is caused by the LogCleaner. Specifically, by the 
groupings produced by 
{code}
  /**
   * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
   * We collect a group of such segments together into a single
   * destination segment. This prevents segment sizes from shrinking too much.
   *
   * @param segments The log segments to group
   * @param maxSize the maximum size in bytes for the total of all log data in a group
   * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
   *
   * @return A list of grouped segments
   */
  private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
{code}

Since this method is only concerned with grouping by size, without taking 
baseOffset and groupMaxOffset into account, it will produce groups that, when 
cleaned into a single segment, have offsets that overflow. This is more likely 
for topics with low key cardinality but high update volume, as you could wind 
up with very few cleaned records but with very high offsets.
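
As a rough illustration (hypothetical sizes and offsets; Seg is a stand-in for 
LogSegment), two heavily compacted segments can each be tiny in bytes yet sit 
billions of offsets apart, so grouping by size alone would merge them:
{code}
// Both segments pass any reasonable byte-size check, but the offset span of
// the merged group exceeds what a relative Int index entry can encode.
case class Seg(baseOffset: Long, lastOffset: Long, sizeInBytes: Int)

val a = Seg(baseOffset = 0L, lastOffset = 10L, sizeInBytes = 4096)
val b = Seg(baseOffset = 2500000000L, lastOffset = 2500000005L, sizeInBytes = 4096)

val spanOverflows = b.lastOffset - a.baseOffset > Int.MaxValue  // true
{code}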

  was:
Once the Offset Index has negative offset values, the binary search for 
position lookup is broken. This causes consumers of compact topics to skip 
large offset intervals when bootstrapping.  This has serious implications for 
consumers of compact topics.

{code}
  /**
   * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
   */
  def append(offset: Long, position: Int) {
    inLock(lock) {
      require(!isFull, "Attempt to append to a full index (size = " + size + ").")
      if (size.get == 0 || offset > lastOffset) {
        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
        this.mmap.putInt((offset - baseOffset).toInt)
        this.mmap.putInt(position)
        this.size.incrementAndGet()
        this.lastOffset = offset
        require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
      } else {
        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
          .format(offset, entries, lastOffset, file.getAbsolutePath))
      }
    }
  }
{code}

OffsetIndex.append assumes that (offset - baseOffset) can be represented as an integer without overflow. If the LogSegment is from a compacted topic, this assumption may not be valid. The result is a quiet integer overflow, which stores a negative value into the index. This breaks the binary search used to look up offset positions, so large intervals of offsets are skipped by consumers who are bootstrapping themselves on the topic.

I believe that the issue is caused by the LogCleaner. Specifically, by the 
groupings produced by 
{code}
  /**
   * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
   * We collect a group of such segments together into a single
   * destination segment. This prevents segment sizes from shrinking too much.
   *
   * @param segments The log segments to group
   * @param maxSize the maximum size in bytes for the total of all log data in a group
   * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
   *
   * @return A list of grouped segments
   */
  private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
{code}

[jira] [Updated] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-03 Thread Michael Schiff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Schiff updated KAFKA-3323:
--
Description: 
Once the Offset Index has negative offset values, the binary search for position lookup is broken. This causes consumers of compact topics to skip large offset intervals when bootstrapping, which has serious implications for those consumers.

{code}
  /**
   * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
   */
  def append(offset: Long, position: Int) {
    inLock(lock) {
      require(!isFull, "Attempt to append to a full index (size = " + size + ").")
      if (size.get == 0 || offset > lastOffset) {
        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
        this.mmap.putInt((offset - baseOffset).toInt)
        this.mmap.putInt(position)
        this.size.incrementAndGet()
        this.lastOffset = offset
        require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
      } else {
        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
          .format(offset, entries, lastOffset, file.getAbsolutePath))
      }
    }
  }
{code}

OffsetIndex.append assumes that (offset - baseOffset) can be represented as an integer without overflow. If the LogSegment is from a compacted topic, this assumption may not be valid. The result is a quiet integer overflow, which stores a negative value into the index. This breaks the binary search used to look up offset positions, so large intervals of offsets are skipped by consumers who are bootstrapping themselves on the topic.

I believe that the issue is caused by the LogCleaner. Specifically, by the 
groupings produced by 
{code}
  /**
   * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
   * We collect a group of such segments together into a single
   * destination segment. This prevents segment sizes from shrinking too much.
   *
   * @param segments The log segments to group
   * @param maxSize the maximum size in bytes for the total of all log data in a group
   * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
   *
   * @return A list of grouped segments
   */
  private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
{code}

Since this method is only concerned with grouping by size, without taking baseOffset and groupMaxOffset into account, it will produce groups that, when cleaned into a single segment, have offsets that overflow. This is more likely for topics with low key cardinality but high update volume, as you could wind up with very few cleaned records but with very high offsets.

  was:
{code}
  /**
   * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
   */
  def append(offset: Long, position: Int) {
    inLock(lock) {
      require(!isFull, "Attempt to append to a full index (size = " + size + ").")
      if (size.get == 0 || offset > lastOffset) {
        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
        this.mmap.putInt((offset - baseOffset).toInt)
        this.mmap.putInt(position)
        this.size.incrementAndGet()
        this.lastOffset = offset
        require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
      } else {
        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
          .format(offset, entries, lastOffset, file.getAbsolutePath))
      }
    }
  }
{code}

OffsetIndex.append assumes that (offset - baseOffset) can be represented as an integer without overflow. If the LogSegment is from a compacted topic, this assumption may not be valid. The result is a quiet integer overflow, which stores a negative value into the index. This breaks the binary search used to look up offset positions, so large intervals of offsets are skipped by consumers who are bootstrapping themselves on the topic.

I believe that the issue is caused by the LogCleaner. Specifically, by the 
groupings produced by 
{code}
  /**
   * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
   * We collect a group of such segments together into a single
   * destination segment. This prevents segment sizes from shrinking too much.
   *
   * @param segments The log segments to group
   * @param maxSize the maximum size in bytes for the total of all log data in a group
   * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
   *
   * @return A list of grouped segments
   */
  private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
{code}

[jira] [Comment Edited] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-03 Thread Michael Schiff (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15178187#comment-15178187
 ] 

Michael Schiff edited comment on KAFKA-3323 at 3/3/16 5:56 PM:
---

Regarding solutions:

One alternative is to group segments to clean not according to segment size in bytes, but by limiting the difference between the segment base offset and the max offset that goes into it.

I see two issues with this. The first is that we could wind up creating segments that are actually pretty small, size-wise. The second is that, for people like me who already have production topics with log segments where (offset - baseOffset) > Int.MAX_VALUE, changing the grouping does not fix our issue: such log segments cannot be properly indexed as they are.

The second solution is to make indexes use long-valued keys. This would mean changing the index format, forcing you to rebuild all of your existing indices. However, you can then continue grouping segments to clean the same way, ensuring good-sized segments. It also means that people with segments that are already affected are not forced to drop their data.
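
A minimal sketch of what the long-keyed option could look like, assuming a ByteBuffer-backed index like the current one; this illustrates the idea, it is not an actual patch. Each entry grows from 8 to 12 bytes, which is why existing index files would have to be rebuilt:
{code}
import java.nio.ByteBuffer

// Sketch: store the full Long offset instead of an Int delta, avoiding the
// (offset - baseOffset).toInt truncation entirely. Entry layout becomes an
// 8-byte offset plus a 4-byte position.
def appendLongKeyedEntry(mmap: ByteBuffer, offset: Long, position: Int): Unit = {
  mmap.putLong(offset)
  mmap.putInt(position)
}
{code}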


was (Author: michael.schiff):
Regarding solutions:

One alternative is to group segments to clean not according to segment size in bytes, but by limiting the difference between the segment base offset and the max offset that goes into it.

I see two issues with this. The first is that we could wind up creating segments that are actually pretty small, size-wise. The second is that, for people like me who already have production topics with log segments where (offset - baseOffset) > Int.MAX_VALUE, changing the grouping does not fix our issue: such log segments cannot be properly indexed as they are.

The second solution is to make indexes use long-valued keys. This would mean changing the index format, forcing you to rebuild all of your existing indices. However, you can then continue grouping segments to clean the same way, ensuring good-sized segments. It also means that people with segments that are already incorrect are not forced to drop their data.

> Negative offsets in Log Segment Index files due to Integer overflow when 
> compaction is enabled 
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1.1
>Reporter: Michael Schiff
>Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
>
> {code}
>   /**
>    * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
>    */
>   def append(offset: Long, position: Int) {
>     inLock(lock) {
>       require(!isFull, "Attempt to append to a full index (size = " + size + ").")
>       if (size.get == 0 || offset > lastOffset) {
>         debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
>         this.mmap.putInt((offset - baseOffset).toInt)
>         this.mmap.putInt(position)
>         this.size.incrementAndGet()
>         this.lastOffset = offset
>         require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
>       } else {
>         throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
>           .format(offset, entries, lastOffset, file.getAbsolutePath))
>       }
>     }
>   }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as
> an integer without overflow. If the LogSegment is from a compacted topic,
> this assumption may not be valid. The result is a quiet integer overflow,
> which stores a negative value into the index. This breaks the binary search
> used to look up offset positions, so large intervals of offsets are skipped
> by consumers who are bootstrapping themselves on the topic.
> I believe that the issue is caused by the LogCleaner. Specifically, by the 
> groupings produced by 
> {code}
>   /**
>    * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
>    * We collect a group of such segments together into a single
>    * destination segment. This prevents segment sizes from shrinking too much.
>    *
>    * @param segments The log segments to group
>    * @param maxSize the maximum size in bytes for the total of all log data in a group
>    * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
>    *
>    * @return A list of grouped segments
>    */
>   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}

[jira] [Commented] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-03 Thread Michael Schiff (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15178187#comment-15178187
 ] 

Michael Schiff commented on KAFKA-3323:
---

Regarding solutions:

One alternative is to group segments to clean not according to segment size in bytes, but by limiting the difference between the segment base offset and the max offset that goes into it.

I see two issues with this. The first is that we could wind up creating segments that are actually pretty small, size-wise. The second is that, for people like me who already have production topics with log segments where (offset - baseOffset) > Int.MAX_VALUE, changing the grouping does not fix our issue: such log segments cannot be properly indexed as they are.

The second solution is to make indexes use long-valued keys. This would mean changing the index format, forcing you to rebuild all of your existing indices. However, you can then continue grouping segments to clean the same way, ensuring good-sized segments. It also means that people with segments that are already incorrect are not forced to drop their data.

> Negative offsets in Log Segment Index files due to Integer overflow when 
> compaction is enabled 
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1.1
>Reporter: Michael Schiff
>Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
>
> {code}
>   /**
>    * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
>    */
>   def append(offset: Long, position: Int) {
>     inLock(lock) {
>       require(!isFull, "Attempt to append to a full index (size = " + size + ").")
>       if (size.get == 0 || offset > lastOffset) {
>         debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
>         this.mmap.putInt((offset - baseOffset).toInt)
>         this.mmap.putInt(position)
>         this.size.incrementAndGet()
>         this.lastOffset = offset
>         require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
>       } else {
>         throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
>           .format(offset, entries, lastOffset, file.getAbsolutePath))
>       }
>     }
>   }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as
> an integer without overflow. If the LogSegment is from a compacted topic,
> this assumption may not be valid. The result is a quiet integer overflow,
> which stores a negative value into the index. This breaks the binary search
> used to look up offset positions, so large intervals of offsets are skipped
> by consumers who are bootstrapping themselves on the topic.
> I believe that the issue is caused by the LogCleaner. Specifically, by the 
> groupings produced by 
> {code}
>   /**
>    * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
>    * We collect a group of such segments together into a single
>    * destination segment. This prevents segment sizes from shrinking too much.
>    *
>    * @param segments The log segments to group
>    * @param maxSize the maximum size in bytes for the total of all log data in a group
>    * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
>    *
>    * @return A list of grouped segments
>    */
>   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking
> baseOffset and groupMaxOffset into account, it will produce groups that,
> when cleaned into a single segment, have offsets that overflow. This is more
> likely for topics with low key cardinality but high update volume, as you
> could wind up with very few cleaned records but with very high offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-02 Thread Michael Schiff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Schiff updated KAFKA-3323:
--
Attachment: log_dump.txt
index_dump.txt

Index and Log dump from the 0 segment of partition 0 of a compact topic.

The index file is very small, and is quite obviously incorrect.

{code}
offset: -1733149320 position: 1307775
{code}
This is the first incorrect entry. The corresponding entry in the Log dump shows an offset of {code}6856785272L{code}, which overflows to the indexed value when truncated to an Int:
{code}
scala> 6856785272L.toInt
res1: Int = -1733149320
{code}
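
For reference, each index entry is 8 bytes (the require(entries * 8 == mmap.position) check in append enforces this): a 4-byte offset relative to the segment's baseOffset, followed by a 4-byte file position. A minimal decoding sketch, assuming that layout:
{code}
import java.nio.ByteBuffer

// Decode one 8-byte index entry: 4-byte relative offset + 4-byte position.
// For the "0" segment, baseOffset is 0, so the stored relative offset is the
// absolute offset truncated to 32 bits; once it has overflowed to a negative
// value, the original offset can no longer be recovered from the index.
def readEntry(buf: ByteBuffer, baseOffset: Long): (Long, Int) = {
  val relativeOffset = buf.getInt()  // -1733149320 for the entry above
  val position = buf.getInt()        // 1307775
  (baseOffset + relativeOffset, position)
}
{code}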

> Negative offsets in Log Segment Index files due to Integer overflow when 
> compaction is enabled 
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1.1
>Reporter: Michael Schiff
>Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
>
> {code}
>   /**
>    * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
>    */
>   def append(offset: Long, position: Int) {
>     inLock(lock) {
>       require(!isFull, "Attempt to append to a full index (size = " + size + ").")
>       if (size.get == 0 || offset > lastOffset) {
>         debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
>         this.mmap.putInt((offset - baseOffset).toInt)
>         this.mmap.putInt(position)
>         this.size.incrementAndGet()
>         this.lastOffset = offset
>         require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
>       } else {
>         throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
>           .format(offset, entries, lastOffset, file.getAbsolutePath))
>       }
>     }
>   }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as
> an integer without overflow. If the LogSegment is from a compacted topic,
> this assumption may not be valid. The result is a quiet integer overflow,
> which stores a negative value into the index. This breaks the binary search
> used to look up offset positions, so large intervals of offsets are skipped
> by consumers who are bootstrapping themselves on the topic.
> I believe that the issue is caused by the LogCleaner. Specifically, by the 
> groupings produced by 
> {code}
>   /**
>    * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
>    * We collect a group of such segments together into a single
>    * destination segment. This prevents segment sizes from shrinking too much.
>    *
>    * @param segments The log segments to group
>    * @param maxSize the maximum size in bytes for the total of all log data in a group
>    * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
>    *
>    * @return A list of grouped segments
>    */
>   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking
> baseOffset and groupMaxOffset into account, it will produce groups that,
> when cleaned into a single segment, have offsets that overflow. This is more
> likely for topics with low key cardinality but high update volume, as you
> could wind up with very few cleaned records but with very high offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-02 Thread Michael Schiff (JIRA)
Michael Schiff created KAFKA-3323:
-

 Summary: Negative offsets in Log Segment Index files due to 
Integer overflow when compaction is enabled 
 Key: KAFKA-3323
 URL: https://issues.apache.org/jira/browse/KAFKA-3323
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.8.1.1
Reporter: Michael Schiff
Assignee: Jay Kreps


{code}
  /**
   * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
   */
  def append(offset: Long, position: Int) {
    inLock(lock) {
      require(!isFull, "Attempt to append to a full index (size = " + size + ").")
      if (size.get == 0 || offset > lastOffset) {
        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
        this.mmap.putInt((offset - baseOffset).toInt)
        this.mmap.putInt(position)
        this.size.incrementAndGet()
        this.lastOffset = offset
        require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
      } else {
        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
          .format(offset, entries, lastOffset, file.getAbsolutePath))
      }
    }
  }
{code}

OffsetIndex.append assumes that (offset - baseOffset) can be represented as an integer without overflow. If the LogSegment is from a compacted topic, this assumption may not be valid. The result is a quiet integer overflow, which stores a negative value into the index. This breaks the binary search used to look up offset positions, so large intervals of offsets are skipped by consumers who are bootstrapping themselves on the topic.

I believe that the issue is caused by the LogCleaner. Specifically, by the 
groupings produced by 
{code}
  /**
   * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
   * We collect a group of such segments together into a single
   * destination segment. This prevents segment sizes from shrinking too much.
   *
   * @param segments The log segments to group
   * @param maxSize the maximum size in bytes for the total of all log data in a group
   * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
   *
   * @return A list of grouped segments
   */
  private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
{code}

Since this method is only concerned with grouping by size, without taking baseOffset and groupMaxOffset into account, it will produce groups that, when cleaned into a single segment, have offsets that overflow. This is more likely for topics with low key cardinality but high update volume, as you could wind up with very few cleaned records but with very high offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)