[GitHub] [kafka] dengziming commented on pull request #11910: KAFKA-13743: Prevent topics with conflicting metrics names from being created in KRaft mode
dengziming commented on pull request #11910: URL: https://github.com/apache/kafka/pull/11910#issuecomment-1072029922 cc @cmccabe -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-13601) Add option to support sync offset commit in Kafka Connect Sink
[ https://issues.apache.org/jira/browse/KAFKA-13601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508566#comment-17508566 ]

Anil Dasari edited comment on KAFKA-13601 at 3/18/22, 4:52 AM:
---
The field partitioner uses the starting offset of the batch in the file name, and that offset is the only varying parameter. If a worker dies before committing an offset, there will be only one Parquet file per worker (consumer/partition) in the destination (S3 in my case) that is out of sync with the offsets. A new or restarted worker would overwrite that Parquet file, because the start offset of the partition remains the same. Please let me know if you have any questions.

was (Author: JIRAUSER283879):
The file name in the field partitioner uses the starting offset of the batch, and there will be only one Parquet file per worker (consumer/partition) present in the destination (S3 in my case) if the worker dies before committing an offset. A new or restarted worker would overwrite that Parquet file, because the start offset of the partition remains the same. Please let me know if you have any questions.
> Add option to support sync offset commit in Kafka Connect Sink
> --
>
> Key: KAFKA-13601
> URL: https://issues.apache.org/jira/browse/KAFKA-13601
> Project: Kafka
> Issue Type: New Feature
> Components: KafkaConnect
> Reporter: Anil Dasari
> Priority: Major
>
> Exactly-once delivery in the S3 connector with scheduled rotation and the field partitioner can be achieved by committing consumer offsets synchronously after a message batch has been flushed to the sink successfully.
> Currently, WorkerSinkTask commits the consumer offsets asynchronously, at regular intervals of WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG:
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L203]
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L196]
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L354]
>
> Add a config to allow the user to select synchronous commit over WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
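The overwrite argument in the comment on KAFKA-13601 can be illustrated with a small simulation. This is only a sketch under stated assumptions: a `Map` stands in for the S3 bucket, and the class name, `objectKey`, `flushBatch`, and the key layout are hypothetical, not the S3 connector's actual naming or API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation: a Map stands in for the destination bucket.
class StartOffsetNamingSketch {
    // Assumed key layout for illustration; the start offset is the only varying part.
    static String objectKey(String topic, int partition, long startOffset) {
        return topic + "/partition=" + partition + "/" + startOffset + ".parquet";
    }

    // Writes one batch under a key derived from its start offset and
    // returns the offset that would be committed after a successful flush.
    static long flushBatch(Map<String, List<String>> store, String topic, int partition,
                           long startOffset, List<String> records) {
        store.put(objectKey(topic, partition, startOffset), records); // put() overwrites an existing key
        return startOffset + records.size();
    }

    public static void main(String[] args) {
        Map<String, List<String>> store = new HashMap<>();
        long committed = 100L; // last committed consumer offset

        // The first worker flushes a batch, then dies before committing its offset.
        flushBatch(store, "events", 0, committed, List.of("a", "b"));

        // The restarted worker resumes from the same committed offset, so it targets the same key.
        committed = flushBatch(store, "events", 0, committed, List.of("a", "b", "c"));

        System.out.println(store.keySet() + " committed=" + committed);
    }
}
```

Because the second flush reuses the key of the first, the out-of-sync file is replaced rather than duplicated, which is the property the comment relies on.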
[jira] [Commented] (KAFKA-13601) Add option to support sync offset commit in Kafka Connect Sink
[ https://issues.apache.org/jira/browse/KAFKA-13601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508566#comment-17508566 ]

Anil Dasari commented on KAFKA-13601:
-
The file name in the field partitioner uses the starting offset of the batch, and there will be only one Parquet file per worker (consumer/partition) present in the destination (S3 in my case) if the worker dies before committing an offset. A new or restarted worker would overwrite that Parquet file, because the start offset of the partition remains the same. Please let me know if you have any questions.
[GitHub] [kafka] guizmaii closed pull request #11904: MINOR: Fix `ConsumerConfig.ISOLATION_LEVEL_DOC`
guizmaii closed pull request #11904: URL: https://github.com/apache/kafka/pull/11904
[GitHub] [kafka] guizmaii commented on pull request #11904: MINOR: Fix `ConsumerConfig.ISOLATION_LEVEL_DOC`
guizmaii commented on pull request #11904: URL: https://github.com/apache/kafka/pull/11904#issuecomment-1071947794 @dajac I opened a PR on trunk here: https://github.com/apache/kafka/pull/11915
[GitHub] [kafka] showuon commented on pull request #11905: MINOR: Fix incorrect log for out-of-order KTable
showuon commented on pull request #11905: URL: https://github.com/apache/kafka/pull/11905#issuecomment-1071946919 @tchiotludo , thanks for your contribution!
[GitHub] [kafka] showuon merged pull request #11905: MINOR: Fix incorrect log for out-of-order KTable
showuon merged pull request #11905: URL: https://github.com/apache/kafka/pull/11905
[GitHub] [kafka] showuon commented on pull request #11905: MINOR: Fix incorrect log for out-of-order KTable
showuon commented on pull request #11905:
URL: https://github.com/apache/kafka/pull/11905#issuecomment-1071946079

Failed tests are unrelated:
```
Build / JDK 17 and Scala 2.13 / kafka.server.LogDirFailureTest.testReplicaFetcherThreadAfterLogDirFailureOnFollower()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest.testSwitchingToTopicCreationEnabled
Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
```
[jira] [Commented] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent
[ https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508513#comment-17508513 ]

Luke Chen commented on KAFKA-7077:
--
Everything uses the idempotent producer by default since v3.0.0 (or, more specifically, v3.0.1), and this PR [https://github.com/apache/kafka/pull/11475] removes some constraints on it. Closing this now.

> KIP-318: Make Kafka Connect Source idempotent
> -
>
> Key: KAFKA-7077
> URL: https://issues.apache.org/jira/browse/KAFKA-7077
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Affects Versions: 2.0.0
> Reporter: Stephane Maarek
> Assignee: Stephane Maarek
> Priority: Major
> Fix For: 3.2.0
>
> KIP Link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
[jira] [Resolved] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent
[ https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen resolved KAFKA-7077.
--
Resolution: Fixed
[jira] [Updated] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent
[ https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-7077:
-
Fix Version/s: 3.2.0
[jira] [Resolved] (KAFKA-13742) Quota byte-rate/request metrics are loaded only when at least one quota is register
[ https://issues.apache.org/jira/browse/KAFKA-13742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jorge Esteban Quilcate Otoya resolved KAFKA-13742.
--
Resolution: Not A Problem

Given that I got a better understanding of how quota metrics work (https://issues.apache.org/jira/browse/KAFKA-13744), I will close this one.

The only comment I may add is that there are currently no metrics that match clients/users with topics/partitions. As far as I know, this information is only captured on the client side. I found quota metrics to be a good proxy for this mapping, though it is still incomplete because the mapping to the topic is only implicit on the broker side. It would be a nice addition to have this mapping available, but that should be discussed in another issue if there is interest.

> Quota byte-rate/request metrics are loaded only when at least one quota is register
> ---
>
> Key: KAFKA-13742
> URL: https://issues.apache.org/jira/browse/KAFKA-13742
> Project: Kafka
> Issue Type: Bug
> Components: core, metrics
> Reporter: Jorge Esteban Quilcate Otoya
> Priority: Major
> Labels: quotas
>
> Quota metrics are loaded only when at least one quota is present:
> * Metrics: [https://github.com/apache/kafka/blob/0b9a8bac36f16b5397e9ec3a0441758e4b60a384/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L552-L563]
> * Reporting when quotas are enabled: [https://github.com/apache/kafka/blob/0b9a8bac36f16b5397e9ec3a0441758e4b60a384/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L249-L256]
> * Quotas enabled: `def quotasEnabled: Boolean = quotaTypesEnabled != QuotaTypes.NoQuotas`
> Even though throttling is specific to quotas, byte-rate/request per user/client-id is a valid metric for any deployment.
>
> The current workaround is to add _any_ quota, as this will enable metrics for *all* client-id/users.
>
> If these metrics are captured for all clients regardless of the quotas created, it would be a better experience to have a config to opt in to these metrics instead of creating meaningless quotas just to get them.
> For threshold metrics, it makes sense to me to enable them only when quotas are enabled.
[jira] [Resolved] (KAFKA-13744) Quota metric tags are inconsistent
[ https://issues.apache.org/jira/browse/KAFKA-13744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jorge Esteban Quilcate Otoya resolved KAFKA-13744.
--
Resolution: Not A Problem

> Quota metric tags are inconsistent
> --
>
> Key: KAFKA-13744
> URL: https://issues.apache.org/jira/browse/KAFKA-13744
> Project: Kafka
> Issue Type: Bug
> Components: core, metrics
> Reporter: Jorge Esteban Quilcate Otoya
> Priority: Major
> Labels: quotas
> Attachments: image-2022-03-15-16-57-12-583.png
>
> When metrics for quotas are enabled, the metrics apply to _all_ clients (see https://issues.apache.org/jira/browse/KAFKA-13742).
> However, the tags are calculated depending on the quotas registered and are applied to all clients:
> [https://github.com/apache/kafka/blob/0b9a8bac36f16b5397e9ec3a0441758e4b60a384/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L649-L694]
> This causes the resulting metric tags to differ depending on which quota is registered first.
> For instance, if a quota is registered with userId and clientId, then metrics are tagged with both; if a quota is then registered with only clientId, all metrics are tagged by clientId only, even though the user principal is available.
> !image-2022-03-15-16-57-12-583.png|width=1034,height=415!
> I managed to reproduce this behavior here:
> * From 10:30 to 10:45, there was a quota with both client-id and user-id.
> * It was removed by 10:45, so no metrics were exposed.
> * Afterwards, a quota with client id was created, and metrics were collected only with client id, even though the user was available.
> I'd expect metrics to always contain both, if available, and the logic here to be simplified:
> [https://github.com/apache/kafka/blob/0b9a8bac36f16b5397e9ec3a0441758e4b60a384/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L649-L694].
[jira] [Commented] (KAFKA-13744) Quota metric tags are inconsistent
[ https://issues.apache.org/jira/browse/KAFKA-13744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508473#comment-17508473 ]

Jorge Esteban Quilcate Otoya commented on KAFKA-13744:
--
Thanks, [~junrao]! I think I got it now after looking deeper into the code and reading the KIPs.

So, depending on the type of quotas defined, if all of them are of the same type, then all the clients are tagged with that type:
```
case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled =>
  ("", clientId)
case QuotaTypes.UserQuotaEnabled =>
  (sanitizedUser, "")
case QuotaTypes.UserClientIdQuotaEnabled =>
  (sanitizedUser, clientId)
```
But as soon as there is a mix of types, each client is evaluated against the registered quotas:
```
case _ =>
  val userEntity = Some(UserEntity(sanitizedUser))
  val clientIdEntity = Some(ClientIdEntity(clientId))
  var metricTags = (sanitizedUser, clientId)
  // 1) /config/users/<user>/clients/<client-id>
  if (!overriddenQuotas.containsKey(KafkaQuotaEntity(userEntity, clientIdEntity))) {
    // 2) /config/users/<user>/clients/<default>
    metricTags = (sanitizedUser, clientId)
    if (!overriddenQuotas.containsKey(KafkaQuotaEntity(userEntity, Some(DefaultClientIdEntity)))) {
      // 3) /config/users/<user>
      metricTags = (sanitizedUser, "")
      if (!overriddenQuotas.containsKey(KafkaQuotaEntity(userEntity, None))) {
        // 4) /config/users/<default>/clients/<client-id>
        metricTags = (sanitizedUser, clientId)
        if (!overriddenQuotas.containsKey(KafkaQuotaEntity(Some(DefaultUserEntity), clientIdEntity))) {
          // 5) /config/users/<default>/clients/<default>
          metricTags = (sanitizedUser, clientId)
          if (!overriddenQuotas.containsKey(DefaultUserClientIdQuotaEntity)) {
            // 6) /config/users/<default>
            metricTags = (sanitizedUser, "")
            if (!overriddenQuotas.containsKey(DefaultUserQuotaEntity)) {
              // 7) /config/clients/<client-id>
              // 8) /config/clients/<default>
              metricTags = ("", clientId)
            }
          }
        }
      }
    }
  }
```
If the client doesn't match, then it falls back to the client id.
If there's any quota for that client, or a default, that matches, it applies the tagging depending on the match.

This now explains why I'm seeing different behavior depending on what quota I define first.
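The lookup order described in the comment on KAFKA-13744 can be restated as a flat chain of checks. The following is a simplified sketch, not Kafka's implementation: the class name, the string encoding of quota entities, and the `<default>` marker are assumptions made for illustration, and only the order of checks and the resulting tags follow the quoted Scala.

```java
import java.util.Set;

// Simplified, hypothetical model of the mixed-quota-type tag lookup quoted above.
class QuotaTagSketch {
    static final String DEFAULT = "<default>"; // stand-in for the default user/client-id entity

    // Encodes a quota entity as "user|clientId"; an empty part means "not specified".
    static String entity(String user, String clientId) {
        return user + "|" + clientId;
    }

    // Returns {userTag, clientIdTag}; the first matching quota level decides the tags.
    static String[] metricTags(Set<String> quotas, String user, String clientId) {
        if (quotas.contains(entity(user, clientId)))    return new String[] {user, clientId}; // 1
        if (quotas.contains(entity(user, DEFAULT)))     return new String[] {user, clientId}; // 2
        if (quotas.contains(entity(user, "")))          return new String[] {user, ""};       // 3
        if (quotas.contains(entity(DEFAULT, clientId))) return new String[] {user, clientId}; // 4
        if (quotas.contains(entity(DEFAULT, DEFAULT)))  return new String[] {user, clientId}; // 5
        if (quotas.contains(entity(DEFAULT, "")))       return new String[] {user, ""};       // 6
        return new String[] {"", clientId}; // 7, 8, or no match: fall back to the client id
    }

    public static void main(String[] args) {
        // A user-only quota for "alice" mixed with a client-id-only quota for "app".
        Set<String> quotas = Set.of(entity("alice", ""), entity("", "app"));
        String[] a = metricTags(quotas, "alice", "app");
        String[] b = metricTags(quotas, "bob", "app");
        System.out.println("alice -> (" + a[0] + ", " + a[1] + ")");
        System.out.println("bob   -> (" + b[0] + ", " + b[1] + ")");
    }
}
```

In this sketch two clients with the same client id end up with different tag sets, depending on which registered quota they match first.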
[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election
jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r829589585

## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ##
@@ -104,53 +117,94 @@ public PartitionChangeBuilder setTargetAdding(List targetAdding) {
         return this;
     }
-    boolean shouldTryElection() {
-        // If the new isr doesn't have the current leader, we need to try to elect a new
-        // one. Note: this also handles the case where the current leader is NO_LEADER,
-        // since that value cannot appear in targetIsr.
-        if (!targetIsr.contains(partition.leader)) return true;
+    // VisibleForTesting
+    static class ElectionResult {
+        final int node;
+        final boolean unclean;
+
+        private ElectionResult(int node, boolean unclean) {
+            this.node = node;
+            this.unclean = unclean;
+        }
+    }
-        // Check if we want to try to get away from a non-preferred leader.
-        if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true;
+    // VisibleForTesting
+    /**
+     * Perform leader election based on the partition state and leader election type.
+     *
+     * See documentation for the Election type to see more details on the election types supported.
+     */
+    ElectionResult electLeader() {
+        if (election == Election.PREFERRED) {
+            return electPreferredLeader();
+        }
-        return false;
+        return electAnyLeader();
     }
-    class BestLeader {
-        final int node;
-        final boolean unclean;
+    /**
+     * Assumes that the election type is Election.PREFERRED
+     */
+    private ElectionResult electPreferredLeader() {
+        int preferredReplica = targetReplicas.get(0);
+        if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) {
+            return new ElectionResult(preferredReplica, false);
+        }
-        BestLeader() {
-            for (int replica : targetReplicas) {
-                if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) {
-                    this.node = replica;
-                    this.unclean = false;
-                    return;
-                }
-            }
-            if (uncleanElectionOk.get()) {
-                for (int replica : targetReplicas) {
-                    if (isAcceptableLeader.apply(replica)) {
-                        this.node = replica;
-                        this.unclean = true;
-                        return;
-                    }
-                }
+        if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all the constraints
+            return new ElectionResult(partition.leader, false);
+        }
+
+        Optional onlineLeader = targetReplicas.stream()

Review comment:
Yes. I think you are correct based on how we use it in the replication control manager. Based on how we use it in the replication control manager, this code should always return at either line 151, line 156 or line 167. I would still like to keep this code as this code is attempting to keep the partition online.
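The first two checks of `electPreferredLeader` in the diff above can be exercised in isolation. This is a simplified sketch rather than the PR's code: the class name, the `Optional` return type, and the use of `IntPredicate` are assumptions for illustration. The remainder of the method is cut off in the quoted diff, so the sketch simply returns an empty result where the original continues.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.IntPredicate;

// Hypothetical, simplified sketch of the first two checks in electPreferredLeader.
class PreferredElectionSketch {
    static final class Result {
        final int node;
        final boolean unclean;

        Result(int node, boolean unclean) {
            this.node = node;
            this.unclean = unclean;
        }
    }

    // Returns empty when neither check passes; the quoted diff is truncated at that point.
    static Optional<Result> electPreferred(List<Integer> targetReplicas, List<Integer> targetIsr,
                                           int currentLeader, IntPredicate isAcceptableLeader) {
        int preferred = targetReplicas.get(0); // the first replica is the preferred one
        if (targetIsr.contains(preferred) && isAcceptableLeader.test(preferred)) {
            return Optional.of(new Result(preferred, false));
        }
        if (targetIsr.contains(currentLeader) && isAcceptableLeader.test(currentLeader)) {
            // Keep the current leader since it still meets all the constraints.
            return Optional.of(new Result(currentLeader, false));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Preferred replica 1 is not in the ISR, so the current leader 2 is kept.
        Optional<Result> r = electPreferred(List.of(1, 2, 3), List.of(2, 3), 2, n -> true);
        System.out.println(r.map(x -> "leader=" + x.node + " unclean=" + x.unclean).orElse("no result"));
    }
}
```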
[GitHub] [kafka] jolshan commented on pull request #11912: KAFKA-13752: Uuid compare using equals in java
jolshan commented on pull request #11912: URL: https://github.com/apache/kafka/pull/11912#issuecomment-1071816320 Ah yeah, that could be useful? I think it's less necessary since the server should be building the response, but doesn't hurt.
[GitHub] [kafka] jolshan removed a comment on pull request #11912: KAFKA-13752: Uuid compare using equals in java
jolshan removed a comment on pull request #11912: URL: https://github.com/apache/kafka/pull/11912#issuecomment-1071812161 I was thinking of adding the test from the ticket and verifying it _**doesn't**_ return an error.
[GitHub] [kafka] jolshan commented on pull request #11912: KAFKA-13752: Uuid compare using equals in java
jolshan commented on pull request #11912: URL: https://github.com/apache/kafka/pull/11912#issuecomment-1071812161 I was thinking of adding the test from the ticket and verifying it _**doesn't**_ return an error.
[jira] [Updated] (KAFKA-13753) Log cleaner should retain transaction metadata in index until corresponding marker is removed
[ https://issues.apache.org/jira/browse/KAFKA-13753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-13753:
Summary: Log cleaner should retain transaction metadata in index until corresponding marker is removed (was: Log cleaner should transaction metadata in index until corresponding marker is removed)

> Log cleaner should retain transaction metadata in index until corresponding marker is removed
> -
>
> Key: KAFKA-13753
> URL: https://issues.apache.org/jira/browse/KAFKA-13753
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Major
>
> Currently the log cleaner will remove aborted transactions from the index as soon as it detects that the data from the transaction is gone. It does not wait until the corresponding marker has also been removed. Although it is extremely unlikely, it seems possible today that a Fetch might fail to return the aborted transaction metadata correctly if a log cleaning occurs concurrently. This is because the collection of aborted transactions is only done after reading the data from the log. It would be safer to preserve the aborted transaction metadata in the index until the marker is also removed.
[GitHub] [kafka] dengziming commented on pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell
dengziming commented on pull request #11173: URL: https://github.com/apache/kafka/pull/11173#issuecomment-1071767117 @dajac @showuon @jolshan Thank you for your patience. 🤝
[GitHub] [kafka] wsun-confluent commented on a change in pull request #11874: Fix typos in configuration docs
wsun-confluent commented on a change in pull request #11874:
URL: https://github.com/apache/kafka/pull/11874#discussion_r829578541

## File path: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ##
@@ -216,8 +216,10 @@
 private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
 + " Note that if this config is set to be greater than 1 and enable.idempotence is set to false, there is a risk of"
 + " message re-ordering after a failed send due to retries (i.e., if retries are enabled)."
-+ " Additionally, enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
-+ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.";
++ " Additionally, enabling idempotence requires the value of this configuration to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
++ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. "
++ " Record ordering is preserved when enable.idempotence is set to true for idempotent "
++ " producer (or transactional producer), even when max in-flight requests are greater than 1 (supported up to 5).";

Review comment:
Hello and thanks for reviewing! The callout about record ordering was originally requested within Confluent; the same is mentioned at this page: https://developer.confluent.io/tutorials/message-ordering/kafka.html. We think this would also benefit AK docs.
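The ordering rule stated in the doc string above can be written out as a small check. This is a hedged sketch, not the producer's own validation: `IdempotenceConfigSketch` and `orderingPreserved` are hypothetical names, the fallback values passed to `getProperty` are chosen for this sketch only, and the limit of 5 is taken from the "supported up to 5" wording. It also assumes retries are enabled, as the doc string does.

```java
import java.util.Properties;

// Hypothetical helper that restates the documented ordering rule; not part of the Kafka client.
class IdempotenceConfigSketch {
    // Taken from "supported up to 5" in the doc string above.
    static final int MAX_IN_FLIGHT_FOR_IDEMPOTENCE = 5;

    // True when the doc string says record ordering is preserved, assuming retries are enabled.
    static boolean orderingPreserved(Properties props) {
        // The fallbacks below are sketch-only assumptions, not claims about client defaults.
        boolean idempotent = Boolean.parseBoolean(props.getProperty("enable.idempotence", "false"));
        int maxInFlight = Integer.parseInt(
                props.getProperty("max.in.flight.requests.per.connection", "5"));
        if (idempotent) {
            // Idempotence requires the in-flight limit to stay at or below the documented bound.
            return maxInFlight <= MAX_IN_FLIGHT_FOR_IDEMPOTENCE;
        }
        // Without idempotence, more than one in-flight request risks re-ordering on retry.
        return maxInFlight == 1;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");
        props.setProperty("max.in.flight.requests.per.connection", "5");
        System.out.println("ordering preserved: " + orderingPreserved(props));
    }
}
```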
[GitHub] [kafka] junrao commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election
junrao commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r829550657

## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ##
@@ -104,53 +117,94 @@ public PartitionChangeBuilder setTargetAdding(List targetAdding) {
         return this;
     }
-    boolean shouldTryElection() {
-        // If the new isr doesn't have the current leader, we need to try to elect a new
-        // one. Note: this also handles the case where the current leader is NO_LEADER,
-        // since that value cannot appear in targetIsr.
-        if (!targetIsr.contains(partition.leader)) return true;
+    // VisibleForTesting
+    static class ElectionResult {
+        final int node;
+        final boolean unclean;
+
+        private ElectionResult(int node, boolean unclean) {
+            this.node = node;
+            this.unclean = unclean;
+        }
+    }
-        // Check if we want to try to get away from a non-preferred leader.
-        if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true;
+    // VisibleForTesting
+    /**
+     * Perform leader election based on the partition state and leader election type.
+     *
+     * See documentation for the Election type to see more details on the election types supported.
+     */
+    ElectionResult electLeader() {
+        if (election == Election.PREFERRED) {
+            return electPreferredLeader();
+        }
-        return false;
+        return electAnyLeader();
     }
-    class BestLeader {
-        final int node;
-        final boolean unclean;
+    /**
+     * Assumes that the election type is Election.PREFERRED
+     */
+    private ElectionResult electPreferredLeader() {
+        int preferredReplica = targetReplicas.get(0);
+        if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) {
+            return new ElectionResult(preferredReplica, false);
+        }
-        BestLeader() {
-            for (int replica : targetReplicas) {
-                if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) {
-                    this.node = replica;
-                    this.unclean = false;
-                    return;
-                }
-            }
-            if (uncleanElectionOk.get()) {
-                for (int replica : targetReplicas) {
-                    if (isAcceptableLeader.apply(replica)) {
-                        this.node = replica;
-                        this.unclean = true;
-                        return;
-                    }
-                }
+        if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all the constraints
+            return new ElectionResult(partition.leader, false);
+        }
+
+        Optional onlineLeader = targetReplicas.stream()

Review comment:
Preferred leader election is an optimization. If we can't move the leader to the preferred one, it seems there is no need to do anything extra.

## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ##
@@ -104,53 +117,94 @@ public PartitionChangeBuilder setTargetAdding(List targetAdding) {
         return this;
     }
-    boolean shouldTryElection() {
-        // If the new isr doesn't have the current leader, we need to try to elect a new
-        // one. Note: this also handles the case where the current leader is NO_LEADER,
-        // since that value cannot appear in targetIsr.
-        if (!targetIsr.contains(partition.leader)) return true;
+    // VisibleForTesting
+    static class ElectionResult {
+        final int node;
+        final boolean unclean;
+
+        private ElectionResult(int node, boolean unclean) {
+            this.node = node;
+            this.unclean = unclean;
+        }
+    }
-        // Check if we want to try to get away from a non-preferred leader.
-        if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true;
+    // VisibleForTesting
+    /**
+     * Perform leader election based on the partition state and leader election type.
+     *
+     * See documentation for the Election type to see more details on the election types supported.
+     */
+    ElectionResult electLeader() {
+        if (election == Election.PREFERRED) {
+            return electPreferredLeader();
+        }
-        return false;
+        return electAnyLeader();
     }
-    class BestLeader {
-        final int node;
-        final boolean unclean;
+    /**
+     * Assumes that the election type is Election.PREFERRED
+     */
+    private ElectionResult electPreferredLeader() {
+        int preferredReplica = targetReplicas.get(0);
+        if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica))
[GitHub] [kafka] jolshan commented on a change in pull request #11909: KAFKA-13750: Client Compatability KafkaTest uses invalid idempotency configs
jolshan commented on a change in pull request #11909: URL: https://github.com/apache/kafka/pull/11909#discussion_r829541377 ## File path: tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java ## @@ -172,6 +174,13 @@ public static void main(String[] args) throws Exception { .dest("describeConfigsSupported") .metavar("DESCRIBE_CONFIGS_SUPPORTED") .help("Whether describeConfigs is supported in the AdminClient."); +parser.addArgument("--idempotent-producer-supported") +.action(store()) +.required(true) +.type(Boolean.class) +.dest("idempotentProducerSupported") +.metavar("IDEMPOTENT_PRODUCER_SUPPORTED") +.help("Whether the producer supports idempotency."); Review comment: Ah. How did I miss this 🤦‍♀️ Thanks
[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election
jsancio commented on a change in pull request #11893: URL: https://github.com/apache/kafka/pull/11893#discussion_r829532685 ## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ## @@ -104,53 +117,73 @@ public PartitionChangeBuilder setTargetAdding(List targetAdding) { return this; } -boolean shouldTryElection() { -// If the new isr doesn't have the current leader, we need to try to elect a new -// one. Note: this also handles the case where the current leader is NO_LEADER, -// since that value cannot appear in targetIsr. -if (!targetIsr.contains(partition.leader)) return true; - -// Check if we want to try to get away from a non-preferred leader. -if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true; +// VisibleForTesting +static class ElectionResult { +final int node; +final boolean unclean; -return false; +private ElectionResult(int node, boolean unclean) { +this.node = node; +this.unclean = unclean; +} } -class BestLeader { -final int node; -final boolean unclean; +// VisibleForTesting +/** + * Perform leader election based on the partition state and leader election type. + * + * See documentation for the Election type to see more details on the election types supported. + */ +ElectionResult electLeader() { +// 1. 
Check if the election is not PREFERRED and we already have a valid leader +if (election != Election.PREFERRED && targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { +// Don't consider a new leader since the current leader meets all the constraints +return new ElectionResult(partition.leader, false); +} -BestLeader() { -for (int replica : targetReplicas) { -if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) { -this.node = replica; -this.unclean = false; -return; -} -} -if (uncleanElectionOk.get()) { -for (int replica : targetReplicas) { -if (isAcceptableLeader.apply(replica)) { -this.node = replica; -this.unclean = true; -return; -} -} +// 2. Attempt preferred replica election +int preferredReplica = targetReplicas.get(0); +if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) { +return new ElectionResult(preferredReplica, false); +} + +// 3. Preferred replica was not elected, only continue if the current leader is not a valid leader +if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { Review comment: @junrao how about this algorithm: https://github.com/apache/kafka/pull/11893/commits/4ef0b161c2f44933139989ec7e680f2c05839cc5 Is it easier to read and understand?
[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election
jsancio commented on a change in pull request #11893: URL: https://github.com/apache/kafka/pull/11893#discussion_r829522253 ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -953,6 +972,56 @@ private void cancelMaybeFenceReplicas() { queue.cancelDeferred(MAYBE_FENCE_REPLICAS); } +private static final String MAYBE_BALANCE_PARTITION_LEADERS = "maybeBalancePartitionLeaders"; + +private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000; + +private void maybeScheduleNextBalancePartitionLeaders() { +if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED && +leaderImbalanceCheckIntervalNs.isPresent() && +replicationControl.arePartitionLeadersImbalanced()) { + +log.debug( +"Scheduling write event for {} because scheduled ({}), checkIntervalNs ({}) and isImbalanced ({})", +MAYBE_BALANCE_PARTITION_LEADERS, +imbalancedScheduled, +leaderImbalanceCheckIntervalNs, +replicationControl.arePartitionLeadersImbalanced() +); + +ControllerWriteEvent event = new ControllerWriteEvent<>(MAYBE_BALANCE_PARTITION_LEADERS, () -> { +ControllerResult result = replicationControl.maybeBalancePartitionLeaders(); + +// reschedule the operation after the leaderImbalanceCheckIntervalNs interval. +// Mark the imbalance event as completed and reschedule if necessary +if (result.response()) { +imbalancedScheduled = ImbalanceSchedule.IMMEDIATELY; +} else { +imbalancedScheduled = ImbalanceSchedule.DEFERRED; +} + +// Note that rescheduling this event here is not required because MAYBE_BALANCE_PARTITION_LEADERS +// is a ControllerWriteEvent. ControllerWriteEvent always calls this method after the records +// generated by a ControllerWriteEvent have been applied. 
+ +return result; +}); + +long delayNs = time.nanoseconds(); +if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) { +delay += leaderImbalanceCheckIntervalNs.getAsLong(); +} + +queue.scheduleDeferred(MAYBE_BALANCE_PARTITION_LEADERS, new EarliestDeadlineFunction(delayNs), event); + +imbalancedScheduled = ImbalanceSchedule.SCHEDULED; +} +} + +private void cancelMaybeBalancePartitionLeaders() { +queue.cancelDeferred(MAYBE_BALANCE_PARTITION_LEADERS); Review comment: Yes. Fixed.
[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election
jsancio commented on a change in pull request #11893: URL: https://github.com/apache/kafka/pull/11893#discussion_r829521940 ## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ## @@ -104,53 +117,73 @@ public PartitionChangeBuilder setTargetAdding(List targetAdding) { return this; } -boolean shouldTryElection() { -// If the new isr doesn't have the current leader, we need to try to elect a new -// one. Note: this also handles the case where the current leader is NO_LEADER, -// since that value cannot appear in targetIsr. -if (!targetIsr.contains(partition.leader)) return true; - -// Check if we want to try to get away from a non-preferred leader. -if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true; +// VisibleForTesting +static class ElectionResult { +final int node; +final boolean unclean; -return false; +private ElectionResult(int node, boolean unclean) { +this.node = node; +this.unclean = unclean; +} } -class BestLeader { -final int node; -final boolean unclean; +// VisibleForTesting +/** + * Perform leader election based on the partition state and leader election type. + * + * See documentation for the Election type to see more details on the election types supported. + */ +ElectionResult electLeader() { +// 1. 
Check if the election is not PREFERRED and we already have a valid leader +if (election != Election.PREFERRED && targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { +// Don't consider a new leader since the current leader meets all the constraints +return new ElectionResult(partition.leader, false); +} -BestLeader() { -for (int replica : targetReplicas) { -if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) { -this.node = replica; -this.unclean = false; -return; -} -} -if (uncleanElectionOk.get()) { -for (int replica : targetReplicas) { -if (isAcceptableLeader.apply(replica)) { -this.node = replica; -this.unclean = true; -return; -} -} +// 2. Attempt preferred replica election +int preferredReplica = targetReplicas.get(0); +if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) { +return new ElectionResult(preferredReplica, false); +} + +// 3. Preferred replica was not elected, only continue if the current leader is not a valid leader +if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { +// Don't consider a new leader since the current leader meets all the constraints +return new ElectionResult(partition.leader, false); +} + +// 4. Attempt to keep the partition online based on the ISR +Optional bestLeader = targetReplicas.stream() +.skip(1) +.filter(replica -> targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) +.findFirst(); +if (bestLeader.isPresent()) { +return new ElectionResult(bestLeader.get(), false); +} + +if (election == Election.UNCLEAN) { +// 5. 
Attempt unclean leader election +Optional uncleanLeader = targetReplicas.stream() +.filter(replica -> isAcceptableLeader.apply(replica)) +.findFirst(); +if (uncleanLeader.isPresent()) { +return new ElectionResult(uncleanLeader.get(), true); } -this.node = NO_LEADER; -this.unclean = false; } + +return new ElectionResult(NO_LEADER, false); } private void tryElection(PartitionChangeRecord record) { -BestLeader bestLeader = new BestLeader(); -if (bestLeader.node != partition.leader) { -log.debug("Setting new leader for topicId {}, partition {} to {}", topicId, partitionId, bestLeader.node); -record.setLeader(bestLeader.node); -if (bestLeader.unclean) { +ElectionResult electionResult = electLeader(); +if (electionResult.node != partition.leader) { +log.debug("Setting new leader for topicId {}, partition {} to {}", topicId, partitionId, electionResult.node); +record.setLeader(electionResult.node); +if (electionResult.unclean) { // If the election was unclean, we have to forcibly set the ISR to just the // new leader. This can re
[GitHub] [kafka] tchiotludo commented on pull request #11905: MINOR: Fix incorrect log for out-of-order KTable
tchiotludo commented on pull request #11905: URL: https://github.com/apache/kafka/pull/11905#issuecomment-1071509762 @showuon must be done I think ;)
[jira] [Updated] (KAFKA-13460) Issue reporting
[ https://issues.apache.org/jira/browse/KAFKA-13460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-13460: -- Component/s: (was: KafkaConnect) > Issue reporting > --- > > Key: KAFKA-13460 > URL: https://issues.apache.org/jira/browse/KAFKA-13460 > Project: Kafka > Issue Type: Wish >Affects Versions: 1.1.1 >Reporter: Mikolaj Ryll >Priority: Critical > Fix For: 1.0.3 > > > I would like to be able to report issue using github. Plx. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13601) Add option to support sync offset commit in Kafka Connect Sink
[ https://issues.apache.org/jira/browse/KAFKA-13601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508423#comment-17508423 ] Chris Egerton commented on KAFKA-13601: --- [~dasarianil] I don't see how synchronous offset commits would guarantee exactly once. What if the worker dies in between the task writing data and its consumer committing an offset? > Add option to support sync offset commit in Kafka Connect Sink > -- > > Key: KAFKA-13601 > URL: https://issues.apache.org/jira/browse/KAFKA-13601 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Anil Dasari >Priority: Major > > Exactly once in s3 connector with scheduled rotation and field partitioner > can be achieved with consumer offset sync' commit after message batch flushed > to sink successfully > Currently, WorkerSinkTask committing the consumer offsets asynchronously and > at regular intervals of WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG > [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L203] > [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L196] > [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L354] > > Add config to allow user to select synchronous commit over > WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG -- This message was sent by Atlassian Jira (v8.20.1#820001)
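The failure window raised in the comment above can be illustrated with a small simulation. This is a sketch under stated assumptions, not Kafka Connect code: `CommitWindowSketch`, its fields, and `runTask` are hypothetical names, the sink is an in-memory list, and the "crash" is modeled as returning before the commit step. It only shows that a commit performed after the write, even a synchronous one, leaves a gap in which a restart re-delivers records that were already written.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a sink task: write a batch, then commit the offset.
final class CommitWindowSketch {
    // Everything the task has written to the (in-memory) sink.
    final List<String> sink = new ArrayList<>();
    // Last offset committed for the single partition modeled here.
    long committedOffset = 0;

    // Writes all records from the committed offset onward, then commits.
    // When crashBeforeCommit is true the worker "dies" after the write
    // and before the commit, so the committed offset is left unchanged.
    void runTask(List<String> log, boolean crashBeforeCommit) {
        for (long offset = committedOffset; offset < log.size(); offset++) {
            sink.add(log.get((int) offset));
        }
        if (crashBeforeCommit) {
            return; // simulated crash: data written, offset not committed
        }
        committedOffset = log.size(); // synchronous commit after the write
    }

    public static void main(String[] args) {
        CommitWindowSketch task = new CommitWindowSketch();
        List<String> log = List.of("a", "b");
        task.runTask(log, true);   // first attempt crashes before commit
        task.runTask(log, false);  // restart re-reads from offset 0
        System.out.println(task.sink); // both records appear twice
    }
}
```

Avoiding the duplicates therefore depends on the write itself being idempotent (for example, a restarted task overwriting the same output object), which is the argument made elsewhere in this thread, rather than on the commit being synchronous.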
[jira] [Comment Edited] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508386#comment-17508386 ] John Roesler edited comment on KAFKA-13714 at 3/17/22, 7:32 PM: Another local repro: {code:java} org.apache.kafka.streams.integration.IQv2StoreIntegrationTest > verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI] FAILED java.lang.AssertionError: Result:StateQueryResult{partitionResults={ 0=SucceededQueryResult{ result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@3f702946, executionInfo=[ Handled in class org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 1153925ns, Handled in class org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore via WrappedStateStore in 1165952ns, Handled in class org.apache.kafka.streams.state.internals.CachingKeyValueStore in 1181616ns, Handled in class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with serdes org.apache.kafka.streams.state.StateSerdes@278667fd in 1260365ns ], position=Position{position={input-topic={0=1, 1=SucceededQueryResult{ result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@42b6d0cc, executionInfo=[ Handled in class org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 109311ns, Handled in class org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore via WrappedStateStore in 116767ns, Handled in class org.apache.kafka.streams.state.internals.CachingKeyValueStore in 128961ns, Handled in class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with serdes org.apache.kafka.streams.state.StateSerdes@684b31de in 185521ns ], position=Position{position={input-topic={1=1}, globalResult=null} Expected: is <[1, 2, 3]> but: was <[1, 2]> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776) {code} logs: {code:java} [2022-03-17 07:31:56,286] INFO stream-client [app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138] Kafka Streams version: test-version (org.apache.kafka.streams.KafkaStreams:912) [2022-03-17 07:31:56,286] INFO stream-client [app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138] Kafka Streams commit ID: test-commit-ID (org.apache.kafka.streams.KafkaStreams:913) [2022-03-17 07:31:56,288] INFO stream-thread [app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1] Creating restore consumer client (org.apache.kafka.streams.processor.internals.StreamThread:346) [2022-03-17 07:31:56,295] INFO stream-thread [app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1] Creating thread producer client (org.apache.kafka.streams.processor.internals.StreamThread:105) [2022-03-17 07:31:56,297] INFO [Producer clientId=app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1-producer] Instantiated an idempotent producer. 
(org.apache.kafka.clients.producer.KafkaProducer:532) [2022-03-17 07:31:56,304] INFO stream-thread [app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1] Creating consumer client (org.apache.kafka.streams.processor.internals.StreamThread:397) [2022-03-17 07:31:56,308] INFO stream-thread [app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1-consumer] Cooperative rebalancing protocol is enabled now (org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration:126) [2022-03-17 07:31:56,308] INFO [Producer clientId=app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1-producer] Cluster ID: iZBZzURBQr6rMZEB6oxg7g (org.apache.kafka.clients.Metadata:287) [2022-03-17 07:31:56,30
[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election
jsancio commented on a change in pull request #11893: URL: https://github.com/apache/kafka/pull/11893#discussion_r829419392 ## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ## @@ -104,53 +117,73 @@ public PartitionChangeBuilder setTargetAdding(List targetAdding) { return this; } -boolean shouldTryElection() { -// If the new isr doesn't have the current leader, we need to try to elect a new -// one. Note: this also handles the case where the current leader is NO_LEADER, -// since that value cannot appear in targetIsr. -if (!targetIsr.contains(partition.leader)) return true; - -// Check if we want to try to get away from a non-preferred leader. -if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true; +// VisibleForTesting +static class ElectionResult { +final int node; +final boolean unclean; -return false; +private ElectionResult(int node, boolean unclean) { +this.node = node; +this.unclean = unclean; +} } -class BestLeader { -final int node; -final boolean unclean; +// VisibleForTesting +/** + * Perform leader election based on the partition state and leader election type. + * + * See documentation for the Election type to see more details on the election types supported. + */ +ElectionResult electLeader() { +// 1. 
Check if the election is not PREFERRED and we already have a valid leader +if (election != Election.PREFERRED && targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { +// Don't consider a new leader since the current leader meets all the constraints +return new ElectionResult(partition.leader, false); +} -BestLeader() { -for (int replica : targetReplicas) { -if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) { -this.node = replica; -this.unclean = false; -return; -} -} -if (uncleanElectionOk.get()) { -for (int replica : targetReplicas) { -if (isAcceptableLeader.apply(replica)) { -this.node = replica; -this.unclean = true; -return; -} -} +// 2. Attempt preferred replica election +int preferredReplica = targetReplicas.get(0); +if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) { +return new ElectionResult(preferredReplica, false); +} + +// 3. Preferred replica was not elected, only continue if the current leader is not a valid leader +if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { +// Don't consider a new leader since the current leader meets all the constraints +return new ElectionResult(partition.leader, false); +} + +// 4. Attempt to keep the partition online based on the ISR +Optional bestLeader = targetReplicas.stream() +.skip(1) +.filter(replica -> targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) +.findFirst(); +if (bestLeader.isPresent()) { +return new ElectionResult(bestLeader.get(), false); +} + +if (election == Election.UNCLEAN) { +// 5. 
Attempt unclean leader election +Optional uncleanLeader = targetReplicas.stream() +.filter(replica -> isAcceptableLeader.apply(replica)) +.findFirst(); +if (uncleanLeader.isPresent()) { +return new ElectionResult(uncleanLeader.get(), true); } -this.node = NO_LEADER; -this.unclean = false; } + +return new ElectionResult(NO_LEADER, false); } private void tryElection(PartitionChangeRecord record) { -BestLeader bestLeader = new BestLeader(); -if (bestLeader.node != partition.leader) { -log.debug("Setting new leader for topicId {}, partition {} to {}", topicId, partitionId, bestLeader.node); -record.setLeader(bestLeader.node); -if (bestLeader.unclean) { +ElectionResult electionResult = electLeader(); +if (electionResult.node != partition.leader) { +log.debug("Setting new leader for topicId {}, partition {} to {}", topicId, partitionId, electionResult.node); +record.setLeader(electionResult.node); +if (electionResult.unclean) { // If the election was unclean, we have to forcibly set the ISR to just the // new leader. This can re
[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election
jsancio commented on a change in pull request #11893: URL: https://github.com/apache/kafka/pull/11893#discussion_r829414517 ## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ## @@ -104,53 +117,73 @@ public PartitionChangeBuilder setTargetAdding(List targetAdding) { return this; } -boolean shouldTryElection() { -// If the new isr doesn't have the current leader, we need to try to elect a new -// one. Note: this also handles the case where the current leader is NO_LEADER, -// since that value cannot appear in targetIsr. -if (!targetIsr.contains(partition.leader)) return true; - -// Check if we want to try to get away from a non-preferred leader. -if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true; +// VisibleForTesting +static class ElectionResult { +final int node; +final boolean unclean; -return false; +private ElectionResult(int node, boolean unclean) { +this.node = node; +this.unclean = unclean; +} } -class BestLeader { -final int node; -final boolean unclean; +// VisibleForTesting +/** + * Perform leader election based on the partition state and leader election type. + * + * See documentation for the Election type to see more details on the election types supported. + */ +ElectionResult electLeader() { +// 1. 
Check if the election is not PREFERRED and we already have a valid leader +if (election != Election.PREFERRED && targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { +// Don't consider a new leader since the current leader meets all the constraints +return new ElectionResult(partition.leader, false); +} -BestLeader() { -for (int replica : targetReplicas) { -if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) { -this.node = replica; -this.unclean = false; -return; -} -} -if (uncleanElectionOk.get()) { -for (int replica : targetReplicas) { -if (isAcceptableLeader.apply(replica)) { -this.node = replica; -this.unclean = true; -return; -} -} +// 2. Attempt preferred replica election +int preferredReplica = targetReplicas.get(0); +if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) { +return new ElectionResult(preferredReplica, false); +} + +// 3. Preferred replica was not elected, only continue if the current leader is not a valid leader +if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { +// Don't consider a new leader since the current leader meets all the constraints +return new ElectionResult(partition.leader, false); +} + +// 4. Attempt to keep the partition online based on the ISR +Optional bestLeader = targetReplicas.stream() +.skip(1) Review comment: We can skip the first replica because it was already considered in step 2 (lines 144-148). This is a small optimization.
[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election
jsancio commented on a change in pull request #11893: URL: https://github.com/apache/kafka/pull/11893#discussion_r829411491 ## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ## @@ -104,53 +117,73 @@ public PartitionChangeBuilder setTargetAdding(List targetAdding) { return this; } -boolean shouldTryElection() { -// If the new isr doesn't have the current leader, we need to try to elect a new -// one. Note: this also handles the case where the current leader is NO_LEADER, -// since that value cannot appear in targetIsr. -if (!targetIsr.contains(partition.leader)) return true; - -// Check if we want to try to get away from a non-preferred leader. -if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true; +// VisibleForTesting +static class ElectionResult { +final int node; +final boolean unclean; -return false; +private ElectionResult(int node, boolean unclean) { +this.node = node; +this.unclean = unclean; +} } -class BestLeader { -final int node; -final boolean unclean; +// VisibleForTesting +/** + * Perform leader election based on the partition state and leader election type. + * + * See documentation for the Election type to see more details on the election types supported. + */ +ElectionResult electLeader() { +// 1. 
Check if the election is not PREFERRED and we already have a valid leader +if (election != Election.PREFERRED && targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { +// Don't consider a new leader since the current leader meets all the constraints +return new ElectionResult(partition.leader, false); +} -BestLeader() { -for (int replica : targetReplicas) { -if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) { -this.node = replica; -this.unclean = false; -return; -} -} -if (uncleanElectionOk.get()) { -for (int replica : targetReplicas) { -if (isAcceptableLeader.apply(replica)) { -this.node = replica; -this.unclean = true; -return; -} -} +// 2. Attempt preferred replica election +int preferredReplica = targetReplicas.get(0); +if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) { +return new ElectionResult(preferredReplica, false); +} + +// 3. Preferred replica was not elected, only continue if the current leader is not a valid leader +if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { Review comment: Yeah, this code may be difficult to read, but I convinced myself that we need this. At this point we know that the preferred replica is either unacceptable or not in the ISR. This check says that, to minimize leader changes, we don't need to elect a new leader if the current leader is acceptable and in the ISR. Let me try to write an algorithm that is easier to read and see what we think.
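The ordering discussed in this review thread (preferred replica first, then the current leader, then any other in-sync replica, then an unclean choice) can be sketched as a standalone method. This is a simplified illustration of the steps quoted in the diff, not the code from the pull request: the class name, the parameter list, the `ONLINE` enum constant, and `NO_LEADER = -1` are assumptions made so that the example compiles on its own.

```java
import java.util.List;
import java.util.Set;
import java.util.function.IntPredicate;

// Sketch of the five steps quoted in the diff, with simplified inputs.
final class ElectionSketch {
    static final int NO_LEADER = -1; // assumed sentinel value

    // PREFERRED and UNCLEAN appear in the diff; ONLINE is an assumed name.
    enum Election { PREFERRED, ONLINE, UNCLEAN }

    static final class Result {
        final int node;
        final boolean unclean;

        Result(int node, boolean unclean) {
            this.node = node;
            this.unclean = unclean;
        }
    }

    static Result electLeader(Election election, int currentLeader,
                              List<Integer> replicas, Set<Integer> isr,
                              IntPredicate acceptable) {
        boolean currentIsValid = isr.contains(currentLeader) && acceptable.test(currentLeader);
        // 1. Not a preferred election and the current leader is valid: keep it.
        if (election != Election.PREFERRED && currentIsValid) {
            return new Result(currentLeader, false);
        }
        // 2. Try the preferred replica, which is the first one in the list.
        int preferred = replicas.get(0);
        if (isr.contains(preferred) && acceptable.test(preferred)) {
            return new Result(preferred, false);
        }
        // 3. Preferred replica unavailable: avoid a leader change if possible.
        if (currentIsValid) {
            return new Result(currentLeader, false);
        }
        // 4. Any other in-sync replica; index 0 was already checked in step 2.
        for (int replica : replicas.subList(1, replicas.size())) {
            if (isr.contains(replica) && acceptable.test(replica)) {
                return new Result(replica, false);
            }
        }
        // 5. Unclean election: ignore the ISR and take any acceptable replica.
        if (election == Election.UNCLEAN) {
            for (int replica : replicas) {
                if (acceptable.test(replica)) {
                    return new Result(replica, true);
                }
            }
        }
        return new Result(NO_LEADER, false);
    }

    public static void main(String[] args) {
        // Preferred replica 1 is out of the ISR, so step 3 keeps leader 3
        // instead of moving leadership to replica 2.
        Result r = electLeader(Election.PREFERRED, 3, List.of(1, 2, 3), Set.of(2, 3), id -> true);
        System.out.println(r.node + " unclean=" + r.unclean);
    }
}
```

The `main` method exercises the case argued for in the comment above: without step 3, step 4 would hand leadership to replica 2 even though the current leader 3 is still valid.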
[jira] [Commented] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java
[ https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508389#comment-17508389 ]

Ismael Juma commented on KAFKA-13752:
-------------------------------------

What is the impact of this bug?

> Using `equals` instead of `==` when Uuid compare in Java
> --------------------------------------------------------
>
> Key: KAFKA-13752
> URL: https://issues.apache.org/jira/browse/KAFKA-13752
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Reporter: Xiaobing Fang
> Priority: Minor
>
> {code:java}
> Uuid.ZERO_UUID == new Uuid(0L, 0L){code}
> is true in Scala, but false in Java.
>
> So this test runs successfully. Is this the expected behavior?
> {code:java}
> @Test
> public void testTopicIdAndNullTopicNameRequests() {
>     // Construct invalid MetadataRequestTopics. We will build each one separately and ensure the error is thrown.
>     List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
>         new MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(new Uuid(0L, 0L)));
>     // if version is 10 or 11, the invalid topic metadata should return an error
>     List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
>     invalidVersions.forEach(version ->
>         topics.forEach(topic -> {
>             MetadataRequestData metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic));
>             MetadataRequest.Builder builder = new MetadataRequest.Builder(metadataRequestData);
>             assertThrows(UnsupportedVersionException.class, () -> builder.build(version));
>         })
>     );
> }{code}

--
This message was sent by Atlassian Jira (v8.20.1#820001)
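The difference reported in this issue can be reproduced without Kafka on the classpath. The sketch below uses a hypothetical `FakeUuid` class as a stand-in for `org.apache.kafka.common.Uuid` (it is an assumption that the real class defines value-based `equals`, which the issue title implies). In Java, `==` and `!=` on objects compare references, so a freshly constructed all-zero id is never "the same" as the shared zero constant; Scala's `==` delegates to `equals`.

```java
import java.util.Objects;

// Hypothetical stand-in for Kafka's Uuid; only the comparison behavior matters here.
class UuidEqualitySketch {
    static final class FakeUuid {
        static final FakeUuid ZERO = new FakeUuid(0L, 0L);

        private final long mostSignificantBits;
        private final long leastSignificantBits;

        FakeUuid(long mostSignificantBits, long leastSignificantBits) {
            this.mostSignificantBits = mostSignificantBits;
            this.leastSignificantBits = leastSignificantBits;
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof FakeUuid)) {
                return false;
            }
            FakeUuid that = (FakeUuid) other;
            return mostSignificantBits == that.mostSignificantBits
                && leastSignificantBits == that.leastSignificantBits;
        }

        @Override
        public int hashCode() {
            return Objects.hash(mostSignificantBits, leastSignificantBits);
        }
    }

    // Reference comparison: true for any instance other than the ZERO constant itself.
    static boolean isNonZeroByReference(FakeUuid id) {
        return id != FakeUuid.ZERO;
    }

    // Value comparison: true only when the bits differ from zero.
    static boolean isNonZeroByValue(FakeUuid id) {
        return !FakeUuid.ZERO.equals(id);
    }

    public static void main(String[] args) {
        FakeUuid freshZero = new FakeUuid(0L, 0L);
        System.out.println("by reference: " + isNonZeroByReference(freshZero));
        System.out.println("by value:     " + isNonZeroByValue(freshZero));
    }
}
```

With a reference check, a request carrying `new Uuid(0L, 0L)` is treated as having a non-zero topic ID, which would explain why the quoted test sees `UnsupportedVersionException`.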
[jira] [Commented] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508386#comment-17508386 ] John Roesler commented on KAFKA-13714: -- Another local repro: {code:java} org.apache.kafka.streams.integration.IQv2StoreIntegrationTest > verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI] FAILED java.lang.AssertionError: Result:StateQueryResult{partitionResults={ 0=SucceededQueryResult{ result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@3f702946, executionInfo=[ Handled in class org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 1153925ns, Handled in class org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore via WrappedStateStore in 1165952ns, Handled in class org.apache.kafka.streams.state.internals.CachingKeyValueStore in 1181616ns, Handled in class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with serdes org.apache.kafka.streams.state.StateSerdes@278667fd in 1260365ns ], position=Position{position={input-topic={0=1, 1=SucceededQueryResult{ result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@42b6d0cc, executionInfo=[ Handled in class org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 109311ns, Handled in class org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore via WrappedStateStore in 116767ns, Handled in class org.apache.kafka.streams.state.internals.CachingKeyValueStore in 128961ns, Handled in class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with serdes org.apache.kafka.streams.state.StateSerdes@684b31de in 185521ns ], position=Position{position={input-topic={1=1}, globalResult=null} Expected: is <[1, 2, 3]> but: was <[1, 2]> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776) {code} > Flaky test IQv2StoreIntegrationTest > --- > > Key: KAFKA-13714 > URL: https://issues.apache.org/jira/browse/KAFKA-13714 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.2.0 >Reporter: John Roesler >Priority: Blocker > > I have observed multiple consistency violations in the > IQv2StoreIntegrationTest. Since this is the first release of IQv2, and it's > apparently a major flaw in the feature, we should not release with this bug > outstanding. Depending on the time-table, we may want to block the release or > pull the feature until the next release. > > The first observation I have is from 23 Feb 2022. So far all observations > point to the range query in particular, and all observations have been for > RocksDB stores, including RocksDBStore, TimestampedRocksDBStore, and the > windowed store built on RocksDB segments. > For reference, range queries were implemented on 16 Feb 2022: > [https://github.com/apache/kafka/commit/b38f6ba5cc989702180f5d5f8e55ba20444ea884] > The window-specific range query test has also failed once that I have seen. 
> That feature was implemented on 2 Jan 2022: > [https://github.com/apache/kafka/commit/b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c] > > Here are some stack traces I have seen: > {code:java} > verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI] > java.lang.AssertionError: > Expected: is <[1, 2, 3]> > but: was <[1, 2]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1125) > at > org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:803) > at > org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776) > {code} > {code:java} > verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI] > java.lang.AssertionError: > Expected: is <[1, 2, 3]> > but: was <[1, 3]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQu
[GitHub] [kafka] jsancio commented on pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on pull request #11733: URL: https://github.com/apache/kafka/pull/11733#issuecomment-1071229122

> The PR looks good overall. I think there is one problem with the fetch validation. We are expecting that followers will detect the RECOVERED state through a `LeaderAndIsr` request from the controller. However, leaders/followers only accept `LeaderAndIsr` requests if there is an epoch bump, and that does not happen for `AlterPartition` requests. For KRaft, I think it is not a problem.

You are correct. I removed the FETCH request validation that looked at the leader recovery state and filed this issue: https://issues.apache.org/jira/browse/KAFKA-13754

This is okay because at the moment the topic partition leader immediately marks the partition as recovered.
[jira] [Created] (KAFKA-13754) Follower should reject Fetch request while the leader is recovering
Jose Armando Garcia Sancio created KAFKA-13754:
--
Summary: Follower should reject Fetch request while the leader is recovering
Key: KAFKA-13754
URL: https://issues.apache.org/jira/browse/KAFKA-13754
Project: Kafka
Issue Type: Task
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio

In the PR for KIP-704 we removed the leader recovery state validation from the FETCH request. This is okay because the leader immediately recovers the partition. We should enable this validation before implementing log recovery from unclean leader election. The old implementation and test are in this commit: https://github.com/apache/kafka/pull/11733/commits/c7e54b8f6cef087deac119d61a46d3586ead72b9
[GitHub] [kafka] C0urante commented on a change in pull request #11903: KAFKA-13497: Add trace logging to RegexRouter
C0urante commented on a change in pull request #11903: URL: https://github.com/apache/kafka/pull/11903#discussion_r829392255 ## File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java ## @@ -57,7 +61,10 @@ public R apply(R record) { final Matcher matcher = regex.matcher(record.topic()); if (matcher.matches()) { final String topic = matcher.replaceFirst(replacement); +log.trace("Rerouting from topic '{}' to new topic '{}'", record.topic(), topic); Review comment: I don't see a reason to. If trace-level logging is disabled, the cost of this line will be two method invocations (one for `Record::topic`, which has a [trivial implementation](https://github.com/apache/kafka/blob/7afdb069bf5539ec404d9305239849ac35ad2d82/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java#L69-L71), and one for `Logger::trace`), plus a check to see if trace-level logging is enabled. No message formatting will take place. If we wrap this with `Logger::isTraceEnabled`, we'll reduce that to a single method invocation and check to see if trace-level logging is enabled. But if trace-level logging is enabled, we'd end up doubling the checks to see if the logger is enabled, and add an extra method invocation. There's a good writeup on the performance implications of this style of checking [here](https://logging.apache.org/log4j/1.2/manual.html#performance). It seems unlikely that any of this will have a significant impact on performance, especially considering that this same code path already contains a regex check a few lines above. There's also the existing logging in `Cast` that's mentioned in the description that takes place at trace level and doesn't have a guard for `Logger::isTraceEnabled`. 
Ultimately, if we're concerned about performance here, it'd probably be much more effective to cache the mappings of input topic -> output topic than to add guards around trace-level log statements that don't involve any string concatenation or expensive `toString` calls.
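As an illustration of the caching idea mentioned in this comment, here is a minimal sketch. It is not part of the PR or of `RegexRouter`: `CachedTopicRouterSketch`, its constructor, and the `regexEvaluations` counter are hypothetical names, and the cache is an unbounded `ConcurrentHashMap`, which assumes the set of input topics is small.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: memoize input topic -> output topic so the regex runs once per topic.
class CachedTopicRouterSketch {
    private final Pattern regex;
    private final String replacement;
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    // Counts regex evaluations so the effect of the cache can be observed.
    final AtomicInteger regexEvaluations = new AtomicInteger();

    CachedTopicRouterSketch(String regex, String replacement) {
        this.regex = Pattern.compile(regex);
        this.replacement = replacement;
    }

    String route(String topic) {
        // computeIfAbsent only invokes the mapping function on a cache miss.
        return cache.computeIfAbsent(topic, this::computeRoute);
    }

    private String computeRoute(String topic) {
        regexEvaluations.incrementAndGet();
        Matcher matcher = regex.matcher(topic);
        // Topics that do not match are passed through unchanged.
        return matcher.matches() ? matcher.replaceFirst(replacement) : topic;
    }

    public static void main(String[] args) {
        CachedTopicRouterSketch router = new CachedTopicRouterSketch("(.*)-raw", "$1-clean");
        System.out.println(router.route("orders-raw"));
        System.out.println(router.route("orders-raw"));
        System.out.println("regex evaluations: " + router.regexEvaluations.get());
    }
}
```

A production version would likely need a size bound or eviction policy, since a connector consuming from many topics would otherwise grow the map indefinitely.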
[GitHub] [kafka] junrao commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election
junrao commented on a change in pull request #11893: URL: https://github.com/apache/kafka/pull/11893#discussion_r829355001 ## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ## @@ -104,53 +117,73 @@ public PartitionChangeBuilder setTargetAdding(List targetAdding) { return this; } -boolean shouldTryElection() { -// If the new isr doesn't have the current leader, we need to try to elect a new -// one. Note: this also handles the case where the current leader is NO_LEADER, -// since that value cannot appear in targetIsr. -if (!targetIsr.contains(partition.leader)) return true; - -// Check if we want to try to get away from a non-preferred leader. -if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true; +// VisibleForTesting +static class ElectionResult { +final int node; +final boolean unclean; -return false; +private ElectionResult(int node, boolean unclean) { +this.node = node; +this.unclean = unclean; +} } -class BestLeader { -final int node; -final boolean unclean; +// VisibleForTesting +/** + * Perform leader election based on the partition state and leader election type. + * + * See documentation for the Election type to see more details on the election types supported. + */ +ElectionResult electLeader() { +// 1. 
Check if the election is not PREFERRED and we already have a valid leader +if (election != Election.PREFERRED && targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { +// Don't consider a new leader since the current leader meets all the constraints +return new ElectionResult(partition.leader, false); +} -BestLeader() { -for (int replica : targetReplicas) { -if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) { -this.node = replica; -this.unclean = false; -return; -} -} -if (uncleanElectionOk.get()) { -for (int replica : targetReplicas) { -if (isAcceptableLeader.apply(replica)) { -this.node = replica; -this.unclean = true; -return; -} -} +// 2. Attempt preferred replica election +int preferredReplica = targetReplicas.get(0); +if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) { +return new ElectionResult(preferredReplica, false); +} + +// 3. Preferred replica was not elected, only continue if the current leader is not a valid leader +if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { Review comment: Is this step necessary given step 1 and 2? The election != Election.PREFERRED case is covered in step 1 and the other case seems covered by step 2. ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -1197,6 +1266,25 @@ private void resetState() { */ private long newBytesSinceLastSnapshot = 0; +/** + * How long to delay partition leader balancing operations. 
+ */ +private final OptionalLong leaderImbalanceCheckIntervalNs; + +private enum ImbalanceSchedule { +// The leader balancing operation has been scheduled +SCHEDULED, +// If the leader balancing operation should be schedued, schedule it with a delay +DEFERRED, +// If the leader balancing operation should be schedued, schdule it immediately Review comment: typo schedued and schdule ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -953,6 +972,56 @@ private void cancelMaybeFenceReplicas() { queue.cancelDeferred(MAYBE_FENCE_REPLICAS); } +private static final String MAYBE_BALANCE_PARTITION_LEADERS = "maybeBalancePartitionLeaders"; + +private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000; + +private void maybeScheduleNextBalancePartitionLeaders() { +if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED && +leaderImbalanceCheckIntervalNs.isPresent() && +replicationControl.arePartitionLeadersImbalanced()) { + +log.debug( +"Scheduling write event for {} because scheduled ({}), checkIntervalNs ({}) and isImbalanced ({})", +MAYBE_BALANCE_PARTITION_LEADERS, +imbalancedScheduled, +leaderImbalanceCheckIntervalNs, +replicationControl.arePartitionLeadersImbalanced() +); + +
[jira] [Created] (KAFKA-13753) Log cleaner should retain transaction metadata in index until corresponding marker is removed
Jason Gustafson created KAFKA-13753:
---
Summary: Log cleaner should retain transaction metadata in index until corresponding marker is removed
Key: KAFKA-13753
URL: https://issues.apache.org/jira/browse/KAFKA-13753
Project: Kafka
Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson

Currently the log cleaner will remove aborted transactions from the index as soon as it detects that the data from the transaction is gone. It does not wait until the corresponding marker has also been removed. Although it is extremely unlikely, it seems possible today that a Fetch might fail to return the aborted transaction metadata correctly if a log cleaning occurs concurrently. This is because the collection of aborted transactions is only done after reading the data from the log. It would be safer to preserve the aborted transaction metadata in the index until the marker is also removed.
[GitHub] [kafka] dajac commented on a change in pull request #11903: KAFKA-13497: Add trace logging to RegexRouter
dajac commented on a change in pull request #11903: URL: https://github.com/apache/kafka/pull/11903#discussion_r82930 ## File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java ## @@ -57,7 +61,10 @@ public R apply(R record) { final Matcher matcher = regex.matcher(record.topic()); if (matcher.matches()) { final String topic = matcher.replaceFirst(replacement); +log.trace("Rerouting from topic '{}' to new topic '{}'", record.topic(), topic); Review comment: Should we gate it with `log.isTraceEnabled()`?
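For readers weighing this question, the sketch below contrasts three ways of writing a disabled log statement. It deliberately uses `java.util.logging` from the JDK as a stand-in so that it runs without extra dependencies; the PR itself uses `{}` placeholders, which is a different logging API, so treat the mapping as an assumption. `TraceGuardSketch` and `Expensive` are hypothetical names. The point is that a parameterized call defers rendering until the level check passes, so an explicit guard mainly pays off when arguments are concatenated or otherwise expensive to compute.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch using java.util.logging as a stand-in for the logger in the PR.
class TraceGuardSketch {
    static final Logger LOG = Logger.getLogger("TraceGuardSketch");

    static {
        // FINEST (roughly "trace") is disabled at INFO level.
        LOG.setLevel(Level.INFO);
    }

    /** Counts how often it is rendered to a string. */
    static final class Expensive {
        int renders = 0;

        @Override
        public String toString() {
            renders++;
            return "expensive";
        }
    }

    // Concatenation renders the argument before the logger can check the level.
    static int rendersWithConcatenation(Expensive value) {
        LOG.log(Level.FINEST, "value=" + value);
        return value.renders;
    }

    // A parameterized message is only formatted if the level is enabled.
    static int rendersWithParameter(Expensive value) {
        LOG.log(Level.FINEST, "value={0}", value);
        return value.renders;
    }

    // An explicit guard also avoids the rendering, at the cost of an extra check when enabled.
    static int rendersWithGuard(Expensive value) {
        if (LOG.isLoggable(Level.FINEST)) {
            LOG.log(Level.FINEST, "value=" + value);
        }
        return value.renders;
    }

    public static void main(String[] args) {
        System.out.println("concatenation: " + rendersWithConcatenation(new Expensive()));
        System.out.println("parameterized: " + rendersWithParameter(new Expensive()));
        System.out.println("guarded:       " + rendersWithGuard(new Expensive()));
    }
}
```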
[GitHub] [kafka] dajac commented on pull request #11904: MINOR: Fix `ConsumerConfig.ISOLATION_LEVEL_DOC`
dajac commented on pull request #11904: URL: https://github.com/apache/kafka/pull/11904#issuecomment-1071105004 @guizmaii Good catch! Why did you target the 3.1 branch? We should fix it in trunk and I can backport it to older branches.
[jira] [Resolved] (KAFKA-13750) Client Compatability KafkaTest uses invalid idempotency configs
[ https://issues.apache.org/jira/browse/KAFKA-13750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-13750. - Fix Version/s: 3.2.0 3.1.1 3.0.2 Resolution: Fixed > Client Compatability KafkaTest uses invalid idempotency configs > --- > > Key: KAFKA-13750 > URL: https://issues.apache.org/jira/browse/KAFKA-13750 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > Fix For: 3.2.0, 3.1.1, 3.0.2 > > > With the switch to idempotency as a default, some of our tests broke > including > ClientCompatibilityFeaturesTest.run_compatibility_test for versions prior to > 0.11 where EOS was enabled. We need to configure the producer correctly for > these earlier versions. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] dajac merged pull request #11909: KAFKA-13750: Client Compatability KafkaTest uses invalid idempotency configs
dajac merged pull request #11909: URL: https://github.com/apache/kafka/pull/11909
[GitHub] [kafka] dajac commented on pull request #11909: KAFKA-13750: Client Compatability KafkaTest uses invalid idempotency configs
dajac commented on pull request #11909: URL: https://github.com/apache/kafka/pull/11909#issuecomment-1071090738 We don't need to wait for the build as this is a Python change. Merging to trunk, 3.1 and 3.0.
[GitHub] [kafka] dajac commented on pull request #11906: MINOR: Doc updates for Kafka 3.0.1
dajac commented on pull request #11906: URL: https://github.com/apache/kafka/pull/11906#issuecomment-1071088775 @mimaison Thanks. Do we need to update the doc in the 3.0 branch as well? Otherwise, we will miss the update that you did if we ever release 3.0.2.
[jira] [Resolved] (KAFKA-13509) Support max timestamp in GetOffsetShell
[ https://issues.apache.org/jira/browse/KAFKA-13509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Jacot resolved KAFKA-13509.
-
Reviewer: David Jacot
Resolution: Fixed

> Support max timestamp in GetOffsetShell
> ---
>
> Key: KAFKA-13509
> URL: https://issues.apache.org/jira/browse/KAFKA-13509
> Project: Kafka
> Issue Type: Sub-task
> Components: tools
> Reporter: dengziming
> Assignee: dengziming
> Priority: Major
> Fix For: 3.2.0
>
> We would like to list the offset with the max timestamp using `kafka.tools.GetOffsetShell`:
> ```
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server localhost:9092 --topic topic1 --time -3
> ```
[GitHub] [kafka] dajac merged pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell
dajac merged pull request #11173: URL: https://github.com/apache/kafka/pull/11173
[GitHub] [kafka] C0urante opened a new pull request #11914: MINOR: Correct Connect docs on connector/task states
C0urante opened a new pull request #11914: URL: https://github.com/apache/kafka/pull/11914

The `DESTROYED` state is represented internally as a tombstone record when running in distributed mode ([1]) and by the removal of the connector/task from the in-memory status map when running in standalone mode ([2], [3]). As a result, it will never appear to users of the REST API, and we should remove mention of it from our docs so that developers creating tooling against the REST API don't write unnecessary logic to account for that state.

[1] - https://github.com/apache/kafka/blob/3dacdc5694da5db283524889d2270695defebbaa/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java#L318
[2] - https://github.com/apache/kafka/blob/3dacdc5694da5db283524889d2270695defebbaa/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java#L64-L65
[3] - https://github.com/apache/kafka/blob/3dacdc5694da5db283524889d2270695defebbaa/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java#L77-L78
[GitHub] [kafka] dajac commented on pull request #11909: KAFKA-13750: Client Compatability KafkaTest uses invalid idempotency configs
dajac commented on pull request #11909: URL: https://github.com/apache/kafka/pull/11909#issuecomment-1071077188 @jolshan Thanks. Do we need to backport it to older branches? I guess that we need it in 3.1 and 3.0, right?
[GitHub] [kafka] dajac commented on pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell
dajac commented on pull request #11173: URL: https://github.com/apache/kafka/pull/11173#issuecomment-1071074769 The build looks good [here](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11173/18/pipeline). I am not sure why GitHub still shows it as in progress. It might be due to the outage that they had today.
[jira] [Commented] (KAFKA-13404) Kafka sink connectors do not commit offset correctly if messages are produced in transaction
[ https://issues.apache.org/jira/browse/KAFKA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508282#comment-17508282 ] Chris Egerton commented on KAFKA-13404: --- [~yujhe.li] can you clarify the impact of this behavior? Initially I was wondering if this might lead to duplicate record delivery by resuming from an earlier offset than where the consumer last read from, but if the only record between that offset and where the consumer read from is a control record, it won't be delivered to the sink task anyways. The only other practical consequence I can think of is that the consumer lag for the connector may be off by one (see KAFKA-6607 for a similar issue in Kafka Streams). Is that the problem here, or is there something else? > Kafka sink connectors do not commit offset correctly if messages are produced > in transaction > > > Key: KAFKA-13404 > URL: https://issues.apache.org/jira/browse/KAFKA-13404 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.6.1 >Reporter: Yu-Jhe Li >Priority: Major > Attachments: Main.scala > > > The Kafka sink connectors don't commit offset to the latest log-end offset if > the messages are produced in a transaction. > From the code of [WorkerSinkTask.java|#L467], we found that the sink > connector gets offset from messages and commits it to Kafka after the > messages are processed successfully. But for messages produced in the > transaction, there are additional record [control > batches|http://kafka.apache.org/documentation/#controlbatch] that are used to > indicate the transaction is successful or aborted. 
>
> You can reproduce it by running `connect-file-sink` with the following properties:
> {noformat}
> /opt/kafka/bin/connect-standalone.sh /connect-standalone.properties /connect-file-sink.properties{noformat}
> {code:java}
> # connect-standalone.properties
> bootstrap.servers=localhost:9092
> key.converter=org.apache.kafka.connect.storage.StringConverter
> value.converter=org.apache.kafka.connect.storage.StringConverter
> key.converter.schemas.enable=true
> value.converter.schemas.enable=true
> # for testing
> offset.flush.interval.ms=1
> consumer.isolation.level=read_committed
> consumer.auto.offset.reset=none
> {code}
> {code:java}
> # connect-file-sink.properties
> name=local-file-sink
> connector.class=FileStreamSink
> tasks.max=1
> file=/tmp/test.sink.txt
> topics=test{code}
> Then use the attached Java producer ([^Main.scala]) to produce 10 messages to the `test` topic in a transaction.
> You can see that the topic log-end offset is now 11 and the last record in the segment file is a control batch, but the consumer group offset is still at 10. (If the record is deleted by topic retention, you will get an OffsetOutOfRange exception after restarting the connector.)
> {code:java}
> bash-5.1# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --group connect-local-file-sink --describe
> GROUP                    TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                                HOST         CLIENT-ID
> connect-local-file-sink  test   0          10              11              1    connector-consumer-local-file-sink-0-10777adb-72c2-4fd3-8773-4f5a0498903d  /172.21.0.3  connector-consumer-local-file-sink-0
> bash-5.1# /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/test-0/.log --print-data-log
> Dumping /kafka/test-0/.log
> Starting offset: 0
> baseOffset: 0 lastOffset: 9 count: 10 baseSequence: 0 lastSequence: 9 producerId: 4000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1634805907230 size: 208 magic: 2 compresscodec: GZIP crc: 2170304005 isvalid: true
> | offset: 0 CreateTime: 1634805907210 keysize: -1 valuesize: 39 sequence: 0 headerKeys: [] payload: {"value": "banana", "time": 1634805907}
> | offset: 1 CreateTime: 1634805907230 keysize: -1 valuesize: 39 sequence: 1 headerKeys: [] payload: {"value": "banana", "time": 1634805907}
> | offset: 2 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 2 headerKeys: [] payload: {"value": "ice", "time": 1634805907}
> | offset: 3 CreateTime: 1634805907230 keysize: -1 valuesize: 38 sequence: 3 headerKeys: [] payload: {"value": "apple", "time": 1634805907}
> | offset: 4 CreateTime: 1634805907230 keysize: -1 valuesize: 37 sequence: 4 headerKeys: [] payload: {"value": "home", "time": 1634805907}
> | offset: 5 Cre
[GitHub] [kafka] mimaison merged pull request #11471: MINOR: Replace EasyMock with Mockito in connect:file
mimaison merged pull request #11471: URL: https://github.com/apache/kafka/pull/11471
[jira] [Resolved] (KAFKA-13136) kafka-connect task.max : active task in consumer group is limited by the bigger topic to consume
[ https://issues.apache.org/jira/browse/KAFKA-13136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton resolved KAFKA-13136.
---
Resolution: Fixed

> kafka-connect task.max : active task in consumer group is limited by the bigger topic to consume
> ---
>
> Key: KAFKA-13136
> URL: https://issues.apache.org/jira/browse/KAFKA-13136
> Project: Kafka
> Issue Type: Bug
> Reporter: raphaelauv
> Priority: Major
>
> In kafka-connect 2.7
> *The maximum number of active tasks for a sink connector is equal to the partition count of the topic with the most partitions to consume*
> An active task is a task that has partitions assigned to it in the consumer group of the sink connector.
> Example:
> With 2 topics that each have 10 partitions (20 partitions in total), the maximum number of active tasks is 10 (if I set tasks.max to 12, 10 members of the consumer group consume partitions and 2 members have no partitions to consume).
> If I add a third topic with 15 partitions to the connector conf, then all 12 members of the consumer group consume partitions; if I then set tasks.max to 17, only 15 members are active in the consumer group.
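The numbers in this report are consistent with a simple rule: the count of active tasks is the smaller of `tasks.max` and the largest per-topic partition count. The sketch below only encodes that arithmetic as described by the reporter; it is not Kafka Connect code, `ActiveTaskSketch` and `activeTasks` are hypothetical names, and whether the rule holds depends on the consumer partition assignor in use.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the arithmetic described in the issue, not Connect's implementation.
class ActiveTaskSketch {
    // Active tasks = min(tasks.max, largest partition count among the consumed topics).
    static int activeTasks(int tasksMax, List<Integer> partitionsPerTopic) {
        return Math.min(tasksMax, Collections.max(partitionsPerTopic));
    }

    public static void main(String[] args) {
        System.out.println(activeTasks(12, List.of(10, 10)));
        System.out.println(activeTasks(12, List.of(10, 10, 15)));
        System.out.println(activeTasks(17, List.of(10, 10, 15)));
    }
}
```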
[jira] [Commented] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java
[ https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508270#comment-17508270 ]

Xiaobing Fang commented on KAFKA-13752:
---

[~jolshan] I have done it in PR, PTAL.

> Using `equals` instead of `==` when Uuid compare in Java
>
> Key: KAFKA-13752
> URL: https://issues.apache.org/jira/browse/KAFKA-13752
[GitHub] [kafka] fxbing commented on pull request #11912: KAFKA-13752: Uuid compare using equals in java
fxbing commented on pull request #11912: URL: https://github.com/apache/kafka/pull/11912#issuecomment-1071008981 > done, PTAL. @dengziming -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java
[ https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508255#comment-17508255 ] Justine Olshan edited comment on KAFKA-13752 at 3/17/22, 3:49 PM: -- Ah, so I think the line that is causing an issue is: {code:java} if (topic.topicId() != Uuid.ZERO_UUID && version < 12) throw new UnsupportedVersionException("MetadataRequest version " + version + " does not support non-zero topic IDs.");{code} We should change this to !equals. was (Author: jolshan): Ah, so I think the line that is causing an issue is: ``` if (topic.topicId() != Uuid.ZERO_UUID && version < 12) throw new UnsupportedVersionException("MetadataRequest version " + version + " does not support non-zero topic IDs."); ``` We should change this to !equals. > Using `equals` instead of `==` when Uuid compare in Java > > > Key: KAFKA-13752 > URL: https://issues.apache.org/jira/browse/KAFKA-13752 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Xiaobing Fang >Priority: Minor > > {code:java} > Uuid.ZERO_UUID == new Uuid(0L, 0L){code} > is true in scala, but in java is false. > > So this test run sccessfully. Is this the expected situation?? > {code:java} > @Test > public void testTopicIdAndNullTopicNameRequests() { > // Construct invalid MetadataRequestTopics. We will build each one > separately and ensure the error is thrown. 
> List topics = Arrays.asList( > new > MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(new > Uuid(0L, 0L))); > // if version is 10 or 11, the invalid topic metadata should return an > error > List invalidVersions = Arrays.asList((short) 10, (short) 11); > invalidVersions.forEach(version -> > topics.forEach(topic -> { > MetadataRequestData metadataRequestData = new > MetadataRequestData().setTopics(Collections.singletonList(topic)); > MetadataRequest.Builder builder = new > MetadataRequest.Builder(metadataRequestData); > assertThrows(UnsupportedVersionException.class, () -> > builder.build(version)); > }) > ); > }{code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2
[ https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508256#comment-17508256 ] Ron Craig commented on KAFKA-9366: -- Perfect! Thanks, [~dongjin] . > Upgrade log4j to log4j2 > --- > > Key: KAFKA-9366 > URL: https://issues.apache.org/jira/browse/KAFKA-9366 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0 >Reporter: leibo >Assignee: Dongjin Lee >Priority: Critical > Labels: needs-kip > Fix For: 3.2.0 > > > h2. CVE-2019-17571 Detail > Included in Log4j 1.2 is a SocketServer class that is vulnerable to > deserialization of untrusted data which can be exploited to remotely execute > arbitrary code when combined with a deserialization gadget when listening to > untrusted network traffic for log data. This affects Log4j versions up to 1.2 > up to 1.2.17. > > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571] > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java
[ https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508255#comment-17508255 ] Justine Olshan commented on KAFKA-13752: Ah, so I think the line that is causing an issue is: ``` if (topic.topicId() != Uuid.ZERO_UUID && version < 12) throw new UnsupportedVersionException("MetadataRequest version " + version + " does not support non-zero topic IDs."); ``` We should change this to !equals. > Using `equals` instead of `==` when Uuid compare in Java > > > Key: KAFKA-13752 > URL: https://issues.apache.org/jira/browse/KAFKA-13752 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Xiaobing Fang >Priority: Minor > > {code:java} > Uuid.ZERO_UUID == new Uuid(0L, 0L){code} > is true in scala, but in java is false. > > So this test run sccessfully. Is this the expected situation?? > {code:java} > @Test > public void testTopicIdAndNullTopicNameRequests() { > // Construct invalid MetadataRequestTopics. We will build each one > separately and ensure the error is thrown. > List topics = Arrays.asList( > new > MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(new > Uuid(0L, 0L))); > // if version is 10 or 11, the invalid topic metadata should return an > error > List invalidVersions = Arrays.asList((short) 10, (short) 11); > invalidVersions.forEach(version -> > topics.forEach(topic -> { > MetadataRequestData metadataRequestData = new > MetadataRequestData().setTopics(Collections.singletonList(topic)); > MetadataRequest.Builder builder = new > MetadataRequest.Builder(metadataRequestData); > assertThrows(UnsupportedVersionException.class, () -> > builder.build(version)); > }) > ); > }{code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
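The guard quoted in the comment above can be illustrated with a minimal, self-contained sketch. In Java, `!=` on objects compares references, whereas Scala's `==` delegates to `equals`, which is why the same expression behaves differently in the two languages. `TopicId` below is a hypothetical stand-in for `org.apache.kafka.common.Uuid`, and `VersionGuard` with its two methods is invented for illustration; neither is the code from the PR.

```java
import java.util.Objects;

// Hypothetical stand-in for org.apache.kafka.common.Uuid (assumption: value type with two longs).
final class TopicId {
    static final TopicId ZERO = new TopicId(0L, 0L);

    private final long mostSignificantBits;
    private final long leastSignificantBits;

    TopicId(long mostSignificantBits, long leastSignificantBits) {
        this.mostSignificantBits = mostSignificantBits;
        this.leastSignificantBits = leastSignificantBits;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof TopicId)) return false;
        TopicId that = (TopicId) other;
        return mostSignificantBits == that.mostSignificantBits
            && leastSignificantBits == that.leastSignificantBits;
    }

    @Override
    public int hashCode() {
        return Objects.hash(mostSignificantBits, leastSignificantBits);
    }
}

// Hypothetical helper mirroring the shape of the version check quoted in the comment.
final class VersionGuard {
    // Reference comparison: a freshly constructed all-zero id is a different
    // object from ZERO, so it is wrongly treated as a non-zero topic id.
    static boolean rejectsWithReferenceCheck(TopicId id, short version) {
        return id != TopicId.ZERO && version < 12;
    }

    // Value comparison: any all-zero id is recognised as the zero id.
    static boolean rejectsWithEquals(TopicId id, short version) {
        return !TopicId.ZERO.equals(id) && version < 12;
    }
}
```

Under these assumptions, `rejectsWithReferenceCheck(new TopicId(0L, 0L), (short) 10)` is true while `rejectsWithEquals` returns false for the same arguments, which is consistent with the quoted test passing even though it sets an all-zero topic id.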
[GitHub] [kafka] jolshan commented on pull request #11909: KAFKA-13750: Client Compatability KafkaTest uses invalid idempotency configs
jolshan commented on pull request #11909: URL: https://github.com/apache/kafka/pull/11909#issuecomment-1070978591 Here is a link to a successful run: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2022-03-17--001.system-test-kafka-branch-builder--1647476641--jolshan--KAFKA-13750--4447ebdb4c/report.html
[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2
[ https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508199#comment-17508199 ] Dongjin Lee commented on KAFKA-9366: [~roncraig] Sorry for being late. the PR is now updated with log4j2 2.17.2. > Upgrade log4j to log4j2 > --- > > Key: KAFKA-9366 > URL: https://issues.apache.org/jira/browse/KAFKA-9366 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0 >Reporter: leibo >Assignee: Dongjin Lee >Priority: Critical > Labels: needs-kip > Fix For: 3.2.0 > > > h2. CVE-2019-17571 Detail > Included in Log4j 1.2 is a SocketServer class that is vulnerable to > deserialization of untrusted data which can be exploited to remotely execute > arbitrary code when combined with a deserialization gadget when listening to > untrusted network traffic for log data. This affects Log4j versions up to 1.2 > up to 1.2.17. > > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571] > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] dengziming commented on pull request #11912: KAFKA-13752: Uuid compare using equals in java
dengziming commented on pull request #11912: URL: https://github.com/apache/kafka/pull/11912#issuecomment-1070947835 Thank you for this PR, can you also add a test case in `MetadataRequestTest.testTopicIdAndNullTopicNameRequests`?
[GitHub] [kafka] dengziming commented on pull request #11913: MINOR: Remove scala KafkaException
dengziming commented on pull request #11913: URL: https://github.com/apache/kafka/pull/11913#issuecomment-1070870414 We can safely remove this class since we have removed kafka.security.auth.Authorizer in #10450. Ping @ijuma to have a look.
[GitHub] [kafka] dengziming opened a new pull request #11913: MINOR: Remove scala KafkaException
dengziming opened a new pull request #11913: URL: https://github.com/apache/kafka/pull/11913 *More detailed description of your change* We use org.apache.kafka.common.KafkaException instead of kafka.common.KafkaException *Summary of testing strategy (including rationale)* QA ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-9542) ZSTD Compression Not Working
[ https://issues.apache.org/jira/browse/KAFKA-9542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508163#comment-17508163 ] Olumide Ajiboye commented on KAFKA-9542: Hi, I just upgraded to `quay.io/strimzi/kafka:0.28.0-kafka-3.1.0` from `quay.io/strimzi/kafka:0.27.1-kafka-3.0.0` and I am observing this same issue. Other compression formats are okay except zstd. Best regards > ZSTD Compression Not Working > > > Key: KAFKA-9542 > URL: https://issues.apache.org/jira/browse/KAFKA-9542 > Project: Kafka > Issue Type: Bug > Components: compression >Affects Versions: 2.3.0 > Environment: Linux, CentOS >Reporter: Prashant >Priority: Critical > > I enabled zstd compression at producer by adding "compression.type=zstd" in > producer config. When try to run it, producer fails with > "org.apache.kafka.common.errors.UnknownServerException: The server > experienced an unexpected error when processing the request" > In Broker Logs, I could find following exception: > > [2020-02-12 11:48:04,623] ERROR [ReplicaManager broker=1] Error processing > append operation on partition load_logPlPts-6 (kafka.server.ReplicaManager) > org.apache.kafka.common.KafkaException: java.lang.NoClassDefFoundError: Could > not initialize class > org.apache.kafka.common.record.CompressionType$ZstdConstructors > at > org.apache.kafka.common.record.CompressionType$5.wrapForInput(CompressionType.java:133) > at > org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:257) > at > org.apache.kafka.common.record.DefaultRecordBatch.iterator(DefaultRecordBatch.java:324) > at > scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:54) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(LogValidator.scala:269) > at > 
kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(LogValidator.scala:261) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:261) > at > kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:72) > at kafka.log.Log$$anonfun$append$2.liftedTree1$1(Log.scala:869) > at kafka.log.Log$$anonfun$append$2.apply(Log.scala:868) > at kafka.log.Log$$anonfun$append$2.apply(Log.scala:850) > at kafka.log.Log.maybeHandleIOException(Log.scala:2065) > at kafka.log.Log.append(Log.scala:850) > at kafka.log.Log.appendAsLeader(Log.scala:819) > at kafka.cluster.Partition$$anonfun$14.apply(Partition.scala:771) > at kafka.cluster.Partition$$anonfun$14.apply(Partition.scala:759) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253) > > This is fresh broker installed on "CentOS Linux" v7. This doesn't seem to be > a classpath issue as same package is working on MacOS. > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] dengziming commented on pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell
dengziming commented on pull request #11173: URL: https://github.com/apache/kafka/pull/11173#issuecomment-1070856842 Thank you @dajac for your efforts, I reworded the error message according to your suggestions, PTAL.
[GitHub] [kafka] dengziming commented on a change in pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell
dengziming commented on a change in pull request #11173: URL: https://github.com/apache/kafka/pull/11173#discussion_r829057799 ## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ## @@ -103,59 +104,77 @@ object GetOffsetShell { throw new IllegalArgumentException("--topic-partitions cannot be used with --topic or --partitions") } -val listOffsetsTimestamp = options.valueOf(timeOpt).longValue +val offsetSpec = parseOffsetSpec(options.valueOf(timeOpt)) val topicPartitionFilter = if (options.has(topicPartitionsOpt)) { - createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt), excludeInternalTopics) + createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt)) } else { - val partitionIdsRequested = createPartitionSet(options.valueOf(partitionsOpt)) - createTopicPartitionFilterWithTopicAndPartitionPattern( if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None, -excludeInternalTopics, -partitionIdsRequested +options.valueOf(partitionsOpt) ) } val config = if (options.has(commandConfigOpt)) Utils.loadProps(options.valueOf(commandConfigOpt)) else new Properties -config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) -config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId) -val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer) +config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) +config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId) +val adminClient = Admin.create(config) try { - val partitionInfos = listPartitionInfos(consumer, topicPartitionFilter) + val partitionInfos = listPartitionInfos(adminClient, topicPartitionFilter, excludeInternalTopics) if (partitionInfos.isEmpty) { throw new IllegalArgumentException("Could not match any topic-partitions with the specified filters") } - val topicPartitions = partitionInfos.flatMap { p => -if (p.leader == null) { - System.err.println(s"Error: topic-partition 
${p.topic}:${p.partition} does not have a leader. Skip getting offsets") - None -} else - Some(new TopicPartition(p.topic, p.partition)) - } + val timestampsToSearch = partitionInfos.map(tp => tp -> offsetSpec).toMap.asJava - /* Note that the value of the map can be null */ - val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = listOffsetsTimestamp match { -case ListOffsetsRequest.EARLIEST_TIMESTAMP => consumer.beginningOffsets(topicPartitions.asJava).asScala -case ListOffsetsRequest.LATEST_TIMESTAMP => consumer.endOffsets(topicPartitions.asJava).asScala -case _ => - val timestampsToSearch = topicPartitions.map(tp => tp -> (listOffsetsTimestamp: java.lang.Long)).toMap.asJava - consumer.offsetsForTimes(timestampsToSearch).asScala.map { case (k, x) => -if (x == null) (k, null) else (k, x.offset: java.lang.Long) - } + val listOffsetsResult = adminClient.listOffsets(timestampsToSearch) + val partitionOffsets = partitionInfos.flatMap { tp => +try { + val partitionInfo = listOffsetsResult.partitionResult(tp).get + Some((tp, partitionInfo.offset)) +} catch { + case e: ExecutionException => +e.getCause match { + case _: LeaderNotAvailableException => +System.err.println(s"Skip getting offsets for: topic-partition ${tp.topic}:${tp.partition} since it does not have a leader right now.") + case _ => +System.err.println(s"Error while getting offset for topic-partition ${tp.topic}:${tp.partition}") +e.printStackTrace() Review comment: Yeah, for AdminClient LeaderNotAvailableException is the same as other KafkaException so the 2 branches can be merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
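The merged error handling mentioned in the review comment above can be sketched without any Kafka dependency. This is a rough illustration under stated assumptions: `OffsetLookup` and `offsetOrSkip` are hypothetical names, and a plain `java.util.concurrent.Future<Long>` stands in for the per-partition result that the diff obtains from `listOffsetsResult.partitionResult(tp)`. The point is only that a single `ExecutionException` branch can report every failure cause, rather than special-casing `LeaderNotAvailableException`.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper; not the code from GetOffsetShell.
final class OffsetLookup {

    /**
     * Returns the offset if the lookup succeeded, or an empty Optional if it failed.
     * All failure causes go through the same branch and are reported on stderr.
     */
    static Optional<Long> offsetOrSkip(String topicPartition, Future<Long> result) {
        try {
            return Optional.ofNullable(result.get());
        } catch (ExecutionException e) {
            // Single branch: the underlying cause is printed, whatever its type.
            System.err.println("Skip getting offsets for topic-partition "
                + topicPartition + " due to error: " + e.getCause());
            return Optional.empty();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and treat the partition as skipped.
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("no leader right now"));

        System.out.println(offsetOrSkip("topic:0", CompletableFuture.completedFuture(42L)));
        System.out.println(offsetOrSkip("topic:1", failed));
    }
}
```

Returning an `Optional` mirrors the `flatMap` over `Some`/`None` in the Scala diff: failed partitions simply drop out of the result while the others are still printed.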
[jira] [Updated] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java
[ https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaobing Fang updated KAFKA-13752: -- Description: {code:java} Uuid.ZERO_UUID == new Uuid(0L, 0L){code} is true in scala, but in java is false. So this test run sccessfully. Is this the expected situation?? {code:java} @Test public void testTopicIdAndNullTopicNameRequests() { // Construct invalid MetadataRequestTopics. We will build each one separately and ensure the error is thrown. List topics = Arrays.asList( new MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(new Uuid(0L, 0L))); // if version is 10 or 11, the invalid topic metadata should return an error List invalidVersions = Arrays.asList((short) 10, (short) 11); invalidVersions.forEach(version -> topics.forEach(topic -> { MetadataRequestData metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic)); MetadataRequest.Builder builder = new MetadataRequest.Builder(metadataRequestData); assertThrows(UnsupportedVersionException.class, () -> builder.build(version)); }) ); }{code} was: {code:java} Uuid.ZERO_UUID == new Uuid(0L, 0L){code} is true in scala, but in java is false. So this test run sccessfully. Is this the expected situation?? {code:java} @Test public void testTopicIdAndNullTopicNameRequests() { // Construct invalid MetadataRequestTopics. We will build each one separately and ensure the error is thrown. 
List topics = Arrays.asList( new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new Uuid(0L, 0L))); // if version is 10 or 11, the invalid topic metadata should return an error List invalidVersions = Arrays.asList((short) 10, (short) 11); invalidVersions.forEach(version -> topics.forEach(topic -> { MetadataRequestData metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic)); MetadataRequest.Builder builder = new MetadataRequest.Builder(metadataRequestData); assertThrows(UnsupportedVersionException.class, () -> builder.build(version)); }) ); }{code} > Using `equals` instead of `==` when Uuid compare in Java > > > Key: KAFKA-13752 > URL: https://issues.apache.org/jira/browse/KAFKA-13752 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Xiaobing Fang >Priority: Minor > > {code:java} > Uuid.ZERO_UUID == new Uuid(0L, 0L){code} > is true in scala, but in java is false. > > So this test run sccessfully. Is this the expected situation?? > {code:java} > @Test > public void testTopicIdAndNullTopicNameRequests() { > // Construct invalid MetadataRequestTopics. We will build each one > separately and ensure the error is thrown. > List topics = Arrays.asList( > new > MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(new > Uuid(0L, 0L))); > // if version is 10 or 11, the invalid topic metadata should return an > error > List invalidVersions = Arrays.asList((short) 10, (short) 11); > invalidVersions.forEach(version -> > topics.forEach(topic -> { > MetadataRequestData metadataRequestData = new > MetadataRequestData().setTopics(Collections.singletonList(topic)); > MetadataRequest.Builder builder = new > MetadataRequest.Builder(metadataRequestData); > assertThrows(UnsupportedVersionException.class, () -> > builder.build(version)); > }) > ); > }{code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] fxbing opened a new pull request #11912: KAFKA-13752: Uuid compare using equals in java
fxbing opened a new pull request #11912: URL: https://github.com/apache/kafka/pull/11912 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java
[ https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508157#comment-17508157 ] Ismael Juma commented on KAFKA-13752: - cc [~jolshan] > Using `equals` instead of `==` when Uuid compare in Java > > > Key: KAFKA-13752 > URL: https://issues.apache.org/jira/browse/KAFKA-13752 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Xiaobing Fang >Priority: Minor > > {code:java} > Uuid.ZERO_UUID == new Uuid(0L, 0L){code} > is true in scala, but in java is false. > > So this test run sccessfully. Is this the expected situation?? > {code:java} > @Test > public void testTopicIdAndNullTopicNameRequests() { > // Construct invalid MetadataRequestTopics. We will build each one > separately and ensure the error is thrown. > List topics = Arrays.asList( > new > MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new > Uuid(0L, 0L))); > // if version is 10 or 11, the invalid topic metadata should return an > error > List invalidVersions = Arrays.asList((short) 10, (short) 11); > invalidVersions.forEach(version -> > topics.forEach(topic -> { > MetadataRequestData metadataRequestData = new > MetadataRequestData().setTopics(Collections.singletonList(topic)); > MetadataRequest.Builder builder = new > MetadataRequest.Builder(metadataRequestData); > assertThrows(UnsupportedVersionException.class, () -> > builder.build(version)); > }) > ); > }{code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java
[ https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaobing Fang updated KAFKA-13752: -- Description: {code:java} Uuid.ZERO_UUID == new Uuid(0L, 0L){code} is true in scala, but in java is false. So this test run sccessfully. Is this the expected situation?? {code:java} @Test public void testTopicIdAndNullTopicNameRequests() { // Construct invalid MetadataRequestTopics. We will build each one separately and ensure the error is thrown. List topics = Arrays.asList( new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new Uuid(0L, 0L))); // if version is 10 or 11, the invalid topic metadata should return an error List invalidVersions = Arrays.asList((short) 10, (short) 11); invalidVersions.forEach(version -> topics.forEach(topic -> { MetadataRequestData metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic)); MetadataRequest.Builder builder = new MetadataRequest.Builder(metadataRequestData); assertThrows(UnsupportedVersionException.class, () -> builder.build(version)); }) ); }{code} was: `Uuid.ZERO_UUID == new Uuid(0L, 0L)` is true in scala, but in java is false. So this test run sccessfully. Is this the expected situation?? ``` @Test public void testTopicIdAndNullTopicNameRequests() { // Construct invalid MetadataRequestTopics. We will build each one separately and ensure the error is thrown. 
List topics = Arrays.asList( new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new Uuid(0L, 0L))); // if version is 10 or 11, the invalid topic metadata should return an error List invalidVersions = Arrays.asList((short) 10, (short) 11); invalidVersions.forEach(version -> topics.forEach(topic -> { MetadataRequestData metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic)); MetadataRequest.Builder builder = new MetadataRequest.Builder(metadataRequestData); assertThrows(UnsupportedVersionException.class, () -> builder.build(version)); } ) ); } ``` > Using `equals` instead of `==` when Uuid compare in Java > > > Key: KAFKA-13752 > URL: https://issues.apache.org/jira/browse/KAFKA-13752 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Xiaobing Fang >Priority: Minor > > {code:java} > Uuid.ZERO_UUID == new Uuid(0L, 0L){code} > is true in scala, but in java is false. > > So this test run sccessfully. Is this the expected situation?? > {code:java} > @Test > public void testTopicIdAndNullTopicNameRequests() { > // Construct invalid MetadataRequestTopics. We will build each one > separately and ensure the error is thrown. > List topics = Arrays.asList( > new > MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new > Uuid(0L, 0L))); > // if version is 10 or 11, the invalid topic metadata should return an > error > List invalidVersions = Arrays.asList((short) 10, (short) 11); > invalidVersions.forEach(version -> > topics.forEach(topic -> { > MetadataRequestData metadataRequestData = new > MetadataRequestData().setTopics(Collections.singletonList(topic)); > MetadataRequest.Builder builder = new > MetadataRequest.Builder(metadataRequestData); > assertThrows(UnsupportedVersionException.class, () -> > builder.build(version)); > }) > ); > }{code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java
[ https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaobing Fang updated KAFKA-13752: -- Description: `Uuid.ZERO_UUID == new Uuid(0L, 0L)` is true in scala, but in java is false. So this test run sccessfully. Is this the expected situation?? ``` @Test public void testTopicIdAndNullTopicNameRequests() { // Construct invalid MetadataRequestTopics. We will build each one separately and ensure the error is thrown. List topics = Arrays.asList( new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new Uuid(0L, 0L))); // if version is 10 or 11, the invalid topic metadata should return an error List invalidVersions = Arrays.asList((short) 10, (short) 11); invalidVersions.forEach(version -> topics.forEach(topic -> { MetadataRequestData metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic)); MetadataRequest.Builder builder = new MetadataRequest.Builder(metadataRequestData); assertThrows(UnsupportedVersionException.class, () -> builder.build(version)); } ) ); } ``` was: `Uuid.ZERO_UUID == new Uuid(0L, 0L)` is true in scala, but in java is false. So this test run sccessfully. Is it excepted ? ``` @Test public void testTopicIdAndNullTopicNameRequests() { // Construct invalid MetadataRequestTopics. We will build each one separately and ensure the error is thrown. 
List topics = Arrays.asList( new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new Uuid(0L, 0L))); // if version is 10 or 11, the invalid topic metadata should return an error List invalidVersions = Arrays.asList((short) 10, (short) 11); invalidVersions.forEach(version -> topics.forEach(topic -> { MetadataRequestData metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic)); MetadataRequest.Builder builder = new MetadataRequest.Builder(metadataRequestData); assertThrows(UnsupportedVersionException.class, () -> builder.build(version)); }) ); } > Using `equals` instead of `==` when Uuid compare in Java > > > Key: KAFKA-13752 > URL: https://issues.apache.org/jira/browse/KAFKA-13752 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Xiaobing Fang >Priority: Minor > > `Uuid.ZERO_UUID == new Uuid(0L, 0L)` is true in scala, but in java is false. > > So this test run sccessfully. Is this the expected situation?? > ``` > @Test > public void testTopicIdAndNullTopicNameRequests() { > // Construct invalid MetadataRequestTopics. We will build each one separately > and ensure the error is thrown. > List topics = Arrays.asList( > new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new > Uuid(0L, 0L))); > // if version is 10 or 11, the invalid topic metadata should return an error > List invalidVersions = Arrays.asList((short) 10, (short) 11); > invalidVersions.forEach(version -> > topics.forEach(topic -> > { MetadataRequestData metadataRequestData = new > MetadataRequestData().setTopics(Collections.singletonList(topic)); > MetadataRequest.Builder builder = new > MetadataRequest.Builder(metadataRequestData); > assertThrows(UnsupportedVersionException.class, () -> > builder.build(version)); } > ) > ); > } > ``` -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java
[ https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaobing Fang updated KAFKA-13752: -- Description: `Uuid.ZERO_UUID == new Uuid(0L, 0L)` is true in scala, but in java is false. So this test run sccessfully. Is it excepted ? ``` @Test public void testTopicIdAndNullTopicNameRequests() { // Construct invalid MetadataRequestTopics. We will build each one separately and ensure the error is thrown. List topics = Arrays.asList( new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new Uuid(0L, 0L))); // if version is 10 or 11, the invalid topic metadata should return an error List invalidVersions = Arrays.asList((short) 10, (short) 11); invalidVersions.forEach(version -> topics.forEach(topic -> { MetadataRequestData metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic)); MetadataRequest.Builder builder = new MetadataRequest.Builder(metadataRequestData); assertThrows(UnsupportedVersionException.class, () -> builder.build(version)); }) ); } was:`Uuid.ZERO_UUID == new Uuid(0L, 0L)` is true in scala, but in java is false. > Using `equals` instead of `==` when Uuid compare in Java > > > Key: KAFKA-13752 > URL: https://issues.apache.org/jira/browse/KAFKA-13752 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Xiaobing Fang >Priority: Minor > > `Uuid.ZERO_UUID == new Uuid(0L, 0L)` is true in scala, but in java is false. > > So this test run sccessfully. Is it excepted ? > ``` > @Test > public void testTopicIdAndNullTopicNameRequests() { > // Construct invalid MetadataRequestTopics. We will build each one separately > and ensure the error is thrown. 
> List topics = Arrays.asList( > new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new > Uuid(0L, 0L))); > // if version is 10 or 11, the invalid topic metadata should return an error > List invalidVersions = Arrays.asList((short) 10, (short) 11); > invalidVersions.forEach(version -> > topics.forEach(topic -> { > MetadataRequestData metadataRequestData = new > MetadataRequestData().setTopics(Collections.singletonList(topic)); > MetadataRequest.Builder builder = new > MetadataRequest.Builder(metadataRequestData); > assertThrows(UnsupportedVersionException.class, () -> builder.build(version)); > }) > ); > } -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] dengziming commented on a change in pull request #11471: MINOR: Replace EasyMock with Mockito in connect:file
dengziming commented on a change in pull request #11471: URL: https://github.com/apache/kafka/pull/11471#discussion_r829040670 ## File path: connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java ## @@ -73,7 +75,6 @@ public void teardown() { } private void replay() { Review comment: Yeah, we had been stuck with the old code; this is the simplest way. 😂 Done!
[jira] [Created] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java
Xiaobing Fang created KAFKA-13752: - Summary: Using `equals` instead of `==` when Uuid compare in Java Key: KAFKA-13752 URL: https://issues.apache.org/jira/browse/KAFKA-13752 Project: Kafka Issue Type: Improvement Components: clients Reporter: Xiaobing Fang `Uuid.ZERO_UUID == new Uuid(0L, 0L)` is true in Scala but false in Java.
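The difference reported above follows from Java's `==` comparing object references, while Scala's `==` delegates to `equals`. A minimal sketch of the effect, using a hypothetical stand-in class `SimpleUuid` rather than Kafka's actual `Uuid`, and assuming the real class overrides `equals` on its two `long` fields:

```java
import java.util.Objects;

public class UuidCompareDemo {
    // Hypothetical stand-in for a value class holding two long fields.
    static final class SimpleUuid {
        static final SimpleUuid ZERO = new SimpleUuid(0L, 0L);

        private final long mostSignificantBits;
        private final long leastSignificantBits;

        SimpleUuid(long mostSignificantBits, long leastSignificantBits) {
            this.mostSignificantBits = mostSignificantBits;
            this.leastSignificantBits = leastSignificantBits;
        }

        @Override
        public boolean equals(Object other) {
            if (this == other) return true;
            if (!(other instanceof SimpleUuid)) return false;
            SimpleUuid that = (SimpleUuid) other;
            return mostSignificantBits == that.mostSignificantBits
                && leastSignificantBits == that.leastSignificantBits;
        }

        @Override
        public int hashCode() {
            return Objects.hash(mostSignificantBits, leastSignificantBits);
        }
    }

    // Reference comparison: true only when both names point at the same object.
    static boolean sameReference(SimpleUuid a, SimpleUuid b) {
        return a == b;
    }

    // Value comparison: true when the fields match.
    static boolean sameValue(SimpleUuid a, SimpleUuid b) {
        return a.equals(b);
    }

    public static void main(String[] args) {
        SimpleUuid fresh = new SimpleUuid(0L, 0L);
        System.out.println("==     : " + sameReference(SimpleUuid.ZERO, fresh));
        System.out.println("equals : " + sameValue(SimpleUuid.ZERO, fresh));
    }
}
```

A check written with `==` against a shared constant therefore never matches a freshly constructed instance in Java, even when the values are identical.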
[GitHub] [kafka] mimaison commented on a change in pull request #11471: MINOR: Replace EasyMock with Mockito in connect:file
mimaison commented on a change in pull request #11471: URL: https://github.com/apache/kafka/pull/11471#discussion_r829001380 ## File path: connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java ## @@ -73,7 +75,6 @@ public void teardown() { } private void replay() { Review comment: I wonder if it would be simpler to get rid of the `replay()` method and just call `verifyAll()` in the tests that need it. WDYT?
[GitHub] [kafka] dajac commented on pull request #11909: KAFKA-13750: Client Compatability KafkaTest uses invalid idempotency configs
dajac commented on pull request #11909: URL: https://github.com/apache/kafka/pull/11909#issuecomment-1070707701 @jolshan Would you have a link to a successful run of the tests with your change?
[GitHub] [kafka] dajac commented on a change in pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell
dajac commented on a change in pull request #11173: URL: https://github.com/apache/kafka/pull/11173#discussion_r828928713 ## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ## @@ -103,59 +104,77 @@ object GetOffsetShell { throw new IllegalArgumentException("--topic-partitions cannot be used with --topic or --partitions") } -val listOffsetsTimestamp = options.valueOf(timeOpt).longValue +val offsetSpec = parseOffsetSpec(options.valueOf(timeOpt)) val topicPartitionFilter = if (options.has(topicPartitionsOpt)) { - createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt), excludeInternalTopics) + createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt)) } else { - val partitionIdsRequested = createPartitionSet(options.valueOf(partitionsOpt)) - createTopicPartitionFilterWithTopicAndPartitionPattern( if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None, -excludeInternalTopics, -partitionIdsRequested +options.valueOf(partitionsOpt) ) } val config = if (options.has(commandConfigOpt)) Utils.loadProps(options.valueOf(commandConfigOpt)) else new Properties -config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) -config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId) -val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer) +config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) +config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId) +val adminClient = Admin.create(config) try { - val partitionInfos = listPartitionInfos(consumer, topicPartitionFilter) + val partitionInfos = listPartitionInfos(adminClient, topicPartitionFilter, excludeInternalTopics) if (partitionInfos.isEmpty) { throw new IllegalArgumentException("Could not match any topic-partitions with the specified filters") } - val topicPartitions = partitionInfos.flatMap { p => -if (p.leader == null) { - System.err.println(s"Error: topic-partition 
${p.topic}:${p.partition} does not have a leader. Skip getting offsets") - None -} else - Some(new TopicPartition(p.topic, p.partition)) - } + val timestampsToSearch = partitionInfos.map(tp => tp -> offsetSpec).toMap.asJava - /* Note that the value of the map can be null */ - val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = listOffsetsTimestamp match { -case ListOffsetsRequest.EARLIEST_TIMESTAMP => consumer.beginningOffsets(topicPartitions.asJava).asScala -case ListOffsetsRequest.LATEST_TIMESTAMP => consumer.endOffsets(topicPartitions.asJava).asScala -case _ => - val timestampsToSearch = topicPartitions.map(tp => tp -> (listOffsetsTimestamp: java.lang.Long)).toMap.asJava - consumer.offsetsForTimes(timestampsToSearch).asScala.map { case (k, x) => -if (x == null) (k, null) else (k, x.offset: java.lang.Long) - } + val listOffsetsResult = adminClient.listOffsets(timestampsToSearch) + val partitionOffsets = partitionInfos.flatMap { tp => +try { + val partitionInfo = listOffsetsResult.partitionResult(tp).get + Some((tp, partitionInfo.offset)) +} catch { + case e: ExecutionException => +e.getCause match { + case _: LeaderNotAvailableException => +System.err.println(s"Skip getting offsets for: topic-partition ${tp.topic}:${tp.partition} since it does not have a leader right now.") + case _ => +System.err.println(s"Error while getting offset for topic-partition ${tp.topic}:${tp.partition}") +e.printStackTrace() Review comment: We should not print the stack trace like this with `e.printStackTrace()`. It would be better to include the error name or the error message in the `System.err.println`. How about having a generic error for all `KafkaException`? Something like `Error: Skip getting offsets for topic-partition ${p.topic}:${p.partition} due to: ${e.getMessage}`. If we get any non-`KafkaException` in the cause, we should rethrow it, in my opinion. I am not sure it can really happen, but let's be on the safe side. 
## File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala ## @@ -109,6 +111,61 @@ class GetOffsetShellTest extends KafkaServerTestHarness with Logging { ) } + @ParameterizedTest + @ValueSource(strings = Array("-1", "latest")) + def testGetLatestOffsets(time: String): Unit = { +val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", "--time", time)) +assertEquals( + List( +("topic1", 0, Some(1)), +("topic2", 0,
[jira] [Updated] (KAFKA-13751) On the broker side, OAUTHBEARER is not compatible with other SASL mechanisms
[ https://issues.apache.org/jira/browse/KAFKA-13751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] RivenSun updated KAFKA-13751: - Reviewer: Ismael Juma (was: Luke Chen) > On the broker side, OAUTHBEARER is not compatible with other SASL mechanisms > > > Key: KAFKA-13751 > URL: https://issues.apache.org/jira/browse/KAFKA-13751 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 3.0.1 >Reporter: RivenSun >Priority: Critical > > h1. Phenomenon: > SASL/OAUTHBEARER, whether implemented by default or customized by the user, > is not compatible with other SASL mechanisms. > h3. > case1: > kafka_server_jaas_oauth.conf > {code:java} > KafkaServer { > org.apache.kafka.common.security.plain.PlainLoginModule required > username="admin" > password="admin" > user_admin="admin" > user_alice="alice"; >org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule > required; >org.apache.kafka.common.security.scram.ScramLoginModule required > username="admin" > password="admin_scram"; > }; {code} > server.properties > {code:java} > advertised.listeners=SASL_PLAINTEXT://publicIp:8779,SASL_SSL://publicIp:8889,OAUTH://publicIp:8669 > > listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OAUTH:SASL_SSL > sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER{code} > Error when starting kafka: > server.log > {code:java} > [2022-03-16 13:18:42,658] ERROR [KafkaServer id=1] Fatal error during > KafkaServer startup. 
Prepare to shutdown (kafka.server.KafkaServer) > org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: > Must supply exactly 1 non-null JAAS mechanism configuration (size was 3) > at > org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172) > at kafka.network.Processor.(SocketServer.scala:724) > at kafka.network.SocketServer.newProcessor(SocketServer.scala:367) > at > kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252) > at > kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251) > at > kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214) > at > kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at > kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211) > at kafka.network.SocketServer.startup(SocketServer.scala:122) > at kafka.server.KafkaServer.startup(KafkaServer.scala:266) > at > kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) > at kafka.Kafka$.main(Kafka.scala:82) > at kafka.Kafka.main(Kafka.scala) > Caused by: java.lang.IllegalArgumentException: Must supply exactly 1 non-null > JAAS mechanism configuration (size was 3) > at > org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler.configure(OAuthBearerUnsecuredValidatorCallbackHandler.java:117) > at > org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:139) > ... 
17 more > [2022-03-16 13:18:42,662] INFO [KafkaServer id=1] shutting down > (kafka.server.KafkaServer) > [2022-03-16 13:18:42,664] INFO [SocketServer brokerId=1] Stopping socket > server request processors (kafka.network.SocketServer) {code} > The default implementation class of oauthbearer's > `sasl.server.callback.handler.class` is > OAuthBearerUnsecuredValidatorCallbackHandler. > In the OAuthBearerUnsecuredValidatorCallbackHandler#configure(...) method, > the jaasConfigEntries parameter is verified. > What I want to say is that {*}the verification logic here is completely > reasonable{*}, but the jaasConfigEntries passed in from the upper layer > should not contain the AppConfigurationEntry of other loginModules. There are > several other codes for the check of the same keyword *"Must supply exactly 1 > non-null JAAS mechanism configuration".* > Rootcause elaborates later. > By the way, at present, KafkaServer allows {*}the same LoginModule to be > configured multiple times in kafkaJaasConfigFile{*}, which will also lead to > the phenomenon of case1. > kafka_server_jaas_oauth.conf eg: > {code:java} > KafkaServer { >org.apache.kafka.common.security.oauthbearer.OAu
[jira] [Commented] (KAFKA-13689) AbstractConfig log print information is incorrect
[ https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508083#comment-17508083 ] RivenSun commented on KAFKA-13689: -- Hi [~showuon] [~guozhang] After rethinking the discussion in PR-11800: Luke, I think it might be simpler, as you said at the beginning, to make the log output a bit more informative; we don't need a KIP for that. AbstractConfig#logUnused() {code:java} public void logUnused() { for (String key : unused()) log.warn("The configuration '{}' was supplied but is not used. It could be because it is not a known config," + " or some components are not enabled.", key); } {code} WDYT? Thanks. > AbstractConfig log print information is incorrect > - > > Key: KAFKA-13689 > URL: https://issues.apache.org/jira/browse/KAFKA-13689 > Project: Kafka > Issue Type: Bug > Components: config >Affects Versions: 3.0.0 >Reporter: RivenSun >Assignee: RivenSun >Priority: Major > Fix For: 3.2.0 > > > h1. 1.Example > KafkaClient version is 3.1.0, KafkaProducer init properties: > > {code:java} > Properties props = new Properties(); > props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); > props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60003);{code} > > > Partial log of KafkaProducer initialization: > {code:java} > ssl.truststore.location = C:\Personal > File\documents\KafkaSSL\client.truststore.jks > ssl.truststore.password = [hidden] > ssl.truststore.type = JKS > transaction.timeout.ms = 60003 > transactional.id = null > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer[main] INFO > org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully > logged in. > [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The > configuration 'transaction.timeout.ms' was supplied but isn't a known config. 
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 37edeed0777bacb3 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1645602332999 {code} > From the above log, you can see that KafkaProducer has applied the user's > configuration, {*}transaction.timeout.ms=60003{*}; the default value of this > configuration is 60000. > But we can see another line in the log: > [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The > configuration *'transaction.timeout.ms'* was supplied but isn't a > *known* config. > > h1. 2.RootCause: > 1) ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG is set to {*}false{*}. > So the configurations related to the KafkaProducer transaction will not be > requested. > See the source code: KafkaProducer#configureTransactionState(...). > 2) AbstractConfig#logUnused() -> AbstractConfig#unused() > {code:java} > public Set<String> unused() { > Set<String> keys = new HashSet<>(originals.keySet()); > keys.removeAll(used); > return keys; > } {code} > If a configuration has not been requested, it will not be put > into the used variable. See the source code below: > AbstractConfig#get(String key) > > {code:java} > protected Object get(String key) { > if (!values.containsKey(key)) > throw new ConfigException(String.format("Unknown configuration '%s'", > key)); > used.add(key); > return values.get(key); > } {code} > h1. > h1. Solution: > 1. AbstractConfig#logUnused() method > Modify the log message of this method, and the log level for unused > configurations can be changed to {*}INFO{*}. What do you think? > {code:java} > /** > * Log infos for any unused configurations > */ > public void logUnused() { for (String key : unused()) > log.info("The configuration '{}' was supplied but isn't a used > config.", key); > }{code} > > > 2. 
AbstractConfig provides two new methods: logUnknown() and unknown() > {code:java} > /** > * Log warnings for any unknown configurations > */ > public void logUnknown() { > for (String key : unknown()) > log.warn("The configuration '{}' was supplied but isn't a known > config.", key); > } {code} > > {code:java} > public Set<String> unknown() { > Set<String> keys = new HashSet<>(originals.keySet()); > keys.removeAll(values.keySet()); > return keys; > } {code}
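The distinction between "unused" and "unknown" keys described in this issue can be illustrated with a small self-contained sketch. `ConfigKeysDemo` below is a hypothetical simplification, not Kafka's `AbstractConfig`; it only mirrors the three collections named in the issue (`originals`, `values`, `used`), and the key `not.a.real.key` is made up for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ConfigKeysDemo {
    private final Map<String, Object> originals;                  // everything the user supplied
    private final Map<String, Object> values = new HashMap<>();   // only keys the definition knows
    private final Set<String> used = new HashSet<>();             // keys that were actually read

    ConfigKeysDemo(Map<String, Object> supplied, Set<String> definedKeys) {
        this.originals = new HashMap<>(supplied);
        for (String key : definedKeys) {
            // A defined key that was not supplied keeps a null value in this sketch.
            values.put(key, supplied.get(key));
        }
    }

    // Reading a value marks the key as used; undefined keys are rejected.
    Object get(String key) {
        if (!values.containsKey(key)) {
            throw new IllegalArgumentException("Unknown configuration '" + key + "'");
        }
        used.add(key);
        return values.get(key);
    }

    // Supplied keys that were never read (known or not).
    Set<String> unused() {
        Set<String> keys = new HashSet<>(originals.keySet());
        keys.removeAll(used);
        return keys;
    }

    // Supplied keys that the definition does not know at all.
    Set<String> unknown() {
        Set<String> keys = new HashSet<>(originals.keySet());
        keys.removeAll(values.keySet());
        return keys;
    }

    public static void main(String[] args) {
        Map<String, Object> supplied = new HashMap<>();
        supplied.put("enable.idempotence", false);
        supplied.put("transaction.timeout.ms", 60003);
        supplied.put("not.a.real.key", "x");
        Set<String> defined = new HashSet<>(
            Arrays.asList("enable.idempotence", "transaction.timeout.ms"));

        ConfigKeysDemo config = new ConfigKeysDemo(supplied, defined);
        config.get("enable.idempotence"); // the transaction timeout is never requested

        System.out.println("unused:  " + config.unused());
        System.out.println("unknown: " + config.unknown());
    }
}
```

In this sketch `transaction.timeout.ms` is a known key that simply was not read, so it appears in `unused()` but not in `unknown()`, which is the case the original warning text describes inaccurately.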
[GitHub] [kafka] RivenSun2 commented on pull request #11911: KAFKA-13463: Make pause behavior consistent between cooperative and eager protocols
RivenSun2 commented on pull request #11911: URL: https://github.com/apache/kafka/pull/11911#issuecomment-1070603378 Hi @guozhangwang and @showuon, please help review the PR. Thanks.
[GitHub] [kafka] RivenSun2 opened a new pull request #11911: KAFKA-13463: Make pause behavior consistent between cooperative and eager protocols
RivenSun2 opened a new pull request #11911: URL: https://github.com/apache/kafka/pull/11911 Make pause behavior consistent between cooperative and eager protocols
[GitHub] [kafka] dengziming opened a new pull request #11910: KAFKA-13743: Prevent topics with conflicting metrics names from being created in KRaft mode
dengziming opened a new pull request #11910: URL: https://github.com/apache/kafka/pull/11910 *More detailed description of your change* In ZK mode, the topic "foo_bar" conflicts with "foo.bar" because of limitations in metric names; we should implement the same check in KRaft mode. Add an integration test case in TopicCommandIntegrationTest and change TopicCommandIntegrationTest to support KRaft mode. *Summary of testing strategy (including rationale)* Added a unit test, and also an integration test case. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
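The kind of name conflict this PR describes can be sketched as follows. This is only an illustration, assuming that the collision arises because `.` and `_` are treated as the same character when topic names appear in metric names; the class and method names are hypothetical and are not taken from the PR.

```java
public class TopicCollisionDemo {
    // Hypothetical helper: map '.' to '_' so both spellings normalize to one form.
    static String unifyCollisionChars(String topic) {
        return topic.replace('.', '_');
    }

    // Two topic names collide when their normalized forms are identical.
    static boolean collides(String topicA, String topicB) {
        return unifyCollisionChars(topicA).equals(unifyCollisionChars(topicB));
    }

    public static void main(String[] args) {
        System.out.println(collides("foo_bar", "foo.bar")); // names from the PR description
        System.out.println(collides("foo_bar", "foo-bar"));
    }
}
```

Under this assumption, a create-topic request would compare the normalized form of the new name against the normalized forms of existing topics and reject the request on a match.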
[jira] [Closed] (KAFKA-13740) kafka-stream-client-shutdown
[ https://issues.apache.org/jira/browse/KAFKA-13740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna closed KAFKA-13740. - > kafka-stream-client-shutdown > > > Key: KAFKA-13740 > URL: https://issues.apache.org/jira/browse/KAFKA-13740 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.1.0 >Reporter: Prashanth Joseph Babu >Priority: Major > > I have an apache kafka streams application . I notice that it sometimes > shutsdown when a rebalancing occurs with no real reason for the shutdown . It > doesn't even throw an exception. > Here are some logs on the same > {code:java} > [2022-03-08 17:13:37,024] INFO [Consumer > clientId=svc-stream-collector-StreamThread-1-consumer, > groupId=svc-stream-collector] Adding newly assigned partitions: > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) > [2022-03-08 17:13:37,024] ERROR stream-thread > [svc-stream-collector-StreamThread-1] A Kafka Streams client in this Kafka > Streams application is requesting to shutdown the application > (org.apache.kafka.streams.processor.internals.StreamThread) > [2022-03-08 17:13:37,030] INFO stream-client [svc-stream-collector] State > transition from REBALANCING to PENDING_ERROR > (org.apache.kafka.streams.KafkaStreams) > old state:REBALANCING new state:PENDING_ERROR > [2022-03-08 17:13:37,031] INFO [Consumer > clientId=svc-stream-collector-StreamThread-1-consumer, > groupId=svc-stream-collector] (Re-)joining group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [2022-03-08 17:13:37,032] INFO stream-thread > [svc-stream-collector-StreamThread-1] Informed to shut down > (org.apache.kafka.streams.processor.internals.StreamThread) > [2022-03-08 17:13:37,032] INFO stream-thread > [svc-stream-collector-StreamThread-1] State transition from > PARTITIONS_REVOKED to PENDING_SHUTDOWN > (org.apache.kafka.streams.processor.internals.StreamThread) > [2022-03-08 17:13:37,067] INFO stream-thread > 
[svc-stream-collector-StreamThread-1] Thread state is already > PENDING_SHUTDOWN, skipping the run once call after poll request > (org.apache.kafka.streams.processor.internals.StreamThread) > [2022-03-08 17:13:37,067] WARN stream-thread > [svc-stream-collector-StreamThread-1] Detected that shutdown was requested. > All clients in this app will now begin to shutdown > (org.apache.kafka.streams.processor.internals.StreamThread) > {code} > I suspect it's because there are no newly assigned partitions in the log > below: > {code:java} > [2022-03-08 17:13:37,024] INFO [Consumer > clientId=svc-stream-collector-StreamThread-1-consumer, > groupId=svc-stream-collector] Adding newly assigned partitions: > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) > {code} > However, I'm not exactly sure why this error occurs. Any help would be > appreciated.
[jira] [Commented] (KAFKA-13740) kafka-stream-client-shutdown
[ https://issues.apache.org/jira/browse/KAFKA-13740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508043#comment-17508043 ] Bruno Cadonna commented on KAFKA-13740: --- [~prashanthjbabu] Glad that you could sort it out! > kafka-stream-client-shutdown > > > Key: KAFKA-13740 > URL: https://issues.apache.org/jira/browse/KAFKA-13740 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.1.0 >Reporter: Prashanth Joseph Babu >Priority: Major > > I have an apache kafka streams application . I notice that it sometimes > shutsdown when a rebalancing occurs with no real reason for the shutdown . It > doesn't even throw an exception. > Here are some logs on the same > {code:java} > [2022-03-08 17:13:37,024] INFO [Consumer > clientId=svc-stream-collector-StreamThread-1-consumer, > groupId=svc-stream-collector] Adding newly assigned partitions: > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) > [2022-03-08 17:13:37,024] ERROR stream-thread > [svc-stream-collector-StreamThread-1] A Kafka Streams client in this Kafka > Streams application is requesting to shutdown the application > (org.apache.kafka.streams.processor.internals.StreamThread) > [2022-03-08 17:13:37,030] INFO stream-client [svc-stream-collector] State > transition from REBALANCING to PENDING_ERROR > (org.apache.kafka.streams.KafkaStreams) > old state:REBALANCING new state:PENDING_ERROR > [2022-03-08 17:13:37,031] INFO [Consumer > clientId=svc-stream-collector-StreamThread-1-consumer, > groupId=svc-stream-collector] (Re-)joining group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [2022-03-08 17:13:37,032] INFO stream-thread > [svc-stream-collector-StreamThread-1] Informed to shut down > (org.apache.kafka.streams.processor.internals.StreamThread) > [2022-03-08 17:13:37,032] INFO stream-thread > [svc-stream-collector-StreamThread-1] State transition from > PARTITIONS_REVOKED to PENDING_SHUTDOWN > 
(org.apache.kafka.streams.processor.internals.StreamThread) > [2022-03-08 17:13:37,067] INFO stream-thread > [svc-stream-collector-StreamThread-1] Thread state is already > PENDING_SHUTDOWN, skipping the run once call after poll request > (org.apache.kafka.streams.processor.internals.StreamThread) > [2022-03-08 17:13:37,067] WARN stream-thread > [svc-stream-collector-StreamThread-1] Detected that shutdown was requested. > All clients in this app will now begin to shutdown > (org.apache.kafka.streams.processor.internals.StreamThread) > {code} > I suspect it's because there are no newly assigned partitions in the log > below: > {code:java} > [2022-03-08 17:13:37,024] INFO [Consumer > clientId=svc-stream-collector-StreamThread-1-consumer, > groupId=svc-stream-collector] Adding newly assigned partitions: > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) > {code} > However, I'm not exactly sure why this error occurs. Any help would be > appreciated.
[jira] [Comment Edited] (KAFKA-13136) kafka-connect task.max : active task in consumer group is limited by the bigger topic to consume
[ https://issues.apache.org/jira/browse/KAFKA-13136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508032#comment-17508032 ] raphaelauv edited comment on KAFKA-13136 at 3/17/22, 8:01 AM: -- Thank you very much for that explanation, it's very clear :+1 Yes, we can close that ticket, thank you. was (Author: raphaelauv): thank you very much for that explaination , it's very clear :+1 yes we can close that ticket, thank you > kafka-connect task.max : active task in consumer group is limited by the > bigger topic to consume > > > Key: KAFKA-13136 > URL: https://issues.apache.org/jira/browse/KAFKA-13136 > Project: Kafka > Issue Type: Bug >Reporter: raphaelauv >Priority: Major > > In kafka-connect 2.7 > *The maximum number of active tasks for a sink connector is equal to the partition count of the topic > with the most partitions to consume* > An active task is a task that has partitions assigned in the consumer group of > the sink connector. > Example: > With 2 topics that each have 10 partitions (20 partitions in total), > the maximum number of active tasks is 10 (if I set tasks.max to 12, there are > 10 members of the consumer group consuming partitions and 2 members in the > consumer group that have no partitions to consume). > If I add a third topic with 15 partitions to the connector config, then all 12 > members of the consumer group consume partitions; if I then set > tasks.max to 17, only 15 members are active in the consumer group.
[jira] [Commented] (KAFKA-13136) kafka-connect task.max : active task in consumer group is limited by the bigger topic to consume
[ https://issues.apache.org/jira/browse/KAFKA-13136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508032#comment-17508032 ] raphaelauv commented on KAFKA-13136: Thank you very much for that explanation, it's very clear :+1 Yes, we can close that ticket, thank you. > kafka-connect task.max : active task in consumer group is limited by the > bigger topic to consume > > > Key: KAFKA-13136 > URL: https://issues.apache.org/jira/browse/KAFKA-13136 > Project: Kafka > Issue Type: Bug >Reporter: raphaelauv >Priority: Major > > In kafka-connect 2.7 > *The maximum number of active tasks for a sink connector is equal to the partition count of the topic > with the most partitions to consume* > An active task is a task that has partitions assigned in the consumer group of > the sink connector. > Example: > With 2 topics that each have 10 partitions (20 partitions in total), > the maximum number of active tasks is 10 (if I set tasks.max to 12, there are > 10 members of the consumer group consuming partitions and 2 members in the > consumer group that have no partitions to consume). > If I add a third topic with 15 partitions to the connector config, then all 12 > members of the consumer group consume partitions; if I then set > tasks.max to 17, only 15 members are active in the consumer group.
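The numbers in this report are consistent with a range-style assignment, in which each topic's partitions are divided independently over the same ordered list of group members. That is an assumption here, since the explanation the comment thanks for is not part of this message. A minimal sketch that reproduces the reported counts, with a hypothetical helper `activeMembers` that is not part of any Kafka API:

```java
public class RangeAssignmentDemo {
    // Counts members that receive at least one partition when every topic is
    // split on its own across members 0..members-1 (range-style assignment).
    static int activeMembers(int members, int... partitionsPerTopic) {
        boolean[] active = new boolean[members];
        for (int partitions : partitionsPerTopic) {
            int perMember = partitions / members; // base share for every member
            int extra = partitions % members;     // first `extra` members get one more
            for (int m = 0; m < members; m++) {
                int count = perMember + (m < extra ? 1 : 0);
                if (count > 0) {
                    active[m] = true;
                }
            }
        }
        int total = 0;
        for (boolean isActive : active) {
            if (isActive) {
                total++;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Scenarios from the report: (members, partitions of each topic).
        System.out.println(activeMembers(12, 10, 10));
        System.out.println(activeMembers(12, 10, 10, 15));
        System.out.println(activeMembers(17, 10, 10, 15));
    }
}
```

Because each topic is split separately, the same leading members are chosen for every topic, so the number of active members can never exceed the partition count of the largest topic. An assignor that spreads all partitions of all topics across members would not have this limit.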