That's a fair point. I've added some imagined job logic to the KIP, so
we can make sure the proposal stays in sync with the usages we're
discussing. (The logic is just a quick sketch for now -- I expect I'll
need to elaborate it as we get into more detail, or to address other

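For the resend question quoted below, here is a rough sketch of how a client might resume after a partial commit. It assumes the broker returns the current high watermark along with the OffsetMismatch error (a suggestion from this thread, not an existing API), that this producer is the only writer, and that the topic is not compacted, so offsets are dense. The function name `uncommitted_suffix` is made up for illustration.

```python
def uncommitted_suffix(batch, base_offset, high_watermark):
    """Return the messages in `batch` that still need to be resent.

    Assumptions (hypothetical, see the lead-in): the batch was sent expecting
    `base_offset`, only a prefix of it was committed, no other writer has
    appended in the meantime, and offsets are dense (no compaction holes).
    """
    committed = high_watermark - base_offset
    if not 0 <= committed <= len(batch):
        # The log is not where a prefix commit of this batch would leave it,
        # so something else has happened and a blind resend is not safe.
        raise ValueError("high watermark is inconsistent with this batch")
    return batch[committed:]


batch = ["m0", "m1", "m2", "m3"]
# The batch was sent expecting offset 10; the error reports a high watermark of 12.
to_resend = uncommitted_suffix(batch, base_offset=10, high_watermark=12)
print(to_resend)  # ['m2', 'm3']
```

The resend would then use the reported high watermark (12 here) as its expected offset.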
On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao <> wrote:
> For 1, yes, when there is a transient leader change, it's guaranteed that a
> prefix of the messages in a request will be committed. However, it seems
> that the client needs to know what subset of messages are committed in
> order to resume the sending. Then the question is how.
> As Flavio indicated, for the use cases that you listed, it would be useful
> to figure out the exact logic by using this feature. For example, in the
> partitioned K/V store example, when we fail over to a new writer to the
> commit log, the zombie writer can publish new messages to the log after the
> new writer takes over, but before the new writer publishes any message. We
> probably need to outline how this case can be handled properly.
> Thanks,
> Jun
> On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin <> wrote:
>> Hi Jun,
>> Thanks for the close reading! Responses inline.
>> > Thanks for the write-up. The single producer use case you mentioned makes
>> > sense. It would be useful to include that in the KIP wiki.
>> Great -- I'll make sure that the wiki is clear about this.
>> > 1. What happens when the leader of the partition changes in the middle of a
>> > produce request? In this case, the producer client is not sure whether the
>> > request succeeds or not. If there is only a single message in the request,
>> > the producer can just resend the request. If it sees an OffsetMismatch
>> > error, it knows that the previous send actually succeeded and can proceed
>> > with the next write. This is nice since it not only allows the producer to
>> > proceed during transient failures in the broker, it also avoids duplicates
>> > during producer resend. One caveat is when there are multiple messages in
>> > the same partition in a produce request. The issue is that in our current
>> > replication protocol, it's possible for some, but not all messages in the
>> > request to be committed. This makes resend a bit harder to deal with since
>> > on receiving an OffsetMismatch error, it's not clear which messages have
>> > been committed. One possibility is to expect that compression is enabled,
>> > in which case multiple messages are compressed into a single message. I was
>> > thinking that another possibility is for the broker to return the current
>> > high watermark when sending an OffsetMismatch error. Based on this info,
>> > the producer can resend the subset of messages that have not been
>> > committed. However, this may not work in a compacted topic since there can
>> > be holes in the offset.
>> This is an excellent question. It's my understanding that at least a
>> *prefix* of messages will be committed (right?) -- which seems to be
>> enough for many cases. I'll try and come up with a more concrete
>> answer here.
>> > 2. Is this feature only intended to be used with ack = all? The client
>> > doesn't get the offset with ack = 0. With ack = 1, it's possible for a
>> > previously acked message to be lost during leader transition, which will
>> > make the client logic more complicated.
>> It's true that acks = 0 doesn't seem to be particularly useful; in all
>> the cases I've come across, the client eventually wants to know about
>> the mismatch error. However, it seems like there are some cases where
>> acks = 1 would be fine -- e.g. in a bulk load of a fixed dataset,
>> losing messages during a leader transition just means you need to
>> rewind / restart the load, which is not especially catastrophic. For
>> many other interesting cases, acks = all is probably preferable.
>> > 3. How does the producer client know the offset to send the first message?
>> > Do we need to expose an API in producer to get the current high watermark?
>> You're right, it might be irritating to have to go through the
>> consumer API just for this. There are some cases where the offsets are
>> already available -- like the commit-log-for-KV-store example -- but
>> in general, being able to get the offsets from the producer interface
>> does sound convenient.
>> > We plan to have a KIP discussion meeting tomorrow at 11am PST. Perhaps you
>> > can describe this KIP a bit then?
>> Sure, happy to join.
>> > Thanks,
>> >
>> > Jun
>> >
>> >
>> >
>> > On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin <> wrote:
>> >
>> >> Just wanted to flag a little discussion that happened on the ticket:
>> >>
>> >>
>> >>
>> >> In particular, Yasuhiro Matsuda proposed an interesting variant on
>> >> this that performs the offset check on the message key (instead of
>> >> just the partition), with bounded space requirements, at the cost of
>> >> potentially some spurious failures. (i.e. the produce request may fail
>> >> even if that particular key hasn't been updated recently.) This
>> >> addresses a couple of the drawbacks of the per-key approach mentioned
>> >> at the bottom of the KIP.
>> >>
>> >> On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin <> wrote:
>> >> > Hi all,
>> >> >
>> >> > So, perhaps it's worth adding a couple specific examples of where this
>> >> > feature is useful, to make this a bit more concrete:
>> >> >
>> >> > - Suppose I'm using Kafka as a commit log for a partitioned KV store,
>> >> > like Samza or Pistachio (?) do. We bootstrap the process state by
>> >> > reading from that partition, and log all state updates to that
>> >> > partition when we're running. Now imagine that one of my processes
>> >> > locks up -- GC or similar -- and the system transitions that partition
>> >> > over to another node. When the GC is finished, the old 'owner' of that
>> >> > partition might still be trying to write to the commit log at the same
>> >> > time as the new one is. A process might detect this by noticing that the
>> >> > offset of the published message is bigger than it thought the upcoming
>> >> > offset was, which implies someone else has been writing to the log...
>> >> > but by then it's too late, and the commit log is already corrupt. With
>> >> > a 'conditional produce', one of those processes will have its publish
>> >> > request refused -- so we've avoided corrupting the state.
>> >> >
>> >> > - Envision some copycat-like system, where we have some sharded
>> >> > postgres setup and we're tailing each shard into its own partition.
>> >> > Normally, it's fairly easy to avoid duplicates here: we can track
>> >> > which offset in the WAL corresponds to which offset in Kafka, and we
>> >> > know how many messages we've written to Kafka already, so the state is
>> >> > very simple. However, it is possible that for a moment -- due to
>> >> > rebalancing or operator error or some other thing -- two different
>> >> > nodes are tailing the same postgres shard at once! Normally this would
>> >> > introduce duplicate messages, but by specifying the expected offset,
>> >> > we can avoid this.
>> >> >
>> >> > So perhaps it's better to say that this is useful when a single
>> >> > producer is *expected*, but multiple producers are *possible*? (In the
>> >> > same way that the high-level consumer normally has 1 consumer in a
>> >> > group reading from a partition, but there are small windows where more
>> >> > than one might be reading at the same time.) This is also the spirit
>> >> > of the 'runtime cost' comment -- in the common case, where there is
>> >> > little to no contention, there's no performance overhead either. I
>> >> > mentioned this a little in the Motivation section -- maybe I should
>> >> > flesh that out a little bit?
>> >> >
>> >> > For me, the motivation to work this up was that I kept running into
>> >> > cases, like the above, where the existing API was almost-but-not-quite
>> >> > enough to give the guarantees I was looking for -- and the extension
>> >> > needed to handle those cases too was pretty small and natural-feeling.
>> >> >
>> >> > On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh <> wrote:
>> >> >> Good concept. I have a question though.
>> >> >>
>> >> >> Say there are two producers, A and B, both producing to the same
>> >> >> partition.
>> >> >> - A sends a message with expected offset, x1
>> >> >> - Broker accepts it and sends an Ack
>> >> >> - B sends a message with expected offset, x1
>> >> >> - Broker rejects it, sends nack
>> >> >> - B sends message again with expected offset, x1+1
>> >> >> - Broker accepts it and sends Ack
>> >> >> I guess this is what this KIP suggests, right? If yes, then how does this
>> >> >> ensure that the same message will not be written twice when two producers
>> >> >> are producing to the same partition? A producer, on receiving a nack, will
>> >> >> try again with the next offset and will keep doing so until the message is
>> >> >> accepted. Am I missing something?
>> >> >>
>> >> >> Also, you have mentioned on the KIP, "it imposes little to no runtime cost
>> >> >> in memory or time". I think that is not true for time. With this approach,
>> >> >> producers' performance will degrade in proportion to the number of
>> >> >> producers writing to the same partition. Please correct me if I am missing
>> >> >> something.
>> >> >>
>> >> >>
>> >> >> On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat <> wrote:
>> >> >>
>> >> >>> If we have 2 producers producing to a partition, they can be out of order.
>> >> >>> How, then, does one producer know what offset to expect, given that it
>> >> >>> does not interact with the other producer?
>> >> >>>
>> >> >>> Can you give an example flow that explains how it works with single
>> >> >>> producer and with multiple producers?
>> >> >>>
>> >> >>>
>> >> >>> Thanks,
>> >> >>>
>> >> >>> Mayuresh
>> >> >>>
>> >> >>> On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira <> wrote:
>> >> >>>
>> >> >>> > I like this feature, it reminds me of conditional updates in ZooKeeper.
>> >> >>> > I'm not sure if it'd be best to have some mechanism for fencing rather than
>> >> >>> > a conditional write like you're proposing. The reason I'm saying this is
>> >> >>> > that the conditional write applies to requests individually, while it
>> >> >>> > sounds like you want to make sure that there is a single client writing
>> >> >>> > across multiple requests.
>> >> >>> >
>> >> >>> > -Flavio
>> >> >>> >
>> >> >>> > > On 17 Jul 2015, at 07:30, Ben Kirwin <> wrote:
>> >> >>> > >
>> >> >>> > > Hi there,
>> >> >>> > >
>> >> >>> > > I just added a KIP for a 'conditional publish' operation: a simple
>> >> >>> > > CAS-like mechanism for the Kafka producer. The wiki page is here:
>> >> >>> > >
>> >> >>> > >
>> >> >>> >
>> >> >>>
>> >>
>> >> >>> > >
>> >> >>> > > And there's some previous discussion on the ticket and the users list:
>> >> >>> > >
>> >> >>> > >
>> >> >>> > >
>> >> >>> > >
>> >> >>> >
>> >> >>>
>> >>
>> >> >>> > >
>> >> >>> > > As always, comments and suggestions are very welcome.
>> >> >>> > >
>> >> >>> > > Thanks,
>> >> >>> > > Ben
>> >> >>> >
>> >> >>> >
>> >> >>>
>> >> >>>
>> >> >>> --
>> >> >>> -Regards,
>> >> >>> Mayuresh R. Gharat
>> >> >>> (862) 250-7125
>> >> >>>
>> >> >>
>> >> >>
>> >> >>
>> >> >> --
>> >> >>
>> >> >> Regards,
>> >> >> Ashish
>> >>

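The commit-log example quoted above (an old writer waking up after a GC pause) can be sketched as a toy compare-and-set on the partition's next offset. This is only an illustration of the idea, not Kafka's API: `PartitionLog`, `conditional_append`, `Writer`, and this `OffsetMismatch` class are all hypothetical names, and the log is a plain in-memory list.

```python
class OffsetMismatch(Exception):
    """Raised when the expected offset is not the log's next offset."""


class PartitionLog:
    """Toy in-memory stand-in for one partition (hypothetical, not Kafka)."""

    def __init__(self):
        self.messages = []

    @property
    def next_offset(self):
        return len(self.messages)

    def conditional_append(self, expected_offset, message):
        # Compare-and-set: accept only if the caller's view of the log is current.
        if expected_offset != self.next_offset:
            raise OffsetMismatch(
                f"expected {expected_offset}, log is at {self.next_offset}")
        self.messages.append(message)
        return expected_offset


class Writer:
    """A writer that believes it owns the partition and tracks the next offset."""

    def __init__(self, name, log, next_offset):
        self.name, self.log, self.next_offset = name, log, next_offset
        self.fenced = False

    def publish(self, message):
        if self.fenced:
            return False
        try:
            self.log.conditional_append(self.next_offset, f"{self.name}:{message}")
        except OffsetMismatch:
            # Someone else has written to the log: stop writing instead of
            # retrying at the next offset, which would defeat the check.
            self.fenced = True
            return False
        self.next_offset += 1
        return True


log = PartitionLog()
old = Writer("old", log, next_offset=0)
old.publish("k1=v1")
# Failover: the new writer bootstraps from the log and takes over at offset 1.
new = Writer("new", log, next_offset=log.next_offset)
new.publish("k1=v2")
# The old writer wakes up from its pause and still expects offset 1.
accepted = old.publish("k1=stale")
print(accepted, log.messages)
```

Note that this only covers the ordering where the new writer publishes first. If the old writer's publish arrives before the new writer's first write, the old write is accepted and the new writer sees the mismatch instead, which is the case the thread says still needs to be worked out.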