[GitHub] [kafka] dengziming commented on pull request #12170: KAFKA-13875 Adjusted the output the topic describe output to include TopicID & se…
dengziming commented on PR #12170: URL: https://github.com/apache/kafka/pull/12170#issuecomment-1128442992 It's OK to add topicId in the docs; the segment.bytes property is not intended to be printed. This seems to be a small bug: KAFKA-13718 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on pull request #12109: KAFKA-13863: Prevent null config value when create topic in KRaft mode
dengziming commented on PR #12109: URL: https://github.com/apache/kafka/pull/12109#issuecomment-1128357555 Resolved conflicts, and I also changed `ZkAdminManager.createTopics` to throw `ConfigException` on null config values.
[jira] [Updated] (KAFKA-13907) Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable
[ https://issues.apache.org/jira/browse/KAFKA-13907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-13907: -- Labels: newbie (was: ) > Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable > -- > > Key: KAFKA-13907 > URL: https://issues.apache.org/jira/browse/KAFKA-13907 > Project: Kafka > Issue Type: Bug > Reporter: dengziming > Priority: Major > Labels: newbie > > ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable will hang > up waiting for controlled shutdown; there may be some bug related to it. > Since this bug can be reproduced locally, it won't be hard to investigate. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] dengziming commented on pull request #12165: KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs
dengziming commented on PR #12165: URL: https://github.com/apache/kafka/pull/12165#issuecomment-1128348174 Thank you @showuon @hachikuji, I changed the assertion, disabled `testCleanShutdownWithKRaftControllerUnavailable`, and created KAFKA-13907 for this bug. For `testCleanShutdownAfterFailedStartup`, it currently only fails occasionally, so I didn't disable it.
[jira] [Created] (KAFKA-13907) Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable
dengziming created KAFKA-13907: -- Summary: Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable Key: KAFKA-13907 URL: https://issues.apache.org/jira/browse/KAFKA-13907 Project: Kafka Issue Type: Bug Reporter: dengziming ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable will hang up waiting for controlled shutdown; there may be some bug related to it. Since this bug can be reproduced locally, it won't be hard to investigate.
[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537882#comment-17537882 ] Luke Chen commented on KAFKA-12495: --- No problem, [~mcabrera] ! > Unbalanced connectors/tasks distribution will happen in Connect's incremental > cooperative assignor > -- > > Key: KAFKA-12495 > URL: https://issues.apache.org/jira/browse/KAFKA-12495 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Reporter: Luke Chen > Priority: Critical > Attachments: image-2021-03-18-15-04-57-854.png, > image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png > > > In Kafka Connect, we implement the incremental cooperative rebalance algorithm > based on KIP-415 > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]. > However, we have a bad assumption in the algorithm implementation, which is: > after the revoking rebalance completes, the member (worker) count will be the same > as in the previous round of rebalance. > > Let's take a look at the example in KIP-415: > !image-2021-03-18-15-07-27-103.png|width=441,height=556! > It works well for most cases. But what if W4 is added after the 1st rebalance > completed and before the 2nd rebalance started? Let's see what happens. 
> Let's see this example: (we'll use 10 tasks here): > > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, > BT5 > W1 is current leader > W2 joins with assignment: [] > Rebalance is triggered > W3 joins while rebalance is still active with assignment: [] > W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, > BT4, BT5] > W1 becomes leader > W1 computes and sends assignments: > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > W2(delay: 0, assigned: [], revoked: []) > W3(delay: 0, assigned: [], revoked: []) > W1 stops revoked resources > W1 rejoins with assignment: [AC0, AT1, AT2, AT3] > Rebalance is triggered > W2 joins with assignment: [] > W3 joins with assignment: [] > // one more member joined > W4 joins with assignment: [] > W1 becomes leader > W1 computes and sends assignments: > // We assigned all the previously revoked Connectors/Tasks to the new member, > but we didn't revoke any more C/T in this round, which causes an unbalanced > distribution > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: []) > W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) > W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: []) > W4(delay: 0, assigned: [BT4, BT5], revoked: []) > {code} > Because we don't allow consecutive revocations in two consecutive > rebalances (under the same leader), we end up with this uneven distribution > in this situation. We should allow a consecutive rebalance to have another > round of revocation to move the C/T to the other members in this case. 
> expected: > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, > BT5 > W1 is current leader > W2 joins with assignment: [] > Rebalance is triggered > W3 joins while rebalance is still active with assignment: [] > W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, > BT4, BT5] > W1 becomes leader > W1 computes and sends assignments: > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > W2(delay: 0, assigned: [], revoked: []) > W3(delay: 0, assigned: [], revoked: []) > W1 stops revoked resources > W1 rejoins with assignment: [AC0, AT1, AT2, AT3] > Rebalance is triggered > W2 joins with assignment: [] > W3 joins with assignment: [] > // one more member joined > W4 joins with assignment: [] > W1 becomes leader > W1 computes and sends assignments: > // We assigned all the previous revoked Connectors/Tasks to the new member, > **and also revoke some C/T** > W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3]) > W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) > W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: []) > W4(delay: 0, assigned: [BT4, BT5], revoked: []) > // another round of rebalance to assign the new revoked C/T to the other > members > W1 rejoins with assignment: [AC0, AT1, AT2] > Rebalance is triggered > W2 joins with assignment: [AT4, AT5, BC0] > W3 joins with assignment: [BT1, BT2, BT4] > W4 joins with assignment: [BT4, BT5] > W1 becomes leader > W1
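The balancing intent in the expected trace above can be sketched outside Connect: each of the m workers should end up with floor(n/m) or ceil(n/m) of the n total resources, and anything beyond that quota is revoked so a follow-up rebalance can hand it to under-loaded members. The following is a minimal illustrative sketch of that quota computation, not Connect's actual `IncrementalCooperativeAssignor` code; class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RevocationSketch {
    // For each worker, compute the resources it must give up so that every
    // worker ends up with floor(n/m) or ceil(n/m) of the n total resources.
    static Map<String, List<String>> revocations(Map<String, List<String>> current) {
        int total = current.values().stream().mapToInt(List::size).sum();
        int members = current.size();
        int floor = total / members;
        int extras = total % members; // this many workers may keep one extra resource
        Map<String, List<String>> revoked = new LinkedHashMap<>();
        int extrasLeft = extras;
        for (Map.Entry<String, List<String>> e : current.entrySet()) {
            List<String> assigned = e.getValue();
            int allowed = floor;
            if (extrasLeft > 0 && assigned.size() > floor) {
                allowed = floor + 1;
                extrasLeft--;
            }
            if (assigned.size() > allowed) {
                // Revoke the tail beyond the quota; a later rebalance reassigns it.
                revoked.put(e.getKey(), new ArrayList<>(assigned.subList(allowed, assigned.size())));
            }
        }
        return revoked;
    }
}
```

With the trace's second-round state (W1 holding [AC0, AT1, AT2, AT3] and W2, W3, W4 holding three, three, and two resources), only W1 exceeds its quota of three, so it revokes [AT3], matching the expected assignment in the trace.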
[GitHub] [kafka] bozhao12 commented on a diff in pull request #12158: MINOR:A few code cleanUps in KafkaController
bozhao12 commented on code in PR #12158: URL: https://github.com/apache/kafka/pull/12158#discussion_r874295581 ## core/src/main/scala/kafka/controller/KafkaController.scala: ## @@ -468,14 +468,6 @@ class KafkaController(val config: KafkaConfig, // shutdown leader rebalance scheduler kafkaScheduler.shutdown() -offlinePartitionCount = 0 Review Comment: @hachikuji Thanks for your suggestion, I updated the code.
[GitHub] [kafka] showuon commented on pull request #12165: KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs
showuon commented on PR #12165: URL: https://github.com/apache/kafka/pull/12165#issuecomment-1128325494 @dengziming , I'd suggest we follow @hachikuji 's advice to add an `assertCause` to fix the failed tests. I can create another JIRA to address our thoughts in this comment: https://github.com/apache/kafka/pull/12165#discussion_r873557577 Also, as Jason mentioned, there are still 2 failed tests: ``` Build / JDK 8 and Scala 2.12 / kafka.server.ServerShutdownTest.testCleanShutdownAfterFailedStartup(String).quorum=kraft Build / JDK 8 and Scala 2.12 / kafka.server.ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable(String).quorum=kraft ``` If you don't have time, you can just disable them first, and create jira ticket for them. Let me know if you need help. I hope we can fix them in our timezone today. The failed tests make my PR build result very bad (and I don't like that, haha) Thanks.
[GitHub] [kafka] hachikuji opened a new pull request, #12171: MINOR: Convert admin integration tests
hachikuji opened a new pull request, #12171: URL: https://github.com/apache/kafka/pull/12171 This patch enables KRaft support in `PlaintextAdminIntegrationTest`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Comment Edited] (KAFKA-13867) Improve JavaDoc for MetadataVersion.java
[ https://issues.apache.org/jira/browse/KAFKA-13867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537864#comment-17537864 ] Kvicii.Yu edited comment on KAFKA-13867 at 5/17/22 1:24 AM: [~cmccabe] hi, My understanding is to just modify the method name of ibpVersion, rename to ibpVersion, and change all the locations where this method is used? was (Author: JIRAUSER283467): [~cmccabe] hi, My understanding is to just modify the method name of ibpVersion, rename to ibpVersion? > Improve JavaDoc for MetadataVersion.java > > > Key: KAFKA-13867 > URL: https://issues.apache.org/jira/browse/KAFKA-13867 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Minor >
[jira] [Commented] (KAFKA-13867) Improve JavaDoc for MetadataVersion.java
[ https://issues.apache.org/jira/browse/KAFKA-13867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537864#comment-17537864 ] Kvicii.Yu commented on KAFKA-13867: --- [~cmccabe] hi, My understanding is to just modify the method name of ibpVersion, rename to ibpVersion? > Improve JavaDoc for MetadataVersion.java > > > Key: KAFKA-13867 > URL: https://issues.apache.org/jira/browse/KAFKA-13867 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Minor >
[GitHub] [kafka] mjsax commented on pull request #12137: MINOR: Consolidate StreamsException and TaskCorruptedException
mjsax commented on PR #12137: URL: https://github.com/apache/kafka/pull/12137#issuecomment-1128295416 > Avoiding the public API change seems quite hacky to me I leave it up to you -- it's only a small KIP, so if you want to do it, just go for it -- personally, I don't see much benefit in unifying both exceptions (as also indicated by the fact that this PR doesn't change any code that uses them). But it also does not hurt... Your call.
[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537860#comment-17537860 ] Matthias J. Sax commented on KAFKA-6520: It's still an open item – and it's complex (for details, see the KIP discussion thread). > When a Kafka Stream can't communicate with the server, it's Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Assignee: Vince Mu >Priority: Major > Labels: newbie, user-experience > > KIP WIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams] > When you execute the following scenario the application is always in RUNNING > state > > 1)start kafka > 2)start app, app connects to kafka and starts processing > 3)kill kafka(stop docker container) > 4)the application doesn't give any indication that it's no longer > connected(Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the google user forum. This is a link to a related > issue. > - > Update: there are some discussions on the PR itself which leads me to think > that a more general solution should be at the ClusterConnectionStates rather > than at the Streams or even Consumer level. One proposal would be: > * Add a new metric named `failedConnection` in SelectorMetrics which is > recorded at `connect()` and `pollSelectionKeys()` functions, upon capture the > IOException / RuntimeException which indicates the connection disconnected. 
> * And then users of Consumer / Streams can monitor on this metric, which > normally will only have close-to-zero values, as we have transient > disconnects; if it is spiking, it means the brokers are consistently > unavailable, indicating the state. > [~Yohan123] WDYT?
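The monitoring idea in the proposal above is that a failed-connection count sits near zero under transient disconnects and spikes under sustained unavailability. An illustrative sliding-window detector for that pattern is sketched below; it is not Kafka's `SelectorMetrics` code, and the class name and thresholds are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative sketch only: near-zero failure counts indicate transient
// disconnects, while a sustained spike inside the window suggests the
// brokers are consistently unavailable.
class FailedConnectionMonitor {
    private final Deque<Long> failureTimesMs = new ArrayDeque<>();
    private final long windowMs;
    private final int spikeThreshold;

    FailedConnectionMonitor(long windowMs, int spikeThreshold) {
        this.windowMs = windowMs;
        this.spikeThreshold = spikeThreshold;
    }

    // Record one failed-connection event (the proposal would record these in
    // connect() / pollSelectionKeys() on IOException / RuntimeException).
    void recordFailure(long nowMs) {
        failureTimesMs.addLast(nowMs);
        prune(nowMs);
    }

    // True when the number of failures inside the window crosses the threshold.
    boolean isSpiking(long nowMs) {
        prune(nowMs);
        return failureTimesMs.size() >= spikeThreshold;
    }

    private void prune(long nowMs) {
        while (!failureTimesMs.isEmpty() && failureTimesMs.peekFirst() < nowMs - windowMs) {
            failureTimesMs.removeFirst();
        }
    }
}
```

A consumer of such a metric would alert only when `isSpiking` stays true, rather than on individual disconnect events.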
[GitHub] [kafka] hachikuji commented on pull request #12162: KAFKA-13899: Use INVALID_CONFIG error code consistently in AlterConfig APIs
hachikuji commented on PR #12162: URL: https://github.com/apache/kafka/pull/12162#issuecomment-1128281035 @dengziming By the way, KAFKA-13609 seems to apply to broker configurations. Similarly, I do not think we need a KIP since INVALID_CONFIG is already used by these APIs.
[jira] [Resolved] (KAFKA-13899) Inconsistent error codes returned from AlterConfig APIs
[ https://issues.apache.org/jira/browse/KAFKA-13899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13899. - Fix Version/s: 3.2.1 Resolution: Fixed > Inconsistent error codes returned from AlterConfig APIs > --- > > Key: KAFKA-13899 > URL: https://issues.apache.org/jira/browse/KAFKA-13899 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.2.1 > > > In the AlterConfigs/IncrementalAlterConfigs zk handler, we return > INVALID_REQUEST and INVALID_CONFIG inconsistently. The problem is in > `LogConfig.validate`. We may either return `ConfigException` or > `InvalidConfigException`. When the first of these is thrown, we catch it and > convert to INVALID_REQUEST. It seems more consistent to convert to > INVALID_CONFIG. > Note that the kraft implementation returns INVALID_CONFIG consistently.
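The fix described in the issue above maps both validation exception types to a single INVALID_CONFIG error code, matching the KRaft handler. A self-contained sketch of that mapping follows; the exception classes here are stand-ins, not Kafka's real `org.apache.kafka.common.config.ConfigException` and `InvalidConfigurationException`, and the helper is hypothetical:

```java
// Stand-in exception types so the sketch compiles on its own.
class ConfigException extends RuntimeException {}
class InvalidConfigurationException extends RuntimeException {}

class AlterConfigErrorMapping {
    // Map both validation failure types to the same error code, as the fix
    // describes (the zk handler previously converted ConfigException to
    // INVALID_REQUEST, which was the inconsistency).
    static String errorCodeFor(RuntimeException e) {
        if (e instanceof ConfigException || e instanceof InvalidConfigurationException) {
            return "INVALID_CONFIG";
        }
        return "UNKNOWN_SERVER_ERROR";
    }
}
```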
[GitHub] [kafka] hachikuji merged pull request #12162: KAFKA-13899: Use INVALID_CONFIG error code consistently in AlterConfig APIs
hachikuji merged PR #12162: URL: https://github.com/apache/kafka/pull/12162
[GitHub] [kafka] cmccabe commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller
cmccabe commented on code in PR #12050: URL: https://github.com/apache/kafka/pull/12050#discussion_r874258218 ## core/src/main/scala/kafka/raft/RaftManager.scala: ## @@ -108,7 +108,8 @@ class KafkaRaftManager[T]( time: Time, metrics: Metrics, threadNamePrefixOpt: Option[String], - val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]] + val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]], + apiVersions: ApiVersions Review Comment: I don't see why we would want to pass a specific `ApiVersions` object in. We're not supplying any information, right? We don't know anything about the versions of the nodes until we start contacting them. So, can we just have an accessor method in `RaftManager` that returns the `ApiVersions` of the `RaftManager`?
[jira] [Comment Edited] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537647#comment-17537647 ] Karthik Raman edited comment on KAFKA-6520 at 5/17/22 12:26 AM: [~mjsax] / [~Yohan123] / [~VinceMu] : Just wondering if there is any update on this issue and the fix to be added in any latest releases? Thank you. was (Author: JIRAUSER289548): [~mjsax] / [~Yohan123] : Just wondering if there is any update on this issue and the fix to be added in any latest releases? Thank you. > When a Kafka Stream can't communicate with the server, it's Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Assignee: Vince Mu >Priority: Major > Labels: newbie, user-experience > > KIP WIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams] > When you execute the following scenario the application is always in RUNNING > state > > 1)start kafka > 2)start app, app connects to kafka and starts processing > 3)kill kafka(stop docker container) > 4)the application doesn't give any indication that it's no longer > connected(Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the google user forum. This is a link to a related > issue. > - > Update: there are some discussions on the PR itself which leads me to think > that a more general solution should be at the ClusterConnectionStates rather > than at the Streams or even Consumer level. 
One proposal would be: > * Add a new metric named `failedConnection` in SelectorMetrics which is > recorded at the `connect()` and `pollSelectionKeys()` functions, upon capturing the > IOException / RuntimeException which indicates the connection disconnected. > * And then users of Consumer / Streams can monitor on this metric, which > normally will only have close-to-zero values, as we have transient > disconnects; if it is spiking, it means the brokers are consistently > unavailable, indicating the state. > [~Yohan123] WDYT?
[GitHub] [kafka] cmccabe commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller
cmccabe commented on code in PR #12050: URL: https://github.com/apache/kafka/pull/12050#discussion_r874257133 ## clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java: ## @@ -91,19 +95,11 @@ public static NodeApiVersions create(short apiKey, short minVersion, short maxVe .setMaxVersion(maxVersion))); } -public NodeApiVersions(ApiVersionCollection nodeApiVersions) { -for (ApiVersion nodeApiVersion : nodeApiVersions) { -if (ApiKeys.hasId(nodeApiVersion.apiKey())) { -ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey()); -supportedVersions.put(nodeApiKey, nodeApiVersion); -} else { -// Newer brokers may support ApiKeys we don't know about -unknownApis.add(nodeApiVersion); -} -} +public NodeApiVersions(Collection nodeApiVersions) { Review Comment: Is it necessary to have a special constructor for the case where there are no supported features? This won't be a common situation in the future, right? This feels like the kind of thing that could easily lead to mistakes, if someone accidentally uses it outside of test code. Looking at my IDE I see 10 uses of this in test code. Can we just change them to use the two-argument form?
[GitHub] [kafka] Moovlin commented on pull request #12170: KAFKA-13875 Adjusted the output the topic describe output to include TopicID & se…
Moovlin commented on PR #12170: URL: https://github.com/apache/kafka/pull/12170#issuecomment-1128249740 There are no code changes in this PR. It's an update to the quickstart.html file, so I think these test failures can be ignored. This is only my second PR, so please let me know if I have to do something different.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12138: MINOR: Followers should not have any remote replica states left over from previous leadership
hachikuji commented on code in PR #12138: URL: https://github.com/apache/kafka/pull/12138#discussion_r874209109 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -578,7 +579,8 @@ class Partition(val topicPartition: TopicPartition, // Updating the assignment and ISR state is safe if the partition epoch is // larger or equal to the current partition epoch. updateAssignmentAndIsr( -assignment = partitionState.replicas.asScala.map(_.toInt), +assignment = replicas, +followers = replicas.filter(_ != localBrokerId), Review Comment: Why not push this computation down to `updateAssignmentAndIsr`? ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -673,7 +675,8 @@ class Partition(val topicPartition: TopicPartition, updateAssignmentAndIsr( assignment = partitionState.replicas.asScala.iterator.map(_.toInt).toSeq, -isr = Set.empty[Int], +followers = Seq.empty, +isr = Set.empty, Review Comment: Perhaps a separate issue, but any reason not to update ISR? ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -578,7 +579,8 @@ class Partition(val topicPartition: TopicPartition, // Updating the assignment and ISR state is safe if the partition epoch is // larger or equal to the current partition epoch. updateAssignmentAndIsr( -assignment = partitionState.replicas.asScala.map(_.toInt), +assignment = replicas, Review Comment: Would it make sense to change `assignment` to `replicas`?
[GitHub] [kafka] hachikuji commented on a diff in pull request #12165: KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs
hachikuji commented on code in PR #12165: URL: https://github.com/apache/kafka/pull/12165#discussion_r874198037 ## core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala: ## @@ -195,8 +195,9 @@ class ServerShutdownTest extends KafkaServerTestHarness { // identify the correct exception, making sure the server was shutdown, and cleaning up if anything // goes wrong so that awaitShutdown doesn't hang case e: Exception => -assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected exception $e") -assertEquals(if (quorum == "zk") BrokerState.NOT_RUNNING else BrokerState.SHUTTING_DOWN, brokers.head.brokerState) +assertTrue(exceptionClassTag.runtimeClass.isInstance(if (isKRaftTest() && e.isInstanceOf[RuntimeException]) e.getCause.getCause else e), Review Comment: Would a utility like this make sense? ```scala private def assertCause( expectedClass: Class[_], e: Throwable ): Unit = { var cause = e while (cause != null) { if (expectedClass.isInstance(cause)) { return } cause = cause.getCause } fail(s"Failed to assert cause $expectedClass") } ```
[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access
rittikaadhikari commented on code in PR #12005: URL: https://github.com/apache/kafka/pull/12005#discussion_r874121554 ## core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala: ## @@ -124,4 +124,8 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint, def close(): Unit = { networkClient.close() } + + override def toString: String = { Review Comment: We needed to add the `toString` for logging purposes in `RemoteLeaderEndPoint`, where we print out the endpoint.
[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access
rittikaadhikari commented on code in PR #12005: URL: https://github.com/apache/kafka/pull/12005#discussion_r874120703 ## core/src/main/scala/kafka/server/LeaderEndPoint.scala: ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.requests.FetchRequest +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData} + +import scala.collection.Map + +/** + * The LeaderEndPoint acts as an abstraction which serves all fetches from the leader + * for the fetcher threads. + */ +trait LeaderEndPoint { + + type FetchData = FetchResponseData.PartitionData + type EpochData = OffsetForLeaderEpochRequestData.OffsetForLeaderPartition + + /** + * A boolean specifying if truncation when fetching from the leader is supported + */ + def isTruncationOnFetchSupported: Boolean Review Comment: I agree that it probably shouldn't be a part of the interface. 
I can probably get away without passing it into the constructors though, since it's always false for LocalLeaderEndPoint and the value can be found from brokerConfig for RemoteLeaderEndPoint.
[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access
rittikaadhikari commented on code in PR #12005: URL: https://github.com/apache/kafka/pull/12005#discussion_r874118991 ## checkstyle/suppressions.xml: ## @@ -309,7 +309,7 @@ files="(RemoteLogManagerConfig).java"/> -
[GitHub] [kafka] kowshik commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access
kowshik commented on code in PR #12005: URL: https://github.com/apache/kafka/pull/12005#discussion_r874090550 ## core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala: ## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.{Collections, Optional} +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import kafka.utils.Implicits.MapExtensionMethods +import kafka.utils.Logging +import org.apache.kafka.clients.FetchSessionHandler +import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderTopic, OffsetForLeaderTopicCollection} +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse} +import 
org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2 + +import scala.jdk.CollectionConverters._ +import scala.collection.{Map, mutable} +import scala.compat.java8.OptionConverters.RichOptionForJava8 + +/** + * Facilitates fetches from a remote replica leader. + * + * @param logPrefix The log prefix from the ReplicaFetcherThread + * @param endpoint A ReplicaFetcherBlockingSend Review Comment: The doc can be slightly better for this parameter, for example: `The raw leader endpoint to be used by this class for communicating with the leader`. ## core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala: ## @@ -0,0 +1,223 @@ +/** + * Facilitates fetches from a remote replica leader. + * + * @param logPrefix The log prefix from the ReplicaFetcherThread Review Comment: Lets remove `from the ReplicaFetcherThread` ## core/src/main/scala/kafka/server/LeaderEndPoint.scala: ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + *
[GitHub] [kafka] Moovlin opened a new pull request, #12170: KAFKA-13875 Adjusted the output the topic describe output to include TopicID & se…
Moovlin opened a new pull request, #12170: URL: https://github.com/apache/kafka/pull/12170 …gment.bytes *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [X] Verify design and implementation - [X] Verify test coverage and CI build status - [X] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: Fix stuck SSL tests in case of authentication failure
fvaleri commented on code in PR #12159: URL: https://github.com/apache/kafka/pull/12159#discussion_r874001986 ## clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java: ## @@ -449,9 +450,9 @@ public void close() { @Test public void testCloseOldestConnection() throws Exception { String id = "0"; -blockingConnect(id); - -time.sleep(6000); // The max idle time is 5000ms +selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE); +selector.poll(0); Review Comment: Hi @divijvaidya, thanks. I need to fix this and `testCloseOldestConnectionWithMultiplePendingReceives`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax opened a new pull request, #12169: MINOR: improve description of `commit.interval.ms` config
mjsax opened a new pull request, #12169: URL: https://github.com/apache/kafka/pull/12169 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
hachikuji commented on code in PR #12149: URL: https://github.com/apache/kafka/pull/12149#discussion_r874042264 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -1292,8 +1292,10 @@ public void handleResponse(AbstractResponse response) { reenqueue(); } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) { reenqueue(); -} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED || -error == Errors.CLUSTER_AUTHORIZATION_FAILED) { +} else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) { +log.warn("Retrying upon missing initProducerId due to cluster authorization error"); +reenqueue(); Review Comment: Hmm, I think we do want the error to get propagated back to the application. Most users would expect the application to fail so that they have a chance to see the issue and fix it. High level, what I was thinking we could do is fail the request by calling `result.fail`, then transition back to `UNINITIALIZED`. Then if the user wants to, they can call `initTransactions()` to retry. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
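The state-machine change described in the review comment above can be modeled in a few lines. The class below is a hedged illustration only, not the real `TransactionManager`: the names `TxnInitModel`, `handleResponse(boolean)`, and `lastError` are hypothetical stand-ins. It shows the proposed flow: on an authorization error, fail the pending result (surface the error) and transition back to `UNINITIALIZED`, so the application can choose to call `initTransactions()` again.

```java
// Simplified model (assumption: not the real Kafka TransactionManager) of the
// proposed behavior: on CLUSTER_AUTHORIZATION_FAILED, fail the pending result
// and return to UNINITIALIZED so the caller decides whether to retry.
public class TxnInitModel {
    public enum State { UNINITIALIZED, INITIALIZING, READY }

    private State state = State.UNINITIALIZED;
    private String lastError = null;

    // Application-facing call, analogous to Producer#initTransactions().
    public void initTransactions() {
        if (state != State.UNINITIALIZED)
            throw new IllegalStateException("init already in progress or completed");
        state = State.INITIALIZING;
    }

    // Broker response handler: instead of re-enqueueing internally forever,
    // surface the error and reset state so the application sees the failure.
    public void handleResponse(boolean authorizationFailed) {
        if (authorizationFailed) {
            lastError = "CLUSTER_AUTHORIZATION_FAILED"; // propagated to the application
            state = State.UNINITIALIZED;                // caller may retry initTransactions()
        } else {
            state = State.READY;
        }
    }

    public State state() { return state; }
    public String lastError() { return lastError; }
}
```

With this shape, a failed init is visible to the application (it can log or fix ACLs), and a deliberate second `initTransactions()` call retries cleanly.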
[GitHub] [kafka] mumrah opened a new pull request, #12168: Minor: Remove extraneous code in LocalLogManager
mumrah opened a new pull request, #12168: URL: https://github.com/apache/kafka/pull/12168 Looks like this was some left over test code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #12158: MINOR:A few code cleanUps in KafkaController
hachikuji commented on code in PR #12158: URL: https://github.com/apache/kafka/pull/12158#discussion_r873958800 ## core/src/main/scala/kafka/controller/KafkaController.scala: ## @@ -468,14 +468,6 @@ class KafkaController(val config: KafkaConfig, // shutdown leader rebalance scheduler kafkaScheduler.shutdown() -offlinePartitionCount = 0 Review Comment: The way `updateMetrics` is written is a little annoying. While we're at it, would it be any clearer to revise `updateMetrics` to the following structure: ```scala if (isActive) { offlinePartitionCount = controllerContext.offlinePartitionCount ... } else { offlinePartitionCount = 0 ... } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-13716) add tests for `DeleteRecordsCommand` class
[ https://issues.apache.org/jira/browse/KAFKA-13716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537654#comment-17537654 ] Richard Joerger edited comment on KAFKA-13716 at 5/16/22 4:53 PM: -- Since we haven't heard from Shivanjal, I went ahead and added some tests as well as opened a pull request. Any feedback would be great. was (Author: rjoerger): Since we hadn't heard from Shivanjal, I went ahead and added some test as well as opened a pull request. Any feedback would be great. > add tests for `DeleteRecordsCommand` class > -- > > Key: KAFKA-13716 > URL: https://issues.apache.org/jira/browse/KAFKA-13716 > Project: Kafka > Issue Type: Test > Components: tools >Reporter: Luke Chen >Assignee: Shivanjal Arora >Priority: Major > Labels: Newbie, newbie > > Found there's no tests for `DeleteRecordsCommand` class, which is used in > `kafka-delete-records.sh`. We should add it. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13716) add tests for `DeleteRecordsCommand` class
[ https://issues.apache.org/jira/browse/KAFKA-13716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537654#comment-17537654 ] Richard Joerger commented on KAFKA-13716: - Since we hadn't heard from Shivanjal, I went ahead and added some test as well as opened a pull request. Any feedback would be great. > add tests for `DeleteRecordsCommand` class > -- > > Key: KAFKA-13716 > URL: https://issues.apache.org/jira/browse/KAFKA-13716 > Project: Kafka > Issue Type: Test > Components: tools >Reporter: Luke Chen >Assignee: Shivanjal Arora >Priority: Major > Labels: Newbie, newbie > > Found there's no tests for `DeleteRecordsCommand` class, which is used in > `kafka-delete-records.sh`. We should add it. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Comment Edited] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537647#comment-17537647 ] Karthik Raman edited comment on KAFKA-6520 at 5/16/22 4:46 PM: --- [~mjsax] / [~Yohan123] : Just wondering if there is any update on this issue and the fix to be added in any latest releases? Thank you. was (Author: JIRAUSER289548): [~mjsax] / [~Yohan123] : Just wondering if there is any update on this issue to be added in any latest releases? Thank you. > When a Kafka Stream can't communicate with the server, it's Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Assignee: Vince Mu >Priority: Major > Labels: newbie, user-experience > > KIP WIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams] > When you execute the following scenario the application is always in RUNNING > state > > 1)start kafka > 2)start app, app connects to kafka and starts processing > 3)kill kafka(stop docker container) > 4)the application doesn't give any indication that it's no longer > connected(Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the google user forum. This is a link to a related > issue. > - > Update: there are some discussions on the PR itself which leads me to think > that a more general solution should be at the ClusterConnectionStates rather > than at the Streams or even Consumer level. 
One proposal would be: > * Add a new metric named `failedConnection` in SelectorMetrics which is > recorded at `connect()` and `pollSelectionKeys()` functions, upon capture the > IOException / RuntimeException which indicates the connection disconnected. > * And then users of Consumer / Streams can monitor on this metric, which > normally will only have close to zero values as we have transient > disconnects, if it is spiking it means the brokers are consistently being > unavailable indicting the state. > [~Yohan123] WDYT? -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537647#comment-17537647 ] Karthik Raman commented on KAFKA-6520: -- [~mjsax] / [~Yohan123] : Just wondering if there is any update on this issue to be added in any latest releases? Thank you. > When a Kafka Stream can't communicate with the server, it's Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Assignee: Vince Mu >Priority: Major > Labels: newbie, user-experience > > KIP WIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams] > When you execute the following scenario the application is always in RUNNING > state > > 1)start kafka > 2)start app, app connects to kafka and starts processing > 3)kill kafka(stop docker container) > 4)the application doesn't give any indication that it's no longer > connected(Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the google user forum. This is a link to a related > issue. > - > Update: there are some discussions on the PR itself which leads me to think > that a more general solution should be at the ClusterConnectionStates rather > than at the Streams or even Consumer level. One proposal would be: > * Add a new metric named `failedConnection` in SelectorMetrics which is > recorded at `connect()` and `pollSelectionKeys()` functions, upon capture the > IOException / RuntimeException which indicates the connection disconnected. 
> * And then users of Consumer / Streams can monitor on this metric, which > normally will only have close to zero values as we have transient > disconnects, if it is spiking it means the brokers are consistently being > unavailable indicting the state. > [~Yohan123] WDYT? -- This message was sent by Atlassian Jira (v8.20.7#820007)
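The `failedConnection` metric proposal in the issue above (record failures where `connect()` and `pollSelectionKeys()` catch an `IOException`/`RuntimeException`, then watch for spikes) can be sketched as a small sliding-window counter. This is illustrative only; `ConnectionFailureMonitor` and its methods are hypothetical and are not part of Kafka's `SelectorMetrics`.

```java
import java.util.ArrayDeque;

// Hypothetical sliding-window monitor: transient disconnects keep the count near
// zero, while a sustained spike suggests the brokers are consistently unavailable
// (the DISCONNECTED condition the issue asks to surface).
public class ConnectionFailureMonitor {
    private final ArrayDeque<Long> failures = new ArrayDeque<>();
    private final long windowMs;
    private final int spikeThreshold;

    public ConnectionFailureMonitor(long windowMs, int spikeThreshold) {
        this.windowMs = windowMs;
        this.spikeThreshold = spikeThreshold;
    }

    // Would be called where connect()/pollSelectionKeys() catch a connection error.
    public void recordFailure(long nowMs) {
        failures.addLast(nowMs);
    }

    // True if failures within the window reach the threshold.
    public boolean isSpiking(long nowMs) {
        while (!failures.isEmpty() && failures.peekFirst() < nowMs - windowMs)
            failures.removeFirst();  // drop failures that fell out of the window
        return failures.size() >= spikeThreshold;
    }
}
```

An application (or a Streams state listener) could poll such a metric and treat a spike as "disconnected" even though the stream state still reads RUNNING.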
[GitHub] [kafka] hachikuji merged pull request #12153: MINOR: Clarify impact of num.replica.fetchers
hachikuji merged PR #12153: URL: https://github.com/apache/kafka/pull/12153 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #12162: KAFKA-13899: Use INVALID_CONFIG error code consistently in AlterConfig APIs
hachikuji commented on PR #12162: URL: https://github.com/apache/kafka/pull/12162#issuecomment-1127889552 @dengziming I am thinking not. We already raise INVALID_CONFIG in some cases and it was documented explicitly in at least the IncrementalAlterConfig KIP. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Moovlin opened a new pull request, #12167: KAFKA-13716 Added the DeleteRecordsCommandTest to test the CLI front end of the D…
Moovlin opened a new pull request, #12167: URL: https://github.com/apache/kafka/pull/12167 …eleteRecordsCommand. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* Added a new test for the front end of the DeleteRecordsCommand command line tool. *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [x] Verify test coverage and CI build status - [x] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537612#comment-17537612 ] Manuel Garcia Cabrera commented on KAFKA-12495: --- [~showuon] I'm short on time right now, so I won't be able to tackle this. > Unbalanced connectors/tasks distribution will happen in Connect's incremental > cooperative assignor > -- > > Key: KAFKA-12495 > URL: https://issues.apache.org/jira/browse/KAFKA-12495 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Luke Chen >Priority: Critical > Attachments: image-2021-03-18-15-04-57-854.png, > image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png > > > In Kafka Connect, we implement incremental cooperative rebalance algorithm > based on KIP-415 > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]. > However, we have a bad assumption in the algorithm implementation, which is: > after revoking rebalance completed, the member(worker) count will be the same > as the previous round of reblance. > > Let's take a look at the example in the KIP-415: > !image-2021-03-18-15-07-27-103.png|width=441,height=556! > It works well for most cases. But what if W4 added after 1st rebalance > completed and before 2nd rebalance started? Let's see what will happened? 
> Let's see this example: (we'll use 10 tasks here): > > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, > BT5 > W1 is current leader > W2 joins with assignment: [] > Rebalance is triggered > W3 joins while rebalance is still active with assignment: [] > W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, > BT4, BT5] > W1 becomes leader > W1 computes and sends assignments: > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > W2(delay: 0, assigned: [], revoked: []) > W3(delay: 0, assigned: [], revoked: []) > W1 stops revoked resources > W1 rejoins with assignment: [AC0, AT1, AT2, AT3] > Rebalance is triggered > W2 joins with assignment: [] > W3 joins with assignment: [] > // one more member joined > W4 joins with assignment: [] > W1 becomes leader > W1 computes and sends assignments: > // We assigned all the previous revoked Connectors/Tasks to the new member, > but we didn't revoke any more C/T in this round, which cause unbalanced > distribution > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: []) > W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) > W2(delay: 0, assigned: [BT1, BT2, BT4], revoked: []) > W2(delay: 0, assigned: [BT4, BT5], revoked: []) > {code} > Because we didn't allow to do consecutive revoke in two consecutive > rebalances (under the same leader), we will have this uneven distribution > under this situation. We should allow consecutive rebalance to have another > round of revocation to revoke the C/T to the other members in this case. 
> expected: > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, > BT5 > W1 is current leader > W2 joins with assignment: [] > Rebalance is triggered > W3 joins while rebalance is still active with assignment: [] > W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, > BT4, BT5] > W1 becomes leader > W1 computes and sends assignments: > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > W2(delay: 0, assigned: [], revoked: []) > W3(delay: 0, assigned: [], revoked: []) > W1 stops revoked resources > W1 rejoins with assignment: [AC0, AT1, AT2, AT3] > Rebalance is triggered > W2 joins with assignment: [] > W3 joins with assignment: [] > // one more member joined > W4 joins with assignment: [] > W1 becomes leader > W1 computes and sends assignments: > // We assigned all the previous revoked Connectors/Tasks to the new member, > **and also revoke some C/T** > W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3]) > W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) > W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: []) > W4(delay: 0, assigned: [BT4, BT5], revoked: []) > // another round of rebalance to assign the new revoked C/T to the other > members > W1 rejoins with assignment: [AC0, AT1, AT2] > Rebalance is triggered > W2 joins with assignment: [AT4, AT5, BC0] > W3 joins with assignment: [BT1, BT2,
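The balanced end-state the assignor should converge to is simple arithmetic: each of the n workers owns either floor(total/n) or ceil(total/n) connectors/tasks, and a follow-up revocation round must reclaim anything above the ceiling. The helper below is hypothetical (not from the Connect codebase) and only illustrates that target; with the uneven outcome in the scenario above (4, 3, 3, 2 across four workers for 12 resources), exactly one revocation is needed to reach balance.

```java
// Hypothetical helper illustrating the balance target of incremental cooperative
// assignment; not Kafka Connect code.
public class BalancedAssignment {
    // Each worker should own floor(total/workers) or ceil(total/workers) resources.
    public static int[] balancedCounts(int total, int workers) {
        int[] counts = new int[workers];
        int base = total / workers;
        int extra = total % workers;  // the first `extra` workers get one more
        for (int i = 0; i < workers; i++)
            counts[i] = base + (i < extra ? 1 : 0);
        return counts;
    }

    // Resources the leader must revoke from over-loaded members in the next round.
    public static int revocationsNeeded(int[] current, int total) {
        int workers = current.length;
        int ceil = (total + workers - 1) / workers;
        int revoked = 0;
        for (int c : current)
            if (c > ceil) revoked += c - ceil;
        return revoked;
    }
}
```

In the expected trace above, the leader revokes AT3 from W1 (one revocation) and reassigns it in the consecutive rebalance, matching this arithmetic.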
[GitHub] [kafka] mimaison commented on pull request #10644: [KAFKA-12635] auto sync consumer offset 0 when translated offset larger than partition end offset
mimaison commented on PR #10644: URL: https://github.com/apache/kafka/pull/10644#issuecomment-1127837181 Closing, this issue has been fixed in https://github.com/apache/kafka/pull/11748 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison closed pull request #10644: [KAFKA-12635] auto sync consumer offset 0 when translated offset larger than partition end offset
mimaison closed pull request #10644: [KAFKA-12635] auto sync consumer offset 0 when translated offset larger than partition end offset URL: https://github.com/apache/kafka/pull/10644 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty
[ https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-12635. Fix Version/s: 3.3.0 Assignee: Mickael Maison (was: Ning Zhang) Resolution: Fixed > Mirrormaker 2 offset sync is incorrect if the target partition is empty > --- > > Key: KAFKA-12635 > URL: https://issues.apache.org/jira/browse/KAFKA-12635 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.7.0 >Reporter: Frank Yi >Assignee: Mickael Maison >Priority: Major > Fix For: 3.3.0 > > > This bug occurs when using Mirrormaker with "sync.group.offsets.enabled = > true". > If a source partition is empty, but the source consumer group's offset for > that partition is non-zero, then Mirrormaker sets the target consumer group's > offset for that partition to the literal, not translated, offset of the > source consumer group. This state can be reached if the source consumer group > consumed some records that were now deleted (like by a retention policy), or > if Mirrormaker replication is set to start at "latest". This bug causes the > target consumer group's lag for that partition to be negative and breaks > offset sync for that partition until lag is positive. > The correct behavior when the source partition is empty would be to set the > target offset to the translated offset, not literal offset, which in this > case would always be 0. > Original email thread on this issue: > https://lists.apache.org/thread.html/r7c54ee5f57227367b911d4abffa72781772d8dd3b72d75eb65ee19f7%40%3Cusers.kafka.apache.org%3E -- This message was sent by Atlassian Jira (v8.20.7#820007)
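The negative-lag symptom described in the resolution above is plain arithmetic: consumer lag is the partition end offset minus the committed offset. The snippet below is illustrative only (not MirrorMaker code) and shows why syncing the literal source offset into an empty target partition drives lag negative, while the translated offset (0 for an empty partition) keeps it at zero.

```java
// Illustrative arithmetic only; names are hypothetical, not MirrorMaker APIs.
public class OffsetSyncExample {
    // lag = partition end offset - committed consumer offset
    public static long lag(long endOffset, long committedOffset) {
        return endOffset - committedOffset;
    }
}
```

For example, if the source group sits at offset 500 on a partition whose records were since deleted, the target partition's end offset is 0: syncing the literal offset gives a lag of -500, which breaks offset sync for that partition until lag turns positive, whereas syncing the translated offset 0 gives a lag of 0.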
[GitHub] [kafka] mimaison merged pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…
mimaison merged PR #11748: URL: https://github.com/apache/kafka/pull/11748 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…
mimaison commented on code in PR #11748: URL: https://github.com/apache/kafka/pull/11748#discussion_r873878544 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -169,6 +172,7 @@ public String version() { return listConsumerGroupOffsets(group).entrySet().stream() .filter(x -> shouldCheckpointTopic(x.getKey().topic())) .map(x -> checkpoint(group, x.getKey(), x.getValue())) +.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do not emit checkpoints for partitions that don't have offset-syncs Review Comment: @dadufour To translate these offsets the checkpoint connector would need to access the offset-syncs topic from the other MirrorMaker instance (and reverse the mapping). This all seems doable but this will require a KIP. @viktorsomogyi @urbandan is this something you'd be interested in doing? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] qingwei91 commented on a diff in pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock
qingwei91 commented on code in PR #12166: URL: https://github.com/apache/kafka/pull/12166#discussion_r873762061 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -226,9 +226,10 @@ private void emitNonJoinedOuterRecords( if (internalProcessorContext.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) { return; } -if (sharedTimeTracker.nextTimeToEmit == 0) { -sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs(); -} + +// Ensure `nextTimeToEmit` is synced with `currentSystemTimeMs`, if we dont set it everytime, +// they can get out of sync during a clock drift +sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs(); Review Comment: Is it ok to have comments here? it wasn't obvious to me what this piece of code was doing initially, I thought having comments might help, but I don't feel strongly, please let me know if you'd like it removed ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java: ## @@ -333,6 +352,87 @@ public void shouldJoinWithCustomStoreSuppliers() { runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), joinWindows); } +@Test +public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() { Review Comment: This test is quite convoluted because it relies on low-level API, this appears to be the 1st instance in test (other test relies on higher level API), is this acceptable? I resort to this approach because we need to manipulate TimeTracker which isn't available in high level API. And I don't feel comfortable to make larger change in the codebase. Please let me know if you think there's a better way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] qingwei91 opened a new pull request, #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock
qingwei91 opened a new pull request, #12166: URL: https://github.com/apache/kafka/pull/12166 We should sync nextTimeToEmit with the wall clock on each method call to ensure throttling works correctly in case of clock drift. If we don't, then in the event of significant clock drift, throttling might not happen for a long time, which can hurt performance. I've added a unit test to simulate clock drift and verify that my change works. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
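The drift scenario the PR describes can be illustrated with a minimal, hypothetical sketch of the throttling logic; `EMIT_INTERVAL_MS` and `maybeEmit` are illustrative names, not Kafka's actual API. The point is that resyncing `nextTimeToEmit` with the wall clock on every call keeps the throttle anchored to the current clock reading rather than to a stale base:

```java
// Hypothetical minimal sketch of emit throttling under clock drift.
// Names are illustrative; this is not Kafka's actual implementation.
public class EmitThrottleSketch {
    static final long EMIT_INTERVAL_MS = 1000L;
    static long nextTimeToEmit = 0L;

    // Returns true when an emit is allowed, throttling calls to one per interval.
    static boolean maybeEmit(long nowMs) {
        if (nowMs < nextTimeToEmit) {
            return false; // throttled
        }
        // The fix: resync with the wall clock on every emit. With an advance
        // from a stale base (nextTimeToEmit += EMIT_INTERVAL_MS), a forward
        // clock jump would leave nextTimeToEmit far in the past and disable
        // throttling for a long time, catching up one interval per call.
        nextTimeToEmit = nowMs + EMIT_INTERVAL_MS;
        return true;
    }

    public static void main(String[] args) {
        System.out.println(maybeEmit(0));          // first call emits
        System.out.println(maybeEmit(500));        // within the interval: throttled
        // Simulate a large forward clock drift: the next call emits,
        // and throttling resumes immediately afterwards.
        System.out.println(maybeEmit(3_600_000L));
        System.out.println(maybeEmit(3_600_500L));
    }
}
```

With the old `+= interval` advance, the two calls after the simulated drift would both emit; with the resync, the second one is throttled again.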
[GitHub] [kafka] showuon commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…
showuon commented on code in PR #11748: URL: https://github.com/apache/kafka/pull/11748#discussion_r873739814 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -319,27 +319,21 @@ public void testReplication() throws Exception { waitForCondition(() -> primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS, Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not translated downstream to primary cluster."); -waitForCondition(() -> primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS, -Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster."); Review Comment: Thanks for the clear explanation. I understand now.
[jira] [Updated] (KAFKA-13906) Invalid replica state transition
[ https://issues.apache.org/jira/browse/KAFKA-13906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-13906: Labels: BUG controller replication reproducible-bug (was: BUG controller) > Invalid replica state transition > > > Key: KAFKA-13906 > URL: https://issues.apache.org/jira/browse/KAFKA-13906 > Project: Kafka > Issue Type: Bug > Components: controller, core, replication >Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, > 3.2.1 >Reporter: Igor Soarez >Priority: Major > Labels: BUG, controller, replication, reproducible-bug > > The controller runs into an IllegalStateException when reacting to changes in > broker membership status if there are topics that are pending deletion. > > How to reproduce: > # Setup cluster with 3 brokers > # Create a topic with a partition being led by each broker and produce some > data > # Kill one of the brokers that is not the controller, and keep that broker > down > # Delete the topic > # Restart the other broker that is not the controller > > Logs and stacktrace: > {{[2022-05-16 11:53:25,482] ERROR [Controller id=1 epoch=1] Controller 1 > epoch 1 initiated state change of replica 3 for partition test-topic-2 from > ReplicaDeletionSuccessful to ReplicaDeletionIneligible failed > (state.change.logger)}} > {{java.lang.IllegalStateException: Replica > [Topic=test-topic,Partition=2,Replica=3] should be in the > OfflineReplica,ReplicaDeletionStarted states before moving to > ReplicaDeletionIneligible state. 
Instead it is in ReplicaDeletionSuccessful > state}} > {{ at > kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}} > {{ at scala.collection.immutable.List.foreach(List.scala:333)}} > {{ at > kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}} > {{ at > kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)}} > {{ at > scala.collection.immutable.HashMap.foreachEntry(HashMap.scala:1092)}} > {{ at > kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:111)}} > {{ at > kafka.controller.TopicDeletionManager.failReplicaDeletion(TopicDeletionManager.scala:157)}} > {{ at > kafka.controller.KafkaController.onReplicasBecomeOffline(KafkaController.scala:638)}} > {{ at > kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:599)}} > {{ at > kafka.controller.KafkaController.processBrokerChange(KafkaController.scala:1623)}} > {{ at > kafka.controller.KafkaController.process(KafkaController.scala:2534)}} > {{ at > kafka.controller.QueuedEvent.process(ControllerEventManager.scala:52)}} > {{ at > kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:130)}} > {{--}} > {{[2022-05-16 11:53:40,726] ERROR [Controller id=1 epoch=1] Controller 1 > epoch 1 initiated state change of replica 3 for partition test-topic-2 from > ReplicaDeletionSuccessful to OnlineReplica failed (state.change.logger)}} > 
{{java.lang.IllegalStateException: Replica > [Topic=test-topic,Partition=2,Replica=3] should be in the > NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible states > before moving to OnlineReplica state. Instead it is in > ReplicaDeletionSuccessful state}} > {{ at > kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}} > {{ at scala.collection.immutable.List.foreach(List.scala:333)}} > {{ at > kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}} > {{ at >
[jira] [Updated] (KAFKA-13906) Invalid replica state transition
[ https://issues.apache.org/jira/browse/KAFKA-13906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-13906: Labels: BUG controller (was: reproducible-bug)
[jira] [Updated] (KAFKA-8366) partitions of topics being deleted show up in the offline partitions metric
[ https://issues.apache.org/jira/browse/KAFKA-8366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-8366: --- Component/s: controller core metrics > partitions of topics being deleted show up in the offline partitions metric > --- > > Key: KAFKA-8366 > URL: https://issues.apache.org/jira/browse/KAFKA-8366 > Project: Kafka > Issue Type: Bug > Components: controller, core, metrics >Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.2.1 >Reporter: Radai Rosenblatt >Priority: Major > Labels: BUG, controller, metrics, reproducible-bug > > i believe this is a bug > offline partitions is a metric that indicates an error condition - lack of > kafka availability. > as an artifact of how deletion is implemented the partitions for a topic > undergoing deletion will show up as offline, which just creates > false-positive alerts. > if needed, maybe there should exist a separate "partitions to be deleted" > sensor. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-8366) partitions of topics being deleted show up in the offline partitions metric
[ https://issues.apache.org/jira/browse/KAFKA-8366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-8366: --- Issue Type: Bug (was: Improvement)
[jira] [Updated] (KAFKA-8366) partitions of topics being deleted show up in the offline partitions metric
[ https://issues.apache.org/jira/browse/KAFKA-8366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-8366: --- Labels: BUG controller metrics reproducible-bug (was: BUG controller metrics)
[jira] [Created] (KAFKA-13906) Invalid replica state transition
Igor Soarez created KAFKA-13906: --- Summary: Invalid replica state transition Key: KAFKA-13906 URL: https://issues.apache.org/jira/browse/KAFKA-13906 Project: Kafka Issue Type: Bug Components: controller, core, replication Affects Versions: 3.1.1, 3.0.1, 3.0.0, 3.1.0, 3.2.0, 3.3.0, 3.0.2, 3.1.2, 3.2.1 Reporter: Igor Soarez The controller runs into an IllegalStateException when reacting to changes in broker membership status if there are topics that are pending deletion. How to reproduce: # Setup cluster with 3 brokers # Create a topic with a partition being led by each broker and produce some data # Kill one of the brokers that is not the controller, and keep that broker down # Delete the topic # Restart the other broker that is not the controller Logs and stacktrace: {{[2022-05-16 11:53:25,482] ERROR [Controller id=1 epoch=1] Controller 1 epoch 1 initiated state change of replica 3 for partition test-topic-2 from ReplicaDeletionSuccessful to ReplicaDeletionIneligible failed (state.change.logger)}} {{java.lang.IllegalStateException: Replica [Topic=test-topic,Partition=2,Replica=3] should be in the OfflineReplica,ReplicaDeletionStarted states before moving to ReplicaDeletionIneligible state. 
Instead it is in ReplicaDeletionSuccessful state}} {{ at kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}} {{ at kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}} {{ at kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}} {{ at scala.collection.immutable.List.foreach(List.scala:333)}} {{ at kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}} {{ at kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}} {{ at kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}} {{ at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)}} {{ at scala.collection.immutable.HashMap.foreachEntry(HashMap.scala:1092)}} {{ at kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:111)}} {{ at kafka.controller.TopicDeletionManager.failReplicaDeletion(TopicDeletionManager.scala:157)}} {{ at kafka.controller.KafkaController.onReplicasBecomeOffline(KafkaController.scala:638)}} {{ at kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:599)}} {{ at kafka.controller.KafkaController.processBrokerChange(KafkaController.scala:1623)}} {{ at kafka.controller.KafkaController.process(KafkaController.scala:2534)}} {{ at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:52)}} {{ at kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:130)}} {{--}} {{[2022-05-16 11:53:40,726] ERROR [Controller id=1 epoch=1] Controller 1 epoch 1 initiated state change of replica 3 for partition test-topic-2 from ReplicaDeletionSuccessful to OnlineReplica failed (state.change.logger)}} {{java.lang.IllegalStateException: Replica [Topic=test-topic,Partition=2,Replica=3] should be in the 
NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible states before moving to OnlineReplica state. Instead it is in ReplicaDeletionSuccessful state}} {{ at kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}} {{ at kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}} {{ at kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}} {{ at scala.collection.immutable.List.foreach(List.scala:333)}} {{ at kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}} {{ at kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}} {{ at kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}} {{ at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)}} {{ at scala.collection.immutable.HashMap.foreachEntry(HashMap.scala:1092)}} {{ at kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:111)}} {{ at kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:543)}} {{ at kafka.controller.KafkaController.processBrokerChange(KafkaController.scala:1607)}} {{ at kafka.controller.KafkaController.process(KafkaController.scala:2534)}}
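The shape of both failures in the stacktraces can be illustrated with a minimal, hypothetical simplification of the replica state machine's precondition check (Kafka's actual ReplicaStateMachine is far richer; only the two target states from the logs are modeled here):

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

public class ReplicaStateSketch {
    enum State { NEW_REPLICA, ONLINE_REPLICA, OFFLINE_REPLICA,
                 REPLICA_DELETION_STARTED, REPLICA_DELETION_SUCCESSFUL, REPLICA_DELETION_INELIGIBLE }

    // Valid previous states per target state, mirroring the error messages above.
    static final Map<State, EnumSet<State>> VALID_PREVIOUS = new EnumMap<>(State.class);
    static {
        VALID_PREVIOUS.put(State.REPLICA_DELETION_INELIGIBLE,
            EnumSet.of(State.OFFLINE_REPLICA, State.REPLICA_DELETION_STARTED));
        VALID_PREVIOUS.put(State.ONLINE_REPLICA,
            EnumSet.of(State.NEW_REPLICA, State.ONLINE_REPLICA,
                       State.OFFLINE_REPLICA, State.REPLICA_DELETION_INELIGIBLE));
    }

    // Rejects a transition whose source state is not in the allowed set,
    // as the controller does when it hits the replica stuck in
    // ReplicaDeletionSuccessful after the broker restart.
    static void transition(State current, State target) {
        if (!VALID_PREVIOUS.getOrDefault(target, EnumSet.noneOf(State.class)).contains(current)) {
            throw new IllegalStateException("Replica should be in the " + VALID_PREVIOUS.get(target)
                + " states before moving to " + target + " state. Instead it is in " + current + " state");
        }
    }

    public static void main(String[] args) {
        try {
            transition(State.REPLICA_DELETION_SUCCESSFUL, State.REPLICA_DELETION_INELIGIBLE);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A replica left in ReplicaDeletionSuccessful is a valid source for neither ReplicaDeletionIneligible nor OnlineReplica, which is why both the broker-failure and broker-startup paths fail for the pending-deletion topic.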
[GitHub] [kafka] mimaison commented on pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits
mimaison commented on PR #10528: URL: https://github.com/apache/kafka/pull/10528#issuecomment-1127619143 Thanks, that makes sense. I've started reviewing #11780
[GitHub] [kafka] mimaison commented on pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…
mimaison commented on PR #11748: URL: https://github.com/apache/kafka/pull/11748#issuecomment-1127616971 Thanks for the review @showuon. I've replied to your questions and pushed an update.
[GitHub] [kafka] mimaison commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…
mimaison commented on code in PR #11748: URL: https://github.com/apache/kafka/pull/11748#discussion_r873670168 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -526,6 +520,65 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception { assertFalse(primaryTopics.contains("mm2-offset-syncs." + BACKUP_CLUSTER_ALIAS + ".internal")); } +@Test +public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedException { +String consumerGroupName = "consumer-group-no-checkpoints"; +Map consumerProps = Collections.singletonMap("group.id", consumerGroupName); + +// ensure there are some records in the topic on the source cluster +produceMessages(primary, "test-topic-1"); + +// warm up consumers before starting the connectors, so we don't need to wait for discovery +warmUpConsumer(consumerProps); + +// one way replication from primary to backup +mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false"); +mm2Config = new MirrorMakerConfig(mm2Props); +waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); + +// make sure the topics are created in the backup cluster +waitForTopicCreated(backup, remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS)); +waitForTopicCreated(backup, remoteTopicName("test-topic-no-records", PRIMARY_CLUSTER_ALIAS)); + +// commit some offsets for both topics in the source cluster +TopicPartition tp1 = new TopicPartition("test-topic-1", 0); +TopicPartition tp2 = new TopicPartition("test-topic-no-records", 0); +//Map consumerProps = Collections.singletonMap("group.id", consumerGroupName); +try (Consumer consumer = primary.kafka().createConsumer(consumerProps)) { +Collection tps = Arrays.asList(tp1, tp2); +Map endOffsets = consumer.endOffsets(tps); +Map offsetsToCommit = endOffsets.entrySet().stream() +.collect(Collectors.toMap( +Map.Entry::getKey, +e -> new 
OffsetAndMetadata(e.getValue()) +)); +consumer.commitSync(offsetsToCommit); +} + +// Only test-topic-1 should have translated offsets because we've not yet mirrored any records for topic-no-records +MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)); +waitForCondition(() -> { +Map translatedOffsets = backupClient.remoteConsumerOffsets( +consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L)); +return translatedOffsets.containsKey(remoteTopicPartition(tp1, PRIMARY_CLUSTER_ALIAS)) && + !translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS)); +}, OFFSET_SYNC_DURATION_MS, "Checkpoints were not emitted correctly to backup cluster"); + +// Send some records to topic-no-records in the source cluster Review Comment: I've renamed it to `test-topic-no-checkpoints` so it matches the test/group names.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12159: Fix stuck SSL tests in case of authentication failure
divijvaidya commented on code in PR #12159: URL: https://github.com/apache/kafka/pull/12159#discussion_r873646603 ## clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java: ## @@ -449,9 +450,9 @@ public void close() { @Test public void testCloseOldestConnection() throws Exception { String id = "0"; -blockingConnect(id); - -time.sleep(6000); // The max idle time is 5000ms +selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE); +selector.poll(0); Review Comment: Note that `connect()` is an asynchronous method here: it might not complete with the first poll event. That is the reason it is polled in a while loop inside `blockingConnect`. My suggestion would be to keep using `blockingConnect` with a (new) timeout to terminate its while loop, similar to the implementation of `NetworkTestUtils.waitForChannelReady`.
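The suggested pattern - keep the poll loop of `blockingConnect` but bound it with a deadline so a failed authentication cannot hang the test forever - can be sketched generically. `Selector` and the channel-ready check are stubbed out here with functional interfaces; this is not Kafka's actual test utility:

```java
import java.util.function.BooleanSupplier;

public class BlockingConnectSketch {
    // Polls until `connected` reports true or the deadline passes.
    // Returning false on timeout lets the test fail cleanly instead of hanging.
    static boolean blockingConnect(Runnable poll, BooleanSupplier connected, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!connected.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                return false; // timed out instead of spinning forever
            }
            poll.run(); // drive the selector, e.g. selector.poll(0)
        }
        return true;
    }

    public static void main(String[] args) {
        int[] polls = {0};
        // Simulated channel that becomes "ready" after three poll calls.
        System.out.println(blockingConnect(() -> polls[0]++, () -> polls[0] >= 3, 5_000));
        // A channel that never becomes ready (e.g. authentication failure)
        // now times out rather than hanging the test.
        System.out.println(blockingConnect(() -> { }, () -> false, 50));
    }
}
```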
[GitHub] [kafka] franz1981 commented on a diff in pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods
franz1981 commented on code in PR #12163: URL: https://github.com/apache/kafka/pull/12163#discussion_r873652472 ## clients/src/main/java/org/apache/kafka/common/utils/Checksums.java: ## @@ -30,6 +33,18 @@ */ public final class Checksums { +private static final MethodHandle BYTE_BUFFER_UPDATE; + +static { +MethodHandle byteBufferUpdate = null; +try { +byteBufferUpdate = MethodHandles.publicLookup().findVirtual(Checksum.class, "update", MethodType.methodType(void.class, ByteBuffer.class)); +} catch (Throwable silent) { Review Comment: I've added a LOG::warn for the case where the given signature is expected to be found but isn't (although it doesn't consider the case of a security manager being enabled - but I don't believe that is a common practice suggested to Kafka users).
[jira] [Updated] (KAFKA-8366) partitions of topics being deleted show up in the offline partitions metric
[ https://issues.apache.org/jira/browse/KAFKA-8366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-8366: --- Affects Version/s: 3.1.1 3.0.1 3.0.0 3.1.0 3.2.0 3.3.0 3.0.2 3.2.1
[jira] [Commented] (KAFKA-8366) partitions of topics being deleted show up in the offline partitions metric
[ https://issues.apache.org/jira/browse/KAFKA-8366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537501#comment-17537501 ] Igor Soarez commented on KAFKA-8366: I also ran into this issue. I managed to replicate this bug with an integration test [https://github.com/soarez/kafka/blob/replicate-bug-offline-partition-metrics-from-deleted-topics/core/src/test/scala/integration/kafka/api/OfflinePartitionsFromDeletedTopicTest.scala] The problem is that the controller caches the offline partitions count, and when it is re-elected it fails to clear it if the topic is now being deleted.
[GitHub] [kafka] franz1981 commented on a diff in pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods
franz1981 commented on code in PR #12163: URL: https://github.com/apache/kafka/pull/12163#discussion_r873640450 ## clients/src/main/java/org/apache/kafka/common/utils/Checksums.java: ## @@ -40,11 +55,41 @@ public static void update(Checksum checksum, ByteBuffer buffer, int length) { public static void update(Checksum checksum, ByteBuffer buffer, int offset, int length) { Review Comment: I can add some doc; although in theory it's unrelated to the changes of this PR, it is a nice addition :+1:
[GitHub] [kafka] franz1981 commented on a diff in pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods
franz1981 commented on code in PR #12163: URL: https://github.com/apache/kafka/pull/12163#discussion_r873638337 ## clients/src/main/java/org/apache/kafka/common/utils/Checksums.java: ## @@ -40,11 +55,41 @@ public static void update(Checksum checksum, ByteBuffer buffer, int length) { public static void update(Checksum checksum, ByteBuffer buffer, int offset, int length) { if (buffer.hasArray()) { checksum.update(buffer.array(), buffer.position() + buffer.arrayOffset() + offset, length); -} else { -int start = buffer.position() + offset; -for (int i = start; i < start + length; i++) -checksum.update(buffer.get(i)); +return; +} +if (BYTE_BUFFER_UPDATE != null && buffer.isDirect()) { +final int oldPosition = buffer.position(); Review Comment: We are moving `limit` as well, hence I don't see any advantage in using `mark/reset`. Let me know if I've missed something.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods
divijvaidya commented on code in PR #12163: URL: https://github.com/apache/kafka/pull/12163#discussion_r873603513 ## clients/src/main/java/org/apache/kafka/common/utils/Checksums.java: ## @@ -40,11 +55,41 @@ public static void update(Checksum checksum, ByteBuffer buffer, int length) { public static void update(Checksum checksum, ByteBuffer buffer, int offset, int length) { if (buffer.hasArray()) { checksum.update(buffer.array(), buffer.position() + buffer.arrayOffset() + offset, length); -} else { -int start = buffer.position() + offset; -for (int i = start; i < start + length; i++) -checksum.update(buffer.get(i)); +return; +} +if (BYTE_BUFFER_UPDATE != null && buffer.isDirect()) { +final int oldPosition = buffer.position(); Review Comment: Alternatively, use buffer.mark() inside the try and buffer.reset() in the finally. ## clients/src/main/java/org/apache/kafka/common/utils/Checksums.java: ## @@ -30,6 +33,18 @@ */ public final class Checksums { +private static final MethodHandle BYTE_BUFFER_UPDATE; + +static { +MethodHandle byteBufferUpdate = null; +try { +byteBufferUpdate = MethodHandles.publicLookup().findVirtual(Checksum.class, "update", MethodType.methodType(void.class, ByteBuffer.class)); +} catch (Throwable silent) { Review Comment: I understand that you are keeping this for backward compatibility with older versions (8) of the JDK which don't have this API, but we should ideally throw an error once support for JDK 8 is removed from Kafka. Could we do one of the following two options here: 1. (preferred) Use [isJava9Compatible](https://docs.gradle.org/current/javadoc/org/gradle/api/JavaVersion.html#isJava9Compatible--) to decide whether we want to propagate the exception here or fail silently, OR 2. add a TODO here so that we remember to propagate the exception when support for JDK 8 is removed? Also, a comment explaining why we are consuming the exception would be nice. 
## clients/src/main/java/org/apache/kafka/common/utils/Checksums.java:
@@ -40,11 +55,41 @@
 public static void update(Checksum checksum, ByteBuffer buffer, int length) {
 public static void update(Checksum checksum, ByteBuffer buffer, int offset, int length) {

Review Comment: A Javadoc here clarifying that this method leaves the buffer as it was received would be a nice addition. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
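The technique under review can be sketched end to end. The following is a minimal, self-contained illustration (the class name `ChecksumUpdateSketch` and the exact error handling are illustrative, not the PR's actual code): it resolves the Java 9 `Checksum.update(ByteBuffer)` overload via a `MethodHandle` so the same jar still loads on JDK 8, and it restores the buffer's position so the buffer is handed back exactly as received, as the review requests.

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

public final class ChecksumUpdateSketch {

    // On Java 9+ this resolves Checksum.update(ByteBuffer); on Java 8 it stays null.
    private static final MethodHandle BYTE_BUFFER_UPDATE = lookupByteBufferUpdate();

    private static MethodHandle lookupByteBufferUpdate() {
        try {
            return MethodHandles.publicLookup().findVirtual(
                    Checksum.class, "update",
                    MethodType.methodType(void.class, ByteBuffer.class));
        } catch (Throwable silent) {
            return null; // JDK 8: the ByteBuffer overload does not exist yet
        }
    }

    // Leaves the buffer exactly as it was received (position, limit, mark untouched).
    public static void update(Checksum checksum, ByteBuffer buffer, int offset, int length) {
        if (buffer.hasArray()) {
            checksum.update(buffer.array(), buffer.position() + buffer.arrayOffset() + offset, length);
            return;
        }
        if (BYTE_BUFFER_UPDATE != null && buffer.isDirect()) {
            // Remember the position so we can hand the buffer back unchanged.
            final int oldPosition = buffer.position();
            try {
                buffer.position(oldPosition + offset);
                ByteBuffer slice = buffer.slice();
                slice.limit(length);
                BYTE_BUFFER_UPDATE.invokeExact(checksum, slice);
                return;
            } catch (Throwable t) {
                throw new IllegalStateException(t);
            } finally {
                buffer.position(oldPosition);
            }
        }
        // Fallback: absolute gets never move the position.
        final int start = buffer.position() + offset;
        for (int i = start; i < start + length; i++)
            checksum.update(buffer.get(i));
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3, 4, 5};
        CRC32 reference = new CRC32();
        reference.update(data, 1, 3);

        ByteBuffer direct = ByteBuffer.allocateDirect(5);
        direct.put(data).flip();
        CRC32 viaHelper = new CRC32();
        update(viaHelper, direct, 1, 3);

        if (reference.getValue() != viaHelper.getValue()) throw new AssertionError("checksum mismatch");
        if (direct.position() != 0) throw new AssertionError("buffer position changed");
        System.out.println("ok");
    }
}
```

One reason to save the position explicitly rather than use `mark()`/`reset()`, as the first review comment suggests: `mark()` would silently clobber any mark the caller had already set, so an explicit `oldPosition` is arguably the safer choice.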
[GitHub] [kafka] cadonna merged pull request #12164: Update upgrade.html
cadonna merged PR #12164: URL: https://github.com/apache/kafka/pull/12164
[GitHub] [kafka] mimaison commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…
mimaison commented on code in PR #11748: URL: https://github.com/apache/kafka/pull/11748#discussion_r873606546

## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
@@ -319,27 +319,21 @@
 public void testReplication() throws Exception {
     waitForCondition(() -> primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS, Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not translated downstream to primary cluster.");
-    waitForCondition(() -> primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
-        Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster.");

Review Comment: For these types of changes, a diagram often helps. We see that `mm2-offset-syncs.primary.internal` only contains the mapping from `test-topic-1` on the backup cluster to `backup.test-topic-1` on the primary cluster. Then, when the backup->primary checkpoint connector runs, it is only able to compute committed offsets for `backup.test-topic-1`; it has no information about offsets for the local `test-topic-1` in the primary cluster. https://user-images.githubusercontent.com/903615/168580961-8f3e5860-cae3-43bf-9d36-ecd7553e42eb.png
[GitHub] [kafka] mimaison commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…
mimaison commented on code in PR #11748: URL: https://github.com/apache/kafka/pull/11748#discussion_r873586510 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -319,27 +319,21 @@ public void testReplication() throws Exception { waitForCondition(() -> primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS, Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not translated downstream to primary cluster."); -waitForCondition(() -> primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS, -Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster."); - Map primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS, Duration.ofMillis(CHECKPOINT_DURATION_MS)); primaryClient.close(); backupClient.close(); // Failback consumer group to primary cluster -try (Consumer backupConsumer = primary.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) { -backupConsumer.assign(primaryOffsets.keySet()); -primaryOffsets.forEach(backupConsumer::seek); -backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); -backupConsumer.commitAsync(); - -assertTrue(backupConsumer.position(new TopicPartition("test-topic-1", 0)) > 0, "Consumer failedback to zero upstream offset."); -assertTrue(backupConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero downstream offset."); -assertTrue(backupConsumer.position( -new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected upstream offset."); -assertTrue(backupConsumer.position( +try (Consumer primaryConsumer = primary.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) { 
+primaryConsumer.assign(primaryOffsets.keySet()); +primaryOffsets.forEach(primaryConsumer::seek); +primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); +primaryConsumer.commitAsync(); + +assertTrue(primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero downstream offset."); Review Comment: Same as above, now `remoteConsumerOffsets()` only returns offsets for remote topics that are being mirrored from the backup cluster and these are prefixed with `backup.`.
[GitHub] [kafka] mimaison commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…
mimaison commented on code in PR #11748: URL: https://github.com/apache/kafka/pull/11748#discussion_r873584393 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -319,27 +319,21 @@ public void testReplication() throws Exception { waitForCondition(() -> primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS, Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not translated downstream to primary cluster."); -waitForCondition(() -> primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS, -Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster."); Review Comment: This was querying checkpoints from the `backup.checkpoints.internal` topic. This topic is on the primary cluster and contains data emitted by the backup->primary checkpoint connector. In the backup->primary mirroring flow, the remote topics (that end up in the primary cluster) are all prefixed with `backup.`. With this change we now only emit checkpoints for topics present in offset-syncs, so only topics prefixed with `backup.`. So the assertion above still works, the checkpoint connector has emitted checkpoints for `backup.test-topic-1`. But this assertion is not true anymore for `test-topic-1`. This makes sense because in the primary cluster, `test-topic-1` is a local topic, so the backup->primary checkpoint connector has no business committing offsets for that topic.
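The renaming convention at the heart of this discussion can be shown with a tiny sketch. The helper class below is illustrative only; in MirrorMaker 2 this logic lives in `DefaultReplicationPolicy`, which prefixes remote topics with the source cluster alias and a separator (`.` by default).

```java
// Hypothetical helper mirroring MM2's "alias.topic" remote-topic naming convention.
public final class RemoteTopicNaming {

    private static final String SEPARATOR = ".";

    // A topic mirrored from cluster `sourceClusterAlias` gets the alias as a prefix.
    static String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return sourceClusterAlias + SEPARATOR + topic;
    }

    // Simplified heuristic: anything containing the separator is treated as remote.
    static boolean isRemoteTopic(String topic) {
        return topic.contains(SEPARATOR);
    }

    public static void main(String[] args) {
        // backup -> primary flow: test-topic-1 on backup becomes backup.test-topic-1 on primary
        String remote = formatRemoteTopic("backup", "test-topic-1");
        if (!remote.equals("backup.test-topic-1")) throw new AssertionError(remote);

        // The primary cluster's own test-topic-1 is local, so the backup->primary
        // checkpoint connector emits no checkpoint for it.
        if (isRemoteTopic("test-topic-1")) throw new AssertionError("local topic misclassified");
        System.out.println("ok");
    }
}
```

Note the separator-based check is a simplification: a local topic whose name happens to contain a dot would be misclassified, which is one reason replication policies are pluggable.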
[GitHub] [kafka] dengziming commented on a diff in pull request #12165: KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs
dengziming commented on code in PR #12165: URL: https://github.com/apache/kafka/pull/12165#discussion_r873557577 ## core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala: ## @@ -195,8 +195,9 @@ class ServerShutdownTest extends KafkaServerTestHarness { // identify the correct exception, making sure the server was shutdown, and cleaning up if anything // goes wrong so that awaitShutdown doesn't hang case e: Exception => -assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected exception $e") -assertEquals(if (quorum == "zk") BrokerState.NOT_RUNNING else BrokerState.SHUTTING_DOWN, brokers.head.brokerState) +assertTrue(exceptionClassTag.runtimeClass.isInstance(if (isKRaftTest() && e.isInstanceOf[RuntimeException]) e.getCause.getCause else e), Review Comment: Basically, we can change it to something like:
```
throw if (e.isInstanceOf[ExecutionException]) e.getCause
      else if (e.isInstanceOf[RuntimeException]) e.getCause.getCause
      else e
```
This would make the logic a little messy, and I'm not sure whether we should keep the original `RuntimeException` created by @cmccabe.
[GitHub] [kafka] showuon commented on a diff in pull request #12165: KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs
showuon commented on code in PR #12165: URL: https://github.com/apache/kafka/pull/12165#discussion_r873525802 ## core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala: ## @@ -195,8 +195,9 @@ class ServerShutdownTest extends KafkaServerTestHarness { // identify the correct exception, making sure the server was shutdown, and cleaning up if anything // goes wrong so that awaitShutdown doesn't hang case e: Exception => -assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected exception $e") -assertEquals(if (quorum == "zk") BrokerState.NOT_RUNNING else BrokerState.SHUTTING_DOWN, brokers.head.brokerState) +assertTrue(exceptionClassTag.runtimeClass.isInstance(if (isKRaftTest() && e.isInstanceOf[RuntimeException]) e.getCause.getCause else e), Review Comment: Thanks for the explanation. I understand now. So, do you think we should fix the failing tests by fixing this: https://github.com/apache/kafka/blob/49226721c0dc5e5b327e0754e01c367990b43758/core/src/main/scala/kafka/server/BrokerServer.scala#L444 I mean, the above code is trying to throw the root cause of the broker's startup failure up to here: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/Kafka.scala#L113 and then we can log the root cause before exiting. So, I think we should fix the line `throw if (e.isInstanceOf[ExecutionException]) e.getCause else e` so that it throws the root cause for both `ExecutionException` and `RuntimeException`. WDYT?
[GitHub] [kafka] dengziming commented on a diff in pull request #12165: KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs
dengziming commented on code in PR #12165: URL: https://github.com/apache/kafka/pull/12165#discussion_r873512570 ## core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala: ## @@ -195,8 +195,9 @@ class ServerShutdownTest extends KafkaServerTestHarness { // identify the correct exception, making sure the server was shutdown, and cleaning up if anything // goes wrong so that awaitShutdown doesn't hang case e: Exception => -assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected exception $e") -assertEquals(if (quorum == "zk") BrokerState.NOT_RUNNING else BrokerState.SHUTTING_DOWN, brokers.head.brokerState) +assertTrue(exceptionClassTag.runtimeClass.isInstance(if (isKRaftTest() && e.isInstanceOf[RuntimeException]) e.getCause.getCause else e), Review Comment: Yeah, in the finally block we unwrap the `ExecutionException` so we don't have to invoke `getCause`, but after #11969 the exception type is `RuntimeException` so we have to invoke `getCause` once more: https://github.com/apache/kafka/blob/49226721c0dc5e5b327e0754e01c367990b43758/core/src/main/scala/kafka/server/BrokerServer.scala#L444 I find this code is no longer useful after #11969, do you think we can remove it here?
[jira] [Created] (KAFKA-13905) Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs
Luke Chen created KAFKA-13905: - Summary: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs Key: KAFKA-13905 URL: https://issues.apache.org/jira/browse/KAFKA-13905 Project: Kafka Issue Type: Test Reporter: Luke Chen Assignee: dengziming h3. Error Message org.opentest4j.AssertionFailedError: Unexpected exception java.lang.RuntimeException: Received a fatal error while waiting for the broker to catch up with the current cluster metadata. ==> expected: but was: h3. Stacktrace org.opentest4j.AssertionFailedError: Unexpected exception java.lang.RuntimeException: Received a fatal error while waiting for the broker to catch up with the current cluster metadata. ==> expected: but was: at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) at kafka.server.ServerShutdownTest.verifyCleanShutdownAfterFailedStartup(ServerShutdownTest.scala:198) at kafka.server.ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs(ServerShutdownTest.scala:168) -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] showuon commented on pull request #12165: MINOR: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs
showuon commented on PR #12165: URL: https://github.com/apache/kafka/pull/12165#issuecomment-1127413805 Also, I've created [KAFKA-13905](https://issues.apache.org/jira/browse/KAFKA-13905) for this issue. Please link the PR to that issue, in case other people are also trying to fix it at the same time. Thanks.
[GitHub] [kafka] showuon commented on a diff in pull request #12165: MINOR: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs
showuon commented on code in PR #12165: URL: https://github.com/apache/kafka/pull/12165#discussion_r873488369 ## core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala: ## @@ -195,8 +195,9 @@ class ServerShutdownTest extends KafkaServerTestHarness { // identify the correct exception, making sure the server was shutdown, and cleaning up if anything // goes wrong so that awaitShutdown doesn't hang case e: Exception => -assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected exception $e") -assertEquals(if (quorum == "zk") BrokerState.NOT_RUNNING else BrokerState.SHUTTING_DOWN, brokers.head.brokerState) +assertTrue(exceptionClassTag.runtimeClass.isInstance(if (isKRaftTest() && e.isInstanceOf[RuntimeException]) e.getCause.getCause else e), Review Comment: I'm a little confused here. You said we just wrap the original exception in a `RuntimeException`, but here we call `getCause` twice, whereas before this change we didn't call `getCause` at all. Am I missing something? Thanks.
[GitHub] [kafka] dengziming commented on pull request #12165: MINOR: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs
dengziming commented on PR #12165: URL: https://github.com/apache/kafka/pull/12165#issuecomment-1127393942 Hello @cmccabe, PTAL at this solution.
[GitHub] [kafka] dengziming opened a new pull request, #12165: MINOR: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs
dengziming opened a new pull request, #12165: URL: https://github.com/apache/kafka/pull/12165 *More detailed description of your change* Before #11969, we threw an `ExecutionException(KafkaStorageException)` from `BrokerServer.startup`, and the outer `ExecutionException` was removed in the finally block. After #11969, we throw a `RuntimeException(ExecutionException(KafkaStorageException))`, so this test has been failing constantly because the exception type is no longer consistent. To fix this, we just need to invoke `getCause` twice to remove the `RuntimeException` and `ExecutionException` wrappers. *Summary of testing strategy (including rationale)* This test is no longer failing. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
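The double `getCause` described above can be generalized into a small unwrapping loop. This is an illustrative sketch, not the test's actual code: the PR calls `getCause` exactly twice, and `IllegalStateException` stands in for `KafkaStorageException` to keep the example self-contained.

```java
import java.util.concurrent.ExecutionException;

public final class RootCauseSketch {

    // Peel off RuntimeException/ExecutionException wrappers until the real cause surfaces.
    static Throwable unwrap(Throwable t) {
        Throwable current = t;
        while ((current instanceof RuntimeException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in for KafkaStorageException (not on the classpath here).
        Throwable storageError = new IllegalStateException("corrupt log dir");
        // The shape thrown after #11969: RuntimeException(ExecutionException(cause)).
        Throwable wrapped = new RuntimeException(new ExecutionException(storageError));

        if (unwrap(wrapped) != storageError) throw new AssertionError("wrapper not removed");
        // An unwrapped exception passes through unchanged.
        if (unwrap(storageError) != storageError) throw new AssertionError("bare cause altered");
        System.out.println("ok");
    }
}
```

A loop like this is more tolerant than hard-coding two `getCause` calls, at the cost of hiding exactly which layer did the wrapping, which is the trade-off the reviewers are weighing above.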
[GitHub] [kafka] showuon merged pull request #12087: KAFKA-13851: Add integration tests for DeleteRecords API
showuon merged PR #12087: URL: https://github.com/apache/kafka/pull/12087
[GitHub] [kafka] showuon commented on pull request #12087: KAFKA-13851: Add integration tests for DeleteRecords API
showuon commented on PR #12087: URL: https://github.com/apache/kafka/pull/12087#issuecomment-1127340235 All tests passed. Merging into trunk. Thanks.
[GitHub] [kafka] showuon commented on a diff in pull request #12140: KAFKA-13891: reset generation when syncgroup failed with REBALANCE_IN_PROGRESS
showuon commented on code in PR #12140: URL: https://github.com/apache/kafka/pull/12140#discussion_r873420698 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java: ## @@ -488,6 +489,54 @@ public void testRetainMemberIdAfterSyncGroupDisconnect() { ensureActiveGroup(rejoinedGeneration, memberId); } +@Test +public void testResetGenerationIdAfterSyncGroupFailedWithRebalanceInProgress() throws InterruptedException, ExecutionException { +setupCoordinator(); + +String memberId = "memberId"; +int generation = 5; + +// Rebalance once to initialize the generation and memberId +mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); +expectJoinGroup("", generation, memberId); +expectSyncGroup(generation, memberId); +ensureActiveGroup(generation, memberId); + +// Force a rebalance +coordinator.requestRejoin("Manual test trigger"); +assertTrue(coordinator.rejoinNeededOrPending()); + +ExecutorService executor = Executors.newFixedThreadPool(1); +try { +// Return RebalanceInProgress in syncGroup +int rejoinedGeneration = 10; +expectJoinGroup(memberId, rejoinedGeneration, memberId); +expectRebalanceInProgressForSyncGroup(rejoinedGeneration, memberId); +Future secondJoin = executor.submit(() -> + coordinator.ensureActiveGroup(mockTime.timer(Integer.MAX_VALUE))); + +TestUtils.waitForCondition(() -> { +AbstractCoordinator.Generation currentGeneration = coordinator.generation(); +return currentGeneration.generationId == AbstractCoordinator.Generation.NO_GENERATION.generationId && +currentGeneration.memberId.equals(memberId); +}, 2000, "Generation should be reset"); Review Comment: nit: I see the `2000` timeout appears in many places in `AbstractCoordinatorTest.java`. Could we use a static variable to replace them? Thanks.
[jira] [Resolved] (KAFKA-12703) Allow unencrypted private keys when using PEM files
[ https://issues.apache.org/jira/browse/KAFKA-12703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-12703. - Fix Version/s: 3.3.0 Resolution: Fixed > Allow unencrypted private keys when using PEM files > --- > > Key: KAFKA-12703 > URL: https://issues.apache.org/jira/browse/KAFKA-12703 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.8.0 >Reporter: Brian Bascoy >Priority: Major > Fix For: 3.3.0 > > > Unencrypted PEM files seem to be internally [supported in the > codebase|https://github.com/apache/kafka/blob/a46beb9d29781e0709baf596601122f770a5fa31/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L509] > but setting an ssl.key.password is currently enforced by createKeystore (on > DefaultSslEngineFactory). I was unable to find a reason for this, so I wonder > if this limitation could simply be removed: > > [https://github.com/pera/kafka/commit/8df2feab5fc6955cf8c89a7d132f05d8f562e16b] > > Thanks
[jira] [Updated] (KAFKA-12703) Allow unencrypted private keys when using PEM files
[ https://issues.apache.org/jira/browse/KAFKA-12703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-12703: Summary: Allow unencrypted private keys when using PEM files (was: Unencrypted PEM files can't be loaded) > Allow unencrypted private keys when using PEM files > --- > > Key: KAFKA-12703 > URL: https://issues.apache.org/jira/browse/KAFKA-12703 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.8.0 >Reporter: Brian Bascoy >Priority: Major > > Unencrypted PEM files seem to be internally [supported in the > codebase|https://github.com/apache/kafka/blob/a46beb9d29781e0709baf596601122f770a5fa31/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L509] > but setting an ssl.key.password is currently enforced by createKeystore (on > DefaultSslEngineFactory). I was unable to find a reason for this, so I wonder > if this limitation could simply be removed: > > [https://github.com/pera/kafka/commit/8df2feab5fc6955cf8c89a7d132f05d8f562e16b] > > Thanks
[GitHub] [kafka] dajac merged pull request #11916: KAFKA-12703; Allow unencrypted private keys when using PEM files
dajac merged PR #11916: URL: https://github.com/apache/kafka/pull/11916
[jira] [Comment Edited] (KAFKA-7955) Provide a BOM for EmbeddedConnectCluster and EmbeddedCluster
[ https://issues.apache.org/jira/browse/KAFKA-7955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537373#comment-17537373 ] Richard Fussenegger edited comment on KAFKA-7955 at 5/16/22 7:15 AM: - https://issues.apache.org/jira/browse/KAFKA-9060 can be considered a duplicate of this one, because one BOM for the entire {{org.apache.kafka}} group would be sufficient to cover everything. Adding a BOM would require a substantial rewrite of the Gradle build, because it currently configures all projects to be Java libraries. Using the Java platform plugin thus breaks various tasks that are expected to exist, as well as publishing. was (Author: fleshgrinder): https://issues.apache.org/jira/browse/KAFKA-9060 can be considered a duplicate of this one, because one BOM for the entire {{org.apache.kafka}} group would be sufficient to cover everything. > Provide a BOM for EmbeddedConnectCluster and EmbeddedCluster > > > Key: KAFKA-7955 > URL: https://issues.apache.org/jira/browse/KAFKA-7955 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.1.1 >Reporter: Jeremy Custenborder >Priority: Major > > Using EmbeddedConnectCluster for testing connectors is a little difficult > given the number of dependencies that are required. Providing a > [BOM|https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html] > will make it easier for connector developers. For example, here are the > dependencies that are required.
> {code:xml}
> <dependencies>
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>connect-api</artifactId>
>         <version>${kafka.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>connect-runtime</artifactId>
>         <version>${kafka.version}</version>
>         <scope>test</scope>
>         <type>test-jar</type>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>connect-runtime</artifactId>
>         <version>${kafka.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>kafka-clients</artifactId>
>         <version>${kafka.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>junit</groupId>
>         <artifactId>junit</artifactId>
>         <version>4.12</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>kafka-clients</artifactId>
>         <version>${kafka.version}</version>
>         <scope>test</scope>
>         <type>test-jar</type>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>kafka_2.11</artifactId>
>         <version>${kafka.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>kafka_2.11</artifactId>
>         <type>test-jar</type>
>         <scope>test</scope>
>         <version>${kafka.version}</version>
>     </dependency>
> </dependencies>
> {code}
[jira] [Commented] (KAFKA-9060) Publish BOMs for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-9060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537374#comment-17537374 ] Richard Fussenegger commented on KAFKA-9060: In https://issues.apache.org/jira/browse/KAFKA-7955 another user was already asking for a BOM. This issue is more detailed, but both are asking for the same thing. > Publish BOMs for Kafka > -- > > Key: KAFKA-9060 > URL: https://issues.apache.org/jira/browse/KAFKA-9060 > Project: Kafka > Issue Type: Improvement >Reporter: Michael Holler >Priority: Trivial > > Hey there! Love the project, but I would love it if there was a BOM file that > is published for each version. If you're not familiar with a BOM, it stands > for "Bill of Materials"; it helps your Gradle file (in my case, but it's originally > a Maven thing) look like this (using JDBI's implementation as an > example):
> {code}
> dependencies {
>     implementation(platform("org.jdbi:jdbi3-bom:3.10.1"))
>     implementation("org.jdbi:jdbi3-core")
>     implementation("org.jdbi:jdbi3-kotlin")
>     implementation("org.jdbi:jdbi3-kotlin-sqlobject")
>     implementation("org.jdbi:jdbi3-jackson2")
> }
> {code}
> Instead of this:
> {code}
> val jdbiVersion by extra { "3.10.1" }
>
> dependencies {
>     implementation("org.jdbi:jdbi3-core:$jdbiVersion")
>     implementation("org.jdbi:jdbi3-kotlin:$jdbiVersion")
>     implementation("org.jdbi:jdbi3-kotlin-sqlobject:$jdbiVersion")
>     implementation("org.jdbi:jdbi3-jackson2:$jdbiVersion")
> }
> {code}
> Notice how you just leave the versions off when you use a BOM. This can help > reduce the number of dependency compatibility surprises one can encounter, > especially if a transitive dependency brings in a newer version of one of the > components (it'll be reduced to the BOM's version). Note also that you still > have to list dependencies you want with a BOM, just not the versions.
> Here's a deeper dive into how a BOM works: > https://howtodoinjava.com/maven/maven-bom-bill-of-materials-dependency/ > The Maven help site also has a section on it (Ctrl+F for "BOM"): > https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html > I think BOMs would be a great fit for the users of the Kafka project because > there are lots of Kafka libraries (streams, connect-api, connect-json, etc.) > that require the same version as other Kafka dependencies to work correctly. > BOMs were designed for exactly this use case. -- This message was sent by Atlassian Jira (v8.20.7#820007)
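On the producing side, the Gradle change discussed in the KAFKA-7955 thread would use Gradle's built-in `java-platform` plugin. A minimal sketch, assuming a hypothetical new `bom` subproject in Kafka's build (the project name and the constraint list are illustrative, not the actual build layout):

```kotlin
// bom/build.gradle.kts — hypothetical subproject; Kafka's build has no such module today.
plugins {
    `java-platform`
}

dependencies {
    constraints {
        // Each published Kafka artifact would be pinned to this build's version,
        // so consumers importing the platform inherit one consistent version set.
        api("org.apache.kafka:kafka-clients:${project.version}")
        api("org.apache.kafka:connect-api:${project.version}")
        api("org.apache.kafka:connect-runtime:${project.version}")
        api("org.apache.kafka:kafka-streams:${project.version}")
    }
}
```

Consumers would then write `implementation(platform("org.apache.kafka:kafka-bom:<version>"))` and drop per-artifact versions, mirroring the JDBI example above. The friction noted in the comment is real: `java-platform` cannot be combined with `java-library` in the same project, which is why a separate BOM-only subproject would be needed.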
[jira] [Commented] (KAFKA-7955) Provide a BOM for EmbeddedConnectCluster and EmbeddedCluster
[ https://issues.apache.org/jira/browse/KAFKA-7955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537373#comment-17537373 ] Richard Fussenegger commented on KAFKA-7955: https://issues.apache.org/jira/browse/KAFKA-9060 can be considered a duplicate of this one, because one BOM for the entire {{org.apache.kafka}} group would be sufficient to cover everything. > Provide a BOM for EmbeddedConnectCluster and EmbeddedCluster > > > Key: KAFKA-7955 > URL: https://issues.apache.org/jira/browse/KAFKA-7955 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.1.1 >Reporter: Jeremy Custenborder >Priority: Major > > Using EmbeddedConnectCluster for testing connectors is a little difficult > given the number of dependencies that are required. Providing a > [BOM|https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html] > will make it easier for connector developers. For example, here are the > dependencies that are required.
> {code:xml}
> <dependencies>
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>connect-api</artifactId>
>         <version>${kafka.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>connect-runtime</artifactId>
>         <version>${kafka.version}</version>
>         <scope>test</scope>
>         <type>test-jar</type>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>connect-runtime</artifactId>
>         <version>${kafka.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>kafka-clients</artifactId>
>         <version>${kafka.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>junit</groupId>
>         <artifactId>junit</artifactId>
>         <version>4.12</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>kafka-clients</artifactId>
>         <version>${kafka.version}</version>
>         <scope>test</scope>
>         <type>test-jar</type>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>kafka_2.11</artifactId>
>         <version>${kafka.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>kafka_2.11</artifactId>
>         <type>test-jar</type>
>         <scope>test</scope>
>         <version>${kafka.version}</version>
>     </dependency>
> </dependencies>
> {code}
-- This message was sent by Atlassian Jira (v8.20.7#820007)