[jira] [Updated] (KAFKA-16250) Consumer group coordinator should perform sanity check on the offset commits.
[ https://issues.apache.org/jira/browse/KAFKA-16250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu updated KAFKA-16250: --- Description: The current coordinator does not validate offset commits before persisting them in the record. In a real case, though I am not sure why the consumer generated an offset commit with a consumer offset of -2, the "illegal" consumer offset value caused confusion in the admin CLI when describing the consumer group. The consumer offset field is marked "". was: The current coordinator does not validate offset commits before persisting them in the record. In a real case, though I am not sure why the consumer generated an offset commit with a consumer offset of -2, the "illegal" consumer offset value caused confusion in the admin CLI when describing the consumer group. The consumer offset field is marked "-". > Consumer group coordinator should perform sanity check on the offset commits. > - > > Key: KAFKA-16250 > URL: https://issues.apache.org/jira/browse/KAFKA-16250 > Project: Kafka > Issue Type: Improvement >Reporter: Calvin Liu >Priority: Major > > The current coordinator does not validate offset commits before > persisting them in the record. > In a real case, though I am not sure why the consumer generated an offset > commit with a consumer offset of -2, the "illegal" consumer offset value > caused confusion in the admin CLI when describing the consumer group. > The consumer offset field is marked "". > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
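A minimal sketch of the kind of sanity check the ticket proposes, assuming a hypothetical validator invoked before the coordinator persists a commit; the class name, method, and error-code choice are illustrative assumptions, not the actual group coordinator code:

{code:java}
import org.apache.kafka.common.protocol.Errors;

// Hypothetical pre-persist validation for an offset commit. A sentinel such
// as -1 may be meaningful in some request paths, but arbitrary negative
// offsets like -2 should be rejected instead of being written to the log.
public final class OffsetCommitValidator {
    public static Errors validate(long committedOffset) {
        if (committedOffset < 0) {
            return Errors.OFFSET_OUT_OF_RANGE; // assumed error-code choice
        }
        return Errors.NONE;
    }
}
{code}

Rejecting the commit up front keeps the "illegal" value out of the offsets topic, so describe-group output never has to render it.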
[jira] [Resolved] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close
[ https://issues.apache.org/jira/browse/KAFKA-16217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu resolved KAFKA-16217. Fix Version/s: (was: 3.6.3) Resolution: Fixed > Transactional producer stuck in IllegalStateException during close > -- > > Key: KAFKA-16217 > URL: https://issues.apache.org/jira/browse/KAFKA-16217 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 3.7.0, 3.6.1 >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > Labels: transactions > Fix For: 3.8.0, 3.7.1 > > > The producer is stuck during the close. It keeps retrying to abort the > transaction but it never succeeds. > {code:java} > [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | > producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > org.apache.kafka.clients.producer.internals.Sender run - [Producer > clientId=producer-transaction-ben > ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, > transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > Error in kafka producer I/O thread while aborting transaction: > java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` > because the previous call to `commitTransaction` timed out and must be retried > at > org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138) > at > org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) > at java.base/java.lang.Thread.run(Thread.java:1583) > at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) > {code} > With the additional log, I found the root cause. If the producer is in a bad > transaction state (in my case, the TransactionManager.pendingTransition was > set to commitTransaction and did not get cleaned), then when the producer calls > close and tries to abort the existing transaction, it will get > stuck in the transaction abort. It is related to the fix > [https://github.com/apache/kafka/pull/13591]. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
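For context, a minimal reproduction sketch of the failure pattern described in the ticket, assuming a broker or network fault that makes commitTransaction time out; this is an illustration against the public producer API, not the test from the linked PR:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;

public class StuckCloseRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("transactional.id", "repro-txn-id");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("repro-topic", "k", "v"));
            try {
                // If this times out, TransactionManager.pendingTransition stays
                // set to commitTransaction and is never cleaned up.
                producer.commitTransaction();
            } catch (TimeoutException e) {
                // Deliberately not retrying the commit: per the ticket, the
                // close() below then loops on IllegalStateException while
                // trying to abort the transaction.
            }
        } // try-with-resources calls close(), which is where the hang occurs
    }
}
{code}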
[jira] [Commented] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close
[ https://issues.apache.org/jira/browse/KAFKA-16217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842049#comment-17842049 ] Calvin Liu commented on KAFKA-16217: Resolving the ticket for now. If we want to merge the fix into 3.6, we can reopen it. > Transactional producer stuck in IllegalStateException during close > -- > > Key: KAFKA-16217 > URL: https://issues.apache.org/jira/browse/KAFKA-16217 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 3.7.0, 3.6.1 >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > Labels: transactions > Fix For: 3.8.0, 3.7.1, 3.6.3 > > > The producer is stuck during the close. It keeps retrying to abort the > transaction but it never succeeds. > {code:java} > [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | > producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > org.apache.kafka.clients.producer.internals.Sender run - [Producer > clientId=producer-transaction-ben > ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, > transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > Error in kafka producer I/O thread while aborting transaction: > java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` > because the previous call to `commitTransaction` timed out and must be retried > at > org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138) > at > org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) > at java.base/java.lang.Thread.run(Thread.java:1583) > at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) > {code} > With the additional log, I found the root cause. If the producer is in a bad > transaction state (in my case, the TransactionManager.pendingTransition was > set to commitTransaction and did not get cleaned), then when the producer calls > close and tries to abort the existing transaction, it will get > stuck in the transaction abort. It is related to the fix > [https://github.com/apache/kafka/pull/13591]. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close
[ https://issues.apache.org/jira/browse/KAFKA-16217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840502#comment-17840502 ] Calvin Liu commented on KAFKA-16217: [~kirktrue] The cherry-pick for 3.7 is merged. As for 3.6.3, [~chia7712] mentioned we may not have a 3.6.3 release. https://github.com/apache/kafka/pull/15791 > Transactional producer stuck in IllegalStateException during close > -- > > Key: KAFKA-16217 > URL: https://issues.apache.org/jira/browse/KAFKA-16217 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 3.7.0, 3.6.1 >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > Labels: transactions > Fix For: 3.8.0, 3.7.1, 3.6.3 > > > The producer is stuck during the close. It keeps retrying to abort the > transaction but it never succeeds. > {code:java} > [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | > producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > org.apache.kafka.clients.producer.internals.Sender run - [Producer > clientId=producer-transaction-ben > ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, > transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > Error in kafka producer I/O thread while aborting transaction: > java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` > because the previous call to `commitTransaction` timed out and must be retried > at > org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138) > at > org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) > at java.base/java.lang.Thread.run(Thread.java:1583) > at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) > {code} > With the additional log, I found the root cause. If the producer is in a bad > transaction state (in my case, the TransactionManager.pendingTransition was > set to commitTransaction and did not get cleaned), then when the producer calls > close and tries to abort the existing transaction, it will get > stuck in the transaction abort. It is related to the fix > [https://github.com/apache/kafka/pull/13591]. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15585) DescribeTopic API
[ https://issues.apache.org/jira/browse/KAFKA-15585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu resolved KAFKA-15585. Resolution: Fixed > DescribeTopic API > - > > Key: KAFKA-15585 > URL: https://issues.apache.org/jira/browse/KAFKA-15585 > Project: Kafka > Issue Type: Sub-task >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > > Adding the new DescribeTopic API + the admin client and server-side handling. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16540) Update partitions when the min isr config is updated.
Calvin Liu created KAFKA-16540: -- Summary: Update partitions when the min isr config is updated. Key: KAFKA-16540 URL: https://issues.apache.org/jira/browse/KAFKA-16540 Project: Kafka Issue Type: Sub-task Reporter: Calvin Liu Assignee: Calvin Liu If the min isr config is changed, we need to update the partitions accordingly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
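A sketch of the update flow implied by this ticket, under KIP-966 semantics where ELR membership depends on the min ISR; the types and the "clear the ELR" rule are assumptions, not the controller implementation:

{code:java}
import java.util.List;

// Hypothetical handler: when min.insync.replicas changes, walk the affected
// partitions and recompute any state derived from the old value. Under
// KIP-966 the ELR is one such piece of state (assumed reset rule).
final class MinIsrConfigHandler {
    record PartitionState(String topic, int partition, List<Integer> isr, List<Integer> elr) {}

    interface PartitionUpdater {
        void clearElr(String topic, int partition);
    }

    static void onMinIsrChange(List<PartitionState> affected, PartitionUpdater updater) {
        for (PartitionState p : affected) {
            // The ELR was established under the old min ISR, so it cannot be
            // trusted once the config changes.
            if (!p.elr().isEmpty()) {
                updater.clearElr(p.topic(), p.partition());
            }
        }
    }
}
{code}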
[jira] [Resolved] (KAFKA-15586) Clean shutdown detection, server side
[ https://issues.apache.org/jira/browse/KAFKA-15586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu resolved KAFKA-15586. Resolution: Fixed > Clean shutdown detection, server side > - > > Key: KAFKA-15586 > URL: https://issues.apache.org/jira/browse/KAFKA-15586 > Project: Kafka > Issue Type: Sub-task >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > > Upon broker registration, if the broker had an unclean shutdown, it > should be removed from all ELRs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16479) Add pagination supported describeTopic interface
Calvin Liu created KAFKA-16479: -- Summary: Add pagination supported describeTopic interface Key: KAFKA-16479 URL: https://issues.apache.org/jira/browse/KAFKA-16479 Project: Kafka Issue Type: Sub-task Reporter: Calvin Liu During the DescribeTopicPartitions API implementation, we found it awkward to place the pagination logic within the current admin client describe topic interface. So, in order to change the interface, we may need to have a broader discussion, likely through a KIP. Or, going a step further, discuss a general client-side pagination framework. -- This message was sent by Atlassian Jira (v8.20.10#820010)
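To make the discussion concrete, here is one shape a cursor-based interface could take; purely illustrative, since the ticket explicitly defers the design to a KIP, and every name below is hypothetical:

{code:java}
import java.util.List;
import java.util.Optional;

// Hypothetical cursor-based pagination for topic description. The real admin
// client API would be settled in a KIP; nothing here is the actual interface.
interface PaginatedTopicDescriber {
    record Cursor(String topicName, int partitionIndex) {}
    record Page(List<String> topicDescriptions, Optional<Cursor> nextCursor) {}

    // Returns up to maxPartitions worth of descriptions plus a cursor to
    // resume from; an empty nextCursor means the listing is complete.
    Page describeTopics(List<String> topics, Optional<Cursor> startAfter, int maxPartitions);
}
{code}

A cursor keyed on (topic, partition) lets the server resume deterministically without holding per-client state, which is one reason cursor designs tend to win over page numbers for metadata listings.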
[jira] [Resolved] (KAFKA-15583) High watermark can only advance if ISR size is larger than min ISR
[ https://issues.apache.org/jira/browse/KAFKA-15583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu resolved KAFKA-15583. Resolution: Fixed > High watermark can only advance if ISR size is larger than min ISR > -- > > Key: KAFKA-15583 > URL: https://issues.apache.org/jira/browse/KAFKA-15583 > Project: Kafka > Issue Type: Sub-task >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > > This is the new high watermark advancement requirement. -- This message was sent by Atlassian Jira (v8.20.10#820010)
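Stated as a guard, the requirement reads roughly as below; a sketch assuming KIP-966 semantics, where "larger than min ISR" in the title is interpreted as the ISR being at least min ISR:

{code:java}
// Illustrative advancement guard; names are not broker code. The high
// watermark may only move once the ISR meets min ISR and every ISR member
// has acknowledged the offset (assumed reading of the KIP-966 rule).
final class HighWatermarkGuard {
    static boolean canAdvance(int isrSize, int minIsr, boolean ackedByAllIsr) {
        return isrSize >= minIsr && ackedByAllIsr;
    }
}
{code}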
[jira] [Commented] (KAFKA-16281) Possible IllegalState with KIP-996
[ https://issues.apache.org/jira/browse/KAFKA-16281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819007#comment-17819007 ] Calvin Liu commented on KAFKA-16281: [~alivshits] Jack has clarified that the issue is with KIP-996, not KIP-966. > Possible IllegalState with KIP-996 > -- > > Key: KAFKA-16281 > URL: https://issues.apache.org/jira/browse/KAFKA-16281 > Project: Kafka > Issue Type: Task > Components: kraft >Reporter: Jack Vanlightly >Priority: Major > > I have a TLA+ model of KIP-996 (pre-vote) and I have identified an > IllegalState exception that would occur with the existing > MaybeHandleCommonResponse behavior. > The issue stems from the fact that a leader, let's call it r1, can resign > (either due to a restart or check quorum) and then later initiate a pre-vote > where it ends up in the same epoch as before. When r1 receives a response > from r2 who believes that r1 is still the leader, the logic in > MaybeHandleCommonResponse tries to transition r1 to follower of itself, > causing an IllegalState exception to be raised. > This is an example history: > # r1 is the leader in epoch 1. > # r1 quorum resigns, or restarts and resigns. > # r1 experiences an election timeout and transitions to Prospective. > # r1 sends a pre-vote request to its peers. > # r2 thinks r1 is still the leader, sends a vote response, not granting its > vote and setting leaderId=r1 and epoch=1. > # r1 receives the vote response and executes MaybeHandleCommonResponse which > tries to transition r1 to Follower of itself and an illegal state occurs. > The relevant else if statement in MaybeHandleCommonResponse is here: > [https://github.com/apache/kafka/blob/a26a1d847f1884a519561e7a4fb4cd13e051c824/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1538] > In the TLA+ specification, I fixed this issue by adding a fourth condition to > this statement, that the replica must not be in the Prospective state. > [https://github.com/Vanlightly/kafka-tlaplus/blob/9b2600d1cd5c65930d666b12792d47362b64c015/kraft/kip_996/kraft_kip_996_functions.tla#L336|https://github.com/Vanlightly/kafka-tlaplus/blob/421f170ba4bd8c5eceb36b88b47901ee3d9c3d2a/kraft/kip_996/kraft_kip_996_functions.tla#L336] > > Note that I also had to implement the sending of the BeginQuorumEpoch > request by the leader to prevent a replica from getting stuck in Prospective. If > the replica r2 has an election timeout due to a transient connectivity > issue with the leader, but has also fallen behind slightly, then r2 will > remain stuck as a Prospective because none of its peers, which have > connectivity to the leader, will grant it a pre-vote. To enable r2 to become > a functional member again, the leader must give it a nudge with a > BeginQuorumEpoch request. The alternative (which I have also modeled) is for > a Prospective to transition to Follower when it receives a negative pre-vote > response with a non-null leaderId. This comes with a separate liveness issue > which I can discuss if this "transition to Follower" approach is interesting. > Either way, a stuck Prospective needs a way to transition to follower > eventually, if all other members have a stable leader. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
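In code terms, the proposed fix amounts to one extra guard on the transition; the following is a schematic rendering of the TLA+ change, not the actual KafkaRaftClient logic, and all names are stand-ins:

{code:java}
// Schematic guard for the MaybeHandleCommonResponse branch described above.
// The last clause is the proposed fourth condition: never transition to
// Follower while in the Prospective state.
final class PreVoteTransitionGuard {
    static boolean shouldBecomeFollower(int responseEpoch, int localEpoch,
                                        boolean responseCarriesLeaderId,
                                        boolean alreadyHasLeader,
                                        boolean isProspective) {
        return responseEpoch == localEpoch
                && responseCarriesLeaderId
                && !alreadyHasLeader
                && !isProspective; // added condition from the TLA+ fix
    }
}
{code}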
[jira] [Created] (KAFKA-16250) Consumer group coordinator should perform sanity check on the offset commits.
Calvin Liu created KAFKA-16250: -- Summary: Consumer group coordinator should perform sanity check on the offset commits. Key: KAFKA-16250 URL: https://issues.apache.org/jira/browse/KAFKA-16250 Project: Kafka Issue Type: Improvement Reporter: Calvin Liu The current coordinator does not validate offset commits before persisting them in the record. In a real case, though I am not sure why the consumer generated an offset commit with a consumer offset of -2, the "illegal" consumer offset value caused confusion in the admin CLI when describing the consumer group. The consumer offset field is marked "-". -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15665) Enforce ISR to have all target replicas when complete partition reassignment
[ https://issues.apache.org/jira/browse/KAFKA-15665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu updated KAFKA-15665: --- Summary: Enforce ISR to have all target replicas when complete partition reassignment (was: Enforce min ISR when complete partition reassignment) > Enforce ISR to have all target replicas when complete partition reassignment > > > Key: KAFKA-15665 > URL: https://issues.apache.org/jira/browse/KAFKA-15665 > Project: Kafka > Issue Type: Sub-task >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > > Currently, a partition reassignment can be completed even when the new ISR is under > min ISR. We should fix this behavior. -- This message was sent by Atlassian Jira (v8.20.10#820010)
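The completion rule implied by the new summary can be sketched as a set check; illustrative only, as the real controller logic is more involved:

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch: a reassignment may only complete once every target replica is in
// the ISR. Names are illustrative, not the controller implementation.
final class ReassignmentCompletionCheck {
    static boolean canComplete(List<Integer> targetReplicas, List<Integer> isr) {
        Set<Integer> isrSet = new HashSet<>(isr);
        return isrSet.containsAll(targetReplicas);
    }
}
{code}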
[jira] [Commented] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close
[ https://issues.apache.org/jira/browse/KAFKA-16217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815402#comment-17815402 ] Calvin Liu commented on KAFKA-16217: [~kirktrue] I have a UT which simulates the close issue: [https://github.com/apache/kafka/pull/15336] Hope it helps resolve the bug. > Transactional producer stuck in IllegalStateException during close > -- > > Key: KAFKA-16217 > URL: https://issues.apache.org/jira/browse/KAFKA-16217 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 3.7.0, 3.6.1 >Reporter: Calvin Liu >Assignee: Kirk True >Priority: Major > Labels: transactions > Fix For: 3.6.2, 3.7.1 > > > The producer is stuck during the close. It keeps retrying to abort the > transaction but it never succeeds. > {code:java} > [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | > producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > org.apache.kafka.clients.producer.internals.Sender run - [Producer > clientId=producer-transaction-ben > ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, > transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > Error in kafka producer I/O thread while aborting transaction: > java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` > because the previous call to `commitTransaction` timed out and must be retried > at > org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138) > at > org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) > at java.base/java.lang.Thread.run(Thread.java:1583) > at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) > {code} > With the additional log, I found the root cause. If the producer is in a bad > transaction state (in my case, the TransactionManager.pendingTransition was > set to commitTransaction and did not get cleaned), then when the producer calls > close and tries to abort the existing transaction, it will get > stuck in the transaction abort. It is related to the fix > [https://github.com/apache/kafka/pull/13591]. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close
[ https://issues.apache.org/jira/browse/KAFKA-16217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu updated KAFKA-16217: --- Description: The producer is stuck during the close. It keeps retrying to abort the transaction but it never succeeds. {code:java} [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] org.apache.kafka.clients.producer.internals.Sender run - [Producer clientId=producer-transaction-ben ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] Error in kafka producer I/O thread while aborting transaction: java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` because the previous call to `commitTransaction` timed out and must be retried at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138) at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) at java.base/java.lang.Thread.run(Thread.java:1583) at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) {code} With the additional log, I found the root cause. If the producer is in a bad transaction state (in my case, the TransactionManager.pendingTransition was set to commitTransaction and did not get cleaned), then when the producer calls close and tries to abort the existing transaction, it will get stuck in the transaction abort. It is related to the fix [https://github.com/apache/kafka/pull/13591]. was: The producer is stuck during the close. It keeps retrying to abort the transaction but it never succeeds. {code:java} [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] org.apache.kafka.clients.producer.internals.Sender run - [Producer clientId=producer-transaction-ben ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] Error in kafka producer I/O thread while aborting transaction: java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` because the previous call to `commitTransaction` timed out and must be retried at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138) at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) at java.base/java.lang.Thread.run(Thread.java:1583) at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) {code} With the additional log, I found the root cause. If the producer is in a bad transaction state (in my case, the TransactionManager.pendingTransition was set to commitTransaction and did not get cleaned), before the producer calls close and tries to abort the existing transaction, the producer will get stuck in the transaction abort. It is related to the fix [https://github.com/apache/kafka/pull/13591]. > Transactional producer stuck in IllegalStateException during close > -- > > Key: KAFKA-16217 > URL: https://issues.apache.org/jira/browse/KAFKA-16217 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Calvin Liu >Priority: Major > > The producer is stuck during the close.
It keeps retrying to abort the > transaction but it never succeeds. > {code:java} > [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | > producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > org.apache.kafka.clients.producer.internals.Sender run - [Producer > clientId=producer-transaction-ben > ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, > transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > Error in kafka producer I/O thread while aborting transaction: > java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` > because the previous call to `commitTransaction` timed out and must be retried > at > org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138) > at > org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) > at java.base/java.lang.Thread.run(Thread.java:1583) > at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) > {code} > With the additional log, I found the root cause. If the producer is in a bad > transaction state (in my case, the TransactionManager.pendingTransition was > set to commitTransaction and did not get cleaned), then when the producer calls > close and tries to abort the existing transaction, it will get > stuck in the transaction abort. It is related to the fix > [https://github.com/apache/kafka/pull/13591]. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close
Calvin Liu created KAFKA-16217: -- Summary: Transactional producer stuck in IllegalStateException during close Key: KAFKA-16217 URL: https://issues.apache.org/jira/browse/KAFKA-16217 Project: Kafka Issue Type: Bug Components: clients Reporter: Calvin Liu The producer is stuck during the close. It keeps retrying to abort the transaction but it never succeeds. {code:java} [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] org.apache.kafka.clients.producer.internals.Sender run - [Producer clientId=producer-transaction-ben ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] Error in kafka producer I/O thread while aborting transaction: java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` because the previous call to `commitTransaction` timed out and must be retried at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138) at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) at java.base/java.lang.Thread.run(Thread.java:1583) at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) {code} With the additional log, I found the root cause. If the producer is in a bad transaction state (in my case, the TransactionManager.pendingTransition was set to commitTransaction and did not get cleaned), before the producer calls close and tries to abort the existing transaction, the producer will get stuck in the transaction abort. It is related to the fix [https://github.com/apache/kafka/pull/13591]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15873) Improve the performance of the DescribeTopicPartitions API
Calvin Liu created KAFKA-15873: -- Summary: Improve the performance of the DescribeTopicPartitions API Key: KAFKA-15873 URL: https://issues.apache.org/jira/browse/KAFKA-15873 Project: Kafka Issue Type: Sub-task Reporter: Calvin Liu The current implementation involves sorting, copying, and checking topics that fall outside the response limit. We should think about how to improve the performance of this API, as it will be a primary API for querying partitions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15820) Add a metric to track the number of partitions under min ISR
Calvin Liu created KAFKA-15820: -- Summary: Add a metric to track the number of partitions under min ISR Key: KAFKA-15820 URL: https://issues.apache.org/jira/browse/KAFKA-15820 Project: Kafka Issue Type: Sub-task Reporter: Calvin Liu Assignee: Calvin Liu -- This message was sent by Atlassian Jira (v8.20.10#820010)
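A self-contained sketch of such a gauge; the counting rule mirrors the ticket title, but the types and wiring are assumptions, not the shipped broker metric:

{code:java}
import java.util.List;
import java.util.function.Supplier;

// Sketch of an "under min ISR" partition-count gauge. The real broker would
// register this with its metrics registry; a plain Supplier stands in here.
final class UnderMinIsrGauge {
    record Partition(List<Integer> isr, int minIsr) {}

    static Supplier<Long> create(Supplier<List<Partition>> leaderPartitions) {
        return () -> leaderPartitions.get().stream()
                .filter(p -> p.isr().size() < p.minIsr())
                .count();
    }
}
{code}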
[jira] [Resolved] (KAFKA-15584) ELR leader election
[ https://issues.apache.org/jira/browse/KAFKA-15584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu resolved KAFKA-15584. Resolution: Fixed > ELR leader election > --- > > Key: KAFKA-15584 > URL: https://issues.apache.org/jira/browse/KAFKA-15584 > Project: Kafka > Issue Type: Sub-task >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > > With the ELR, here are the changes related to the leader election: > * ISR is allowed to be empty. > * ELR can be elected when ISR is empty > * When ISR and ELR are both empty, the lastKnownLeader can be uncleanly > elected. -- This message was sent by Atlassian Jira (v8.20.10#820010)
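Putting the three bullets together, leader election under KIP-966 can be read as a preference order; a simplified illustration, not the controller's actual election code:

{code:java}
import java.util.List;
import java.util.OptionalInt;

// Simplified election order implied by the bullets above: prefer an ISR
// member, then an ELR member, and only fall back to the last known leader
// when unclean election is permitted. Illustrative only.
final class ElrElectionSketch {
    static OptionalInt electLeader(List<Integer> isr, List<Integer> elr,
                                   OptionalInt lastKnownLeader, boolean allowUnclean) {
        if (!isr.isEmpty()) return OptionalInt.of(isr.get(0));
        if (!elr.isEmpty()) return OptionalInt.of(elr.get(0));
        if (allowUnclean) return lastKnownLeader;
        return OptionalInt.empty(); // partition stays leaderless
    }
}
{code}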
[jira] [Created] (KAFKA-15762) ClusterConnectionStatesTest.testSingleIP is flaky
Calvin Liu created KAFKA-15762: -- Summary: ClusterConnectionStatesTest.testSingleIP is flaky Key: KAFKA-15762 URL: https://issues.apache.org/jira/browse/KAFKA-15762 Project: Kafka Issue Type: Bug Components: unit tests Reporter: Calvin Liu Build / JDK 11 and Scala 2.13 / testSingleIP() – org.apache.kafka.clients.ClusterConnectionStatesTest {code:java} org.opentest4j.AssertionFailedError: expected: <1> but was: <2> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145) at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:527) at app//org.apache.kafka.clients.ClusterConnectionStatesTest.testSingleIP(ClusterConnectionStatesTest.java:267) at java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15761) ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector is flaky
Calvin Liu created KAFKA-15761: -- Summary: ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector is flaky Key: KAFKA-15761 URL: https://issues.apache.org/jira/browse/KAFKA-15761 Project: Kafka Issue Type: Bug Components: unit tests Reporter: Calvin Liu Build / JDK 21 and Scala 2.13 / testMultiWorkerRestartOnlyConnector – org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest {code:java} java.lang.AssertionError: Failed to stop connector and tasks within 12ms at org.junit.Assert.fail(Assert.java:89) at org.junit.Assert.assertTrue(Assert.java:42) at org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.runningConnectorAndTasksRestart(ConnectorRestartApiIntegrationTest.java:273) at org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector(ConnectorRestartApiIntegrationTest.java:231) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15760) org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated is flaky
Calvin Liu created KAFKA-15760: -- Summary: org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated is flaky Key: KAFKA-15760 URL: https://issues.apache.org/jira/browse/KAFKA-15760 Project: Kafka Issue Type: Bug Components: unit tests Reporter: Calvin Liu Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest {code:java} java.util.concurrent.TimeoutException: testTaskRequestWithOldStartMsGetsUpdated() timed out after 12 milliseconds at org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29) at org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58) at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15759) DescribeClusterRequestTest is flaky
Calvin Liu created KAFKA-15759: -- Summary: DescribeClusterRequestTest is flaky Key: KAFKA-15759 URL: https://issues.apache.org/jira/browse/KAFKA-15759 Project: Kafka Issue Type: Bug Components: unit tests Reporter: Calvin Liu testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft – kafka.server.DescribeClusterRequestTest {code:java} org.opentest4j.AssertionFailedError: expected: but was: at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141) at kafka.server.DescribeClusterRequestTest.$anonfun$testDescribeClusterRequest$4(DescribeClusterRequestTest.scala:99) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158) at kafka.server.DescribeClusterRequestTest.testDescribeClusterRequest(DescribeClusterRequestTest.scala:86) at kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations(DescribeClusterRequestTest.scala:53) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15690) EosIntegrationTest is flaky.
[ https://issues.apache.org/jira/browse/KAFKA-15690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu updated KAFKA-15690: --- Description: EosIntegrationTest shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances[exactly_once_v2, processing threads = false] {code:java} org.junit.runners.model.TestTimedOutException: test timed out after 600 seconds at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleServerDisconnect(NetworkClient.java:) at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:821) at org.apache.kafka.clients.NetworkClient.processTimeoutDisconnection(NetworkClient.java:779) at org.apache.kafka.clients.NetworkClient.handleTimedOutRequests(NetworkClient.java:837) org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-00, topic=multiPartitionInputTopic, partition=1, offset=15, stacktrace=java.lang.RuntimeException: Detected we've been interrupted. at org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892) at org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867) at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49) at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38) at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66) {code} shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2, processing threads = false] {code:java} java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request. at org.apache.kafka.streams.integration.utils.KafkaEmbedded.deleteTopic(KafkaEmbedded.java:204) at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:286) at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:274) at org.apache.kafka.streams.integration.EosIntegrationTest.createTopics(EosIntegrationTest.java:174) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-00, topic=multiPartitionInputTopic, partition=1, offset=15, stacktrace=java.lang.RuntimeException: Detected we've been interrupted. at org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892) at org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867) at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49) at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38) at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66) {code} shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once, processing threads = false] {code:java} org.opentest4j.AssertionFailedError: Condition not met within timeout 6. StreamsTasks did not request commit. 
==> expected: but was: at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) java.lang.IllegalStateException: Replica [Topic=__transaction_state,Partition=2,Replica=1] should be in the OfflineReplica,ReplicaDeletionStarted states before moving to ReplicaDeletionIneligible state. Instead it is in OnlineReplica state at kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442) at kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164) at kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164) at scala.collection.immutable.List.foreach(List.scala:333) {code} They run long and may be related to timeouts. was: Found the following integration tests to be flaky. EosIntegrationTest { * shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances[exactly_once_v2, processing threads = false] * shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2, processing threads = false] * shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once, processing threads = false] } MirrorConnectorsIntegrationBaseTest { * testReplicateSourceDefault() * testOffsetSyncsTopicsOnTarget() } FetchFromFollowerIntegrationTest { * testRackAwareRangeAssignor(String).quorum=zk } They run long and may be related to timeouts. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15700) FetchFromFollowerIntegrationTest is flaky
Calvin Liu created KAFKA-15700: -- Summary: FetchFromFollowerIntegrationTest is flaky Key: KAFKA-15700 URL: https://issues.apache.org/jira/browse/KAFKA-15700 Project: Kafka Issue Type: Bug Reporter: Calvin Liu It may be related to an inappropriate timeout. testRackAwareRangeAssignor(String).quorum=zk {code:java} java.util.concurrent.TimeoutException at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:204) at integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$13(FetchFromFollowerIntegrationTest.scala:229) at integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$13$adapted(FetchFromFollowerIntegrationTest.scala:228) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15699) MirrorConnectorsIntegrationBaseTest is flaky
[ https://issues.apache.org/jira/browse/KAFKA-15699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu updated KAFKA-15699: --- Description: It may be related to an inappropriate test timeout. testReplicateSourceDefault() {code:java} org.opentest4j.AssertionFailedError: `delete.retention.ms` should be different, because it's in exclude filter! ==> expected: not equal but was: <8640> at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testReplicateSourceDefault$9(MirrorConnectorsIntegrationBaseTest.java:826){code} testOffsetSyncsTopicsOnTarget() {code:java} java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, deadlineMs=1698275006778, tries=1, nextAllowedTryMs=1698275715972) timed out at 1698275715878 after 1 attempt(s) at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:1276) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:235) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:149) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103){code} was: May relate to inappropriate timeout. testReplicateSourceDefault() * testOffsetSyncsTopicsOnTarget() } > MirrorConnectorsIntegrationBaseTest is flaky > > > Key: KAFKA-15699 > URL: https://issues.apache.org/jira/browse/KAFKA-15699 > Project: Kafka > Issue Type: Bug >Reporter: Calvin Liu >Priority: Major > > It may be related to an inappropriate test timeout. > testReplicateSourceDefault() > {code:java} > org.opentest4j.AssertionFailedError: `delete.retention.ms` should be > different, because it's in exclude filter! ==> expected: not equal but was: > <8640> > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testReplicateSourceDefault$9(MirrorConnectorsIntegrationBaseTest.java:826){code} > testOffsetSyncsTopicsOnTarget() > {code:java} > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, > deadlineMs=1698275006778, tries=1, nextAllowedTryMs=1698275715972) timed out > at 1698275715878 after 1 attempt(s) at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:1276) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:235) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:149) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15699) MirrorConnectorsIntegrationBaseTest is flaky
[ https://issues.apache.org/jira/browse/KAFKA-15699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu updated KAFKA-15699: --- Description: May relate to inappropriate timeout. testReplicateSourceDefault() * testOffsetSyncsTopicsOnTarget() } > MirrorConnectorsIntegrationBaseTest is flaky > > > Key: KAFKA-15699 > URL: https://issues.apache.org/jira/browse/KAFKA-15699 > Project: Kafka > Issue Type: Bug >Reporter: Calvin Liu >Priority: Major > > May relate to inappropriate timeout. > testReplicateSourceDefault() * testOffsetSyncsTopicsOnTarget() } -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15690) EosIntegrationTest is flaky.
[ https://issues.apache.org/jira/browse/KAFKA-15690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu updated KAFKA-15690: --- Summary: EosIntegrationTest is flaky. (was: Flaky integration tests) > EosIntegrationTest is flaky. > > > Key: KAFKA-15690 > URL: https://issues.apache.org/jira/browse/KAFKA-15690 > Project: Kafka > Issue Type: Bug >Reporter: Calvin Liu >Priority: Major > > Found the following integration tests to be flaky. > EosIntegrationTest { > * > shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances[exactly_once_v2, > processing threads = false] > * shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2, processing > threads = false] > * shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once, processing > threads = false] > } > MirrorConnectorsIntegrationBaseTest { > * testReplicateSourceDefault() > * testOffsetSyncsTopicsOnTarget() > } > FetchFromFollowerIntegrationTest { > * testRackAwareRangeAssignor(String).quorum=zk > } > They run long and may be related to timeouts. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15699) MirrorConnectorsIntegrationBaseTest is flaky
Calvin Liu created KAFKA-15699: -- Summary: MirrorConnectorsIntegrationBaseTest is flaky Key: KAFKA-15699 URL: https://issues.apache.org/jira/browse/KAFKA-15699 Project: Kafka Issue Type: Bug Reporter: Calvin Liu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15690) Flaky integration tests
Calvin Liu created KAFKA-15690: -- Summary: Flaky integration tests Key: KAFKA-15690 URL: https://issues.apache.org/jira/browse/KAFKA-15690 Project: Kafka Issue Type: Bug Reporter: Calvin Liu Found the following integration tests to be flaky. EosIntegrationTest { * shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances[exactly_once_v2, processing threads = false] * shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2, processing threads = false] * shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once, processing threads = false] } MirrorConnectorsIntegrationBaseTest { * testReplicateSourceDefault() * testOffsetSyncsTopicsOnTarget() } FetchFromFollowerIntegrationTest { * testRackAwareRangeAssignor(String).quorum=zk } They run long and may be related to timeouts. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15665) Enforce min ISR when complete partition reassignment
Calvin Liu created KAFKA-15665: -- Summary: Enforce min ISR when complete partition reassignment Key: KAFKA-15665 URL: https://issues.apache.org/jira/browse/KAFKA-15665 Project: Kafka Issue Type: Sub-task Reporter: Calvin Liu Assignee: Calvin Liu Currently, a partition reassignment can be completed even when the new ISR is under min ISR. We should fix this behavior. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15581) Introduce ELR
[ https://issues.apache.org/jira/browse/KAFKA-15581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu resolved KAFKA-15581. Reviewer: David Arthur Resolution: Fixed > Introduce ELR > - > > Key: KAFKA-15581 > URL: https://issues.apache.org/jira/browse/KAFKA-15581 > Project: Kafka > Issue Type: Sub-task >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > > Introduce the PartitionRecord, PartitionChangeRecord and the basic ELR > handling in the controller -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15590) Replica.updateFetchStateOrThrow should also fence updates with stale leader epoch
[ https://issues.apache.org/jira/browse/KAFKA-15590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu updated KAFKA-15590: --- Summary: Replica.updateFetchStateOrThrow should also fence updates with stale leader epoch (was: Replica.updateFetchState should also fence updates with stale leader epoch) > Replica.updateFetchStateOrThrow should also fence updates with stale leader > epoch > - > > Key: KAFKA-15590 > URL: https://issues.apache.org/jira/browse/KAFKA-15590 > Project: Kafka > Issue Type: Bug >Reporter: Calvin Liu >Priority: Major > > This is a follow-up ticket for KAFKA-15221. > There is another type of race in which a fetch request with a stale leader epoch > can update the fetch state. -- This message was sent by Atlassian Jira (v8.20.10#820010)
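The fix direction can be sketched as rejecting any state update carrying a leader epoch older than the one already recorded; a hypothetical shape, not the actual Replica class:

{code:java}
// Sketch of leader-epoch fencing in a fetch-state update; names and the
// throw-on-stale behavior mirror the renamed method but are illustrative.
final class ReplicaFetchState {
    private long fetchOffset = -1L;
    private int lastSeenLeaderEpoch = -1;

    synchronized void updateFetchStateOrThrow(long newFetchOffset, int requestLeaderEpoch) {
        if (requestLeaderEpoch < lastSeenLeaderEpoch) {
            throw new IllegalStateException("Stale leader epoch " + requestLeaderEpoch
                    + " < " + lastSeenLeaderEpoch + "; rejecting fetch state update");
        }
        lastSeenLeaderEpoch = requestLeaderEpoch;
        fetchOffset = newFetchOffset;
    }
}
{code}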
[jira] [Created] (KAFKA-15590) Replica.updateFetchState should also fence updates with stale leader epoch
Calvin Liu created KAFKA-15590: -- Summary: Replica.updateFetchState should also fence updates with stale leader epoch Key: KAFKA-15590 URL: https://issues.apache.org/jira/browse/KAFKA-15590 Project: Kafka Issue Type: Bug Reporter: Calvin Liu This is a follow-up ticket for KAFKA-15221. There is another type of race in which a fetch request with a stale leader epoch can update the fetch state. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15586) Clean shutdown detection, server side
Calvin Liu created KAFKA-15586: -- Summary: Clean shutdown detection, server side Key: KAFKA-15586 URL: https://issues.apache.org/jira/browse/KAFKA-15586 Project: Kafka Issue Type: Sub-task Reporter: Calvin Liu Assignee: Calvin Liu Upon broker registration, if the broker had an unclean shutdown, it should be removed from all ELRs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15585) DescribeTopic API
Calvin Liu created KAFKA-15585: -- Summary: DescribeTopic API Key: KAFKA-15585 URL: https://issues.apache.org/jira/browse/KAFKA-15585 Project: Kafka Issue Type: Sub-task Reporter: Calvin Liu Assignee: Calvin Liu Adding the new DescribeTopic API + the admin client and server-side handling. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15584) ELR leader election
Calvin Liu created KAFKA-15584: -- Summary: ELR leader election Key: KAFKA-15584 URL: https://issues.apache.org/jira/browse/KAFKA-15584 Project: Kafka Issue Type: Sub-task Reporter: Calvin Liu Assignee: Calvin Liu With the ELR, here are the changes related to the leader election: * ISR is allowed to be empty. * ELR can be elected when ISR is empty * When ISR and ELR are both empty, the lastKnownLeader can be uncleanly elected. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15583) High watermark can only advance if ISR size is larger than min ISR
Calvin Liu created KAFKA-15583: -- Summary: High watermark can only advance if ISR size is larger than min ISR Key: KAFKA-15583 URL: https://issues.apache.org/jira/browse/KAFKA-15583 Project: Kafka Issue Type: Sub-task Reporter: Calvin Liu Assignee: Calvin Liu This is the new high watermark advancement requirement. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15582) Clean shutdown detection, broker side
Calvin Liu created KAFKA-15582: -- Summary: Clean shutdown detection, broker side Key: KAFKA-15582 URL: https://issues.apache.org/jira/browse/KAFKA-15582 Project: Kafka Issue Type: Sub-task Reporter: Calvin Liu Assignee: Calvin Liu The clean shutdown file can now include the broker epoch before shutdown. During the broker start process, the broker should extract the broker epoch from the clean shutdown file. If successful, it sends the broker epoch through the broker registration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
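A sketch of the broker-side flow: persist the broker epoch on clean shutdown, read it back on startup, and attach it to registration. The file format and helper names here are assumptions, not the actual broker implementation:

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.OptionalLong;

// Sketch of a clean-shutdown file carrying the last broker epoch.
final class CleanShutdownFile {
    static void write(Path file, long brokerEpoch) throws IOException {
        Files.writeString(file, Long.toString(brokerEpoch));
    }

    // An empty result means the file is absent, i.e. the previous shutdown
    // was not clean; the registration request then omits the epoch.
    static OptionalLong readAndDelete(Path file) throws IOException {
        if (!Files.exists(file)) return OptionalLong.empty();
        long epoch = Long.parseLong(Files.readString(file).trim());
        Files.delete(file);
        return OptionalLong.of(epoch);
    }
}
{code}

Deleting the file after reading it makes the signal one-shot, so a crash after startup cannot be mistaken for a clean shutdown on the next boot.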
[jira] [Created] (KAFKA-15581) Introduce ELR
Calvin Liu created KAFKA-15581: -- Summary: Introduce ELR Key: KAFKA-15581 URL: https://issues.apache.org/jira/browse/KAFKA-15581 Project: Kafka Issue Type: Sub-task Reporter: Calvin Liu Assignee: Calvin Liu Introduce the PartitionRecord, PartitionChangeRecord and the basic ELR handling in the controller -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15332) Eligible Leader Replicas
[ https://issues.apache.org/jira/browse/KAFKA-15332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu updated KAFKA-15332: --- Description: A root ticket for KIP-966. The KIP has been accepted: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-966%3A+Eligible+Leader+Replicas] The delivery is divided into 2 parts, ELR and unclean recovery. was: A root ticket for KIP-966. The KIP has been accepted: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-966%3A+Eligible+Leader+Replicas] The delivery is divided to 2 parts, ELR and unclean recovery. > Eligible Leader Replicas > > > Key: KAFKA-15332 > URL: https://issues.apache.org/jira/browse/KAFKA-15332 > Project: Kafka > Issue Type: New Feature >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > > A root ticket for KIP-966. > The KIP has been accepted: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-966%3A+Eligible+Leader+Replicas] > The delivery is divided into 2 parts, ELR and unclean recovery. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15332) Eligible Leader Replicas
[ https://issues.apache.org/jira/browse/KAFKA-15332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu updated KAFKA-15332: --- Description: A root ticket for KIP-966. The KIP has been accepted: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-966%3A+Eligible+Leader+Replicas] The delivery is divided to 2 parts, ELR and unclean recovery. was: A root ticket for KIP-966. The KIP has been accepted: https://cwiki.apache.org/confluence/display/KAFKA/KIP-966%3A+Eligible+Leader+Replicas > Eligible Leader Replicas > > > Key: KAFKA-15332 > URL: https://issues.apache.org/jira/browse/KAFKA-15332 > Project: Kafka > Issue Type: New Feature >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > > A root ticket for KIP-966. > The KIP has been accepted: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-966%3A+Eligible+Leader+Replicas] > The delivery is divided to 2 parts, ELR and unclean recovery. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15580) KIP-966: Unclean Recovery
Calvin Liu created KAFKA-15580: -- Summary: KIP-966: Unclean Recovery Key: KAFKA-15580 URL: https://issues.apache.org/jira/browse/KAFKA-15580 Project: Kafka Issue Type: New Feature Reporter: Calvin Liu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15579) KIP-966: Eligible Leader Replicas
Calvin Liu created KAFKA-15579: -- Summary: KIP-966: Eligible Leader Replicas Key: KAFKA-15579 URL: https://issues.apache.org/jira/browse/KAFKA-15579 Project: Kafka Issue Type: New Feature Reporter: Calvin Liu Assignee: Calvin Liu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15332) Eligible Leader Replicas
[ https://issues.apache.org/jira/browse/KAFKA-15332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu updated KAFKA-15332: --- Description: A root ticket for KIP-966. The KIP has been accepted: https://cwiki.apache.org/confluence/display/KAFKA/KIP-966%3A+Eligible+Leader+Replicas was: A root ticket for KIP-966. > Eligible Leader Replicas > > > Key: KAFKA-15332 > URL: https://issues.apache.org/jira/browse/KAFKA-15332 > Project: Kafka > Issue Type: New Feature >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > > A root ticket for KIP-966. > The KIP has been accepted: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-966%3A+Eligible+Leader+Replicas -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15221) Potential race condition between requests from rebooted followers
[ https://issues.apache.org/jira/browse/KAFKA-15221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762134#comment-17762134 ] Calvin Liu commented on KAFKA-15221: [~satish.duggana] The PR is pending review; it targets 3.6.0 and the 3.5.* versions, depending on when we can finish the code review. cc [~dajac], [~david.mao] > Potential race condition between requests from rebooted followers > - > > Key: KAFKA-15221 > URL: https://issues.apache.org/jira/browse/KAFKA-15221 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.5.0 >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Blocker > Fix For: 3.6.0, 3.5.2 > > > When the leader processes a fetch request, it does not acquire locks when > updating the replica fetch state, so there can be a race between the fetch > requests from a rebooted follower: > T0: broker 1 sends a fetch to broker 0 (the leader). At this moment, broker 1 is > not in the ISR. > T1: broker 1 crashes. > T2: broker 1 comes back online and receives a new broker epoch. It also sends a > new fetch request. > T3: broker 0 receives the old fetch request and decides to expand the ISR. > T4: right before broker 0 starts to fill in the AlterPartition request, the new > fetch request comes in and overwrites the fetch state. Broker 0 then uses the > new broker epoch in the AlterPartition request. > In this way, the AlterPartition request can get around KIP-903 and wrongly > update the ISR. -- This message was sent by Atlassian Jira (v8.20.10#820010)
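To make the T0-T4 interleaving concrete, here is a minimal Java sketch of the guard involved. All names here (IsrExpansionSketch, onFetch, maybeExpandIsr) are hypothetical illustrations, not the actual broker internals; the point is that the leader should validate and send one broker-epoch snapshot, instead of re-reading state that a concurrent fetch may have overwritten in the meantime.
{code:java}
// Illustrative sketch only: hypothetical names, not the real broker code.
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
import java.util.function.LongPredicate;

final class IsrExpansionSketch {
    // Latest broker epoch seen in a fetch from one follower.
    private final AtomicLong fetchedBrokerEpoch = new AtomicLong(-1L);

    void onFetch(long requestBrokerEpoch) {
        // Only move forward: a delayed fetch from the pre-reboot follower
        // must not clobber the epoch written by the rebooted follower.
        fetchedBrokerEpoch.updateAndGet(cur -> Math.max(cur, requestBrokerEpoch));
    }

    // Capture ONE snapshot and use it for both the validation and the
    // outgoing request. The racy shape re-reads fetchedBrokerEpoch while
    // filling in the AlterPartition request, so a fetch arriving between the
    // two reads (the T4 step) swaps in the new epoch and the request slips
    // past the KIP-903 stale-epoch check.
    void maybeExpandIsr(LongPredicate isCurrentRegistration, LongConsumer sendAlterPartition) {
        long snapshot = fetchedBrokerEpoch.get();
        if (isCurrentRegistration.test(snapshot)) {
            sendAlterPartition.accept(snapshot);
        }
    }
}
{code}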
[jira] [Commented] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request
[ https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17759179#comment-17759179 ] Calvin Liu commented on KAFKA-15353: [~mimaison] If I understand correctly, in ZK mode, the leader can send an AlterPartition request to empty the ISR, but it will continue to serve as the leader. If the leader stays alive, the previous ISR members, except the leader itself, will be added back. I guess the partition will stay in this weird "leader not in ISR" state until the leadership is changed. If the leader fails before any followers can be added to the ISR, an unclean leader election will be required. > Empty ISR returned from controller after AlterPartition request > --- > > Key: KAFKA-15353 > URL: https://issues.apache.org/jira/browse/KAFKA-15353 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.5.0 >Reporter: Luke Chen >Assignee: Calvin Liu >Priority: Blocker > Fix For: 3.6.0, 3.5.2 > > > In [KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR] > (more specifically, this [PR|https://github.com/apache/kafka/pull/13408]), we > bumped the AlterPartitionRequest version to 3 to use the `NewIsrWithEpochs` field > instead of the `NewIsr` one. When building the request for an older version, > we manually convert/downgrade the request for backward compatibility > [here|https://github.com/apache/kafka/blob/6bd17419b76f8cf8d7e4a11c071494dfaa72cd50/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96]: > we extract the ISR info from `NewIsrWithEpochs`, fill in the `NewIsr` > field, and then clear the `NewIsrWithEpochs` field. > > The problem is that when the AlterPartitionRequest is sent out for the first time > and hits some transient error (e.g. NOT_CONTROLLER), we'll retry. On the > retry, we build the AlterPartitionRequest again, but this time the > request data is the data that was already converted above. At this point, when we > try to extract the ISR from `NewIsrWithEpochs`, we get an empty list. So we > send out an AlterPartition request with an empty ISR, impacting Kafka > availability. > > From the log, I can see this: > {code:java} > [2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated > to (under-min-isr) and version updated to 9 (kafka.cluster.Partition) > ... > [2023-08-16 03:57:55,157] ERROR [ReplicaManager broker=3] Error processing > append operation on partition test_topic-1 > (kafka.server.ReplicaManager)org.apache.kafka.common.errors.NotEnoughReplicasException: > The size of the current ISR Set() is insufficient to satisfy the min.isr > requirement of 2 for partition test_topic-1 {code} > > h4. *Impact:* > This will happen when users try to upgrade from versions < 3.5.0 to 3.5.0 > or later. During the rolling upgrade, some nodes will be on v3.5.0 and some > will not. A node on v3.5.0 will try to build an old version of the > AlterPartitionRequest, and if it happens to hit a transient error while > sending it, the ISR will be empty and no producers > will be able to write data to the partitions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
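To make the retry bug concrete, here is a minimal Java sketch of the non-idempotent downgrade described above. The PartitionData class and both build methods are simplified, hypothetical stand-ins for the generated AlterPartitionRequestData types, not the actual client code; the point is only that converting in place destroys the data a retry needs to rebuild the request.
{code:java}
// Illustrative sketch only: hypothetical names, not the real client code.
import java.util.ArrayList;
import java.util.List;

final class AlterPartitionDowngradeSketch {

    static final class PartitionData {
        List<Integer> newIsr = new ArrayList<>();          // pre-v3 field
        List<long[]> newIsrWithEpochs = new ArrayList<>(); // v3 field: {brokerId, brokerEpoch}
    }

    // Buggy shape: converts in place. The first build moves the ISR into
    // newIsr and clears newIsrWithEpochs; a retry re-runs this on the SAME
    // object, extracts from the now-empty list, and sends an empty ISR.
    static PartitionData buildForOldVersionBuggy(PartitionData data) {
        List<Integer> isr = new ArrayList<>();
        for (long[] replica : data.newIsrWithEpochs) {
            isr.add((int) replica[0]);
        }
        data.newIsr = isr;
        data.newIsrWithEpochs.clear(); // destroys the source of truth
        return data;
    }

    // Safe shape: convert into a fresh object and leave the caller's data
    // untouched, so rebuilding the request on a retry is idempotent.
    static PartitionData buildForOldVersionSafe(PartitionData data) {
        PartitionData copy = new PartitionData();
        for (long[] replica : data.newIsrWithEpochs) {
            copy.newIsr.add((int) replica[0]);
        }
        return copy;
    }
}
{code}
The direction the ticket points at is the second shape: keep the stored request data as the untouched source of truth and derive the downgraded request from it on every build.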
[jira] [Comment Edited] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request
[ https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17758188#comment-17758188 ] Calvin Liu edited comment on KAFKA-15353 at 8/25/23 10:50 PM: -- [~mimaison] From my understanding, in KRaft mode # -It only happens in KRaft mode.- # The issue can happen during the roll where the broker has to downgrade the AlterPartition requests. # If the leader needs to resend the AlterPartition request, the new request will have an empty ISR. # The controller does not update the ISR field because it avoids persisting an empty ISR. However, the partition will become leaderless and offline. The problem can be resolved if any ISR broker gets fenced/rebooted. was (Author: JIRAUSER298384): [~mimaison] From my understanding # -It only happens in KRaft mode.- # The issue can happen during the roll where the broker has to downgrade the AlterPartition requests. # If the leader needs to resend the AlterPartition request, the new request will have an empty ISR. # The controller does not update the ISR field because it avoids persisting an empty ISR. However, the partition will become leaderless and offline. The problem can be resolved if any ISR broker gets fenced/rebooted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request
[ https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17758227#comment-17758227 ] Calvin Liu commented on KAFKA-15353: You are right; since IBP 2.7-IV2, the AlterPartition request is used in ZK mode. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request
[ https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17758188#comment-17758188 ] Calvin Liu edited comment on KAFKA-15353 at 8/23/23 8:12 PM: - [~mimaison] From my understanding # -It only happens in KRaft mode.- # The issue can happen during the roll where the broker has to downgrade the AlterPartition requests. # If the leader needs to resend the AlterPartition request, the new request will have an empty ISR. # The controller does not update the ISR field because it avoids persisting an empty ISR. However, the partition will become leaderless and offline. The problem can be resolved if any ISR broker gets fenced/rebooted. was (Author: JIRAUSER298384): [~mimaison] From my understanding # It only happens in KRaft mode. # The issue can happen during the roll where the broker has to downgrade the AlterPartition requests. # If the leader needs to resend the AlterPartition request, the new request will have an empty ISR. # The controller does not update the ISR field because it avoids persisting an empty ISR. However, the partition will become leaderless and offline. The problem can be resolved if any ISR broker gets fenced/rebooted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request
[ https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17758188#comment-17758188 ] Calvin Liu commented on KAFKA-15353: [~mimaison] From my understanding # It only happens in KRaft mode. # The issue can happen during the roll where the broker has to downgrade the AlterPartition requests. # If the leader needs to resend the AlterPartition request, the new request will have an empty ISR. # The controller does not update the ISR field because it avoids persisting an empty ISR. However, the partition will become leaderless and offline. The problem can be resolved if any ISR broker gets fenced/rebooted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request
[ https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755648#comment-17755648 ] Calvin Liu commented on KAFKA-15353: Opened a PR https://github.com/apache/kafka/pull/14236 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request
[ https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu reassigned KAFKA-15353: -- Assignee: Calvin Liu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request
[ https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755614#comment-17755614 ] Calvin Liu commented on KAFKA-15353: Hey [~showuon], thanks for pointing it out! Will work on the fix. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15332) Eligible Leader Replicas
Calvin Liu created KAFKA-15332: -- Summary: Eligible Leader Replicas Key: KAFKA-15332 URL: https://issues.apache.org/jira/browse/KAFKA-15332 Project: Kafka Issue Type: New Feature Reporter: Calvin Liu Assignee: Calvin Liu A root ticket for KIP-966. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15221) Potential race condition between requests from rebooted followers
Calvin Liu created KAFKA-15221: -- Summary: Potential race condition between requests from rebooted followers Key: KAFKA-15221 URL: https://issues.apache.org/jira/browse/KAFKA-15221 Project: Kafka Issue Type: Bug Affects Versions: 3.5.0 Reporter: Calvin Liu Assignee: Calvin Liu Fix For: 3.6.0, 3.5.1 When the leader processes a fetch request, it does not acquire locks when updating the replica fetch state, so there can be a race between the fetch requests from a rebooted follower: T0: broker 1 sends a fetch to broker 0 (the leader). At this moment, broker 1 is not in the ISR. T1: broker 1 crashes. T2: broker 1 comes back online and receives a new broker epoch. It also sends a new fetch request. T3: broker 0 receives the old fetch request and decides to expand the ISR. T4: right before broker 0 starts to fill in the AlterPartition request, the new fetch request comes in and overwrites the fetch state. Broker 0 then uses the new broker epoch in the AlterPartition request. In this way, the AlterPartition request can get around KIP-903 and wrongly update the ISR. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
[ https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu resolved KAFKA-14139. Resolution: Fixed > Replaced disk can lead to loss of committed data even with non-empty ISR > > > Key: KAFKA-14139 > URL: https://issues.apache.org/jira/browse/KAFKA-14139 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Calvin Liu >Priority: Major > Fix For: 3.5.0 > > > We have been thinking about disk failure cases recently. Suppose that a disk > has failed and the user needs to restart the disk from an empty state. The > concern is whether this can lead to the unnecessary loss of committed data. > For normal topic partitions, removal from the ISR during controlled shutdown > buys us some protection. After the replica is restarted, it must prove its > state to the leader before it can be added back to the ISR. And it cannot > become a leader until it does so. > An obvious exception to this is when the replica is the last member in the > ISR. In this case, the disk failure itself has compromised the committed > data, so some amount of loss must be expected. > We have been considering other scenarios in which the loss of one disk can > lead to data loss even when there are replicas remaining which have all of > the committed entries. One such scenario is this: > Suppose we have a partition with two replicas: A and B. Initially A is the > leader and it is the only member of the ISR. > # Broker B catches up to A, so A attempts to send an AlterPartition request > to the controller to add B into the ISR. > # Before the AlterPartition request is received, replica B has a hard > failure. > # The current controller successfully fences broker B. It takes no action on > this partition since B is already out of the ISR. > # Before the controller receives the AlterPartition request to add B, it > also fails. > # While the new controller is initializing, suppose that replica B finishes > startup, but the disk has been replaced (all of the previous state has been > lost). > # The new controller sees the registration from broker B first. > # Finally, the AlterPartition from A arrives which adds B back into the ISR > even though it has an empty log. > (Credit for coming up with this scenario goes to [~junrao].) > I tested this in KRaft and confirmed that this sequence is possible (even if > perhaps unlikely). There are a few ways we could have potentially detected > the issue. First, perhaps the leader should have bumped the leader epoch on > all partitions when B was fenced. Then the inflight AlterPartition would be > doomed no matter when it arrived. > Alternatively, we could have relied on the broker epoch to distinguish the > dead broker's state from that of the restarted broker. This could be done by > including the broker epoch in both the `Fetch` request and in > `AlterPartition`. > Finally, perhaps even normal Kafka replication should be using a unique > identifier for each disk so that we can reliably detect when it has changed. > For example, something like what was proposed for the metadata quorum here: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
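The broker-epoch alternative discussed above (the direction later specified in KIP-903) can be sketched in a few lines of Java. The names here are hypothetical, not the actual controller code; the idea is simply that an AlterPartition request carries the broker epoch the leader observed for each replica it wants to add, and the controller rejects the addition when that epoch does not match the broker's current registration.
{code:java}
// Illustrative sketch only: hypothetical names, not the real controller code.
import java.util.Map;

final class StaleEpochCheckSketch {
    /** brokerId -> epoch assigned to the broker's latest registration. */
    private final Map<Integer, Long> registeredEpochs;

    StaleEpochCheckSketch(Map<Integer, Long> registeredEpochs) {
        this.registeredEpochs = registeredEpochs;
    }

    // If the epoch the leader observed in the replica's Fetch requests does
    // not match the controller's current registration, the leader validated
    // a dead incarnation of the broker (the last step in the scenario above,
    // where B restarted with a replaced disk), so the ISR expansion must be
    // refused.
    boolean mayJoinIsr(int brokerId, long epochSeenByLeader) {
        Long current = registeredEpochs.get(brokerId);
        return current != null && current == epochSeenByLeader;
    }
}
{code}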
[jira] [Commented] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
[ https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17712165#comment-17712165 ] Calvin Liu commented on KAFKA-14139: It is resolved in https://issues.apache.org/jira/browse/KAFKA-14617 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
[ https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684045#comment-17684045 ] Calvin Liu commented on KAFKA-14139: Hi Alex, just posted the KIP. Let me know if you have any comments. https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
[ https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17683497#comment-17683497 ] Calvin Liu commented on KAFKA-14139: Hi Alex, I am not aware of any such simulation framework. Will take a look at the PR. Thanks! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
[ https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682775#comment-17682775 ] Calvin Liu edited comment on KAFKA-14139 at 1/31/23 10:38 PM: -- Hi [~adupriez], thanks for checking this issue. I have been working on a KIP for this one for a while. It will have protocol changes to include the broker epoch in the AlterPartition and Fetch requests. Will share more details when the KIP is published. Sorry I did not assign the ticket to myself earlier. Can you assign the ticket to me? was (Author: JIRAUSER298384): Hi [~adupriez], thanks for checking this issue. I have been working on a KIP for this one for a while. It will have protocol changes to include the broker epoch in the AlterPartition and Fetch requests. Will share more details when the KIP is published. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
[ https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682775#comment-17682775 ] Calvin Liu edited comment on KAFKA-14139 at 1/31/23 10:13 PM: -- Hi [~adupriez], thanks for checking this issue. I have been working on a KIP for this one for a while. It will have protocol changes to include the broker epoch in the AlterPartition and Fetch requests. Will share more details when the KIP is published. was (Author: JIRAUSER298384): Hi [~adupriez], thanks for checking this issue. I have been working on a KIP for this one for a while. Including the broker epoch in the AlterPartition and Fetch requests is preferable. Will share more details when the KIP is published. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
[ https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682775#comment-17682775 ] Calvin Liu edited comment on KAFKA-14139 at 1/31/23 10:01 PM: -- Hi [~adupriez], thanks for checking this issue. I have been working on a KIP for this one for a while. Including the broker epoch in the AlterPartition and Fetch requests is preferable. Will share more details when the KIP is published. was (Author: JIRAUSER298384): Hi [~adupriez], thanks for checking this issue. I have been working on a KIP for this one for a while. It is almost done, will keep you posted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
[ https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682775#comment-17682775 ] Calvin Liu commented on KAFKA-14139: Hi [~adupriez], thanks for checking this issue. I have been working on a KIP for this one for a while. It is almost done, will keep you posted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14617) Replicas with stale broker epoch should not be allowed to join the ISR
[ https://issues.apache.org/jira/browse/KAFKA-14617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu reassigned KAFKA-14617: -- Assignee: Calvin Liu > Replicas with stale broker epoch should not be allowed to join the ISR > -- > > Key: KAFKA-14617 > URL: https://issues.apache.org/jira/browse/KAFKA-14617 > Project: Kafka > Issue Type: Improvement >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14617) Replicas with stale broker epoch should not be allowed to join the ISR
Calvin Liu created KAFKA-14617: -- Summary: Replicas with stale broker epoch should not be allowed to join the ISR Key: KAFKA-14617 URL: https://issues.apache.org/jira/browse/KAFKA-14617 Project: Kafka Issue Type: Improvement Reporter: Calvin Liu -- This message was sent by Atlassian Jira (v8.20.10#820010)