[GitHub] [kafka] dengziming commented on pull request #12170: KAFKA-13875 Adjusted the output the topic describe output to include TopicID & se…

2022-05-16 Thread GitBox


dengziming commented on PR #12170:
URL: https://github.com/apache/kafka/pull/12170#issuecomment-1128442992

   It's OK to add topicId to the docs. The segment.bytes property is not 
intended to be printed; that appears to be a small bug: KAFKA-13718
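   For context, the describe output in question looks roughly like this 
(illustrative values; the TopicId is randomly generated, and the trailing 
segment.bytes entry is the stray config):
   ```
   $ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
   Topic: quickstart-events  TopicId: NPmZHyhbR0y57ZZnPvrdGw  PartitionCount: 1  ReplicationFactor: 1  Configs: segment.bytes=1073741824
   ```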


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on pull request #12109: KAFKA-13863: Prevent null config value when create topic in KRaft mode

2022-05-16 Thread GitBox


dengziming commented on PR #12109:
URL: https://github.com/apache/kafka/pull/12109#issuecomment-1128357555

   Resolved conflicts, and I also changed `ZkAdminManager.createTopics` to throw a 
`ConfigException` on null config values.
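   A minimal sketch of that check (illustrative only; the helper name and 
message are assumptions, not the actual `ZkAdminManager` code):
   ```scala
   import java.util.Properties
   import org.apache.kafka.common.config.ConfigException

   // Reject null config values up front rather than letting them fail later.
   def configsToProperties(configs: Map[String, String]): Properties = {
     val props = new Properties()
     configs.foreach { case (key, value) =>
       if (value == null)
         throw new ConfigException(s"Null value not supported for topic configs: $key")
       props.put(key, value)
     }
     props
   }
   ```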





[jira] [Updated] (KAFKA-13907) Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable

2022-05-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13907:
--
Labels: newbie  (was: )

> Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable
> --
>
> Key: KAFKA-13907
> URL: https://issues.apache.org/jira/browse/KAFKA-13907
> Project: Kafka
>  Issue Type: Bug
>Reporter: dengziming
>Priority: Major
>  Labels: newbie
>
> ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable will hang 
> waiting for controlled shutdown; there may be a bug related to it.
> Since this bug can be reproduced locally, it won't be hard to investigate.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] dengziming commented on pull request #12165: KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs

2022-05-16 Thread GitBox


dengziming commented on PR #12165:
URL: https://github.com/apache/kafka/pull/12165#issuecomment-1128348174

   Thank you @showuon @hachikuji, I changed the assertion, disabled 
`testCleanShutdownWithKRaftControllerUnavailable`, and created KAFKA-13907 for 
this bug.
   For `testCleanShutdownAfterFailedStartup`, it currently only fails 
occasionally, so I didn't disable it.





[jira] [Created] (KAFKA-13907) Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable

2022-05-16 Thread dengziming (Jira)
dengziming created KAFKA-13907:
--

 Summary: Fix hanging 
ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable
 Key: KAFKA-13907
 URL: https://issues.apache.org/jira/browse/KAFKA-13907
 Project: Kafka
  Issue Type: Bug
Reporter: dengziming


ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable will hang 
waiting for controlled shutdown; there may be a bug related to it.

Since this bug can be reproduced locally, it won't be hard to investigate.





[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2022-05-16 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537882#comment-17537882
 ] 

Luke Chen commented on KAFKA-12495:
---

No problem, [~mcabrera]!

> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Luke Chen
>Priority: Critical
> Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement incremental cooperative rebalance algorithm 
> based on KIP-415 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
>  However, we have a bad assumption in the algorithm implementation, which is: 
> after a revoking rebalance completes, the member (worker) count will be the 
> same as in the previous round of rebalance.
>  
> Let's take a look at the example in the KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well for most cases. But what if W4 is added after the 1st rebalance 
> completes and before the 2nd rebalance starts? Let's see what happens with 
> this example (we'll use 10 tasks here):
>  
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new member, 
> but we didn't revoke any more C/T in this round, which causes an unbalanced 
> distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> Because we don't allow consecutive revocations in two consecutive 
> rebalances (under the same leader), we end up with this uneven distribution 
> in this situation. We should allow a consecutive rebalance to run another 
> round of revocation, moving the C/T to the other members in this case.
> expected:
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new member, 
> **and also revoked some C/T** 
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> // another round of rebalance to assign the new revoked C/T to the other 
> members
> W1 rejoins with assignment: [AC0, AT1, AT2] 
> Rebalance is triggered 
> W2 joins with assignment: [AT4, AT5, BC0] 
> W3 joins with assignment: [BT1, BT2, BT3]
> W4 joins with assignment: [BT4, BT5]
> W1 becomes leader 
> W1 

[GitHub] [kafka] bozhao12 commented on a diff in pull request #12158: MINOR:A few code cleanUps in KafkaController

2022-05-16 Thread GitBox


bozhao12 commented on code in PR #12158:
URL: https://github.com/apache/kafka/pull/12158#discussion_r874295581


##
core/src/main/scala/kafka/controller/KafkaController.scala:
##
@@ -468,14 +468,6 @@ class KafkaController(val config: KafkaConfig,
 
 // shutdown leader rebalance scheduler
 kafkaScheduler.shutdown()
-offlinePartitionCount = 0

Review Comment:
   @hachikuji Thanks for your suggestion, I updated the code.






[GitHub] [kafka] showuon commented on pull request #12165: KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs

2022-05-16 Thread GitBox


showuon commented on PR #12165:
URL: https://github.com/apache/kafka/pull/12165#issuecomment-1128325494

   @dengziming, I'd suggest we follow @hachikuji's advice to add an 
`assertCause` to fix the failed tests. I can create another JIRA to address our 
thoughts in this comment: 
https://github.com/apache/kafka/pull/12165#discussion_r873557577
   
   Also, as Jason mentioned, there are still 2 failed tests:
   ```
   Build / JDK 8 and Scala 2.12 / 
kafka.server.ServerShutdownTest.testCleanShutdownAfterFailedStartup(String).quorum=kraft
   Build / JDK 8 and Scala 2.12 / 
kafka.server.ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable(String).quorum=kraft
   ```
   
   If you don't have time, you can just disable them first and create JIRA 
tickets for them. Let me know if you need help. I hope we can fix them in our 
timezone today. The failed tests make my PR build results look very bad (and I 
don't like that, haha).
   
   Thanks.





[GitHub] [kafka] hachikuji opened a new pull request, #12171: MINOR: Convert admin integration tests

2022-05-16 Thread GitBox


hachikuji opened a new pull request, #12171:
URL: https://github.com/apache/kafka/pull/12171

   This patch enables KRaft support in `PlaintextAdminIntegrationTest`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Comment Edited] (KAFKA-13867) Improve JavaDoc for MetadataVersion.java

2022-05-16 Thread Kvicii.Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537864#comment-17537864
 ] 

Kvicii.Yu edited comment on KAFKA-13867 at 5/17/22 1:24 AM:


[~cmccabe] hi, my understanding is that we should just rename the 
`ibpVersion` method, and change all the locations where this 
method is used?


was (Author: JIRAUSER283467):
[~cmccabe] hi, My understanding is to just modify the method name of 
ibpVersion, rename to ibpVersion?

> Improve JavaDoc for MetadataVersion.java
> 
>
> Key: KAFKA-13867
> URL: https://issues.apache.org/jira/browse/KAFKA-13867
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Minor
>






[jira] [Commented] (KAFKA-13867) Improve JavaDoc for MetadataVersion.java

2022-05-16 Thread Kvicii.Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537864#comment-17537864
 ] 

Kvicii.Yu commented on KAFKA-13867:
---

[~cmccabe] hi, my understanding is that we should just rename the 
`ibpVersion` method?

> Improve JavaDoc for MetadataVersion.java
> 
>
> Key: KAFKA-13867
> URL: https://issues.apache.org/jira/browse/KAFKA-13867
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Minor
>






[GitHub] [kafka] mjsax commented on pull request #12137: MINOR: Consolidate StreamsException and TaskCorruptedException

2022-05-16 Thread GitBox


mjsax commented on PR #12137:
URL: https://github.com/apache/kafka/pull/12137#issuecomment-1128295416

   > Avoiding the public API change seems quite hacky to me
   
   I'll leave it up to you -- it's only a small KIP, so if you want to do it, just 
go for it -- personally, I don't see much benefit in unifying both exceptions 
(as indicated by the fact that this PR doesn't change any code that uses them). 
But it also does not hurt...
   
   Your call.





[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2022-05-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537860#comment-17537860
 ] 

Matthias J. Sax commented on KAFKA-6520:


It's still an open item – and it's complex (for details, see the KIP discussion 
thread).

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Vince Mu
>Priority: Major
>  Labels: newbie, user-experience
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams]
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
>  See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  This is a link to a related 
> issue.
> -
> Update: there are some discussions on the PR itself which leads me to think 
> that a more general solution should be at the ClusterConnectionStates rather 
> than at the Streams or even Consumer level. One proposal would be:
>  * Add a new metric named `failedConnection` in SelectorMetrics which is 
> recorded at the `connect()` and `pollSelectionKeys()` functions, upon capturing the 
> IOException / RuntimeException that indicates the connection was disconnected.
>  * And then users of Consumer / Streams can monitor this metric, which 
> normally will have close-to-zero values, as transient 
> disconnects are expected; if it is spiking, it means the brokers are consistently 
> unavailable, indicating the state.
> [~Yohan123] WDYT?
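A rough sketch of how a client application could watch such a metric (the 
metric name below is hypothetical, following the proposal above; `metrics()` is 
the standard Kafka client accessor):

{code:scala}
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

// Scan the client's metrics map for the proposed failed-connection metric;
// a sustained spike would indicate consistently unavailable brokers.
def failedConnectionCount(consumer: KafkaConsumer[_, _]): Double =
  consumer.metrics().asScala.collectFirst {
    case (name, metric) if name.name == "failed-connection-total" => // hypothetical name
      metric.metricValue().asInstanceOf[Double]
  }.getOrElse(0.0)
{code}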





[GitHub] [kafka] hachikuji commented on pull request #12162: KAFKA-13899: Use INVALID_CONFIG error code consistently in AlterConfig APIs

2022-05-16 Thread GitBox


hachikuji commented on PR #12162:
URL: https://github.com/apache/kafka/pull/12162#issuecomment-1128281035

   @dengziming By the way, KAFKA-13609 seems to apply to broker configurations. 
Similarly, I do not think we need a KIP since INVALID_CONFIG is already used by 
these APIs.





[jira] [Resolved] (KAFKA-13899) Inconsistent error codes returned from AlterConfig APIs

2022-05-16 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13899.
-
Fix Version/s: 3.2.1
   Resolution: Fixed

> Inconsistent error codes returned from AlterConfig APIs
> ---
>
> Key: KAFKA-13899
> URL: https://issues.apache.org/jira/browse/KAFKA-13899
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.2.1
>
>
> In the AlterConfigs/IncrementalAlterConfigs zk handler, we return 
> INVALID_REQUEST and INVALID_CONFIG inconsistently. The problem is in 
> `LogConfig.validate`. We may throw either `ConfigException` or 
> `InvalidConfigException`. When the first of these is thrown, we catch it and 
> convert to INVALID_REQUEST. It seems more consistent to convert to 
> INVALID_CONFIG.
> Note that the kraft implementation returns INVALID_CONFIG consistently.
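A minimal sketch of the consistent mapping described above (an illustrative 
helper, not the actual zk handler code; it uses the public 
`InvalidConfigurationException` class):

{code:scala}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.protocol.Errors

// Map both validation exception types to INVALID_CONFIG, matching kraft.
def alterConfigsError(t: Throwable): Errors = t match {
  case _: ConfigException => Errors.INVALID_CONFIG // previously became INVALID_REQUEST
  case _: InvalidConfigurationException => Errors.INVALID_CONFIG
  case other => Errors.forException(other)
}
{code}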





[GitHub] [kafka] hachikuji merged pull request #12162: KAFKA-13899: Use INVALID_CONFIG error code consistently in AlterConfig APIs

2022-05-16 Thread GitBox


hachikuji merged PR #12162:
URL: https://github.com/apache/kafka/pull/12162





[GitHub] [kafka] cmccabe commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller

2022-05-16 Thread GitBox


cmccabe commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r874258218


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -108,7 +108,8 @@ class KafkaRaftManager[T](
   time: Time,
   metrics: Metrics,
   threadNamePrefixOpt: Option[String],
-  val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, 
AddressSpec]]
+  val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, 
AddressSpec]],
+  apiVersions: ApiVersions

Review Comment:
   I don't see why we would want to pass a specific `ApiVersions` object in. 
We're not supplying any information, right? We don't know anything about the 
versions of the nodes until we start contacting them.
   
   So, can we just have an accessor method in `RaftManager` that returns the 
`ApiVersions` of the `RaftManager`?
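   For example, roughly (an assumed shape, not the actual patch):
   ```scala
   import org.apache.kafka.clients.ApiVersions

   class KafkaRaftManager[T] /* (...existing constructor params...) */ {
     // Owned by the raft manager and populated as nodes are contacted.
     private val apiVersions = new ApiVersions()

     def clientApiVersions: ApiVersions = apiVersions
   }
   ```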






[jira] [Comment Edited] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2022-05-16 Thread Karthik Raman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537647#comment-17537647
 ] 

Karthik Raman edited comment on KAFKA-6520 at 5/17/22 12:26 AM:


[~mjsax] / [~Yohan123] / [~VinceMu]: Just wondering if there is any update on 
this issue and whether the fix will be added in an upcoming release? Thank you.


was (Author: JIRAUSER289548):
[~mjsax] / [~Yohan123] : Just wondering if there is any update on this issue 
and the fix to be added in any latest releases? Thank you.

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Vince Mu
>Priority: Major
>  Labels: newbie, user-experience
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams]
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
>  See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  This is a link to a related 
> issue.
> -
> Update: there are some discussions on the PR itself which leads me to think 
> that a more general solution should be at the ClusterConnectionStates rather 
> than at the Streams or even Consumer level. One proposal would be:
>  * Add a new metric named `failedConnection` in SelectorMetrics which is 
> recorded at the `connect()` and `pollSelectionKeys()` functions, upon capturing the 
> IOException / RuntimeException that indicates the connection was disconnected.
>  * And then users of Consumer / Streams can monitor this metric, which 
> normally will have close-to-zero values, as transient 
> disconnects are expected; if it is spiking, it means the brokers are consistently 
> unavailable, indicating the state.
> [~Yohan123] WDYT?





[GitHub] [kafka] cmccabe commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller

2022-05-16 Thread GitBox


cmccabe commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r874257133


##
clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java:
##
@@ -91,19 +95,11 @@ public static NodeApiVersions create(short apiKey, short 
minVersion, short maxVe
 .setMaxVersion(maxVersion)));
 }
 
-public NodeApiVersions(ApiVersionCollection nodeApiVersions) {
-for (ApiVersion nodeApiVersion : nodeApiVersions) {
-if (ApiKeys.hasId(nodeApiVersion.apiKey())) {
-ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey());
-supportedVersions.put(nodeApiKey, nodeApiVersion);
-} else {
-// Newer brokers may support ApiKeys we don't know about
-unknownApis.add(nodeApiVersion);
-}
-}
+public NodeApiVersions(Collection nodeApiVersions) {

Review Comment:
   Is it necessary to have a special constructor for the case where there are 
no supported features? This won't be a common situation in the future, right? 
This feels like the kind of thing that could easily lead to mistakes, if 
someone accidentally uses it outside of test code.
   
   Looking at my IDE I see 10 uses of this in test code. Can we just change 
them to use the two-argument form?






[GitHub] [kafka] Moovlin commented on pull request #12170: KAFKA-13875 Adjusted the output the topic describe output to include TopicID & se…

2022-05-16 Thread GitBox


Moovlin commented on PR #12170:
URL: https://github.com/apache/kafka/pull/12170#issuecomment-1128249740

   There are no code changes in this PR. It's an update to the quickstart.html 
file, so I think these test failures can be ignored. This is only my second PR, 
so please let me know if I need to do something differently. 





[GitHub] [kafka] hachikuji commented on a diff in pull request #12138: MINOR: Followers should not have any remote replica states left over from previous leadership

2022-05-16 Thread GitBox


hachikuji commented on code in PR #12138:
URL: https://github.com/apache/kafka/pull/12138#discussion_r874209109


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -578,7 +579,8 @@ class Partition(val topicPartition: TopicPartition,
   // Updating the assignment and ISR state is safe if the partition epoch 
is
   // larger or equal to the current partition epoch.
   updateAssignmentAndIsr(
-assignment = partitionState.replicas.asScala.map(_.toInt),
+assignment = replicas,
+followers = replicas.filter(_ != localBrokerId),

Review Comment:
   Why not push this computation down to `updateAssignmentAndIsr`?
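   i.e., something along these lines (a sketch with assumed names; 
`localBrokerId` is passed explicitly here just to keep the sketch 
self-contained):
   ```scala
   // Derive the follower set inside updateAssignmentAndIsr rather than at
   // every call site; it is fully determined by the assignment itself.
   def updateAssignmentAndIsr(
     assignment: Seq[Int],
     isr: Set[Int],
     localBrokerId: Int
   ): Seq[Int] = {
     assignment.filter(_ != localBrokerId)
   }
   ```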



##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -673,7 +675,8 @@ class Partition(val topicPartition: TopicPartition,
 
   updateAssignmentAndIsr(
 assignment = 
partitionState.replicas.asScala.iterator.map(_.toInt).toSeq,
-isr = Set.empty[Int],
+followers = Seq.empty,
+isr = Set.empty,

Review Comment:
   Perhaps a separate issue, but any reason not to update ISR? 



##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -578,7 +579,8 @@ class Partition(val topicPartition: TopicPartition,
   // Updating the assignment and ISR state is safe if the partition epoch 
is
   // larger or equal to the current partition epoch.
   updateAssignmentAndIsr(
-assignment = partitionState.replicas.asScala.map(_.toInt),
+assignment = replicas,

Review Comment:
   Would it make sense to change `assignment` to `replicas`?






[GitHub] [kafka] hachikuji commented on a diff in pull request #12165: KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs

2022-05-16 Thread GitBox


hachikuji commented on code in PR #12165:
URL: https://github.com/apache/kafka/pull/12165#discussion_r874198037


##
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala:
##
@@ -195,8 +195,9 @@ class ServerShutdownTest extends KafkaServerTestHarness {
   // identify the correct exception, making sure the server was shutdown, 
and cleaning up if anything
   // goes wrong so that awaitShutdown doesn't hang
   case e: Exception =>
-assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected 
exception $e")
-assertEquals(if (quorum == "zk") BrokerState.NOT_RUNNING else 
BrokerState.SHUTTING_DOWN, brokers.head.brokerState)
+assertTrue(exceptionClassTag.runtimeClass.isInstance(if (isKRaftTest() 
&& e.isInstanceOf[RuntimeException]) e.getCause.getCause else e),

Review Comment:
   Would a utility like this make sense? 
   ```scala
   private def assertCause(
     expectedClass: Class[_],
     e: Throwable
   ): Unit = {
     var cause = e
     while (cause != null) {
       if (expectedClass.isInstance(cause)) {
         return
       }
       cause = cause.getCause
     }
     fail(s"Failed to assert cause $expectedClass")
   }
   ```
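   
   For example, the call site in the test above could then become:
   ```scala
   // Asserts that the expected type appears somewhere in the cause chain.
   assertCause(exceptionClassTag.runtimeClass, e)
   ```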






[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-16 Thread GitBox


rittikaadhikari commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r874121554


##
core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala:
##
@@ -124,4 +124,8 @@ class ReplicaFetcherBlockingSend(sourceBroker: 
BrokerEndPoint,
   def close(): Unit = {
 networkClient.close()
   }
+
+  override def toString: String = {

Review Comment:
   We needed to add the `toString` for logging purposes in 
`RemoteLeaderEndPoint`, where we print out the endpoint.






[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-16 Thread GitBox


rittikaadhikari commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r874120703


##
core/src/main/scala/kafka/server/LeaderEndPoint.scala:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.FetchRequest
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.message.{FetchResponseData, 
OffsetForLeaderEpochRequestData}
+
+import scala.collection.Map
+
+/**
+ * The LeaderEndPoint acts as an abstraction which serves all fetches from the 
leader
+ * for the fetcher threads.
+ */
+trait LeaderEndPoint {
+
+  type FetchData = FetchResponseData.PartitionData
+  type EpochData = OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
+
+  /**
+   * A boolean specifying if truncation when fetching from the leader is 
supported
+   */
+  def isTruncationOnFetchSupported: Boolean

Review Comment:
   I agree that it probably shouldn't be a part of the interface. I can 
probably get away without passing it into the constructors though, since it's 
always false for LocalLeaderEndPoint and the value can be found from 
brokerConfig for RemoteLeaderEndPoint.
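   As a rough sketch of that shape (assumed names, not the PR's actual code):
   ```scala
   trait LeaderEndPoint {
     def isTruncationOnFetchSupported: Boolean
   }

   // Truncation on fetch never applies locally; the remote case derives the
   // flag from broker configuration (constructor arg assumed for illustration).
   class LocalLeaderEndPoint extends LeaderEndPoint {
     override val isTruncationOnFetchSupported: Boolean = false
   }

   class RemoteLeaderEndPoint(truncationOnFetchEnabled: Boolean) extends LeaderEndPoint {
     override val isTruncationOnFetchSupported: Boolean = truncationOnFetchEnabled
   }
   ```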






[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-16 Thread GitBox


rittikaadhikari commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r874118991


##
checkstyle/suppressions.xml:
##
@@ -309,7 +309,7 @@
   files="(RemoteLogManagerConfig).java"/>
 
 
-

[GitHub] [kafka] kowshik commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-16 Thread GitBox


kowshik commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r874090550


##
core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala:
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.{Collections, Optional}
+import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
+import kafka.utils.Implicits.MapExtensionMethods
+import kafka.utils.Logging
+import org.apache.kafka.clients.FetchSessionHandler
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import 
org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, 
ListOffsetsTopic}
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderTopic,
 OffsetForLeaderTopicCollection}
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, 
ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, 
OffsetsForLeaderEpochResponse}
+import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2
+
+import scala.jdk.CollectionConverters._
+import scala.collection.{Map, mutable}
+import scala.compat.java8.OptionConverters.RichOptionForJava8
+
+/**
+ * Facilitates fetches from a remote replica leader.
+ *
+ * @param logPrefix The log prefix from the ReplicaFetcherThread
+ * @param endpoint A ReplicaFetcherBlockingSend

Review Comment:
   The doc can be slightly better for this parameter, for example: `The raw 
leader endpoint to be used by this class for communicating with the leader`.



##
core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala:
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.{Collections, Optional}
+import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
+import kafka.utils.Implicits.MapExtensionMethods
+import kafka.utils.Logging
+import org.apache.kafka.clients.FetchSessionHandler
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import 
org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, 
ListOffsetsTopic}
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderTopic,
 OffsetForLeaderTopicCollection}
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, 
ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, 
OffsetsForLeaderEpochResponse}
+import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2
+
+import scala.jdk.CollectionConverters._
+import scala.collection.{Map, mutable}
+import scala.compat.java8.OptionConverters.RichOptionForJava8
+
+/**
+ * Facilitates fetches from a remote replica leader.
+ *
+ * @param logPrefix The log prefix from the ReplicaFetcherThread

Review Comment:
   Let's remove `from the ReplicaFetcherThread`



##
core/src/main/scala/kafka/server/LeaderEndPoint.scala:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * 

[GitHub] [kafka] Moovlin opened a new pull request, #12170: KAFKA-13875 Adjusted the output the topic describe output to include TopicID & se…

2022-05-16 Thread GitBox


Moovlin opened a new pull request, #12170:
URL: https://github.com/apache/kafka/pull/12170

   …gment.bytes
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [X] Verify design and implementation 
   - [X] Verify test coverage and CI build status
   - [X] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: Fix stuck SSL tests in case of authentication failure

2022-05-16 Thread GitBox


fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r874001986


##
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##
@@ -449,9 +450,9 @@ public void close() {
 @Test
 public void testCloseOldestConnection() throws Exception {
 String id = "0";
-blockingConnect(id);
-
-time.sleep(6000); // The max idle time is 5000ms
+selector.connect(id, new InetSocketAddress("localhost", server.port), 
BUFFER_SIZE, BUFFER_SIZE);
+selector.poll(0);

Review Comment:
   Hi @divijvaidya, thanks. 
   
   I need to fix this and 
`testCloseOldestConnectionWithMultiplePendingReceives`.






[GitHub] [kafka] mjsax opened a new pull request, #12169: MINOR: improve description of `commit.interval.ms` config

2022-05-16 Thread GitBox


mjsax opened a new pull request, #12169:
URL: https://github.com/apache/kafka/pull/12169

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] hachikuji commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-05-16 Thread GitBox


hachikuji commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r874042264


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1292,8 +1292,10 @@ public void handleResponse(AbstractResponse response) {
 reenqueue();
 } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error 
== Errors.CONCURRENT_TRANSACTIONS) {
 reenqueue();
-} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
-error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
+} else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
+log.warn("Retrying upon missing initProducerId due to cluster 
authorization error");
+reenqueue();

Review Comment:
   Hmm, I think we do want the error to get propagated back to the application. 
Most users would expect the application to fail so that they have a chance to 
see the issue and fix it. High level, what I was thinking we could do is fail 
the request by calling `result.fail`, then transition back to `UNINITIALIZED`. 
Then if the user wants to, they can call `initTransactions()` to retry.






[GitHub] [kafka] mumrah opened a new pull request, #12168: Minor: Remove extraneous code in LocalLogManager

2022-05-16 Thread GitBox


mumrah opened a new pull request, #12168:
URL: https://github.com/apache/kafka/pull/12168

   Looks like this was some leftover test code. 





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: Fix stuck SSL tests in case of authentication failure

2022-05-16 Thread GitBox


fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r874001986


##
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##
@@ -449,9 +450,9 @@ public void close() {
 @Test
 public void testCloseOldestConnection() throws Exception {
 String id = "0";
-blockingConnect(id);
-
-time.sleep(6000); // The max idle time is 5000ms
+selector.connect(id, new InetSocketAddress("localhost", server.port), 
BUFFER_SIZE, BUFFER_SIZE);
+selector.poll(0);

Review Comment:
   Hi @divijvaidya, thanks for looking at this. I need to fix this and 
`testCloseOldestConnectionWithMultiplePendingReceives`.









[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: Fix stuck SSL tests in case of authentication failure

2022-05-16 Thread GitBox


fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r874001986


##
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##
@@ -449,9 +450,9 @@ public void close() {
 @Test
 public void testCloseOldestConnection() throws Exception {
 String id = "0";
-blockingConnect(id);
-
-time.sleep(6000); // The max idle time is 5000ms
+selector.connect(id, new InetSocketAddress("localhost", server.port), 
BUFFER_SIZE, BUFFER_SIZE);
+selector.poll(0);

Review Comment:
   Thanks @divijvaidya for looking at this. I need to fix this and 
`testCloseOldestConnectionWithMultiplePendingReceives`.






[GitHub] [kafka] hachikuji commented on a diff in pull request #12158: MINOR:A few code cleanUps in KafkaController

2022-05-16 Thread GitBox


hachikuji commented on code in PR #12158:
URL: https://github.com/apache/kafka/pull/12158#discussion_r873958800


##
core/src/main/scala/kafka/controller/KafkaController.scala:
##
@@ -468,14 +468,6 @@ class KafkaController(val config: KafkaConfig,
 
 // shutdown leader rebalance scheduler
 kafkaScheduler.shutdown()
-offlinePartitionCount = 0

Review Comment:
   The way `updateMetrics` is written is a little annoying. While we're at it, 
would it be any clearer to revise `updateMetrics` to the following structure:
   ```scala
   if (isActive) {
     offlinePartitionCount = controllerContext.offlinePartitionCount
     ...
   } else {
     offlinePartitionCount = 0
     ...
   }
   ```






[jira] [Comment Edited] (KAFKA-13716) add tests for `DeleteRecordsCommand` class

2022-05-16 Thread Richard Joerger (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537654#comment-17537654
 ] 

Richard Joerger edited comment on KAFKA-13716 at 5/16/22 4:53 PM:
--

Since we haven't heard from Shivanjal, I went ahead and added some tests as well 
as opened a pull request. Any feedback would be great. 


was (Author: rjoerger):
Since we hadn't heard from Shivanjal, I went ahead and added some test as well 
as opened a pull request. Any feedback would be great. 

> add tests for `DeleteRecordsCommand` class
> --
>
> Key: KAFKA-13716
> URL: https://issues.apache.org/jira/browse/KAFKA-13716
> Project: Kafka
>  Issue Type: Test
>  Components: tools
>Reporter: Luke Chen
>Assignee: Shivanjal Arora
>Priority: Major
>  Labels: Newbie, newbie
>
> Found there are no tests for the `DeleteRecordsCommand` class, which is used in 
> `kafka-delete-records.sh`. We should add them.





[jira] [Commented] (KAFKA-13716) add tests for `DeleteRecordsCommand` class

2022-05-16 Thread Richard Joerger (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537654#comment-17537654
 ] 

Richard Joerger commented on KAFKA-13716:
-

Since we hadn't heard from Shivanjal, I went ahead and added some tests as well 
as opened a pull request. Any feedback would be great. 

> add tests for `DeleteRecordsCommand` class
> --
>
> Key: KAFKA-13716
> URL: https://issues.apache.org/jira/browse/KAFKA-13716
> Project: Kafka
>  Issue Type: Test
>  Components: tools
>Reporter: Luke Chen
>Assignee: Shivanjal Arora
>Priority: Major
>  Labels: Newbie, newbie
>
> Found there are no tests for the `DeleteRecordsCommand` class, which is used in 
> `kafka-delete-records.sh`. We should add them.





[jira] [Comment Edited] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2022-05-16 Thread Karthik Raman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537647#comment-17537647
 ] 

Karthik Raman edited comment on KAFKA-6520 at 5/16/22 4:46 PM:
---

[~mjsax] / [~Yohan123]: Just wondering if there is any update on this issue 
and whether the fix will be added in an upcoming release? Thank you.


was (Author: JIRAUSER289548):
[~mjsax] / [~Yohan123] : Just wondering if there is any update on this issue to 
be added in any latest releases? Thank you.

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Vince Mu
>Priority: Major
>  Labels: newbie, user-experience
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams]
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
>  See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  This is a link to a related 
> issue.
> -
> Update: there are some discussions on the PR itself which leads me to think 
> that a more general solution should be at the ClusterConnectionStates rather 
> than at the Streams or even Consumer level. One proposal would be:
>  * Add a new metric named `failedConnection` in SelectorMetrics which is 
> recorded at the `connect()` and `pollSelectionKeys()` functions, upon capturing the 
> IOException / RuntimeException that indicates the connection was disconnected.
>  * And then users of Consumer / Streams can monitor this metric, which 
> normally will have close-to-zero values, as transient 
> disconnects are expected; if it is spiking, it means the brokers are consistently 
> unavailable, indicating the state.
> [~Yohan123] WDYT?





[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2022-05-16 Thread Karthik Raman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537647#comment-17537647
 ] 

Karthik Raman commented on KAFKA-6520:
--

[~mjsax] / [~Yohan123]: Just wondering if there is any update on this issue 
to be added in an upcoming release? Thank you.

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Vince Mu
>Priority: Major
>  Labels: newbie, user-experience
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams]
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
>  See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  This is a link to a related 
> issue.
> -
> Update: there are some discussions on the PR itself which leads me to think 
> that a more general solution should be at the ClusterConnectionStates rather 
> than at the Streams or even Consumer level. One proposal would be:
>  * Add a new metric named `failedConnection` in SelectorMetrics which is 
> recorded at the `connect()` and `pollSelectionKeys()` functions, upon capturing the 
> IOException / RuntimeException that indicates the connection was disconnected.
>  * And then users of Consumer / Streams can monitor this metric, which 
> normally will have close-to-zero values, as transient 
> disconnects are expected; if it is spiking, it means the brokers are consistently 
> unavailable, indicating the state.
> [~Yohan123] WDYT?





[GitHub] [kafka] hachikuji merged pull request #12153: MINOR: Clarify impact of num.replica.fetchers

2022-05-16 Thread GitBox


hachikuji merged PR #12153:
URL: https://github.com/apache/kafka/pull/12153





[GitHub] [kafka] hachikuji commented on pull request #12162: KAFKA-13899: Use INVALID_CONFIG error code consistently in AlterConfig APIs

2022-05-16 Thread GitBox


hachikuji commented on PR #12162:
URL: https://github.com/apache/kafka/pull/12162#issuecomment-1127889552

   @dengziming I am thinking not. We already raise INVALID_CONFIG in some cases 
and it was documented explicitly in at least the IncrementalAlterConfig KIP.





[GitHub] [kafka] Moovlin opened a new pull request, #12167: KAFKA-13716 Added the DeleteRecordsCommandTest to test the CLI front end of the D…

2022-05-16 Thread GitBox


Moovlin opened a new pull request, #12167:
URL: https://github.com/apache/kafka/pull/12167

   …eleteRecordsCommand.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   Added a new test for the front end of the DeleteRecordsCommand command line 
tool. 
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2022-05-16 Thread Manuel Garcia Cabrera (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537612#comment-17537612
 ] 

Manuel Garcia Cabrera commented on KAFKA-12495:
---

[~showuon] I'm short on time right now, so I won't be able to tackle this.

> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Luke Chen
>Priority: Critical
> Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement incremental cooperative rebalance algorithm 
> based on KIP-415 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
>  However, the implementation makes a bad assumption: after the revoking 
> rebalance completes, the member (worker) count will be the same as in the 
> previous round of rebalance.
>  
> Let's take a look at the example in the KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well in most cases. But what if W4 is added after the 1st rebalance 
> completes and before the 2nd rebalance starts? Let's see what happens with 
> this example (we'll use 10 tasks here):
>  
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new member, 
> but we didn't revoke any more C/T in this round, which causes an unbalanced 
> distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> Because we don't allow consecutive revocations in two consecutive rebalances 
> (under the same leader), we end up with this uneven distribution in this 
> situation. We should allow a consecutive rebalance to run another round of 
> revocation that moves C/T to the other members, as sketched below.
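> As a self-contained sketch (illustrative only, not the actual 
> IncrementalCooperativeAssignor code), the balance check that would trigger the 
> extra revocation round looks like this:
> {code:java}
> import java.util.*;
> 
> public class RevocationSketch {
>     public static void main(String[] args) {
>         // worker -> assigned C/T after the unbalanced second rebalance above
>         Map<String, List<String>> assignment = new LinkedHashMap<>();
>         assignment.put("W1", Arrays.asList("AC0", "AT1", "AT2", "AT3"));
>         assignment.put("W2", Arrays.asList("AT4", "AT5", "BC0"));
>         assignment.put("W3", Arrays.asList("BT1", "BT2", "BT3"));
>         assignment.put("W4", Arrays.asList("BT4", "BT5"));
> 
>         int total = assignment.values().stream().mapToInt(List::size).sum();
>         int ceiling = (total + assignment.size() - 1) / assignment.size(); // 3
> 
>         // Workers above the ceiling should lose the excess in this round,
>         // even though the previous round (same leader) already revoked.
>         assignment.forEach((worker, tasks) -> {
>             int excess = tasks.size() - ceiling;
>             if (excess > 0)
>                 System.out.println(worker + ": revoke " + excess + " task(s)"); // W1: revoke 1 task(s)
>         });
>     }
> }
> {code}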
> expected:
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new member, 
> **and also revoked some C/T** 
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> // another round of rebalance to assign the new revoked C/T to the other 
> members
> W1 rejoins with assignment: [AC0, AT1, AT2] 
> Rebalance is triggered 
> W2 joins with assignment: [AT4, AT5, BC0] 
> W3 joins with assignment: [BT1, BT2, 

[GitHub] [kafka] mimaison commented on pull request #10644: [KAFKA-12635] auto sync consumer offset 0 when translated offset larger than partition end offset

2022-05-16 Thread GitBox


mimaison commented on PR #10644:
URL: https://github.com/apache/kafka/pull/10644#issuecomment-1127837181

   Closing, this issue has been fixed in 
https://github.com/apache/kafka/pull/11748


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison closed pull request #10644: [KAFKA-12635] auto sync consumer offset 0 when translated offset larger than partition end offset

2022-05-16 Thread GitBox


mimaison closed pull request #10644: [KAFKA-12635] auto sync consumer offset 0 
when translated offset larger than partition end offset
URL: https://github.com/apache/kafka/pull/10644


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2022-05-16 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-12635.

Fix Version/s: 3.3.0
 Assignee: Mickael Maison  (was: Ning Zhang)
   Resolution: Fixed

> Mirrormaker 2 offset sync is incorrect if the target partition is empty
> ---
>
> Key: KAFKA-12635
> URL: https://issues.apache.org/jira/browse/KAFKA-12635
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Frank Yi
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 3.3.0
>
>
> This bug occurs when using Mirrormaker with "sync.group.offsets.enabled = 
> true".
> If a source partition is empty, but the source consumer group's offset for 
> that partition is non-zero, then Mirrormaker sets the target consumer group's 
> offset for that partition to the literal, not translated, offset of the 
> source consumer group. This state can be reached if the source consumer group 
> consumed some records that have since been deleted (e.g. by a retention 
> policy), or if Mirrormaker replication is set to start at "latest". This bug 
> causes the target consumer group's lag for that partition to be negative and 
> breaks offset sync for that partition until lag is positive.
> The correct behavior when the source partition is empty would be to set the 
> target offset to the translated offset, not the literal offset, which in this 
> case would always be 0. 
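> A minimal, self-contained sketch of that rule (class and field names here are 
> hypothetical, not MirrorMaker's actual internals):
> {code:java}
> import java.util.Map;
> import java.util.Optional;
> 
> public class OffsetTranslationSketch {
>     // last known upstream -> downstream offset pair for a partition
>     static final class OffsetSync {
>         final long upstream, downstream;
>         OffsetSync(long upstream, long downstream) {
>             this.upstream = upstream;
>             this.downstream = downstream;
>         }
>     }
> 
>     static Optional<Long> translate(Map<String, OffsetSync> syncs,
>                                     String partition, long consumerOffset) {
>         OffsetSync sync = syncs.get(partition);
>         if (sync == null)
>             return Optional.empty(); // nothing mirrored yet: no checkpoint
>         // offsets at or before the sync point land on the downstream offset,
>         // which for an empty target partition is 0
>         return Optional.of(sync.downstream + Math.max(0, consumerOffset - sync.upstream));
>     }
> 
>     public static void main(String[] args) {
>         Map<String, OffsetSync> syncs =
>             Map.of("test-topic-1-0", new OffsetSync(100, 0));
>         System.out.println(translate(syncs, "test-topic-1-0", 50)); // Optional[0]
>         System.out.println(translate(syncs, "other-topic-0", 42));  // Optional.empty
>     }
> }
> {code}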
> Original email thread on this issue: 
> https://lists.apache.org/thread.html/r7c54ee5f57227367b911d4abffa72781772d8dd3b72d75eb65ee19f7%40%3Cusers.kafka.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] mimaison merged pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-05-16 Thread GitBox


mimaison merged PR #11748:
URL: https://github.com/apache/kafka/pull/11748


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-05-16 Thread GitBox


mimaison commented on code in PR #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r873878544


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -169,6 +172,7 @@ public String version() {
 return listConsumerGroupOffsets(group).entrySet().stream()
 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
+.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do 
not emit checkpoints for partitions that don't have offset-syncs

Review Comment:
   @dadufour To translate these offsets the checkpoint connector would need to 
access the offset-syncs topic from the other MirrorMaker instance (and reverse 
the mapping). This all seems doable but it will require a KIP. 
   
   @viktorsomogyi @urbandan is this something you'd be interested in doing?
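   As an aside for readers, the `flatMap` line in the diff above is the Java 8 
idiom for flattening a `Stream<Optional<T>>` down to its present values (on 
Java 9+ the same can be written as `.flatMap(Optional::stream)`). A 
self-contained illustration:
   ```java
   import java.util.List;
   import java.util.Optional;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;

   public class OptionalFlattenSketch {
       public static void main(String[] args) {
           List<String> emitted = Stream.of(Optional.of("checkpoint-1"), Optional.<String>empty())
                   .flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty))
                   .collect(Collectors.toList());
           System.out.println(emitted); // [checkpoint-1]
       }
   }
   ```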



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] qingwei91 commented on a diff in pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

2022-05-16 Thread GitBox


qingwei91 commented on code in PR #12166:
URL: https://github.com/apache/kafka/pull/12166#discussion_r873762061


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -226,9 +226,10 @@ private void emitNonJoinedOuterRecords(
 if (internalProcessorContext.currentSystemTimeMs() < 
sharedTimeTracker.nextTimeToEmit) {
 return;
 }
-if (sharedTimeTracker.nextTimeToEmit == 0) {
-sharedTimeTracker.nextTimeToEmit = 
internalProcessorContext.currentSystemTimeMs();
-}
+
+// Ensure `nextTimeToEmit` is synced with `currentSystemTimeMs`; if we 
don't set it every time,
+// they can get out of sync during clock drift
+sharedTimeTracker.nextTimeToEmit = 
internalProcessorContext.currentSystemTimeMs();

Review Comment:
   Is it OK to have comments here? It wasn't obvious to me what this piece of 
code was doing initially, so I thought comments might help. I don't feel 
strongly; please let me know if you'd like them removed.



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##
@@ -333,6 +352,87 @@ public void shouldJoinWithCustomStoreSuppliers() {
 runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), 
joinWindows);
 }
 
+@Test
+public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() {

Review Comment:
   This test is quite convoluted because it relies on the low-level API; it 
appears to be the first test here to do so (the other tests rely on the 
higher-level API). Is this acceptable?
   
   I resorted to this approach because we need to manipulate TimeTracker, which 
isn't reachable through the high-level API, and I didn't feel comfortable 
making a larger change to the codebase.
   
   Please let me know if you think there's a better way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] qingwei91 opened a new pull request, #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

2022-05-16 Thread GitBox


qingwei91 opened a new pull request, #12166:
URL: https://github.com/apache/kafka/pull/12166

   We should sync nextTimeToEmit with the wall clock on each method call to 
ensure throttling works correctly in case of clock drift.
   If we don't, then in the event of significant clock drift, throttling might 
not happen for a long time, which can hurt performance.
   
   I've added a unit test to simulate clock drift and verify my change works.
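   A minimal, self-contained sketch of the pattern (simplified names, not the 
actual KStreamKStreamJoin code):
   ```java
   public class EmitThrottleSketch {
       private final long emitIntervalMs;
       private long nextTimeToEmit = 0;

       EmitThrottleSketch(long emitIntervalMs) {
           this.emitIntervalMs = emitIntervalMs;
       }

       boolean maybeEmit(long nowMs) {
           if (nowMs < nextTimeToEmit)
               return false; // throttled: emitted too recently
           // Resync with the wall clock on every call; advancing by
           // "+= interval" alone can fall arbitrarily behind a drifting
           // clock, which would disable throttling until it catches up.
           nextTimeToEmit = nowMs + emitIntervalMs;
           return true;
       }

       public static void main(String[] args) {
           EmitThrottleSketch t = new EmitThrottleSketch(1000);
           System.out.println(t.maybeEmit(0));      // true
           System.out.println(t.maybeEmit(500));    // false (within interval)
           System.out.println(t.maybeEmit(10_000)); // true (clock jumped)
       }
   }
   ```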
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-05-16 Thread GitBox


showuon commented on code in PR #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r873739814


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -319,27 +319,21 @@ public void testReplication() throws Exception {
 waitForCondition(() -> 
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
 Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new 
TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not 
translated downstream to primary cluster.");
 
-waitForCondition(() -> 
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
-Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new 
TopicPartition("test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not 
translated upstream to primary cluster.");

Review Comment:
   Thanks for the clear explanation. I understand now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13906) Invalid replica state transition

2022-05-16 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez updated KAFKA-13906:

Labels: BUG controller replication reproducible-bug  (was: BUG controller)

> Invalid replica state transition
> 
>
> Key: KAFKA-13906
> URL: https://issues.apache.org/jira/browse/KAFKA-13906
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core, replication
>Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 
> 3.2.1
>Reporter: Igor Soarez
>Priority: Major
>  Labels: BUG, controller, replication, reproducible-bug
>
> The controller runs into an IllegalStateException when reacting to changes in 
> broker membership status if there are topics that are pending deletion.
>  
> How to reproduce:
>  # Set up a cluster with 3 brokers
>  # Create a topic with a partition led by each broker and produce some 
> data
>  # Kill one of the brokers that is not the controller, and keep that broker 
> down
>  # Delete the topic
>  # Restart the other broker that is not the controller
>  
> Logs and stacktrace:
> {{[2022-05-16 11:53:25,482] ERROR [Controller id=1 epoch=1] Controller 1 
> epoch 1 initiated state change of replica 3 for partition test-topic-2 from 
> ReplicaDeletionSuccessful to ReplicaDeletionIneligible failed 
> (state.change.logger)}}
> {{java.lang.IllegalStateException: Replica 
> [Topic=test-topic,Partition=2,Replica=3] should be in the 
> OfflineReplica,ReplicaDeletionStarted states before moving to 
> ReplicaDeletionIneligible state. Instead it is in ReplicaDeletionSuccessful 
> state}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}}
> {{        at scala.collection.immutable.List.foreach(List.scala:333)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}}
> {{        at 
> kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)}}
> {{        at 
> scala.collection.immutable.HashMap.foreachEntry(HashMap.scala:1092)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:111)}}
> {{        at 
> kafka.controller.TopicDeletionManager.failReplicaDeletion(TopicDeletionManager.scala:157)}}
> {{        at 
> kafka.controller.KafkaController.onReplicasBecomeOffline(KafkaController.scala:638)}}
> {{        at 
> kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:599)}}
> {{        at 
> kafka.controller.KafkaController.processBrokerChange(KafkaController.scala:1623)}}
> {{        at 
> kafka.controller.KafkaController.process(KafkaController.scala:2534)}}
> {{        at 
> kafka.controller.QueuedEvent.process(ControllerEventManager.scala:52)}}
> {{        at 
> kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:130)}}
> {{--}}
> {{[2022-05-16 11:53:40,726] ERROR [Controller id=1 epoch=1] Controller 1 
> epoch 1 initiated state change of replica 3 for partition test-topic-2 from 
> ReplicaDeletionSuccessful to OnlineReplica failed (state.change.logger)}}
> {{java.lang.IllegalStateException: Replica 
> [Topic=test-topic,Partition=2,Replica=3] should be in the 
> NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible states 
> before moving to OnlineReplica state. Instead it is in 
> ReplicaDeletionSuccessful state}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}}
> {{        at scala.collection.immutable.List.foreach(List.scala:333)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}}
> {{        at 
> 

[jira] [Updated] (KAFKA-13906) Invalid replica state transition

2022-05-16 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez updated KAFKA-13906:

Labels: BUG controller  (was: reproducible-bug)

> Invalid replica state transition
> 
>
> Key: KAFKA-13906
> URL: https://issues.apache.org/jira/browse/KAFKA-13906
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core, replication
>Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 
> 3.2.1
>Reporter: Igor Soarez
>Priority: Major
>  Labels: BUG, controller
>
> The controller runs into an IllegalStateException when reacting to changes in 
> broker membership status if there are topics that are pending deletion.
>  
> How to reproduce:
>  # Set up a cluster with 3 brokers
>  # Create a topic with a partition led by each broker and produce some 
> data
>  # Kill one of the brokers that is not the controller, and keep that broker 
> down
>  # Delete the topic
>  # Restart the other broker that is not the controller
>  
> Logs and stacktrace:
> {{[2022-05-16 11:53:25,482] ERROR [Controller id=1 epoch=1] Controller 1 
> epoch 1 initiated state change of replica 3 for partition test-topic-2 from 
> ReplicaDeletionSuccessful to ReplicaDeletionIneligible failed 
> (state.change.logger)}}
> {{java.lang.IllegalStateException: Replica 
> [Topic=test-topic,Partition=2,Replica=3] should be in the 
> OfflineReplica,ReplicaDeletionStarted states before moving to 
> ReplicaDeletionIneligible state. Instead it is in ReplicaDeletionSuccessful 
> state}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}}
> {{        at scala.collection.immutable.List.foreach(List.scala:333)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}}
> {{        at 
> kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)}}
> {{        at 
> scala.collection.immutable.HashMap.foreachEntry(HashMap.scala:1092)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:111)}}
> {{        at 
> kafka.controller.TopicDeletionManager.failReplicaDeletion(TopicDeletionManager.scala:157)}}
> {{        at 
> kafka.controller.KafkaController.onReplicasBecomeOffline(KafkaController.scala:638)}}
> {{        at 
> kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:599)}}
> {{        at 
> kafka.controller.KafkaController.processBrokerChange(KafkaController.scala:1623)}}
> {{        at 
> kafka.controller.KafkaController.process(KafkaController.scala:2534)}}
> {{        at 
> kafka.controller.QueuedEvent.process(ControllerEventManager.scala:52)}}
> {{        at 
> kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:130)}}
> {{--}}
> {{[2022-05-16 11:53:40,726] ERROR [Controller id=1 epoch=1] Controller 1 
> epoch 1 initiated state change of replica 3 for partition test-topic-2 from 
> ReplicaDeletionSuccessful to OnlineReplica failed (state.change.logger)}}
> {{java.lang.IllegalStateException: Replica 
> [Topic=test-topic,Partition=2,Replica=3] should be in the 
> NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible states 
> before moving to OnlineReplica state. Instead it is in 
> ReplicaDeletionSuccessful state}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}}
> {{        at scala.collection.immutable.List.foreach(List.scala:333)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}}
> {{        at 
> kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)}}
> {{        at 
> 

[jira] [Updated] (KAFKA-8366) partitions of topics being deleted show up in the offline partitions metric

2022-05-16 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez updated KAFKA-8366:
---
Component/s: controller
 core
 metrics

> partitions of topics being deleted show up in the offline partitions metric
> ---
>
> Key: KAFKA-8366
> URL: https://issues.apache.org/jira/browse/KAFKA-8366
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core, metrics
>Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.2.1
>Reporter: Radai Rosenblatt
>Priority: Major
>  Labels: BUG, controller, metrics, reproducible-bug
>
> I believe this is a bug.
> Offline partitions is a metric that indicates an error condition - a lack of 
> Kafka availability.
> As an artifact of how deletion is implemented, the partitions for a topic 
> undergoing deletion will show up as offline, which just creates 
> false-positive alerts.
> If needed, maybe there should be a separate "partitions to be deleted" 
> sensor.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-8366) partitions of topics being deleted show up in the offline partitions metric

2022-05-16 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez updated KAFKA-8366:
---
Issue Type: Bug  (was: Improvement)

> partitions of topics being deleted show up in the offline partitions metric
> ---
>
> Key: KAFKA-8366
> URL: https://issues.apache.org/jira/browse/KAFKA-8366
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.2.1
>Reporter: Radai Rosenblatt
>Priority: Major
>  Labels: BUG, controller, metrics
>
> I believe this is a bug.
> Offline partitions is a metric that indicates an error condition - a lack of 
> Kafka availability.
> As an artifact of how deletion is implemented, the partitions for a topic 
> undergoing deletion will show up as offline, which just creates 
> false-positive alerts.
> If needed, maybe there should be a separate "partitions to be deleted" 
> sensor.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-8366) partitions of topics being deleted show up in the offline partitions metric

2022-05-16 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez updated KAFKA-8366:
---
Labels: BUG controller metrics reproducible-bug  (was: BUG controller 
metrics)

> partitions of topics being deleted show up in the offline partitions metric
> ---
>
> Key: KAFKA-8366
> URL: https://issues.apache.org/jira/browse/KAFKA-8366
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.2.1
>Reporter: Radai Rosenblatt
>Priority: Major
>  Labels: BUG, controller, metrics, reproducible-bug
>
> I believe this is a bug.
> Offline partitions is a metric that indicates an error condition - a lack of 
> Kafka availability.
> As an artifact of how deletion is implemented, the partitions for a topic 
> undergoing deletion will show up as offline, which just creates 
> false-positive alerts.
> If needed, maybe there should be a separate "partitions to be deleted" 
> sensor.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13906) Invalid replica state transition

2022-05-16 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-13906:
---

 Summary: Invalid replica state transition
 Key: KAFKA-13906
 URL: https://issues.apache.org/jira/browse/KAFKA-13906
 Project: Kafka
  Issue Type: Bug
  Components: controller, core, replication
Affects Versions: 3.1.1, 3.0.1, 3.0.0, 3.1.0, 3.2.0, 3.3.0, 3.0.2, 3.1.2, 
3.2.1
Reporter: Igor Soarez


The controller runs into an IllegalStateException when reacting to changes in 
broker membership status if there are topics that are pending deletion.

 

How to reproduce:
 # Set up a cluster with 3 brokers
 # Create a topic with a partition led by each broker and produce some 
data
 # Kill one of the brokers that is not the controller, and keep that broker down
 # Delete the topic
 # Restart the other broker that is not the controller
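
The topic operations in steps 2 and 4 can be driven with the Admin client; a 
rough sketch (the broker address is an assumption, and the produce, kill and 
restart steps remain manual):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class ReproSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // step 2: one partition per broker, fully replicated
            admin.createTopics(Collections.singleton(
                new NewTopic("test-topic", 3, (short) 3))).all().get();
            // ... produce data, then kill a non-controller broker (step 3) ...
            // step 4: delete the topic while that broker is down
            admin.deleteTopics(Collections.singleton("test-topic")).all().get();
            // step 5: restart the other non-controller broker and watch the
            // controller log for the IllegalStateException below
        }
    }
}
{code}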

 

Logs and stacktrace:

{{[2022-05-16 11:53:25,482] ERROR [Controller id=1 epoch=1] Controller 1 epoch 
1 initiated state change of replica 3 for partition test-topic-2 from 
ReplicaDeletionSuccessful to ReplicaDeletionIneligible failed 
(state.change.logger)}}
{{java.lang.IllegalStateException: Replica 
[Topic=test-topic,Partition=2,Replica=3] should be in the 
OfflineReplica,ReplicaDeletionStarted states before moving to 
ReplicaDeletionIneligible state. Instead it is in ReplicaDeletionSuccessful 
state}}
{{        at 
kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}}
{{        at 
kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}}
{{        at 
kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}}
{{        at scala.collection.immutable.List.foreach(List.scala:333)}}
{{        at 
kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}}
{{        at 
kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}}
{{        at 
kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}}
{{        at 
kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)}}
{{        at 
scala.collection.immutable.HashMap.foreachEntry(HashMap.scala:1092)}}
{{        at 
kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:111)}}
{{        at 
kafka.controller.TopicDeletionManager.failReplicaDeletion(TopicDeletionManager.scala:157)}}
{{        at 
kafka.controller.KafkaController.onReplicasBecomeOffline(KafkaController.scala:638)}}
{{        at 
kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:599)}}
{{        at 
kafka.controller.KafkaController.processBrokerChange(KafkaController.scala:1623)}}
{{        at 
kafka.controller.KafkaController.process(KafkaController.scala:2534)}}
{{        at 
kafka.controller.QueuedEvent.process(ControllerEventManager.scala:52)}}
{{        at 
kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:130)}}
{{--}}
{{[2022-05-16 11:53:40,726] ERROR [Controller id=1 epoch=1] Controller 1 epoch 
1 initiated state change of replica 3 for partition test-topic-2 from 
ReplicaDeletionSuccessful to OnlineReplica failed (state.change.logger)}}
{{java.lang.IllegalStateException: Replica 
[Topic=test-topic,Partition=2,Replica=3] should be in the 
NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible states before 
moving to OnlineReplica state. Instead it is in ReplicaDeletionSuccessful 
state}}
{{        at 
kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}}
{{        at 
kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}}
{{        at 
kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}}
{{        at scala.collection.immutable.List.foreach(List.scala:333)}}
{{        at 
kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}}
{{        at 
kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}}
{{        at 
kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}}
{{        at 
kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)}}
{{        at 
scala.collection.immutable.HashMap.foreachEntry(HashMap.scala:1092)}}
{{        at 
kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:111)}}
{{        at 
kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:543)}}
{{        at 
kafka.controller.KafkaController.processBrokerChange(KafkaController.scala:1607)}}
{{        at 
kafka.controller.KafkaController.process(KafkaController.scala:2534)}}

[GitHub] [kafka] mimaison commented on pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2022-05-16 Thread GitBox


mimaison commented on PR #10528:
URL: https://github.com/apache/kafka/pull/10528#issuecomment-1127619143

   Thanks, that makes sense. I've started reviewing #11780 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-05-16 Thread GitBox


mimaison commented on PR #11748:
URL: https://github.com/apache/kafka/pull/11748#issuecomment-1127616971

   Thanks for the review @showuon. I've replied to your questions and pushed an 
update.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-05-16 Thread GitBox


mimaison commented on code in PR #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r873670168


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -526,6 +520,65 @@ public void testOffsetSyncsTopicsOnTarget() throws 
Exception {
 assertFalse(primaryTopics.contains("mm2-offset-syncs." + 
BACKUP_CLUSTER_ALIAS + ".internal"));
 }
 
+@Test
+public void testNoCheckpointsIfNoRecordsAreMirrored() throws 
InterruptedException {
+String consumerGroupName = "consumer-group-no-checkpoints";
+Map<String, Object> consumerProps = 
Collections.singletonMap("group.id", consumerGroupName);
+
+// ensure there are some records in the topic on the source cluster
+produceMessages(primary, "test-topic-1");
+
+// warm up consumers before starting the connectors, so we don't need 
to wait for discovery
+warmUpConsumer(consumerProps);
+
+// one way replication from primary to backup
+mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + 
".enabled", "false");
+mm2Config = new MirrorMakerConfig(mm2Props);
+waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, 
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+// make sure the topics are created in the backup cluster
+waitForTopicCreated(backup, remoteTopicName("test-topic-1", 
PRIMARY_CLUSTER_ALIAS));
+waitForTopicCreated(backup, remoteTopicName("test-topic-no-records", 
PRIMARY_CLUSTER_ALIAS));
+
+// commit some offsets for both topics in the source cluster
+TopicPartition tp1 = new TopicPartition("test-topic-1", 0);
+TopicPartition tp2 = new TopicPartition("test-topic-no-records", 0);
+//Map consumerProps  = 
Collections.singletonMap("group.id", consumerGroupName);
+try (Consumer<byte[], byte[]> consumer = 
primary.kafka().createConsumer(consumerProps)) {
+Collection<TopicPartition> tps = Arrays.asList(tp1, tp2);
+Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
+Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
endOffsets.entrySet().stream()
+.collect(Collectors.toMap(
+Map.Entry::getKey,
+e -> new OffsetAndMetadata(e.getValue())
+));
+consumer.commitSync(offsetsToCommit);
+}
+
+// Only test-topic-1 should have translated offsets because we've not 
yet mirrored any records for topic-no-records
+MirrorClient backupClient = new 
MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
+waitForCondition(() -> {
+Map translatedOffsets = 
backupClient.remoteConsumerOffsets(
+consumerGroupName, PRIMARY_CLUSTER_ALIAS, 
Duration.ofSeconds(30L));
+return translatedOffsets.containsKey(remoteTopicPartition(tp1, 
PRIMARY_CLUSTER_ALIAS)) &&
+   !translatedOffsets.containsKey(remoteTopicPartition(tp2, 
PRIMARY_CLUSTER_ALIAS));
+}, OFFSET_SYNC_DURATION_MS, "Checkpoints were not emitted correctly to 
backup cluster");
+
+// Send some records to topic-no-records in the source cluster

Review Comment:
   I've renamed it to `test-topic-no-checkpoints` so it matches the test/group names



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12159: Fix stuck SSL tests in case of authentication failure

2022-05-16 Thread GitBox


divijvaidya commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r873646603


##
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##
@@ -449,9 +450,9 @@ public void close() {
 @Test
 public void testCloseOldestConnection() throws Exception {
 String id = "0";
-blockingConnect(id);
-
-time.sleep(6000); // The max idle time is 5000ms
+selector.connect(id, new InetSocketAddress("localhost", server.port), 
BUFFER_SIZE, BUFFER_SIZE);
+selector.poll(0);

Review Comment:
   Note that `connect()` is an asynchronous method here. It might not complete 
with the first poll. That is the reason it is tested in a while loop inside 
`blockingConnect`.
   
   My suggestion would be to keep using blockingConnect with a (new) timeout to 
terminate the while loop in it, similar to the implementation of 
`NetworkTestUtils.waitForChannelReady`.
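   
   A sketch of what that could look like (assuming the test's existing 
`selector`, `server` and `BUFFER_SIZE` fields; not the actual patch):
   ```java
   private void blockingConnect(String id, long timeoutMs) throws IOException {
       selector.connect(id, new InetSocketAddress("localhost", server.port),
               BUFFER_SIZE, BUFFER_SIZE);
       long deadline = System.currentTimeMillis() + timeoutMs;
       // poll until the connection is established or the deadline passes,
       // so an authentication failure can no longer hang the test forever
       while (!selector.connected().contains(id)) {
           if (System.currentTimeMillis() > deadline)
               throw new IOException("Connection " + id + " not established within " + timeoutMs + " ms");
           selector.poll(10);
       }
   }
   ```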



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] franz1981 commented on a diff in pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods

2022-05-16 Thread GitBox


franz1981 commented on code in PR #12163:
URL: https://github.com/apache/kafka/pull/12163#discussion_r873652472


##
clients/src/main/java/org/apache/kafka/common/utils/Checksums.java:
##
@@ -30,6 +33,18 @@
  */
 public final class Checksums {
 
+private static final MethodHandle BYTE_BUFFER_UPDATE;
+
+static {
+MethodHandle byteBufferUpdate = null;
+try {
+byteBufferUpdate = 
MethodHandles.publicLookup().findVirtual(Checksum.class, "update", 
MethodType.methodType(void.class, ByteBuffer.class));
+} catch (Throwable silent) {

Review Comment:
   I've added a LOG.warn for the case where the given signature is expected to 
be found but isn't (although it doesn't account for a security manager being 
enabled - but I don't believe that is a common Kafka practice suggested to 
users).
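   
   For reference, a self-contained sketch of the reflective-lookup pattern under 
discussion (the class name and fallback loop are illustrative, not the PR's 
actual code):
   ```java
   import java.lang.invoke.MethodHandle;
   import java.lang.invoke.MethodHandles;
   import java.lang.invoke.MethodType;
   import java.nio.ByteBuffer;
   import java.util.zip.CRC32;
   import java.util.zip.Checksum;

   public final class ChecksumSketch {
       // non-null only on Java 9+, where Checksum.update(ByteBuffer) exists
       private static final MethodHandle BYTE_BUFFER_UPDATE;
       static {
           MethodHandle mh = null;
           try {
               mh = MethodHandles.publicLookup().findVirtual(Checksum.class, "update",
                       MethodType.methodType(void.class, ByteBuffer.class));
           } catch (Throwable t) {
               // Java 8: the method is absent; fall back to byte-by-byte below
           }
           BYTE_BUFFER_UPDATE = mh;
       }

       static void update(Checksum checksum, ByteBuffer buffer) {
           if (BYTE_BUFFER_UPDATE != null) {
               try {
                   BYTE_BUFFER_UPDATE.invokeExact(checksum, buffer);
                   return;
               } catch (Throwable t) {
                   throw new IllegalStateException(t);
               }
           }
           while (buffer.hasRemaining())
               checksum.update(buffer.get());
       }

       public static void main(String[] args) {
           Checksum crc = new CRC32();
           update(crc, ByteBuffer.wrap("hello".getBytes()));
           System.out.println(crc.getValue());
       }
   }
   ```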



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-8366) partitions of topics being deleted show up in the offline partitions metric

2022-05-16 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez updated KAFKA-8366:
---
Affects Version/s: 3.1.1
   3.0.1
   3.0.0
   3.1.0
   3.2.0
   3.3.0
   3.0.2
   3.2.1

> partitions of topics being deleted show up in the offline partitions metric
> ---
>
> Key: KAFKA-8366
> URL: https://issues.apache.org/jira/browse/KAFKA-8366
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.2.1
>Reporter: Radai Rosenblatt
>Priority: Major
>
> I believe this is a bug.
> Offline partitions is a metric that indicates an error condition - a lack of 
> Kafka availability.
> As an artifact of how deletion is implemented, the partitions for a topic 
> undergoing deletion will show up as offline, which just creates 
> false-positive alerts.
> If needed, maybe there should be a separate "partitions to be deleted" 
> sensor.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-8366) partitions of topics being deleted show up in the offline partitions metric

2022-05-16 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537501#comment-17537501
 ] 

Igor Soarez commented on KAFKA-8366:


I also ran into this issue. I managed to replicate this bug with an integration 
test

[https://github.com/soarez/kafka/blob/replicate-bug-offline-partition-metrics-from-deleted-topics/core/src/test/scala/integration/kafka/api/OfflinePartitionsFromDeletedTopicTest.scala]

The problem is that the controller caches the offline partitions count, and 
when it is re-elected it fails to clear it if the topic is now being deleted.

 

> partitions of topics being deleted show up in the offline partitions metric
> ---
>
> Key: KAFKA-8366
> URL: https://issues.apache.org/jira/browse/KAFKA-8366
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Radai Rosenblatt
>Priority: Major
>
> I believe this is a bug.
> Offline partitions is a metric that indicates an error condition - a lack of 
> Kafka availability.
> As an artifact of how deletion is implemented, the partitions for a topic 
> undergoing deletion will show up as offline, which just creates 
> false-positive alerts.
> If needed, maybe there should be a separate "partitions to be deleted" 
> sensor.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] franz1981 commented on a diff in pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods

2022-05-16 Thread GitBox


franz1981 commented on code in PR #12163:
URL: https://github.com/apache/kafka/pull/12163#discussion_r873640450


##
clients/src/main/java/org/apache/kafka/common/utils/Checksums.java:
##
@@ -40,11 +55,41 @@ public static void update(Checksum checksum, ByteBuffer 
buffer, int length) {
 public static void update(Checksum checksum, ByteBuffer buffer, int 
offset, int length) {

Review Comment:
   I can add some doc, although in theory it's unrelated to the changes of this 
PR; but is a nice addition :+1: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] franz1981 commented on a diff in pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods

2022-05-16 Thread GitBox


franz1981 commented on code in PR #12163:
URL: https://github.com/apache/kafka/pull/12163#discussion_r873638337


##
clients/src/main/java/org/apache/kafka/common/utils/Checksums.java:
##
@@ -40,11 +55,41 @@ public static void update(Checksum checksum, ByteBuffer 
buffer, int length) {
 public static void update(Checksum checksum, ByteBuffer buffer, int 
offset, int length) {
 if (buffer.hasArray()) {
 checksum.update(buffer.array(), buffer.position() + 
buffer.arrayOffset() + offset, length);
-} else {
-int start = buffer.position() + offset;
-for (int i = start; i < start + length; i++)
-checksum.update(buffer.get(i));
+return;
+}
+if (BYTE_BUFFER_UPDATE != null && buffer.isDirect()) {
+final int oldPosition = buffer.position();

Review Comment:
   We are moving `limit` as well, hence I don't see any advantage in using 
`mark/reset`.
   Let me know if I've missed something.
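   
   A small standalone illustration of the point (assuming a direct buffer, as 
in the code path above):
   ```java
   import java.nio.ByteBuffer;

   public class PositionLimitSketch {
       public static void main(String[] args) {
           ByteBuffer buf = ByteBuffer.allocateDirect(16);
           // save both coordinates; mark/reset would only restore position
           int oldPosition = buf.position();
           int oldLimit = buf.limit();
           try {
               buf.position(2).limit(10); // narrow the window to [2, 10)
               // ... checksum.update(buf) would consume exactly that window ...
           } finally {
               // restore limit first, since position may not exceed limit
               buf.limit(oldLimit).position(oldPosition);
           }
           System.out.println(buf.position() + " " + buf.limit()); // 0 16
       }
   }
   ```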



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods

2022-05-16 Thread GitBox


divijvaidya commented on code in PR #12163:
URL: https://github.com/apache/kafka/pull/12163#discussion_r873603513


##
clients/src/main/java/org/apache/kafka/common/utils/Checksums.java:
##
@@ -40,11 +55,41 @@ public static void update(Checksum checksum, ByteBuffer 
buffer, int length) {
 public static void update(Checksum checksum, ByteBuffer buffer, int 
offset, int length) {
 if (buffer.hasArray()) {
 checksum.update(buffer.array(), buffer.position() + 
buffer.arrayOffset() + offset, length);
-} else {
-int start = buffer.position() + offset;
-for (int i = start; i < start + length; i++)
-checksum.update(buffer.get(i));
+return;
+}
+if (BYTE_BUFFER_UPDATE != null && buffer.isDirect()) {
+final int oldPosition = buffer.position();

Review Comment:
   alternatively, use buffer.mark() inside try and buffer.reset() in the 
finally.



##
clients/src/main/java/org/apache/kafka/common/utils/Checksums.java:
##
@@ -30,6 +33,18 @@
  */
 public final class Checksums {
 
+private static final MethodHandle BYTE_BUFFER_UPDATE;
+
+static {
+MethodHandle byteBufferUpdate = null;
+try {
+byteBufferUpdate = 
MethodHandles.publicLookup().findVirtual(Checksum.class, "update", 
MethodType.methodType(void.class, ByteBuffer.class));
+} catch (Throwable silent) {

Review Comment:
   I understand that you are keeping it here for backward compatibility with 
older JDK versions (8) which don't have this API. But ideally we should throw 
an error once we remove support for JDK 8 from Kafka.
   
   Could we do one of the following two options here:
   
   1. (preferred) Use 
[isJava9Compatible](https://docs.gradle.org/current/javadoc/org/gradle/api/JavaVersion.html#isJava9Compatible--)
 to decide whether we want to propagate the exception here or fail silently, OR
   2. add a TODO here so that we remember to propagate the exception when 
support for JDK 8 is removed? 
   
   Also, a comment explaining why we are consuming the exception would be nice.



##
clients/src/main/java/org/apache/kafka/common/utils/Checksums.java:
##
@@ -40,11 +55,41 @@ public static void update(Checksum checksum, ByteBuffer 
buffer, int length) {
 public static void update(Checksum checksum, ByteBuffer buffer, int 
offset, int length) {

Review Comment:
   A java doc here which clarifies that this method leaves the buffer as it was 
received would be nice addition here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna merged pull request #12164: Update upgrade.html

2022-05-16 Thread GitBox


cadonna merged PR #12164:
URL: https://github.com/apache/kafka/pull/12164


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-05-16 Thread GitBox


mimaison commented on code in PR #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r873606546


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -319,27 +319,21 @@ public void testReplication() throws Exception {
 waitForCondition(() -> 
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
 Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new 
TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not 
translated downstream to primary cluster.");
 
-waitForCondition(() -> 
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
-Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new 
TopicPartition("test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not 
translated upstream to primary cluster.");

Review Comment:
   For this types of changes, a diagram often helps.
   
   We see that `mm2-offset-syncs.primary.internal` only contains mapping from 
`test-topic-1` on the backup cluster to `backup.test-topic-1` on the primary 
cluster. Then when backup->primary checkpoint runs it's only able to compute 
committed offsets for `backup.test-topic-1`, it has no information about 
offsets for the local `test-topic-1` in the primary cluster.
   https://user-images.githubusercontent.com/903615/168580961-8f3e5860-cae3-43bf-9d36-ecd7553e42eb.png
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-05-16 Thread GitBox


mimaison commented on code in PR #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r873586510


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -319,27 +319,21 @@ public void testReplication() throws Exception {
 waitForCondition(() -> 
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
 Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new 
TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not 
translated downstream to primary cluster.");
 
-waitForCondition(() -> 
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
-Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new 
TopicPartition("test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not 
translated upstream to primary cluster.");
-
 Map<TopicPartition, OffsetAndMetadata> primaryOffsets = 
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
 Duration.ofMillis(CHECKPOINT_DURATION_MS));
  
 primaryClient.close();
 backupClient.close();
 
 // Failback consumer group to primary cluster
-try (Consumer<byte[], byte[]> backupConsumer = 
primary.kafka().createConsumer(Collections.singletonMap("group.id", 
consumerGroupName))) {
-backupConsumer.assign(primaryOffsets.keySet());
-primaryOffsets.forEach(backupConsumer::seek);
-backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-backupConsumer.commitAsync();
-
-assertTrue(backupConsumer.position(new 
TopicPartition("test-topic-1", 0)) > 0, "Consumer failedback to zero upstream 
offset.");
-assertTrue(backupConsumer.position(new 
TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero 
downstream offset.");
-assertTrue(backupConsumer.position(
-new TopicPartition("test-topic-1", 0)) <= 
NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected upstream offset.");
-assertTrue(backupConsumer.position(
+try (Consumer<byte[], byte[]> primaryConsumer = 
primary.kafka().createConsumer(Collections.singletonMap("group.id", 
consumerGroupName))) {
+primaryConsumer.assign(primaryOffsets.keySet());
+primaryOffsets.forEach(primaryConsumer::seek);
+primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+primaryConsumer.commitAsync();
+
+assertTrue(primaryConsumer.position(new 
TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero 
downstream offset.");

Review Comment:
   Same as above, now `remoteConsumerOffsets()` only returns offsets for remote 
topics that are being mirrored from the backup cluster and these are prefixed 
with `backup.`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on a diff in pull request #12165: KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs

2022-05-16 Thread GitBox


dengziming commented on code in PR #12165:
URL: https://github.com/apache/kafka/pull/12165#discussion_r873557577


##
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala:
##
@@ -195,8 +195,9 @@ class ServerShutdownTest extends KafkaServerTestHarness {
   // identify the correct exception, making sure the server was shutdown, 
and cleaning up if anything
   // goes wrong so that awaitShutdown doesn't hang
   case e: Exception =>
-assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected 
exception $e")
-assertEquals(if (quorum == "zk") BrokerState.NOT_RUNNING else 
BrokerState.SHUTTING_DOWN, brokers.head.brokerState)
+assertTrue(exceptionClassTag.runtimeClass.isInstance(if (isKRaftTest() 
&& e.isInstanceOf[RuntimeException]) e.getCause.getCause else e),

Review Comment:
   Basically, we can change it to something like:
   ```
   throw (
     if (e.isInstanceOf[ExecutionException]) e.getCause
     else if (e.isInstanceOf[RuntimeException]) e.getCause.getCause
     else e
   )
   ```
   This would make the logic a little messy, and I'm not sure whether we should 
keep the original `RuntimeException` created by @cmccabe.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12165: KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs

2022-05-16 Thread GitBox


showuon commented on code in PR #12165:
URL: https://github.com/apache/kafka/pull/12165#discussion_r873525802


##
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala:
##
@@ -195,8 +195,9 @@ class ServerShutdownTest extends KafkaServerTestHarness {
   // identify the correct exception, making sure the server was shutdown, 
and cleaning up if anything
   // goes wrong so that awaitShutdown doesn't hang
   case e: Exception =>
-assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected 
exception $e")
-assertEquals(if (quorum == "zk") BrokerState.NOT_RUNNING else 
BrokerState.SHUTTING_DOWN, brokers.head.brokerState)
+assertTrue(exceptionClassTag.runtimeClass.isInstance(if (isKRaftTest() 
&& e.isInstanceOf[RuntimeException]) e.getCause.getCause else e),

Review Comment:
   Thanks for the explanation. I understand now. So, do you think we should fix 
the failed tests by fixing this:
   
   
https://github.com/apache/kafka/blob/49226721c0dc5e5b327e0754e01c367990b43758/core/src/main/scala/kafka/server/BrokerServer.scala#L444
   
   I mean, the above code is trying to propagate the root cause of the broker 
failure up to here:
   
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/Kafka.scala#L113
   
   And then, we can log the root cause before exit. So, I think we should fix 
the line `throw if (e.isInstanceOf[ExecutionException]) e.getCause else e ` to 
allow it to throw the root cause for both `ExecutionException` and 
`RuntimeException`. WDYT?
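   Just to illustrate the idea (a sketch only, not the actual change in this 
PR; the real code is Scala, but it is written here in Java like the client 
modules):
   ```java
   final class Throwables {
       // Walks the cause chain so wrappers such as RuntimeException and
       // ExecutionException are stripped and the original failure (e.g. a
       // KafkaStorageException) is what gets logged and asserted on.
       static Throwable rootCause(Throwable t) {
           Throwable cause = t;
           while (cause.getCause() != null) {
               cause = cause.getCause();
           }
           return cause;
       }
   }
   ```
   With a helper like this, neither `Kafka.scala` nor the test would need to 
know how many wrapper layers the startup path adds.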



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on a diff in pull request #12165: KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs

2022-05-16 Thread GitBox


dengziming commented on code in PR #12165:
URL: https://github.com/apache/kafka/pull/12165#discussion_r873512570


##
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala:
##
@@ -195,8 +195,9 @@ class ServerShutdownTest extends KafkaServerTestHarness {
   // identify the correct exception, making sure the server was shutdown, 
and cleaning up if anything
   // goes wrong so that awaitShutdown doesn't hang
   case e: Exception =>
-assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected 
exception $e")
-assertEquals(if (quorum == "zk") BrokerState.NOT_RUNNING else 
BrokerState.SHUTTING_DOWN, brokers.head.brokerState)
+assertTrue(exceptionClassTag.runtimeClass.isInstance(if (isKRaftTest() 
&& e.isInstanceOf[RuntimeException]) e.getCause.getCause else e),

Review Comment:
   Yeah, in the finally block we unwrap the `ExecutionException`, so we don't 
have to invoke `getCause` ourselves; but after #11969 the exception type is 
`RuntimeException`, so we have to invoke `getCause` once more:
   
https://github.com/apache/kafka/blob/49226721c0dc5e5b327e0754e01c367990b43758/core/src/main/scala/kafka/server/BrokerServer.scala#L444
   
   I find this code is no longer useful after #11969; do you think we can 
remove it here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-13905) Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs

2022-05-16 Thread Luke Chen (Jira)
Luke Chen created KAFKA-13905:
-

 Summary: Fix failing 
ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs
 Key: KAFKA-13905
 URL: https://issues.apache.org/jira/browse/KAFKA-13905
 Project: Kafka
  Issue Type: Test
Reporter: Luke Chen
Assignee: dengziming


h3. Error Message

org.opentest4j.AssertionFailedError: Unexpected exception 
java.lang.RuntimeException: Received a fatal error while waiting for the broker 
to catch up with the current cluster metadata. ==> expected: <true> but was: 
<false>

h3. Stacktrace

org.opentest4j.AssertionFailedError: Unexpected exception 
java.lang.RuntimeException: Received a fatal error while waiting for the broker 
to catch up with the current cluster metadata. ==> expected: <true> but was: 
<false>
	at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
	at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
	at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
	at kafka.server.ServerShutdownTest.verifyCleanShutdownAfterFailedStartup(ServerShutdownTest.scala:198)
	at kafka.server.ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs(ServerShutdownTest.scala:168)



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] showuon commented on pull request #12165: MINOR: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs

2022-05-16 Thread GitBox


showuon commented on PR #12165:
URL: https://github.com/apache/kafka/pull/12165#issuecomment-1127413805

   Also, I've created 
[KAFKA-13905](https://issues.apache.org/jira/browse/KAFKA-13905) for this 
issue. Please link the PR to that issue, in case other people also try to fix 
it at the same time. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12165: MINOR: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs

2022-05-16 Thread GitBox


showuon commented on code in PR #12165:
URL: https://github.com/apache/kafka/pull/12165#discussion_r873488369


##
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala:
##
@@ -195,8 +195,9 @@ class ServerShutdownTest extends KafkaServerTestHarness {
   // identify the correct exception, making sure the server was shutdown, 
and cleaning up if anything
   // goes wrong so that awaitShutdown doesn't hang
   case e: Exception =>
-assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected 
exception $e")
-assertEquals(if (quorum == "zk") BrokerState.NOT_RUNNING else 
BrokerState.SHUTTING_DOWN, brokers.head.brokerState)
+assertTrue(exceptionClassTag.runtimeClass.isInstance(if (isKRaftTest() 
&& e.isInstanceOf[RuntimeException]) e.getCause.getCause else e),

Review Comment:
   I'm a little confused here. You said we just wrap a `RuntimeException` around 
the original exception. But here we call `getCause` twice, whereas before we 
didn't call `getCause` at all. Am I missing something? Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on pull request #12165: MINOR: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs

2022-05-16 Thread GitBox


dengziming commented on PR #12165:
URL: https://github.com/apache/kafka/pull/12165#issuecomment-1127393942

   Hello @cmccabe, PTAL at this solution.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming opened a new pull request, #12165: MINOR: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs

2022-05-16 Thread GitBox


dengziming opened a new pull request, #12165:
URL: https://github.com/apache/kafka/pull/12165

   *More detailed description of your change*
   Before #11969, we threw an `ExecutionException(KafkaStorageException)` 
in `BrokerServer.startup`, and the outer `ExecutionException` was unwrapped in 
the finally block.
   After #11969, we throw a 
`RuntimeException(ExecutionException(KafkaStorageException))`, so this test 
fails constantly since the exception type is no longer consistent.
   
   To fix this, we just need to invoke `getCause` twice to strip the 
`RuntimeException` and the `ExecutionException`.
   
   *Summary of testing strategy (including rationale)*
   This test is no longer failing.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon merged pull request #12087: KAFKA-13851: Add integration tests for DeleteRecords API

2022-05-16 Thread GitBox


showuon merged PR #12087:
URL: https://github.com/apache/kafka/pull/12087


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #12087: KAFKA-13851: Add integration tests for DeleteRecords API

2022-05-16 Thread GitBox


showuon commented on PR #12087:
URL: https://github.com/apache/kafka/pull/12087#issuecomment-1127340235

   All tests passed. Merge into trunk. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12140: KAFKA-13891: reset generation when syncgroup failed with REBALANCE_IN_PROGRESS

2022-05-16 Thread GitBox


showuon commented on code in PR #12140:
URL: https://github.com/apache/kafka/pull/12140#discussion_r873420698


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java:
##
@@ -488,6 +489,54 @@ public void testRetainMemberIdAfterSyncGroupDisconnect() {
 ensureActiveGroup(rejoinedGeneration, memberId);
 }
 
+@Test
+public void 
testResetGenerationIdAfterSyncGroupFailedWithRebalanceInProgress() throws 
InterruptedException, ExecutionException {
+setupCoordinator();
+
+String memberId = "memberId";
+int generation = 5;
+
+// Rebalance once to initialize the generation and memberId
+mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+expectJoinGroup("", generation, memberId);
+expectSyncGroup(generation, memberId);
+ensureActiveGroup(generation, memberId);
+
+// Force a rebalance
+coordinator.requestRejoin("Manual test trigger");
+assertTrue(coordinator.rejoinNeededOrPending());
+
+ExecutorService executor = Executors.newFixedThreadPool(1);
+try {
+// Return RebalanceInProgress in syncGroup
+int rejoinedGeneration = 10;
+expectJoinGroup(memberId, rejoinedGeneration, memberId);
+expectRebalanceInProgressForSyncGroup(rejoinedGeneration, 
memberId);
+Future secondJoin = executor.submit(() ->
+
coordinator.ensureActiveGroup(mockTime.timer(Integer.MAX_VALUE)));
+
+TestUtils.waitForCondition(() -> {
+AbstractCoordinator.Generation currentGeneration = 
coordinator.generation();
+return currentGeneration.generationId == 
AbstractCoordinator.Generation.NO_GENERATION.generationId &&
+currentGeneration.memberId.equals(memberId);
+}, 2000, "Generation should be reset");

Review Comment:
   nit: I see the `2000` timeout appears in `AbstractCoordinatorTest.java` in 
many places. Could we use a static variable to replace them? Thanks.
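   For example (the name is just a suggestion):
   ```java
   // Shared timeout, in ms, for the waitForCondition calls in this test class.
   private static final long DEFAULT_WAIT_MS = 2000L;
   ```
   and then `TestUtils.waitForCondition(..., DEFAULT_WAIT_MS, "Generation 
should be reset");` at each call site.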



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-12703) Allow unencrypted private keys when using PEM files

2022-05-16 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-12703.
-
Fix Version/s: 3.3.0
   Resolution: Fixed

> Allow unencrypted private keys when using PEM files
> ---
>
> Key: KAFKA-12703
> URL: https://issues.apache.org/jira/browse/KAFKA-12703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.8.0
>Reporter: Brian Bascoy
>Priority: Major
> Fix For: 3.3.0
>
>
> Unencrypted PEM files seem to be internally [supported in the 
> codebase|https://github.com/apache/kafka/blob/a46beb9d29781e0709baf596601122f770a5fa31/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L509]
>  but setting an ssl.key.password is currently enforced by createKeystore (on 
> DefaultSslEngineFactory). I was unable to find a reason for this, so I wonder 
> if this limitation could simply be removed:
>  
> [https://github.com/pera/kafka/commit/8df2feab5fc6955cf8c89a7d132f05d8f562e16b]
>  
> Thanks
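
For reference, a minimal sketch of the configuration this unblocks (property 
names from KIP-651; the key and certificate bodies are shortened placeholders):

{code}
ssl.keystore.type=PEM
ssl.keystore.certificate.chain=-----BEGIN CERTIFICATE----- ... -----END CERTIFICATE-----
ssl.keystore.key=-----BEGIN PRIVATE KEY----- ... -----END PRIVATE KEY-----
# With this fix, ssl.key.password may be omitted when the private key is unencrypted.
{code}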



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-12703) Allow unencrypted private keys when using PEM files

2022-05-16 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-12703:

Summary: Allow unencrypted private keys when using PEM files  (was: 
Unencrypted PEM files can't be loaded)

> Allow unencrypted private keys when using PEM files
> ---
>
> Key: KAFKA-12703
> URL: https://issues.apache.org/jira/browse/KAFKA-12703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.8.0
>Reporter: Brian Bascoy
>Priority: Major
>
> Unencrypted PEM files seem to be internally [supported in the 
> codebase|https://github.com/apache/kafka/blob/a46beb9d29781e0709baf596601122f770a5fa31/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L509]
>  but setting an ssl.key.password is currently enforced by createKeystore (on 
> DefaultSslEngineFactory). I was unable to find a reason for this, so I wonder 
> if this limitation could simply be removed:
>  
> [https://github.com/pera/kafka/commit/8df2feab5fc6955cf8c89a7d132f05d8f562e16b]
>  
> Thanks



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] dajac merged pull request #11916: KAFKA-12703; Allow unencrypted private keys when using PEM files

2022-05-16 Thread GitBox


dajac merged PR #11916:
URL: https://github.com/apache/kafka/pull/11916


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-7955) Provide a BOM for EmbeddedConnectCluster and EmbeddedCluster

2022-05-16 Thread Richard Fussenegger (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537373#comment-17537373
 ] 

Richard Fussenegger edited comment on KAFKA-7955 at 5/16/22 7:15 AM:
-

https://issues.apache.org/jira/browse/KAFKA-9060 can be considered a duplicate 
of this one, because one BOM for the entire {{org.apache.kafka}} group would be 
sufficient to cover everything.

Adding a BOM would require a substantial rewrite of the Gradle build, because 
it currently configures all projects as Java libraries. Using the Java Platform 
plugin thus breaks various tasks that are expected to exist, as well as 
publishing.


was (Author: fleshgrinder):
https://issues.apache.org/jira/browse/KAFKA-9060 can be considered a duplicate 
of this one, because one BOM for the entire {{org.apache.kafka}} group would be 
sufficient to cover everything.

> Provide a BOM for EmbeddedConnectCluster and EmbeddedCluster
> 
>
> Key: KAFKA-7955
> URL: https://issues.apache.org/jira/browse/KAFKA-7955
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.1.1
>Reporter: Jeremy Custenborder
>Priority: Major
>
> Using EmbeddedConnectCluster for testing connectors is a little difficult 
> given the number of dependencies that are required. Providing a 
> [BOM|https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html]
>  will make it easier for connector developers. For example here are the 
> dependencies that are required. 
> {code:xml}
> 
> 
> org.apache.kafka
> connect-api
> ${kafka.version}
> 
> 
> org.apache.kafka
> connect-runtime
> ${kafka.version}
> test
> test-jar
> 
> 
> org.apache.kafka
> connect-runtime
> ${kafka.version}
> 
> 
> org.apache.kafka
> kafka-clients
> ${kafka.version}
> 
> 
> junit
> junit
> 4.12
> 
> 
> org.apache.kafka
> kafka-clients
> ${kafka.version}
> test
> test-jar
> 
> 
> org.apache.kafka
> kafka_2.11
> ${kafka.version}
> 
> 
> org.apache.kafka
> kafka_2.11
> test-jar
> test
> ${kafka.version}
> 
> 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-9060) Publish BOMs for Kafka

2022-05-16 Thread Richard Fussenegger (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537374#comment-17537374
 ] 

Richard Fussenegger commented on KAFKA-9060:


In https://issues.apache.org/jira/browse/KAFKA-7955 another user was already 
asking for a BOM. This issue is more detailed, but both are asking for the same.

> Publish BOMs for Kafka
> --
>
> Key: KAFKA-9060
> URL: https://issues.apache.org/jira/browse/KAFKA-9060
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Michael Holler
>Priority: Trivial
>
> Hey there! Love the project, but I would love it if there was a BOM file 
> published for each version. If you're not familiar with a BOM, it stands for 
> "Bill of Materials"; it helps your Gradle (in my case, but it's originally a 
> Maven thing) file look like this (using JDBI's implementation as an 
> example):
> {code}
> dependencies {
> implementation(platform("org.jdbi:jdbi3-bom:3.10.1"))
> implementation("org.jdbi:jdbi3-core")
> implementation("org.jdbi:jdbi3-kotlin")
> implementation("org.jdbi:jdbi3-kotlin-sqlobject")
> implementation("org.jdbi:jdbi3-jackson2")
> }
> {code}
> Instead of this:
> {code}
> val jdbiVersion by extra { "3.10.1" }
>  
> dependencies {
> implementation("org.jdbi:jdbi3-core:$jdbiVersion")
> implementation("org.jdbi:jdbi3-kotlin:$jdbiVersion")
> implementation("org.jdbi:jdbi3-kotlin-sqlobject:$jdbiVersion")
> implementation("org.jdbi:jdbi3-jackson2:$jdbiVersion")
> }
> {code}
> Notice how you just leave the versions off when you use a BOM. This can help 
> reduce the number of dependency compatibility surprises one can encounter, 
> especially if a transitive dependency brings in a newer version of one of the 
> components (it'll be reduced to the BOM's version). Note also that you still 
> have to list dependencies you want with a BOM, just not the versions.
> Here's a deeper dive into how a BOM works:
> https://howtodoinjava.com/maven/maven-bom-bill-of-materials-dependency/
>  The Maven help site also has a section on it (Ctrl+F for "BOM"):
> https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html
> I think BOMs would be a great for the users of the Kafka project because 
> there are lots of Kafka libraries (streams, connect-api, connect-json, etc) 
> that require the same version as other Kafka dependencies to work correctly. 
> BOMs were designed for exactly this use case. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


(v8.20.7#820007)