Thanks. What I'm looking for isn't that messages can't be interleaved; it's something like causal order.
We're updating a database where every node has its own copy of the database state (similar to RocksDB), by sending changes to that database via Kafka. In our application, thread 1 is creating records in the database, and thread 2 is updating fields in existing records. So thread 1 is doing something like PUT {id=someKey x=1} and thread 2 is doing UPDATE someKey WITH {x=2}. The reason I care about ordering is that a PUT will clobber the existing record if it's there, so having the PUT replayed after the UPDATE will clobber that update.

The guarantee I'm after is that if thread 2 observes a PUT coming through Kafka and issues an UPDATE for that record id, the observed PUT won't be appended to Kafka again later because the producer replayed it. If I understand you correctly, that's what idempotence guarantees: if the PUT was not appended to the log, it will be retried, but then thread 2 can't have observed it. If the PUT was appended to the log and thread 1 just didn't see the ack, thread 2 may have observed the PUT, but the retry won't insert it into the log again, so the UPDATE thread 2 does in response won't be clobbered.

Regarding OutOfOrderSequenceException, thanks for the explanation. I'm not entirely sure I get it. Let me just describe a couple of cases and you can correct my understanding:

If the producer has x=1, x=2 and x=3 in flight, and x=2 gets a sequence number error, it's probably because the x=1 write hasn't arrived at the broker yet, so the producer can just retry it internally without risking reordering. It's only when a sequence error occurs for the first in-flight message that the producer needs to reset the epoch, right?

If the producer has x=1, x=2 and x=3 in flight, and x=1 gets a delivery timeout, does that provoke an OutOfOrderSequenceException (one that escapes from the producer) for x=2 if the broker never saw x=1?
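To make my mental model of the broker-side check concrete, here is a toy sketch. All class and method names are invented, and this is just how I picture the bookkeeping, not actual Kafka code:

```python
# Toy sketch (invented names; my mental model, not real Kafka code) of why
# an observed write cannot be appended a second time by an idempotent retry.
# The broker remembers the last sequence number it appended for each
# (producer_id, epoch); a retry carrying an already-appended sequence is
# acked again but not re-appended, and a sequence gap is rejected.

class ToyBroker:
    def __init__(self):
        self.log = []        # appended records, in order
        self.last_seq = {}   # (producer_id, epoch) -> last appended sequence

    def append(self, producer_id, epoch, seq, record):
        key = (producer_id, epoch)
        last = self.last_seq.get(key, -1)
        if seq <= last:
            return "duplicate"      # already in the log: re-ack, don't append
        if seq != last + 1:
            return "out_of_order"   # gap: reject, producer must handle it
        self.log.append(record)
        self.last_seq[key] = seq
        return "appended"

broker = ToyBroker()

# Thread 1's PUT is appended, but suppose the ack is lost on the way back.
assert broker.append("p1", 0, 0, "PUT someKey x=1") == "appended"

# Thread 2 observes the PUT in the log and issues its UPDATE.
assert broker.append("p2", 0, 0, "UPDATE someKey x=2") == "appended"

# Thread 1's producer retries the PUT with the same (id, epoch, sequence):
# it is deduplicated, so the UPDATE is not clobbered by a replay.
assert broker.append("p1", 0, 0, "PUT someKey x=1") == "duplicate"
assert broker.log == ["PUT someKey x=1", "UPDATE someKey x=2"]

# A sequence gap (retrying seq 2 when the broker last saw seq 0) is rejected.
assert broker.append("p1", 0, 2, "PUT otherKey x=3") == "out_of_order"
```

If that picture is right, the replayed PUT can never land after thread 2's UPDATE, which is the guarantee I'm after.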
If the producer has x=1, x=2 and x=3 in flight, and x=1 gets a sequence number error, does that always indicate broker-side data loss? My understanding is that sequence number gaps should only occur because there was a previous message (x=0) which was acked by the broker, and the broker no longer has that message. This provokes an epoch bump in the producer and also lets the exception escape to the caller, so they can decide what to do about it and are alerted to the data loss.

If the caller decides to keep using the producer instance, why can't the producer, when it resets the epoch, reuse the old ordering when assigning new sequence numbers to the buffered messages? If the producer knows that in the old epoch the order was x=1, x=2, x=3, couldn't it use that knowledge after the epoch bump to keep the ordering of those messages the same?

Is it fair to expect OutOfOrderSequenceExceptions to escape from the producer only if the brokers lose data, or do I need to guard against that happening even if there is no data loss on the brokers?

On Thu, Apr 17, 2025 at 04:02, Matthias J. Sax <mj...@apache.org> wrote:

> >> The behavior I'm looking for is that if thread 1 sends message x=1, and
> >> thread 2 sees message x=1 come through Kafka, thread 2 can't then write
> >> message x=2 and have that be clobbered because thread 1's producer is
> >> retrying the send since it didn't receive the ack in time.
>
> This is not how it works. Idempotency guarantees are for a single
> producer only -- if you have two producers, both write into the topic
> interleaved, and there is no guarantee about the order of messages
> between both producers.
>
> For the first producer, two error cases can happen: either x=1 was not
> appended to the log at all, and no offset was "assigned" to the message.
> For this case, the producer will retry, and if producer-2 writes in
> between, the write of x=1 goes to a later offset.
> Or, the write was successful, but the ack was lost -- for this case,
> there is no reason to block producer-2 either. Of course, producer-1
> might retry sending x=1, but the broker would detect this as an
> idempotent write, "drop the write on the floor", and just re-send the
> ack to producer-1.
>
> >> I don't understand why this is needed?
>
> Assume you write x=1, x=2, x=3, and x=1 does not make it and returns
> `OutOfOrderSequenceException`. For this case, x=2 and x=3 might still be
> in the send buffer of the producer. If you don't close the producer and
> just retry sending x=1, it would be written after x=3, so not in the
> original order.
>
> This happens because, when the `OutOfOrderSequenceException` error occurs,
> the producer bumps its epoch and resets its sequence number to zero. This
> allows the producer to keep sending data, as the JavaDoc points out. The
> already buffered data in the producer send buffer would use this new
> epoch and reset sequence numbers.
>
> By closing the producer, though, you also throw away the pending writes
> for x=2 and x=3, and would call producer.send() for all three messages
> again, and can thus again send them in the intended order x=1, x=2, x=3.
>
> Does this answer your questions?
>
> -Matthias
>
> On 4/15/25 3:19 AM, Stig Rohde Døssing wrote:
> > Hi,
> >
> > If I understand correctly, the idempotent producer should be able to
> > guarantee that messages are not duplicated when the producer retries
> > sends, and as of https://issues.apache.org/jira/browse/KAFKA-5494, it
> > should allow for the producer to have multiple in flight requests at a
> > time without risking that the messages are reordered because batches
> > are retried out of order.
> > The behavior I'm looking for is that if thread 1 sends message x=1, and
> > thread 2 sees message x=1 come through Kafka, thread 2 can't then write
> > message x=2 and have that be clobbered because thread 1's producer is
> > retrying the send since it didn't receive the ack in time.
> >
> > The KafkaProducer Javadoc explains that when using the idempotent
> > producer, "it is possible to continue sending after receiving an
> > OutOfOrderSequenceException, but doing so can result in out of order
> > delivery of pending messages. To ensure proper ordering, you should
> > close the producer and create a new instance."
> >
> > I don't understand why this is needed?
> >
> > If I understand the feature correctly (please correct me), messages get
> > sequence numbers, and requests fail if the messages are not received in
> > order by the broker. Most out of order errors are handled internally by
> > the producer, either by completing the request or by retrying. The case
> > it can't handle is if the first pending message has a gap to the last
> > acked sequence number, e.g. if the last acked number is 10 and the first
> > message the producer has pending is 12. In that case there has been
> > message loss, and the exception escapes from the producer to the caller.
> >
> > Is this the case the Javadoc note is referring to?
> >
> > Why is it necessary/helpful to terminate and replace the producer in
> > order to guarantee ordering when this kind of gap occurs?
> >
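To check my own understanding of the reordering hazard Matthias describes, here is a toy simulation. The names are invented and this is not the real client internals, just the mechanism as I picture it: x=1 fails, the producer bumps its epoch and re-sequences its buffer, and re-sending x=1 while x=2 and x=3 are still buffered puts x=1 last.

```python
# Toy simulation (invented names; not the real producer internals) of the
# reordering hazard: after an epoch bump, a re-sent x=1 sequences after the
# still-buffered x=2 and x=3, while closing and re-creating the producer
# restores the intended order.

class ToyProducer:
    def __init__(self):
        self.epoch = 0
        self.next_seq = 0
        self.buffer = []   # (epoch, seq, record), in send order

    def send(self, record):
        self.buffer.append((self.epoch, self.next_seq, record))
        self.next_seq += 1

    def bump_epoch(self):
        # What I understand happens on OutOfOrderSequenceException: new
        # epoch, sequences restart at zero, buffered batches re-sequenced.
        self.epoch += 1
        self.buffer = [(self.epoch, i, rec)
                       for i, (_, _, rec) in enumerate(self.buffer)]
        self.next_seq = len(self.buffer)

p = ToyProducer()
p.send("x=1"); p.send("x=2"); p.send("x=3")

# x=1 fails fatally and is dropped from the buffer; x=2 and x=3 remain.
p.buffer = [b for b in p.buffer if b[2] != "x=1"]
p.bump_epoch()

# The caller keeps using the producer and re-sends x=1: it now sorts last.
p.send("x=1")
assert [rec for (_, _, rec) in p.buffer] == ["x=2", "x=3", "x=1"]

# Closing the producer instead throws away the buffer, and re-sending all
# three messages restores the intended order.
p2 = ToyProducer()
for rec in ["x=1", "x=2", "x=3"]:
    p2.send(rec)
assert [rec for (_, _, rec) in p2.buffer] == ["x=1", "x=2", "x=3"]
```

If this model is right, it also shows why I'd expect the producer could, in principle, re-sequence the old in-flight batches in their old order after the bump, which is what my question above is getting at.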