[ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13456744#comment-13456744 ]

Jun Rao commented on KAFKA-506:
-------------------------------

Thanks for patch v1. Overall, the log format change is reasonable. Some 
comments:

1. MessageAndOffset: nextOffset is not correct for compressed messages. 
Currently, in the high-level consumer, after iterating each message, the 
consumed offset is moved to the offset of the next message. So, if one consumes 
a message and then commits the offset, the committed offset points to the next 
message to be consumed. We could probably change the protocol to move the 
consumer offset to the offset of the current message. Then, the caller would 
need to commit the offset first and then consume the message to get the same 
semantics (sketched below).
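
A minimal sketch of the two orderings, with stand-in types and hypothetical 
process/commitOffset helpers (none of these names are from the patch):

    case class Message(payload: Array[Byte])
    case class MessageAndOffset(message: Message, offset: Long)

    def commitOffset(offset: Long): Unit = println(s"committed offset $offset")
    def process(m: Message): Unit = ()

    // Current protocol: offset names the *next* message, so consume, then commit.
    def consumeThenCommit(iter: Iterator[MessageAndOffset]): Unit =
      for (mo <- iter) { process(mo.message); commitOffset(mo.offset) }

    // Proposed protocol: offset names the *current* message, so commit first;
    // the committed offset still points at the next message to be consumed.
    def commitThenConsume(iter: Iterator[MessageAndOffset]): Unit =
      for (mo <- iter) { commitOffset(mo.offset); process(mo.message) }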

2. Message:
2.1 The comment on the message format has a bug: the payload should have 
(N - K - 10) bytes.
2.2 In the constructor, should we assert that offset is between 0 and 
bytes.length-1? Also, just to be clear that offset and size are for the 
payload, should we rename bytes, offset and size to something like payload, 
payloadOffset and payloadSize (sketched after this list)?
2.3 computeChecksum(): can use MagicOffset for both the starting offset and 
the length.
2.4 remove the unused import
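
A sketch of the checks and renames suggested in 2.2; the parameter names are 
the proposed ones, not what the patch currently uses:

    class Message(val payload: Array[Byte],
                  val payloadOffset: Int,
                  val payloadSize: Int) {
      // the offset must be a valid index into the backing array
      require(payloadOffset >= 0 && payloadOffset <= payload.length - 1,
              s"payloadOffset $payloadOffset out of range [0, ${payload.length - 1}]")
      // offset + size must stay within the backing array
      require(payloadSize >= 0 && payloadOffset + payloadSize <= payload.length,
              s"payloadSize $payloadSize exceeds the buffer bounds")
    }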

3. MessageSet: Fix the typo in the second line of the comment ("A The format").

4. ByteBufferMessageSet: remove unused comment

5. Log:
5.1 append(): For verifying the message size, we need to use the shallow 
iterator, since a compressed message as a whole has to be smaller than the 
configured max message size.
5.2 append(): Compressed messages are forced to be decompressed and then 
compressed again. This will introduce some CPU overhead. What's the increase in 
CPU utilization if incoming messages are compressed? Also, the 
replicaFetchThread can just put the data fetched from the leader directly 
into the log without recomputing the offsets. Could we add a flag in append() 
to bypass regenerating the offsets (see the sketch after item 5.3)?
5.3 trimInvalidBytes(): There is a bug in the following statement: 
messages.size should be messages.sizeInBytes.
    if(messageSetValidBytes == messages.size) {
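
A sketch of 5.2 and 5.3 together. assignOffsets is a hypothetical flag name, 
and ByteBufferMessageSet here is a stand-in exposing the two byte counts; the 
helpers are stubs, not the patch's code:

    trait ByteBufferMessageSet {
      def sizeInBytes: Long // total bytes in the underlying buffer
      def validBytes: Long  // bytes up to the end of the last complete message
    }

    // 5.3: compare byte counts, not the message count
    def trimInvalidBytes(messages: ByteBufferMessageSet): ByteBufferMessageSet =
      if (messages.validBytes == messages.sizeInBytes) messages // nothing to trim
      else truncate(messages, messages.validBytes)              // drop trailing partial bytes

    // 5.2: let the replica fetcher append leader data verbatim
    def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): Unit = {
      val toAppend = if (assignOffsets) reassignOffsets(messages) else messages
      writeToLog(toAppend)
    }

    // stubs so the sketch compiles
    def truncate(m: ByteBufferMessageSet, bytes: Long): ByteBufferMessageSet = m
    def reassignOffsets(m: ByteBufferMessageSet): ByteBufferMessageSet = m
    def writeToLog(m: ByteBufferMessageSet): Unit = ()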

6. javaapi.ByteBufferMessageSet: Java users shouldn't really be using the 
buffer directly, so we don't need the bean property.

7. PartitionData: Do we need to override equals and hashCode since this is 
already a case class (illustrated below)?
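
Illustration of why the overrides are redundant: the compiler already 
generates structural equals/hashCode for case classes (the fields here are 
made up for the example):

    case class PartitionData(partition: Int, error: Short)

    val a = PartitionData(0, 0)
    val b = PartitionData(0, 0)
    assert(a == b)                   // structural equality, auto-generated
    assert(a.hashCode == b.hashCode) // consistent hashCode, auto-generated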

8. ZkUtils.conditionalUpdatePersistentPath(): This method expects an exception 
on a version conflict, so there is no need to log the exception (sketched 
below).
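
A sketch of treating the conflict as an expected outcome, using the zkclient 
library Kafka 0.8 depends on; the method body is illustrative, not the 
patch's code:

    import org.I0Itec.zkclient.ZkClient
    import org.I0Itec.zkclient.exception.ZkBadVersionException

    // returns (succeeded, new version); no error logging on the expected conflict
    def conditionalUpdatePersistentPath(zk: ZkClient, path: String,
                                        data: String, expectVersion: Int): (Boolean, Int) =
      try {
        val stat = zk.writeDataReturnStat(path, data, expectVersion)
        (true, stat.getVersion)
      } catch {
        case _: ZkBadVersionException => (false, -1) // conflict is expected, not an error
      }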

9. SyncProducerTest: remove unused imports

10. How do we handle the case where a consumer uses too small a fetch size?
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more 
> nuanced retention policy would allow dropping individual messages from a 
> segment file by recopying it. This is not currently possible because the 
> lookup structure we use to locate messages is based on the file offset 
> directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) 
> which would allow deleting individual messages (e.g. 2) without deleting the 
> entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already 
> doing data format changes.
> As part of this we would explicitly store the key field given by the producer 
> for partitioning (right now there is no way for the consumer to find the 
> value used for partitioning).
> This combination of features would allow a key-based retention policy that 
> would clean obsolete values by a user-defined key.
> The specific use case I am targeting is a commit log for local state 
> maintained by a process doing some kind of near-real-time processing. The 
> process could log out its local state changes and be able to restore from 
> this log in the event of a failure. However I think this is a broadly useful 
> feature.
> The following changes would be part of this:
> 1. The log format would now be (a byte-level sketch follows the quoted 
> description below):
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than 
> the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment 
> that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format 
> changes, but retains the physical offset. The second adds the lookup 
> structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One 
> surprising thing is that the offset is actually the offset of the next 
> message. I think there are actually several uses for the current offset. I 
> would propose making this hold the current message offset since with logical 
> offsets the next offset is always just current_offset+1. Note that since we 
> no longer require messages to be dense, it is not true that if the next 
> offset is N the current offset is N-1 (because N-1 may have been deleted). 
> Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an 
> exception if there are zero messages in the set. This is used to detect 
> fetches that are smaller than a single message size. I think this behavior is 
> misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I 
> moved the CRC to the first field and made it cover the entire message 
> contents (previously it only covered the payload), (2) I dropped support for 
> Magic=0, effectively making the attributes field required, which simplifies 
> the code (since we are breaking compatibility anyway).
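
A byte-level sketch of the entry layout described in the quoted issue 
(hypothetical helpers, not code from the patch): 8 byte offset, 4 byte 
message_size, N byte message.

    import java.nio.ByteBuffer

    def writeEntry(buf: ByteBuffer, offset: Long, message: Array[Byte]): Unit = {
      buf.putLong(offset)        // 8 byte logical offset
      buf.putInt(message.length) // 4 byte message_size
      buf.put(message)           // N byte message
    }

    def readEntry(buf: ByteBuffer): (Long, Array[Byte]) = {
      val offset = buf.getLong()
      val size = buf.getInt()
      val message = new Array[Byte](size)
      buf.get(message)
      (offset, message)
    }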
