[ 
https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-506:
----------------------------

    Attachment: KAFKA-506-v4-changes-since-v3.patch
                KAFKA-506-phase-2-v4.patch

Here is a new patch that addresses these comments. I also did an incremental 
diff against the previous patch so you can see the specific changes for the 
items below (that is KAFKA-506-v4-changes-since-v3.patch).

Also rebased again.

40. I actually disagree. It is more code to add and subtract, but I think it 
makes more sense. This way we would say the append api returns "the first and 
last offset for the messages you appended" rather than "the first offset for 
the messages you appended and the offset of the next message that would be 
appended". This is not a huge deal so I can go either way, but I did think 
about it both ways and that was my rationale.
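To illustrate the two API shapes being weighed, here is a minimal sketch. The names (`LogSketch`, `AppendInfo`) are illustrative only, not the actual Kafka Log API; it just shows an append call reporting "the first and last offset of the messages you appended":

```scala
// Illustrative only: a toy log that hands back the first and last offsets
// assigned to an appended batch, rather than the offset of the *next* message.
case class AppendInfo(firstOffset: Long, lastOffset: Long)

class LogSketch {
  private var nextOffset: Long = 0L

  def append(messageCount: Int): AppendInfo = {
    val first = nextOffset
    nextOffset += messageCount
    // Report the range actually written, not the upcoming offset.
    AppendInfo(first, nextOffset - 1)
  }
}
```

Under this shape, appending 3 messages to an empty log would report offsets (0, 2), and the caller who wants the next offset just adds 1 to `lastOffset`.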

41. My thinking was that there were only two cases: re-creating a new, mutable 
index (at position 0) and opening a read-only index. In reality there are three 
cases: in addition to the previous two you can be re-opening an existing log 
that went through clean shutdown. I was not handling this properly and in fact 
was truncating the index on re-open, so the existing entries in the last 
segment would be unindexed. There are now two cases for mutable indexes. Recall 
that on clean-shutdown the index is always truncated to the max valid entry. So 
now when we open an index, if the file exists, I set the position to the end of 
the file. If the file doesn't exist I allocate it and start at position 0. The 
recovery process will still re-create the index if it runs; if the shutdown was 
clean, we will just roll to a new segment on the first append (since the 
index was truncated, it is now full).
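The open logic described above can be sketched as follows. This is not the real index class; the names and the 8-byte entry size are assumptions for illustration. The key points are: an existing file (truncated to its last valid entry on clean shutdown) is resumed at its current end, a missing file starts empty at position 0, and an index truncated to capacity reports itself full so the first append rolls a new segment.

```scala
import java.io.File

// Illustrative sketch of the mutable-index open cases discussed above.
class IndexSketch(file: File, maxEntries: Int, entrySize: Int = 8) {
  // Clean-shutdown re-open: the file was truncated to the last valid entry,
  // so resume writing at its end. New index: allocate and start at 0.
  val position: Long =
    if (file.exists()) file.length()
    else 0L  // allocation of the backing mmap would happen here

  // A truncated-to-capacity index is full, forcing a roll on the next append.
  def isFull: Boolean = position >= maxEntries.toLong * entrySize
}
```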

43. I removed that feature since the iterator only has the offset, not the file 
position. However, after thinking about it, I can add it back by using 
MessageSet.entrySize(message) on each entry and comparing the sum of these 
to messageSet.sizeInBytes. Added that.
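A minimal sketch of that check, using the entry layout from this issue (8-byte offset + 4-byte message_size + N-byte message); the `Entry` class and helper names are hypothetical stand-ins for the real MessageSet types:

```scala
// Illustrative stand-in for a message entry in the on-disk format:
// 8-byte offset + 4-byte size field + payload bytes.
case class Entry(payload: Array[Byte]) {
  def entrySize: Int = 8 + 4 + payload.length
}

// Sum the size of every entry the iterator sees...
def validBytes(entries: Seq[Entry]): Int = entries.map(_.entrySize).sum

// ...and compare against the set's total byte size; a mismatch means
// trailing bytes that do not form a complete message.
def isFullyValid(entries: Seq[Entry], sizeInBytes: Int): Boolean =
  validBytes(entries) == sizeInBytes
```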

44. Changed the check to be the messageSet.sizeInBytes. This check was really 
meant to guard the case where we are at the end of the log and get an empty 
set. I think it was using validBytes because it needed to calculate the next 
offset. Now that calculation is gone, so I think it is okay to just use 
messageSet.sizeInBytes. This would result in a set with 0 valid bytes being 
enqueued, and then the error getting thrown to the consumer. The fetcher would 
likely continue to fetch this message set, but that should be bounded by the 
consumer queue size.
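The simplified check might look like the sketch below. The function name and exception type are made up for illustration; the point is only that a zero-size set (end of log, or a fetch smaller than one message) is surfaced as an error rather than being detected via a valid-bytes computation:

```scala
// Illustrative sketch: reject an empty fetched set outright instead of
// computing valid bytes first.
def checkFetchedSet(sizeInBytes: Int): Unit =
  if (sizeInBytes == 0)
    throw new IllegalStateException(
      "Fetched an empty message set: either at the end of the log or the " +
      "fetch size is smaller than a single message")
```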

45. The behavior after this patch should be exactly the same as the current 
behavior, so my hope was to do this as a follow up patch.

Also: Found that I wasn't closing the index when the log was closed, and found 
a bug in the index re-creation logic in recovery; fixed both, and expanded 
tests for this.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, 
> KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, 
> KAFKA-506-phase-2-v4.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, 
> KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more 
> nuanced retention policy would allow dropping individual messages from a 
> segment file by recopying it. This is not currently possible because the 
> lookup structure we use to locate messages is based on the file offset 
> directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) 
> which would allow deleting individual messages (e.g. 2) without deleting the 
> entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already 
> doing data format changes.
> As part of this we would explicitly store the key field given by the producer 
> for partitioning (right now there is no way for the consumer to find the 
> value used for partitioning).
> This combination of features would allow a key-based retention policy that 
> would clean obsolete values by a user-defined key.
> The specific use case I am targeting is a commit log for local state 
> maintained by a process doing some kind of near-real-time processing. The 
> process could log out its local state changes and be able to restore from 
> this log in the event of a failure. However I think this is a broadly useful 
> feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than 
> the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment 
> that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format 
> changes, but retains the physical offset. The second adds the lookup 
> structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One 
> surprising thing is that the offset is actually the offset of the next 
> message. I think there are actually several uses for the current offset. I 
> would propose making this hold the current message offset since with logical 
> offsets the next offset is always just current_offset+1. Note that since we 
> no longer require messages to be dense, it is not true that if the next 
> offset is N the current offset is N-1 (because N-1 may have been deleted). 
> Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an 
> exception if there are zero messages in the set. This is used to detect 
> fetches that are smaller than a single message size. I think this behavior is 
> misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I 
> moved the CRC to the first field and made it cover the entire message 
> contents (previously it only covered the payload), (2) I dropped support for 
> Magic=0, effectively making the attributes field required, which simplifies 
> the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
