Hi Jun,

Sorry for the delay in responding to your comments on the wiki page and in
this thread; I am quite swamped right now. I will get back to you as soon as
I find some time.

Guozhang

On Tue, Nov 11, 2014 at 6:26 PM, Jun Rao <jun...@gmail.com> wrote:

> Thinking about this a bit more. For adding the auditing support, I am not
> sure if we need to change the message format by adding the application
> tags. An alternative way to do that is to add it in the producer client.
> For example, for each message payload (doesn't matter what the
> serialization mechanism is) that a producer receives, the producer can just
> add a header before the original payload. The header will contain all
> needed fields (e.g. timestamp, host, etc) for the purpose of auditing. This
> way, we don't need to change the message format and the auditing info can
> be added independent of the serialization mechanism of the message. The
> header can use a different serialization mechanism for better efficiency.
> For example, if we use Avro to serialize the header, the encoded bytes
> won't include the field names in the header. This is potentially more
> efficient than representing those fields as application tags in the message
> where the tags have to be explicitly stored in every message.
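A minimal sketch of the header-prepending idea described above, not actual Kafka code. The hypothetical `AuditEnvelope` class writes a timestamp and host name in front of an opaque payload and reads them back. It uses a hand-rolled fixed layout rather than Avro purely to stay dependency-free; the class name, the choice of fields, and the byte layout are all assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Kafka): wraps an opaque payload with an audit header.
// Assumed layout: [8-byte timestamp][2-byte host length][host bytes][original payload]
final class AuditEnvelope {
    final long timestampMs;
    final String host;
    final byte[] payload;

    AuditEnvelope(long timestampMs, String host, byte[] payload) {
        this.timestampMs = timestampMs;
        this.host = host;
        this.payload = payload;
    }

    // Prepend the header; the payload bytes are copied untouched, whatever their serialization.
    static byte[] wrap(long timestampMs, String host, byte[] payload) {
        byte[] hostBytes = host.getBytes(StandardCharsets.UTF_8);
        if (hostBytes.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("host name too long");
        }
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Short.BYTES + hostBytes.length + payload.length);
        buf.putLong(timestampMs);
        buf.putShort((short) hostBytes.length);
        buf.put(hostBytes);
        buf.put(payload);
        return buf.array();
    }

    // Split a wrapped value back into the header fields and the original payload.
    static AuditEnvelope unwrap(byte[] wrapped) {
        ByteBuffer buf = ByteBuffer.wrap(wrapped);
        long timestampMs = buf.getLong();
        byte[] hostBytes = new byte[buf.getShort()];
        buf.get(hostBytes);
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return new AuditEnvelope(timestampMs, new String(hostBytes, StandardCharsets.UTF_8), payload);
    }
}
```

Because the header carries no field names, its overhead here is a fixed 10 bytes plus the host name, which is the efficiency argument made above for a schema-based header encoding.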
>
> To make it easier for the client to add and make use of this kind of
> auditing support, I was imagining that we can add a ProducerFactory in the
> new java client. The ProducerFactory will create an instance of Producer
> based on a config property. By default, the current KafkaProducer will be
> returned. However, a user can plug in a different implementation of
> Producer that does auditing. For example, an implementation of an
> AuditProducer.send() can take the original ProducerRecord, add the header
> to the value byte array and then forward the record to an underlying
> KafkaProducer. We can add a similar ConsumerFactory to the new consumer
> client. If a user plugs in an implementation of the AuditingConsumer, the
> consumer will then be audited automatically.
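A rough sketch of how such a pluggable factory could look, under stated assumptions. `SimpleRecord`, `SimpleProducer`, `InMemoryProducer`, and the `producer.impl` config key are hypothetical stand-ins invented for this example; they are not the real client classes or config names. The header here is reduced to a bare 8-byte timestamp to keep the example short.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical stand-ins for the client types; these are NOT the real Kafka API.
final class SimpleRecord {
    final String topic;
    final byte[] value;
    SimpleRecord(String topic, byte[] value) { this.topic = topic; this.value = value; }
}

interface SimpleProducer {
    void send(SimpleRecord record);
}

// Plays the role of the default producer; it only remembers what was sent.
final class InMemoryProducer implements SimpleProducer {
    final List<SimpleRecord> sent = new ArrayList<>();
    @Override public void send(SimpleRecord record) { sent.add(record); }
}

// Decorator: prepends an 8-byte timestamp header, then forwards to the delegate.
final class AuditProducer implements SimpleProducer {
    private final SimpleProducer delegate;
    AuditProducer(SimpleProducer delegate) { this.delegate = delegate; }

    @Override public void send(SimpleRecord record) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + record.value.length);
        buf.putLong(System.currentTimeMillis());
        buf.put(record.value);
        delegate.send(new SimpleRecord(record.topic, buf.array()));
    }
}

final class ProducerFactory {
    // Hypothetical config key; "default" returns the base producer unchanged.
    static final String IMPL_KEY = "producer.impl";

    static SimpleProducer create(Properties config, SimpleProducer base) {
        String impl = config.getProperty(IMPL_KEY, "default");
        switch (impl) {
            case "default": return base;
            case "audit":   return new AuditProducer(base);
            default: throw new IllegalArgumentException("Unknown producer impl: " + impl);
        }
    }
}
```

Application code calls only `ProducerFactory.create(...)` and `send(...)`, so switching auditing on or off is a config change rather than a code change. A matching consumer-side wrapper would strip the header before handing the value to the application.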
>
> Thanks,
>
> Jun
>
> On Tue, Oct 21, 2014 at 4:06 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Jun,
> >
> > Regarding 4) in your comment, after thinking about it for a while I cannot
> > come up with a way to do it along with log compaction without adding new
> > fields to the current format of the message set. Do you have a better way
> > that does not require protocol changes?
> >
> > Guozhang
> >
> > On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > I have updated the wiki page to incorporate the comments received. We
> > > can discuss some more details on:
> > >
> > > 1. How we want to do auditing: whether we want built-in auditing on
> > > brokers or even MMs, or use an audit consumer to fetch all messages
> > > from just the brokers.
> > >
> > > 2. How we can avoid de-/re-compression on brokers and MMs with log
> > > compaction turned on.
> > >
> > > 3. How we can resolve data inconsistency resulting from unclean leader
> > > election with control messages.
> > >
> > > Guozhang
> > >
> > > On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > >> Thanks for the detailed comments Jun! Some replies inlined.
> > >>
> > >> On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao <jun...@gmail.com> wrote:
> > >>
> > >>> Hi, Guozhang,
> > >>>
> > >>> Thanks for the writeup.
> > >>>
> > >>> A few high level comments.
> > >>>
> > >>> 1. Associating (versioned) schemas with a topic can be a good thing
> > >>> overall. Yes, this could add a bit more management overhead in Kafka.
> > >>> However, it makes sure that the data format contract between a producer
> > >>> and a consumer is kept and managed in a central place, instead of in the
> > >>> application. The latter is probably easier to start with, but is likely
> > >>> to be brittle in the long run.
> > >>>
> > >>
> > >> I am not actually proposing to drop support for associating versioned
> > >> schemas with topics, but rather to keep core Kafka functionality such as
> > >> auditing from depending on schemas. I think this alone can separate
> > >> schema management from Kafka piping management (i.e. making sure every
> > >> single message is delivered, and within some latency, etc). Adding
> > >> additional auditing info into an existing schema will force Kafka to be
> > >> aware of the schema systems (Avro, JSON, etc).
> > >>
> > >>
> > >>>
> > >>> 2. Auditing can be a general feature that's useful for many
> > >>> applications. Such a feature can be implemented by extending the low
> > >>> level message format with a header. However, it can also be added as
> > >>> part of the schema management. For example, you can imagine a type of
> > >>> audited schema that adds additional auditing info to an existing schema
> > >>> automatically. Performance wise, it probably doesn't make a big
> > >>> difference whether the auditing info is added in the message header or
> > >>> the schema header.
> > >>>
> > >>>
> > >> See replies above.
> > >>
> > >>
> > >>> 3. We talked about avoiding the overhead of decompressing in both the
> > >>> broker and the mirror maker. We probably need to think through how this
> > >>> works with auditing. In the more general case where you want to audit
> > >>> every message, one has to do the decompression to get the individual
> > >>> message, independent of how the auditing info is stored. This means that
> > >>> if we want to audit the broker directly or the consumer in mirror maker,
> > >>> we have to pay the decompression cost anyway. Similarly, if we want to
> > >>> extend mirror maker to support some customized filtering/transformation
> > >>> logic, we also have to pay the decompression cost.
> > >>>
> > >>>
> > >> I see your point. For that I would prefer to have an MM implementation
> > >> that de-compresses / re-compresses ONLY if required, for example by
> > >> auditing. I agree that we have not thought through whether we should
> > >> enable auditing on MM, and if so how to do that; we could discuss that
> > >> in a different thread. Overall, this proposal is not just for tackling
> > >> de-compression on MM but about the feasibility of extending the Kafka
> > >> message header for system properties / app properties.
> > >>
> > >>
> > >>> Some low level comments.
> > >>>
> > >>> 4. Broker offset reassignment (kafka-527): This probably can be done
> > >>> with just a format change on the compressed message set.
> > >>>
> > >> That is true. As I mentioned in the wiki, each of the problems may be
> > >> resolvable separately, but I am thinking about a general way to address
> > >> all of them.
> > >>
> > >>
> > >>> 5. MirrorMaker refactoring: We probably can think through how general
> > >>> we want mirror maker to be. If we want it to be more general, we likely
> > >>> need to decompress every message just like in a normal consumer. There
> > >>> will definitely be overhead. However, as long as mirror maker is made
> > >>> scalable, we can overcome the overhead by just running more instances
> > >>> on more hardware resources. As for the proposed message format change,
> > >>> we probably need to think through it a bit more. The honor-ship flag
> > >>> seems a bit hacky to me.
> > >>>
> > >>>
> > >> Replied as part of 3). Sure, we can discuss that further; I will update
> > >> the wiki with the collected comments.
> > >>
> > >>
> > >>> 6. Adding a timestamp in each message can be a useful thing. It (1)
> > >>> allows log segments to be rolled more accurately; (2) allows finding an
> > >>> offset for a particular timestamp more accurately. I am thinking that
> > >>> the timestamp in the message should probably be the time when the leader
> > >>> receives the message. Followers preserve the timestamp set by the
> > >>> leader. To avoid time going backwards during a leader change, the leader
> > >>> can probably set the timestamp to be the max of the current time and the
> > >>> timestamp of the last message, if present. That timestamp can
> > >>> potentially be added to the index file to answer offsetBeforeTimestamp
> > >>> queries more efficiently.
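The max-of-current-time-and-last-timestamp rule in 6) can be sketched in a few lines. The `LeaderTimestampAssigner` class below is hypothetical and only illustrates the arithmetic; how a new leader would learn the last logged timestamp is assumed here to be a constructor argument.

```java
// Hypothetical sketch of the rule in 6): timestamps assigned by a leader never go backwards.
final class LeaderTimestampAssigner {
    private long lastTimestampMs;
    private boolean hasLast;

    // Empty log: there is no previous message to compare against.
    LeaderTimestampAssigner() {
        this.hasLast = false;
    }

    // New leader taking over a non-empty log (assumed to know the last message's timestamp).
    LeaderTimestampAssigner(long lastLoggedTimestampMs) {
        this.lastTimestampMs = lastLoggedTimestampMs;
        this.hasLast = true;
    }

    // Returns max(now, timestamp of last message), or just now if the log is empty.
    long assign(long nowMs) {
        long assigned = hasLast ? Math.max(nowMs, lastTimestampMs) : nowMs;
        lastTimestampMs = assigned;
        hasLast = true;
        return assigned;
    }
}
```

With this rule, a new leader whose clock lags the old leader's simply repeats the last timestamp until its own clock catches up, so timestamps stay non-decreasing in offset order and remain usable as a search key in an index.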
> > >>>
> > >>>
> > >> Agreed.
> > >>
> > >>
> > >>> 7. Log compaction: It seems that you are suggesting an improvement to
> > >>> compact the active segment as well. This can be tricky and we need to
> > >>> figure out the details on how to do this. This improvement seems to be
> > >>> orthogonal to the message format change though.
> > >>>
> > >>>
> > >> I think the improvement is more effective with the timestamps described
> > >> in 6); we can discuss this further.
> > >>
> > >>
> > >>> 8. Data inconsistency from unclean election: I am not sure if we need
> > >>> to add a control message to the log during leadership change. The
> > >>> <leader generation, starting offset> map can be maintained in a separate
> > >>> checkpoint file. The follower just needs to get that map from the leader
> > >>> during startup.
> > >>>
> > >> What I was proposing is an alternative solution given that we have this
> > >> message header enhancement; with this we do not need to add separate
> > >> logic for the leadership map and checkpoint file, but just the logic in
> > >> the replica manager to handle this extra control message and remember
> > >> the current leader epoch instead of a map.
> > >>
> > >>
> > >>> Thanks,
> > >>>
> > >>> Jun
> > >>>
> > >>> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wangg...@gmail.com>
> > >>> wrote:
> > >>>
> > >>> > Hello all,
> > >>> >
> > >>> > I have put together some thoughts on enhancing our current message
> > >>> > metadata format to solve a bunch of existing issues:
> > >>> >
> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> > >>> >
> > >>> > This wiki page is for kicking off some discussions about the
> > >>> > feasibility of adding more info into the message header, and if
> > >>> > possible how we would add it.
> > >>> >
> > >>> > -- Guozhang
> > >>> >
> > >>>
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
