[ 
https://issues.apache.org/jira/browse/KAFKA-5494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apurva Mehta reassigned KAFKA-5494:
-----------------------------------

    Assignee: Apurva Mehta

> Idempotent producer should not require 
> max.in.flight.requests.per.connection=1 and acks=all
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5494
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5494
>             Project: Kafka
>          Issue Type: Sub-task
>    Affects Versions: 0.11.0.0
>            Reporter: Apurva Mehta
>            Assignee: Apurva Mehta
>              Labels: exactly-once
>
> Currently, the idempotent producer (and hence the transactional producer) 
> requires max.in.flight.requests.per.connection=1.
> This restriction simplified the implementation on the client and server. With 
> some additional work, we can satisfy the idempotent guarantees with any 
> number of in-flight requests. The changes on the client can be summarized as 
> follows:
>  
> # We increment sequence numbers when batches are drained.
> # If, for some reason, a batch fails with a retriable error, we know that all 
> future batches would fail with an OutOfOrderSequence exception. 
> # As such, the client should treat some OutOfOrderSequence errors as 
> retriable. In particular, we should maintain the 'last acked sequence'. If 
> the batch immediately succeeding the last acked sequence fails with an 
> OutOfOrderSequence, that is a fatal error. If a later batch fails with 
> OutOfOrderSequence, it should be re-enqueued.
> # With the changes above, the producer queues should become priority 
> queues ordered by sequence number. 
> # The partition is not ready unless the front of the queue has the next 
> expected sequence.
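
A minimal sketch of the retriable-versus-fatal rule from the list above, in Java. The class and method names (SequenceErrorPolicy, onOutOfOrderSequence) are hypothetical and not taken from the Kafka client. The sketch assumes each batch carries a base sequence, that the 'last acked sequence' is -1 before anything has been acked, and it ignores sequence number wraparound.

```java
/** Hypothetical helper, not part of the Kafka client. */
final class SequenceErrorPolicy {
    enum Action { RETRY, FATAL }

    /**
     * Decide how to treat an OutOfOrderSequence error for a failed batch.
     *
     * @param lastAckedSequence last sequence acknowledged by the broker for
     *                          this partition (assumed -1 if nothing acked yet)
     * @param batchBaseSequence first sequence number of the failed batch
     */
    static Action onOutOfOrderSequence(int lastAckedSequence, int batchBaseSequence) {
        // The batch directly following the last acked sequence has no earlier
        // in-flight batch to blame, so the error cannot be a knock-on failure.
        if (batchBaseSequence == lastAckedSequence + 1) {
            return Action.FATAL;
        }
        // A later batch most likely failed because an earlier batch failed
        // first; it is re-enqueued and retried once the gap is filled.
        return Action.RETRY;
    }
}
```

For example, with a last acked sequence of 9, a failed batch starting at 10 would be fatal, while a failed batch starting at 15 would be re-enqueued.
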
> With the changes above, we would get the benefits of multiple inflights in 
> normal cases. When there are failures, we automatically constrain to a single 
> inflight until we get back in sequence. 
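
The queue and readiness check could be sketched roughly as below. This is an illustration under stated assumptions, not the actual client code: PartitionQueue and Batch are hypothetical names, batches are assumed to already carry their sequence numbers when they enter the queue, and "next expected sequence" is read here as the sequence following the most recently drained batch, reset to just after the last acked sequence whenever a batch is re-enqueued.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/** Hypothetical per-partition queue ordered by sequence number. */
final class PartitionQueue {
    static final class Batch {
        final int baseSequence;  // first sequence number in the batch
        final int recordCount;   // number of records (and sequences) in the batch
        Batch(int baseSequence, int recordCount) {
            this.baseSequence = baseSequence;
            this.recordCount = recordCount;
        }
    }

    private final PriorityQueue<Batch> queue =
        new PriorityQueue<>(Comparator.comparingInt((Batch b) -> b.baseSequence));
    private int lastAckedSequence = -1;   // assumption: -1 means nothing acked yet
    private int nextExpectedSequence = 0;

    void enqueue(Batch batch) {
        queue.add(batch);
    }

    /** The partition is ready only if the head batch has the next expected sequence. */
    boolean isReady() {
        Batch head = queue.peek();
        return head != null && head.baseSequence == nextExpectedSequence;
    }

    /** Removes and returns the head batch, or null if the partition is not ready. */
    Batch drain() {
        if (!isReady()) {
            return null;
        }
        Batch head = queue.poll();
        nextExpectedSequence = head.baseSequence + head.recordCount;
        return head;
    }

    /** Records a successful send. */
    void onAck(Batch batch) {
        lastAckedSequence = batch.baseSequence + batch.recordCount - 1;
    }

    /** Puts a failed batch back and rewinds to the first unacked sequence. */
    void reenqueue(Batch batch) {
        queue.add(batch);
        nextExpectedSequence = lastAckedSequence + 1;
    }
}
```

With this reading, a later batch that is re-enqueued before the earlier failed one leaves the partition not ready, so nothing is sent until the earlier batch is back at the front of the queue.
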
> With multiple inflights, we now have the possibility of getting duplicates 
> for batches other than the last appended batch. In order to return the record 
> metadata (including offset) of the duplicates inside the log, we would 
> need to scan the tail of the log. This can be 
> optimized by caching the metadata for the last 'n' batches. For instance, if 
> the default max.inflight is 5, we could cache the record metadata of the last 
> 5 batches, and fall back to a scan if the duplicate is not within those 5. 
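
One way to picture the cache of the last 'n' batches is a small bounded map keyed by base sequence. The sketch below is only an illustration: RecentBatchCache and its methods are hypothetical names, the only metadata kept is a base offset, and an empty result stands for "fall back to a log scan".

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical bounded cache of metadata for the most recently appended batches. */
final class RecentBatchCache {
    private final LinkedHashMap<Integer, Long> baseOffsetBySequence;

    RecentBatchCache(final int capacity) {
        // Insertion-ordered map that evicts the oldest entry once it grows
        // beyond the configured capacity.
        this.baseOffsetBySequence = new LinkedHashMap<Integer, Long>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, Long> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Remember where a newly appended batch landed in the log. */
    void recordAppend(int baseSequence, long baseOffset) {
        baseOffsetBySequence.put(baseSequence, baseOffset);
    }

    /**
     * Returns the cached base offset for a duplicate batch, or empty if the
     * batch is older than the cached window and a log scan would be needed.
     */
    OptionalLong lookupDuplicate(int baseSequence) {
        Long offset = baseOffsetBySequence.get(baseSequence);
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }
}
```

With a capacity of 5, appending six batches evicts the oldest one, so a duplicate of that first batch would miss the cache and take the scan path.
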
>
> The reason to have acks=all is to protect against OutOfOrderSequence 
> exceptions in the case where the leader fails before replication happens. In 
> that case, the next batch sent by the producer would get an 
> OutOfOrderSequence because the new leader would not have the last message. 
> This may be OK: for applications which really care about avoiding duplicates, 
> they have to handle fatal errors of this sort anyway. In particular, the 
> recommendation is to close the producer in the callback on a fatal error and 
> then check the tail of the log for the last committed message, and then start 
> sending from there. 
> By not requiring acks=all, this application logic would just be exercised more 
> frequently.
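
For reference, the settings discussed in this issue could be written as producer properties roughly as follows. The property keys are the standard producer config names; the class and method names are hypothetical, the "relaxed" values (5 in-flight requests, acks=1) are only an example of what the proposal would allow rather than something the issue specifies, and connection settings such as bootstrap.servers are omitted.

```java
import java.util.Properties;

/** Hypothetical holder for the two configurations discussed in the issue. */
final class IdempotentProducerConfigSketch {

    /** Settings the issue describes as required for idempotence in 0.11.0.0. */
    static Properties currentRequirements() {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");
        props.setProperty("max.in.flight.requests.per.connection", "1");
        props.setProperty("acks", "all");
        return props;
    }

    /** Example of a relaxed configuration the proposal aims to permit (assumed values). */
    static Properties proposedRelaxation() {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");
        props.setProperty("max.in.flight.requests.per.connection", "5");
        props.setProperty("acks", "1");
        return props;
    }
}
```
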



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
