The key point is:

if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {

The high watermark only moves when all the replicas in ISR have that
particular offset. Does that clarify it?

Ismael

On Thu, Jun 7, 2018 at 10:31 PM Carl Samuelson <c...@brightbobbin.com>
wrote:

> Hi
>
> Hopefully this is the correct email address and forum for this.
> I asked this question on stack overflow, but did not get an answer:
> https://stackoverflow.com/questions/50689177/kafka-ack-all-and-min-isr
>
> *Summary*
>
> The docs and code comments for Kafka suggest that when the producer setting
> acks is set to allthen an ack will only be sent to the producer when *all
> in-sync replicas have caught up*, but the code (Partition.Scala,
> checkEnoughReplicasReachOffset) seems to suggest that the ack is sent as
> soon as *min in-sync replicas have caught up*.
>
> *Details*
>
> The kafka docs have this:
>
> acks=all This means the leader will wait for the full set of in-sync
> replicas to acknowledge the record. source
> <https://kafka.apache.org/documentation/>
>
> Also, looking at the Kafka source code - partition.scala
> checkEnoughReplicasReachOffset() has the following comment (emphasis mine):
>
> Note that this method will only be called if requiredAcks = -1 and we are
> waiting for *all replicas*in ISR to be fully caught up to the (local)
> leader's offset corresponding to this produce request before we acknowledge
> the produce request.
>
> Finally, this answer <https://stackoverflow.com/a/45783921/746754> on
> Stack
> Overflow (emphasis mine again)
>
> Also the min in-sync replica setting specifies the minimum number of
> replicas that need to be in-sync for the partition to remain available for
> writes. When a producer specifies ack (-1 / all config) it will still wait
> for acks from *all in sync replicas* at that moment (independent of the
> setting for min in-sync replicas).
>
> But when I look at the code in Partition.Scala (note minIsr <
> curInSyncReplicas.size):
>
> def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean,
> Errors) = {
>   ...
>   val minIsr = leaderReplica.log.get.config.minInSyncReplicas
>   if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
>     if (minIsr <= curInSyncReplicas.size)
>       (true, Errors.NONE)
>
> The code that calls this returns the ack:
>
> if (error != Errors.NONE || hasEnough) {
>   status.acksPending = false
>   status.responseStatus.error = error
> }
>
> So, the code looks like it returns an ack as soon as the in-sync replica
> set are greater than min in-sync replicas. However, the documentation and
> comments suggest that the ack is only sent once all in-sync replicas have
> caught up. What am I missing? At the very least, the comment above
> checkEnoughReplicasReachOffset looks like it should be changed.
> Regards,
>
> Carl
>

Reply via email to