Hi Jun,

Regarding 4) in your comment, after thinking it over for a while I cannot
come up with a way to do it along with log compaction without adding new
fields to the current message set format. Do you have a better way that does
not require protocol changes?
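
To make the question concrete, below is a rough sketch (hypothetical class
and field names, not the actual wire format) of the kind of format change on
the compressed message set that I understand your comment 4) to suggest: the
wrapper carries the absolute base offset and each inner message only a delta,
so the leader can assign offsets by patching one field in the wrapper instead
of de-/re-compressing the payload. What I cannot see is how to keep this
working once the cleaner starts dropping inner messages, without yet more new
fields.

// Hypothetical sketch only -- not a concrete format proposal.
public final class CompressedMessageSet {
    private long baseOffset;                 // absolute offset of the first inner message
    private final byte[] compressedPayload;  // inner messages carry only relative offsets

    public CompressedMessageSet(long baseOffset, byte[] compressedPayload) {
        this.baseOffset = baseOffset;
        this.compressedPayload = compressedPayload;
    }

    // On append, the leader reassigns offsets by rewriting this single field;
    // the compressed payload, and the relative offsets inside it, is untouched.
    public void assignBaseOffset(long logEndOffset) {
        this.baseOffset = logEndOffset;
    }

    // Consumers (and the log cleaner) reconstruct absolute offsets lazily.
    public long absoluteOffset(int relativeOffset) {
        return baseOffset + relativeOffset;
    }
}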

Guozhang

On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> I have updated the wiki page incorporating received comments. We can
> discuss some more details on:
>
> 1. How we want to do auditing: whether we want to have built-in auditing on
> brokers or even MMs, or use an audit consumer to fetch all messages from
> just the brokers.
>
> 2. How we can avoid de-/re-compression on brokers and MMs with log
> compaction turned on.
>
> 3. How we can resolve the data inconsistency resulting from unclean leader
> election with control messages.
>
> Guozhang
>
> On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
>> Thanks for the detailed comments Jun! Some replies inlined.
>>
>> On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao <jun...@gmail.com> wrote:
>>
>>> Hi, Guozhang,
>>>
>>> Thanks for the writeup.
>>>
>>> A few high level comments.
>>>
>>> 1. Associating (versioned) schemas to a topic can be a good thing
>>> overall.
>>> Yes, this could add a bit more management overhead in Kafka. However, it
>>> makes sure that the data format contract between a producer and a
>>> consumer
>>> is kept and managed in a central place, instead of in the application.
>>> The
>>> latter is probably easier to start with, but is likely to be brittle in
>>> the
>>> long run.
>>>
>>
>> I am actually not proposing that we not support associated versioned
>> schemas for topics, but rather that we not let some core Kafka
>> functionalities like auditing depend on schemas. I think this alone can
>> separate schema management from Kafka pipeline management (i.e. making sure
>> every single message is delivered, and within some latency, etc.). Adding
>> additional auditing info into an existing schema would force Kafka to be
>> aware of the schema systems (Avro, JSON, etc.).
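>>
>> To be concrete about what I mean by keeping this out of the schema layer,
>> here is a rough sketch (purely illustrative names, not a concrete wire
>> format) of the kind of header extension I have in mind: system properties
>> such as auditing info are carried as plain key-value pairs next to the
>> payload, so brokers, MMs and audit consumers can read them without knowing
>> anything about Avro / JSON.
>>
>> import java.util.HashMap;
>> import java.util.Map;
>>
>> // Illustrative sketch only; field names and layout are hypothetical.
>> class EnrichedMessage {
>>     // System properties understood by Kafka itself (audit timestamps, etc.).
>>     final Map<String, byte[]> systemProperties = new HashMap<>();
>>     // Application properties, opaque to Kafka.
>>     final Map<String, byte[]> appProperties = new HashMap<>();
>>     // The value stays an opaque byte[]; Kafka never parses the schema payload.
>>     byte[] key;
>>     byte[] value;
>> }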
>>
>>
>>>
>>> 2. Auditing can be a general feature that's useful for many applications.
>>> Such a feature can be implemented by extending the low level message
>>> format
>>> with a header. However, it can also be added as part of the schema
>>> management. For example, you can imagine a type of audited schema that
>>> adds
>>> additional auditing info to an existing schema automatically. Performance
>>> wise, it probably doesn't make a big difference whether the auditing info
>>> is added in the message header or the schema header.
>>>
>>>
>> See replies above.
>>
>>
>>> 3. We talked about avoiding the overhead of decompressing in both the
>>> broker and the mirror maker. We probably need to think through how this
>>> works with auditing. In the more general case where you want to audit
>>> every
>>> message, one has to do the decompression to get the individual message,
>>> independent of how the auditing info is stored. This means that if we
>>> want
>>> to audit the broker directly or the consumer in mirror maker, we have to
>>> pay the decompression cost anyway. Similarly, if we want to extend mirror
>>> maker to support some customized filtering/transformation logic, we also
>>> have to pay the decompression cost.
>>>
>>>
>> I see your point. For that I would prefer to have an MM implementation
>> that is able to decompress / recompress ONLY if required, for example by
>> auditing, etc. I agree that we have not thought through whether we should
>> enable auditing on MM, and if so how to do that; we could discuss that in a
>> different thread. Overall, this proposal is not just about tackling
>> de-/re-compression on MM but about the feasibility of extending the Kafka
>> message header for system properties / app properties.
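>>
>> As a strawman (hypothetical interfaces and class names, just to illustrate
>> the idea), the MM pipeline could look roughly like the following: the
>> compressed message set is forwarded as-is unless some registered handler,
>> such as an auditor or a filter, actually needs the individual messages.
>>
>> import java.util.List;
>>
>> // Hypothetical sketch; Message and CompressedMessageSet are assumed to be
>> // simple wrappers (the latter around the compressed bytes, with
>> // decompress()/compress() helpers), not existing Kafka classes.
>> interface MessageSetHandler {
>>     boolean needsIndividualMessages();            // e.g. auditing, filtering
>>     Iterable<Message> handle(Iterable<Message> messages);
>> }
>>
>> class MirrorPipeline {
>>     private final List<MessageSetHandler> handlers;
>>
>>     MirrorPipeline(List<MessageSetHandler> handlers) { this.handlers = handlers; }
>>
>>     CompressedMessageSet mirror(CompressedMessageSet set) {
>>         boolean mustDecompress = false;
>>         for (MessageSetHandler h : handlers)
>>             mustDecompress |= h.needsIndividualMessages();
>>         if (!mustDecompress)
>>             return set;                            // forward compressed bytes as-is
>>         Iterable<Message> messages = set.decompress();  // pay the cost only when needed
>>         for (MessageSetHandler h : handlers)
>>             messages = h.handle(messages);
>>         return CompressedMessageSet.compress(messages);
>>     }
>> }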
>>
>>
>>> Some low level comments.
>>>
>>> 4. Broker offset reassignment (kafka-527):  This probably can be done
>>> with
>>> just a format change on the compressed message set.
>>>
>> That is true. As I mentioned in the wiki, each of the problems may be
>> resolvable separately, but I am thinking about a general way to address all
>> of them.
>>
>>
>>> 5. MirrorMaker refactoring: We probably can think through how general we
>>> want mirror maker to be. If we want it to be more general, we likely
>>> need to decompress every message just like in a normal consumer. There
>>> will
>>> definitely be overhead. However, as long as mirror maker is made
>>> scalable,
>>> we can overcome the overhead by just running more instances on more
>>> hardware resources. As for the proposed message format change, we
>>> probably
>>> need to think through it a bit more. The honor-ship flag seems a bit
>>> hacky
>>> to me.
>>>
>>>
>> Replied as part of 3). Sure, we can discuss more about that; I will update
>> the wiki with the collected comments.
>>
>>
>>> 6. Adding a timestamp in each message can be a useful thing. It (1)
>>> allows
>>> log segments to be rolled more accurately; (2) allows finding an offset
>>> for
>>> a particular timestamp more accurately. I am thinking that the timestamp
>>> in
>>> the message should probably be the time when the leader receives the
>>> message. Followers preserve the timestamp set by leader. To avoid time
>>> going back during leader change, the leader can probably set the
>>> timestamp
>>> to be the max of current time and the timestamp of the last message, if
>>> present. That timestamp can potentially be added to the index file to
>>> answer offsetBeforeTimestamp queries more efficiently.
>>>
>>>
>> Agreed.
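>>
>> Just to write down my understanding of that rule as code (hypothetical
>> names, a minimal sketch rather than a patch):
>>
>> // Leader-side timestamp assignment as described above; followers would
>> // simply preserve the value set by the leader when replicating.
>> class LeaderTimestampAssigner {
>>     private long lastTimestamp = -1L;   // timestamp of the last appended message
>>
>>     long assign(long wallClockMs) {
>>         long ts = Math.max(wallClockMs, lastTimestamp);  // never goes backwards
>>         lastTimestamp = ts;
>>         return ts;                       // could also be written into the index entry
>>     }
>> }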
>>
>>
>>> 7. Log compaction: It seems that you are suggesting an improvement to
>>> compact the active segment as well. This can be tricky and we need to
>>> figure out the details on how to do this. This improvement seems to be
>>> orthogonal to the message format change though.
>>>
>>>
>> I think the improvement is more effective with the timestamps as in 6);
>> we can discuss more about this.
>>
>>
>>> 8. Data inconsistency from unclean election: I am not sure if we need to
>>> add a control message to the log during leadership change. The <leader
>>> generation, starting offset> map can be maintained in a separate checkpoint
>>> file. The followers just need to get that map from the leader during
>>> startup.
>>>
>> What I was proposing is an alternative solution given that we have this
>> message header enhancement; with it we do not need to add separate logic
>> for the leadership map and checkpoint file, but just the logic in the
>> replica manager to handle this extra control message and remember the
>> current leader epoch instead of a map.
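>>
>> Roughly the replica-manager logic I have in mind (again hypothetical names
>> and a deliberately simplified sketch, not a concrete design):
>>
>> // Placeholder for the local log; only the operation used below is shown.
>> interface ReplicaLog {
>>     void truncateTo(long offset);
>> }
>>
>> class FollowerEpochHandler {
>>     private int currentLeaderEpoch = -1;   // single remembered value, not a map
>>     private final ReplicaLog localLog;
>>
>>     FollowerEpochHandler(ReplicaLog localLog) { this.localLog = localLog; }
>>
>>     // Called when the fetcher sees a leader-change control message at the
>>     // given offset in the leader's log.
>>     void onLeaderChangeMessage(int leaderEpoch, long controlMessageOffset) {
>>         if (leaderEpoch > currentLeaderEpoch) {
>>             // Anything appended locally past this point came from a stale
>>             // leader (e.g. after an unclean election) and is discarded
>>             // before the follower resumes fetching from the new leader.
>>             localLog.truncateTo(controlMessageOffset);
>>             currentLeaderEpoch = leaderEpoch;
>>         }
>>     }
>> }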
>>
>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>> > Hello all,
>>> >
>>> > I put some thoughts on enhancing our current message metadata format to
>>> > solve a bunch of existing issues:
>>> >
>>> >
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
>>> >
>>> > This wiki page is for kicking off some discussions about the
>>> feasibility of
>>> > adding more info into the message header, and if possible how we would
>>> add
>>> > them.
>>> >
>>> > -- Guozhang
>>> >
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
