[ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-506:
----------------------------

    Attachment: KAFKA-506-phase-2.patch

This patch is incremental on top of the previous one. I will rebase and provide 
an up-to-date patch that covers both phases, but this shows the new work 
required to support logical offsets.

I think I have addressed most of the comments on the original patch, except:
1. I have put off any performance optimization (avoiding recompression for 
replicas, memory-mapping the log, etc.). I would like to break this into a 
separate JIRA, write a reasonable standalone Log benchmark that covers these 
cases, and then work against that. I have several other cleanups I would like 
to do as well: (1) get rid of SegmentList, (2) move more functionality from Log 
into LogSegment.
2. I am not yet storing the key in the message; this may change the produce API 
slightly, so I think this should be a separate JIRA too.
3. Neha--I changed most of the uses of magic numbers, except where the concrete 
number is clearer.

Here is a description of the new changes.
- Offset now always refers to a logical log offset. I have tried to change any 
instance where offset meant file offset to use the terminology "position" 
instead. References to file positions should now occur only in Log.scala and 
classes internal to it.
- As in the previous patch, MessageAndOffset gives three things: (1) the 
message, (2) the offset of THAT message, and (3) a helper method to calculate 
the next offset.
- Log.append() is responsible for maintaining the logEndOffset and using it to 
assign offsets to the message set before appending to the log.
- Offsets are now assigned to compressed messages too. One nuance is that the 
offset of the wrapper message is equal to the last offset of the messages it 
contains. This will be clearer in the discussion of the offset search changes 
below; the sketch after this list illustrates the numbering.
- Log.read now accepts a new argument maxOffset, which is the largest (logical) 
offset that will be returned, in addition to maxSize, which limits the size in 
bytes.
- I have changed Log.read to support sparse offsets. That is, it is valid to 
have missing offsets. This sparseness is needed both for key-based retention 
and for the correct handling of compressed messages. I will describe the read 
path in more detail below.
- I moved FileMessageSet to the package kafka.log since much of its 
functionality was already specific to the log implementation.
- I changed FetchPurgatory back to using a simple counter for accumulated bytes. 
It was previously re-calculating the available bytes, but because that is now a 
more expensive operation, and because the calculation is redone for each 
topic/partition produce (i.e. potentially 200 times per produce request), I 
think the counter is better. It is less accurate, but since long poll is a 
heuristic anyway I think that is okay.
- I changed the default suffix of .kafka files to .log and added a new .index 
file that contains a sparse index of offset=>file_position to help efficiently 
resolve logical offsets.
- Entries are added to this index at a configurable frequency, controlled by a 
new configuration, log.index.interval.bytes, which defaults to 4096.
- I removed numerous instances of byte calculations. I think this is a good 
thing for code quality.
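
To make the offset numbering concrete, here is a minimal sketch of how logical 
offsets could be assigned at append time, including the compressed-wrapper 
nuance above. This is illustrative only (names like assignOffsets are made up 
for the example and are not the patch's actual API):

  // Sketch only: offsets are handed out from the current logEndOffset; the
  // inner messages of a compressed set get consecutive offsets and the
  // wrapper message is stamped with the LAST inner offset.
  object OffsetAssignmentSketch {
    case class MessageAndOffset(message: Array[Byte], offset: Long) {
      def nextOffset: Long = offset + 1  // helper: offset of the following message
    }

    // Assign offsets to a (non-empty) batch starting at logEndOffset; also
    // return the offset the compressed wrapper message would carry.
    def assignOffsets(messages: Seq[Array[Byte]],
                      logEndOffset: Long): (Seq[MessageAndOffset], Long) = {
      val withOffsets = messages.zipWithIndex.map { case (m, i) =>
        MessageAndOffset(m, logEndOffset + i)
      }
      (withOffsets, withOffsets.last.offset)  // wrapper offset == last inner offset
    }

    def main(args: Array[String]): Unit = {
      val (batch, wrapper) = assignOffsets(Seq("a", "b", "c").map(_.getBytes), 10L)
      batch.foreach(m => println(s"offset=${m.offset} next=${m.nextOffset}"))
      println(s"wrapper offset = $wrapper")  // prints 12
    }
  }

This only sketches the numbering; the real Log.append also writes the messages 
and advances logEndOffset.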

Here is a description of the new read path.
1. First the log tries to find the correct segment to read from using the 
existing binary search on log segments. I modified this search slightly in two 
ways. First, we had a corner-case bug which occurred only if you have two files 
with successive offsets (unlikely now, impossible before). Second, I no longer 
check ranges but instead return the segment with the largest starting offset 
less than or equal to the requested offset.
2. Once the segment is found we check the index on that segment. The index 
returns the largest offset less than or equal to the requested offset and the 
associated file position in the log file. This position is a lower bound on the 
position of the target message in the file, and it is the position from which 
we begin a linear search, checking each message. The index itself is just a 
sorted sequence of (offset, position) pairs. Complete details are in the header 
comments on kafka.log.OffsetIndex.scala. It is not required that all messages 
have an entry in the OffsetIndex; instead there is a configurable frequency in 
terms of bytes which is set in LogSegment. So, for example, we might have an 
entry every 4096 bytes. This frequency is approximate, since a single message 
may be larger than that.
3. Once we have this greatest lower bound on the location, we use 
FileMessageSet.searchFor to find the position of the first message with an 
offset at least as large as the target offset. This search just skips through 
the file, checking only the offsets.
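
For illustration, here is a compact, self-contained sketch of that three-step 
lookup. It uses plain in-memory structures and linear scans in place of the 
real file-backed LogSegment, OffsetIndex, and FileMessageSet.searchFor, so the 
names and signatures here are assumptions made for the example:

  // Step 1: floor search over segment base offsets.
  // Step 2: floor lookup in that segment's sparse (offset -> position) index.
  // Step 3: linear scan from that position for the first offset >= target.
  object ReadPathSketch {
    // A sparse index entry mapping a logical offset to a file position.
    case class IndexEntry(offset: Long, position: Int)
    // In-memory stand-in for a log segment: base offset, sparse index sorted
    // by offset, and every (offset, position) pair in file order.
    case class SegmentSketch(baseOffset: Long,
                             index: Vector[IndexEntry],
                             messages: Vector[(Long, Int)])

    // Segment with the largest base offset <= target (segments sorted by base offset).
    def findSegment(segments: Vector[SegmentSketch],
                    target: Long): Option[SegmentSketch] =
      segments.filter(_.baseOffset <= target).lastOption

    // Position of the largest indexed offset <= target, else the segment start.
    def indexLookup(seg: SegmentSketch, target: Long): Int =
      seg.index.takeWhile(_.offset <= target)
        .lastOption.map(_.position).getOrElse(0)

    // First message at or past startPos whose offset is at least the target.
    def searchFor(seg: SegmentSketch, target: Long,
                  startPos: Int): Option[(Long, Int)] =
      seg.messages.dropWhile(_._2 < startPos).find(_._1 >= target)

    def read(segments: Vector[SegmentSketch],
             target: Long): Option[(Long, Int)] =
      findSegment(segments, target).flatMap { seg =>
        searchFor(seg, target, indexLookup(seg, target))
      }
  }

The real implementation does a binary search over the segments and keeps the 
index in a file; the linear scans above just keep the sketch short. Because 
offsets may be sparse, searchFor simply returns the first message whose offset 
is at least the target.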

                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-v1-draft.patch, 
> KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more 
> nuanced retention policy would allow dropping individual messages from a 
> segment file by recopying it. This is not currently possible because the 
> lookup structure we use to locate messages is based on the file offset 
> directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) 
> which would allow deleting individual messages (e.g. 2) without deleting the 
> entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already 
> doing data format changes.
> As part of this we would explicitly store the key field given by the producer 
> for partitioning (right now there is no way for the consumer to find the 
> value used for partitioning).
> This combination of features would allow a key-based retention policy that 
> would clean up obsolete values by a user-defined key.
> The specific use case I am targeting is a commit log for local state 
> maintained by a process doing some kind of near-real-time processing. The 
> process could log out its local state changes and be able to restore from 
> this log in the event of a failure. However I think this is a broadly useful 
> feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than 
> the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment 
> that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format 
> changes, but retains the physical offset. The second adds the lookup 
> structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One 
> surprising thing is that the offset is actually the offset of the next 
> message. I think there are actually several uses for the current offset. I 
> would propose making this hold the current message offset since with logical 
> offsets the next offset is always just current_offset+1. Note that since we 
> no longer require messages to be dense, it is not true that if the next 
> offset is N the current offset is N-1 (because N-1 may have been deleted). 
> Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an 
> exception if there are zero messages in the set. This is used to detect 
> fetches that are smaller than a single message size. I think this behavior is 
> misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I 
> moved the CRC to the first field and made it cover the entire message 
> contents (previously it only covered the payload), (2) I dropped support for 
> Magic=0, effectively making the attributes field required, which simplifies 
> the code (since we are breaking compatibility anyway).
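
For reference, the on-disk entry layout from item 1 of the quoted description 
above (8-byte offset, 4-byte message_size, N-byte message) could be written 
and read back as in this minimal sketch; the buffer handling and names here 
are illustrative, not the patch's Message or FileMessageSet code:

  import java.nio.ByteBuffer

  // Sketch of one log entry in the new format: [offset][size][message bytes].
  object LogEntrySketch {
    def writeEntry(offset: Long, message: Array[Byte]): ByteBuffer = {
      val buf = ByteBuffer.allocate(8 + 4 + message.length)
      buf.putLong(offset)         // 8 byte logical offset
      buf.putInt(message.length)  // 4 byte message_size
      buf.put(message)            // N byte message
      buf.flip()
      buf
    }

    def readEntry(buf: ByteBuffer): (Long, Array[Byte]) = {
      val offset = buf.getLong()  // logical offset, no longer a file position
      val size = buf.getInt()
      val message = new Array[Byte](size)
      buf.get(message)
      (offset, message)
    }
  }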

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
