[ 
https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17034540#comment-17034540
 ] 

Stanislav Kozlovski commented on KAFKA-9280:
--------------------------------------------

[~vikram484] this shouldn't happen because the leader waits for a second fetch 
request that proves that the follower has that offset.

E.g. the high watermark is 1000 and the leader and the follower are both at 
log end offset 1000. The follower dies, but a FetchRequest{from=1000} it sent 
is still in flight. Meanwhile the producer writes the record at offset 1000 
with acks=all. The leader will not acknowledge that produce request (or expose 
the record to consumers) until all in-sync followers issue a FetchRequest with 
a `from` value of at least 1001.

Does that make sense?

> Duplicate messages are observed in ACK mode ALL
> -----------------------------------------------
>
>                 Key: KAFKA-9280
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9280
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.2.1
>            Reporter: VIkram
>            Priority: Major
>
> In ack mode ALL, the leader is sending the message to the consumer even 
> before receiving the acknowledgements from the other replicas. This can lead 
> to *+duplicate messages+*.
>  
> Setup details:
>  * 1 zookeeper, 5 brokers
>  * Producer: Synchronous
>  * Topic: 1 partition, replication factor - 3, min isr - 2
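>
> A minimal sketch of this kind of synchronous acks=all producer (the broker 
> address and exact values below are illustrative placeholders, not our actual 
> code):
> {{{
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.RecordMetadata;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> public class SyncAllAcksProducer {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>                   StringSerializer.class.getName());
>         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>                   StringSerializer.class.getName());
>         props.put(ProducerConfig.ACKS_CONFIG, "all");  // wait for all in-sync replicas
>         props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // ~2 min
>
>         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
>             // Blocking on get() makes the send synchronous.
>             RecordMetadata md =
>                 producer.send(new ProducerRecord<>("t229", "p229-4")).get();
>             System.out.println("acked at offset " + md.offset());
>         }
>     }
> }
> }}}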
>  
> Say First replica (Leader), Second replica and Third replica are the three 
> replicas of the topic.
>  
> *Sequence of events:*
> a) All brokers are up and running.
> b) Clients start running.
> c) Kill the second replica of the topic.
> d) Kill the third replica. Now min ISR is no longer satisfied.
> e) Bring up the third replica. Min ISR is satisfied again.
>  
> *Breakdown of step 'd':*
>  # Just before the producer sends the next message, the third replica is 
> killed with kill -9 (the leader takes ~5 sec to detect that the broker is 
> down).
>  # The producer sends a message to the leader.
>  # Before the leader knows that the third replica is down, it accepts the 
> message from the producer.
>  # The leader forwards the message to the third replica.
>  # Before receiving an ACK from the third replica, the leader sends the 
> message to the consumer.
>  # The leader does not get an ACK from the third replica.
>  # The leader now detects that the third replica is down and throws 
> NOT_ENOUGH_REPLICAS_EXCEPTION.
>  # The leader now stops accepting messages from the producer.
>  # Since the producer did not get an ACK from the leader, it throws 
> TIMEOUT_EXCEPTION after the timeout (2 min in our case).
>  # At this point the producer believes the message was not received by the 
> leader, whereas the consumer has actually received it.
>  # The producer now retries sending the same message (in our application it 
> is the next integer we send), as sketched after this list.
>  # When the second/third replica is back up, the leader accepts the retried 
> message and sends it to the consumer as well. *Thus duplicates are delivered.*
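>
> A minimal sketch (hypothetical code, not our actual application) of the 
> application-level retry in the last two steps: because the producer cannot 
> tell whether the timed-out record was in fact committed, a blind resend 
> writes the same value a second time.
> {{{
> import java.util.concurrent.ExecutionException;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.errors.TimeoutException;
>
> class NaiveRetry {
>     // Hypothetical helper: resend once if the synchronous send times out.
>     static void sendWithNaiveRetry(KafkaProducer<String, String> producer,
>                                    String value) throws Exception {
>         try {
>             producer.send(new ProducerRecord<>("t229", value)).get();
>         } catch (ExecutionException e) {
>             if (e.getCause() instanceof TimeoutException) {
>                 // The first attempt may already have been written and handed to
>                 // the consumer. From the broker's point of view the resend below
>                 // is a brand-new record, so once min ISR is satisfied again it is
>                 // appended -- and consumed -- a second time.
>                 producer.send(new ProducerRecord<>("t229", value)).get();
>             } else {
>                 throw e;
>             }
>         }
>     }
> }
> }}}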
>  
>  
> *Logs:*
>  # 2-3 seconds before the producer sends the next message, the third replica 
> is killed with kill -9 (the leader takes ~5 sec to detect that the broker is 
> down).
> {{{
> > kill -9 49596
> }}}
>  # The producer sends a message to the leader.
> {{{
> [20190327 11:02:48.231 EDT (main-Terminating-1) Will send message: 
> ProducerRecord(topic=t229, partition=null, headers=RecordHeaders(headers = 
> [], isReadOnly = false), key=null, value=p229-4, timestamp=null)
> }}}
>  
>  # Before the leader knows that the third replica is down, it accepts the 
> message from the producer.
>  # The leader forwards the message to the third replica.
>  # Before receiving an ACK from the third replica, the leader sends the 
> message to the consumer.
> {{{
> Received MessageRecord: ConsumerRecord(topic = t229, partition = 0, 
> leaderEpoch = 1, offset = 3, CreateTime = 1553698968232, serialized key size 
> = -1, serialized value size = 6, headers = RecordHeaders(headers = [], 
> isReadOnly = false), key = null, value = p229-4)
> }}}
>  # The leader does not get an ACK from the third replica.
>  # The leader now detects that the third replica is down and throws 
> NOT_ENOUGH_REPLICAS_EXCEPTION.
> {{{
> [2019-03-27 11:02:53,541] ERROR [ReplicaManager broker=0] Error processing 
> append operation on partition t229-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the 
> current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 
> for partition t229-0
> }}}
>  
>  # The leader now stops accepting messages from the producer.
>  # Since the producer did not get an ACK from the leader, it throws 
> TIMEOUT_EXCEPTION after the timeout (2 min in our case).
> {{{
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> t229-0:120000 ms has passed since batch creation
>         at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
>         at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
>         at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 
> record(s) for t229-0:120000 ms has passed since batch creation
> }}}
>  
>  # At this point the producer believes the message was not received by the 
> leader, whereas the consumer has actually received it.
>  # The producer now retries sending the same message (in our application it 
> is the next integer we send).
>  # When the second/third replica is back up, the leader accepts the retried 
> message and sends it to the consumer as well. Thus duplicates are delivered.
>  
> Ideally, in ack mode ALL the leader is expected to send a message to the 
> consumer only after it receives acks from all the other in-sync replicas. But 
> this is not what we observe.
>  
> +*Question*+
> 1) In the acks=all case, does the leader send a message to the consumer only 
> after all in-sync followers have received the message, or does it send the 
> message to the consumer first and then wait for the followers' 
> acknowledgements?
>  
> +*Observation*+
> For a topic with replication factor > 1, we did a test to measure the round 
> trip time (client1 -> kafka -> client2 -> kafka -> client1) of messages with 
> both acks = 1 and acks = all, and observed the latency to be the same in both 
> cases. Is this expected?
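>
> For illustration, a much-reduced sketch that times only the produce leg under 
> each acks setting (topic name and broker address are placeholders); the 
> actual test measured the full client1 -> kafka -> client2 -> kafka -> client1 
> round trip.
> {{{
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> class AcksLatencyProbe {
>     // Time one synchronous send with the given acks setting (placeholder values).
>     static long timeOneSendMillis(String acks) throws Exception {
>         Properties props = new Properties();
>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>                   StringSerializer.class.getName());
>         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>                   StringSerializer.class.getName());
>         props.put(ProducerConfig.ACKS_CONFIG, acks);
>         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
>             producer.send(new ProducerRecord<>("t229", "warm-up")).get(); // exclude metadata fetch
>             long start = System.currentTimeMillis();
>             producer.send(new ProducerRecord<>("t229", "probe")).get();
>             return System.currentTimeMillis() - start;
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         System.out.println("acks=1:   " + timeOneSendMillis("1") + " ms");
>         System.out.println("acks=all: " + timeOneSendMillis("all") + " ms");
>     }
> }
> }}}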



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
