VIkram created KAFKA-9280:
-----------------------------

             Summary: Duplicate messages are observed in ACK mode ALL
                 Key: KAFKA-9280
                 URL: https://issues.apache.org/jira/browse/KAFKA-9280
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.2.1
            Reporter: VIkram


With acks=all, the leader delivers a message to the consumer before it has received acknowledgements from the other replicas. This can lead to *+duplicate messages+*.

 

Setup details:
 * 1 ZooKeeper node, 5 brokers
 * Producer: synchronous
 * Topic: 1 partition, replication factor 3, min.insync.replicas=2
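For reference, the setup above can be written down as client and topic configuration. This is a minimal sketch that assumes the standard Kafka configuration keys. Only acks=all, the single partition, replication factor 3, min.insync.replicas=2, and the 2-minute timeout come from this report. The broker address is a hypothetical placeholder, and the serializers are an assumption. The sketch uses plain `java.util.Properties`, so it compiles without the Kafka client.

```java
import java.util.Properties;

public class ReproConfig {
    // Topic layout from the setup details.
    static final int PARTITIONS = 1;
    static final short REPLICATION_FACTOR = 3;

    // Producer settings. "localhost:9092" is a hypothetical address, and the
    // String serializers are an assumption (the report does not say which were used).
    static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");
        p.setProperty("acks", "all");                   // wait for the in-sync replicas
        p.setProperty("delivery.timeout.ms", "120000"); // 2 min, as in the report
        p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    // Topic-level override: writes with acks=all need at least 2 in-sync replicas.
    static Properties topicConfig() {
        Properties t = new Properties();
        t.setProperty("min.insync.replicas", "2");
        return t;
    }

    public static void main(String[] args) {
        System.out.println(producerProps());
        System.out.println(topicConfig());
    }
}
```

If the Kafka client is on the classpath, you can pass `producerProps()` to the `KafkaProducer` constructor.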

 

Call the three replicas of the topic the first replica (the leader), the second replica, and the third replica.

 

*Sequence of events:*

a) All brokers are up and running.

b) Start the clients.

c) Kill the second replica of the topic.

d) Kill the third replica. min.insync.replicas is now no longer satisfied.

e) Bring the third replica back up. min.insync.replicas is satisfied again.

 

*Breakdown of step 'd':*
 # Just before the producer sent the next message, the third replica was killed with kill -9 (the leader takes ~5 s to detect that the broker is down).
 # The producer sent a message to the leader.
 # Before the leader detected that the third replica was down, it accepted the message from the producer.
 # The leader forwarded the message to the third replica.
 # Before receiving an ACK from the third replica, the leader delivered the message to the consumer.
 # The leader never received an ACK from the third replica.
 # The leader then detected that the third replica was down and threw NotEnoughReplicasException.
 # From then on, the leader stopped accepting messages from the producer.
 # Because the producer never got an ACK from the leader, it threw TimeoutException after the timeout (2 min in our case).
 # At this point, the producer believes that the leader did not receive the message, whereas the consumer actually received it.
 # The producer then retried sending the same message. (In our application, it is the next integer we send.)
 # When the second/third replica was back up, the leader accepted the message and delivered it to the consumer again. *Thus duplicates were delivered.*
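The last three steps describe the general at-least-once hazard of retrying after a lost acknowledgement. Suppose the first attempt was appended but the producer never learned that. A retry then appends a second copy. The toy model below illustrates only that hazard and is not Kafka code. `RetryDuplicateModel`, `send`, and `sendUntilAcked` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not Kafka code: a log that keeps a write even when
// the acknowledgement back to the producer is lost.
public class RetryDuplicateModel {
    static final List<String> log = new ArrayList<>();

    // Appends the value and reports whether the producer saw an acknowledgement.
    static boolean send(String value, boolean ackLost) {
        log.add(value);   // the write reaches the log either way
        return !ackLost;  // only the acknowledgement goes missing
    }

    // Synchronous producer loop: resend until an attempt is acknowledged.
    // The first `lostAcks` attempts lose their acknowledgement.
    static void sendUntilAcked(String value, int lostAcks) {
        int attempt = 0;
        while (!send(value, attempt < lostAcks)) {
            attempt++;
        }
    }

    static long copiesOf(String value) {
        return log.stream().filter(value::equals).count();
    }

    public static void main(String[] args) {
        sendUntilAcked("p229-4", 1); // first acknowledgement lost, as in the report
        System.out.println(log);     // [p229-4, p229-4]
    }
}
```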

 

 

*Logs:*
 # 2-3 seconds before the producer sent the next message, the third replica was killed with kill -9 (the leader takes ~5 s to detect that the broker is down).

{{{
> kill -9 49596
}}}

 # The producer sent a message to the leader.

{{{
[20190327 11:02:48.231 EDT (main-Terminating-1) Will send message: ProducerRecord(topic=t229, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=p229-4, timestamp=null)
}}}

 
 # Before the leader detected that the third replica was down, it accepted the message from the producer.
 # The leader forwarded the message to the third replica.
 # Before receiving an ACK from the third replica, the leader delivered the message to the consumer.

{{{
Received MessageRecord: ConsumerRecord(topic = t229, partition = 0, leaderEpoch = 1, offset = 3, CreateTime = 1553698968232, serialized key size = -1, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = p229-4)
}}}

 # The leader never received an ACK from the third replica.
 # The leader then detected that the third replica was down and threw NotEnoughReplicasException.

{{{
[2019-03-27 11:02:53,541] ERROR [ReplicaManager broker=0] Error processing append operation on partition t229-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition t229-0
}}}

 
 # From then on, the leader stopped accepting messages from the producer.
 # Because the producer never got an ACK from the leader, it threw TimeoutException after the timeout (2 min in our case).

{{{
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for t229-0:120000 ms has passed since batch creation
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for t229-0:120000 ms has passed since batch creation
}}}

 
 # At this point, the producer believes that the leader did not receive the message, whereas the consumer actually received it.
 # The producer then retried sending the same message. (In our application, it is the next integer we send.)
 # When the second/third replica was back up, the leader accepted the message and delivered it to the consumer again. Thus duplicates were delivered.
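In the logs above, the synchronous producer's failure surfaces as an `ExecutionException` from `Future.get()`, with the `TimeoutException` as its cause. The sketch below reproduces that wrapping with `CompletableFuture` and a stand-in exception class. It also computes the expiry deadline as batch creation time plus 120000 ms. It uses the CreateTime from the consumer log as an approximation of the batch creation time, which is an assumption. `SyncSendSketch`, `RecordExpired`, `expiryDeadline`, and `causeOfFailure` are hypothetical. The real client throws `org.apache.kafka.common.errors.TimeoutException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SyncSendSketch {
    // Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException.
    static class RecordExpired extends RuntimeException {
        RecordExpired(String msg) { super(msg); }
    }

    // Time after which an unacknowledged batch is expired.
    static long expiryDeadline(long batchCreatedMs, long deliveryTimeoutMs) {
        return batchCreatedMs + deliveryTimeoutMs;
    }

    // Blocks like send(record).get() and returns the unwrapped cause, or null on success.
    static Throwable causeOfFailure(CompletableFuture<?> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();                // the producer error arrives wrapped
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return e;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> f = new CompletableFuture<>();
        f.completeExceptionally(new RecordExpired("Expiring 1 record(s) for t229-0"));
        System.out.println(causeOfFailure(f).getMessage());
        System.out.println(expiryDeadline(1553698968232L, 120000L)); // 1553699088232
    }
}
```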

 

With acks=all, we expect the leader to deliver a message to consumers only after it has received acknowledgements from all the other replicas. That is not what we observe.

 

+*Question*+

1) With acks=all, does the leader deliver a message to consumers only after all in-sync followers have received it?

(or)

Does it deliver the message to consumers first and then wait for the followers' acknowledgements?
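For context on this question, Kafka's design documentation states that a message is considered committed once all in-sync replicas have applied it to their log. It also states that only committed messages are given out to consumers, and that this boundary is tracked as the high watermark. The model below is simplified. It assumes that the high watermark is the minimum log end offset across the current ISR, which is a simplification. The broker's real logic has more cases. The names are hypothetical. Broker 0 as leader and offset 3 come from the logs. Broker ID 2 and the log end offsets are illustrative assumptions. The model shows that the computed high watermark depends on which replicas are in the ISR when it is evaluated.

```java
import java.util.Map;
import java.util.Set;

public class HighWatermarkModel {
    // Hypothetical simplified rule: the high watermark is the smallest log end
    // offset among in-sync replicas; consumers may read offsets below it.
    static long highWatermark(Map<Integer, Long> logEndOffsets, Set<Integer> isr) {
        return isr.stream()
                  .mapToLong(logEndOffsets::get)
                  .min()
                  .getAsLong();
    }

    static boolean visibleToConsumers(long offset, long highWatermark) {
        return offset < highWatermark;
    }

    public static void main(String[] args) {
        // Leader 0 has appended offset 3 (log end offset 4); hypothetical
        // follower 2 has not fetched it yet (log end offset 3).
        Map<Integer, Long> leo = Map.of(0, 4L, 2, 3L);
        long hw = highWatermark(leo, Set.of(0, 2));
        System.out.println(hw + " " + visibleToConsumers(3, hw));             // 3 false
        long hwShrunk = highWatermark(leo, Set.of(0));                        // ISR Set(0), as logged
        System.out.println(hwShrunk + " " + visibleToConsumers(3, hwShrunk)); // 4 true
    }
}
```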

 

+*Observation*+

For a topic with replication factor > 1, we measured the round-trip time of messages (client1 -> Kafka -> client2 -> Kafka -> client1) with both acks=1 and acks=all, and observed the same latency in both cases. Is this expected?
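A round-trip comparison like this one is easier to trust when each setting is summarised by the median of many samples rather than a few runs. The harness below is minimal. It assumes a hypothetical `roundTrip` action that performs client1 -> Kafka -> client2 -> Kafka -> client1 and blocks until the reply arrives. It produces no results of its own. Run it once per acks setting against the same topic.

```java
import java.util.Arrays;

public class RoundTripTimer {
    // Runs the (hypothetical) round trip n times and returns the median latency
    // in nanoseconds (the upper median when n is even).
    static long medianNanos(Runnable roundTrip, int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        long[] samples = new long[n];
        for (int i = 0; i < n; i++) {
            long start = System.nanoTime();
            roundTrip.run();
            samples[i] = System.nanoTime() - start;
        }
        Arrays.sort(samples);
        return samples[n / 2];
    }

    public static void main(String[] args) {
        // Replace the no-op with the real client1 -> client2 -> client1 exchange.
        long median = medianNanos(() -> { }, 1000);
        System.out.println("median ns: " + median);
    }
}
```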



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
