[jira] [Commented] (KAFKA-9879) How kafka deletes tombstone messages?

2020-04-20 Thread VIkram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17087773#comment-17087773
 ] 

VIkram commented on KAFKA-9879:
---

Any update on this?

> How kafka deletes tombstone messages?
> -
>
> Key: KAFKA-9879
> URL: https://issues.apache.org/jira/browse/KAFKA-9879
> Project: Kafka
>  Issue Type: Bug
>Reporter: VIkram
>Priority: Major
>
> I was able to delete records in Kafka using tombstone messages after a few 
> attempts. However, the algorithm (or logic) that Kafka uses to delete these 
> tombstone messages is still unclear to me.
> From my observations, I could figure out that there is some relation between 
> the last modified time of a segment and the deletion of tombstone messages. I 
> have looked into this [https://stackoverflow.com/a/48325699/6940279] but it's 
> a bit complex to understand.
> *Topic details*
>  
> {{Topic:reddyvel_13 PartitionCount:1 ReplicationFactor:3 
> Configs:cleanup.policy=compact,segment.bytes=200,delete.retention.ms=10000
>  Topic: reddyvel_13 Partition: 0 Leader: 1 Replicas: 1,5,2 Isr: 1,5,2}}
> I have set {{cleanup.policy=compact}}, {{segment.bytes=200}} and 
> {{delete.retention.ms=10000}} (10 seconds).
> *Timeline of events*
>  * First segment (baseOffset = 0) was closed at {{2020-04-02 07:12:09,908}}
> +*cleaner log*+
> {{[2020-04-02 07:12:09,908] INFO [ProducerStateManager 
> partition=reddyvel_13-0] Writing producer snapshot at offset 16623 
> (kafka.log.ProducerStateManager)
>  [2020-04-02 07:12:09,908] INFO [Log partition=reddyvel_13-0, 
> dir=/local/kafka/data] Rolled new log segment at offset 16623 in 1 ms. 
> (kafka.log.Log)}}
> Compaction has been triggered immediately on this closed segment
> +*cleaner log*+ 
> {{[2020-04-02 07:12:22,989] INFO Cleaner 0: Cleaning log reddyvel_13-0 
> (cleaning prior to Thu Apr 02 07:12:09 EDT 2020, discarding tombstones prior 
> to Wed Dec 31 19:00:00 EST 1969)... (kafka.log.LogCleaner)}}
>  
>  * Sent a few more messages along with a few tombstones (to delete messages 
> present in the first segment), and the second segment was closed at 
> {{2020-04-02 07:56:50,405}}
>  +*cleaner log*+
> {{[2020-04-02 07:56:50,405] INFO [ProducerStateManager 
> partition=reddyvel_13-0] Writing producer snapshot at offset 33868 
> (kafka.log.ProducerStateManager)
>  [2020-04-02 07:56:50,406] INFO [Log partition=reddyvel_13-0, 
> dir=/local/kafka/data] Rolled new log segment at offset 33868 in 2 ms. 
> (kafka.log.Log)}}
> Compaction has been triggered
> +*cleaner log*+ 
> {{[2020-04-02 07:56:53,180] INFO Cleaner 0: Cleaning log reddyvel_13-0 
> (cleaning prior to Thu Apr 02 07:56:50 EDT 2020, discarding tombstones prior 
> to Thu Apr 02 07:11:59 EDT 2020)... (kafka.log.LogCleaner)}}
>  
> Here, the above log message says {{discarding tombstones prior to Thu Apr 02 
> 07:11:59 EDT 2020}}. This timestamp is exactly the first segment's closing 
> timestamp ({{2020-04-02 07:12:09,908}}) minus the {{delete.retention.ms}} (10 
> seconds) of my topic. I'm not able to figure out the link between these.
> I want to understand at what time Kafka triggers deletion of tombstone 
> messages. Can someone explain the tombstone deletion algorithm in simpler 
> terms and the reasoning behind it?
>  
> It's not a bug, but I need more information on this. I have posted this in 
> other forums like Stack Overflow but did not get any reply. The official 
> Kafka documentation doesn't have this information. If this is not the correct 
> platform for this, kindly guide me to the relevant one.
>  
> Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9879) How kafka deletes tombstone messages?

2020-04-16 Thread VIkram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

VIkram updated KAFKA-9879:
--
Description: 
I was able to delete records in Kafka using tombstone messages after a few 
attempts. However, the algorithm (or logic) that Kafka uses to delete these 
tombstone messages is still unclear to me.

From my observations, I could figure out that there is some relation between 
the last modified time of a segment and the deletion of tombstone messages. I 
have looked into this [https://stackoverflow.com/a/48325699/6940279] but it's a 
bit complex to understand.

*Topic details*

 

{{Topic:reddyvel_13 PartitionCount:1 ReplicationFactor:3 
Configs:cleanup.policy=compact,segment.bytes=200,delete.retention.ms=10000
 Topic: reddyvel_13 Partition: 0 Leader: 1 Replicas: 1,5,2 Isr: 1,5,2}}

I have set {{cleanup.policy=compact}}, {{segment.bytes=200}} and 
{{delete.retention.ms=10000}} (10 seconds).
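
For reference, a topic with exactly these settings can be created through the Java 
AdminClient roughly as in the sketch below (illustrative only; the bootstrap address 
is an assumption, everything else is taken from the topic details above):

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker address; replace with the real bootstrap servers.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // 1 partition, replication factor 3, plus the topic-level configs from
            // above: compaction, tiny segments so rolls happen quickly, and a
            // 10-second tombstone retention window.
            NewTopic topic = new NewTopic("reddyvel_13", 1, (short) 3)
                    .configs(Map.of(
                            "cleanup.policy", "compact",
                            "segment.bytes", "200",
                            "delete.retention.ms", "10000"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
{code}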

*Timeline of events*
 * First segment (baseOffset = 0) was closed at {{2020-04-02 07:12:09,908}}

+*cleaner log*+

{{[2020-04-02 07:12:09,908] INFO [ProducerStateManager partition=reddyvel_13-0] 
Writing producer snapshot at offset 16623 (kafka.log.ProducerStateManager)
 [2020-04-02 07:12:09,908] INFO [Log partition=reddyvel_13-0, 
dir=/local/kafka/data] Rolled new log segment at offset 16623 in 1 ms. 
(kafka.log.Log)}}

Compaction has been triggered immediately on this closed segment

+*cleaner log*+ 

{{[2020-04-02 07:12:22,989] INFO Cleaner 0: Cleaning log reddyvel_13-0 
(cleaning prior to Thu Apr 02 07:12:09 EDT 2020, discarding tombstones prior to 
Wed Dec 31 19:00:00 EST 1969)... (kafka.log.LogCleaner)}}

 
 * Sent a few more messages along with a few tombstones (to delete messages 
present in the first segment), and the second segment was closed at 
{{2020-04-02 07:56:50,405}}

 +*cleaner log*+

{{[2020-04-02 07:56:50,405] INFO [ProducerStateManager partition=reddyvel_13-0] 
Writing producer snapshot at offset 33868 (kafka.log.ProducerStateManager)
 [2020-04-02 07:56:50,406] INFO [Log partition=reddyvel_13-0, 
dir=/local/kafka/data] Rolled new log segment at offset 33868 in 2 ms. 
(kafka.log.Log)}}

Compaction has been triggered

+*cleaner log*+ 

{{[2020-04-02 07:56:53,180] INFO Cleaner 0: Cleaning log reddyvel_13-0 
(cleaning prior to Thu Apr 02 07:56:50 EDT 2020, discarding tombstones prior to 
Thu Apr 02 07:11:59 EDT 2020)... (kafka.log.LogCleaner)}}

 

Here, the above log message says {{discarding tombstones prior to Thu Apr 02 
07:11:59 EDT 2020}}. This timestamp is exactly the first segment's closing 
timestamp ({{2020-04-02 07:12:09,908}}) minus the {{delete.retention.ms}} (10 
seconds) of my topic. I'm not able to figure out the link between these.
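
To make the arithmetic concrete, the small sketch below just reproduces the 
subtraction described above (this is only my working assumption - that the tombstone 
horizon is the last modified time of the most recently cleaned segment minus 
{{delete.retention.ms}} - not a statement about the actual cleaner code):

{code:java}
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class TombstoneHorizonSketch {
    public static void main(String[] args) {
        // Observed values from the timeline above.
        ZonedDateTime firstSegmentClosed = ZonedDateTime.of(
                2020, 4, 2, 7, 12, 9, 908_000_000, ZoneId.of("America/New_York"));
        Duration deleteRetention = Duration.ofMillis(10_000); // delete.retention.ms

        // Assumption: horizon = lastModified of the last cleaned segment
        //                       minus delete.retention.ms
        ZonedDateTime tombstoneHorizon = firstSegmentClosed.minus(deleteRetention);

        // Prints 2020-04-02T07:11:59.908-04:00[America/New_York], which matches the
        // "discarding tombstones prior to Thu Apr 02 07:11:59 EDT 2020" log line.
        System.out.println(tombstoneHorizon);
    }
}
{code}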

I want to understand at what time Kafka triggers deletion of tombstone 
messages. Can someone explain the tombstone deletion algorithm in simpler terms 
and the reasoning behind it?

 

It's not a bug, but I need more information on this. I have posted this in other 
forums like Stack Overflow but did not get any reply. The official Kafka 
documentation doesn't have this information. If this is not the correct 
platform for this, kindly guide me to the relevant one.

 

Thanks in advance.


[jira] [Created] (KAFKA-9879) How kafka deletes tombstone messages?

2020-04-16 Thread VIkram (Jira)
VIkram created KAFKA-9879:
-

 Summary: How kafka deletes tombstone messages?
 Key: KAFKA-9879
 URL: https://issues.apache.org/jira/browse/KAFKA-9879
 Project: Kafka
  Issue Type: Bug
Reporter: VIkram


I was able to delete records in Kafka using tombstone messages after a few 
attempts. However, the algorithm (or logic) that Kafka uses to delete these 
tombstone messages is still unclear to me.

From my observations, I could figure out that there is some relation between 
the last modified time of a segment and the deletion of tombstone messages. I 
have looked into this [https://stackoverflow.com/a/48325699/6940279] but it's a 
bit complex to understand.

*Topic details*

 

{{Topic:reddyvel_13 PartitionCount:1 ReplicationFactor:3 
Configs:cleanup.policy=compact,segment.bytes=200,delete.retention.ms=10000
Topic: reddyvel_13 Partition: 0 Leader: 1 Replicas: 1,5,2 Isr: 1,5,2}}

I have set {{cleanup.policy=compact}}, {{segment.bytes=200}} and 
{{delete.retention.ms=10000}} (10 seconds).

*Timeline of events*
 * First segment (baseOffset = 0) was closed at {{2020-04-02 07:12:09,908}}

 

{{[2020-04-02 07:12:09,908] INFO [ProducerStateManager partition=reddyvel_13-0] 
Writing producer snapshot at offset 16623 (kafka.log.ProducerStateManager)
[2020-04-02 07:12:09,908] INFO [Log partition=reddyvel_13-0, 
dir=/local/kafka/data] Rolled new log segment at offset 16623 in 1 ms. 
(kafka.log.Log)}}

Compaction has been triggered immediately on this closed segment

 

{{[2020-04-02 07:12:22,989] INFO Cleaner 0: Cleaning log reddyvel_13-0 
(cleaning prior to Thu Apr 02 07:12:09 EDT 2020, discarding tombstones prior to 
Wed Dec 31 19:00:00 EST 1969)... (kafka.log.LogCleaner)}}
 * Sent a few more messages along with a few tombstones (to delete messages 
present in the first segment), and the second segment was closed at 
{{2020-04-02 07:56:50,405}}

 

{{[2020-04-02 07:56:50,405] INFO [ProducerStateManager partition=reddyvel_13-0] 
Writing producer snapshot at offset 33868 (kafka.log.ProducerStateManager)
[2020-04-02 07:56:50,406] INFO [Log partition=reddyvel_13-0, 
dir=/local/kafka/data] Rolled new log segment at offset 33868 in 2 ms. 
(kafka.log.Log)}}

Compaction has been triggered

 

{{[2020-04-02 07:56:53,180] INFO Cleaner 0: Cleaning log reddyvel_13-0 
(cleaning prior to Thu Apr 02 07:56:50 EDT 2020, discarding tombstones prior to 
Thu Apr 02 07:11:59 EDT 2020)... (kafka.log.LogCleaner)}}

Here, the above log message says {{discarding tombstones prior to Thu Apr 02 
07:11:59 EDT 2020}}. This timestamp is exactly the first segment's closing 
timestamp ({{2020-04-02 07:12:09,908}}) minus the {{delete.retention.ms}} (10 
seconds) of my topic. I'm not able to figure out the link between these.

I want to understand at what time Kafka triggers deletion of tombstone 
messages. Can someone explain the tombstone deletion algorithm in simpler terms 
and the reasoning behind it?

 

It's not a bug, but I need more information on this. I have posted this in other 
forums like Stack Overflow but did not get any reply. The official Kafka 
documentation doesn't have this information. If this is not the correct 
platform for this, kindly guide me to the relevant one.

 

Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL

2020-02-13 Thread VIkram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036125#comment-17036125
 ] 

VIkram commented on KAFKA-9280:
---

Will the consumer FetchRequest{from=1000} get served?

> Duplicate messages are observed in ACK mode ALL
> ---
>
> Key: KAFKA-9280
> URL: https://issues.apache.org/jira/browse/KAFKA-9280
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.1
>Reporter: VIkram
>Priority: Major
>
> In ack mode ALL, the leader sends the message to the consumer even before 
> receiving the acknowledgements from the other replicas. This can lead to 
> *+duplicate messages+*.
>  
> Setup details:
>  * 1 zookeeper, 5 brokers
>  * Producer: Synchronous
>  * Topic: 1 partition, replication factor - 3, min isr - 2
>  
> Say First replica (Leader), Second replica and Third replica are the three 
> replicas of the topic.
>  
> *Sequence of events:*
> a) All brokers are up and running.
> b) Clients started running.
> c) Kill second replica of the topic.
> d) Kill the third replica. Now min isr will not be satisfied.
> e) Bring up third replica. Min isr will be satisfied.
>  
> *Breakdown of step 'd':*
>  # Just before producer sends next message, killed third replica with kill -9 
> (Leader takes time ~5sec to detect that the broker is down).
>  # Producer sent a message to leader.
>  # Before the leader knows that third replica is down, it accepts the message 
> from producer.
>  # Leader forwards the message to third replica.
>  # Before receiving ACK from third replica, leader sent the message to 
> consumer.
>  # Leader doesn't get an ACK from third replica.
>  # Now leader detects that third replica is down and throws 
> NOT_ENOUGH_REPLICAS_EXCEPTION.
>  # Now leader stops accepting messages from producer.
>  # Since producer didn't get ACK from leader, it will throw TIMEOUT_EXCEPTION 
> after timeout (2min in our case) .
>  # So far, producer believes that the message was not received by leader 
> whereas the consumer actually received it.
>  # Now producer retries sending the same message. (In our application it is 
> the next integer we send).
>  # Now when second/third replica is up, leader accepts the message and sends 
> the same message to consumer. *Thus sending duplicates.*
>  
>  
> *Logs:*
>  # 2-3 seconds before producer sends next message, killed third replica with 
> kill -9 (Leader takes time ~5sec to detect that the broker is down).
> _{{{_
> _> kill -9 49596_
> _}}}_
>  __ 
>  # Producer sent a message to leader.
> _{{{_
> _[20190327 11:02:48.231 EDT (main-Terminating-1) Will send message: 
> ProducerRecord(topic=t229, partition=null, headers=RecordHeaders(headers = 
> [], isReadOnly = false), key=null, value=p229-4, timestamp=null)_
> _}}}_
>  
>  # Before the leader knows that third replica is down, it accepts the message 
> from producer.
>  # Leader forwards the message to third replica.
>  # Before receiving ACK from third replica, leader sent the message to 
> consumer.
> _{{{_
>  _Received MessageRecord: ConsumerRecord(topic = t229, partition = 0, 
> leaderEpoch = 1, offset = 3, CreateTime = 1553698968232, serialized key size 
> = -1, serialized value size = 6, headers = RecordHeaders(headers = [], 
> isReadOnly = false), key = null, value = p229-4)_
> _}}}_
>  __ 
>  # Leader doesn't get an ACK from third replica.
>  # Now leader detects that third replica is down and throws 
> NOT_ENOUGH_REPLICAS_EXCEPTION.
> _{{{_
> _[2019-03-27 11:02:53,541] ERROR [ReplicaManager broker=0] Error processing 
> append operation on partition t229-0 (kafka.server.ReplicaManager)_
> _org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the 
> current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 
> for partition t229-0_
> _}}}_
>  
>  # Now leader stops accepting messages from producer.
>  # Since producer didn't get ACK from leader, it will throw TIMEOUT_EXCEPTION 
> after timeout (2min in our case) .
> _{{{_
>  _java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> t229-0:12 ms_
> _has passed since batch creation_
>     _at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)_
>     _at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)_
>     _at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)_
> _Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 
> record(s) for t229-0:12 ms has passed since batch creation_
> _}}}_
>  
>  # So far, producer believes that the message was not received by leader 
> whereas the consumer actually received it.
>  # Now producer re

[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL

2020-02-03 Thread VIkram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17028811#comment-17028811
 ] 

VIkram commented on KAFKA-9280:
---

[~bchen225242] Any update on this? I still face this issue.

> Duplicate messages are observed in ACK mode ALL
> ---
>
> Key: KAFKA-9280
> URL: https://issues.apache.org/jira/browse/KAFKA-9280
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.1
>Reporter: VIkram
>Priority: Major
>
> In ack mode ALL, the leader sends the message to the consumer even before 
> receiving the acknowledgements from the other replicas. This can lead to 
> *+duplicate messages+*.
>  
> Setup details:
>  * 1 zookeeper, 5 brokers
>  * Producer: Synchronous
>  * Topic: 1 partition, replication factor - 3, min isr - 2
>  
> Say First replica (Leader), Second replica and Third replica are the three 
> replicas of the topic.
>  
> *Sequence of events:*
> a) All brokers are up and running.
> b) Clients started running.
> c) Kill second replica of the topic.
> d) Kill the third replica. Now min isr will not be satisfied.
> e) Bring up third replica. Min isr will be satisfied.
>  
> *Breakdown of step 'd':*
>  # Just before producer sends next message, killed third replica with kill -9 
> (Leader takes time ~5sec to detect that the broker is down).
>  # Producer sent a message to leader.
>  # Before the leader knows that third replica is down, it accepts the message 
> from producer.
>  # Leader forwards the message to third replica.
>  # Before receiving ACK from third replica, leader sent the message to 
> consumer.
>  # Leader doesn't get an ACK from third replica.
>  # Now leader detects that third replica is down and throws 
> NOT_ENOUGH_REPLICAS_EXCEPTION.
>  # Now leader stops accepting messages from producer.
>  # Since producer didn't get ACK from leader, it will throw TIMEOUT_EXCEPTION 
> after timeout (2min in our case) .
>  # So far, producer believes that the message was not received by leader 
> whereas the consumer actually received it.
>  # Now producer retries sending the same message. (In our application it is 
> the next integer we send).
>  # Now when second/third replica is up, leader accepts the message and sends 
> the same message to consumer. *Thus sending duplicates.*
>  
>  
> *Logs:*
>  # 2-3 seconds before producer sends next message, killed third replica with 
> kill -9 (Leader takes time ~5sec to detect that the broker is down).
> _{{{_
> _> kill -9 49596_
> _}}}_
>  __ 
>  # Producer sent a message to leader.
> _{{{_
> _[20190327 11:02:48.231 EDT (main-Terminating-1) Will send message: 
> ProducerRecord(topic=t229, partition=null, headers=RecordHeaders(headers = 
> [], isReadOnly = false), key=null, value=p229-4, timestamp=null)_
> _}}}_
>  
>  # Before the leader knows that third replica is down, it accepts the message 
> from producer.
>  # Leader forwards the message to third replica.
>  # Before receiving ACK from third replica, leader sent the message to 
> consumer.
> _{{{_
>  _Received MessageRecord: ConsumerRecord(topic = t229, partition = 0, 
> leaderEpoch = 1, offset = 3, CreateTime = 1553698968232, serialized key size 
> = -1, serialized value size = 6, headers = RecordHeaders(headers = [], 
> isReadOnly = false), key = null, value = p229-4)_
> _}}}_
>  __ 
>  # Leader doesn't get an ACK from third replica.
>  # Now leader detects that third replica is down and throws 
> NOT_ENOUGH_REPLICAS_EXCEPTION.
> _{{{_
> _[2019-03-27 11:02:53,541] ERROR [ReplicaManager broker=0] Error processing 
> append operation on partition t229-0 (kafka.server.ReplicaManager)_
> _org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the 
> current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 
> for partition t229-0_
> _}}}_
>  
>  # Now leader stops accepting messages from producer.
>  # Since producer didn't get ACK from leader, it will throw TIMEOUT_EXCEPTION 
> after timeout (2min in our case) .
> _{{{_
>  _java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> t229-0:12 ms_
> _has passed since batch creation_
>     _at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)_
>     _at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)_
>     _at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)_
> _Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 
> record(s) for t229-0:12 ms has passed since batch creation_
> _}}}_
>  
>  # So far, producer believes that the message was not received by leader 
> whereas the consumer actually received it.
>  # Now produce

[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL

2019-12-09 Thread VIkram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991419#comment-16991419
 ] 

VIkram commented on KAFKA-9280:
---

Let's say the high watermark for a topic partition is 1000 and the leader and 
follower replicas have exactly the same messages. In this scenario, the producer 
sends a message to the leader and other replicas, and the consumer sends a fetch 
request to the leader. Is there a possibility here that the consumer's fetch 
request will be served before the other replicas' fetch requests?

> Duplicate messages are observed in ACK mode ALL
> ---
>
> Key: KAFKA-9280
> URL: https://issues.apache.org/jira/browse/KAFKA-9280
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.1
>Reporter: VIkram
>Priority: Major
>
> In ack mode ALL, the leader sends the message to the consumer even before 
> receiving the acknowledgements from the other replicas. This can lead to 
> *+duplicate messages+*.
>  
> Setup details:
>  * 1 zookeeper, 5 brokers
>  * Producer: Synchronous
>  * Topic: 1 partition, replication factor - 3, min isr - 2
>  
> Say First replica (Leader), Second replica and Third replica are the three 
> replicas of the topic.
>  
> *Sequence of events:*
> a) All brokers are up and running.
> b) Clients started running.
> c) Kill second replica of the topic.
> d) Kill the third replica. Now min isr will not be satisfied.
> e) Bring up third replica. Min isr will be satisfied.
>  
> *Breakdown of step 'd':*
>  # Just before producer sends next message, killed third replica with kill -9 
> (Leader takes time ~5sec to detect that the broker is down).
>  # Producer sent a message to leader.
>  # Before the leader knows that third replica is down, it accepts the message 
> from producer.
>  # Leader forwards the message to third replica.
>  # Before receiving ACK from third replica, leader sent the message to 
> consumer.
>  # Leader doesn't get an ACK from third replica.
>  # Now leader detects that third replica is down and throws 
> NOT_ENOUGH_REPLICAS_EXCEPTION.
>  # Now leader stops accepting messages from producer.
>  # Since producer didn't get ACK from leader, it will throw TIMEOUT_EXCEPTION 
> after timeout (2min in our case) .
>  # So far, producer believes that the message was not received by leader 
> whereas the consumer actually received it.
>  # Now producer retries sending the same message. (In our application it is 
> the next integer we send).
>  # Now when second/third replica is up, leader accepts the message and sends 
> the same message to consumer. *Thus sending duplicates.*
>  
>  
> *Logs:*
>  # 2-3 seconds before producer sends next message, killed third replica with 
> kill -9 (Leader takes time ~5sec to detect that the broker is down).
> _{{{_
> _> kill -9 49596_
> _}}}_
>  __ 
>  # Producer sent a message to leader.
> _{{{_
> _[20190327 11:02:48.231 EDT (main-Terminating-1) Will send message: 
> ProducerRecord(topic=t229, partition=null, headers=RecordHeaders(headers = 
> [], isReadOnly = false), key=null, value=p229-4, timestamp=null)_
> _}}}_
>  
>  # Before the leader knows that third replica is down, it accepts the message 
> from producer.
>  # Leader forwards the message to third replica.
>  # Before receiving ACK from third replica, leader sent the message to 
> consumer.
> _{{{_
>  _Received MessageRecord: ConsumerRecord(topic = t229, partition = 0, 
> leaderEpoch = 1, offset = 3, CreateTime = 1553698968232, serialized key size 
> = -1, serialized value size = 6, headers = RecordHeaders(headers = [], 
> isReadOnly = false), key = null, value = p229-4)_
> _}}}_
>  __ 
>  # Leader doesn't get an ACK from third replica.
>  # Now leader detects that third replica is down and throws 
> NOT_ENOUGH_REPLICAS_EXCEPTION.
> _{{{_
> _[2019-03-27 11:02:53,541] ERROR [ReplicaManager broker=0] Error processing 
> append operation on partition t229-0 (kafka.server.ReplicaManager)_
> _org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the 
> current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 
> for partition t229-0_
> _}}}_
>  
>  # Now leader stops accepting messages from producer.
>  # Since producer didn't get ACK from leader, it will throw TIMEOUT_EXCEPTION 
> after timeout (2min in our case) .
> _{{{_
>  _java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> t229-0:12 ms_
> _has passed since batch creation_
>     _at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)_
>     _at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)_
>     _at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)_
> _C

[jira] [Created] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL

2019-12-06 Thread VIkram (Jira)
VIkram created KAFKA-9280:
-

 Summary: Duplicate messages are observed in ACK mode ALL
 Key: KAFKA-9280
 URL: https://issues.apache.org/jira/browse/KAFKA-9280
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.2.1
Reporter: VIkram


In ack mode ALL, the leader sends the message to the consumer even before 
receiving the acknowledgements from the other replicas. This can lead to 
*+duplicate messages+*.

 

Setup details:
 * 1 zookeeper, 5 brokers
 * Producer: Synchronous
 * Topic: 1 partition, replication factor - 3, min isr - 2

 

Say the first replica (the leader), the second replica and the third replica are 
the three replicas of the topic.

 

*Sequence of events:*

a) All brokers are up and running.

b) Clients started running.

c) Kill second replica of the topic.

d) Kill the third replica. Now min isr will not be satisfied.

e) Bring up third replica. Min isr will be satisfied.

 

*Breakdown of step 'd':*
 # Just before producer sends next message, killed third replica with kill -9 
(Leader takes time ~5sec to detect that the broker is down).
 # Producer sent a message to leader.
 # Before the leader knows that third replica is down, it accepts the message 
from producer.
 # Leader forwards the message to third replica.
 # Before receiving ACK from third replica, leader sent the message to consumer.
 # Leader doesn't get an ACK from third replica.
 # Now leader detects that third replica is down and throws 
NOT_ENOUGH_REPLICAS_EXCEPTION.
 # Now leader stops accepting messages from producer.
 # Since the producer didn't get an ACK from the leader, it will throw a 
TIMEOUT_EXCEPTION after the timeout (2 min in our case).
 # So far, producer believes that the message was not received by leader 
whereas the consumer actually received it.
 # Now producer retries sending the same message. (In our application it is the 
next integer we send).
 # Now when second/third replica is up, leader accepts the message and sends 
the same message to consumer. *Thus sending duplicates.*

 

 

*Logs:*
 # 2-3 seconds before producer sends next message, killed third replica with 
kill -9 (Leader takes time ~5sec to detect that the broker is down).

_{{{_

_> kill -9 49596_

_}}}_

 __ 
 # Producer sent a message to leader.

_{{{_

_[20190327 11:02:48.231 EDT (main-Terminating-1) Will send message: 
ProducerRecord(topic=t229, partition=null, headers=RecordHeaders(headers = [], 
isReadOnly = false), key=null, value=p229-4, timestamp=null)_

_}}}_

 
 # Before the leader knows that third replica is down, it accepts the message 
from producer.
 # Leader forwards the message to third replica.
 # Before receiving ACK from third replica, leader sent the message to consumer.

_{{{_

 _Received MessageRecord: ConsumerRecord(topic = t229, partition = 0, 
leaderEpoch = 1, offset = 3, CreateTime = 1553698968232, serialized key size = 
-1, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly 
= false), key = null, value = p229-4)_

_}}}_

 __ 
 # Leader doesn't get an ACK from third replica.
 # Now leader detects that third replica is down and throws 
NOT_ENOUGH_REPLICAS_EXCEPTION.

_{{{_

_[2019-03-27 11:02:53,541] ERROR [ReplicaManager broker=0] Error processing 
append operation on partition t229-0 (kafka.server.ReplicaManager)_

_org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the 
current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for 
partition t229-0_

_}}}_

 
 # Now leader stops accepting messages from producer.
 # Since the producer didn't get an ACK from the leader, it will throw a 
TIMEOUT_EXCEPTION after the timeout (2 min in our case).

_{{{_

 _java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
t229-0:12 ms_

_has passed since batch creation_

    _at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)_

    _at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)_

    _at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)_

_Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 
record(s) for t229-0:12 ms has passed since batch creation_

_}}}_

 
 # So far, producer believes that the message was not received by leader 
whereas the consumer actually received it.
 # Now producer retries sending the same message. (In our application it is the 
next integer we send).
 # Now when second/third replica is up, leader accepts the message and sends 
the same to consumer. Thus sending duplicates.

 

Ideally, in ACK mode ALL, the leader is expected to send a message to the consumer 
only after it receives ACKs from all the other replicas. But this is not happening.
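
To make the client side concrete, the producer described above behaves roughly like 
the sketch below (an illustrative reconstruction, not our exact code; the bootstrap 
address is an assumption, while the topic, value and ~2 min timeout are taken from 
the logs above):

{code:java}
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class SyncProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed address
        props.put("acks", "all");                            // ack mode ALL
        props.put("delivery.timeout.ms", "120000");          // ~2 min, as described above
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("t229", "p229-4");
            try {
                // Synchronous send: block until the broker acknowledges or the send
                // expires with a TimeoutException.
                producer.send(record).get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof TimeoutException) {
                    // No ACK was seen by the producer, so the application re-sends the
                    // same value even though the consumer may already have received it.
                    // This is where the duplicate in the scenario above comes from.
                    producer.send(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
{code}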

 

+*Question*+

1) In the acks=all case, does the leader send the message to the consumer only 
after all in