[jira] [Commented] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment

2023-10-18 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17776989#comment-17776989
 ] 

Luke Chen commented on KAFKA-15620:
---

[~ckamal], thanks for the info.
[~h...@pinterest.com], I've backported it into the 3.6 branch. It'll be included 
in the v3.6.1 release. Thanks.

> Duplicate remote log DELETE_SEGMENT metadata is generated when there are 
> multiple leader epochs in the segment
> --
>
> Key: KAFKA-15620
> URL: https://issues.apache.org/jira/browse/KAFKA-15620
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.6.0
>Reporter: Henry Cai
>Priority: Major
> Fix For: 3.6.1
>
>
> Using the newly released 3.6.0, with the tiered storage feature turned on: 
> {*}remote.log.storage.system.enable{*}=true
> 1. Set up topic tier5 to be remote storage enabled.  Added some data to the 
> topic and the data was copied to remote storage.  After a few days, when the 
> log segment was removed from remote storage due to log retention expiration, 
> we noticed the following warnings in the server log:
> [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: 
> [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, 
> eventTimestampMs=1697005926358, brokerId=1043}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
> [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log 
> segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore)
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> No remote log segment metadata found for 
> :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id=YFNCaWjPQFSKCngQ1QcKpA}
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>  
> 2. After some debugging, realized the problem is that *there are 2 sets of 
> DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this 
> segment.  The DELETE_SEGMENT_FINISHED in the first set removes the segment 
> from the metadata cache, and this causes the above exception when the 
> DELETE_SEGMENT_STARTED from the second set needs to be processed.
>  
> 3. Traced the log to where the log retention kicked in and saw that *there 
> were two delete log segment events generated* at that time:
> ```
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> ```
> 4. Dumped out the content of the original COPY_SEGMENT_STARTED for this 
> segment (which triggers the generation of the later DELETE_SEGMENT metadata):
> [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: 
> [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , startOffset=6387830, endOffset=9578660, brokerId=1043, 
> maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, 
> segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
>  
> You can see there are 2 leader epochs

[jira] [Updated] (KAFKA-15479) Remote log segments should be considered once for retention breach

2023-10-18 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15479:
--
Fix Version/s: 3.6.1

> Remote log segments should be considered once for retention breach
> --
>
> Key: KAFKA-15479
> URL: https://issues.apache.org/jira/browse/KAFKA-15479
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> When a remote log segment contains multiple leader epochs, it gets considered 
> multiple times during retention-breach checks by size/time/start-offset. This 
> affects deletion by remote log retention size, as it deletes fewer segments 
> than expected. This is a follow-up of KAFKA-15352
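For illustration, here is a minimal, hypothetical Java sketch of the intended behaviour (not the actual RemoteLogManager code; the SegmentId/SegmentMetadata types are stand-ins for Kafka's RemoteLogSegmentId/RemoteLogSegmentMetadata): each remote segment is evaluated at most once per retention check, even when it spans several leader epochs.

{code:java}
import java.util.*;

// Stand-in types for the remote log segment metadata.
record SegmentId(UUID id) {}
record SegmentMetadata(SegmentId segmentId, long sizeInBytes, long maxTimestampMs) {}

class RetentionCleaner {
    // Returns the segments breaching time-based retention, ensuring each segment id
    // is evaluated at most once even if it is listed under several leader epochs.
    List<SegmentMetadata> segmentsToDelete(Iterable<SegmentMetadata> segmentsPerEpoch,
                                           long retentionMs, long nowMs) {
        Set<SegmentId> visited = new HashSet<>();
        List<SegmentMetadata> toDelete = new ArrayList<>();
        for (SegmentMetadata segment : segmentsPerEpoch) {
            if (!visited.add(segment.segmentId())) {
                continue; // already evaluated under another epoch; skip the duplicate
            }
            if (nowMs - segment.maxTimestampMs() > retentionMs) {
                toDelete.add(segment);
            }
        }
        return toDelete;
    }
}
{code}

Deduplicating by segment id in this way would also avoid the duplicate DELETE_SEGMENT_STARTED/FINISHED events described in KAFKA-15620.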



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment

2023-10-17 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17776182#comment-17776182
 ] 

Luke Chen commented on KAFKA-15620:
---

[~h...@pinterest.com], thanks for reporting this issue. I'll check it tomorrow.

> Duplicate remote log DELETE_SEGMENT metadata is generated when there are 
> multiple leader epochs in the segment
> --
>
> Key: KAFKA-15620
> URL: https://issues.apache.org/jira/browse/KAFKA-15620
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.6.0
>Reporter: Henry Cai
>Priority: Major
> Fix For: 3.6.1
>
>
> Using the newly released 3.6.0, with the tiered storage feature turned on: 
> {*}remote.log.storage.system.enable{*}=true
> 1. Set up topic tier5 to be remote storage enabled.  Added some data to the 
> topic and the data was copied to remote storage.  After a few days, when the 
> log segment was removed from remote storage due to log retention expiration, 
> we noticed the following warnings in the server log:
> [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: 
> [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, 
> eventTimestampMs=1697005926358, brokerId=1043}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
> [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log 
> segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore)
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> No remote log segment metadata found for 
> :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id=YFNCaWjPQFSKCngQ1QcKpA}
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>  
> 2. After some debugging, realized the problem is that *there are 2 sets of 
> DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this 
> segment.  The DELETE_SEGMENT_FINISHED in the first set removes the segment 
> from the metadata cache, and this causes the above exception when the 
> DELETE_SEGMENT_STARTED from the second set needs to be processed.
>  
> 3. Traced the log to where the log retention kicked in and saw that *there 
> were two delete log segment events generated* at that time:
> ```
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> ```
> 4. Dumped out the content of the original COPY_SEGMENT_STARTED for this 
> segment (which triggers the generation of the later DELETE_SEGMENT metadata):
> [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: 
> [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , startOffset=6387830, endOffset=9578660, brokerId=1043, 
> maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, 
> segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
>  
> You can see there are 2 leader epochs in this segment: 
> *segmentLeaderEpochs=\{5=6387830,

[jira] [Commented] (KAFKA-15619) Deleted topics will come back again

2023-10-16 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775996#comment-17775996
 ] 

Luke Chen commented on KAFKA-15619:
---

So, in this case, I think it works as designed. The test might need to wait 
until the consumer/producer is closed before deleting the topic; otherwise, 
it's possible the topic will be AUTO created. WDYT?
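For reference, a minimal sketch of the ordering suggested above, assuming a running consumer and an already-created topic (illustrative only; the real test lives in the Spark test suite):

{code:java}
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class TopicDeletionOrdering {
    // Close the clients first so that no in-flight fetch/metadata request can trigger
    // auto topic creation (when auto.create.topics.enable=true on the broker),
    // and only then delete the topic.
    static void closeThenDelete(KafkaConsumer<?, ?> consumer, Properties adminProps,
                                String topic) throws Exception {
        consumer.close();                                        // stop fetch sessions first
        try (Admin admin = Admin.create(adminProps)) {
            admin.deleteTopics(List.of(topic)).all().get();      // then delete the topic
        }
    }
}
{code}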

> Deleted topics will come back again
> ---
>
> Key: KAFKA-15619
> URL: https://issues.apache.org/jira/browse/KAFKA-15619
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.5.0, 3.5.1
>Reporter: Deng Ziming
>Priority: Major
>
> Deleted topics will come back again in an Apache Spark structured streaming 
> stress test after upgrading Kafka from 3.4.0 to 3.5.0; the related ticket is 
> https://issues.apache.org/jira/browse/SPARK-45529 . The test randomly 
> starts/stops/adds data/adds partitions/deletes topics/adds topics/checks the 
> result in a loop, and I finally found that a deleted topic will come back 
> again after some time.
> By constantly resetting the head of branch-3.5, using {{gradlew install}} 
> to repackage, and rerunning the stress test, I have basically inferred that 
> this bug comes from [https://github.com/apache/kafka/pull/12590]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15619) Deleted topics will come back again

2023-10-16 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775985#comment-17775985
 ] 

Luke Chen commented on KAFKA-15619:
---

[~dengziming], I'd like to get more info on why the 
[PR|https://github.com/apache/kafka/pull/12590] makes the deleted topics come 
back. Is it because we're trying to close the fetch session, and while it's 
closing, it finds the topic is deleted and so AUTO creates it? Does the broker 
enable "auto.create.topics.enable"?

> Deleted topics will come back again
> ---
>
> Key: KAFKA-15619
> URL: https://issues.apache.org/jira/browse/KAFKA-15619
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.5.0, 3.5.1
>Reporter: Deng Ziming
>Priority: Major
>
> Deleted topics will come back again in a Spark structured streaming test after 
> upgrading Kafka from 3.4.0 to 3.5.0; the related ticket is:  
> https://issues.apache.org/jira/browse/SPARK-45529
>  
> I have basically inferred that this bug comes from 
> https://github.com/apache/kafka/pull/12590



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-10-16 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775725#comment-17775725
 ] 

Luke Chen commented on KAFKA-15609:
---

Wow, another edge case! Nice find!
So you mean, when a segment is rolled, before the data is flushed, the RSM is 
copying the incomplete segment/index into the remote tier, right? Agreed, we 
should force flush before copying the data. I've checked: if we flush the 
offset multiple times, only the 1st time will acquire the lock and do a real 
flush operation. So it won't add CPU overhead. 

> Corrupted index uploaded to remote tier
> ---
>
> Key: KAFKA-15609
> URL: https://issues.apache.org/jira/browse/KAFKA-15609
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Divij Vaidya
>Priority: Minor
>
> While testing Tiered Storage, we have observed corrupt indexes being present 
> in the remote tier. One such situation is covered at 
> https://issues.apache.org/jira/browse/KAFKA-15401. This Jira presents another 
> such possible case of corruption.
> Potential cause of index corruption:
> We want to ensure that the file we are passing to the RSM plugin contains all 
> the data which is present in the MemoryByteBuffer, i.e. we should have flushed 
> the MemoryByteBuffer to the file using force(). In Kafka, when we close a 
> segment, indexes are flushed asynchronously [1]. Hence, it is possible that 
> when we pass the file to the RSM, it doesn't contain the flushed data, and we 
> may end up uploading indexes which haven't been flushed yet. Ideally, the 
> contract should enforce that we force flush the content of the 
> MemoryByteBuffer before we give the file to the RSM. This will ensure that 
> indexes are not corrupted/incomplete.
> [1] 
> [https://github.com/apache/kafka/blob/4150595b0a2e0f45f2827cebc60bcb6f6558745d/core/src/main/scala/kafka/log/UnifiedLog.scala#L1613]
>  
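A minimal sketch of the force flush being proposed, assuming the index is backed by a memory-mapped buffer (illustrative only; this is not the actual UnifiedLog/RSM contract):

{code:java}
import java.io.File;
import java.nio.MappedByteBuffer;

class IndexFlusher {
    // mmap is assumed to be the MappedByteBuffer backing the index file, and indexFile
    // is the on-disk file that will be handed to the RemoteStorageManager for upload.
    static File prepareForUpload(MappedByteBuffer mmap, File indexFile) {
        mmap.force();      // force the mapped buffer's content to disk
        return indexFile;  // the file now contains the flushed data and can be given to the RSM
    }
}
{code}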



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15507) adminClient should not throw retriable exception when closing instance

2023-10-10 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-15507.
---
Fix Version/s: 3.7.0
   Resolution: Fixed

> adminClient should not throw retriable exception when closing instance
> --
>
> Key: KAFKA-15507
> URL: https://issues.apache.org/jira/browse/KAFKA-15507
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 3.5.1
>Reporter: Luke Chen
>Assignee: Gantigmaa Selenge
>Priority: Major
> Fix For: 3.7.0
>
>
> When adminClient is closing the instance, it'll first set 
> `hardShutdownTimeMs` to a positive timeout value, and then wait for 
> existing threads to complete within the timeout. However, during this 
> waiting, when a new caller tries to invoke a new command on adminClient, it'll 
> immediately get an 
> {code:java}
> TimeoutException("The AdminClient thread is not accepting new calls.")
> {code}
> There are some issues with this design:
> 1. Since `TimeoutException` is a retriable exception, the caller will 
> enter a tight loop and keep retrying.
> 2. The error message is confusing. What does "the adminClient is not 
> accepting new calls" mean?
> We should improve this by throwing a non-retriable error (e.g. 
> IllegalStateException) whose error message clearly describes that the 
> adminClient is closing.
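A hypothetical sketch of the proposed behaviour (not the actual KafkaAdminClient internals): a call made after close() has started fails with a non-retriable exception and a clear message, so callers don't retry in a tight loop.

{code:java}
class AdminCallQueue {
    private volatile boolean closing = false;

    void beginShutdown() {
        closing = true;  // set when close() is invoked with a shutdown deadline
    }

    void enqueue(Runnable call) {
        if (closing) {
            // Non-retriable: callers should not loop on this.
            throw new IllegalStateException(
                "The AdminClient is being closed and cannot accept new calls.");
        }
        // ... otherwise hand the call to the admin client's I/O thread
    }
}
{code}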



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14369) Docs - KRAFT controller authentication example

2023-10-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14369:
--
Labels: 4.0-blocker  (was: )

> Docs - KRAFT controller authentication example
> --
>
> Key: KAFKA-14369
> URL: https://issues.apache.org/jira/browse/KAFKA-14369
> Project: Kafka
>  Issue Type: Bug
>  Components: docs
>Affects Versions: 3.3.1
>Reporter: Domenic Bove
>Priority: Minor
>  Labels: 4.0-blocker
>
> The [Kafka Listener 
> docs|https://kafka.apache.org/documentation/#listener_configuration] mention 
> how to handle Kafka protocols (other than PLAINTEXT) on the KRaft controller 
> listener, but it is not a working example, and I found that I was missing this 
> property: 
> {code:java}
> sasl.mechanism.controller.protocol {code}
> when attempting to do SASL_PLAINTEXT on the controller listener. I see that 
> property here: 
> [https://kafka.apache.org/documentation/#brokerconfigs_sasl.mechanism.controller.protocol]
> But nowhere else. 
> I wonder if a complete working example would be better. Here are my working 
> configs for SASL/PLAIN on the controller:
> {code:java}
> process.roles=controller
> listeners=CONTROLLER://:9093 
> node.id=1
> controller.quorum.voters=1@localhost:9093
> controller.listener.names=CONTROLLER
> listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT
> listener.name.controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
>  required username="admin" password="admin-secret" user_admin="admin-secret" 
> user_alice="alice-secret";
> listener.name.controller.sasl.enabled.mechanisms=PLAIN
> listener.name.controller.sasl.mechanism=PLAIN
> sasl.enabled.mechanisms=PLAIN
> sasl.mechanism.controller.protocol=PLAIN{code}
> Or maybe just a callout of that property in the existing docs



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15489) split brain in KRaft cluster

2023-10-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15489:
--
Labels: 4.0-blocker  (was: )

> split brain in KRaft cluster 
> -
>
> Key: KAFKA-15489
> URL: https://issues.apache.org/jira/browse/KAFKA-15489
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.5.1
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>  Labels: 4.0-blocker
>
> I found that in the current KRaft implementation, when a network partition 
> happens between the current controller leader and the other controller nodes, 
> a "split brain" issue will happen: 2 leaders will exist in the 
> controller cluster, and 2 inconsistent sets of metadata will be returned to 
> the clients.
>  
> *Root cause*
> In 
> [KIP-595|https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Vote],
>  we said a voter will begin a new election under three conditions:
> 1. If it fails to receive a FetchResponse from the current leader before 
> expiration of quorum.fetch.timeout.ms
> 2. If it receives an EndQuorumEpoch request from the current leader
> 3. If it fails to receive a majority of votes before expiration of 
> quorum.election.timeout.ms after declaring itself a candidate.
> And that's exactly what the current KRaft implementation does.
>  
> However, when the leader is isolated from the network partition, there's no 
> way for it to resign from the leadership and start a new election. So the 
> leader will always be the leader even though all other nodes are down. And 
> this makes the split brain issue possible.
> When reading further in KIP-595, I found we indeed considered this 
> situation and have a solution for it. In [this 
> section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-LeaderProgressTimeout],
>  it says:
> {quote}In the pull-based model, however, say a new leader has been elected 
> with a new epoch and everyone has learned about it except the old leader 
> (e.g. that leader was not in the voters anymore and hence not receiving the 
> BeginQuorumEpoch as well), then that old leader would not be notified by 
> anyone about the new leader / epoch and become a pure "zombie leader", as 
> there is no regular heartbeats being pushed from leader to the follower. This 
> could lead to stale information being served to the observers and clients 
> inside the cluster.
> {quote}
> {quote}To resolve this issue, we will piggy-back on the 
> "quorum.fetch.timeout.ms" config, such that if the leader did not receive 
> Fetch requests from a majority of the quorum for that amount of time, it 
> would begin a new election and start sending VoteRequest to voter nodes in 
> the cluster to understand the latest quorum. If it couldn't connect to any 
> known voter, the old leader shall keep starting new elections and bump the 
> epoch.
> {quote}
>  
> But we missed this implementation in current KRaft.
>  
> *The flow is like this:*
> 1. 3 controller nodes, A(leader), B(follower), C(follower)
> 2. A network partition happens between [A] and [B, C].
> 3. B and C start a new election since the fetch timeout expired before 
> receiving a fetch response from leader A.
> 4. B (or C) is elected as leader in the new epoch, while A is still the 
> leader in the old epoch.
> 5. Broker D creates a topic "new", and the update goes to leader B.
> 6. Broker E describes topic "new", but gets nothing because it is connecting 
> to the old leader A.
>  
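A hypothetical Java sketch of the missing check described in the KIP-595 quote above (illustrative only, not the actual KafkaRaftClient code): the leader remembers when it last saw a Fetch request from each voter and resigns once quorum.fetch.timeout.ms elapses without a majority fetching.

{code:java}
import java.util.HashMap;
import java.util.Map;

class LeaderFetchTimeoutCheck {
    private final int majority;                  // e.g. 2 for a 3-node controller quorum
    private final long fetchTimeoutMs;           // quorum.fetch.timeout.ms
    private final Map<Integer, Long> lastFetchTimeMs = new HashMap<>();  // voterId -> last Fetch

    LeaderFetchTimeoutCheck(int voterCount, long fetchTimeoutMs) {
        this.majority = voterCount / 2 + 1;
        this.fetchTimeoutMs = fetchTimeoutMs;
    }

    void onFetchFromVoter(int voterId, long nowMs) {
        lastFetchTimeMs.put(voterId, nowMs);
    }

    // Called from the leader's poll loop. Returns true when the leader has not seen
    // Fetch requests from a majority (counting itself) within the timeout, meaning it
    // should resign, bump the epoch, and start a new election.
    boolean shouldResign(long nowMs) {
        long recentlyFetched = lastFetchTimeMs.values().stream()
                .filter(t -> nowMs - t <= fetchTimeoutMs)
                .count();
        return recentlyFetched + 1 < majority;   // +1 counts the leader itself
    }
}
{code}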



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15369) Allow AdminClient to Talk Directly with the KRaft Controller Quorum and add Controller Registration

2023-10-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15369:
--
Fix Version/s: (was: 3.6.0)

> Allow AdminClient to Talk Directly with the KRaft Controller Quorum and add 
> Controller Registration
> ---
>
> Key: KAFKA-15369
> URL: https://issues.apache.org/jira/browse/KAFKA-15369
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15230) ApiVersions data between controllers is not reliable

2023-10-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15230:
--
Labels:   (was: 4.0-blocker)

> ApiVersions data between controllers is not reliable
> 
>
> Key: KAFKA-15230
> URL: https://issues.apache.org/jira/browse/KAFKA-15230
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Arthur
>Assignee: Colin McCabe
>Priority: Critical
> Fix For: 3.7.0
>
>
> While testing ZK migrations, I noticed a case where the controller was not 
> starting the migration due to the missing ApiVersions data from other 
> controllers. This was unexpected because the quorum was running and the 
> followers were replicating the metadata log as expected. After examining a 
> heap dump of the leader, it was in fact the case that the ApiVersions map of 
> NodeApiVersions was empty.
>  
> After further investigation and offline discussion with [~jsancio], we 
> realized that after the initial leader election, the connection from the Raft 
> leader to the followers will become idle and eventually time out and close. 
> This causes NetworkClient to purge the NodeApiVersions data for the closed 
> connections.
>  
> There are two main side effects of this behavior: 
> 1) If migrations are not started within the idle timeout period (10 minutes, 
> by default), then they will not be able to be started. After this timeout 
> period, I was unable to restart the controllers in such a way that the leader 
> had active connections with all followers.
> 2) Dynamically updating features, such as "metadata.version", is not 
> guaranteed to be safe
>  
> There is a partial workaround for the migration issue. If we set "
> connections.max.idle.ms" to -1, the Raft leader will never disconnect from 
> the followers. However, if a follower restarts, the leader will not 
> re-establish a connection.
>  
> The feature update issue has no safe workarounds.
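For reference, the partial workaround above as it would appear in the controller's server.properties (a sketch of the setting only; as noted, it does not help if a follower restarts):

{code:java}
# Partial workaround for KAFKA-15230: keep controller-to-controller connections open
# so the NodeApiVersions data is not purged when connections go idle.
connections.max.idle.ms=-1
{code}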



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15230) ApiVersions data between controllers is not reliable

2023-10-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15230:
--
Labels: 4.0-blocker  (was: )

> ApiVersions data between controllers is not reliable
> 
>
> Key: KAFKA-15230
> URL: https://issues.apache.org/jira/browse/KAFKA-15230
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Arthur
>Assignee: Colin McCabe
>Priority: Critical
>  Labels: 4.0-blocker
> Fix For: 3.7.0
>
>
> While testing ZK migrations, I noticed a case where the controller was not 
> starting the migration due to the missing ApiVersions data from other 
> controllers. This was unexpected because the quorum was running and the 
> followers were replicating the metadata log as expected. After examining a 
> heap dump of the leader, it was in fact the case that the ApiVersions map of 
> NodeApiVersions was empty.
>  
> After further investigation and offline discussion with [~jsancio], we 
> realized that after the initial leader election, the connection from the Raft 
> leader to the followers will become idle and eventually time out and close. 
> This causes NetworkClient to purge the NodeApiVersions data for the closed 
> connections.
>  
> There are two main side effects of this behavior: 
> 1) If migrations are not started within the idle timeout period (10 minutes, 
> by default), then they will not be able to be started. After this timeout 
> period, I was unable to restart the controllers in such a way that the leader 
> had active connections with all followers.
> 2) Dynamically updating features, such as "metadata.version", is not 
> guaranteed to be safe
>  
> There is a partial workaround for the migration issue. If we set "
> connections.max.idle.ms" to -1, the Raft leader will never disconnect from 
> the followers. However, if a follower restarts, the leader will not 
> re-establish a connection.
>  
> The feature update issue has no safe workarounds.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14927) Dynamic configs not validated when using kafka-configs and --add-config-file

2023-10-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14927:
--
Labels: 4.0-blocker  (was: )

> Dynamic configs not validated when using kafka-configs and --add-config-file
> 
>
> Key: KAFKA-14927
> URL: https://issues.apache.org/jira/browse/KAFKA-14927
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 3.3.2
>Reporter: Justin Daines
>Assignee: José Armando García Sancio
>Priority: Minor
>  Labels: 4.0-blocker
>
> {{kafka-configs}} should validate dynamic configurations before 
> applying them. It is possible to send a file with invalid configurations, 
> for example a file containing the following:
> {code:java}
> {
>   "routes": {
>     "crn:///kafka=*": {
>       "management": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied"
>       },
>       "describe": {
>         "allowed": "",
>         "denied": "confluent-audit-log-events-denied"
>       },
>       "authentication": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied-authn"
>       },
>       "authorize": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied-authz"
>       },
>       "interbroker": {
>         "allowed": "",
>         "denied": ""
>       }
>     },
>     "crn:///kafka=*/group=*": {
>       "consume": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       }
>     },
>     "crn:///kafka=*/topic=*": {
>       "produce": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       },
>       "consume": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       }
>     }
>   },
>   "destinations": {
>     "topics": {
>       "confluent-audit-log-events": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied-authn": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied-authz": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events_audit": {
>         "retention_ms": 777600
>       }
>     }
>   },
>   "default_topics": {
>     "allowed": "confluent-audit-log-events_audit",
>     "denied": "confluent-audit-log-events"
>   },
>   "excluded_principals": [
>     "User:schemaregistryUser",
>     "User:ANONYMOUS",
>     "User:appSA",
>     "User:admin",
>     "User:connectAdmin",
>     "User:connectorSubmitter",
>     "User:connectorSA",
>     "User:schemaregistryUser",
>     "User:ksqlDBAdmin",
>     "User:ksqlDBUser",
>     "User:controlCenterAndKsqlDBServer",
>     "User:controlcenterAdmin",
>     "User:restAdmin",
>     "User:appSA",
>     "User:clientListen",
>     "User:superUser"
>   ]
> } {code}
> {code:java}
> kafka-configs --bootstrap-server $KAFKA_BOOTSTRAP --entity-type brokers 
> --entity-default --alter --add-config-file audit-log.json {code}
> Yields the following dynamic configs:
> {code:java}
> Default configs for brokers in the cluster are:
>   "destinations"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"destinations"=null}
>   "confluent-audit-log-events-denied-authn"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events-denied-authn"=null}
>   "routes"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"routes"=null}
>   "User=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"User=null}
>   },=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:},=null}
>   "excluded_principals"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"excluded_principals"=null}
>   "confluent-audit-log-events_audit"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events_audit"=null}
>   "authorize"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"authorize"=null}
>   "default_topics"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"default_topics"=null}
>   "topics"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"topics"=null}
>   ]=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:]=null}
>   "interbroker"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"interbroker"=null}
>   "produce"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"produce"=null}
>   "denied"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"

[jira] [Updated] (KAFKA-15369) Allow AdminClient to Talk Directly with the KRaft Controller Quorum and add Controller Registration

2023-10-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15369:
--
Fix Version/s: 3.6.0

> Allow AdminClient to Talk Directly with the KRaft Controller Quorum and add 
> Controller Registration
> ---
>
> Key: KAFKA-15369
> URL: https://issues.apache.org/jira/browse/KAFKA-15369
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13105) Expose a method in KafkaConfig to write the configs to a logger

2023-10-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13105:
--
Labels: 4.0-blocker  (was: )

> Expose a method in KafkaConfig to write the configs to a logger
> ---
>
> Key: KAFKA-13105
> URL: https://issues.apache.org/jira/browse/KAFKA-13105
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Minor
>  Labels: 4.0-blocker
>
> We should expose a method in KafkaConfig to write the configs to a logger. 
> Currently there is no good way to write them out except creating a new 
> KafkaConfig object with doLog = true, which is unintuitive.
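A hypothetical sketch of what such a helper could look like (illustrative only; KafkaConfig is Scala and the real method name and shape would differ):

{code:java}
import java.util.Map;
import org.slf4j.Logger;

class ConfigLogging {
    // Hypothetical helper: write the effective configuration values to the given logger
    // without constructing a second KafkaConfig object with doLog = true.
    static void logConfigValues(Map<String, ?> configValues, Logger log) {
        configValues.forEach((name, value) -> log.info("\t{} = {}", name, value));
    }
}
{code}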



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13835) Fix two bugs related to dynamic broker configs in KRaft

2023-10-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13835:
--
Labels: 4.0-blocker  (was: )

> Fix two bugs related to dynamic broker configs in KRaft
> ---
>
> Key: KAFKA-13835
> URL: https://issues.apache.org/jira/browse/KAFKA-13835
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Critical
>  Labels: 4.0-blocker
>
> The first bug is that we were calling reloadUpdatedFilesWithoutConfigChange 
> when a topic configuration was changed, but not when a broker configuration 
> was changed. This was backwards -- this function must be called only for 
> BROKER configs, and never for TOPIC configs. (Also, this function is called 
> only for specific broker configs, not for cluster configs.)
> The second bug is that there were several configurations such as 
> `max.connections` which were related to broker listeners, but which did not 
> involve creating or removing new listeners. We can and should support these 
> configurations in KRaft, since no additional work is needed to support them. 
> Only adding or removing listeners is unsupported. This PR adds support for 
> these by fixing the configuration change validation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14941) Document which configuration options are applicable only to processes with broker role or controller role

2023-10-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14941:
--
Labels: 4.0-blocker  (was: )

> Document which configuration options are applicable only to processes with 
> broker role or controller role
> -
>
> Key: KAFKA-14941
> URL: https://issues.apache.org/jira/browse/KAFKA-14941
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jakub Scholz
>Priority: Major
>  Labels: 4.0-blocker
>
> When running in KRaft mode, some of the configuration options are applicable 
> only to nodes with the broker process role and some are applicable only to 
> nodes with the controller process role. It would be great if this 
> information was part of the documentation (e.g. in the [Broker 
> Configs|https://kafka.apache.org/documentation/#brokerconfigs] table on the 
> website), and also part of the config classes, so that it can be 
> used in situations where the configuration is handled dynamically, for 
> example to filter the options applicable to different nodes. This would allow 
> having configuration files with only the actually used configuration options 
> and, for example, help to reduce unnecessary restarts when rolling out new 
> configurations, etc.
> For some options it seems clear, and the Kafka node would refuse to start if 
> they are set - for example the configurations of the non-controller listeners 
> in controller-only nodes. For others, it seems a bit less clear (Does the 
> {{compression.type}} option apply to controller-only nodes? Or the 
> configurations for the offset topic? etc.).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14057) Support dynamic reconfiguration in KRaft remote controllers

2023-10-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14057:
--
Labels: 4.0-blocker  (was: )

> Support dynamic reconfiguration in KRaft remote controllers
> ---
>
> Key: KAFKA-14057
> URL: https://issues.apache.org/jira/browse/KAFKA-14057
> Project: Kafka
>  Issue Type: Task
>Reporter: Ron Dagostino
>Assignee: Colin McCabe
>Priority: Major
>  Labels: 4.0-blocker
>
> We currently do not support dynamic reconfiguration of KRaft remote 
> controllers.  We only wire up brokers and react to metadata log changes 
> there.  We do no such wiring or reacting in a node where 
> process.roles=controller.  Related to 
> https://issues.apache.org/jira/browse/KAFKA-14051.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14349) Support dynamically resizing the KRaft controller's thread pools

2023-10-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14349:
--
Labels: 4.0-blocker kip-500  (was: kip-500)

> Support dynamically resizing the KRaft controller's thread pools
> 
>
> Key: KAFKA-14349
> URL: https://issues.apache.org/jira/browse/KAFKA-14349
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>  Labels: 4.0-blocker, kip-500
>
> Support dynamically resizing the KRaft controller's request handler and 
> network handler thread pools. See {{DynamicBrokerConfig.scala}}.
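A generic Java sketch of the kind of dynamic resizing implied here (illustrative only; the real change would be wired through DynamicBrokerConfig and the controller's handler pools):

{code:java}
import java.util.concurrent.ThreadPoolExecutor;

class HandlerPoolResizer {
    // Resize a running pool without restarting it. When growing, raise the maximum
    // first; when shrinking, lower the core size first, so the bounds never cross.
    static void resize(ThreadPoolExecutor pool, int newSize) {
        if (newSize > pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(newSize);
            pool.setCorePoolSize(newSize);
        } else {
            pool.setCorePoolSize(newSize);
            pool.setMaximumPoolSize(newSize);
        }
    }
}
{code}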



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15513) KRaft cluster fails with SCRAM authentication enabled for control-plane

2023-10-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15513:
--
Labels: 4.0-blocker  (was: )

> KRaft cluster fails with SCRAM authentication enabled for control-plane
> ---
>
> Key: KAFKA-15513
> URL: https://issues.apache.org/jira/browse/KAFKA-15513
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.6.0, 3.5.1
>Reporter: migruiz4
>Priority: Major
>  Labels: 4.0-blocker
>
> We have observed a scenario where a KRaft cluster fails to bootstrap when 
> using SCRAM authentication for controller-to-controller communications.
> The steps to reproduce are simple:
>  * Deploy (at least) 2 Kafka servers using latest version 3.5.1.
>  * Configure a KRaft cluster, where the controller listener uses 
> SASL_PLAINTEXT + SCRAM-SHA-256 or SCRAM-SHA-512. In my case, I'm using the 
> recommended in-line jaas config 
> '{{{}listener.name..scram-sha-512.sasl.jaas.config{}}}'
>  * Run 'kafka-storage.sh' in both nodes using option '--add-scram' to create 
> the SCRAM user.
> When initialized, Controllers will fail to connect to each other with an 
> authentication error:
>  
> {code:java}
> [2023-08-01 11:12:45,295] ERROR [kafka-1-raft-outbound-request-thread]: 
> Failed to send the following request due to authentication error: 
> ClientRequest(expectResponse=true, 
> callback=kafka.raft.KafkaNetworkChannel$$Lambda$687/0x7f27d443fc60@2aba6075,
>  destination=0, correlationId=129, clientId=raft-client-1, 
> createdTimeMs=1690888364960, 
> requestBuilder=VoteRequestData(clusterId='abcdefghijklmnopqrstug', 
> topics=[TopicData(topicName='__cluster_metadata', 
> partitions=[PartitionData(partitionIndex=0, candidateEpoch=4, candidateId=1, 
> lastOffsetEpoch=0, lastOffset=0)])])) (kafka.raft.RaftSendThread) {code}
> Some additional details about the scenario that we tested out:
>  *  Controller listener does work when configured with SASL+PLAIN
>  * The issue only affects the Controller listener, SCRAM users created using 
> the same method work for data-plane listeners and inter-broker listeners.
>  
> Below you can find the exact configuration and command used to deploy:
>  * server.properties
> {code:java}
> listeners=INTERNAL://:9092,CLIENT://:9091,CONTROLLER://:9093
> advertised.listeners=INTERNAL://kafka-0:9092,CLIENT://:9091
> listener.security.protocol.map=INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/bitnami/kafka/data
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> offsets.topic.replication.factor=1
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
> log.retention.hours=168
> log.retention.check.interval.ms=30
> controller.listener.names=CONTROLLER
> controller.quorum.voters=0@kafka-0:9093,1@kafka-1:9093
> inter.broker.listener.name=INTERNAL
> node.id=0
> process.roles=controller,broker
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
> sasl.mechanism.controller.protocol=SCRAM-SHA-512
> listener.name.controller.sasl.enabled.mechanisms=SCRAM-SHA-512
> listener.name.controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule
>  required username="controller_user" password="controller_password";{code}
>  * kafka-storage.sh command
> {code:java}
> kafka-storage.sh format --config /path/to/server.properties 
> --ignore-formatted --cluster-id abcdefghijklmnopqrstuv --add-scram 
> SCRAM-SHA-512=[name=controller_user,password=controller_password] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14094) KIP-853: KRaft controller membership changes

2023-10-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14094:
--
Labels: 4.0-blocker  (was: )

> KIP-853: KRaft controller membership changes
> -
>
> Key: KAFKA-14094
> URL: https://issues.apache.org/jira/browse/KAFKA-14094
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: 4.0-blocker
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14127) KIP-858: Handle JBOD broker disk failure in KRaft

2023-10-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14127:
--
Labels: 4.0-blocker kip-500 kraft  (was: kip-500 kraft)

> KIP-858: Handle JBOD broker disk failure in KRaft
> -
>
> Key: KAFKA-14127
> URL: https://issues.apache.org/jira/browse/KAFKA-14127
> Project: Kafka
>  Issue Type: Improvement
>  Components: jbod, kraft
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
>  Labels: 4.0-blocker, kip-500, kraft
> Fix For: 3.7.0
>
>
> Supporting configurations with multiple storage directories in KRaft mode



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14113) KIP-856: KRaft Disk Failure Recovery

2023-10-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14113:
--
Labels: 4.0-blocker  (was: )

> KIP-856: KRaft Disk Failure Recovery
> 
>
> Key: KAFKA-14113
> URL: https://issues.apache.org/jira/browse/KAFKA-14113
> Project: Kafka
>  Issue Type: Improvement
>Reporter: José Armando García Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: 4.0-blocker
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14941) Document which configuration options are applicable only to processes with broker role or controller role

2023-10-04 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17772048#comment-17772048
 ] 

Luke Chen commented on KAFKA-14941:
---

Great! I'm working on the list, will let you know when completed. 

REF: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263427911

> Document which configuration options are applicable only to processes with 
> broker role or controller role
> -
>
> Key: KAFKA-14941
> URL: https://issues.apache.org/jira/browse/KAFKA-14941
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jakub Scholz
>Priority: Major
>
> When running in KRaft mode, some of the configuration options are applicable 
> only to nodes with the broker process role and some are applicable only to 
> nodes with the controller process role. It would be great if this 
> information was part of the documentation (e.g. in the [Broker 
> Configs|https://kafka.apache.org/documentation/#brokerconfigs] table on the 
> website), and also part of the config classes, so that it can be 
> used in situations where the configuration is handled dynamically, for 
> example to filter the options applicable to different nodes. This would allow 
> having configuration files with only the actually used configuration options 
> and, for example, help to reduce unnecessary restarts when rolling out new 
> configurations, etc.
> For some options it seems clear, and the Kafka node would refuse to start if 
> they are set - for example the configurations of the non-controller listeners 
> in controller-only nodes. For others, it seems a bit less clear (Does the 
> {{compression.type}} option apply to controller-only nodes? Or the 
> configurations for the offset topic? etc.).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15537) Unsafe metadata.version downgrade is not supported

2023-10-04 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17771842#comment-17771842
 ] 

Luke Chen commented on KAFKA-15537:
---

[~fvaleri], thanks for raising this issue. Since you've already dived into it, 
are you interested in submitting a PR for it?

> Unsafe metadata.version downgrade is not supported
> --
>
> Key: KAFKA-15537
> URL: https://issues.apache.org/jira/browse/KAFKA-15537
> Project: Kafka
>  Issue Type: Bug
>Reporter: Federico Valeri
>Priority: Major
>
> In KIP-778 we introduced the "unsafe" downgrade functionality for the case 
> where a metadata version between current and target has changes, as defined in 
> MetadataVersion. This is a lossy downgrade where each node rebuilds its 
> metadata snapshots, omitting the metadata changes. Currently, this is not 
> supported, as shown by the following command.
> {code}
> bin/kafka-features.sh --bootstrap-server :9092 downgrade --metadata 3.4 
> --unsafe
> Could not downgrade metadata.version to 8. Invalid metadata.version 8.
> Unsafe metadata downgrade is not supported in this version.
> 1 out of 1 operation(s) failed.
> {code}
> Also note that documentation says it's possible: "Note that the cluster 
> metadata version cannot be downgraded to a pre-production 3.0.x, 3.1.x, or 
> 3.2.x version once it has been upgraded. However, it is possible to downgrade 
> to production versions such as 3.3-IV0, 3.3-IV1, etc."
> The error message could be improved: "Unsafe metadata downgrade is not 
> supported in this Kafka version."



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15536) dynamically resize remoteIndexCache

2023-10-03 Thread Luke Chen (Jira)
Luke Chen created KAFKA-15536:
-

 Summary: dynamically resize remoteIndexCache
 Key: KAFKA-15536
 URL: https://issues.apache.org/jira/browse/KAFKA-15536
 Project: Kafka
  Issue Type: Improvement
  Components: Tiered-Storage
Affects Versions: 3.6.0
Reporter: Luke Chen
Assignee: hudeqi


context:
https://github.com/apache/kafka/pull/14243#discussion_r1320630057



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14941) Document which configuration options are applicable only to processes with broker role or controller role

2023-10-03 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17771404#comment-17771404
 ] 

Luke Chen commented on KAFKA-14941:
---

[~bikash30851], are you still interested in this ticket? 

> Document which configuration options are applicable only to processes with 
> broker role or controller role
> -
>
> Key: KAFKA-14941
> URL: https://issues.apache.org/jira/browse/KAFKA-14941
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jakub Scholz
>Priority: Major
>
> When running in KRaft mode, some of the configuration options are applicable 
> only to nodes with the broker process role and some are applicable only to 
> nodes with the controller process role. It would be great if this 
> information was part of the documentation (e.g. in the [Broker 
> Configs|https://kafka.apache.org/documentation/#brokerconfigs] table on the 
> website), and also part of the config classes, so that it can be 
> used in situations where the configuration is handled dynamically, for 
> example to filter the options applicable to different nodes. This would allow 
> having configuration files with only the actually used configuration options 
> and, for example, help to reduce unnecessary restarts when rolling out new 
> configurations, etc.
> For some options it seems clear, and the Kafka node would refuse to start if 
> they are set - for example the configurations of the non-controller listeners 
> in controller-only nodes. For others, it seems a bit less clear (Does the 
> {{compression.type}} option apply to controller-only nodes? Or the 
> configurations for the offset topic? etc.).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15498) Upgrade Snappy-Java to 1.1.10.4

2023-10-02 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17771051#comment-17771051
 ] 

Luke Chen commented on KAFKA-15498:
---

Note: In v3.7.0, Snappy-Java is upgraded to 1.1.10.5 in this PR: 
https://github.com/apache/kafka/pull/14458

> Upgrade Snappy-Java to 1.1.10.4
> ---
>
> Key: KAFKA-15498
> URL: https://issues.apache.org/jira/browse/KAFKA-15498
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.1, 3.5.1
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 3.6.0
>
>
> Snappy-java published a new vulnerability
> <[https://github.com/xerial/snappy-java/security/advisories/GHSA-55g7-9cwv-5qfv]>
> that will cause an OOM error in the server.
> Kafka is also impacted by this vulnerability since it's like CVE-2023-34455
> <[https://nvd.nist.gov/vuln/detail/CVE-2023-34455]>.
> We'd better bump the snappy-java version to bypass this vulnerability.
> PR <[https://github.com/apache/kafka/pull/14434]> is created to run the CI
> build.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15489) split brain in KRaft cluster

2023-09-27 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15489:
--
Fix Version/s: (was: 3.6.0)

> split brain in KRaft cluster 
> -
>
> Key: KAFKA-15489
> URL: https://issues.apache.org/jira/browse/KAFKA-15489
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.5.1
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> I found that in the current KRaft implementation, when a network partition 
> happens between the current controller leader and the other controller nodes, 
> a "split brain" issue will happen: 2 leaders will exist in the 
> controller cluster, and 2 inconsistent sets of metadata will be returned to 
> the clients.
>  
> *Root cause*
> In 
> [KIP-595|https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Vote],
>  we said a voter will begin a new election under three conditions:
> 1. If it fails to receive a FetchResponse from the current leader before 
> expiration of quorum.fetch.timeout.ms
> 2. If it receives an EndQuorumEpoch request from the current leader
> 3. If it fails to receive a majority of votes before expiration of 
> quorum.election.timeout.ms after declaring itself a candidate.
> And that's exactly what the current KRaft implementation does.
>  
> However, when the leader is isolated from the network partition, there's no 
> way for it to resign from the leadership and start a new election. So the 
> leader will always be the leader even though all other nodes are down. And 
> this makes the split brain issue possible.
> When reading further in KIP-595, I found we indeed considered this 
> situation and have a solution for it. In [this 
> section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-LeaderProgressTimeout],
>  it says:
> {quote}In the pull-based model, however, say a new leader has been elected 
> with a new epoch and everyone has learned about it except the old leader 
> (e.g. that leader was not in the voters anymore and hence not receiving the 
> BeginQuorumEpoch as well), then that old leader would not be notified by 
> anyone about the new leader / epoch and become a pure "zombie leader", as 
> there is no regular heartbeats being pushed from leader to the follower. This 
> could lead to stale information being served to the observers and clients 
> inside the cluster.
> {quote}
> {quote}To resolve this issue, we will piggy-back on the 
> "quorum.fetch.timeout.ms" config, such that if the leader did not receive 
> Fetch requests from a majority of the quorum for that amount of time, it 
> would begin a new election and start sending VoteRequest to voter nodes in 
> the cluster to understand the latest quorum. If it couldn't connect to any 
> known voter, the old leader shall keep starting new elections and bump the 
> epoch.
> {quote}
>  
> But we missed this implementation in current KRaft.
>  
> *The flow is like this:*
> 1. 3 controller nodes, A(leader), B(follower), C(follower)
> 2. A network partition happens between [A] and [B, C].
> 3. B and C start a new election since the fetch timeout expired before 
> receiving a fetch response from leader A.
> 4. B (or C) is elected as leader in the new epoch, while A is still the 
> leader in the old epoch.
> 5. Broker D creates a topic "new", and the update goes to leader B.
> 6. Broker E describes topic "new", but gets nothing because it is connecting 
> to the old leader A.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-15489) split brain in KRaft cluster

2023-09-27 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reopened KAFKA-15489:
---

> split brain in KRaft cluster 
> -
>
> Key: KAFKA-15489
> URL: https://issues.apache.org/jira/browse/KAFKA-15489
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.5.1
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.6.0
>
>
> I found that in the current KRaft implementation, when a network partition happens 
> between the current controller leader and the other controller nodes, a 
> "split brain" issue can occur: 2 leaders will exist in the 
> controller cluster, and 2 inconsistent sets of metadata will be returned to the 
> clients.
>  
> *Root cause*
> In 
> [KIP-595|https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Vote],
> we said a voter will begin a new election under three conditions:
> 1. If it fails to receive a FetchResponse from the current leader before 
> expiration of quorum.fetch.timeout.ms
> 2. If it receives an EndQuorumEpoch request from the current leader
> 3. If it fails to receive a majority of votes before expiration of 
> quorum.election.timeout.ms after declaring itself a candidate.
> And that is exactly what the current KRaft implementation does.
>  
> However, when the leader is isolated by the network partition, there is no 
> way for it to resign from the leadership and start a new election. So the 
> leader will remain the leader even though all other nodes are down, and 
> this makes the split brain issue possible.
> When reading further in KIP-595, I found we had indeed considered this 
> situation and have a solution for it. In [this 
> section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-LeaderProgressTimeout],
>  it says:
> {quote}In the pull-based model, however, say a new leader has been elected 
> with a new epoch and everyone has learned about it except the old leader 
> (e.g. that leader was not in the voters anymore and hence not receiving the 
> BeginQuorumEpoch as well), then that old leader would not be notified by 
> anyone about the new leader / epoch and become a pure "zombie leader", as 
> there is no regular heartbeats being pushed from leader to the follower. This 
> could lead to stale information being served to the observers and clients 
> inside the cluster.
> {quote}
> {quote}To resolve this issue, we will piggy-back on the 
> "quorum.fetch.timeout.ms" config, such that if the leader did not receive 
> Fetch requests from a majority of the quorum for that amount of time, it 
> would begin a new election and start sending VoteRequest to voter nodes in 
> the cluster to understand the latest quorum. If it couldn't connect to any 
> known voter, the old leader shall keep starting new elections and bump the 
> epoch.
> {quote}
>  
> But this is missing from the current KRaft implementation.
>  
> *The flow is like this:*
> 1. 3 controller nodes, A(leader), B(follower), C(follower)
> 2. network partition happened between [A] and [B, C].
> 3. B and C start a new election since the fetch timeout expired before receiving 
> a fetch response from leader A.
> 4. B (or C) is elected as leader in the new epoch, while A is still the leader 
> in the old epoch.
> 5. broker D creates a topic "new", and the update goes to leader B.
> 6. broker E describes topic "new", but gets nothing because it is connected to 
> the old leader A.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15507) adminClient should not throw retriable exception when closing instance

2023-09-26 Thread Luke Chen (Jira)
Luke Chen created KAFKA-15507:
-

 Summary: adminClient should not throw retriable exception when 
closing instance
 Key: KAFKA-15507
 URL: https://issues.apache.org/jira/browse/KAFKA-15507
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 3.5.1
Reporter: Luke Chen


When adminClient is closing the instance, it first sets `hardShutdownTimeMs` 
to a positive timeout value, and then waits for existing threads to complete 
within the timeout. However, during this wait, when a new caller tries to 
invoke a new command on the adminClient, it immediately gets a 


{code:java}
TimeoutException("The AdminClient thread is not accepting new calls.")
{code}

There are some issues with the design:
1. Since the `TimeoutException` is a retriable exception, the caller will enter 
a tight loop and keep retrying it.
2. The error message is confusing. What does "the adminClient is not accepting 
new calls" mean?

We should improve this by throwing a non-retriable error (e.g. 
IllegalStateException), and the error message should clearly state that the 
adminClient is closing.
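
For illustration, a minimal sketch of the proposed behavior (hypothetical class and method names, not the actual KafkaAdminClient internals):

{code:java}
// Hypothetical sketch of the proposed behavior (not the actual KafkaAdminClient
// internals): once close() has started, new calls fail fast with a
// non-retriable IllegalStateException instead of a retriable TimeoutException.
import java.util.concurrent.atomic.AtomicBoolean;

class ClosingAwareClient {
    private final AtomicBoolean closing = new AtomicBoolean(false);

    void close() {
        closing.set(true);
        // ... wait for pending calls to complete within hardShutdownTimeMs ...
    }

    void enqueueCall(Runnable call) {
        if (closing.get()) {
            // Non-retriable, with a message that explains the real reason.
            throw new IllegalStateException("The AdminClient is being closed and no longer accepts new calls.");
        }
        call.run(); // simplified; the real client hands the call to its I/O thread
    }
}
{code}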




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15498) [CVE fix] Upgrade Snappy-Java to 1.1.10.4

2023-09-25 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17768589#comment-17768589
 ] 

Luke Chen commented on KAFKA-15498:
---

Thanks [~divijvaidya]!

> [CVE fix] Upgrade Snappy-Java to 1.1.10.4
> -
>
> Key: KAFKA-15498
> URL: https://issues.apache.org/jira/browse/KAFKA-15498
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.1, 3.5.1
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.6.0
>
>
> Snappy-java published a new vulnerability
> <[https://github.com/xerial/snappy-java/security/advisories/GHSA-55g7-9cwv-5qfv]>
> that can cause an OOM error in the server.
> Kafka is also impacted by this vulnerability since it is similar to CVE-2023-34455
> <[https://nvd.nist.gov/vuln/detail/CVE-2023-34455]>.
> We should bump the snappy-java version to address this vulnerability.
> PR <[https://github.com/apache/kafka/pull/14434]> was created to run the CI
> build.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15491) RackId doesn't exist error while running WordCountDemo

2023-09-23 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17768343#comment-17768343
 ] 

Luke Chen commented on KAFKA-15491:
---

[~lihaosky][~mjsax], FYI

> RackId doesn't exist error while running WordCountDemo
> --
>
> Key: KAFKA-15491
> URL: https://issues.apache.org/jira/browse/KAFKA-15491
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Luke Chen
>Priority: Major
>
> While running the WordCountDemo following the 
> [docs|https://kafka.apache.org/documentation/streams/quickstart], I saw the 
> following error logs in the stream application output. Though everything 
> still works fine, it would be better if there were no ERROR logs in the demo app.
> {code:java}
> [2023-09-24 14:15:11,723] ERROR RackId doesn't exist for process 
> e2391098-23e8-47eb-8d5e-ff6e697c33f5 and consumer 
> streams-wordcount-e2391098-23e8-47eb-8d5e-ff6e697c33f5-StreamThread-1-consumer-adae58be-f5f5-429b-a2b4-67bf732726e8
>  
> (org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor)
> [2023-09-24 14:15:11,757] ERROR RackId doesn't exist for process 
> e2391098-23e8-47eb-8d5e-ff6e697c33f5 and consumer 
> streams-wordcount-e2391098-23e8-47eb-8d5e-ff6e697c33f5-StreamThread-1-consumer-adae58be-f5f5-429b-a2b4-67bf732726e8
>  
> (org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15491) RackId doesn't exist error while running WordCountDemo

2023-09-23 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15491:
--
Summary: RackId doesn't exist error while running WordCountDemo  (was: 
RackId doesn't exist while running WordCountDemo)

> RackId doesn't exist error while running WordCountDemo
> --
>
> Key: KAFKA-15491
> URL: https://issues.apache.org/jira/browse/KAFKA-15491
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Luke Chen
>Priority: Major
>
> While running the WordCountDemo following the 
> [docs|https://kafka.apache.org/documentation/streams/quickstart], I saw the 
> following error logs in the stream application output. Though everything 
> still works fine, it would be better if there were no ERROR logs in the demo app.
> {code:java}
> [2023-09-24 14:15:11,723] ERROR RackId doesn't exist for process 
> e2391098-23e8-47eb-8d5e-ff6e697c33f5 and consumer 
> streams-wordcount-e2391098-23e8-47eb-8d5e-ff6e697c33f5-StreamThread-1-consumer-adae58be-f5f5-429b-a2b4-67bf732726e8
>  
> (org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor)
> [2023-09-24 14:15:11,757] ERROR RackId doesn't exist for process 
> e2391098-23e8-47eb-8d5e-ff6e697c33f5 and consumer 
> streams-wordcount-e2391098-23e8-47eb-8d5e-ff6e697c33f5-StreamThread-1-consumer-adae58be-f5f5-429b-a2b4-67bf732726e8
>  
> (org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15491) RackId doesn't exist while running WordCountDemo

2023-09-23 Thread Luke Chen (Jira)
Luke Chen created KAFKA-15491:
-

 Summary: RackId doesn't exist while running WordCountDemo
 Key: KAFKA-15491
 URL: https://issues.apache.org/jira/browse/KAFKA-15491
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Luke Chen


While running the WordCountDemo following the 
[docs|https://kafka.apache.org/documentation/streams/quickstart], I saw the 
following error logs in the stream application output. Though everything still 
works fine, it would be better if there were no ERROR logs in the demo app.


{code:java}
[2023-09-24 14:15:11,723] ERROR RackId doesn't exist for process 
e2391098-23e8-47eb-8d5e-ff6e697c33f5 and consumer 
streams-wordcount-e2391098-23e8-47eb-8d5e-ff6e697c33f5-StreamThread-1-consumer-adae58be-f5f5-429b-a2b4-67bf732726e8
 (org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor)
[2023-09-24 14:15:11,757] ERROR RackId doesn't exist for process 
e2391098-23e8-47eb-8d5e-ff6e697c33f5 and consumer 
streams-wordcount-e2391098-23e8-47eb-8d5e-ff6e697c33f5-StreamThread-1-consumer-adae58be-f5f5-429b-a2b4-67bf732726e8
 (org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor)
{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15489) split brain in KRaft cluster

2023-09-23 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15489:
--
Description: 
I found that in the current KRaft implementation, when a network partition happens 
between the current controller leader and the other controller nodes, a 
"split brain" issue can occur: 2 leaders will exist in the 
controller cluster, and 2 inconsistent sets of metadata will be returned to the 
clients.

 

*Root cause*
In 
[KIP-595|https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Vote],
 we said a voter will begin a new election under three conditions:

1. If it fails to receive a FetchResponse from the current leader before 
expiration of quorum.fetch.timeout.ms
2. If it receives an EndQuorumEpoch request from the current leader
3. If it fails to receive a majority of votes before expiration of 
quorum.election.timeout.ms after declaring itself a candidate.

And that is exactly what the current KRaft implementation does.

 

However, when the leader is isolated by the network partition, there is no way 
for it to resign from the leadership and start a new election. So the leader 
will remain the leader even though all other nodes are down, and this makes 
the split brain issue possible.

When reading further in KIP-595, I found we had indeed considered this 
situation and have a solution for it. In [this 
section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-LeaderProgressTimeout],
 it says:
{quote}In the pull-based model, however, say a new leader has been elected with 
a new epoch and everyone has learned about it except the old leader (e.g. that 
leader was not in the voters anymore and hence not receiving the 
BeginQuorumEpoch as well), then that old leader would not be notified by anyone 
about the new leader / epoch and become a pure "zombie leader", as there is no 
regular heartbeats being pushed from leader to the follower. This could lead to 
stale information being served to the observers and clients inside the cluster.
{quote}
{quote}To resolve this issue, we will piggy-back on the 
"quorum.fetch.timeout.ms" config, such that if the leader did not receive Fetch 
requests from a majority of the quorum for that amount of time, it would begin 
a new election and start sending VoteRequest to voter nodes in the cluster to 
understand the latest quorum. If it couldn't connect to any known voter, the 
old leader shall keep starting new elections and bump the epoch.
{quote}
 

But this is missing from the current KRaft implementation.

 
*The flow is like this:*
1. 3 controller nodes, A(leader), B(follower), C(follower)
2. network partition happened between [A] and [B, C].
3. B and C start a new election since the fetch timeout expired before receiving 
a fetch response from leader A.
4. B (or C) is elected as leader in the new epoch, while A is still the leader in 
the old epoch.
5. broker D creates a topic "new", and the update goes to leader B.
6. broker E describes topic "new", but gets nothing because it is connected to 
the old leader A.

 

  was:
I found that in the current KRaft implementation, when a network partition happens 
between the current controller leader and the other controller nodes, a 
"split brain" issue can occur: 2 leaders will exist in the 
controller cluster, and 2 inconsistent sets of metadata will be returned to the 
clients.

 

*Root cause*
In 
[KIP-595|https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Vote],
 we said a voter will begin a new election under three conditions:

1. If it fails to receive a FetchResponse from the current leader before 
expiration of quorum.fetch.timeout.ms
2. If it receives an EndQuorumEpoch request from the current leader
3. If it fails to receive a majority of votes before expiration of 
quorum.election.timeout.ms after declaring itself a candidate.

And that is exactly what the current KRaft implementation does.

 

However, when the leader is isolated by the network partition, there is no way 
for it to resign from the leadership and start a new election. So the leader 
will remain the leader even though all other nodes are down, and this makes 
the split brain issue possible.

When reading further in KIP-595, I found we had indeed considered this 
situation and have a solution for it. In [this 
section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-LeaderProgressTimeout],
 it says:
{quote}In the pull-based model, however, say a new leader has been elected with 
a new epoch and everyone has learned about it except the old leader (e.g. that 
leader was not in the voters anymore and hen

[jira] [Created] (KAFKA-15489) split brain in KRaft cluster

2023-09-23 Thread Luke Chen (Jira)
Luke Chen created KAFKA-15489:
-

 Summary: split brain in KRaft cluster 
 Key: KAFKA-15489
 URL: https://issues.apache.org/jira/browse/KAFKA-15489
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.5.1
Reporter: Luke Chen
Assignee: Luke Chen


I found that in the current KRaft implementation, when a network partition happens 
between the current controller leader and the other controller nodes, a 
"split brain" issue can occur: 2 leaders will exist in the 
controller cluster, and 2 inconsistent sets of metadata will be returned to the 
clients.

 

*Root cause*
In 
[KIP-595|https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Vote],
 we said a voter will begin a new election under three conditions:

1. If it fails to receive a FetchResponse from the current leader before 
expiration of quorum.fetch.timeout.ms
2. If it receives an EndQuorumEpoch request from the current leader
3. If it fails to receive a majority of votes before expiration of 
quorum.election.timeout.ms after declaring itself a candidate.

And that is exactly what the current KRaft implementation does.

 

However, when the leader is isolated by the network partition, there is no way 
for it to resign from the leadership and start a new election. So the leader 
will remain the leader even though all other nodes are down, and this makes 
the split brain issue possible.

When reading further in KIP-595, I found we had indeed considered this 
situation and have a solution for it. In [this 
section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-LeaderProgressTimeout],
 it says:
{quote}In the pull-based model, however, say a new leader has been elected with 
a new epoch and everyone has learned about it except the old leader (e.g. that 
leader was not in the voters anymore and hence not receiving the 
BeginQuorumEpoch as well), then that old leader would not be notified by anyone 
about the new leader / epoch and become a pure "zombie leader", as there is no 
regular heartbeats being pushed from leader to the follower. This could lead to 
stale information being served to the observers and clients inside the cluster.
{quote}
{quote}To resolve this issue, we will piggy-back on the 
"quorum.fetch.timeout.ms" config, such that if the leader did not receive Fetch 
requests from a majority of the quorum for that amount of time, it would begin 
a new election and start sending VoteRequest to voter nodes in the cluster to 
understand the latest quorum. If it couldn't connect to any known voter, the 
old leader shall keep starting new elections and bump the epoch.
{quote}
 

But this is missing from the current KRaft implementation.
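
For illustration, a minimal sketch of the check described in the KIP-595 quote above (hypothetical names, not the actual KafkaRaftClient code): the leader remembers when it last received a Fetch from each voter and resigns once a majority of the quorum has been silent for longer than quorum.fetch.timeout.ms.

{code:java}
// Hypothetical sketch (not the actual KafkaRaftClient code) of the check
// described in the KIP-595 quote above.
import java.util.Map;

class LeaderFetchTimeoutCheck {
    private final long fetchTimeoutMs;                // quorum.fetch.timeout.ms
    private final Map<Integer, Long> lastFetchTimeMs; // voterId -> last Fetch received (ms)
    private final int voterCount;                     // total voters, including the leader

    LeaderFetchTimeoutCheck(long fetchTimeoutMs, Map<Integer, Long> lastFetchTimeMs, int voterCount) {
        this.fetchTimeoutMs = fetchTimeoutMs;
        this.lastFetchTimeMs = lastFetchTimeMs;
        this.voterCount = voterCount;
    }

    // Called periodically on the leader; true means it should resign, bump the
    // epoch, and start sending VoteRequest to the other voters.
    boolean shouldResign(long nowMs) {
        long votersFetchingRecently = lastFetchTimeMs.values().stream()
                .filter(lastMs -> nowMs - lastMs <= fetchTimeoutMs)
                .count();
        int majority = voterCount / 2 + 1;
        // +1 counts the leader itself; resign when no majority is still fetching.
        return votersFetchingRecently + 1 < majority;
    }
}
{code}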



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15414) remote logs get deleted after partition reassignment

2023-09-22 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17767860#comment-17767860
 ] 

Luke Chen commented on KAFKA-15414:
---

Awesome! Thank you for the confirmation, [~fvisconte]!

> remote logs get deleted after partition reassignment
> 
>
> Key: KAFKA-15414
> URL: https://issues.apache.org/jira/browse/KAFKA-15414
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Kamal Chandraprakash
>Priority: Blocker
> Fix For: 3.6.0
>
> Attachments: Screenshot 2023-09-12 at 13.53.07.png, 
> image-2023-08-29-11-12-58-875.png
>
>
> It seems I'm reaching that codepath when running reassignments on my cluster, 
> and segments are deleted from the remote store despite a huge retention (topic 
> created a few hours ago with 1000h retention).
> It seems to happen consistently on some partitions when reassigning but not 
> all partitions.
> My test:
> I have a test topic with 30 partitions configured with 1000h global retention 
> and 2 minutes of local retention
> I have a load tester producing to all partitions evenly
> I have a consumer load tester consuming that topic
> I regularly reset offsets to earliest on my consumer to test backfilling from 
> tiered storage.
> My consumer was catching up consuming the backlog and I wanted to upscale my 
> cluster to speed up recovery: I upscaled my cluster from 3 to 12 brokers and 
> reassigned my test topic to all available brokers to have an even 
> leader/follower count per broker.
> When I triggered the reassignment, the consumer lag dropped on some of my 
> topic partitions:
> !image-2023-08-29-11-12-58-875.png|width=800,height=79! Screenshot 2023-08-28 
> at 20 57 09
> Later I tried to reassign back my topic to 3 brokers and the issue happened 
> again.
> Both times in my logs, I've seen a bunch of logs like:
> [RemoteLogManager=10005 partition=uR3O_hk3QRqsn4mPXGFoOw:loadtest11-17] 
> Deleted remote log segment RemoteLogSegmentId
> {topicIdPartition=uR3O_hk3QRqsn4mPXGFoOw:loadtest11-17, 
> id=Mk0chBQrTyKETTawIulQog}
> due to leader epoch cache truncation. Current earliest epoch: 
> EpochEntry(epoch=14, startOffset=46776780), segmentEndOffset: 46437796 and 
> segmentEpochs: [10]
> Looking at my S3 bucket, the segments prior to my reassignment have indeed 
> been deleted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15481) Concurrency bug in RemoteIndexCache leads to IOException

2023-09-21 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17767818#comment-17767818
 ] 

Luke Chen commented on KAFKA-15481:
---

Thanks for the suggestion, [~ben.manes]!

> Concurrency bug in RemoteIndexCache leads to IOException
> 
>
> Key: KAFKA-15481
> URL: https://issues.apache.org/jira/browse/KAFKA-15481
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Divij Vaidya
>Priority: Major
> Fix For: 3.7.0
>
>
> RemoteIndexCache has a concurrency bug which leads to IOException while 
> fetching data from remote tier.
> Below are the events in timeline order:
> Thread 1 (cache thread): invalidates the entry, removalListener is invoked 
> async, so the files have not been renamed to "deleted" suffix yet.
> Thread 2: (fetch thread): tries to find entry in cache, doesn't find it 
> because it has been removed by 1, fetches the entry from S3, writes it to 
> existing file (using replace existing)
> Thread 1: async removalListener is invoked, acquires a lock on old entry 
> (which has been removed from cache), it renames the file to "deleted" and 
> starts deleting it
> Thread 2: Tries to create in-memory/mmapped index, but doesn't find the file 
> and hence, creates a new file of size 2GB in the AbstractIndex constructor. The JVM 
> returns an error as it won't allow creation of a 2GB random access file.
> *Potential Fix*
> Use EvictionListener instead of RemovalListener in Caffeine cache as per the 
> documentation:
> {quote} When the operation must be performed synchronously with eviction, use 
> {{Caffeine.evictionListener(RemovalListener)}} instead. This listener will 
> only be notified when {{RemovalCause.wasEvicted()}} is true. For an explicit 
> removal, {{Cache.asMap()}} offers compute methods that are performed 
> atomically.{quote}
> This will ensure that removal from cache and marking the file with delete 
> suffix is synchronously done, hence the above race condition will not occur.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15481) Concurrency bug in RemoteIndexCache leads to IOException

2023-09-21 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17767486#comment-17767486
 ] 

Luke Chen commented on KAFKA-15481:
---

After re-reading the suggestion in the Caffeine 
[doc|https://github.com/ben-manes/caffeine/wiki/Removal], the 
`evictionListener` only gets invoked on eviction, not on explicit removal. 
We should use `internalCache.asMap().computeIfPresent()` instead, 
which I think will fix the issue I mentioned above, since the ConcurrentMap 
protects against multiple threads updating the value for the same key. So my 
thought is:

In RemoteIndexCache#getIndexEntry, we use 
{code:java}
internalCache.get(remoteLogSegmentMetadata.remoteLogSegmentId().id(),
uuid -> createCacheEntry(remoteLogSegmentMetadata));
{code}
It internally uses `ConcurrentMap#computeIfAbsent` to update the entry.

And in `RemoteIndexCache#remove`, we can now use:

{code:java}
internalCache.asMap().computeIfPresent(key, (k, entry) -> {
    // rename the index files with the ".deleted" suffix here,
    // then return null to remove the key atomically
    return null;
});
{code}
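
For illustration, a self-contained sketch of this idea with a plain Caffeine cache (the names and String values below are made up; the real RemoteIndexCache entries hold index files):

{code:java}
// Self-contained sketch of the idea above using a plain Caffeine cache.
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.UUID;

class AtomicRemovalSketch {
    private final Cache<UUID, String> internalCache = Caffeine.newBuilder()
            .maximumSize(1024)
            .build();

    String getIndexEntry(UUID segmentId) {
        // Loads the entry at most once per key (computeIfAbsent semantics).
        return internalCache.get(segmentId, id -> "entry-for-" + id);
    }

    void remove(UUID segmentId) {
        // Atomic per key: the cleanup runs before the mapping disappears, so a
        // concurrent getIndexEntry() either sees the old entry or re-creates a
        // fresh one only after the cleanup has finished.
        internalCache.asMap().computeIfPresent(segmentId, (id, entry) -> {
            // rename the underlying index files with a ".deleted" suffix here
            return null; // returning null removes the key
        });
    }
}
{code}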

WDYT?


> Concurrency bug in RemoteIndexCache leads to IOException
> 
>
> Key: KAFKA-15481
> URL: https://issues.apache.org/jira/browse/KAFKA-15481
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Divij Vaidya
>Priority: Major
> Fix For: 3.7.0
>
>
> RemoteIndexCache has a concurrency bug which leads to IOException while 
> fetching data from remote tier.
> Below are the events in timeline order:
> Thread 1 (cache thread): invalidates the entry, removalListener is invoked 
> async, so the files have not been renamed to "deleted" suffix yet.
> Thread 2: (fetch thread): tries to find entry in cache, doesn't find it 
> because it has been removed by 1, fetches the entry from S3, writes it to 
> existing file (using replace existing)
> Thread 1: async removalListener is invoked, acquires a lock on old entry 
> (which has been removed from cache), it renames the file to "deleted" and 
> starts deleting it
> Thread 2: Tries to create in-memory/mmapped index, but doesn't find the file 
> and hence, creates a new file of size 2GB in the AbstractIndex constructor. The JVM 
> returns an error as it won't allow creation of a 2GB random access file.
> *Potential Fix*
> Use EvictionListener instead of RemovalListener in Caffeine cache as per the 
> documentation:
> {quote} When the operation must be performed synchronously with eviction, use 
> {{Caffeine.evictionListener(RemovalListener)}} instead. This listener will 
> only be notified when {{RemovalCause.wasEvicted()}} is true. For an explicit 
> removal, {{Cache.asMap()}} offers compute methods that are performed 
> atomically.{quote}
> This will ensure that removal from cache and marking the file with delete 
> suffix is synchronously done, hence the above race condition will not occur.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15481) Concurrency bug in RemoteIndexCache leads to IOException

2023-09-21 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17767485#comment-17767485
 ] 

Luke Chen commented on KAFKA-15481:
---

About the solution of switching to the sync way, I have a question:
Currently, we use a readLock for both RemoteIndexCache#getIndexEntry and 
RemoteIndexCache#remove. That means the original issue will still appear even 
after using the sync way:

Thread 1 (cache thread): (readLock) invalidates the entry, removalListener is 
invoked *sync*, the files are about to be renamed (not yet done)

Thread 2: (fetch thread): (readLock) tries to find entry in cache, doesn't find 
it because it has been removed by 1, fetches the entry from S3, writes it to 
existing file (using replace existing)

Thread 1: removalListener acquires a lock on old entry (which has been removed 
from cache), it renames the file to "deleted" and starts deleting it

Thread 2: Tries to create in-memory/mmapped index, but doesn't find the file 
and hence, creates a new file of size 2GB in AbstractIndex constructor. JVM 
returns an error as it won't allow creation of 2GB random access file.

Is my understanding correct?

> Concurrency bug in RemoteIndexCache leads to IOException
> 
>
> Key: KAFKA-15481
> URL: https://issues.apache.org/jira/browse/KAFKA-15481
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Divij Vaidya
>Priority: Major
> Fix For: 3.7.0
>
>
> RemoteIndexCache has a concurrency bug which leads to IOException while 
> fetching data from remote tier.
> Below are the events in timeline order:
> Thread 1 (cache thread): invalidates the entry, removalListener is invoked 
> async, so the files have not been renamed to "deleted" suffix yet.
> Thread 2: (fetch thread): tries to find entry in cache, doesn't find it 
> because it has been removed by 1, fetches the entry from S3, writes it to 
> existing file (using replace existing)
> Thread 1: async removalListener is invoked, acquires a lock on old entry 
> (which has been removed from cache), it renames the file to "deleted" and 
> starts deleting it
> Thread 2: Tries to create in-memory/mmapped index, but doesn't find the file 
> and hence, creates a new file of size 2GB in the AbstractIndex constructor. The JVM 
> returns an error as it won't allow creation of a 2GB random access file.
> *Potential Fix*
> Use EvictionListener instead of RemovalListener in Caffeine cache as per the 
> documentation:
> {quote} When the operation must be performed synchronously with eviction, use 
> {{Caffeine.evictionListener(RemovalListener)}} instead. This listener will 
> only be notified when {{RemovalCause.wasEvicted()}} is true. For an explicit 
> removal, {{Cache.asMap()}} offers compute methods that are performed 
> atomically.{quote}
> This will ensure that removal from cache and marking the file with delete 
> suffix is synchronously done, hence the above race condition will not occur.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15481) Concurrency bug in RemoteIndexCache leads to IOException

2023-09-21 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17767483#comment-17767483
 ] 

Luke Chen commented on KAFKA-15481:
---

Nice find, [~divijvaidya]! So the issue is caused by the time gap between entry 
invalidation and the file renaming (i.e. when the removalListener gets invoked). 
One thing to confirm: this is not a blocker for v3.6.0, right? I don't think it 
is, since tiered storage is just a tech preview feature.

> Concurrency bug in RemoteIndexCache leads to IOException
> 
>
> Key: KAFKA-15481
> URL: https://issues.apache.org/jira/browse/KAFKA-15481
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Divij Vaidya
>Priority: Major
> Fix For: 3.7.0
>
>
> RemoteIndexCache has a concurrency bug which leads to IOException while 
> fetching data from remote tier.
> Below are the events in timeline order:
> Thread 1 (cache thread): invalidates the entry, removalListener is invoked 
> async, so the files have not been renamed to "deleted" suffix yet.
> Thread 2: (fetch thread): tries to find entry in cache, doesn't find it 
> because it has been removed by 1, fetches the entry from S3, writes it to 
> existing file (using replace existing)
> Thread 1: async removalListener is invoked, acquires a lock on old entry 
> (which has been removed from cache), it renames the file to "deleted" and 
> starts deleting it
> Thread 2: Tries to create in-memory/mmapped index, but doesn't find the file 
> and hence, creates a new file of size 2GB in the AbstractIndex constructor. The JVM 
> returns an error as it won't allow creation of a 2GB random access file.
> *Potential Fix*
> Use EvictionListener instead of RemovalListener in Caffeine cache as per the 
> documentation:
> {quote} When the operation must be performed synchronously with eviction, use 
> {{Caffeine.evictionListener(RemovalListener)}} instead. This listener will 
> only be notified when {{RemovalCause.wasEvicted()}} is true. For an explicit 
> removal, {{Cache.asMap()}} offers compute methods that are performed 
> atomically.{quote}
> This will ensure that removal from cache and marking the file with delete 
> suffix is synchronously done, hence the above race condition will not occur.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15464) Allow dynamic reloading of certificates with different DN / SANs

2023-09-15 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17765515#comment-17765515
 ] 

Luke Chen commented on KAFKA-15464:
---

[~rsivaram] [~omkreddy], since you're the experts in this area, could you have 
a look and comment on it? Thanks.

> Allow dynamic reloading of certificates with different DN / SANs
> 
>
> Key: KAFKA-15464
> URL: https://issues.apache.org/jira/browse/KAFKA-15464
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jakub Scholz
>Assignee: Jakub Scholz
>Priority: Major
>
> Kafka currently doesn't allow dynamic reloading of keystores when the new key 
> has a different DN or removes some of the SANs. While it might help to 
> prevent users from breaking their cluster, in some cases it would be great to 
> be able to bypass this validation when desired.
> More details are in the [KIP-978: Allow dynamic reloading of certificates 
> with different DN / 
> SANs|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429128]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-7739) Kafka Tiered Storage

2023-09-15 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17765502#comment-17765502
 ] 

Luke Chen commented on KAFKA-7739:
--

This will be included in v3.6.0 and is considered an early access feature. 
It is not recommended for production environments.

> Kafka Tiered Storage
> 
>
> Key: KAFKA-7739
> URL: https://issues.apache.org/jira/browse/KAFKA-7739
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Harsha
>Assignee: Satish Duggana
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.6.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]
> Next version of tiered storage is tracked at 
> https://issues.apache.org/jira/browse/KAFKA-15420



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-7739) Kafka Tiered Storage

2023-09-14 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17765414#comment-17765414
 ] 

Luke Chen commented on KAFKA-7739:
--

This epic was created on Dec. 14, 2018 and is finally resolved now (Sept. 14, 
2023)! Great job by all the contributors to this tiered storage epic! 
And big thanks to the main driver [~satish.duggana]! Thank you all!

> Kafka Tiered Storage
> 
>
> Key: KAFKA-7739
> URL: https://issues.apache.org/jira/browse/KAFKA-7739
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Harsha
>Assignee: Satish Duggana
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.6.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]
> Next version of tiered storage is tracked at 
> https://issues.apache.org/jira/browse/KAFKA-15420



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15442) add document to introduce tiered storage feature and the usage

2023-09-14 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-15442.
---
Resolution: Fixed

> add document to introduce tiered storage feature and the usage
> --
>
> Key: KAFKA-15442
> URL: https://issues.apache.org/jira/browse/KAFKA-15442
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.6.0
>
>
> Add a section in the document to introduce the tiered storage feature and how 
> to enable and use it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15414) remote logs get deleted after partition reassignment

2023-09-12 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764170#comment-17764170
 ] 

Luke Chen commented on KAFKA-15414:
---

[~fvisconte], one thing to clarify: this time, the remote segments are now 
deleted, right? 

Could you provide the full log for investigation? If not, I'll try to reproduce it 
in my env. 

> remote logs get deleted after partition reassignment
> 
>
> Key: KAFKA-15414
> URL: https://issues.apache.org/jira/browse/KAFKA-15414
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Kamal Chandraprakash
>Priority: Blocker
> Fix For: 3.6.0
>
> Attachments: Screenshot 2023-09-12 at 13.53.07.png, 
> image-2023-08-29-11-12-58-875.png
>
>
> It seems I'm reaching that codepath when running reassignments on my cluster, 
> and segments are deleted from the remote store despite a huge retention (topic 
> created a few hours ago with 1000h retention).
> It seems to happen consistently on some partitions when reassigning but not 
> all partitions.
> My test:
> I have a test topic with 30 partitions configured with 1000h global retention 
> and 2 minutes of local retention
> I have a load tester producing to all partitions evenly
> I have a consumer load tester consuming that topic
> I regularly reset offsets to earliest on my consumer to test backfilling from 
> tiered storage.
> My consumer was catching up consuming the backlog and I wanted to upscale my 
> cluster to speed up recovery: I upscaled my cluster from 3 to 12 brokers and 
> reassigned my test topic to all available brokers to have an even 
> leader/follower count per broker.
> When I triggered the reassignment, the consumer lag dropped on some of my 
> topic partitions:
> !image-2023-08-29-11-12-58-875.png|width=800,height=79! Screenshot 2023-08-28 
> at 20 57 09
> Later I tried to reassign back my topic to 3 brokers and the issue happened 
> again.
> Both times in my logs, I've seen a bunch of logs like:
> [RemoteLogManager=10005 partition=uR3O_hk3QRqsn4mPXGFoOw:loadtest11-17] 
> Deleted remote log segment RemoteLogSegmentId
> {topicIdPartition=uR3O_hk3QRqsn4mPXGFoOw:loadtest11-17, 
> id=Mk0chBQrTyKETTawIulQog}
> due to leader epoch cache truncation. Current earliest epoch: 
> EpochEntry(epoch=14, startOffset=46776780), segmentEndOffset: 46437796 and 
> segmentEpochs: [10]
> Looking at my S3 bucket, the segments prior to my reassignment have indeed 
> been deleted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15442) add document to introduce tiered storage feature and the usage

2023-09-12 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764151#comment-17764151
 ] 

Luke Chen commented on KAFKA-15442:
---

I'll try to draft something tomorrow.

> add document to introduce tiered storage feature and the usage
> --
>
> Key: KAFKA-15442
> URL: https://issues.apache.org/jira/browse/KAFKA-15442
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.6.0
>
>
> Add a section in the document to introduce the tiered storage feature and how 
> to enable and use it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15453) Enable `testFencingOnTransactionExpiration` in TransactionsWithTieredStoreTest

2023-09-11 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-15453:
-

Assignee: Luke Chen

> Enable `testFencingOnTransactionExpiration` in TransactionsWithTieredStoreTest
> --
>
> Key: KAFKA-15453
> URL: https://issues.apache.org/jira/browse/KAFKA-15453
> Project: Kafka
>  Issue Type: Test
>  Components: core
>Reporter: Satish Duggana
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.

2023-09-07 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762931#comment-17762931
 ] 

Luke Chen commented on KAFKA-14912:
---

Agreed, we should limit the cache by size instead of by the total number of entries.
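
For illustration, a minimal sketch of bounding a Caffeine cache by total bytes rather than by entry count (hypothetical names, not the actual RemoteIndexCache code); a size-based config would supply the maximumWeight budget:

{code:java}
// Hypothetical sketch (not the actual RemoteIndexCache code) of bounding a
// Caffeine cache by total bytes rather than by entry count.
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

class SizeBoundedCacheSketch {
    static class IndexEntry {
        final long sizeInBytes;
        IndexEntry(long sizeInBytes) { this.sizeInBytes = sizeInBytes; }
    }

    // maximumWeight + weigher evicts entries once the sum of their sizes
    // exceeds the configured byte budget (1 GiB here as an example).
    private final Cache<String, IndexEntry> cache = Caffeine.newBuilder()
            .maximumWeight(1024L * 1024L * 1024L)
            .weigher((String key, IndexEntry entry) ->
                    (int) Math.min(entry.sizeInBytes, Integer.MAX_VALUE))
            .build();
}
{code}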

> Introduce a configuration for remote index cache size, preferably a dynamic 
> config.
> ---
>
> Key: KAFKA-14912
> URL: https://issues.apache.org/jira/browse/KAFKA-14912
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Satish Duggana
>Assignee: hudeqi
>Priority: Major
>
> Context: We need to make the 1024 value here [1] dynamically configurable
> [1] 
> https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor

2023-09-07 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762920#comment-17762920
 ] 

Luke Chen commented on KAFKA-12473:
---

Yes, we can wait for KIP-848. Thanks [~dajac].

> Make the "cooperative-sticky, range" as the default assignor
> 
>
> Key: KAFKA-12473
> URL: https://issues.apache.org/jira/browse/KAFKA-12473
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>  Labels: kip
>
> Now that 3.0 is coming up, we can change the default 
> ConsumerPartitionAssignor to something better than the RangeAssignor. The 
> original plan was to switch over to the StickyAssignor, but now that we have 
> incremental cooperative rebalancing we should  consider using the new 
> CooperativeStickyAssignor instead: this will enable the consumer group to 
> follow the COOPERATIVE protocol, improving the rebalancing experience OOTB.
> KIP: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor

2023-09-07 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-12473:
-

Assignee: (was: Luke Chen)

> Make the "cooperative-sticky, range" as the default assignor
> 
>
> Key: KAFKA-12473
> URL: https://issues.apache.org/jira/browse/KAFKA-12473
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>  Labels: kip
>
> Now that 3.0 is coming up, we can change the default 
> ConsumerPartitionAssignor to something better than the RangeAssignor. The 
> original plan was to switch over to the StickyAssignor, but now that we have 
> incremental cooperative rebalancing we should  consider using the new 
> CooperativeStickyAssignor instead: this will enable the consumer group to 
> follow the COOPERATIVE protocol, improving the rebalancing experience OOTB.
> KIP: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14993) Improve TransactionIndex instance handling while copying to and fetching from RSM.

2023-09-07 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14993:
--
Description: 
RSM should throw a ResourceNotFoundException if it does not have 
TransactionIndex. Currently, it expects an empty InputStream and creates an 
unnecessary file in the cache. This can be avoided by catching 
ResourceNotFoundException and not creating an instance. There are minor 
cleanups needed in RemoteIndexCache and other TransactionIndex usages.

Also, update the LocalTieredStorage, see 
[this|https://github.com/apache/kafka/pull/13837#discussion_r1258917584] 
comment.

Note, please remember to update the javadoc in RSM after the fix. See: 
https://github.com/apache/kafka/pull/14352

  was:
RSM should throw a ResourceNotFoundException if it does not have 
TransactionIndex. Currently, it expects an empty InputStream and creates an 
unnecessary file in the cache. This can be avoided by catching 
ResourceNotFoundException and not creating an instance. There are minor 
cleanups needed in RemoteIndexCache and other TransactionIndex usages.

Also, update the LocalTieredStorage, see 
[this|https://github.com/apache/kafka/pull/13837#discussion_r1258917584] 
comment.


> Improve TransactionIndex instance handling while copying to and fetching from 
> RSM.
> --
>
> Key: KAFKA-14993
> URL: https://issues.apache.org/jira/browse/KAFKA-14993
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.7.0
>
>
> RSM should throw a ResourceNotFoundException if it does not have 
> TransactionIndex. Currently, it expects an empty InputStream and creates an 
> unnecessary file in the cache. This can be avoided by catching 
> ResourceNotFoundException and not creating an instance. There are minor 
> cleanups needed in RemoteIndexCache and other TransactionIndex usages.
> Also, update the LocalTieredStorage, see 
> [this|https://github.com/apache/kafka/pull/13837#discussion_r1258917584] 
> comment.
> Note, please remember to update the javadoc in RSM after the fix. See: 
> https://github.com/apache/kafka/pull/14352



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15402) Performance regression on close consumer after upgrading to 3.5.0

2023-09-07 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762673#comment-17762673
 ] 

Luke Chen commented on KAFKA-15402:
---

Maybe this issue can be worked around in your test env by setting 
[max.incremental.fetch.session.cache.slots|https://kafka.apache.org/documentation/#brokerconfigs_max.incremental.fetch.session.cache.slots]
 to 0 (default is 1000), so that fetch sessions won't be created.
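
For illustration, if the test harness builds its broker config from Properties, the workaround is a single setting (the surrounding class and method are hypothetical):

{code:java}
// Sketch only: the harness class below is hypothetical; the config name is the
// one referenced above.
import java.util.Properties;

class TestBrokerConfig {
    static Properties brokerProps() {
        Properties props = new Properties();
        // Disable incremental fetch sessions in the test broker so no fetch
        // session cache entries are created during the tests.
        props.setProperty("max.incremental.fetch.session.cache.slots", "0");
        return props;
    }
}
{code}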

> Performance regression on close consumer after upgrading to 3.5.0
> -
>
> Key: KAFKA-15402
> URL: https://issues.apache.org/jira/browse/KAFKA-15402
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0, 3.5.1
>Reporter: Benoit Delbosc
>Priority: Major
> Attachments: image-2023-08-24-18-51-21-720.png, 
> image-2023-08-24-18-51-57-435.png, image-2023-08-25-10-50-28-079.png
>
>
> Hi,
> After upgrading to Kafka client version 3.5.0, we have observed a significant 
> increase in the duration of our Java unit tests. These unit tests heavily 
> rely on the Kafka Admin, Producer, and Consumer API.
> When using Kafka server version 3.4.1, the duration of the unit tests 
> increased from 8 seconds (with Kafka client 3.4.1) to 18 seconds (with Kafka 
> client 3.5.0).
> Upgrading the Kafka server to 3.5.1 shows similar results.
> I have come across the issue KAFKA-15178, which could be the culprit. I will 
> attempt to test the proposed patch.
> In the meantime, if you have any ideas that could help identify and address 
> the regression, please let me know.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15442) add document to introduce tiered storage feature and the usage

2023-09-07 Thread Luke Chen (Jira)
Luke Chen created KAFKA-15442:
-

 Summary: add document to introduce tiered storage feature and the 
usage
 Key: KAFKA-15442
 URL: https://issues.apache.org/jira/browse/KAFKA-15442
 Project: Kafka
  Issue Type: Sub-task
Reporter: Luke Chen
 Fix For: 3.6.0


Add a section in the document to introduce the tiered storage feature and how 
to enable and use it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15052) Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()

2023-09-04 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-15052.
---
Resolution: Fixed

> Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()
> -
>
> Key: KAFKA-15052
> URL: https://issues.apache.org/jira/browse/KAFKA-15052
> Project: Kafka
>  Issue Type: Test
>Reporter: Dimitar Dimitrov
>Assignee: Dimitar Dimitrov
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.6.0
>
>
> Test failed at 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1892/tests/]
>  as well as in various local runs.
> The test creates a topic, fences a broker, notes partition imbalance due to 
> another broker taking over the partition the fenced broker lost, re-registers 
> and unfences the fenced broker, sends {{AlterPartition}} for the lost 
> partition adding the now unfenced broker back to its ISR, then waits for the 
> partition imbalance to disappear.
> The local failures seem to happen when the brokers (including the ones that 
> never get fenced by the test) accidentally get fenced by losing their session 
> due to reaching the (aggressively low for test purposes) session timeout.
> The Cloudbees failure quoted above also seems to indicate that this happened:
> {code:java}
> ...[truncated 738209 chars]...
> 23. (org.apache.kafka.controller.QuorumController:768)
> [2023-06-02 18:17:22,202] DEBUG [QuorumController id=0] Scheduling write 
> event for maybeBalancePartitionLeaders because scheduled (DEFERRED), 
> checkIntervalNs (OptionalLong[10]) and isImbalanced (true) 
> (org.apache.kafka.controller.QuorumController:1401)
> [2023-06-02 18:17:22,202] INFO [QuorumController id=0] Fencing broker 2 
> because its session has timed out. 
> (org.apache.kafka.controller.ReplicationControlManager:1459)
> [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] handleBrokerFenced: 
> changing partition(s): foo-0, foo-1, foo-2 
> (org.apache.kafka.controller.ReplicationControlManager:1750)
> [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] partition change for 
> foo-0 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 
> -> 3, partitionEpoch: 2 -> 3 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for 
> foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: isr: [2, 3] -> [3], leaderEpoch: 
> 3 -> 4, partitionEpoch: 4 -> 5 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for 
> foo-2 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 
> -> 3, partitionEpoch: 2 -> 3 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,205] DEBUG append(batch=LocalRecordBatch(leaderEpoch=1, 
> appendTimestamp=240, 
> records=[ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=1, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=[3], leader=3, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=2, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(BrokerRegistrationChangeRecord(brokerId=2, 
> brokerEpoch=3, fenced=1, inControlledShutdown=0) at version 0)]), 
> prevOffset=27) (org.apache.kafka.metalog.LocalLogManager$SharedLogData:253)
> [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Creating in-memory 
> snapshot 27 (org.apache.kafka.timeline.SnapshotRegistry:197)
> [2023-06-02 18:17:22,205] DEBUG [LocalLogManager 0] Node 0: running log 
> check. (org.apache.kafka.metalog.LocalLogManager:512)
> [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Read-write operation 
> maybeFenceReplicas(451616131) will be completed when the log reaches offset 
> 27. (org.apache.kafka.controller.QuorumController:768)
> [2023-06-02 18:17:22,208] INFO [QuorumController id=0] Fencing broker 3 
> because its session has timed out. 
> (org.apache.kafka.controller.ReplicationControlManager:1459)
> [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] handleBrokerFenced: 
> changing partition(s): foo-1 
> (org.apache.kafka.controller.ReplicationControlManager:1750)
> [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] partition change for 
> foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 3 -> -1, leaderEpoch: 4 
> -> 5

[jira] [Updated] (KAFKA-15052) Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()

2023-09-04 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15052:
--
Fix Version/s: 3.6.0
   (was: 3.7.0)

> Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()
> -
>
> Key: KAFKA-15052
> URL: https://issues.apache.org/jira/browse/KAFKA-15052
> Project: Kafka
>  Issue Type: Test
>Reporter: Dimitar Dimitrov
>Assignee: Dimitar Dimitrov
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.6.0
>
>
> Test failed at 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1892/tests/]
>  as well as in various local runs.
> The test creates a topic, fences a broker, notes partition imbalance due to 
> another broker taking over the partition the fenced broker lost, re-registers 
> and unfences the fenced broker, sends {{AlterPartition}} for the lost 
> partition adding the now unfenced broker back to its ISR, then waits for the 
> partition imbalance to disappear.
> The local failures seem to happen when the brokers (including the ones that 
> never get fenced by the test) accidentally get fenced by losing their session 
> due to reaching the (aggressively low for test purposes) session timeout.
> The Cloudbees failure quoted above also seems to indicate that this happened:
> {code:java}
> ...[truncated 738209 chars]...
> 23. (org.apache.kafka.controller.QuorumController:768)
> [2023-06-02 18:17:22,202] DEBUG [QuorumController id=0] Scheduling write 
> event for maybeBalancePartitionLeaders because scheduled (DEFERRED), 
> checkIntervalNs (OptionalLong[10]) and isImbalanced (true) 
> (org.apache.kafka.controller.QuorumController:1401)
> [2023-06-02 18:17:22,202] INFO [QuorumController id=0] Fencing broker 2 
> because its session has timed out. 
> (org.apache.kafka.controller.ReplicationControlManager:1459)
> [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] handleBrokerFenced: 
> changing partition(s): foo-0, foo-1, foo-2 
> (org.apache.kafka.controller.ReplicationControlManager:1750)
> [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] partition change for 
> foo-0 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 
> -> 3, partitionEpoch: 2 -> 3 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for 
> foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: isr: [2, 3] -> [3], leaderEpoch: 
> 3 -> 4, partitionEpoch: 4 -> 5 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for 
> foo-2 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 
> -> 3, partitionEpoch: 2 -> 3 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,205] DEBUG append(batch=LocalRecordBatch(leaderEpoch=1, 
> appendTimestamp=240, 
> records=[ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=1, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=[3], leader=3, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=2, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(BrokerRegistrationChangeRecord(brokerId=2, 
> brokerEpoch=3, fenced=1, inControlledShutdown=0) at version 0)]), 
> prevOffset=27) (org.apache.kafka.metalog.LocalLogManager$SharedLogData:253)
> [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Creating in-memory 
> snapshot 27 (org.apache.kafka.timeline.SnapshotRegistry:197)
> [2023-06-02 18:17:22,205] DEBUG [LocalLogManager 0] Node 0: running log 
> check. (org.apache.kafka.metalog.LocalLogManager:512)
> [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Read-write operation 
> maybeFenceReplicas(451616131) will be completed when the log reaches offset 
> 27. (org.apache.kafka.controller.QuorumController:768)
> [2023-06-02 18:17:22,208] INFO [QuorumController id=0] Fencing broker 3 
> because its session has timed out. 
> (org.apache.kafka.controller.ReplicationControlManager:1459)
> [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] handleBrokerFenced: 
> changing partition(s): foo-1 
> (org.apache.kafka.controller.ReplicationControlManager:1750)
> [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] partition change for 
> foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: le
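For context, a minimal sketch of the kind of wait-and-retry step the flaky test above depends on (illustrative only, not the actual QuorumControllerTest code; the isBalanced supplier and timeout handling are assumptions):
{code:java}
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Polls a balance predicate until it returns true or the timeout elapses.
// If brokers keep getting re-fenced by session timeouts, the predicate may
// never stabilize and the wait times out, which is the flakiness described.
public final class BalanceWaiter {
    public static void awaitBalance(BooleanSupplier isBalanced, Duration timeout)
            throws InterruptedException {
        long deadlineNs = System.nanoTime() + timeout.toNanos();
        while (!isBalanced.getAsBoolean()) {
            if (System.nanoTime() > deadlineNs) {
                throw new AssertionError("partition leaders did not rebalance in time");
            }
            Thread.sleep(100); // brief back-off before re-checking
        }
    }
}
{code}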

[jira] [Commented] (KAFKA-15399) Enable OffloadAndConsumeFromLeader test

2023-09-03 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17761585#comment-17761585
 ] 

Luke Chen commented on KAFKA-15399:
---

Confirmed that in the [latest trunk 
build|https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2167/], there 
are no `OffloadAndConsumeFromLeaderTest` test failures.

 

 

> Enable OffloadAndConsumeFromLeader test
> ---
>
> Key: KAFKA-15399
> URL: https://issues.apache.org/jira/browse/KAFKA-15399
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.6.0
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Blocker
> Fix For: 3.6.0
>
>
> Build / JDK 17 and Scala 2.13 / initializationError – 
> org.apache.kafka.tiered.storage.integration.OffloadAndConsumeFromLeaderTest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15427) Integration tests in TS test harness detect resource leaks

2023-09-03 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17761566#comment-17761566
 ] 

Luke Chen commented on KAFKA-15427:
---

PR: https://github.com/apache/kafka/pull/14319

> Integration tests in TS test harness detect resource leaks
> --
>
> Key: KAFKA-15427
> URL: https://issues.apache.org/jira/browse/KAFKA-15427
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Blocker
> Fix For: 3.6.0
>
>
> The pull request ([https://github.com/apache/kafka/pull/14116]) for adding 
> the Tiered Storage test harness uncovered resource leaks as part of the build 
> ([https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14116/28/testReport/junit/org.apache.kafka.tiered.storage.integration/OffloadAndConsumeFromLeaderTest/Build___JDK_20_and_Scala_2_13___initializationError/)]
> This can be reproduced locally by running the following command:
> {code:java}
> ./gradlew --no-parallel --max-workers 1 -PmaxParallelForks=1 storage:test 
> --tests 
> org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManagerTest 
> --tests 
> org.apache.kafka.tiered.storage.integration.OffloadAndConsumeFromLeaderTest 
> --rerun{code}
> Output:
> {code:java}
> > Task :storage:testGradle Test Run :storage:test > Gradle Test Executor 3 > 
> > RemoteLogMetadataManagerTest > 
> > testRemotePartitionDeletion(RemoteLogMetadataManager) > 
> > remoteLogMetadataManager = 
> > org.apache.kafka.server.log.remote.storage.InmemoryRemoteLogMetadataManager@4cc76301
> >  PASSED
> Gradle Test Run :storage:test > Gradle Test Executor 3 > 
> RemoteLogMetadataManagerTest > 
> testRemotePartitionDeletion(RemoteLogMetadataManager) > 
> remoteLogMetadataManager = 
> org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerWrapperWithHarness@2ca47471
>  PASSED
> Gradle Test Run :storage:test > Gradle Test Executor 3 > 
> RemoteLogMetadataManagerTest > testFetchSegments(RemoteLogMetadataManager) > 
> remoteLogMetadataManager = 
> org.apache.kafka.server.log.remote.storage.InmemoryRemoteLogMetadataManager@ce12fbb
>  PASSED
> Gradle Test Run :storage:test > Gradle Test Executor 3 > 
> RemoteLogMetadataManagerTest > testFetchSegments(RemoteLogMetadataManager) > 
> remoteLogMetadataManager = 
> org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerWrapperWithHarness@69aabcb0
>  PASSED
> org.apache.kafka.tiered.storage.integration.OffloadAndConsumeFromLeaderTest.initializationError
>  failed, log available in 
> /Users/lolovc/Documents/kafka/storage/build/reports/testOutput/org.apache.kafka.tiered.storage.integration.OffloadAndConsumeFromLeaderTest.initializationError.test.stdoutGradle
>  Test Run :storage:test > Gradle Test Executor 3 > 
> OffloadAndConsumeFromLeaderTest > initializationError FAILED
>     org.opentest4j.AssertionFailedError: Found 2 unexpected threads during 
> @BeforeAll: `controller-event-thread,Test worker-EventThread` ==> expected: 
>  but was: 
> ... {code}
> The point of this Jira ticket is to find the resource leak and fix it
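As a rough illustration of the kind of check that produces the failure quoted above (a hedged sketch, not the actual test-harness code; the disallowed-name list is an assumption):
{code:java}
import java.util.Set;
import java.util.stream.Collectors;

// Snapshots all live threads and reports those whose names contain a
// disallowed substring (e.g. "controller-event-thread"), which would indicate
// a resource leaked by an earlier test.
public final class ThreadLeakCheck {
    public static Set<String> unexpectedThreads(Set<String> disallowedSubstrings) {
        return Thread.getAllStackTraces().keySet().stream()
                .map(Thread::getName)
                .filter(name -> disallowedSubstrings.stream().anyMatch(name::contains))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // Example: fail if a controller event thread is still alive before a test.
        Set<String> leaked = unexpectedThreads(Set.of("controller-event-thread"));
        if (!leaked.isEmpty()) {
            throw new AssertionError("Found unexpected threads: " + leaked);
        }
    }
}
{code}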



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15427) Integration tests in TS test harness detect resource leaks

2023-09-03 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-15427.
---
Resolution: Fixed

> Integration tests in TS test harness detect resource leaks
> --
>
> Key: KAFKA-15427
> URL: https://issues.apache.org/jira/browse/KAFKA-15427
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Blocker
> Fix For: 3.6.0
>
>
> The pull request ([https://github.com/apache/kafka/pull/14116]) for adding 
> the Tiered Storage test harness uncovered resource leaks as part of the build 
> ([https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14116/28/testReport/junit/org.apache.kafka.tiered.storage.integration/OffloadAndConsumeFromLeaderTest/Build___JDK_20_and_Scala_2_13___initializationError/)]
> This can be reproduced locally by running the following command:
> {code:java}
> ./gradlew --no-parallel --max-workers 1 -PmaxParallelForks=1 storage:test 
> --tests 
> org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManagerTest 
> --tests 
> org.apache.kafka.tiered.storage.integration.OffloadAndConsumeFromLeaderTest 
> --rerun{code}
> Output:
> {code:java}
> > Task :storage:testGradle Test Run :storage:test > Gradle Test Executor 3 > 
> > RemoteLogMetadataManagerTest > 
> > testRemotePartitionDeletion(RemoteLogMetadataManager) > 
> > remoteLogMetadataManager = 
> > org.apache.kafka.server.log.remote.storage.InmemoryRemoteLogMetadataManager@4cc76301
> >  PASSED
> Gradle Test Run :storage:test > Gradle Test Executor 3 > 
> RemoteLogMetadataManagerTest > 
> testRemotePartitionDeletion(RemoteLogMetadataManager) > 
> remoteLogMetadataManager = 
> org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerWrapperWithHarness@2ca47471
>  PASSED
> Gradle Test Run :storage:test > Gradle Test Executor 3 > 
> RemoteLogMetadataManagerTest > testFetchSegments(RemoteLogMetadataManager) > 
> remoteLogMetadataManager = 
> org.apache.kafka.server.log.remote.storage.InmemoryRemoteLogMetadataManager@ce12fbb
>  PASSED
> Gradle Test Run :storage:test > Gradle Test Executor 3 > 
> RemoteLogMetadataManagerTest > testFetchSegments(RemoteLogMetadataManager) > 
> remoteLogMetadataManager = 
> org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerWrapperWithHarness@69aabcb0
>  PASSED
> org.apache.kafka.tiered.storage.integration.OffloadAndConsumeFromLeaderTest.initializationError
>  failed, log available in 
> /Users/lolovc/Documents/kafka/storage/build/reports/testOutput/org.apache.kafka.tiered.storage.integration.OffloadAndConsumeFromLeaderTest.initializationError.test.stdoutGradle
>  Test Run :storage:test > Gradle Test Executor 3 > 
> OffloadAndConsumeFromLeaderTest > initializationError FAILED
>     org.opentest4j.AssertionFailedError: Found 2 unexpected threads during 
> @BeforeAll: `controller-event-thread,Test worker-EventThread` ==> expected: 
>  but was: 
> ... {code}
> The point of this Jira ticket is to find the resource leak and fix it



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15421) Enable DynamicBrokerReconfigurationTest#testThreadPoolResize test

2023-09-01 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-15421:
-

Assignee: Luke Chen

> Enable DynamicBrokerReconfigurationTest#testThreadPoolResize test
> -
>
> Key: KAFKA-15421
> URL: https://issues.apache.org/jira/browse/KAFKA-15421
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.6.0
>Reporter: Kamal Chandraprakash
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15397) Deserializing produce requests may cause memory leaks when exceptions occur

2023-08-31 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17761155#comment-17761155
 ] 

Luke Chen commented on KAFKA-15397:
---

Do you get the heap dump when OOM happened?

> Deserializing produce requests may cause memory leaks when exceptions occur
> ---
>
> Key: KAFKA-15397
> URL: https://issues.apache.org/jira/browse/KAFKA-15397
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 2.8.1
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Blocker
> Attachments: SeaTalk_IMG_1692796505.png, SeaTalk_IMG_1692796533.png, 
> SeaTalk_IMG_1692796604.png, SeaTalk_IMG_1692796891.png
>
>
> When the client sends a produce request in an abnormal way and the server 
> accepts it for deserialization, a "java.lang.IllegalArgumentException" may 
> occur, which can cause a large number of "TopicProduceData" objects to be 
> instantiated without being cleaned up. Eventually the entire Kafka service 
> runs out of memory (OOM).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15412) Reading an unknown version of quorum-state-file should trigger an error

2023-08-30 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-15412.
---
Fix Version/s: 3.7.0
   Resolution: Fixed

> Reading an unknown version of quorum-state-file should trigger an error
> ---
>
> Key: KAFKA-15412
> URL: https://issues.apache.org/jira/browse/KAFKA-15412
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: John Mannooparambil
>Priority: Minor
> Fix For: 3.7.0
>
>
> Reading an unknown version of the quorum-state file should trigger an error. 
> Currently the only known version is 0, so reading any other version should 
> fail with an error. 
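A minimal sketch of the requested behaviour (an assumed shape, not the actual KRaft implementation):
{code:java}
// Fails fast when the quorum-state file reports a version other than the only
// known one (0), instead of silently proceeding with data it cannot interpret.
final class QuorumStateVersionCheck {
    private static final short KNOWN_VERSION = 0;

    static void validateVersion(short fileVersion) {
        if (fileVersion != KNOWN_VERSION) {
            throw new IllegalStateException("Unknown quorum-state file version "
                + fileVersion + "; only version " + KNOWN_VERSION + " is supported");
        }
    }
}
{code}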



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15397) Deserializing produce requests may cause memory leaks when exceptions occur

2023-08-29 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760224#comment-17760224
 ] 

Luke Chen commented on KAFKA-15397:
---

Thanks!

> Deserializing produce requests may cause memory leaks when exceptions occur
> ---
>
> Key: KAFKA-15397
> URL: https://issues.apache.org/jira/browse/KAFKA-15397
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 2.8.1
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Blocker
> Attachments: SeaTalk_IMG_1692796505.png, SeaTalk_IMG_1692796533.png, 
> SeaTalk_IMG_1692796604.png, SeaTalk_IMG_1692796891.png
>
>
> When the client sends a produce request in an abnormal way and the server 
> accepts it for deserialization, a "java.lang.IllegalArgumentException" may 
> occur, which can cause a large number of "TopicProduceData" objects to be 
> instantiated without being cleaned up. Eventually the entire Kafka service 
> runs out of memory (OOM).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15397) Deserializing produce requests may cause memory leaks when exceptions occur

2023-08-29 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760215#comment-17760215
 ] 

Luke Chen commented on KAFKA-15397:
---

[~hudeqi], could you add the link to the PR you mentioned above? (i.e. 
"MINOR: Add more validation during KRPC deserialization") Thanks.

> Deserializing produce requests may cause memory leaks when exceptions occur
> ---
>
> Key: KAFKA-15397
> URL: https://issues.apache.org/jira/browse/KAFKA-15397
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 2.8.1
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Blocker
> Attachments: SeaTalk_IMG_1692796505.png, SeaTalk_IMG_1692796533.png, 
> SeaTalk_IMG_1692796604.png, SeaTalk_IMG_1692796891.png
>
>
> When the client sends a produce request in an abnormal way and the server 
> accepts it for deserialization, a "java.lang.IllegalArgumentException" may 
> occur, which can cause a large number of "TopicProduceData" objects to be 
> instantiated without being cleaned up. Eventually the entire Kafka service 
> runs out of memory (OOM).
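For illustration only, a hedged sketch of the kind of up-front validation such a fix could add during deserialization (not the actual Kafka KRPC generated code; the buffer layout below is a simplified assumption): validate claimed counts and lengths against the bytes actually available before allocating anything.
{code:java}
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Rejects implausible size fields before allocating, so a malformed request
// fails fast instead of instantiating large numbers of objects first.
final class DefensiveParser {
    static List<byte[]> parse(ByteBuffer buf) {
        int count = buf.getInt();
        if (count < 0 || count > buf.remaining()) {
            throw new IllegalArgumentException("implausible element count: " + count);
        }
        List<byte[]> elements = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            int len = buf.getInt();
            if (len < 0 || len > buf.remaining()) {
                throw new IllegalArgumentException("implausible element length: " + len);
            }
            byte[] element = new byte[len];
            buf.get(element);
            elements.add(element);
        }
        return elements;
    }
}
{code}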



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15414) remote logs get deleted after partition reassignment

2023-08-28 Thread Luke Chen (Jira)
Luke Chen created KAFKA-15414:
-

 Summary: remote logs get deleted after partition reassignment
 Key: KAFKA-15414
 URL: https://issues.apache.org/jira/browse/KAFKA-15414
 Project: Kafka
  Issue Type: Bug
Reporter: Luke Chen
 Attachments: image-2023-08-29-11-12-58-875.png

It seems I'm reaching that code path when running reassignments on my cluster, 
and segments are deleted from the remote store despite a huge retention (the 
topic was created a few hours ago with 1000h retention).
It seems to happen consistently on some partitions when reassigning, but not on 
all partitions.

My test:

I have a test topic with 30 partitions configured with 1000h global retention 
and 2 minutes local retention.
I have a load tester producing to all partitions evenly.
I have a consumer load tester consuming that topic.
I regularly reset offsets to earliest on my consumer to test backfilling from 
tiered storage.

My consumer was catching up consuming the backlog and I wanted to upscale my 
cluster to speed up recovery: I upscaled my cluster from 3 to 12 brokers and 
reassigned my test topic to all available brokers to have an even 
leader/follower count per broker.

When I triggered the reassignment, the consumer lag dropped on some of my topic 
partitions:
!image-2023-08-29-11-12-58-875.png|width=800,height=79! Screenshot 2023-08-28 
at 20 57 09

Later I tried to reassign my topic back to 3 brokers, and the issue happened 
again.

Both times in my logs, I've seen a bunch of log lines like:

[RemoteLogManager=10005 partition=uR3O_hk3QRqsn4mPXGFoOw:loadtest11-17] Deleted 
remote log segment 
RemoteLogSegmentId{topicIdPartition=uR3O_hk3QRqsn4mPXGFoOw:loadtest11-17, 
id=Mk0chBQrTyKETTawIulQog} due to leader epoch cache truncation. Current 
earliest epoch: EpochEntry(epoch=14, startOffset=46776780), segmentEndOffset: 
46437796 and segmentEpochs: [10]

Looking at my S3 bucket, the segments from before my reassignment have indeed 
been deleted.
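Reading the log line above, the deletion decision appears to hinge on the segment's leader epochs versus the leader's current earliest epoch. A hedged sketch of that comparison (illustrative only, not the actual RemoteLogManager code):
{code:java}
import java.util.List;

// A remote segment whose epochs all predate the earliest epoch the leader
// still knows about falls outside the current leader-epoch lineage; the log
// above shows such a segment (segmentEpochs=[10], earliest epoch=14) deleted.
final class EpochTruncationCheck {
    static boolean outsideLeaderEpochLineage(List<Integer> segmentEpochs, int earliestKnownEpoch) {
        return segmentEpochs.stream().allMatch(epoch -> epoch < earliestKnownEpoch);
    }

    public static void main(String[] args) {
        System.out.println(outsideLeaderEpochLineage(List.of(10), 14)); // true
    }
}
{code}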



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15409) Distinguishing controller configs from broker configs in KRaft mode

2023-08-28 Thread Luke Chen (Jira)
Luke Chen created KAFKA-15409:
-

 Summary: Distinguishing controller configs from broker configs in 
KRaft mode
 Key: KAFKA-15409
 URL: https://issues.apache.org/jira/browse/KAFKA-15409
 Project: Kafka
  Issue Type: Improvement
  Components: kraft
Reporter: Luke Chen
Assignee: Luke Chen


In the docs, we categorize the configs by component. Currently, we have:

{code:java}

3. Configuration
3.1 Broker Configs
3.2 Topic Configs
3.3 Producer Configs
3.4 Consumer Configs
3.5 Kafka Connect Configs
Source Connector Configs
Sink Connector Configs 
3.6 Kafka Streams Configs
3.7 AdminClient Configs
3.8 System Properties 
{code}

The `3.1 Broker Configs` section currently contains:
1. controller-role-only configs
2. broker-role-only configs
3. configs applicable to both the controller and the broker

We should have a way to let users know which configs are for the controller, 
which are for the broker, and which apply to both.


Created a 
[wiki|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263427911]
 to list the configs for controller/broker.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14780) Make RefreshingHttpsJwksTest#testSecondaryRefreshAfterElapsedDelay deterministic

2023-08-23 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14780:
--
Fix Version/s: 3.7.0
   (was: 3.6.0)

> Make RefreshingHttpsJwksTest#testSecondaryRefreshAfterElapsedDelay 
> deterministic
> 
>
> Key: KAFKA-14780
> URL: https://issues.apache.org/jira/browse/KAFKA-14780
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Alexandre Dupriez
>Assignee: Alexandre Dupriez
>Priority: Minor
> Fix For: 3.7.0
>
>
> The test {{RefreshingHttpsJwksTest#testSecondaryRefreshAfterElapsedDelay}} 
> relies on the actual system clock, which makes it frequently fail on my poor 
> IntelliJ setup.
>  
> The {{RefreshingHttpsJwks}} component creates and uses a scheduled executor 
> service. We could expose the scheduling mechanism to be able to mock its 
> behaviour. One way to do this could be to use the {{KafkaScheduler}}, which 
> has a {{MockScheduler}} implementation that relies on {{MockTime}} instead of 
> the real time clock.
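For reference, a small sketch of how a mocked clock removes the wall-clock dependency (assuming Kafka's {{org.apache.kafka.common.utils.MockTime}}; wiring it into {{RefreshingHttpsJwks}} is omitted and would be the actual work of this ticket):
{code:java}
import org.apache.kafka.common.utils.MockTime;

// MockTime only advances when told to, so a test can "wait" for a scheduled
// refresh delay instantly and deterministically instead of sleeping.
public final class MockTimeSketch {
    public static void main(String[] args) {
        MockTime time = new MockTime();
        long before = time.milliseconds();
        time.sleep(30_000); // advances the mock clock by 30s without real waiting
        System.out.println(time.milliseconds() - before); // 30000
    }
}
{code}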



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14780) Make RefreshingHttpsJwksTest#testSecondaryRefreshAfterElapsedDelay deterministic

2023-08-23 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-14780.
---
Fix Version/s: 3.6.0
   Resolution: Fixed

> Make RefreshingHttpsJwksTest#testSecondaryRefreshAfterElapsedDelay 
> deterministic
> 
>
> Key: KAFKA-14780
> URL: https://issues.apache.org/jira/browse/KAFKA-14780
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Alexandre Dupriez
>Assignee: Alexandre Dupriez
>Priority: Minor
> Fix For: 3.6.0
>
>
> The test {{RefreshingHttpsJwksTest#testSecondaryRefreshAfterElapsedDelay}} 
> relies on the actual system clock, which makes it frequently fail on my poor 
> IntelliJ setup.
>  
> The {{RefreshingHttpsJwks}} component creates and uses a scheduled executor 
> service. We could expose the scheduling mechanism to be able to mock its 
> behaviour. One way to do this could be to use the {{KafkaScheduler}}, which 
> has a {{MockScheduler}} implementation that relies on {{MockTime}} instead of 
> the real time clock.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request

2023-08-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15353:
--
Description: 
In 
[KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
 (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
instead of `NewIsr` one. And when building the request for older version, we'll 
manually convert/downgrade the request into the older version for backward 
compatibility 
[here|https://github.com/apache/kafka/blob/6bd17419b76f8cf8d7e4a11c071494dfaa72cd50/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
 to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
field, and then clear the `NewIsrWithEpochs` field.

 

The problem is, when the AlterPartitionRequest sent out for the first time, if 
there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
we'll build the AlterPartitionRequest again. But this time, the request data is 
the one that already converted above. At this point, when we try to extract the 
ISR from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an 
AlterPartition request with empty ISR, and impacting the kafka availability.

 

From the log, I can see this:
{code:java}
[2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated to 
 (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
...
[2023-08-16 03:57:55,157] ERROR [ReplicaManager broker=3] Error processing 
append operation on partition test_topic-1 
(kafka.server.ReplicaManager)org.apache.kafka.common.errors.NotEnoughReplicasException:
 The size of the current ISR Set() is insufficient to satisfy the min.isr 
requirement of 2 for partition test_topic-1 {code}
 
h4. *Impact:*

This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 or 
later. During the rolling upgrade, there will be some nodes in v3.5.0, and some 
are not. So, for the node in v3.5.0 will try to build an old version of 
AlterPartitionRequest. And then, if it happen to have some transient error 
during the AlterPartitionRequest send, the ISR will be empty and no producers 
will be able to write data to the partitions.

  was:
In 
[KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
 (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
instead of `NewIsr` one. And when building the request for older version, we'll 
manually convert/downgrade the request into the older version for backward 
compatibility 
[here|https://github.com/apache/kafka/blob/6bd17419b76f8cf8d7e4a11c071494dfaa72cd50/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
 to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
field, and then clear the `NewIsrWithEpochs` field.

 

The problem is, when the AlterPartitionRequest sent out for the first time, if 
there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
we'll build the AlterPartitionRequest again. But this time, the request data is 
the one that converted above. At this point, when we try to extract the ISR 
from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an AlterPartition 
request with empty ISR, and impacting the kafka availability.

 

From the log, I can see this:
{code:java}
[2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated to 
 (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
...
[2023-08-16 03:57:55,157] ERROR [ReplicaManager broker=3] Error processing 
append operation on partition test_topic-1 
(kafka.server.ReplicaManager)org.apache.kafka.common.errors.NotEnoughReplicasException:
 The size of the current ISR Set() is insufficient to satisfy the min.isr 
requirement of 2 for partition test_topic-1 {code}
 
h4. *Impact:*

This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 or 
later. During the rolling upgrade, there will be some nodes in v3.5.0, and some 
are not. So, for the node in v3.5.0 will try to build an old version of 
AlterPartitionRequest. And then, if it happen to have some transient error 
during the AlterPartitionRequest send, the ISR will be empty and no producers 
will be able to write data to the partitions.


> Empty ISR returned from controller after AlterPartition request
> ---
>
> Key: KAFKA-15353
> URL: https://issues.apache.org/jira/browse/KAFKA-15353
> Project: Kafka
>  Issue Type: Bug
>  
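To make the retry failure mode described above concrete, here is a purely illustrative sketch of the pattern (simplified field names, not the real AlterPartitionRequest data classes): the downgrade step clears the new-style field in place, so a retry that rebuilds the request from the same mutated data ends up with an empty ISR.
{code:java}
import java.util.ArrayList;
import java.util.List;

final class DowngradeRetryBug {
    static final class PartitionData {
        List<Integer> newIsrWithEpochs = new ArrayList<>(List.of(1, 2, 3));
        List<Integer> newIsr = new ArrayList<>();
    }

    // Downgrade for an older request version: copy the ISR out, then clear the
    // new-style field. The clear() mutates the shared request data.
    static void downgradeInPlace(PartitionData data) {
        data.newIsr = new ArrayList<>(data.newIsrWithEpochs);
        data.newIsrWithEpochs.clear();
    }

    public static void main(String[] args) {
        PartitionData data = new PartitionData();
        downgradeInPlace(data);          // first send: newIsr = [1, 2, 3]
        downgradeInPlace(data);          // retry rebuilds from already-cleared data
        System.out.println(data.newIsr); // [] -> the empty ISR seen in the logs
    }
}
{code}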

[jira] [Updated] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request

2023-08-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15353:
--
Description: 
In 
[KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
 (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
instead of `NewIsr` one. And when building the request for older version, we'll 
manually convert/downgrade the request into the older version for backward 
compatibility 
[here|https://github.com/apache/kafka/blob/6bd17419b76f8cf8d7e4a11c071494dfaa72cd50/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
 to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
field, and then clear the `NewIsrWithEpochs` field.

 

The problem is, when the AlterPartitionRequest sent out for the first time, if 
there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
we'll build the AlterPartitionRequest again. But this time, the request data is 
the one that converted above. At this point, when we try to extract the ISR 
from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an AlterPartition 
request with empty ISR, and impacting the kafka availability.

 

From the log, I can see this:
{code:java}
[2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated to 
 (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
...
[2023-08-16 03:57:55,157] ERROR [ReplicaManager broker=3] Error processing 
append operation on partition test_topic-1 
(kafka.server.ReplicaManager)org.apache.kafka.common.errors.NotEnoughReplicasException:
 The size of the current ISR Set() is insufficient to satisfy the min.isr 
requirement of 2 for partition test_topic-1 {code}
 
h4. *Impact:*

This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 or 
later. During the rolling upgrade, there will be some nodes in v3.5.0, and some 
are not. So, for the node in v3.5.0 will try to build an old version of 
AlterPartitionRequest. And then, if it happen to have some transient error 
during the AlterPartitionRequest send, the ISR will be empty and no producers 
will be able to write data to the partitions.

  was:
In 
[KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
 (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
instead of `NewIsr` one. And when building the request for older version, we'll 
manually convert/downgrade the request into the older version for backward 
compatibility 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
 to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
field, and then clear the `NewIsrWithEpochs` field.

 

The problem is, when the AlterPartitionRequest sent out for the first time, if 
there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
we'll build the AlterPartitionRequest again. But this time, the request data is 
the one that converted above. At this point, when we try to extract the ISR 
from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an AlterPartition 
request with empty ISR, and impacting the kafka availability.

 

From the log, I can see this:
{code:java}
[2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated to 
 (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
...
[2023-08-16 03:57:55,157] ERROR [ReplicaManager broker=3] Error processing 
append operation on partition test_topic-1 
(kafka.server.ReplicaManager)org.apache.kafka.common.errors.NotEnoughReplicasException:
 The size of the current ISR Set() is insufficient to satisfy the min.isr 
requirement of 2 for partition test_topic-1 {code}
 
h4. *Impact:*

This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 or 
later. During the rolling upgrade, there will be some nodes in v3.5.0, and some 
are not. So, for the node in v3.5.0 will try to build an old version of 
AlterPartitionRequest. And then, if it happen to have some transient error 
during the AlterPartitionRequest send, the ISR will be empty and no producers 
will be able to write data to the partitions.


> Empty ISR returned from controller after AlterPartition request
> ---
>
> Key: KAFKA-15353
> URL: https://issues.apache.org/jira/browse/KAFKA-15353
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5

[jira] [Updated] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request

2023-08-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15353:
--
Description: 
In 
[KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
 (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
instead of `NewIsr` one. And when building the request for older version, we'll 
manually convert/downgrade the request into the older version for backward 
compatibility 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
 to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
field, and then clear the `NewIsrWithEpochs` field.

 

The problem is, when the AlterPartitionRequest sent out for the first time, if 
there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
we'll build the AlterPartitionRequest again. But this time, the request data is 
the one that converted above. At this point, when we try to extract the ISR 
from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an AlterPartition 
request with empty ISR, and impacting the kafka availability.

 

From the log, I can see this:
{code:java}
[2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated to 
 (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
...
[2023-08-16 03:57:55,157] ERROR [ReplicaManager broker=3] Error processing 
append operation on partition test_topic-1 
(kafka.server.ReplicaManager)org.apache.kafka.common.errors.NotEnoughReplicasException:
 The size of the current ISR Set() is insufficient to satisfy the min.isr 
requirement of 2 for partition test_topic-1 {code}
 
h4. *Impact:*

This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 or 
later. During the rolling upgrade, there will be some nodes in v3.5.0, and some 
are not. So, for the node in v3.5.0 will try to build an old version of 
AlterPartitionRequest. And then, if it happen to have some transient error 
during the AlterPartitionRequest send, the ISR will be empty and no producers 
will be able to write data to the partitions.

  was:
In 
[KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
 (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
instead of `NewIsr` one. And when building the request for older version, we'll 
manually convert/downgrade the request into the older version for backward 
compatibility 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
 to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
field, and then clear the `NewIsrWithEpochs` field.

 

The problem is, when the AlterPartitionRequest sent out for the first time, if 
there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
we'll build the AlterPartitionRequest again. But this time, the request data is 
the one that converted above. At this point, when we try to extract the ISR 
from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an AlterPartition 
request with empty ISR, and impacting the kafka availability.

 

From the log, I can see this:
{code:java}
[2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated to 
 (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
...
[2023-08-16 03:57:55,157] ERROR [ReplicaManager broker=3] Error processing 
append operation on partition test_topic-1 
(kafka.server.ReplicaManager)org.apache.kafka.common.errors.NotEnoughReplicasException:
 The size of the current ISR Set() is insufficient to satisfy the min.isr 
requirement of 2 for partition test_topic-1 {code}
 

This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 or 
later. During the rolling upgrade, there will be some nodes in v3.5.0, and some 
are not. So, for the node in v3.5.0 will try to build an old version of 
AlterPartitionRequest. And then, if it happen to have some transient error 
during the AlterPartitionRequest send, the ISR will be empty and no producers 
will be able to write data to the partitions.


> Empty ISR returned from controller after AlterPartition request
> ---
>
> Key: KAFKA-15353
> URL: https://issues.apache.org/jira/browse/KAFKA-15353
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0
>Reporter: Luke Chen
>

[jira] [Updated] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request

2023-08-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15353:
--
Description: 
In 
[KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
 (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
instead of `NewIsr` one. And when building the request for older version, we'll 
manually convert/downgrade the request into the older version for backward 
compatibility 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
 to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
field, and then clear the `NewIsrWithEpochs` field.

 

The problem is, when the AlterPartitionRequest sent out for the first time, if 
there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
we'll build the AlterPartitionRequest again. But this time, the request data is 
the one that converted above. At this point, when we try to extract the ISR 
from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an AlterPartition 
request with empty ISR, and impacting the kafka availability. From the log, I 
can see this:
{code:java}
[2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated to 
 (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
...
[2023-08-16 03:57:55,157] ERROR [ReplicaManager broker=3] Error processing 
append operation on partition test_topic-1 
(kafka.server.ReplicaManager)org.apache.kafka.common.errors.NotEnoughReplicasException:
 The size of the current ISR Set() is insufficient to satisfy the min.isr 
requirement of 2 for partition test_topic-1 {code}
 

This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 or 
later. During the rolling upgrade, there will be some nodes in v3.5.0, and some 
are not. So, for the node in v3.5.0 will try to build an old version of 
AlterPartitionRequest. And then, if it happen to have some transient error 
during the AlterPartitionRequest send, the ISR will be empty and no producers 
will be able to write data to the partitions.

  was:
In 
[KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
 (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
instead of `NewIsr` one. And when building the request for older version, we'll 
manually convert/downgrade the request into the older version for backward 
compatibility 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
 to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
field, and then clear the `NewIsrWithEpochs` field.

 

The problem is, when the AlterPartitionRequest sent out for the first time, if 
there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
we'll build the AlterPartitionRequest again. But this time, the request data is 
the one that converted above. At this point, when we try to extract the ISR 
from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an AlterPartition 
request with empty ISR, and impacting the kafka availability. From the log, I 
can see this:
{code:java}
[2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated to 
 (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
{code}
 

This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 or 
later. During the rolling upgrade, there will be some nodes in v3.5.0, and some 
are not. So, for the node in v3.5.0 will try to build an old version of 
AlterPartitionRequest. And then, if it happen to have some transient error 
during the AlterPartitionRequest send, the ISR will be empty and no producers 
will be able to write data to the partitions.


> Empty ISR returned from controller after AlterPartition request
> ---
>
> Key: KAFKA-15353
> URL: https://issues.apache.org/jira/browse/KAFKA-15353
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0
>Reporter: Luke Chen
>Priority: Blocker
> Fix For: 3.6.0, 3.5.2
>
>
> In 
> [KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
>  (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
> bumped the AlterPartitionRequest version to 3 to use `NewIsrW

[jira] [Updated] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request

2023-08-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15353:
--
Description: 
In 
[KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
 (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
instead of `NewIsr` one. And when building the request for older version, we'll 
manually convert/downgrade the request into the older version for backward 
compatibility 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
 to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
field, and then clear the `NewIsrWithEpochs` field.

 

The problem is, when the AlterPartitionRequest sent out for the first time, if 
there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
we'll build the AlterPartitionRequest again. But this time, the request data is 
the one that converted above. At this point, when we try to extract the ISR 
from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an AlterPartition 
request with empty ISR, and impacting the kafka availability.

 

From the log, I can see this:
{code:java}
[2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated to 
 (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
...
[2023-08-16 03:57:55,157] ERROR [ReplicaManager broker=3] Error processing 
append operation on partition test_topic-1 
(kafka.server.ReplicaManager)org.apache.kafka.common.errors.NotEnoughReplicasException:
 The size of the current ISR Set() is insufficient to satisfy the min.isr 
requirement of 2 for partition test_topic-1 {code}
 

This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 or 
later. During the rolling upgrade, there will be some nodes in v3.5.0, and some 
are not. So, for the node in v3.5.0 will try to build an old version of 
AlterPartitionRequest. And then, if it happen to have some transient error 
during the AlterPartitionRequest send, the ISR will be empty and no producers 
will be able to write data to the partitions.

  was:
In 
[KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
 (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
instead of `NewIsr` one. And when building the request for older version, we'll 
manually convert/downgrade the request into the older version for backward 
compatibility 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
 to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
field, and then clear the `NewIsrWithEpochs` field.

 

The problem is, when the AlterPartitionRequest sent out for the first time, if 
there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
we'll build the AlterPartitionRequest again. But this time, the request data is 
the one that converted above. At this point, when we try to extract the ISR 
from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an AlterPartition 
request with empty ISR, and impacting the kafka availability. From the log, I 
can see this:
{code:java}
[2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated to 
 (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
...
[2023-08-16 03:57:55,157] ERROR [ReplicaManager broker=3] Error processing 
append operation on partition test_topic-1 
(kafka.server.ReplicaManager)org.apache.kafka.common.errors.NotEnoughReplicasException:
 The size of the current ISR Set() is insufficient to satisfy the min.isr 
requirement of 2 for partition test_topic-1 {code}
 

This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 or 
later. During the rolling upgrade, there will be some nodes in v3.5.0, and some 
are not. So, for the node in v3.5.0 will try to build an old version of 
AlterPartitionRequest. And then, if it happen to have some transient error 
during the AlterPartitionRequest send, the ISR will be empty and no producers 
will be able to write data to the partitions.


> Empty ISR returned from controller after AlterPartition request
> ---
>
> Key: KAFKA-15353
> URL: https://issues.apache.org/jira/browse/KAFKA-15353
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0
>Reporter: Luke Chen
>Priority: Blocker

[jira] [Updated] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request

2023-08-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15353:
--
Description: 
In 
[KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
 (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
instead of `NewIsr` one. And when building the request for older version, we'll 
manually convert/downgrade the request into the older version for backward 
compatibility 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
 to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
field, and then clear the `NewIsrWithEpochs` field.

 

The problem is, when the AlterPartitionRequest sent out for the first time, if 
there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
we'll build the AlterPartitionRequest again. But this time, the request data is 
the one that converted above. At this point, when we try to extract the ISR 
from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an AlterPartition 
request with empty ISR, and impacting the kafka availability. From the log, I 
can see this:
{code:java}
[2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated to 
 (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
{code}
 

This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 or 
later. During the rolling upgrade, there will be some nodes in v3.5.0, and some 
are not. So, for the node in v3.5.0 will try to build an old version of 
AlterPartitionRequest. And then, if it happen to have some transient error 
during the AlterPartitionRequest send, the ISR will be empty and no producers 
will be able to write data to the partitions.

  was:
In 
[KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
 (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
instead of `NewIsr` one. And when building the request for older version, we'll 
manually convert/downgrade the request into the older version for backward 
compatibility 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
 to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
field, and then clear the `NewIsrWithEpochs` field.

 

The problem is, when the AlterPartitionRequest sent out for the first time, 
there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
we'll build the AlterPartitionRequest again. But this time, the request data is 
the one that converted above. At this point, when we try to extract the ISR 
from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an AlterPartition 
request with empty ISR, and impacting the kafka availability. From the log, I 
can see this:
{code:java}
[2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated to 
 (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
{code}
 

This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 or 
later. During the rolling upgrade, there will be some nodes in v3.5.0, and some 
are not. So, for the node in v3.5.0 will try to build an old version of 
AlterPartitionRequest. And then, if it happen to have some transient error 
during the AlterPartitionRequest send, the ISR will be empty and no producers 
will be able to write data to the partitions.


> Empty ISR returned from controller after AlterPartition request
> ---
>
> Key: KAFKA-15353
> URL: https://issues.apache.org/jira/browse/KAFKA-15353
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0
>Reporter: Luke Chen
>Priority: Blocker
> Fix For: 3.6.0, 3.5.2
>
>
> In 
> [KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
>  (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
> bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
> instead of `NewIsr` one. And when building the request for older version, 
> we'll manually convert/downgrade the request into the older version for 
> backward compatibility 
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L

[jira] [Updated] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request

2023-08-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15353:
--
Description: 
In 
[KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
 (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
instead of `NewIsr` one. And when building the request for older version, we'll 
manually convert/downgrade the request into the older version for backward 
compatibility 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
 to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
field, and then clear the `NewIsrWithEpochs` field.

 

The problem is, when the AlterPartitionRequest sent out for the first time, 
there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
we'll build the AlterPartitionRequest again. But this time, the request data is 
the one that converted above. At this point, when we try to extract the ISR 
from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an AlterPartition 
request with empty ISR, and impacting the kafka availability. From the log, I 
can see this:
{code:java}
[2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated to 
 (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
{code}
 

This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 or 
later. During the rolling upgrade, there will be some nodes in v3.5.0, and some 
are not. So, for the node in v3.5.0 will try to build an old version of 
AlterPartitionRequest. And then, if it happen to have some transient error 
during the AlterPartitionRequest send, the ISR will be empty and no producers 
will be able to write data to the partitions.

  was:
In 
[KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
 (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
instead of `NewIsr` one. And when building the request, we'll manually 
convert/downgrade the request into the older version for backward compatibility 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
 to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
field, and then clear the `NewIsrWithEpochs` field.

 

The problem is, when the AlterPartitionRequest sent out for the first time, 
there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
we'll build the AlterPartitionRequest again. But this time, the request data is 
the one that converted above. At this point, when we try to extract the ISR 
from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an AlterPartition 
request with empty ISR, and impacting the kafka availability. From the log, I 
can see this:
{code:java}
[2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated to 
 (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
{code}
 

This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 or 
later. During the rolling upgrade, there will be some nodes in v3.5.0, and some 
are not. So, for the node in v3.5.0 will try to build an old version of 
AlterPartitionRequest. And then, if it happen to have some transient error 
during the AlterPartitionRequest send, the ISR will be empty and no producers 
will be able to write data to the partitions.


> Empty ISR returned from controller after AlterPartition request
> ---
>
> Key: KAFKA-15353
> URL: https://issues.apache.org/jira/browse/KAFKA-15353
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0
>Reporter: Luke Chen
>Priority: Blocker
> Fix For: 3.6.0, 3.5.2
>
>
> In 
> [KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
>  (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
> bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
> instead of `NewIsr` one. And when building the request for older version, 
> we'll manually convert/downgrade the request into the older version for 
> backward compatibility 
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
>  to extract ISR

[jira] [Commented] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request

2023-08-16 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17754976#comment-17754976
 ] 

Luke Chen commented on KAFKA-15353:
---

I'm setting this issue as blocker for v3.5.2 and v3.6.0. Let me know if you 
have any thoughts.

Sorry, I'm going to attend a conference in the coming days, so I can't submit a 
patch for this issue soon. Anyone with available cycles is welcome to take it 
over. Thanks.

cc [~calvinliu] [~junrao] [~dajac] 

 

 

> Empty ISR returned from controller after AlterPartition request
> ---
>
> Key: KAFKA-15353
> URL: https://issues.apache.org/jira/browse/KAFKA-15353
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0
>Reporter: Luke Chen
>Priority: Blocker
> Fix For: 3.6.0, 3.5.2
>
>
> In 
> [KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
>  (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
> bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
> instead of `NewIsr` one. And when building the request, we'll manually 
> convert/downgrade the request into the older version for backward 
> compatibility 
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
>  to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
> field, and then clear the `NewIsrWithEpochs` field.
>  
> The problem is, when the AlterPartitionRequest sent out for the first time, 
> there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
> we'll build the AlterPartitionRequest again. But this time, the request data 
> is the one that converted above. At this point, when we try to extract the 
> ISR from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an 
> AlterPartition request with empty ISR, and impacting the kafka availability. 
> From the log, I can see this:
> {code:java}
> [2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated 
> to  (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
> {code}
>  
> This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 
> or later. During the rolling upgrade, there will be some nodes in v3.5.0, and 
> some are not. So, for the node in v3.5.0 will try to build an old version of 
> AlterPartitionRequest. And then, if it happen to have some transient error 
> during the AlterPartitionRequest send, the ISR will be empty and no producers 
> will be able to write data to the partitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request

2023-08-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15353:
--
Fix Version/s: 3.6.0
   3.5.2

> Empty ISR returned from controller after AlterPartition request
> ---
>
> Key: KAFKA-15353
> URL: https://issues.apache.org/jira/browse/KAFKA-15353
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0
>Reporter: Luke Chen
>Priority: Blocker
> Fix For: 3.6.0, 3.5.2
>
>
> In 
> [KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
>  (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
> bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
> instead of `NewIsr` one. And when building the request, we'll manually 
> convert/downgrade the request into the older version for backward 
> compatibility 
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
>  to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
> field, and then clear the `NewIsrWithEpochs` field.
>  
> The problem is, when the AlterPartitionRequest sent out for the first time, 
> there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
> we'll build the AlterPartitionRequest again. But this time, the request data 
> is the one that converted above. At this point, when we try to extract the 
> ISR from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an 
> AlterPartition request with empty ISR, and impacting the kafka availability. 
> From the log, I can see this:
> {code:java}
> [2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated 
> to  (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
> {code}
>  
> This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 
> or later. During the rolling upgrade, there will be some nodes in v3.5.0, and 
> some are not. So, for the node in v3.5.0 will try to build an old version of 
> AlterPartitionRequest. And then, if it happen to have some transient error 
> during the AlterPartitionRequest send, the ISR will be empty and no producers 
> will be able to write data to the partitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request

2023-08-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15353:
--
Priority: Blocker  (was: Major)

> Empty ISR returned from controller after AlterPartition request
> ---
>
> Key: KAFKA-15353
> URL: https://issues.apache.org/jira/browse/KAFKA-15353
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0
>Reporter: Luke Chen
>Priority: Blocker
>
> In 
> [KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
>  (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
> bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
> instead of `NewIsr` one. And when building the request, we'll manually 
> convert/downgrade the request into the older version for backward 
> compatibility 
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
>  to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
> field, and then clear the `NewIsrWithEpochs` field.
>  
> The problem is, when the AlterPartitionRequest sent out for the first time, 
> there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
> we'll build the AlterPartitionRequest again. But this time, the request data 
> is the one that converted above. At this point, when we try to extract the 
> ISR from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an 
> AlterPartition request with empty ISR, and impacting the kafka availability. 
> From the log, I can see this:
> {code:java}
> [2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated 
> to  (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
> {code}
>  
> This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 
> or later. During the rolling upgrade, there will be some nodes in v3.5.0, and 
> some are not. So, for the node in v3.5.0 will try to build an old version of 
> AlterPartitionRequest. And then, if it happen to have some transient error 
> during the AlterPartitionRequest send, the ISR will be empty and no producers 
> will be able to write data to the partitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request

2023-08-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15353:
--
Description: 
In 
[KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
 (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
instead of `NewIsr` one. And when building the request, we'll manually 
convert/downgrade the request into the older version for backward compatibility 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
 to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
field, and then clear the `NewIsrWithEpochs` field.

 

The problem is, when the AlterPartitionRequest is sent out for the first time and 
there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
we'll build the AlterPartitionRequest again. But this time, the request data is 
the one that was converted above. At this point, when we try to extract the ISR 
from `NewIsrWithEpochs`, we'll get an empty result. So, we'll send out an AlterPartition 
request with an empty ISR, impacting Kafka availability. From the log, I 
can see this:
{code:java}
[2023-08-16 03:57:55,122] INFO [Partition test_topic-1 broker=3] ISR updated to 
 (under-min-isr) and version updated to 9 (kafka.cluster.Partition)
{code}
 

This will happen when users try to upgrade from versions < 3.5.0 to 3.5.0 or 
later. During the rolling upgrade, some nodes will be on v3.5.0 and some will 
not. A node on v3.5.0 will therefore build an old version of the 
AlterPartitionRequest, and if a transient error happens while sending it, the 
ISR will become empty and no producers will be able to write data to the 
affected partitions.

  was:
In 
[KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
 (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
instead of `NewIsr` one. And when building the request, we'll manually 
convert/downgrade the request into the older version for backward compatibility 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
 to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
field, and then clear the `NewIsrWithEpochs` field. 

 

The problem is, when the AlterPartitionRequest sent out for the first time, 
there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
we'll build the AlterPartitionRequest again. But this time, the request data is 
the one that converted above. At this point, when we try to extract the ISR 
from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an AlterPartition 
request with empty ISR, and impacting the kafka availability.

 

This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 or 
later. During the rolling upgrade, there will be some nodes in v3.5.0, and some 
are not. So, for the node in v3.5.0 will try to build an old version of 
AlterPartitionRequest. And then, if it happen to have some transient error 
during the AlterPartitionRequest send, the ISR will be empty and no producers 
will be able to write data to the partitions.


> Empty ISR returned from controller after AlterPartition request
> ---
>
> Key: KAFKA-15353
> URL: https://issues.apache.org/jira/browse/KAFKA-15353
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0
>Reporter: Luke Chen
>Priority: Major
>
> In 
> [KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
>  (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
> bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
> instead of `NewIsr` one. And when building the request, we'll manually 
> convert/downgrade the request into the older version for backward 
> compatibility 
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
>  to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
> field, and then clear the `NewIsrWithEpochs` field.
>  
> The problem is, when the AlterPartitionRequest sent out for the first time, 
> there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the ret

[jira] [Created] (KAFKA-15353) Empty ISR returned from controller after AlterPartition request

2023-08-16 Thread Luke Chen (Jira)
Luke Chen created KAFKA-15353:
-

 Summary: Empty ISR returned from controller after AlterPartition 
request
 Key: KAFKA-15353
 URL: https://issues.apache.org/jira/browse/KAFKA-15353
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.5.0
Reporter: Luke Chen


In 
[KIP-903|https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR],
 (more specifically this [PR|https://github.com/apache/kafka/pull/13408]), we 
bumped the AlterPartitionRequest version to 3 to use `NewIsrWithEpochs` field 
instead of `NewIsr` one. And when building the request, we'll manually 
convert/downgrade the request into the older version for backward compatibility 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L85-L96],
 to extract ISR info from `NewIsrWithEpochs` and then fill in the `NewIsr` 
field, and then clear the `NewIsrWithEpochs` field. 

 

The problem is, when the AlterPartitionRequest sent out for the first time, 
there's some transient error (ex: NOT_CONTROLLER), we'll retry. On the retry, 
we'll build the AlterPartitionRequest again. But this time, the request data is 
the one that converted above. At this point, when we try to extract the ISR 
from `NewIsrWithEpochs`, we'll get empty. So, we'll send out an AlterPartition 
request with empty ISR, and impacting the kafka availability.

 

This will happen when users trying to upgrade from versions < 3.5.0 to 3.5.0 or 
later. During the rolling upgrade, there will be some nodes in v3.5.0, and some 
are not. So, for the node in v3.5.0 will try to build an old version of 
AlterPartitionRequest. And then, if it happen to have some transient error 
during the AlterPartitionRequest send, the ISR will be empty and no producers 
will be able to write data to the partitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15329) Make default `remote.log.metadata.manager.class.name` as topic based RLMM

2023-08-15 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-15329.
---
Resolution: Fixed

> Make default `remote.log.metadata.manager.class.name` as topic based RLMM
> -
>
> Key: KAFKA-15329
> URL: https://issues.apache.org/jira/browse/KAFKA-15329
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Priority: Blocker
> Fix For: 3.6.0
>
>
> As described in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs.1]
>  , we should set default "remote.log.metadata.manager.class.name" as topic 
> based RLMM.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15083) Passing "remote.log.metadata.*" configs into RLMM

2023-08-10 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15083:
--
Description: 
Based on the 
[KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
|_{color:#00}remote.log.metadata.*{color}_|{color:#00}Default RLMM 
implementation creates producer and consumer instances. Common client 
propoerties can be configured with `remote.log.metadata.common.client.` prefix. 
 User can also pass properties specific to 
{color}{color:#00}producer/consumer with `remote.log.metadata.producer.` 
and `remote.log.metadata.consumer.` prefixes. These will override properties 
with `remote.log.metadata.common.client.` prefix.{color}
{color:#00}Any other properties should be prefixed with 
"remote.log.metadata." and these will be passed to 
RemoteLogMetadataManager#configure(Map props).{color}
{color:#00}For ex: Security configuration to connect to the local broker 
for the listener name configured are passed with props.{color}|

 

This is missed from current implementation.

 

When configuring RLMM, the configs passed into {{configure}} method is the 
{{{}RemoteLogManagerConfig{}}}. But in {{{}RemoteLogManagerConfig{}}}, there's 
no configs related to {{{}remote.log.metadata.*{}}}, ex: 
{{{}remote.log.metadata.topic.replication.factor{}}}. So, even if users have 
set the config in broker, it'll never be applied.

This PR fixed the issue to allow users setting RLMM prefix: 
{{remote.log.metadata.manager.impl.prefix}} (default is {{{}rlmm.config.{}}}), 
and then, appending the desired {{remote.log.metadata.*}} configs, it'll pass 
into RLMM, including 
{{{}remote.log.metadata.common.client.{}}}/{{{}remote.log.metadata.producer.{}}}/
 {{remote.log.metadata.consumer.}} prefixes.

Ex:
{code:java}
# default value 
# remote.log.storage.manager.impl.prefix=rsm.config. 
# remote.log.metadata.manager.impl.prefix=rlmm.config. 

# pass to RLMM
rlmm.config.remote.log.metadata.topic.num.partitions=50 
rlmm.config.remote.log.metadata.topic.replication.factor=4 

# pass to RSM
rsm.config.test=value {code}
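
For illustration, here is a small standalone sketch (not the actual broker code; the helper name is made up) of the prefix handling described above, where only the `rlmm.config.*` entries are stripped of the prefix and handed to RemoteLogMetadataManager#configure:
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the "rlmm.config." prefix behaviour described above.
public class RlmmPrefixSketch {

    // Return the entries that start with the prefix, with the prefix removed.
    static Map<String, Object> withPrefixStripped(Map<String, ?> brokerProps, String prefix) {
        Map<String, Object> out = new HashMap<>();
        brokerProps.forEach((k, v) -> {
            if (k.startsWith(prefix)) {
                out.put(k.substring(prefix.length()), v);
            }
        });
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> broker = new HashMap<>();
        broker.put("rlmm.config.remote.log.metadata.topic.num.partitions", "50");
        broker.put("rlmm.config.remote.log.metadata.topic.replication.factor", "4");
        broker.put("rsm.config.test", "value");  // goes to the RSM, not the RLMM

        // Roughly what RemoteLogMetadataManager#configure(Map<String, ?> props) would receive:
        System.out.println(withPrefixStripped(broker, "rlmm.config."));
        // {remote.log.metadata.topic.num.partitions=50, remote.log.metadata.topic.replication.factor=4}
    }
}
{code}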

  was:
Based on the 
[KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
|_{color:#00}remote.log.metadata.*{color}_|{color:#00}Default RLMM 
implementation creates producer and consumer instances. Common client 
propoerties can be configured with `remote.log.metadata.common.client.` prefix. 
 User can also pass properties specific to 
{color}{color:#00}producer/consumer with `remote.log.metadata.producer.` 
and `remote.log.metadata.consumer.` prefixes. These will override properties 
with `remote.log.metadata.common.client.` prefix.{color}
{color:#00}Any other properties should be prefixed with 
"remote.log.metadata." and these will be passed to 
RemoteLogMetadataManager#configure(Map props).{color}
{color:#00}For ex: Security configuration to connect to the local broker 
for the listener name configured are passed with props.{color}|

 

This is missed from current implementation.

 

When configuring RLMM, the configs passed into {{configure}} method is the 
{{{}RemoteLogManagerConfig{}}}. But in {{{}RemoteLogManagerConfig{}}}, there's 
no configs related to {{{}remote.log.metadata.*{}}}, ex: 
{{{}remote.log.metadata.topic.replication.factor{}}}. So, even if users have 
set the config in broker, it'll never be applied.

This PR fixed the issue to allow users setting RLMM prefix: 
{{remote.log.metadata.manager.impl.prefix}} (default is {{{}rlmm.config.{}}}), 
and then, appending the desired {{remote.log.metadata.*}} configs, it'll pass 
into RLMM, including 
{{{}remote.log.metadata.common.client.{}}}/{{{}remote.log.metadata.producer.{}}}/
 {{remote.log.metadata.consumer.}} prefixes.

Ex:
# default value
# remote.log.storage.manager.impl.prefix=rsm.config.
# remote.log.metadata.manager.impl.prefix=rlmm.config.

rlmm.config.remote.log.metadata.topic.num.partitions=50
rlmm.config.remote.log.metadata.topic.replication.factor=4

rsm.config.test=value


> Passing "remote.log.metadata.*" configs into RLMM
> -
>
> Key: KAFKA-15083
> URL: https://issues.apache.org/jira/browse/KAFKA-15083
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Based on the 
> [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
> |_{color:#00}remote.log.metadata.*{color}_|{color:#00}Default RLMM 
> implementation creates producer and consumer instances. Common client 
> propoerties can be configured with `remote.log.metadata.common.client.` 
> prefix.  User can also pass properties specific

[jira] [Updated] (KAFKA-15083) Passing "remote.log.metadata.*" configs into RLMM

2023-08-10 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15083:
--
Description: 
Based on the 
[KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
|_{color:#00}remote.log.metadata.*{color}_|{color:#00}Default RLMM 
implementation creates producer and consumer instances. Common client 
propoerties can be configured with `remote.log.metadata.common.client.` prefix. 
 User can also pass properties specific to 
{color}{color:#00}producer/consumer with `remote.log.metadata.producer.` 
and `remote.log.metadata.consumer.` prefixes. These will override properties 
with `remote.log.metadata.common.client.` prefix.{color}
{color:#00}Any other properties should be prefixed with 
"remote.log.metadata." and these will be passed to 
RemoteLogMetadataManager#configure(Map props).{color}
{color:#00}For ex: Security configuration to connect to the local broker 
for the listener name configured are passed with props.{color}|

 

This is missed from current implementation.

 

When configuring RLMM, the configs passed into {{configure}} method is the 
{{{}RemoteLogManagerConfig{}}}. But in {{{}RemoteLogManagerConfig{}}}, there's 
no configs related to {{{}remote.log.metadata.*{}}}, ex: 
{{{}remote.log.metadata.topic.replication.factor{}}}. So, even if users have 
set the config in broker, it'll never be applied.

This PR fixed the issue to allow users setting RLMM prefix: 
{{remote.log.metadata.manager.impl.prefix}} (default is {{{}rlmm.config.{}}}), 
and then, appending the desired {{remote.log.metadata.*}} configs, it'll pass 
into RLMM, including 
{{{}remote.log.metadata.common.client.{}}}/{{{}remote.log.metadata.producer.{}}}/
 {{remote.log.metadata.consumer.}} prefixes.

Ex:
# default value
# remote.log.storage.manager.impl.prefix=rsm.config.
# remote.log.metadata.manager.impl.prefix=rlmm.config.

rlmm.config.remote.log.metadata.topic.num.partitions=50
rlmm.config.remote.log.metadata.topic.replication.factor=4

rsm.config.test=value

  was:
Based on the 
[KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
|_{color:#00}remote.log.metadata.*{color}_|{color:#00}Default RLMM 
implementation creates producer and consumer instances. Common client 
propoerties can be configured with `remote.log.metadata.common.client.` prefix. 
 User can also pass properties specific to 
{color}{color:#00}producer/consumer with `remote.log.metadata.producer.` 
and `remote.log.metadata.consumer.` prefixes. These will override properties 
with `remote.log.metadata.common.client.` prefix.{color}
{color:#00}Any other properties should be prefixed with 
"remote.log.metadata." and these will be passed to 
RemoteLogMetadataManager#configure(Map props).{color}
{color:#00}For ex: Security configuration to connect to the local broker 
for the listener name configured are passed with props.{color}|

 

This is missed from current implementation.


> Passing "remote.log.metadata.*" configs into RLMM
> -
>
> Key: KAFKA-15083
> URL: https://issues.apache.org/jira/browse/KAFKA-15083
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Based on the 
> [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
> |_{color:#00}remote.log.metadata.*{color}_|{color:#00}Default RLMM 
> implementation creates producer and consumer instances. Common client 
> propoerties can be configured with `remote.log.metadata.common.client.` 
> prefix.  User can also pass properties specific to 
> {color}{color:#00}producer/consumer with `remote.log.metadata.producer.` 
> and `remote.log.metadata.consumer.` prefixes. These will override properties 
> with `remote.log.metadata.common.client.` prefix.{color}
> {color:#00}Any other properties should be prefixed with 
> "remote.log.metadata." and these will be passed to 
> RemoteLogMetadataManager#configure(Map props).{color}
> {color:#00}For ex: Security configuration to connect to the local broker 
> for the listener name configured are passed with props.{color}|
>  
> This is missed from current implementation.
>  
> When configuring RLMM, the configs passed into {{configure}} method is the 
> {{{}RemoteLogManagerConfig{}}}. But in {{{}RemoteLogManagerConfig{}}}, 
> there's no configs related to {{{}remote.log.metadata.*{}}}, ex: 
> {{{}remote.log.metadata.topic.replication.factor{}}}. So, even if users have 
> set the config in broker, it'll never be applied.
> This PR fixed the issue to allow users setting RLMM prefix: 
> {{remote.log.metadata.m

[jira] [Created] (KAFKA-15329) Make default `remote.log.metadata.manager.class.name` as topic based RLMM

2023-08-10 Thread Luke Chen (Jira)
Luke Chen created KAFKA-15329:
-

 Summary: Make default `remote.log.metadata.manager.class.name` as 
topic based RLMM
 Key: KAFKA-15329
 URL: https://issues.apache.org/jira/browse/KAFKA-15329
 Project: Kafka
  Issue Type: Sub-task
Reporter: Luke Chen
 Fix For: 3.6.0


As described in 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs.1]
 , we should set default "remote.log.metadata.manager.class.name" as topic 
based RLMM.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14421) OffsetFetchRequest throws NPE Exception

2023-08-10 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-14421.
---
Resolution: Resolved

Resolved in a later release.

> OffsetFetchRequest throws NPE Exception
> ---
>
> Key: KAFKA-14421
> URL: https://issues.apache.org/jira/browse/KAFKA-14421
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: yws
>Assignee: yws
>Priority: Major
> Attachments: image-2022-11-27-22-28-52-165.png, 
> image-2022-11-27-22-41-45-358.png
>
>
> when I use 0.10.2 client  send Metadata request to  0.10.0 server,  NPE 
> exception happens,
>  !image-2022-11-27-22-28-52-165.png! 
> the NPE exception quite confused me,  because if  just send Metadata request 
> doest not cause the NPE exception occurs, after troubleshooting the problem, 
> It is the NetworkClient#poll call  ConsumerNetworkClient#trySend  and further 
> call NetworkClient#doSendwhen trying to build OffsetFetchRequest, because 
> the 0.10.0 server doest not support  fetch all TopicPartitions, it throw 
> UnsupportedVersionException, 
> {code:java}
> private void doSend(ClientRequest clientRequest, boolean isInternalRequest, 
> long now) {
> String nodeId = clientRequest.destination();
> ..
> AbstractRequest request = null;
> AbstractRequest.Builder builder = clientRequest.requestBuilder();
> try {
> NodeApiVersions versionInfo = nodeApiVersions.get(nodeId);
> // Note: if versionInfo is null, we have no server version 
> information. This would be
> // the case when sending the initial ApiVersionRequest which 
> fetches the version
> // information itself.  It is also the case when 
> discoverBrokerVersions is set to false.
> if (versionInfo == null) {
> if (discoverBrokerVersions && log.isTraceEnabled())
> log.trace("No version information found when sending 
> message of type {} to node {}. " +
> "Assuming version {}.", clientRequest.apiKey(), 
> nodeId, builder.version());
> } else {
> short version = 
> versionInfo.usableVersion(clientRequest.apiKey());
> builder.setVersion(version);
> }
> // The call to build may also throw UnsupportedVersionException, 
> if there are essential
> // fields that cannot be represented in the chosen version.
> request = builder.build();
> } catch (UnsupportedVersionException e) {
> // If the version is not supported, skip sending the request over 
> the wire.
> // Instead, simply add it to the local queue of aborted requests.
> log.debug("Version mismatch when attempting to send {} to {}",
> clientRequest.toString(), clientRequest.destination(), e);
> ClientResponse clientResponse = new 
> ClientResponse(clientRequest.makeHeader(),
> clientRequest.callback(), clientRequest.destination(), 
> now, now,
> false, e, null);
> abortedSends.add(clientResponse);
> return;
> }
> {code}
>  !image-2022-11-27-22-41-45-358.png! 
> until now, all are expected, but unfortunately, in catch 
> UnsupportedVersionException code block, clientRequest.toString need to call 
> requestBuilder#toString, that is OffsetFetchRequest's Builder#toString, when 
> partition is ALL_TOPIC_PARTITIONS, it is null, therefore it cause the 
> unexpected NPE, and make the normal MetadataRequest failed..  
> {code:java}
> catch (UnsupportedVersionException e) {
>  
> log.debug("Version mismatch when attempting to send {} to {}",
> clientRequest.toString(), clientRequest.destination(), e);
> ClientResponse clientResponse = new 
> ClientResponse(clientRequest.makeHeader(),
> clientRequest.callback(), clientRequest.destination(), 
> now, now,
> false, e, null);
> abortedSends.add(clientResponse);
> return;
> }
> ClientRequest#toString()
>public String toString() {
> return "ClientRequest(expectResponse=" + expectResponse +
> ", callback=" + callback +
> ", destination=" + destination +
> ", correlationId=" + correlationId +
> ", clientId=" + clientId +
> ", createdTimeMs=" + createdTimeMs +
> ", requestBuilder=" + requestBuilder +
> ")";
> }
>   OffsetFetchRequest's Builder#toString
> public String toString() {
> StringBuilder bld = new StringBuilder();
> bld.append("(type=OffsetFetchRe

[jira] [Commented] (KAFKA-15267) Cluster-wide disablement of Tiered Storage

2023-08-04 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751095#comment-17751095
 ] 

Luke Chen commented on KAFKA-15267:
---

(1) and (3) are good to me. The only concern is that we don't have a KIP for adopting 
4.1. Could we at least start a discussion thread on the dev mailing list to see if 
there are any concerns from the community?

> Cluster-wide disablement of Tiered Storage
> --
>
> Key: KAFKA-15267
> URL: https://issues.apache.org/jira/browse/KAFKA-15267
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
>
> h2. Summary
> KIP-405 defines the configuration {{remote.log.storage.system.enable}} which 
> controls whether all resources needed for Tiered Storage to function are 
> instantiated properly in Kafka. However, the interaction between remote data 
> and Kafka if that configuration is set to false while there are still topics 
> with {{{}remote.storage.enable is undefined{}}}. {color:#ff8b00}*We would 
> like to give customers the ability to switch off Tiered Storage on a cluster 
> level and as such would need to define the behaviour.*{color}
> {{remote.log.storage.system.enable}} is a read-only configuration. This means 
> that it can only be changed by *modifying the server.properties* and 
> restarting brokers. As such, the {*}validity of values contained in it is 
> only checked at broker startup{*}.
> This JIRA proposes a few behaviours and a recommendation on a way forward.
> h2. Option 1: Change nothing
> Pros:
>  * No operation.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
> h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster 
> level and do not allow it to be disabled
> Always instantiate all resources for tiered storage. If no special ones are 
> selected use the default ones which come with Kafka.
> Pros:
>  * We solve the problem for moving between versions not allowing TS to be 
> disabled.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
>  * We haven’t quantified how much computer resources (CPU, memory) idle TS 
> components occupy.
>  * TS is a feature not required for running Kafka. As such, while it is still 
> under development we shouldn’t put it on the critical path of starting a 
> broker. In this way, a stray memory leak won’t impact anything on the 
> critical path of a broker.
>  * We are potentially swapping one problem for another. How does TS behave if 
> one decides to swap the TS plugin classes when data has already been written?
> h2. Option 3: Hide topics with tiering enabled
> Customers cannot interact with topics which have tiering enabled. They cannot 
> create new topics with the same names. Retention (and compaction?) do not 
> take effect on files already in local storage.
> Pros:
>  * We do not force data-deletion.
> Cons:
>  * This will be quite involved - the controller will need to know when a 
> broker’s server.properties have been altered; the broker will need to not 
> proceed to delete logs it is not the leader or follower for.
> h2. {color:#00875a}Option 4: Do not start the broker if there are topics with 
> tiering enabled{color} - Recommended
> This option has 2 different sub-options. The first one is that TS cannot be 
> disabled on cluster-level if there are *any* tiering topics - in other words 
> all tiered topics need to be deleted. The second one is that TS cannot be 
> disabled on a cluster-level if there are *any* topics with *tiering enabled* 
> - they can have tiering disabled, but with a retention policy set to delete 
> or retain (as per 
> [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]).
>  A topic can have tiering disabled and remain on the cluster as long as there 
> is no *remote* data when TS is disabled cluster-wide.
> Pros:
>  * We force the customer to be very explicit in disabling tiering of topics 
> prior to disabling TS on the whole cluster.
> Cons:
>  * You have to make certain that all data in remote is deleted (just a 
> disablement of tired topic is not enough). How do you determine whether all 
> remote has expired if policy is retain? If retain policy in KIP-950 knows 
> that there is data in remote then this should also be able to figure it out.
> The common denominator is that there needs to be no *remote* data at the 
> point of disabling TS. As such, the most straightforward option is to refuse 
> to start brokers if there are topics with the {{remote.storage.enabled}} 
> present. This in essence requires customers to clean any tiered topics before 
> switching off TS, which is a

[jira] [Commented] (KAFKA-15083) Passing "remote.log.metadata.*" configs into RLMM

2023-08-04 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751036#comment-17751036
 ] 

Luke Chen commented on KAFKA-15083:
---

[~satish.duggana], sorry that I didn't get the ping last time.

The issue here is that, when configuring the RLMM, the config passed into the `configure` 
method is the `RemoteLogManagerConfig`. But `RemoteLogManagerConfig` has no configs 
related to `remote.log.metadata.*`, ex: `remote.log.metadata.topic.replication.factor`. 
So, even if users have set the config on the broker, it'll never be applied. Working on 
a patch now.
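
As a concrete (hypothetical) example of the gap: a broker server.properties like the one below would parse fine, but before the patch the last setting never reached the topic-based RLMM because it is not part of `RemoteLogManagerConfig`. The example values are illustrative; the manager class name shown is the topic-based RLMM referenced elsewhere in this thread.
{code:java}
# server.properties (illustrative values)
remote.log.storage.system.enable=true
remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager

# Intended for the topic-based RLMM, but silently ignored before the fix:
remote.log.metadata.topic.replication.factor=4
{code}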

> Passing "remote.log.metadata.*" configs into RLMM
> -
>
> Key: KAFKA-15083
> URL: https://issues.apache.org/jira/browse/KAFKA-15083
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Based on the 
> [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
> |_{color:#00}remote.log.metadata.*{color}_|{color:#00}Default RLMM 
> implementation creates producer and consumer instances. Common client 
> propoerties can be configured with `remote.log.metadata.common.client.` 
> prefix.  User can also pass properties specific to 
> {color}{color:#00}producer/consumer with `remote.log.metadata.producer.` 
> and `remote.log.metadata.consumer.` prefixes. These will override properties 
> with `remote.log.metadata.common.client.` prefix.{color}
> {color:#00}Any other properties should be prefixed with 
> "remote.log.metadata." and these will be passed to 
> RemoteLogMetadataManager#configure(Map props).{color}
> {color:#00}For ex: Security configuration to connect to the local broker 
> for the listener name configured are passed with props.{color}|
>  
> This is missed from current implementation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15267) Cluster-wide disablement of Tiered Storage

2023-08-04 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751033#comment-17751033
 ] 

Luke Chen commented on KAFKA-15267:
---

Do we have any update on this issue? I think this is a blocker for v3.6.0, do 
you agree?

> Cluster-wide disablement of Tiered Storage
> --
>
> Key: KAFKA-15267
> URL: https://issues.apache.org/jira/browse/KAFKA-15267
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
>
> h2. Summary
> KIP-405 defines the configuration {{remote.log.storage.system.enable}} which 
> controls whether all resources needed for Tiered Storage to function are 
> instantiated properly in Kafka. However, the interaction between remote data 
> and Kafka if that configuration is set to false while there are still topics 
> with {{{}remote.storage.enable is undefined{}}}. {color:#ff8b00}*We would 
> like to give customers the ability to switch off Tiered Storage on a cluster 
> level and as such would need to define the behaviour.*{color}
> {{remote.log.storage.system.enable}} is a read-only configuration. This means 
> that it can only be changed by *modifying the server.properties* and 
> restarting brokers. As such, the {*}validity of values contained in it is 
> only checked at broker startup{*}.
> This JIRA proposes a few behaviours and a recommendation on a way forward.
> h2. Option 1: Change nothing
> Pros:
>  * No operation.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
> h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster 
> level and do not allow it to be disabled
> Always instantiate all resources for tiered storage. If no special ones are 
> selected use the default ones which come with Kafka.
> Pros:
>  * We solve the problem for moving between versions not allowing TS to be 
> disabled.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
>  * We haven’t quantified how much computer resources (CPU, memory) idle TS 
> components occupy.
>  * TS is a feature not required for running Kafka. As such, while it is still 
> under development we shouldn’t put it on the critical path of starting a 
> broker. In this way, a stray memory leak won’t impact anything on the 
> critical path of a broker.
>  * We are potentially swapping one problem for another. How does TS behave if 
> one decides to swap the TS plugin classes when data has already been written?
> h2. Option 3: Hide topics with tiering enabled
> Customers cannot interact with topics which have tiering enabled. They cannot 
> create new topics with the same names. Retention (and compaction?) do not 
> take effect on files already in local storage.
> Pros:
>  * We do not force data-deletion.
> Cons:
>  * This will be quite involved - the controller will need to know when a 
> broker’s server.properties have been altered; the broker will need to not 
> proceed to delete logs it is not the leader or follower for.
> h2. {color:#00875a}Option 4: Do not start the broker if there are topics with 
> tiering enabled{color} - Recommended
> This option has 2 different sub-options. The first one is that TS cannot be 
> disabled on cluster-level if there are *any* tiering topics - in other words 
> all tiered topics need to be deleted. The second one is that TS cannot be 
> disabled on a cluster-level if there are *any* topics with *tiering enabled* 
> - they can have tiering disabled, but with a retention policy set to delete 
> or retain (as per 
> [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]).
>  A topic can have tiering disabled and remain on the cluster as long as there 
> is no *remote* data when TS is disabled cluster-wide.
> Pros:
>  * We force the customer to be very explicit in disabling tiering of topics 
> prior to disabling TS on the whole cluster.
> Cons:
>  * You have to make certain that all data in remote is deleted (just a 
> disablement of tired topic is not enough). How do you determine whether all 
> remote has expired if policy is retain? If retain policy in KIP-950 knows 
> that there is data in remote then this should also be able to figure it out.
> The common denominator is that there needs to be no *remote* data at the 
> point of disabling TS. As such, the most straightforward option is to refuse 
> to start brokers if there are topics with the {{remote.storage.enabled}} 
> present. This in essence requires customers to clean any tiered topics before 
> switching off TS, which is a fair ask. Should we wish to revise this later it 
> should be possible.
> h2. Option 5: Make Kafka forget abou

[jira] [Commented] (KAFKA-15295) Add config validation when remote storage is enabled on a topic

2023-08-04 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751031#comment-17751031
 ] 

Luke Chen commented on KAFKA-15295:
---

[~ckamal] , are you still working on this? If you don't have time, I can work 
on it. Please let me know. Thanks.

> Add config validation when remote storage is enabled on a topic
> ---
>
> Key: KAFKA-15295
> URL: https://issues.apache.org/jira/browse/KAFKA-15295
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
> Fix For: 3.6.0
>
>
> If system level remote storage is not enabled, then enabling remote storage 
> on a topic should throw exception while validating the configs. 
> See https://github.com/apache/kafka/pull/14114#discussion_r1280372441 for 
> more details
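
A rough sketch of the check the quoted description asks for (hypothetical; not the actual broker/LogConfig validation code):
{code:java}
// Hypothetical illustration: reject enabling remote storage on a topic when it
// is disabled system-wide, so the misconfiguration fails at config validation time.
public class RemoteStorageConfigValidationSketch {

    static void validateTopicConfig(boolean systemWideRemoteStorageEnabled,
                                    boolean topicRemoteStorageEnabled) {
        if (topicRemoteStorageEnabled && !systemWideRemoteStorageEnabled) {
            throw new IllegalArgumentException(
                "Tiered storage (remote.storage.enable) cannot be enabled on a topic "
                    + "when remote.log.storage.system.enable=false on the brokers");
        }
    }

    public static void main(String[] args) {
        validateTopicConfig(true, true);   // ok: system-wide TS is enabled
        validateTopicConfig(false, true);  // throws: system-wide TS is disabled
    }
}
{code}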



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15294) Make remote storage related configs as public (i.e. non-internal)

2023-08-02 Thread Luke Chen (Jira)
Luke Chen created KAFKA-15294:
-

 Summary: Make remote storage related configs as public (i.e. 
non-internal)
 Key: KAFKA-15294
 URL: https://issues.apache.org/jira/browse/KAFKA-15294
 Project: Kafka
  Issue Type: Sub-task
Reporter: Luke Chen
 Fix For: 3.6.0


We should publish all the remote storage related configs in v3.6.0. It can be 
verified by:

 
{code:java}
./gradlew releaseTarGz

# The build output is stored in
# ./core/build/distributions/kafka_2.13-3.x.x-site-docs.tgz.
# Untar the file to verify it.
{code}

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15267) Cluster-wide disablement of Tiered Storage

2023-08-01 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750101#comment-17750101
 ] 

Luke Chen edited comment on KAFKA-15267 at 8/2/23 5:22 AM:
---

For option (4),

It looks like we want to validate one of the 2 things before startup when 
{{remote.storage.enabled}} is disabled:

4.1. all tiering topics are deleted

4.2. all tiering topics are disabled and no data on remote storage

 

For (4.2), before KIP-950, are we able to adopt this solution?

 

I'm fine with solution (4.1). But IMO, if we choose to adopt option (4), it might 
need a KIP to mention this change because it is a non-backward-compatible 
change; it doesn't impact only a specific topic, it impacts the whole cluster. 
Some users might be surprised to see that the broker can't start up after they 
enable tiered storage for testing and then disable it.


was (Author: showuon):
For option (4),

It looks like we can want to validate one of the 2 things before startup when 
{{remote.storage.enabled}} is disabled:

4.1. all tiering topics are deleted

4.2. all tiering topics are disabled and no data on remote storage

 

For (4.2), before KIP-950, are we able to adopt this solution?

 

I'm fine for solution (4.1). But IMO if we chose to adopt option (4), it might 
need a KIP to mention this change because it is a non-backward compatible 
change, especially it doesn't impact the specific topic only, it'll impact the 
whole cluster. For some users, they might feel surprised when seeing the broker 
can't startup after enabling tiered storage for testing, and then disabling it.

> Cluster-wide disablement of Tiered Storage
> --
>
> Key: KAFKA-15267
> URL: https://issues.apache.org/jira/browse/KAFKA-15267
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
>
> h2. Summary
> KIP-405 defines the configuration {{remote.log.storage.system.enable}} which 
> controls whether all resources needed for Tiered Storage to function are 
> instantiated properly in Kafka. However, the interaction between remote data 
> and Kafka if that configuration is set to false while there are still topics 
> with {{{}remote.storage.enable is undefined{}}}. {color:#ff8b00}*We would 
> like to give customers the ability to switch off Tiered Storage on a cluster 
> level and as such would need to define the behaviour.*{color}
> {{remote.log.storage.system.enable}} is a read-only configuration. This means 
> that it can only be changed by *modifying the server.properties* and 
> restarting brokers. As such, the {*}validity of values contained in it is 
> only checked at broker startup{*}.
> This JIRA proposes a few behaviours and a recommendation on a way forward.
> h2. Option 1: Change nothing
> Pros:
>  * No operation.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
> h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster 
> level and do not allow it to be disabled
> Always instantiate all resources for tiered storage. If no special ones are 
> selected use the default ones which come with Kafka.
> Pros:
>  * We solve the problem for moving between versions not allowing TS to be 
> disabled.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
>  * We haven’t quantified how much computer resources (CPU, memory) idle TS 
> components occupy.
>  * TS is a feature not required for running Kafka. As such, while it is still 
> under development we shouldn’t put it on the critical path of starting a 
> broker. In this way, a stray memory leak won’t impact anything on the 
> critical path of a broker.
>  * We are potentially swapping one problem for another. How does TS behave if 
> one decides to swap the TS plugin classes when data has already been written?
> h2. Option 3: Hide topics with tiering enabled
> Customers cannot interact with topics which have tiering enabled. They cannot 
> create new topics with the same names. Retention (and compaction?) do not 
> take effect on files already in local storage.
> Pros:
>  * We do not force data-deletion.
> Cons:
>  * This will be quite involved - the controller will need to know when a 
> broker’s server.properties have been altered; the broker will need to not 
> proceed to delete logs it is not the leader or follower for.
> h2. {color:#00875a}Option 4: Do not start the broker if there are topics with 
> tiering enabled{color} - Recommended
> This option has 2 different sub-options. The first one is that TS cannot be 
> disabled on cluster-level if there are *any* tiering topics - in other words 
> all t

[jira] [Commented] (KAFKA-15267) Cluster-wide disablement of Tiered Storage

2023-08-01 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750101#comment-17750101
 ] 

Luke Chen commented on KAFKA-15267:
---

For option (4),

It looks like we can want to validate one of the 2 things before startup when 
{{remote.storage.enabled}} is disabled:

4.1. all tiering topics are deleted

4.2. all tiering topics are disabled and no data on remote storage

 

For (4.2), before KIP-950, are we able to adopt this solution?

 

I'm fine for solution (4.1). But IMO if we chose to adopt option (4), it might 
need a KIP to mention this change because it is a non-backward compatible 
change, especially it doesn't impact the specific topic only, it'll impact the 
whole cluster. For some users, they might feel surprised when seeing the broker 
can't startup after enabling tiered storage for testing, and then disabling it.
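
For what option (4.1) would mean in practice, here is a rough, hypothetical sketch of the kind of startup check being discussed (not the actual broker code):
{code:java}
import java.util.Map;

// Hypothetical sketch of option (4.1): refuse to start the broker when tiered
// storage is disabled cluster-wide but tiering topics still exist.
public class TieredStorageStartupCheckSketch {

    static void checkAtStartup(boolean remoteLogStorageSystemEnable,
                               Map<String, Boolean> remoteStorageEnableByTopic) {
        if (remoteLogStorageSystemEnable) {
            return;  // TS stays on, nothing to validate here
        }
        remoteStorageEnableByTopic.forEach((topic, tieringEnabled) -> {
            if (Boolean.TRUE.equals(tieringEnabled)) {
                throw new IllegalStateException(
                    "Cannot start with remote.log.storage.system.enable=false while topic '"
                        + topic + "' still has remote.storage.enable=true");
            }
        });
    }

    public static void main(String[] args) {
        // Fails fast: the operator must delete (or, per 4.2, fully disable) tiering topics first.
        checkAtStartup(false, Map.of("tier5", true, "plain-topic", false));
    }
}
{code}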

> Cluster-wide disablement of Tiered Storage
> --
>
> Key: KAFKA-15267
> URL: https://issues.apache.org/jira/browse/KAFKA-15267
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
>
> h2. Summary
> KIP-405 defines the configuration {{remote.log.storage.system.enable}} which 
> controls whether all resources needed for Tiered Storage to function are 
> instantiated properly in Kafka. However, the interaction between remote data 
> and Kafka if that configuration is set to false while there are still topics 
> with {{{}remote.storage.enable is undefined{}}}. {color:#ff8b00}*We would 
> like to give customers the ability to switch off Tiered Storage on a cluster 
> level and as such would need to define the behaviour.*{color}
> {{remote.log.storage.system.enable}} is a read-only configuration. This means 
> that it can only be changed by *modifying the server.properties* and 
> restarting brokers. As such, the {*}validity of values contained in it is 
> only checked at broker startup{*}.
> This JIRA proposes a few behaviours and a recommendation on a way forward.
> h2. Option 1: Change nothing
> Pros:
>  * No operation.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
> h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster 
> level and do not allow it to be disabled
> Always instantiate all resources for tiered storage. If no special ones are 
> selected use the default ones which come with Kafka.
> Pros:
>  * We solve the problem for moving between versions not allowing TS to be 
> disabled.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
>  * We haven’t quantified how much computer resources (CPU, memory) idle TS 
> components occupy.
>  * TS is a feature not required for running Kafka. As such, while it is still 
> under development we shouldn’t put it on the critical path of starting a 
> broker. In this way, a stray memory leak won’t impact anything on the 
> critical path of a broker.
>  * We are potentially swapping one problem for another. How does TS behave if 
> one decides to swap the TS plugin classes when data has already been written?
> h2. Option 3: Hide topics with tiering enabled
> Customers cannot interact with topics which have tiering enabled. They cannot 
> create new topics with the same names. Retention (and compaction?) do not 
> take effect on files already in local storage.
> Pros:
>  * We do not force data-deletion.
> Cons:
>  * This will be quite involved - the controller will need to know when a 
> broker’s server.properties have been altered; the broker will need to not 
> proceed to delete logs it is not the leader or follower for.
> h2. {color:#00875a}Option 4: Do not start the broker if there are topics with 
> tiering enabled{color} - Recommended
> This option has 2 different sub-options. The first one is that TS cannot be 
> disabled on cluster-level if there are *any* tiering topics - in other words 
> all tiered topics need to be deleted. The second one is that TS cannot be 
> disabled on a cluster-level if there are *any* topics with *tiering enabled* 
> - they can have tiering disabled, but with a retention policy set to delete 
> or retain (as per 
> [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]).
>  A topic can have tiering disabled and remain on the cluster as long as there 
> is no *remote* data when TS is disabled cluster-wide.
> Pros:
>  * We force the customer to be very explicit in disabling tiering of topics 
> prior to disabling TS on the whole cluster.
> Cons:
>  * You have to make certain that all data in remote is deleted (just a 
> disablement of tired topic is not enough). How do you determine wheth

[jira] [Commented] (KAFKA-15107) Additional custom metadata for remote log segment

2023-08-01 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749589#comment-17749589
 ] 

Luke Chen commented on KAFKA-15107:
---

Set as a blocker for 3.6.0 since this KIP assumes it'll be released as part of the tiered 
storage feature in 3.6.0.

> Additional custom metadata for remote log segment
> -
>
> Key: KAFKA-15107
> URL: https://issues.apache.org/jira/browse/KAFKA-15107
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Ivan Yurchenko
>Assignee: Ivan Yurchenko
>Priority: Blocker
>  Labels: tiered-storage
> Fix For: 3.6.0
>
>
> Based on the [KIP-917: Additional custom metadata for remote log 
> segment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-917%3A+Additional+custom+metadata+for+remote+log+segment],
>  the following needs to be implemented:
>  # {{{}RemoteLogSegmentMetadata{}}}{{{}.CustomMetadata{}}}.
>  # {{RemoteStorageManager.copyLogSegmentData}} needs to be updated to the new 
> return type (+ javadoc).
>  # {{RemoteLogSegmentMetadata.customMetadata}} and 
> {{RemoteLogSegmentMetadata.createWithCustomMetadata}} methods. Same in 
> {{{}RemoteLogSegmentMetadataSnapshot{}}}.
>  # {{RemoteLogSegmentMetadataRecord}} and 
> {{RemoteLogSegmentMetadataSnapshotRecord}} definitions need to be updated.
>  # Custom metadata should be persisted by {{RemoteLogManager}} if provided.
>  # The new config {{remote.log.metadata.custom.metadata.max.size}} needs to 
> be introduced.
>  # The custom metadata size limit must be applied according to the KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15107) Additional custom metadata for remote log segment

2023-08-01 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15107:
--
Priority: Blocker  (was: Minor)

> Additional custom metadata for remote log segment
> -
>
> Key: KAFKA-15107
> URL: https://issues.apache.org/jira/browse/KAFKA-15107
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Ivan Yurchenko
>Assignee: Ivan Yurchenko
>Priority: Blocker
>  Labels: tiered-storage
> Fix For: 3.6.0
>
>
> Based on the [KIP-917: Additional custom metadata for remote log 
> segment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-917%3A+Additional+custom+metadata+for+remote+log+segment],
>  the following needs to be implemented:
>  # {{{}RemoteLogSegmentMetadata{}}}{{{}.CustomMetadata{}}}.
>  # {{RemoteStorageManager.copyLogSegmentData}} needs to be updated to the new 
> return type (+ javadoc).
>  # {{RemoteLogSegmentMetadata.customMetadata}} and 
> {{RemoteLogSegmentMetadata.createWithCustomMetadata}} methods. Same in 
> {{{}RemoteLogSegmentMetadataSnapshot{}}}.
>  # {{RemoteLogSegmentMetadataRecord}} and 
> {{RemoteLogSegmentMetadataSnapshotRecord}} definitions need to be updated.
>  # Custom metadata should be persisted by {{RemoteLogManager}} if provided.
>  # The new config {{remote.log.metadata.custom.metadata.max.size}} needs to 
> be introduced.
>  # The custom metadata size limit must be applied according to the KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15267) Cluster-wide disablement of Tiered Storage

2023-07-31 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749476#comment-17749476
 ] 

Luke Chen commented on KAFKA-15267:
---

[~christo_lolov], thanks for raising this issue.

I was trying to remember: when the KRaft feature was released, did we support 
downgrading to a ZK-only cluster? As far as I recall there was no such downgrade 
path. I checked 
[KIP-500|https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum#KIP500:ReplaceZooKeeperwithaSelfManagedMetadataQuorum-Compatibility,Deprecation,andMigrationPlan]
 and didn't see any downgrade path described, only the upgrade path.

So I'm wondering: do we need to worry about the downgrade issue at all?

If not, I think we only need to consider the enable/disable case. In that case, 
maybe solution (1) (i.e. change nothing) is good enough, since when tiered 
storage is disabled system-wide, the remote log manager won't be initialized and 
no further work will be done.


Thoughts?
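To make the trade-off more concrete, below is a purely hypothetical sketch of the startup guard that option 4 in the quoted description proposes: refuse to start when {{remote.log.storage.system.enable}} is false but some topic still has {{remote.storage.enable=true}}. None of these class or method names exist in Kafka; this is not the broker's actual startup code.

{code:java}
// Hypothetical startup guard, only to illustrate option 4 from the description quoted below.
import java.util.Map;
import java.util.Properties;

public final class TieredStorageStartupGuard {

    static void validate(Properties serverProps, Map<String, Properties> topicConfigs) {
        boolean systemEnabled = Boolean.parseBoolean(
                serverProps.getProperty("remote.log.storage.system.enable", "false"));
        if (systemEnabled) {
            return; // nothing to check when tiered storage is enabled cluster-wide
        }
        for (Map.Entry<String, Properties> entry : topicConfigs.entrySet()) {
            boolean topicTiered = Boolean.parseBoolean(
                    entry.getValue().getProperty("remote.storage.enable", "false"));
            if (topicTiered) {
                // Fail fast instead of silently ignoring remote data for this topic.
                throw new IllegalStateException("Tiered storage is disabled cluster-wide, "
                        + "but topic '" + entry.getKey() + "' still has remote.storage.enable=true");
            }
        }
    }
}
{code}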

> Cluster-wide disablement of Tiered Storage
> --
>
> Key: KAFKA-15267
> URL: https://issues.apache.org/jira/browse/KAFKA-15267
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
>
> h2. Summary
> KIP-405 defines the configuration {{remote.log.storage.system.enable}}, which 
> controls whether all resources needed for Tiered Storage to function are 
> instantiated properly in Kafka. However, the interaction between remote data 
> and Kafka is undefined if that configuration is set to false while there are 
> still topics with {{remote.storage.enable}} set. {color:#ff8b00}*We would 
> like to give customers the ability to switch off Tiered Storage on a cluster 
> level and as such would need to define the behaviour.*{color}
> {{remote.log.storage.system.enable}} is a read-only configuration. This means 
> that it can only be changed by *modifying server.properties* and 
> restarting brokers. As such, the *validity of the values it contains is 
> only checked at broker startup*.
> This JIRA proposes a few behaviours and a recommendation on a way forward.
> h2. Option 1: Change nothing
> Pros:
>  * No operation.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
> h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster 
> level and do not allow it to be disabled
> Always instantiate all resources for tiered storage. If no special ones are 
> selected, use the default ones that come with Kafka.
> Pros:
>  * We solve the problem for moving between versions not allowing TS to be 
> disabled.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
>  * We haven't quantified how many compute resources (CPU, memory) idle TS 
> components occupy.
>  * TS is a feature not required for running Kafka. As such, while it is still 
> under development we shouldn’t put it on the critical path of starting a 
> broker. In this way, a stray memory leak won’t impact anything on the 
> critical path of a broker.
>  * We are potentially swapping one problem for another. How does TS behave if 
> one decides to swap the TS plugin classes when data has already been written?
> h2. Option 3: Hide topics with tiering enabled
> Customers cannot interact with topics which have tiering enabled. They cannot 
> create new topics with the same names. Retention (and compaction?) does not 
> take effect on files already in local storage.
> Pros:
>  * We do not force data-deletion.
> Cons:
>  * This will be quite involved - the controller will need to know when a 
> broker's server.properties has been altered, and the broker will need to avoid 
> deleting logs it is not the leader or follower for.
> h2. {color:#00875a}Option 4: Do not start the broker if there are topics with 
> tiering enabled{color} - Recommended
> This option has two sub-options. The first is that TS cannot be 
> disabled on the cluster level if there are *any* tiered topics - in other words, 
> all tiered topics need to be deleted first. The second is that TS cannot be 
> disabled on the cluster level if there are *any* topics with *tiering enabled* 
> - topics may have tiering disabled, but with a retention policy set to delete 
> or retain (as per 
> [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]).
>  A topic can have tiering disabled and remain on the cluster as long as there 
> is no *remote* data left when TS is disabled cluster-wide.
> Pros:
>  * We force the customer to be very explicit in disabling tiering of topics 
> prior to disabling TS on the whole cluster.
> Cons:
>  * You have to make certain that 

[jira] [Commented] (KAFKA-14908) Sporadic "Address already in use" when starting kafka cluster embedded within tests

2023-07-26 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17747721#comment-17747721
 ] 

Luke Chen commented on KAFKA-14908:
---

Thanks [~divijvaidya], but I cannot see the `Address already in use` error 
in the Gradle error output. How did you identify it?

> Sporadic "Address already in use" when starting kafka cluster embedded within 
> tests
> ---
>
> Key: KAFKA-14908
> URL: https://issues.apache.org/jira/browse/KAFKA-14908
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Keith Wall
>Priority: Major
> Fix For: 3.6.0
>
>
> We have an integration test suite that starts/stops a Kafka cluster 
> before/after each test. Kafka is started programmatically within the 
> same JVM that runs the tests.
> Sometimes we get sporadic failures from within Kafka as it tries to bind the 
> server socket.
> {code:java}
> org.apache.kafka.common.KafkaException: Socket server failed to bind to 0.0.0.0:9092: Address already in use.
>     at kafka.network.Acceptor.openServerSocket(SocketServer.scala:684)
>     at kafka.network.Acceptor.<init>(SocketServer.scala:576)
>     at kafka.network.DataPlaneAcceptor.<init>(SocketServer.scala:433)
>     at kafka.network.SocketServer.createDataPlaneAcceptor(SocketServer.scala:247)
>     at kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:226)
>     at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:173)
>     at kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:173)
>     at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575)
>     at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
>     at kafka.network.SocketServer.<init>(SocketServer.scala:173)
>     at kafka.server.KafkaServer.startup(KafkaServer.scala:331) {code}
> Investigation has shown that the socket is in the TIME_WAIT state from a 
> previous test.
> I know Kafka supports ephemeral ports, but this isn't convenient for our 
> use case.
> I'd like to suggest that Kafka be changed to set SO_REUSEADDR on the 
> server socket. I believe this is standard practice for server applications 
> that run on well-known ports.
> I don't believe this change would introduce any backward-compatibility 
> concerns.
>  
> I will open a PR so this can be considered. Thank you.
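To illustrate the proposal, here is a minimal standalone sketch (not Kafka's SocketServer code) of setting SO_REUSEADDR before binding, which is the behaviour the report suggests; the port and bind address are only examples.

{code:java}
// Standalone illustration of the suggested change: enable SO_REUSEADDR before bind()
// so a socket left in TIME_WAIT by a previous test does not cause the bind to fail.
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public final class ReuseAddrExample {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket()) {
            server.setReuseAddress(true); // must be called before bind()
            server.bind(new InetSocketAddress("0.0.0.0", 9092));
            System.out.println("Bound to " + server.getLocalSocketAddress());
        }
    }
}
{code}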



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15239) producerPerformance system test for old client failed after v3.5.0

2023-07-25 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-15239:
-

Assignee: Federico Valeri

> producerPerformance system test for old client failed after v3.5.0
> --
>
> Key: KAFKA-15239
> URL: https://issues.apache.org/jira/browse/KAFKA-15239
> Project: Kafka
>  Issue Type: Test
>  Components: system tests
>Affects Versions: 3.6.0
>Reporter: Luke Chen
>Assignee: Federico Valeri
>Priority: Major
>
> While running the producer performance tool in a system test for an old client (e.g. 
> quota_test), we try to run with the dev branch's jar file to make sure 
> it is backward compatible, as described 
> [here|https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/performance/producer_performance.py#L86-L88].
> {code:java}
> # In order to ensure more consistent configuration between versions, always 
> use the ProducerPerformance tool from the development branch {code}
>  
> But in KAFKA-14525 we moved the tools from the core module to a separate tools 
> module, which actually breaks backward compatibility. We should fix the 
> system test. Maybe we should also document this backward-compatibility issue 
> somewhere?
> Note:
> This is the command run in the system test. Suppose it's testing the old client 3.4.0 
> (files placed under `~/Downloads/kafka_2.13-3.4.0` in my environment), running against 
> the latest trunk environment.
> {code:java}
> > for file in ./tools/build/libs/kafka-tools*.jar; do 
> > CLASSPATH=$CLASSPATH:$file; done; for file in 
> > ./tools/build/dependant-libs*/*.jar; do CLASSPATH=$CLASSPATH:$file; done; 
> > export CLASSPATH;  export 
> > KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:config/tools-log4j.properties";
> >  KAFKA_OPTS= KAFKA_HEAP_OPTS="-XX:+HeapDumpOnOutOfMemoryError" 
> > ~/Downloads/kafka_2.13-3.4.0/bin/kafka-run-class.sh 
> > org.apache.kafka.tools.ProducerPerformance --topic test_topic --num-records 
> > 5 --record-size 3000 --throughput -1 --producer-props 
> > bootstrap.servers=localhost:9092 client.id=overridden_id 
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/kafka/common/utils/ThroughputThrottler
>     at 
> org.apache.kafka.tools.ProducerPerformance.start(ProducerPerformance.java:101)
>     at 
> org.apache.kafka.tools.ProducerPerformance.main(ProducerPerformance.java:52)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.common.utils.ThroughputThrottler
>     at 
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
>     at 
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
>     ... 2 more
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15239) producerPerformance system test for old client failed after v3.5.0

2023-07-25 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17747302#comment-17747302
 ] 

Luke Chen commented on KAFKA-15239:
---

Thanks [~fvaleri]! Assigning it to you now.

> producerPerformance system test for old client failed after v3.5.0
> --
>
> Key: KAFKA-15239
> URL: https://issues.apache.org/jira/browse/KAFKA-15239
> Project: Kafka
>  Issue Type: Test
>  Components: system tests
>Affects Versions: 3.6.0
>Reporter: Luke Chen
>Assignee: Federico Valeri
>Priority: Major
>
> While running the producer performance tool in a system test for an old client (e.g. 
> quota_test), we try to run with the dev branch's jar file to make sure 
> it is backward compatible, as described 
> [here|https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/performance/producer_performance.py#L86-L88].
> {code:java}
> # In order to ensure more consistent configuration between versions, always 
> use the ProducerPerformance tool from the development branch {code}
>  
> But in KAFKA-14525 we moved the tools from the core module to a separate tools 
> module, which actually breaks backward compatibility. We should fix the 
> system test. Maybe we should also document this backward-compatibility issue 
> somewhere?
> Note:
> This is the command run in the system test. Suppose it's testing the old client 3.4.0 
> (files placed under `~/Downloads/kafka_2.13-3.4.0` in my environment), running against 
> the latest trunk environment.
> {code:java}
> > for file in ./tools/build/libs/kafka-tools*.jar; do 
> > CLASSPATH=$CLASSPATH:$file; done; for file in 
> > ./tools/build/dependant-libs*/*.jar; do CLASSPATH=$CLASSPATH:$file; done; 
> > export CLASSPATH;  export 
> > KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:config/tools-log4j.properties";
> >  KAFKA_OPTS= KAFKA_HEAP_OPTS="-XX:+HeapDumpOnOutOfMemoryError" 
> > ~/Downloads/kafka_2.13-3.4.0/bin/kafka-run-class.sh 
> > org.apache.kafka.tools.ProducerPerformance --topic test_topic --num-records 
> > 5 --record-size 3000 --throughput -1 --producer-props 
> > bootstrap.servers=localhost:9092 client.id=overridden_id 
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/kafka/common/utils/ThroughputThrottler
>     at 
> org.apache.kafka.tools.ProducerPerformance.start(ProducerPerformance.java:101)
>     at 
> org.apache.kafka.tools.ProducerPerformance.main(ProducerPerformance.java:52)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.common.utils.ThroughputThrottler
>     at 
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
>     at 
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
>     ... 2 more
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15218) NPE will be thrown while deleting topic and fetch from follower concurrently

2023-07-25 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-15218.
---
Fix Version/s: 3.6.0
   Resolution: Fixed

> NPE will be thrown while deleting topic and fetch from follower concurrently
> 
>
> Key: KAFKA-15218
> URL: https://issues.apache.org/jira/browse/KAFKA-15218
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0
>Reporter: Luke Chen
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.6.0
>
>
> When deleting topics, we first clear the remoteReplicaMap during 
> stopPartitions 
> [here|https://github.com/apache/kafka/blob/2999168cde37142ae3a2377fe939d6b581e692b8/core/src/main/scala/kafka/server/ReplicaManager.scala#L554].
>  But at that point there might be a fetch request coming from a follower, which tries to 
> check whether the replica is eligible to be added to the ISR 
> [here|https://github.com/apache/kafka/blob/2999168cde37142ae3a2377fe939d6b581e692b8/core/src/main/scala/kafka/cluster/Partition.scala#L1001].
>  At that moment, an NPE is thrown. Although this is harmless since the topic is 
> already deleted, it would be better to avoid it.
>  
>  
> {code:java}
> java.lang.NullPointerException: Cannot invoke "kafka.cluster.Replica.stateSnapshot()" because the return value of "kafka.utils.Pool.get(Object)" is null
>   at kafka.cluster.Partition.isReplicaIsrEligible(Partition.scala:992) ~[kafka_2.13-3.5.0.jar:?]
>   at kafka.cluster.Partition.canAddReplicaToIsr(Partition.scala:974) ~[kafka_2.13-3.5.0.jar:?]
>   at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:947) ~[kafka_2.13-3.5.0.jar:?]
>   at kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:866) ~[kafka_2.13-3.5.0.jar:?]
>   at kafka.cluster.Partition.fetchRecords(Partition.scala:1361) ~[kafka_2.13-3.5.0.jar:?]
>   at kafka.server.ReplicaManager.read$1(ReplicaManager.scala:1164) ~[kafka_2.13-3.5.0.jar:?]
>   at kafka.server.ReplicaManager.$anonfun$readFromLocalLog$7(ReplicaManager.scala:1235) ~[kafka_2.13-3.5.0.jar:?]
>   at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575) ~[scala-library-2.13.10.jar:?]
>   at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573) ~[scala-library-2.13.10.jar:?]
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:933) ~[scala-library-2.13.10.jar:?]
>   at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:1234) ~[kafka_2.13-3.5.0.jar:?]
>   at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1044) ~[kafka_2.13-3.5.0.jar:?]
>   at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:994) ~[kafka_2.13-3.5.0.jar:?]
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:181) ~[kafka_2.13-3.5.0.jar:?]
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76) ~[kafka_2.13-3.5.0.jar:?]
>   at java.lang.Thread.run(Thread.java:1623) [?:?]
> {code}
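For illustration only, here is a sketch of the defensive pattern that avoids this class of NPE: treat a replica that has already been removed (because the topic is being deleted) as not ISR-eligible instead of dereferencing a null map entry. The class, record, and field names below are stand-ins, not the actual Kafka fix.

{code:java}
// Illustrative only: a null-safe eligibility check in the spirit of the report.
// The real code lives in kafka.cluster.Partition; these names are stand-ins.
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public final class IsrEligibilityExample {

    record ReplicaState(long logEndOffset, long lastCaughtUpTimeMs) {}

    // Stand-in for the remote-replica map that topic deletion may clear concurrently.
    private final ConcurrentHashMap<Integer, ReplicaState> remoteReplicas = new ConcurrentHashMap<>();

    boolean isReplicaIsrEligible(int brokerId, long leaderEndOffset, long nowMs, long maxLagMs) {
        // Optional.ofNullable turns a concurrently removed replica into "not eligible"
        // instead of a NullPointerException.
        return Optional.ofNullable(remoteReplicas.get(brokerId))
                .map(state -> state.logEndOffset() >= leaderEndOffset
                        || nowMs - state.lastCaughtUpTimeMs() <= maxLagMs)
                .orElse(false);
    }
}
{code}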



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

