I missed yesterday's KIP hangout. I'm currently working on another KIP for
enriched metadata of messages. Guozhang has already created a wiki page
before (
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata).
We plan to fill the relative offset to the offset field in the batch sent
by producer to avoid broker side re-compression. The message offset would
become batch base offset + relative offset. I guess maybe the expected
offset in KIP-27 can be only set for base offset? Would that affect certain
use cases?

For Jun's comments, I am not sure I completely get it. I think the producer
only sends one batch per partition in a request. So either that batch is
appended or not. Why a batch would be partially committed?

Thanks,

Jiangjie (Becket) Qin

On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin <b...@kirw.in> wrote:

> That's a fair point. I've added some imagined job logic to the KIP, so
> we can make sure the proposal stays in sync with the usages we're
> discussing. (The logic is just a quick sketch for now -- I expect I'll
> need to elaborate it as we get into more detail, or to address other
> concerns...)
>
> On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao <j...@confluent.io> wrote:
> > For 1, yes, when there is a transient leader change, it's guaranteed
> that a
> > prefix of the messages in a request will be committed. However, it seems
> > that the client needs to know what subset of messages are committed in
> > order to resume the sending. Then the question is how.
> >
> > As Flavio indicated, for the use cases that you listed, it would be
> useful
> > to figure out the exact logic by using this feature. For example, in the
> > partition K/V store example, when we fail over to a new writer to the
> > commit log, the zombie writer can publish new messages to the log after
> the
> > new writer takes over, but before it publishes any message. We probably
> > need to outline how this case can be handled properly.
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin <b...@kirw.in> wrote:
> >
> >> Hi Jun,
> >>
> >> Thanks for the close reading! Responses inline.
> >>
> >> > Thanks for the write-up. The single producer use case you mentioned
> makes
> >> > sense. It would be useful to include that in the KIP wiki.
> >>
> >> Great -- I'll make sure that the wiki is clear about this.
> >>
> >> > 1. What happens when the leader of the partition changes in the middle
> >> of a
> >> > produce request? In this case, the producer client is not sure whether
> >> the
> >> > request succeeds or not. If there is only a single message in the
> >> request,
> >> > the producer can just resend the request. If it sees an OffsetMismatch
> >> > error, it knows that the previous send actually succeeded and can
> proceed
> >> > with the next write. This is nice since it not only allows the
> producer
> >> to
> >> > proceed during transient failures in the broker, it also avoids
> >> duplicates
> >> > during producer resend. One caveat is when there are multiple
> messages in
> >> > the same partition in a produce request. The issue is that in our
> current
> >> > replication protocol, it's possible for some, but not all messages in
> the
> >> > request to be committed. This makes resend a bit harder to deal with
> >> since
> >> > on receiving an OffsetMismatch error, it's not clear which messages
> have
> >> > been committed. One possibility is to expect that compression is
> enabled,
> >> > in which case multiple messages are compressed into a single message.
> I
> >> was
> >> > thinking that another possibility is for the broker to return the
> current
> >> > high watermark when sending an OffsetMismatch error. Based on this
> info,
> >> > the producer can resend the subset of messages that have not been
> >> > committed. However, this may not work in a compacted topic since there
> >> can
> >> > be holes in the offset.
> >>
> >> This is a excellent question. It's my understanding that at least a
> >> *prefix* of messages will be committed (right?) -- which seems to be
> >> enough for many cases. I'll try and come up with a more concrete
> >> answer here.
> >>
> >> > 2. Is this feature only intended to be used with ack = all? The client
> >> > doesn't get the offset with ack = 0. With ack = 1, it's possible for a
> >> > previously acked message to be lost during leader transition, which
> will
> >> > make the client logic more complicated.
> >>
> >> It's true that acks = 0 doesn't seem to be particularly useful; in all
> >> the cases I've come across, the client eventually wants to know about
> >> the mismatch error. However, it seems like there are some cases where
> >> acks = 1 would be fine -- eg. in a bulk load of a fixed dataset,
> >> losing messages during a leader transition just means you need to
> >> rewind / restart the load, which is not especially catastrophic. For
> >> many other interesting cases, acks = all is probably preferable.
> >>
> >> > 3. How does the producer client know the offset to send the first
> >> message?
> >> > Do we need to expose an API in producer to get the current high
> >> watermark?
> >>
> >> You're right, it might be irritating to have to go through the
> >> consumer API just for this. There are some cases where the offsets are
> >> already available -- like the commit-log-for-KV-store example -- but
> >> in general, being able to get the offsets from the producer interface
> >> does sound convenient.
> >>
> >> > We plan to have a KIP discussion meeting tomorrow at 11am PST. Perhaps
> >> you
> >> > can describe this KIP a bit then?
> >>
> >> Sure, happy to join.
> >>
> >> > Thanks,
> >> >
> >> > Jun
> >> >
> >> >
> >> >
> >> > On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin <b...@kirw.in> wrote:
> >> >
> >> >> Just wanted to flag a little discussion that happened on the ticket:
> >> >>
> >> >>
> >>
> https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259
> >> >>
> >> >> In particular, Yasuhiro Matsuda proposed an interesting variant on
> >> >> this that performs the offset check on the message key (instead of
> >> >> just the partition), with bounded space requirements, at the cost of
> >> >> potentially some spurious failures. (ie. the produce request may fail
> >> >> even if that particular key hasn't been updated recently.) This
> >> >> addresses a couple of the drawbacks of the per-key approach mentioned
> >> >> at the bottom of the KIP.
> >> >>
> >> >> On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin <b...@kirw.in> wrote:
> >> >> > Hi all,
> >> >> >
> >> >> > So, perhaps it's worth adding a couple specific examples of where
> this
> >> >> > feature is useful, to make this a bit more concrete:
> >> >> >
> >> >> > - Suppose I'm using Kafka as a commit log for a partitioned KV
> store,
> >> >> > like Samza or Pistachio (?) do. We bootstrap the process state by
> >> >> > reading from that partition, and log all state updates to that
> >> >> > partition when we're running. Now imagine that one of my processes
> >> >> > locks up -- GC or similar -- and the system transitions that
> partition
> >> >> > over to another node. When the GC is finished, the old 'owner' of
> that
> >> >> > partition might still be trying to write to the commit log at the
> same
> >> >> > as the new one is. A process might detect this by noticing that the
> >> >> > offset of the published message is bigger than it thought the
> upcoming
> >> >> > offset was, which implies someone else has been writing to the
> log...
> >> >> > but by then it's too late, and the commit log is already corrupt.
> With
> >> >> > a 'conditional produce', one of those processes will have it's
> publish
> >> >> > request refused -- so we've avoided corrupting the state.
> >> >> >
> >> >> > - Envision some copycat-like system, where we have some sharded
> >> >> > postgres setup and we're tailing each shard into its own partition.
> >> >> > Normally, it's fairly easy to avoid duplicates here: we can track
> >> >> > which offset in the WAL corresponds to which offset in Kafka, and
> we
> >> >> > know how many messages we've written to Kafka already, so the
> state is
> >> >> > very simple. However, it is possible that for a moment -- due to
> >> >> > rebalancing or operator error or some other thing -- two different
> >> >> > nodes are tailing the same postgres shard at once! Normally this
> would
> >> >> > introduce duplicate messages, but by specifying the expected
> offset,
> >> >> > we can avoid this.
> >> >> >
> >> >> > So perhaps it's better to say that this is useful when a single
> >> >> > producer is *expected*, but multiple producers are *possible*? (In
> the
> >> >> > same way that the high-level consumer normally has 1 consumer in a
> >> >> > group reading from a partition, but there are small windows where
> more
> >> >> > than one might be reading at the same time.) This is also the
> spirit
> >> >> > of the 'runtime cost' comment -- in the common case, where there is
> >> >> > little to no contention, there's no performance overhead either. I
> >> >> > mentioned this a little in the Motivation section -- maybe I should
> >> >> > flesh that out a little bit?
> >> >> >
> >> >> > For me, the motivation to work this up was that I kept running into
> >> >> > cases, like the above, where the existing API was
> almost-but-not-quite
> >> >> > enough to give the guarantees I was looking for -- and the
> extension
> >> >> > needed to handle those cases too was pretty small and
> natural-feeling.
> >> >> >
> >> >> > On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh <asi...@cloudera.com
> >
> >> >> wrote:
> >> >> >> Good concept. I have a question though.
> >> >> >>
> >> >> >> Say there are two producers A and B. Both producers are producing
> to
> >> >> same
> >> >> >> partition.
> >> >> >> - A sends a message with expected offset, x1
> >> >> >> - Broker accepts is and sends an Ack
> >> >> >> - B sends a message with expected offset, x1
> >> >> >> - Broker rejects it, sends nack
> >> >> >> - B sends message again with expected offset, x1+1
> >> >> >> - Broker accepts it and sends Ack
> >> >> >> I guess this is what this KIP suggests, right? If yes, then how
> does
> >> >> this
> >> >> >> ensure that same message will not be written twice when two
> producers
> >> >> are
> >> >> >> producing to same partition? Producer on receiving a nack will try
> >> again
> >> >> >> with next offset and will keep doing so till the message is
> accepted.
> >> >> Am I
> >> >> >> missing something?
> >> >> >>
> >> >> >> Also, you have mentioned on KIP, "it imposes little to no runtime
> >> cost
> >> >> in
> >> >> >> memory or time", I think that is not true for time. With this
> >> approach
> >> >> >> producers' performance will reduce proportionally to number of
> >> producers
> >> >> >> writing to same partition. Please correct me if I am missing out
> >> >> something.
> >> >> >>
> >> >> >>
> >> >> >> On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat <
> >> >> >> gharatmayures...@gmail.com> wrote:
> >> >> >>
> >> >> >>> If we have 2 producers producing to a partition, they can be out
> of
> >> >> order,
> >> >> >>> then how does one producer know what offset to expect as it does
> not
> >> >> >>> interact with other producer?
> >> >> >>>
> >> >> >>> Can you give an example flow that explains how it works with
> single
> >> >> >>> producer and with multiple producers?
> >> >> >>>
> >> >> >>>
> >> >> >>> Thanks,
> >> >> >>>
> >> >> >>> Mayuresh
> >> >> >>>
> >> >> >>> On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira <
> >> >> >>> fpjunque...@yahoo.com.invalid> wrote:
> >> >> >>>
> >> >> >>> > I like this feature, it reminds me of conditional updates in
> >> >> zookeeper.
> >> >> >>> > I'm not sure if it'd be best to have some mechanism for fencing
> >> >> rather
> >> >> >>> than
> >> >> >>> > a conditional write like you're proposing. The reason I'm
> saying
> >> >> this is
> >> >> >>> > that the conditional write applies to requests individually,
> >> while it
> >> >> >>> > sounds like you want to make sure that there is a single client
> >> >> writing
> >> >> >>> so
> >> >> >>> > over multiple requests.
> >> >> >>> >
> >> >> >>> > -Flavio
> >> >> >>> >
> >> >> >>> > > On 17 Jul 2015, at 07:30, Ben Kirwin <b...@kirw.in> wrote:
> >> >> >>> > >
> >> >> >>> > > Hi there,
> >> >> >>> > >
> >> >> >>> > > I just added a KIP for a 'conditional publish' operation: a
> >> simple
> >> >> >>> > > CAS-like mechanism for the Kafka producer. The wiki page is
> >> here:
> >> >> >>> > >
> >> >> >>> > >
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish
> >> >> >>> > >
> >> >> >>> > > And there's some previous discussion on the ticket and the
> users
> >> >> list:
> >> >> >>> > >
> >> >> >>> > > https://issues.apache.org/jira/browse/KAFKA-2260
> >> >> >>> > >
> >> >> >>> > >
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E
> >> >> >>> > >
> >> >> >>> > > As always, comments and suggestions are very welcome.
> >> >> >>> > >
> >> >> >>> > > Thanks,
> >> >> >>> > > Ben
> >> >> >>> >
> >> >> >>> >
> >> >> >>>
> >> >> >>>
> >> >> >>> --
> >> >> >>> -Regards,
> >> >> >>> Mayuresh R. Gharat
> >> >> >>> (862) 250-7125
> >> >> >>>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> --
> >> >> >>
> >> >> >> Regards,
> >> >> >> Ashish
> >> >>
> >>
>

Reply via email to