[jira] [Commented] (KAFKA-9879) How kafka deletes tombstone messages?
[ https://issues.apache.org/jira/browse/KAFKA-9879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17087773#comment-17087773 ]

VIkram commented on KAFKA-9879:
-------------------------------

Any update on this?

> How kafka deletes tombstone messages?
> -------------------------------------
>
>         Key: KAFKA-9879
>         URL: https://issues.apache.org/jira/browse/KAFKA-9879
>     Project: Kafka
>  Issue Type: Bug
>    Reporter: VIkram
>    Priority: Major
>
> I was able to delete records in Kafka using tombstone messages after a few attempts. However, the algorithm (or logic) that Kafka uses to delete these tombstone messages is still unclear to me.
> From my observations, there seems to be some relation between the last-modified time of a segment and the deletion of tombstone messages. I have looked into https://stackoverflow.com/a/48325699/6940279 but it's a bit complex to understand.
>
> *Topic details*
> {{Topic:reddyvel_13 PartitionCount:1 ReplicationFactor:3 Configs:cleanup.policy=compact,segment.bytes=200,delete.retention.ms=1}}
> {{Topic: reddyvel_13 Partition: 0 Leader: 1 Replicas: 1,5,2 Isr: 1,5,2}}
> I have set {{cleanup.policy=compact}}, {{segment.bytes=200}}, {{delete.retention.ms=1}}.
>
> *Timeline of events*
> * The first segment (baseOffset = 0) was closed at {{2020-04-02 07:12:09,908}}.
> +*cleaner log*+
> {{[2020-04-02 07:12:09,908] INFO [ProducerStateManager partition=reddyvel_13-0] Writing producer snapshot at offset 16623 (kafka.log.ProducerStateManager)}}
> {{[2020-04-02 07:12:09,908] INFO [Log partition=reddyvel_13-0, dir=/local/kafka/data] Rolled new log segment at offset 16623 in 1 ms. (kafka.log.Log)}}
> Compaction was triggered immediately on this closed segment.
> +*cleaner log*+
> {{[2020-04-02 07:12:22,989] INFO Cleaner 0: Cleaning log reddyvel_13-0 (cleaning prior to Thu Apr 02 07:12:09 EDT 2020, discarding tombstones prior to Wed Dec 31 19:00:00 EST 1969)... (kafka.log.LogCleaner)}}
> * I sent a few more messages along with a few tombstones (to delete messages present in the first segment), and the second segment was closed at {{2020-04-02 07:56:50,405}}.
> +*cleaner log*+
> {{[2020-04-02 07:56:50,405] INFO [ProducerStateManager partition=reddyvel_13-0] Writing producer snapshot at offset 33868 (kafka.log.ProducerStateManager)}}
> {{[2020-04-02 07:56:50,406] INFO [Log partition=reddyvel_13-0, dir=/local/kafka/data] Rolled new log segment at offset 33868 in 2 ms. (kafka.log.Log)}}
> Compaction was triggered again.
> +*cleaner log*+
> {{[2020-04-02 07:56:53,180] INFO Cleaner 0: Cleaning log reddyvel_13-0 (cleaning prior to Thu Apr 02 07:56:50 EDT 2020, discarding tombstones prior to Thu Apr 02 07:11:59 EDT 2020)... (kafka.log.LogCleaner)}}
>
> The log message above says {{discarding tombstones prior to Thu Apr 02 07:11:59 EDT 2020}}. This timestamp is exactly the first segment's closing timestamp ({{2020-04-02 07:12:09,908}}) minus my topic's {{delete.retention.ms}} (10 seconds). I'm not able to figure out the link between these.
> I want to understand at what time Kafka triggers deletion of tombstone messages. Can someone explain the tombstone deletion algorithm in simpler terms, along with the reasoning behind it?
>
> It's not a bug, but I need more information on this. I have posted this in other forums such as Stack Overflow but did not get any reply, and the official Kafka documentation doesn't have this information. If this is not the correct platform for this question, kindly guide me to the relevant platform.
>
> Thanks in advance.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
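The timestamps quoted above are consistent with the cleaner deriving a tombstone "delete horizon" from the last-modified time of the most recently cleaned segment minus {{delete.retention.ms}}. A minimal sketch of that arithmetic (a hypothetical model with made-up names, not Kafka's actual code):

```python
# Hypothetical model of the tombstone "delete horizon", not Kafka's code:
# tombstones are only discarded once the newest previously-cleaned segment
# is older than delete.retention.ms.

def delete_horizon_ms(cleaned_segment_mtimes_ms, delete_retention_ms):
    """Tombstones in segments modified before this horizon may be discarded."""
    if not cleaned_segment_mtimes_ms:
        # Nothing has been cleaned yet: horizon is 0 (the epoch), which the
        # cleaner prints as "Wed Dec 31 19:00:00 EST 1969" in an EST locale.
        return 0
    return max(cleaned_segment_mtimes_ms) - delete_retention_ms

# First cleaning pass: no previously cleaned segment, horizon is the epoch.
assert delete_horizon_ms([], 10_000) == 0

# Second pass: the first segment was closed at 2020-04-02 07:12:09,908 EDT
# (epoch ms below). Horizon = 07:12:09,908 - 10s = 07:11:59,908, matching
# "discarding tombstones prior to Thu Apr 02 07:11:59 EDT 2020".
first_segment_mtime = 1585825929908
assert delete_horizon_ms([first_segment_mtime], 10_000) == 1585825919908
```

This would explain both log lines: the very first cleaning pass has no cleaned segment to anchor the horizon, so it falls back to the epoch; the second pass anchors on the first segment's close time.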
[jira] [Updated] (KAFKA-9879) How kafka deletes tombstone messages?
[ https://issues.apache.org/jira/browse/KAFKA-9879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

VIkram updated KAFKA-9879:
--------------------------
[jira] [Created] (KAFKA-9879) How kafka deletes tombstone messages?
VIkram created KAFKA-9879:
--------------------------

     Summary: How kafka deletes tombstone messages?
         Key: KAFKA-9879
         URL: https://issues.apache.org/jira/browse/KAFKA-9879
     Project: Kafka
  Issue Type: Bug
    Reporter: VIkram
[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL
[ https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036125#comment-17036125 ]

VIkram commented on KAFKA-9280:
-------------------------------

Will the consumer FetchRequest\{from=1000} get served?
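On the question in the comment above: a consumer fetch only sees offsets strictly below the high watermark, so a fetch from offset 1000 with the high watermark at 1000 would return an empty record set until replication advances the watermark. A simplified model of that visibility rule (not broker code):

```python
# Simplified model (not broker code) of consumer-visible records:
# a consumer fetch may only receive offsets in [fetch_offset, high_watermark).

def visible_records(leader_log, fetch_offset, high_watermark):
    """Records a consumer fetch at fetch_offset may be served."""
    return leader_log[fetch_offset:high_watermark]

leader_log = list(range(1003))  # offsets 0..1002 appended on the leader

# FetchRequest{from=1000} while HW is still 1000: nothing is visible yet.
assert visible_records(leader_log, 1000, 1000) == []

# After the HW advances to 1002, offsets 1000-1001 become visible.
assert visible_records(leader_log, 1000, 1002) == [1000, 1001]
```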
[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL
[ https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17028811#comment-17028811 ]

VIkram commented on KAFKA-9280:
-------------------------------

[~bchen225242] Any update on this? I still face this issue.
[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL
[ https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991419#comment-16991419 ]

VIkram commented on KAFKA-9280:
-------------------------------

Let's say the high watermark for a topic partition is 1000, and the leader and follower replicas have exactly the same messages. In this scenario, the producer sends a message to the leader and the other replicas, and a consumer sends a fetch request to the leader. Is there a possibility here that the consumer's fetch request will be served before the other replicas' fetch requests?
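On the race the comment above asks about: even if the broker happens to process the consumer's fetch before the followers' fetches, the new record should remain invisible, because the high watermark is the minimum log-end offset (LEO) across the in-sync replicas and only advances after the followers have replicated the record. A simplified model of that ordering (hypothetical code, not the broker's):

```python
# Simplified model (not broker code): HW = min log-end offset across the ISR,
# and it only advances after follower fetches replicate new records. A
# consumer fetch processed before the followers' fetches still sees nothing
# above the HW.

def high_watermark(isr_leos):
    """HW is the smallest log-end offset among the in-sync replicas."""
    return min(isr_leos)

leader_log = list(range(1000))    # offsets 0..999 fully replicated, HW = 1000
leader_log.append("new-message")  # producer appends offset 1000 on the leader

# Consumer fetch at offset 1000 arrives first: followers' LEOs are still 1000,
# so HW stays at 1000 and the fetch returns nothing.
isr_leos = [1001, 1000, 1000]
assert leader_log[1000:high_watermark(isr_leos)] == []

# After both follower fetches replicate offset 1000, HW advances and the
# record becomes visible to consumers.
isr_leos = [1001, 1001, 1001]
assert leader_log[1000:high_watermark(isr_leos)] == ["new-message"]
```

Under this model, a consumer observing the record before the followers acknowledged it (as described in the issue) would mean the record was exposed above the high watermark.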
[jira] [Created] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL
VIkram created KAFKA-9280:
--------------------------

          Summary: Duplicate messages are observed in ACK mode ALL
              Key: KAFKA-9280
              URL: https://issues.apache.org/jira/browse/KAFKA-9280
          Project: Kafka
       Issue Type: Bug
 Affects Versions: 2.2.1
         Reporter: VIkram

In ack mode ALL, the leader sends the message to the consumer even before receiving the acknowledgements from the other replicas. This can lead to *+duplicate messages+*.

Setup details:
 * 1 zookeeper, 5 brokers
 * Producer: synchronous
 * Topic: 1 partition, replication factor - 3, min isr - 2

Say First replica (Leader), Second replica and Third replica are the three replicas of the topic.

*Sequence of events:*
a) All brokers are up and running.
b) Clients started running.
c) Kill the second replica of the topic.
d) Kill the third replica. Now min isr will not be satisfied.
e) Bring up the third replica. Min isr will be satisfied.

*Breakdown of step 'd':*
# Just before the producer sends the next message, the third replica is killed with kill -9 (the leader takes ~5 sec to detect that the broker is down).
# The producer sends a message to the leader.
# Before the leader knows that the third replica is down, it accepts the message from the producer.
# The leader forwards the message to the third replica.
# Before receiving an ACK from the third replica, the leader sends the message to the consumer.
# The leader doesn't get an ACK from the third replica.
# The leader now detects that the third replica is down and throws NOT_ENOUGH_REPLICAS_EXCEPTION.
# The leader now stops accepting messages from the producer.
# Since the producer didn't get an ACK from the leader, it throws a TIMEOUT_EXCEPTION after the timeout (2 min in our case).
# So far, the producer believes that the message was not received by the leader, whereas the consumer actually received it.
# The producer now retries sending the same message. (In our application it is the next integer we send.)
# When the second/third replica comes back up, the leader accepts the message and sends the same message to the consumer. *Thus sending duplicates.*

*Logs:*
# 2-3 seconds before the producer sends the next message, the third replica is killed with kill -9 (the leader takes ~5 sec to detect that the broker is down).
{{> kill -9 49596}}
# The producer sends a message to the leader.
{{[20190327 11:02:48.231 EDT (main-Terminating-1) Will send message: ProducerRecord(topic=t229, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=p229-4, timestamp=null)}}
# Before the leader knows that the third replica is down, it accepts the message from the producer.
# The leader forwards the message to the third replica.
# Before receiving an ACK from the third replica, the leader sends the message to the consumer.
{{Received MessageRecord: ConsumerRecord(topic = t229, partition = 0, leaderEpoch = 1, offset = 3, CreateTime = 1553698968232, serialized key size = -1, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = p229-4)}}
# The leader doesn't get an ACK from the third replica.
# The leader now detects that the third replica is down and throws NOT_ENOUGH_REPLICAS_EXCEPTION.
{{[2019-03-27 11:02:53,541] ERROR [ReplicaManager broker=0] Error processing append operation on partition t229-0 (kafka.server.ReplicaManager)}}
{{org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition t229-0}}
# The leader now stops accepting messages from the producer.
# Since the producer didn't get an ACK from the leader, it throws a TIMEOUT_EXCEPTION after the timeout (2 min in our case).
{{java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for t229-0:12 ms has passed since batch creation}}
{{at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)}}
{{at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)}}
{{at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)}}
{{Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for t229-0:12 ms has passed since batch creation}}
# So far, the producer believes that the message was not received by the leader, whereas the consumer actually received it.
# The producer now retries sending the same message. (In our application it is the next integer we send.)
# When the second/third replica comes back up, the leader accepts the message and sends the same message to the consumer. Thus sending duplicates.

Ideally, in ACK mode ALL it is expected that the leader sends a message to the consumer only after it receives an ACK from all other replicas. But this is not happening.

+*Question*+
1) In the acks=all case, does the leader send a message to the consumer only after all in-sync replicas have acknowledged it?
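On the duplicate itself: a retry after an ambiguous timeout re-sends a record the broker may already have appended. One general mitigation (offered here as a suggestion, not something confirmed in this ticket) is the idempotent producer, whose sequence-number deduplication can be modeled roughly as follows (a simplified sketch, not broker code):

```python
# Rough model of broker-side idempotence (simplified, not Kafka's code):
# each producer is assigned an ID, stamps each send with a sequence number,
# and the broker drops any batch whose sequence it has already appended,
# so a retry of an already-written record does not create a duplicate.

class DedupLog:
    def __init__(self):
        self.records = []
        self.last_seq = {}  # producer_id -> last appended sequence number

    def append(self, producer_id, seq, value):
        """Append value unless this (producer_id, seq) was already appended."""
        if self.last_seq.get(producer_id, -1) >= seq:
            return False  # duplicate retry: drop silently, still ACK
        self.records.append(value)
        self.last_seq[producer_id] = seq
        return True

log = DedupLog()
assert log.append("p229", 0, "p229-4") is True   # original send is appended
assert log.append("p229", 0, "p229-4") is False  # retry after timeout: dropped
assert log.records == ["p229-4"]                 # no duplicate in the log
```

Without such deduplication (or transactional/read-committed consumption), acks=all alone only bounds data loss, not duplicates, since a producer that times out cannot know whether its write landed.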