[ 
https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-506:
----------------------------

    Attachment: KAFKA-506-neha-post-review.patch

Hi Neha, here are some comments on your comments and a patch that addresses the 
comments we are in agreement on.

1. Log
1.2, 1.3 True. This problem exists in both OffsetIndex and Log, though I don't 
think either case can actually occur. In Log it would require 2 billion 
segment files, which is not physically possible; in OffsetIndex one 
would need ~2 billion entries in an index, which isn't possible as the 
message overhead would fill up the log segment first. I am going to leave it 
alone in Log since I want to delete that code asap anyway. I fixed it in 
OffsetIndex since that code is meant to last.
1.4. This logic is a little odd, I will fix it, but actually this reminds me of 
a bigger problem. If file.delete() fails on the log file, the presence of that 
log file will effectively corrupt the log on restart (since we will have a file 
with the given offset but will also start another log with a parallel offset 
that we actually append to--on restart the bad file will mask part of the new 
file). Obviously if file.delete() fails things are pretty fucked and there is 
nothing we can do in software to recover. So what I would like to do is throw 
KafkaStorageException and have Partition.makeFollower() shut down the server. 
What would happen in the leadership transfer if I did that?
1.5 Filed a JIRA for this.
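
To make the proposal in 1.4 concrete, here is a minimal Python sketch (not 
Kafka's code) of failing fast when a segment file cannot be deleted. The names 
StorageError and delete_segment are hypothetical stand-ins for 
KafkaStorageException and the delete path, and the caller is assumed to treat 
the exception as fatal and shut the server down.

```python
import os


class StorageError(Exception):
    """Hypothetical stand-in for KafkaStorageException."""


def delete_segment(path):
    """Delete a segment file; raise StorageError if it may still be on disk."""
    try:
        os.remove(path)
    except FileNotFoundError:
        return  # already gone, so nothing is left to mask a new segment
    except OSError as e:
        # A stale file with this base offset would shadow the new segment
        # on restart, so surface the failure instead of continuing.
        raise StorageError(f"could not delete segment {path}") from e
    if os.path.exists(path):
        raise StorageError(f"segment {path} still present after delete")
```

The point of raising rather than logging is that the process stops before it 
appends to a second log with a parallel offset.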

2. LogManager
2.1 Deleted numPartitions (not related to this patch, I don't think)

3. FileMessageSet
3.1 Good catch, fixed.
3.2 Right, so I return the offset specifically to be able to differentiate the 
case where I found the exact location versus the next message. This is 
important for things like truncate. I always return the offset and 
corresponding file position of the first offset that meets the >= criteria. So 
either I am confused, or I think it works the way you are saying it should.
3.3 Well, but the code actually reads an Int and a Long out of the resulting 
buffer, so if MessageSet.LogOverhead != 12 there is a bug; we aren't 
abstracting anything, just adding a layer of obfuscation. But, yes, it should be 
consistent, so I changed it.
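
The search behavior described in 3.2 and the 12-byte header mentioned in 3.3 
can be sketched as follows. This is an illustration in Python, not the 
FileMessageSet code: append_entry and search_for are hypothetical names, the 
log is an in-memory bytearray rather than a file, and big-endian byte order is 
an assumption.

```python
import struct

# Per-entry header: 8-byte offset (Long) then 4-byte message size (Int).
# Big-endian is assumed here.
HEADER = struct.Struct(">qi")
LOG_OVERHEAD = HEADER.size  # 12 bytes


def append_entry(buf, offset, payload):
    """Append one entry (header plus payload) to a bytearray log."""
    buf.extend(HEADER.pack(offset, len(payload)))
    buf.extend(payload)


def search_for(buf, target_offset, start_position=0):
    """Return (offset, position) of the first entry whose offset is
    >= target_offset, or None if no such entry exists."""
    position = start_position
    while position + LOG_OVERHEAD <= len(buf):
        offset, size = HEADER.unpack_from(buf, position)
        if offset >= target_offset:
            return offset, position
        position += LOG_OVERHEAD + size  # skip the header and the message body
    return None


log = bytearray()
for off, msg in [(0, b"a"), (1, b"bb"), (3, b"ccc")]:
    append_entry(log, off, msg)

exact = search_for(log, 1)   # offset 1 exists, so the returned offset equals the target
after = search_for(log, 2)   # offset 2 is missing, so the next entry (offset 3) is returned
```

Returning the offset together with the position is what lets a caller such as 
truncate tell an exact hit from a "next message" hit: it compares the returned 
offset with the one it asked for.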

4. LogSegment
4.1 I don't want to allocate an object for each call as this method is internal 
to LogSegment. I will make it private to emphasize that.
4.2 I agree, though we have had the 2GB limit for a while now, so this isn't 
new. We repurposed KAFKA-556 for this.

5. ConsumerIterator
Agreed. Broke this into a separate issue since the current state is no worse 
than 0.7.x. JIRA is KAFKA-546.

6. ReplicaFetcherThread
Agreed, this was discussed above. JIRA is KAFKA-557.

7. Only IDEA detects this, which I don't have, so I can't help on this.

8. ByteBufferMessageSet
8.2 Fixed

9. OffsetIndex 
9.1 Fixed
9.2 This is true but I think it would be more convoluted. Simple tests with 
early exits mean you don't have to add another layer of nesting.

10. Agreed, of the various things on my plate I think this is the most 
important. Any issues here are resolvable, but we need to first get the data.

                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-neha-post-review.patch, 
> KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, 
> KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, 
> KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, 
> KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, 
> KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more 
> nuanced retention policy would allow dropping individual messages from a 
> segment file by recopying it. This is not currently possible because the 
> lookup structure we use to locate messages is based on the file offset 
> directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) 
> which would allow deleting individual messages (e.g. 2) without deleting the 
> entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already 
> doing data format changes.
> As part of this we would explicitly store the key field given by the producer 
> for partitioning (right now there is no way for the consumer to find the 
> value used for partitioning).
> This combination of features would allow a key-based retention policy that 
> would clean obsolete values by a user-defined key.
> The specific use case I am targeting is a commit log for local state 
> maintained by a process doing some kind of near-real-time processing. The 
> process could log out its local state changes and be able to restore from 
> this log in the event of a failure. However I think this is a broadly useful 
> feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than 
> the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment 
> that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format 
> changes, but retains the physical offset. The second adds the lookup 
> structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One 
> surprising thing is that the offset is actually the offset of the next 
> message. I think there are actually several uses for the current offset. I 
> would propose making this hold the current message offset since with logical 
> offsets the next offset is always just current_offset+1. Note that since we 
> no longer require messages to be dense, it is not true that if the next 
> offset is N the current offset is N-1 (because N-1 may have been deleted). 
> Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an 
> exception if there are zero messages in the set. This is used to detect 
> fetches that are smaller than a single message size. I think this behavior is 
> misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I 
> moved the CRC to the first field and made it cover the entire message 
> contents (previously it only covered the payload), (2) I dropped support for 
> Magic=0, effectively making the attributes field required, which simplifies 
> the code (since we are breaking compatibility anyway).
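
The log format and the "current offset" iteration proposed in the description 
above can be sketched in Python as follows. This is only an illustration under 
assumptions: encode and iterate are hypothetical helpers, the fields of 
MessageAndOffset are guessed from its name, and big-endian byte order is 
assumed.

```python
import struct
from typing import Iterator, NamedTuple

# Proposed entry layout: 8-byte offset, 4-byte message_size, N-byte message.
ENTRY_HEADER = struct.Struct(">qi")  # big-endian is an assumption


class MessageAndOffset(NamedTuple):
    message: bytes
    offset: int  # offset of this message, not of the next one

    @property
    def next_offset(self) -> int:
        # With logical offsets the next offset to fetch is always current + 1.
        return self.offset + 1


def encode(entries) -> bytes:
    """Serialize (offset, message) pairs in the proposed layout."""
    out = bytearray()
    for offset, message in entries:
        out += ENTRY_HEADER.pack(offset, len(message))
        out += message
    return bytes(out)


def iterate(data: bytes) -> Iterator[MessageAndOffset]:
    """Yield each complete entry together with its own offset."""
    pos = 0
    while pos + ENTRY_HEADER.size <= len(data):
        offset, size = ENTRY_HEADER.unpack_from(data, pos)
        start = pos + ENTRY_HEADER.size
        if start + size > len(data):
            break  # partial trailing message, e.g. a fetch smaller than one message
        yield MessageAndOffset(data[start:start + size], offset)
        pos = start + size


# Offset 2 has been deleted, so the offsets are no longer dense.
data = encode([(0, b"a"), (1, b"b"), (3, b"d")])
items = list(iterate(data))
```

Here items[1].next_offset is 2 even though the next stored message has offset 
3, which is why the next offset cannot be used to recover the current one once 
individual messages can be deleted. An iterator that yields nothing for a 
truncated buffer, instead of throwing, leaves the "fetch too small" decision to 
the consumer.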

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
