[GitHub] [kafka] dajac commented on a change in pull request #10234: MINOR; Clean up LeaderAndIsrResponse construction in `ReplicaManager#becomeLeaderOrFollower`
dajac commented on a change in pull request #10234: URL: https://github.com/apache/kafka/pull/10234#discussion_r586188217 ## File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java ## @@ -145,24 +145,22 @@ public LeaderAndIsrResponse getErrorResponse(int throttleTimeMs, Throwable e) { .setErrorCode(error.code())); } responseData.setPartitionErrors(partitions); -return new LeaderAndIsrResponse(responseData, version()); -} - -List topics = new ArrayList<>(data.topicStates().size()); -Map topicIds = topicIds(); -for (LeaderAndIsrTopicState topicState : data.topicStates()) { -LeaderAndIsrTopicError topicError = new LeaderAndIsrTopicError(); -topicError.setTopicId(topicIds.get(topicState.topicName())); -List partitions = new ArrayList<>(topicState.partitionStates().size()); -for (LeaderAndIsrPartitionState partition : topicState.partitionStates()) { -partitions.add(new LeaderAndIsrPartitionError() +} else { +Map topicIds = topicIds(); +for (LeaderAndIsrTopicState topicState : data.topicStates()) { +List partitions = new ArrayList<>( +topicState.partitionStates().size()); +for (LeaderAndIsrPartitionState partition : topicState.partitionStates()) { +partitions.add(new LeaderAndIsrPartitionError() .setPartitionIndex(partition.partitionIndex()) .setErrorCode(error.code())); +} +responseData.topics().add(new LeaderAndIsrTopicError() +.setTopicId(topicIds.get(topicState.topicName())) Review comment: Good catch. I missed this one. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
chia7712 commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r586131042 ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -154,17 +162,164 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { +val responses = deleteTopics(request.body[DeleteTopicsRequest].data, + request.context.apiVersion, + authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME), + names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n), + names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n)) +requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { + val responseData = new DeleteTopicsResponseData(). +setResponses(new DeletableTopicResultCollection(responses.iterator)). +setThrottleTimeMs(throttleTimeMs) + new DeleteTopicsResponse(responseData) +}) + } + + def deleteTopics(request: DeleteTopicsRequestData, + apiVersion: Int, + hasClusterAuth: Boolean, + getDescribableTopics: Iterable[String] => Set[String], + getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = { +// Check if topic deletion is enabled at all. +if (!config.deleteTopicEnable) { + if (apiVersion < 3) { +throw new InvalidRequestException("Topic deletion is disabled.") + } else { +throw new TopicDeletionDisabledException() + } +} +// The first step is to load up the names and IDs that have been provided by the +// request. This is a bit messy because we support multiple ways of referring to +// topics (both by name and by id) and because we need to check for duplicates or +// other invalid inputs. +val responses = new util.ArrayList[DeletableTopicResult] +def appendResponse(name: String, id: Uuid, error: ApiError): Unit = { + responses.add(new DeletableTopicResult(). +setName(name). 
+setTopicId(id). +setErrorCode(error.error.code). +setErrorMessage(error.message)) +} +val providedNames = new util.HashSet[String] +val duplicateProvidedNames = new util.HashSet[String] +val providedIds = new util.HashSet[Uuid] +val duplicateProvidedIds = new util.HashSet[Uuid] +def addProvidedName(name: String): Unit = { + if (duplicateProvidedNames.contains(name) || !providedNames.add(name)) { +duplicateProvidedNames.add(name) +providedNames.remove(name) + } +} +request.topicNames.forEach(addProvidedName) +request.topics.forEach { + topic => if (topic.name == null) { +if (topic.topicId.equals(ZERO_UUID)) { + appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST, +"Neither topic name nor id were specified.")) +} else if (duplicateProvidedIds.contains(topic.topicId) || !providedIds.add(topic.topicId)) { + duplicateProvidedIds.add(topic.topicId) + providedIds.remove(topic.topicId) +} + } else { +if (topic.topicId.equals(ZERO_UUID)) { + addProvidedName(topic.name) +} else { + appendResponse(topic.name, topic.topicId, new ApiError(INVALID_REQUEST, +"You may not specify both topic name and topic id.")) +} + } +} +// Create error responses for duplicates. +duplicateProvidedNames.forEach(name => appendResponse(name, ZERO_UUID, + new ApiError(INVALID_REQUEST, "Duplicate topic name."))) +duplicateProvidedIds.forEach(id => appendResponse(null, id, + new ApiError(INVALID_REQUEST, "Duplicate topic id."))) +// At this point we have all the valid names and IDs that have been provided. +// However, the Authorizer needs topic names as inputs, not topic IDs. So +// we need to resolve all IDs to names. 
+val toAuthenticate = new util.HashSet[String] +toAuthenticate.addAll(providedNames) +val idToName = new util.HashMap[Uuid, String] +controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) => + if (nameOrError.isError) { +appendResponse(null, id, nameOrError.error()) + } else { +toAuthenticate.add(nameOrError.result()) +idToName.put(id, nameOrError.result()) + } +} +// Get the list of deletable topics (those we can delete) and the list of describeable +// topics. If a topic can't be deleted or described, we have to act like it doesn't +// exist, even when it does. +val topicsToAuthenticate = toAuthenticate.asScala +val (describeable, deletable) = if (hasClusterAuth) { + (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet) +} else { +
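The duplicate handling in the quoted `deleteTopics` code can be sketched in isolation. The following is a minimal Java illustration of the two-set bookkeeping that `addProvidedName` performs; the class name `ProvidedNameTracker` and the sample topic names are made up for this sketch and are not part of the pull request.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-alone version of the name bookkeeping in the quoted diff.
class ProvidedNameTracker {
    final Set<String> provided = new HashSet<>();
    final Set<String> duplicates = new HashSet<>();

    void add(String name) {
        // Set.add returns false when the element is already present, so a
        // second sighting moves the name from `provided` to `duplicates`.
        if (duplicates.contains(name) || !provided.add(name)) {
            duplicates.add(name);
            provided.remove(name);
        }
    }

    public static void main(String[] args) {
        ProvidedNameTracker tracker = new ProvidedNameTracker();
        for (String name : Arrays.asList("foo", "bar", "foo", "foo")) {
            tracker.add(name);
        }
        // "foo" was given more than once, so only "bar" remains valid.
        System.out.println("provided=" + tracker.provided
            + " duplicates=" + tracker.duplicates);
    }
}
```

A name that appears more than once ends up only in `duplicates`, which is why the quoted code can later emit a single "Duplicate topic name." error per name.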
[GitHub] [kafka] satishd commented on pull request #10173: KAFKA-9548 Added SPIs and public classes/interfaces introduced in KIP-405 for tiered storage feature in Kafka.
satishd commented on pull request #10173: URL: https://github.com/apache/kafka/pull/10173#issuecomment-789442937 @junrao thanks for the comment, added all public classes to javadoc section in build.gradle.
[GitHub] [kafka] satishd commented on a change in pull request #10173: KAFKA-9548 Added SPIs and public classes/interfaces introduced in KIP-405 for tiered storage feature in Kafka.
satishd commented on a change in pull request #10173: URL: https://github.com/apache/kafka/pull/10173#discussion_r586119132 ## File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.io.Closeable; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics. + * + * This class can be plugged in to Kafka cluster by adding the implementation class as + * remote.log.metadata.manager.class.name property value. There is an inbuilt implementation backed by + * topic storage in the local cluster. This is used as the default implementation if + * remote.log.metadata.manager.class.name is not configured. 
+ * + * + * remote.log.metadata.manager.class.path property is about the class path of the RemoteLogStorageManager + * implementation. If specified, the RemoteLogStorageManager implementation and its dependent libraries will be loaded + * by a dedicated classloader which searches this class path before the Kafka broker class path. The syntax of this + * parameter is same with the standard Java class path string. + * + * + * remote.log.metadata.manager.listener.name property is about listener name of the local broker to which + * it should get connected if needed by RemoteLogMetadataManager implementation. When this is configured all other + * required properties can be passed as properties with prefix of 'remote.log.metadata.manager.listener. + * + * "cluster.id", "broker.id" and all other properties prefixed with "remote.log.metadata." are passed when + * {@link #configure(Map)} is invoked on this instance. + * + */ +@InterfaceStability.Evolving +public interface RemoteLogMetadataManager extends Configurable, Closeable { Review comment: This method is renamed to `putRemotePartitionDeleteMetadata`, which was added in the current interface.
[jira] [Comment Edited] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
[ https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17294278#comment-17294278 ]

A. Sophie Blee-Goldman edited comment on KAFKA-10340 at 3/3/21, 4:58 AM:

Temporarily removed 2.6.2 from the Fix Version so I can proceed with the 2.6.2 release, since it's already been cherrypicked to 2.6. This should be added back once the ticket is resolved.

was (Author: ableegoldman):
Temporarily removed 2.6.2 from the Fix Version so I can proceed with the 2.6.2 release, this should be added back once the ticket is resolved

> Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.5.1, 2.7.0, 2.6.1, 2.8.0
> Reporter: Arjun Satish
> Assignee: Chris Egerton
> Priority: Major
> Fix For: 3.0.0, 2.7.1
>
> Currently, a source connector will blindly attempt to write a record to a Kafka topic. When the topic does not exist, its creation is controlled by the {{auto.create.topics.enable}} config on the brokers. When auto.create is disabled, the producer.send() call on the Connect worker will hang indefinitely (due to the "infinite retries" configuration for said producer). In setups where this config is usually disabled, the source connector simply appears to hang and not produce any output.
> It is desirable to log an info or error message (or otherwise inform the user) that the connector is simply stuck waiting for the destination topic to be created. When the worker has permissions to inspect the broker settings, it can use the {{listTopics}} and {{describeConfigs}} API in AdminClient to check whether the topic exists and whether the broker has {{auto.create.topics.enable}} set; if neither holds, it can throw an error.
> With the recently merged [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics], this becomes an even more specific corner case: when topic creation settings are enabled, the worker should handle the corner case where topic creation is disabled, {{auto.create.topics.enable}} is set to false and the topic does not exist.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
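The check proposed in the KAFKA-10340 description reduces to a small decision over three facts. The sketch below is only an illustration of that reasoning, not code from any patch: the class `MissingTopicCheck`, its method, and the `Outcome` enum are hypothetical names, and the three booleans are assumed to have been obtained beforehand (for example from the `listTopics` and `describeConfigs` results the issue mentions).

```java
// Hypothetical decision helper; all names here are assumptions for illustration.
class MissingTopicCheck {
    enum Outcome { PROCEED, FAIL_FAST }

    // topicExists:          the destination topic is already present
    // brokerAutoCreate:     auto.create.topics.enable is true on the brokers
    // connectTopicCreation: Connect-side topic creation (KIP-158) applies
    static Outcome decide(boolean topicExists,
                          boolean brokerAutoCreate,
                          boolean connectTopicCreation) {
        if (topicExists || brokerAutoCreate || connectTopicCreation) {
            // Either the topic is there or something will create it.
            return Outcome.PROCEED;
        }
        // Nothing will ever create the topic, so producer.send() would
        // retry forever; report an error instead of hanging.
        return Outcome.FAIL_FAST;
    }

    public static void main(String[] args) {
        System.out.println(decide(false, false, false));
        System.out.println(decide(false, true, false));
    }
}
```

Only the case where all three inputs are false corresponds to the silent hang described in the issue.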
[GitHub] [kafka] hachikuji opened a new pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received
hachikuji opened a new pull request #10252: URL: https://github.com/apache/kafka/pull/10252

This patch implements additional handling logic for `RemoveTopic` records:
- Update `MetadataPartitions` to ensure addition of deleted partitions to `localRemoved` set
- Ensure topic configs are removed from `ConfigRepository`
- Propagate deleted partitions to `GroupCoordinator` so that corresponding offset commits can be removed

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
[ https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17294278#comment-17294278 ]

A. Sophie Blee-Goldman commented on KAFKA-10340:

Temporarily removed 2.6.2 from the Fix Version so I can proceed with the 2.6.2 release, this should be added back once the ticket is resolved
[jira] [Updated] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
[ https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-10340:
Fix Version/s: (was: 2.6.2)
[jira] [Resolved] (KAFKA-12400) Upgrade jetty to fix CVE-2020-27223
[ https://issues.apache.org/jira/browse/KAFKA-12400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar resolved KAFKA-12400.
Resolution: Fixed

Issue resolved by pull request 10245 [https://github.com/apache/kafka/pull/10245]

> Upgrade jetty to fix CVE-2020-27223
>
> Key: KAFKA-12400
> URL: https://issues.apache.org/jira/browse/KAFKA-12400
> Project: Kafka
> Issue Type: Improvement
> Reporter: Dongjin Lee
> Assignee: Dongjin Lee
> Priority: Major
> Fix For: 2.7.1, 2.6.2, 2.8.0
>
> h3. CVE-2020-27223 Detail
> In Eclipse Jetty 9.4.6.v20170531 to 9.4.36.v20210114 (inclusive), 10.0.0, and 11.0.0 when Jetty handles a request containing multiple Accept headers with a large number of quality (i.e. q) parameters, the server may enter a denial of service (DoS) state due to high CPU usage processing those quality values, resulting in minutes of CPU time exhausted processing those quality values.
[GitHub] [kafka] omkreddy closed pull request #10245: KAFKA-12400: Upgrade jetty to fix CVE-2020-27223
omkreddy closed pull request #10245: URL: https://github.com/apache/kafka/pull/10245
[GitHub] [kafka] chia7712 merged pull request #10250: MINOR: Fix null exception in coordinator log
chia7712 merged pull request #10250: URL: https://github.com/apache/kafka/pull/10250
[GitHub] [kafka] chia7712 commented on pull request #10250: MINOR: Fix null exception in coordinator log
chia7712 commented on pull request #10250: URL: https://github.com/apache/kafka/pull/10250#issuecomment-789420972

> do you want to merge this? We should also cherrypick it all the way back to 2.6

Sure, I will merge and backport this patch.
[GitHub] [kafka] ableegoldman commented on pull request #10250: MINOR: Fix null exception in coordinator log
ableegoldman commented on pull request #10250: URL: https://github.com/apache/kafka/pull/10250#issuecomment-789420651 @chia7712 do you want to merge this? We should also cherrypick it all the way back to 2.6
[GitHub] [kafka] ableegoldman commented on pull request #10250: MINOR: Fix null exception in coordinator log
ableegoldman commented on pull request #10250: URL: https://github.com/apache/kafka/pull/10250#issuecomment-789420529 One unrelated test failure which is known to be flaky: `TransactionsBounceTest.testWithGroupMetadata`
[GitHub] [kafka] ableegoldman commented on a change in pull request #10251: MINOR: add missing docs for record-e2e-latency metrics
ableegoldman commented on a change in pull request #10251: URL: https://github.com/apache/kafka/pull/10251#discussion_r586100402 ## File path: docs/ops.html ## @@ -2356,6 +2356,26 @@
[jira] [Resolved] (KAFKA-12389) Upgrade of netty-codec due to CVE-2021-21290
[ https://issues.apache.org/jira/browse/KAFKA-12389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar resolved KAFKA-12389.
Fix Version/s: 2.8.0, 2.6.2, 2.7.1
Resolution: Fixed

Issue resolved by pull request 10235 [https://github.com/apache/kafka/pull/10235]

> Upgrade of netty-codec due to CVE-2021-21290
>
> Key: KAFKA-12389
> URL: https://issues.apache.org/jira/browse/KAFKA-12389
> Project: Kafka
> Issue Type: Bug
> Components: security
> Affects Versions: 2.7.0
> Reporter: Dominique Mongelli
> Assignee: Dongjin Lee
> Priority: Major
> Fix For: 2.7.1, 2.6.2, 2.8.0
>
> Our security tool raised the following security flaw on kafka 2.7: [https://nvd.nist.gov/vuln/detail/CVE-2021-21290]
> It is a vulnerability related to jar *netty-codec-4.1.51.Final.jar*.
> Looking at source code, the netty-codec in trunk and 2.7.0 branches are still vulnerable.
> Based on netty issue tracker, the vulnerability is fixed in 4.1.59.Final: https://github.com/netty/netty/security/advisories/GHSA-5mcr-gq6c-3hq2
[GitHub] [kafka] dongjinleekr commented on pull request #10235: KAFKA-12389: Upgrade of netty-codec due to CVE-2021-21290
dongjinleekr commented on pull request #10235: URL: https://github.com/apache/kafka/pull/10235#issuecomment-789416305 @ableegoldman FYI. :smiley:
[GitHub] [kafka] ableegoldman commented on pull request #10251: MINOR: add missing docs for record-e2e-latency metrics
ableegoldman commented on pull request #10251: URL: https://github.com/apache/kafka/pull/10251#issuecomment-789415834 call for review @cadonna @vvcephei @guozhangwang -- in theory we should cherrypick this back to 2.8 at least, but I can prepare a separate PR against `kafka-site` if we don't get this merged in time (not worth blocking 2.8 over obviously)
[GitHub] [kafka] ableegoldman opened a new pull request #10251: MINOR: add missing docs for record-e2e-latency metrics
ableegoldman opened a new pull request #10251: URL: https://github.com/apache/kafka/pull/10251 I missed updating the documentation for these metrics since I didn't notice we had Streams metrics docs outside of the usual Streams docs.
[GitHub] [kafka] omkreddy closed pull request #10235: KAFKA-12389: Upgrade of netty-codec due to CVE-2021-21290
omkreddy closed pull request #10235: URL: https://github.com/apache/kafka/pull/10235
[GitHub] [kafka] chia7712 commented on a change in pull request #10193: MINOR: correct the error message of validating uint32
chia7712 commented on a change in pull request #10193: URL: https://github.com/apache/kafka/pull/10193#discussion_r586082468

## File path: clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java

@@ -320,7 +320,7 @@ public Long validate(Object item) {
 if (item instanceof Long)
 return (Long) item;
 else
-throw new SchemaException(item + " is not a Long.");
+throw new SchemaException(item + " is not an a Long (encoding an unsigned integer).");

Review comment:

> There is a typo here, "an a".

My bad :(

> Also, I think it reads a bit weird. Not clear that "encoding an unsigned integer" in brackets means when reading the message.

I will update it in #10248

@ijuma thanks for your reviews
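For context on why a `Long` is involved at all: Java has no unsigned 32-bit primitive, so a uint32 value is usually carried in a `long`. The following is a rough stand-alone sketch of such a validation with a clearer message. It is not the code from the pull request: the class `Uint32Check` is hypothetical, the range check is an addition that the quoted diff does not contain, and `IllegalArgumentException` stands in for Kafka's `SchemaException` so that the example compiles on its own.

```java
// Hypothetical illustration; not the Kafka Type.java implementation.
class Uint32Check {
    static final long MAX_UINT32 = 0xFFFFFFFFL; // 4294967295

    static long validate(Object item) {
        if (!(item instanceof Long)) {
            throw new IllegalArgumentException(
                item + " is not a Long (the Java type used here to hold an unsigned 32-bit integer).");
        }
        long value = (Long) item;
        // Assumption: reject values that cannot be encoded in 32 unsigned bits.
        if (value < 0 || value > MAX_UINT32) {
            throw new IllegalArgumentException(
                value + " is outside the unsigned 32-bit range [0, " + MAX_UINT32 + "].");
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(validate(42L));
        try {
            validate(42); // an Integer, not a Long
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```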
[GitHub] [kafka] abbccdda commented on pull request #10240: KAFKA-12381: only return leader not available for internal topic creation
abbccdda commented on pull request #10240: URL: https://github.com/apache/kafka/pull/10240#issuecomment-789380164 After an offline sync with @hachikuji, we decided that performing the invalid replication factor check on the forwarding broker would be redundant. I will remove that logic so that a stale metadata cache cannot cause us to return a wrong error code to the client.
[GitHub] [kafka] dengziming commented on pull request #10250: MINOR: Fix null exception in coordinator log
dengziming commented on pull request #10250: URL: https://github.com/apache/kafka/pull/10250#issuecomment-789374413 @ableegoldman PTAL.
[GitHub] [kafka] dengziming opened a new pull request #10250: MINOR: Fix null exception in coordinator log
dengziming opened a new pull request #10250: URL: https://github.com/apache/kafka/pull/10250

*More detailed description of your change*

I found that `fatalException` is always null when `log.info("xxx", fatalException)` is called, so we should assign a value to it first.

*Summary of testing strategy (including rationale)*

Tested locally. The log output changes from

```
[2021-03-03 10:18:06,203] INFO FindCoordinator request hit fatal exception (org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator:260)
```

to

```
[2021-03-03 10:17:37,123] INFO FindCoordinator request hit fatal exception (org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator:260)
org.apache.kafka.common.errors.AuthenticationException: Authentication failed
```
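The ordering problem described in this pull request can be reproduced without Kafka or a logging library. The sketch below is an assumption-laden illustration rather than the actual `AbstractCoordinator` code, which is not shown in this thread: the class `FatalExceptionLogging`, both methods, and the `render` helper (a stand-in for a logger call that takes a throwable) are all hypothetical.

```java
// Hypothetical reproduction of "log the field before assigning it".
class FatalExceptionLogging {
    private RuntimeException fatalException; // null until a failure is recorded

    // Stand-in for a logger call such as log.info(msg, throwable):
    // a null throwable contributes nothing to the output.
    static String render(String message, Throwable t) {
        return t == null ? message : message + "\n" + t;
    }

    // Buggy order: the field is still null when it is logged.
    String logBeforeAssign(RuntimeException e) {
        String line = render("FindCoordinator request hit fatal exception", fatalException);
        fatalException = e;
        return line;
    }

    // Fixed order: assign first, then log the now non-null field.
    String assignThenLog(RuntimeException e) {
        fatalException = e;
        return render("FindCoordinator request hit fatal exception", fatalException);
    }

    public static void main(String[] args) {
        RuntimeException failure = new IllegalStateException("Authentication failed");
        System.out.println(new FatalExceptionLogging().logBeforeAssign(failure));
        System.out.println(new FatalExceptionLogging().assignThenLog(failure));
    }
}
```

Only the second variant includes the exception in its output, which matches the before/after log lines quoted in the pull request description.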
[GitHub] [kafka] dengziming closed pull request #10247: MINOR: Fix log format in AbstractCoordinator
dengziming closed pull request #10247: URL: https://github.com/apache/kafka/pull/10247
[GitHub] [kafka] dengziming commented on a change in pull request #10247: MINOR: Fix log format in AbstractCoordinator
dengziming commented on a change in pull request #10247: URL: https://github.com/apache/kafka/pull/10247#discussion_r586046025

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

@@ -860,7 +860,7 @@ public void onSuccess(ClientResponse resp, RequestFuture future) {
 @Override
 public void onFailure(RuntimeException e, RequestFuture future) {
-log.debug("FindCoordinator request failed due to {}", e);
+log.debug("FindCoordinator request failed due to {}", e.getMessage());

Review comment: Thank you, #10232 LGTM; I will close this PR.
[jira] [Commented] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
[ https://issues.apache.org/jira/browse/KAFKA-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17294210#comment-17294210 ]

Luke Chen commented on KAFKA-10251:

Interesting. Let me check it. Thanks.

> Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
>
> Key: KAFKA-10251
> URL: https://issues.apache.org/jira/browse/KAFKA-10251
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: A. Sophie Blee-Goldman
> Assignee: Luke Chen
> Priority: Major
>
> h3. Stacktrace
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout instead of the expected 200 records
> at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
> at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
> at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
> at org.scalatest.Assertions.fail(Assertions.scala:1091)
> at org.scalatest.Assertions.fail$(Assertions.scala:1087)
> at org.scalatest.Assertions$.fail(Assertions.scala:1389)
> at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:842)
> at kafka.api.TransactionsBounceTest.testWithGroupMetadata(TransactionsBounceTest.scala:109)
>
> The logs are pretty much just this on repeat:
> {code:java}
> [2020-07-08 23:41:04,034] ERROR Error when sending message to topic output-topic with key: 9955, value: 9955 with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52)
> org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
> at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
> at java.lang.Thread.run(Thread.java:748)
> [2020-07-08 23:41:04,034] ERROR Error when sending message to topic output-topic with key: 9959, value: 9959 with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52)
> org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
> at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
> at java.lang.Thread.run(Thread.java:748)
> {code}
[GitHub] [kafka] rondagostino commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
rondagostino commented on pull request #10184: URL: https://github.com/apache/kafka/pull/10184#issuecomment-789350896 ``` [2021-03-02T22:42:17.438Z] [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-10184/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:76:8: Unused import - java.util.Set. [UnusedImports] ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dongjinleekr commented on pull request #10245: KAFKA-12400: Upgrade jetty to fix CVE-2020-27223
dongjinleekr commented on pull request #10245: URL: https://github.com/apache/kafka/pull/10245#issuecomment-789347793 @ableegoldman I think this should be a blocker, since it is a security vulnerability.
[GitHub] [kafka] ableegoldman commented on pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down
ableegoldman commented on pull request #10215: URL: https://github.com/apache/kafka/pull/10215#issuecomment-789326289 Merged to trunk and cherrypicked to 2.8 cc @vvcephei
[GitHub] [kafka] ableegoldman merged pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down
ableegoldman merged pull request #10215: URL: https://github.com/apache/kafka/pull/10215
[GitHub] [kafka] ableegoldman commented on pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down
ableegoldman commented on pull request #10215: URL: https://github.com/apache/kafka/pull/10215#issuecomment-789323390 One unrelated failure: `TransactionsBounceTest.testWithGroupId()` (known to be flaky, see KAFKA-10251).
[jira] [Reopened] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
[ https://issues.apache.org/jira/browse/KAFKA-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reopened KAFKA-10251: Looks like it's still failing – saw at least one test failure on a build which was kicked off since merging this PR. [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10215/5/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_8___testWithGroupId__/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed] Reopening the ticket for further investigation
[jira] [Updated] (KAFKA-12254) MirrorMaker 2.0 creates destination topic with default configs
[ https://issues.apache.org/jira/browse/KAFKA-12254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-12254: --- Fix Version/s: (was: 3.0.0) > MirrorMaker 2.0 creates destination topic with default configs > -- > > Key: KAFKA-12254 > URL: https://issues.apache.org/jira/browse/KAFKA-12254 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0, 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0 >Reporter: Dhruvil Shah >Assignee: Dhruvil Shah >Priority: Blocker > Fix For: 2.8.0 > > > `MirrorSourceConnector` implements the logic for replicating data, > configurations, and other metadata between the source and destination > clusters. This includes the tasks below: > # `refreshTopicPartitions` for syncing topics / partitions from source to > destination. > # `syncTopicConfigs` for syncing topic configurations from source to > destination. > A limitation is that `computeAndCreateTopicPartitions` creates topics with > default configurations on the destination cluster. A separate async task > `syncTopicConfigs` is responsible for syncing the topic configs. Before that > sync happens, topic configurations could be out of sync between the two > clusters. > In the worst case, this could lead to data loss eg. when we have a compacted > topic being mirrored between clusters which is incorrectly created with the > default configuration of `cleanup.policy = delete` on the destination before > the configurations are sync'd via `syncTopicConfigs`. > Here is an example of the divergence: > Source Topic: > ``` > Topic: foobar PartitionCount: 1 ReplicationFactor: 1 Configs: > cleanup.policy=compact,segment.bytes=1073741824 > ``` > Destination Topic: > ``` > Topic: A.foobar PartitionCount: 1 ReplicationFactor: 1 Configs: > segment.bytes=1073741824 > ``` > A safer approach is to ensure that the right configurations are set on the > destination cluster before data is replicated to it. 
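The divergence described above can be illustrated with a small in-memory toy. This is only a sketch under stated assumptions: the dictionaries stand in for clusters, `DEFAULT_CONFIGS`, `effective_configs`, `create_with_defaults` and `create_with_source_configs` are hypothetical names, and none of them are MirrorMaker or Kafka client APIs. It only shows why a compacted topic ends up with `cleanup.policy=delete` on the destination when the topic is created without the source overrides.

```python
# Toy model: a "cluster" is a dict of topic name -> explicit config overrides.
# All names here are hypothetical and exist only for illustration.
DEFAULT_CONFIGS = {"cleanup.policy": "delete"}


def effective_configs(overrides):
    """Broker defaults overlaid with the topic's explicit overrides."""
    return {**DEFAULT_CONFIGS, **overrides}


def create_with_defaults(source, destination, topic, alias="A"):
    """Behaviour described in the ticket: the remote topic gets no overrides."""
    destination[f"{alias}.{topic}"] = {}


def create_with_source_configs(source, destination, topic, alias="A"):
    """Safer variant: copy the source overrides at creation time."""
    destination[f"{alias}.{topic}"] = dict(source[topic])


source = {"foobar": {"cleanup.policy": "compact", "segment.bytes": "1073741824"}}

unsafe = {}
create_with_defaults(source, unsafe, "foobar")
safe = {}
create_with_source_configs(source, safe, "foobar")

# Policy in force on the destination before any later config sync runs.
unsafe_policy = effective_configs(unsafe["A.foobar"])["cleanup.policy"]
safe_policy = effective_configs(safe["A.foobar"])["cleanup.policy"]
print(unsafe_policy, safe_policy)
```

In the first variant the destination topic runs with the delete policy until a later sync corrects it, which is the window in which data on a compacted topic could be lost.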
[GitHub] [kafka] ableegoldman commented on pull request #10245: KAFKA-12400: Upgrade jetty to fix CVE-2020-27223
ableegoldman commented on pull request #10245: URL: https://github.com/apache/kafka/pull/10245#issuecomment-789260883 What's the status here? Is this a blocker for the 2.6.2 release?
[GitHub] [kafka] cmccabe commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on pull request #10184: URL: https://github.com/apache/kafka/pull/10184#issuecomment-789256987 Thanks for the reviews! I reworked the authentication, validation, and de-duplication code a lot. The new logic should take into account the issues pointed out here. I resolved a few comment threads since they refer to code that was refactored-- please take another look if you get a chance. To clarify a bit, `RemoveTopicRecord` should imply some other effects: * All topic configs for the affected topic should be deleted * We should delete all the partitions of the deleted topic * We should remove the topic from `brokersToIsrs` The fact that it wasn't doing these things was a bug... it's fixed now. This should also allow the ducktape test to work (cc @rondagostino ) We also have a JIRA to follow up on the broker side: https://issues.apache.org/jira/browse/KAFKA-12403 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
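The implied effects of `RemoveTopicRecord` listed above can be sketched with a toy model. This is a minimal sketch, assuming a simplified state layout: `ToyReplicationControl` and its fields are hypothetical and do not mirror the real `ReplicationControlManager`. The point is that replaying the record has to clear every structure that still refers to the topic, otherwise a later lookup finds a topic ID in the ISR map that no longer exists in the topics map.

```python
class ToyReplicationControl:
    """Hypothetical, simplified stand-in for controller topic state."""

    def __init__(self):
        self.topics = {}           # topic_id -> {"name": str, "partitions": set}
        self.topics_by_name = {}   # name -> topic_id
        self.topic_configs = {}    # name -> dict of config overrides
        self.brokers_to_isrs = {}  # broker_id -> {topic_id: set of partition ids}

    def create_topic(self, topic_id, name, partitions, isr_brokers, configs=None):
        self.topics[topic_id] = {"name": name, "partitions": set(partitions)}
        self.topics_by_name[name] = topic_id
        self.topic_configs[name] = dict(configs or {})
        for broker_id in isr_brokers:
            isrs = self.brokers_to_isrs.setdefault(broker_id, {})
            isrs[topic_id] = set(partitions)

    def replay_remove_topic(self, topic_id):
        topic = self.topics.pop(topic_id, None)
        if topic is None:
            raise RuntimeError(f"Can't find topic with ID {topic_id} to remove.")
        # Partitions live inside the topic entry here, so they go with it.
        self.topics_by_name.pop(topic["name"], None)
        # Implied effect: drop all configs of the removed topic.
        self.topic_configs.pop(topic["name"], None)
        # Implied effect: drop the topic from every broker's ISR membership.
        for broker_id in list(self.brokers_to_isrs):
            self.brokers_to_isrs[broker_id].pop(topic_id, None)
            if not self.brokers_to_isrs[broker_id]:
                del self.brokers_to_isrs[broker_id]


control = ToyReplicationControl()
control.create_topic("id-1", "test_topic", [0], isr_brokers=[1],
                     configs={"cleanup.policy": "compact"})
control.replay_remove_topic("id-1")
print(control.topics, control.topic_configs, control.brokers_to_isrs)
```

After the replay, a broker shutdown in this toy would find nothing left in `brokers_to_isrs` for the removed topic, which is the invariant the controller log in this thread reports as violated.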
[GitHub] [kafka] hachikuji commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
hachikuji commented on pull request #10184: URL: https://github.com/apache/kafka/pull/10184#issuecomment-789254543 @rondagostino I believe that error will be fixed by https://issues.apache.org/jira/browse/KAFKA-12403.
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r585942960 ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -809,6 +824,27 @@ private QuorumController(LogContext logContext, () -> replicationControl.unregisterBroker(brokerId)); } +@Override +public CompletableFuture>> findTopicIds(Collection names) { +if (names.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap()); +return appendReadEvent("findTopicIds", +() -> replicationControl.findTopicIds(lastCommittedOffset, names)); +} + +@Override +public CompletableFuture>> findTopicNames(Collection ids) { +if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap()); +return appendReadEvent("findTopicNames", +() -> replicationControl.findTopicNames(lastCommittedOffset, ids)); +} + +@Override +public CompletableFuture> deleteTopics(Collection ids) { +if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap()); +return appendWriteEvent("deleteTopics", +() -> replicationControl.deleteTopics(ids)); Review comment: Yes, it should be implicit based on the DeleteTopic record. I will fix the controller to do the right thing here. We'll also need to have the broker do that too. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r585942730 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) { log.debug("Applied ISR change record: {}", record.toString()); } +public void replay(RemoveTopicRecord record) { +TopicControlInfo topic = topics.remove(record.topicId()); +if (topic == null) { +throw new RuntimeException("Can't find topic with ID " + record.topicId() + +" to remove."); +} +topicsByName.remove(topic.name); Review comment: Sorry, you're right: we need to remove this from `brokersToIsrs`. Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-3988) KafkaConfigBackingStore assumes configs will be stored as schemaless maps
[ https://issues.apache.org/jira/browse/KAFKA-3988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17294009#comment-17294009 ] Ewen Cheslack-Postava commented on KAFKA-3988: -- [~pachima...@gmail.com] It's marked WONTFIX because we concluded it didn't make much sense to – as mentioned above, KIP-174 gets rid of the ability to even configure the internal converters. You should remove the internal.converter settings that specify schemas.enable=true. > KafkaConfigBackingStore assumes configs will be stored as schemaless maps > - > > Key: KAFKA-3988 > URL: https://issues.apache.org/jira/browse/KAFKA-3988 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.0.0 >Reporter: Ewen Cheslack-Postava >Assignee: Ewen Cheslack-Postava >Priority: Major > Original Estimate: 4h > Remaining Estimate: 4h > > If you use an internal key/value converter that drops schema information (as > is the default in the config files we provide since we use JsonConverter with > schemas.enable=false), the schemas we use that are structs get converted to > maps since we don't know the structure to decode them to. Because our tests > run with these settings, we haven't validated that the code works if schemas > are preserved. > When they are preserved, we'll hit an error message like this > {quote} > [2016-07-25 07:36:34,828] ERROR Found connector configuration > (connector-test-mysql-jdbc) in wrong format: class > org.apache.kafka.connect.data.Struct > (org.apache.kafka.connect.storage.KafkaConfigBackingStore:498) > {quote} > because the code currently checks that it is working with a map. We should > actually be checking for either a Struct or a Map. This same problem probably > affects a couple of other types of data in the same class as Connector > configs, Task configs, Connect task lists, and target states are all Structs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] junrao commented on a change in pull request #10173: KAFKA-9548 Added SPIs and public classes/interfaces introduced in KIP-405 for tiered storage feature in Kafka.
junrao commented on a change in pull request #10173: URL: https://github.com/apache/kafka/pull/10173#discussion_r585934097 ## File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.io.Closeable; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics. + * + * This class can be plugged in to Kafka cluster by adding the implementation class as + * remote.log.metadata.manager.class.name property value. There is an inbuilt implementation backed by + * topic storage in the local cluster. This is used as the default implementation if + * remote.log.metadata.manager.class.name is not configured. 
+ * + * + * remote.log.metadata.manager.class.path property is about the class path of the RemoteLogStorageManager + * implementation. If specified, the RemoteLogStorageManager implementation and its dependent libraries will be loaded + * by a dedicated classloader which searches this class path before the Kafka broker class path. The syntax of this + * parameter is same with the standard Java class path string. + * + * + * remote.log.metadata.manager.listener.name property is about listener name of the local broker to which + * it should get connected if needed by RemoteLogMetadataManager implementation. When this is configured all other + * required properties can be passed as properties with prefix of 'remote.log.metadata.manager.listener. + * + * "cluster.id", "broker.id" and all other properties prefixed with "remote.log.metadata." are passed when + * {@link #configure(Map)} is invoked on this instance. + * + */ +@InterfaceStability.Evolving +public interface RemoteLogMetadataManager extends Configurable, Closeable { Review comment: The KIP has the following method and is missing in the PR. `void updateRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)` ## File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java ## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.io.Closeable; +import java.io.InputStream; + +/** + * This interface provides the lifecycle of remote log segments that includes copy, fetch, and delete from remote + * storage. + * + * Each upload or copy of a segment is initiated with {@link RemoteLogSegmentMetadata} containing {@link RemoteLogSegmentId} + * which is universally unique even for the same topic partition and offsets. + * + * {@link RemoteLogSegmentMetadata} is stored in {@link RemoteLogMetadataManager} before and after copy/delete operations on + * {@link RemoteStorageManager} with the respective {@link RemoteLogSegmentState}. {@link RemoteLogMetadataManager} is + * responsible for storing and fetching metadata about the remote
[GitHub] [kafka] rondagostino commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
rondagostino commented on pull request #10184: URL: https://github.com/apache/kafka/pull/10184#issuecomment-789243745 The below, if added as `tests/kafkatest/sanity_checks/test_delete_topic.py`, fails for the Raft cases on this PR branch as of this moment because the broker fails to shutdown. The following appears in the controller log: ``` [2021-03-02 21:41:13,354] INFO [Controller 1] Unfenced broker 1 has requested and been granted a controlled shutdown. (org.apache.kafka.controller.BrokerHeartbeatManager) [2021-03-02 21:41:13,355] WARN [Controller 1] org.apache.kafka.controller.QuorumController@3fa533f1: failed with unknown server exception RuntimeException at epoch 1 in 802 us. Reverting to last committed offset 5. (org.apache.kafka.controller.QuorumController) java.lang.RuntimeException: Topic ID VnD54LHq2t3qq_m1WLasZg existed in isrMembers, but not in the topics map. at org.apache.kafka.controller.ReplicationControlManager.handleNodeDeactivated(ReplicationControlManager.java:752) at org.apache.kafka.controller.ReplicationControlManager.processBrokerHeartbeat(ReplicationControlManager.java:931) at org.apache.kafka.controller.QuorumController$1.generateRecordsAndResult(QuorumController.java:911) at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:419) at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173) at java.lang.Thread.run(Thread.java:748) ``` Maybe add this system test to this PR as `tests/kafkatest/sanity_checks/test_delete_topic.py`? ``` # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. 
# The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # #http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from ducktape.mark import matrix from ducktape.mark.resource import cluster from ducktape.tests.test import Test from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.zookeeper import ZookeeperService import time class TestDeleteTopic(Test): """Sanity checks that we can create and delete a topic and then shutdown.""" def __init__(self, test_context): super(TestDeleteTopic, self).__init__(test_context) self.topic = "test_topic" self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {"partitions": 1, "replication-factor": 1}}, controller_num_nodes_override=1) def setUp(self): if self.zk: self.zk.start() @cluster(num_nodes=2) @matrix(metadata_quorum=quorum.all) def test_delete_topic(self, metadata_quorum): """ Test that we can create and delete a topic and then shutdown """ self.kafka.start() self.kafka.delete_topic(self.topic) time.sleep(10) # give it a bit to take effect self.kafka.stop() # explicit stop so that failure to stop fails the test ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-3988) KafkaConfigBackingStore assumes configs will be stored as schemaless maps
[ https://issues.apache.org/jira/browse/KAFKA-3988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17294004#comment-17294004 ] Laxman Pachimadla commented on KAFKA-3988: -- [~ewencp] Hi Ewen, do you have any update on this? We recently posted an issue: https://github.com/awslabs/kinesis-kafka-connector/issues/46. Please look into it and suggest a solution.
[GitHub] [kafka] rondagostino commented on a change in pull request #10227: KAFKA-12382: add a README for KIP-500
rondagostino commented on a change in pull request #10227: URL: https://github.com/apache/kafka/pull/10227#discussion_r585924508 ## File path: KIP-500.md ## @@ -0,0 +1,131 @@ +KIP-500 Early Access Release + + +# Introduction +It is now possible to run Apache Kafka without Apache ZooKeeper! We call this mode [self-managed mode](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum). It is currently *EARLY ACCESS AND SHOULD NOT BE USED IN PRODUCTION*, but it is available for testing in the Kafka 2.8 release. + +When the Kafka cluster is in self-managed mode, it does not store its metadata in ZooKeeper. In fact, you do not have to run ZooKeeper at all, because it stores its metadata in a Raft quorum of controller nodes. + +Self-managed mode has many benefits-- some obvious, and some not so obvious. Clearly, it is nice to manage and configure one service rather than two services. In addition, you can now run a single process Kafka cluster. Most important of all, self-managed mode is more scalable. We expect to be able to [support many more topics and partitions](https://www.confluent.io/kafka-summit-san-francisco-2019/kafka-needs-no-keeper/) in this mode. + +# Quickstart + +## Warning +Self-managed mode in Kafka 2.8 is provided for testing only, *NOT* for production. We do not yet support upgrading existing ZooKeeper-based Kafka clusters into this mode. In fact, when Kafka 3.0 is released, it may not even be possible to upgrade your self-managed clusters from 2.8 to 3.0 without downtime. There may be bugs, including serious ones. You should *assume that your data could be lost at any time* if you try the early access release of KIP-500. + +## Generate a cluster ID +The first step is to generate an ID for your new cluster, using the kafka-storage tool: + + +$ ./bin/kafka-storage.sh random-uuid +xtzWWN4bTjitpL3kfd9s5g + + +## Format Storage Directories +The next step is to format your storage directories. 
If you are running in single-node mode, you can do this with one command: + + +$ ./bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c ./config/raft-combined.properties Review comment: > Do we even need a prefix at all though? We already have server.properties=ZK, adding new broker.properties, controller.properties and combined.properties (or controller+broker.properties) seems to me to be self-describing and both backwards and forwards compatible I tend to agree with this suggestion. I think using `{broker,controller,combined}.properties` for names would be a good way to go here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12377) Flaky Test SaslAuthenticatorTest#testSslClientAuthRequiredForSaslSslListener
[ https://issues.apache.org/jira/browse/KAFKA-12377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293992#comment-17293992 ] Matthias J. Sax commented on KAFKA-12377: - Failed again. > Flaky Test SaslAuthenticatorTest#testSslClientAuthRequiredForSaslSslListener > > > Key: KAFKA-12377 > URL: https://issues.apache.org/jira/browse/KAFKA-12377 > Project: Kafka > Issue Type: Test > Components: core, security, unit tests >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > {quote}org.opentest4j.AssertionFailedError: expected: > but was: at > org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at > org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at > org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at > org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at > org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124) at > org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:111) > at > org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.createAndCheckClientConnectionFailure(SaslAuthenticatorTest.java:2187) > at > org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.createAndCheckSslAuthenticationFailure(SaslAuthenticatorTest.java:2210) > at > org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.verifySslClientAuthForSaslSslListener(SaslAuthenticatorTest.java:1846) > at > org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.testSslClientAuthRequiredForSaslSslListener(SaslAuthenticatorTest.java:1800){quote} > STDOUT > {quote}[2021-02-26 07:18:57,220] ERROR Extensions provided in login context > without a token > (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule:318) > java.io.IOException: Extensions provided in login context without a token at > 
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:165) > at > org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316) > at > org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301) > [...] > Caused by: > org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerConfigException: > Extensions provided in login context without a token at > org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handleTokenCallback(OAuthBearerUnsecuredLoginCallbackHandler.java:192) > at > org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:163) > ... 116 more{quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12177) Retention is not idempotent
[ https://issues.apache.org/jira/browse/KAFKA-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-12177. - Fix Version/s: 3.0.0 Assignee: Lucas Bradstreet Resolution: Fixed merged the PR to trunk > Retention is not idempotent > --- > > Key: KAFKA-12177 > URL: https://issues.apache.org/jira/browse/KAFKA-12177 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Bradstreet >Assignee: Lucas Bradstreet >Priority: Minor > Fix For: 3.0.0 > > > Kafka today applies retention in the following order: > # Time > # Size > # Log start offset > Today it is possible for a segment with offsets less than the log start > offset to contain data that is not deletable due to time retention. This > means that it's possible for log start offset retention to unblock further > deletions as a result of time based retention. Note that this does require a > case where the max timestamp for each segment increases, decreases and then > increases again. Even so it would be nice to make retention idempotent by > applying log start offset retention first, followed by size and time. This > would also be potentially cheaper to perform as neither log start offset nor > size retention requires the maxTimestamp for a segment to be loaded from disk > after a broker restart.
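The ordering problem described in KAFKA-12177 can be reproduced with a small model. The following is only a sketch under stated assumptions: `Segment`, `deleteWhile`, `runPass` and `simulate` are hypothetical names and not Kafka's classes, size-based retention is left out, and the sample offsets and timestamps are made up so that the max timestamps increase, decrease and then increase again.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class RetentionOrderSketch {
    // Hypothetical, simplified stand-in for a log segment (not Kafka's LogSegment).
    static final class Segment {
        final String name;
        final long nextOffset;   // first offset after this segment
        final long maxTimestamp; // largest record timestamp in this segment

        Segment(String name, long nextOffset, long maxTimestamp) {
            this.name = name;
            this.nextOffset = nextOffset;
            this.maxTimestamp = maxTimestamp;
        }
    }

    // Retention only removes a prefix of the log: stop at the first segment that must stay.
    static void deleteWhile(Deque<Segment> log, Predicate<Segment> deletable) {
        while (!log.isEmpty() && deletable.test(log.peekFirst())) {
            log.pollFirst();
        }
    }

    // One retention pass; size-based retention is omitted to keep the sketch short.
    static void runPass(Deque<Segment> log, long logStartOffset,
                        long oldestAllowedTimestamp, boolean startOffsetFirst) {
        Predicate<Segment> expired = s -> s.maxTimestamp < oldestAllowedTimestamp;
        Predicate<Segment> belowStart = s -> s.nextOffset <= logStartOffset;
        if (startOffsetFirst) {
            deleteWhile(log, belowStart);
            deleteWhile(log, expired);
        } else {
            deleteWhile(log, expired);
            deleteWhile(log, belowStart);
        }
    }

    // Runs the given number of passes over a fixed sample log and
    // returns the names of the surviving segments, oldest first.
    static String simulate(boolean startOffsetFirst, int passes) {
        // Max timestamps go up, down, then up again: 100, 900, 200, 1000.
        Deque<Segment> log = new ArrayDeque<>(Arrays.asList(
            new Segment("A", 10, 100),
            new Segment("B", 20, 900),
            new Segment("C", 30, 200),
            new Segment("D", 40, 1000)));
        for (int i = 0; i < passes; i++) {
            runPass(log, 20, 500, startOffsetFirst); // logStartOffset = 20, time cutoff = 500
        }
        return log.stream().map(s -> s.name).collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        System.out.println("time first, 1 pass:           " + simulate(false, 1));
        System.out.println("time first, 2 passes:         " + simulate(false, 2));
        System.out.println("start offset first, 1 pass:   " + simulate(true, 1));
        System.out.println("start offset first, 2 passes: " + simulate(true, 2));
    }
}
```

In this model, applying time retention first leaves `C,D` after one pass and `D` after a second pass, so a single pass is not idempotent. Applying log start offset retention first reaches `D` in one pass and a second pass changes nothing.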
[GitHub] [kafka] junrao merged pull request #10216: KAFKA-12177: apply log start offset retention before time and size based retention
junrao merged pull request #10216: URL: https://github.com/apache/kafka/pull/10216
[jira] [Updated] (KAFKA-12238) Implement DescribeProducers API
[ https://issues.apache.org/jira/browse/KAFKA-12238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12238: Fix Version/s: 2.8.0 > Implement DescribeProducers API > --- > > Key: KAFKA-12238 > URL: https://issues.apache.org/jira/browse/KAFKA-12238 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.8.0 > > > Implement the changes described in KIP-664 for the `DescribeProducers` API. > This is only the server-side implementation and not the changes to `Admin`.
[jira] [Updated] (KAFKA-12267) Implement DescribeTransactions API
[ https://issues.apache.org/jira/browse/KAFKA-12267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12267: Fix Version/s: 3.0.0 > Implement DescribeTransactions API > -- > > Key: KAFKA-12267 > URL: https://issues.apache.org/jira/browse/KAFKA-12267 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.0.0
[jira] [Resolved] (KAFKA-12369) Implement ListTransactions API
[ https://issues.apache.org/jira/browse/KAFKA-12369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-12369. - Fix Version/s: 3.0.0 Resolution: Fixed > Implement ListTransactions API > -- > > Key: KAFKA-12369 > URL: https://issues.apache.org/jira/browse/KAFKA-12369 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.0.0 > > > This tracks the implementation of the `ListTransactions` API documented by > KIP-664: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions. > This API is similar to `ListGroups` for consumer groups.
[GitHub] [kafka] hachikuji merged pull request #10206: KAFKA-12369; Implement `ListTransactions` API
hachikuji merged pull request #10206: URL: https://github.com/apache/kafka/pull/10206
[GitHub] [kafka] ijuma commented on a change in pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist
ijuma commented on a change in pull request #10141: URL: https://github.com/apache/kafka/pull/10141#discussion_r585901281 ## File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ## @@ -812,6 +812,26 @@ object ReassignPartitionsCommand extends Logging { proposedAssignments } + private def describeTopics(adminClient: Admin, + topics: Set[String]) + : Map[String, TopicDescription] = { +adminClient.describeTopics(topics.asJava).values.asScala.map { + case (topicName, topicDescriptionFuture) => +try { + topicName -> topicDescriptionFuture.get +} +catch { + case t: ExecutionException => +if (classOf[UnknownTopicOrPartitionException].isInstance(t.getCause)) { Review comment: Also, you can have the `if` in the `case t` line and then a second `case` for the rethrow case.
[GitHub] [kafka] ijuma commented on a change in pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist
ijuma commented on a change in pull request #10141: URL: https://github.com/apache/kafka/pull/10141#discussion_r585900837 ## File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ## @@ -812,6 +812,26 @@ object ReassignPartitionsCommand extends Logging { proposedAssignments } + private def describeTopics(adminClient: Admin, + topics: Set[String]) + : Map[String, TopicDescription] = { +adminClient.describeTopics(topics.asJava).values.asScala.map { + case (topicName, topicDescriptionFuture) => +try { + topicName -> topicDescriptionFuture.get +} +catch { + case t: ExecutionException => +if (classOf[UnknownTopicOrPartitionException].isInstance(t.getCause)) { Review comment: Hmm. why not write this as `t.getCause.isInstanceOf[UnknownTopicOrPartitionException]`?
[GitHub] [kafka] ijuma commented on a change in pull request #10193: MINOR: correct the error message of validating uint32
ijuma commented on a change in pull request #10193: URL: https://github.com/apache/kafka/pull/10193#discussion_r585899128 ## File path: clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java ## @@ -320,7 +320,7 @@ public Long validate(Object item) { if (item instanceof Long) return (Long) item; else -throw new SchemaException(item + " is not a Long."); +throw new SchemaException(item + " is not an a Long (encoding an unsigned integer)."); Review comment: Also, I think it reads a bit weird. When reading the message, it is not clear what "encoding an unsigned integer" in brackets means.
[GitHub] [kafka] ijuma commented on a change in pull request #10193: MINOR: correct the error message of validating uint32
ijuma commented on a change in pull request #10193: URL: https://github.com/apache/kafka/pull/10193#discussion_r585897929 ## File path: clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java ## @@ -320,7 +320,7 @@ public Long validate(Object item) { if (item instanceof Long) return (Long) item; else -throw new SchemaException(item + " is not a Long."); +throw new SchemaException(item + " is not an a Long (encoding an unsigned integer)."); Review comment: There is a typo here, "an a".
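For context on why a uint32 value is validated as a `Long` at all: an unsigned 32-bit integer can exceed `Integer.MAX_VALUE`, so Java code typically carries it in a `long`. Below is a minimal sketch of that idea, not the code under review. `Uint32Sketch`, the range check, and the message wording are all assumptions made for illustration.

```java
class Uint32Sketch {
    static final long UINT32_MAX = 0xFFFFFFFFL; // 2^32 - 1

    // Hypothetical validator: accepts only a Long within the unsigned 32-bit range.
    // The range check and the message text are assumptions, not Kafka's implementation.
    static long validate(Object item) {
        if (!(item instanceof Long)) {
            throw new IllegalArgumentException(
                item + " is not a Long (used here to hold an unsigned 32-bit integer).");
        }
        long value = (Long) item;
        if (value < 0 || value > UINT32_MAX) {
            throw new IllegalArgumentException(value + " is outside the unsigned 32-bit range.");
        }
        return value;
    }

    // Narrows a validated value to the 32 bits of a uint32 (held in a Java int).
    static int toWireInt(long value) {
        return (int) value;
    }

    // Recovers the unsigned value from those 32 bits.
    static long fromWireInt(int wire) {
        return Integer.toUnsignedLong(wire);
    }

    public static void main(String[] args) {
        long big = 4_000_000_000L;           // larger than Integer.MAX_VALUE
        int wire = toWireInt(validate(big)); // negative when read as a signed int
        System.out.println(wire + " -> " + fromWireInt(wire));
    }
}
```

The round trip through `int` shows the reason for the wider type: the same 32 bits read as a signed `int` are negative, while `Integer.toUnsignedLong` restores the original value.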
[GitHub] [kafka] jolshan commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
jolshan commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r585895209 ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { +val responses = deleteTopics(request.body[DeleteTopicsRequest].data(), + request.context.apiVersion(), + authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME), + names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n), + names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n)) +requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { + val responseData = new DeleteTopicsResponseData(). +setResponses(new DeletableTopicResultCollection(responses.iterator())). +setThrottleTimeMs(throttleTimeMs) + new DeleteTopicsResponse(responseData) +}) + } + + def deleteTopics(request: DeleteTopicsRequestData, + apiVersion: Int, + hasClusterAuth: Boolean, + getDescribableTopics: Iterable[String] => Set[String], + getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = { +if (!config.deleteTopicEnable) { + if (apiVersion < 3) { +throw new InvalidRequestException("Topic deletion is disabled.") + } else { +throw new TopicDeletionDisabledException() + } +} +val responses = new util.ArrayList[DeletableTopicResult] +val duplicatedTopicNames = new util.HashSet[String] +val topicNamesToResolve = new util.HashSet[String] +val topicIdsToResolve = new util.HashSet[Uuid] +val duplicatedTopicIds = new util.HashSet[Uuid] + +def appendResponse(name: String, id: Uuid, error: ApiError): Unit = { + responses.add(new DeletableTopicResult(). +setName(name). +setTopicId(id). +setErrorCode(error.error().code()). 
+setErrorMessage(error.message())) +} + +def maybeAppendToTopicNamesToResolve(name: String): Unit = { + if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) { +appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name.")) +topicNamesToResolve.remove(name) +duplicatedTopicNames.add(name) + } +} + +def maybeAppendToIdsToResolve(id: Uuid): Unit = { + if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) { +appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID.")) +topicIdsToResolve.remove(id) +duplicatedTopicIds.add(id) + } +} + + request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve) + +request.topics().iterator().asScala.foreach { + topic => if (topic.name() == null) { +if (topic.topicId.equals(ZERO_UUID)) { + appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST, +"Neither topic name nor id were specified.")) +} else { + maybeAppendToIdsToResolve(topic.topicId()) +} + } else { +if (topic.topicId().equals(ZERO_UUID)) { + maybeAppendToTopicNamesToResolve(topic.name()) +} else { + appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST, +"You may not specify both topic name and topic id.")) +} + } +} + +val idToName = new util.HashMap[Uuid, String] + +def maybeAppendToIdToName(id: Uuid, name: String): Unit = { + if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) { + appendResponse(name, id, new ApiError(INVALID_REQUEST, + "The same topic was specified by name and by id.")) + idToName.remove(id) + duplicatedTopicIds.add(id) + } +} +controller.findTopicIds(topicNamesToResolve).get().asScala.foreach { + case (name, idOrError) => if (idOrError.isError) { +appendResponse(name, ZERO_UUID, idOrError.error()) + } else { +maybeAppendToIdToName(idOrError.result(), name) + } +} +controller.findTopicNames(topicIdsToResolve).get().asScala.foreach { + case (id, nameOrError) => if (nameOrError.isError) { +appendResponse(null, id, 
nameOrError.error()) + } else { +maybeAppendToIdToName(id, nameOrError.result()) + } +} + +if (!hasClusterAuth) { + val authorizedDescribeTopics = getDescribableTopics(idToName.values().asScala) + val authorizedDeleteTopics = getDeletableTopics(idToName.values().asScala) + val iterator = idToName.entrySet().iterator() + while (iterator.hasNext) { +val entry = iterator.next() +val topicName =
[GitHub] [kafka] ijuma commented on a change in pull request #10203: MINOR: Prepare for Gradle 7.0
ijuma commented on a change in pull request #10203: URL: https://github.com/apache/kafka/pull/10203#discussion_r585894119 ## File path: build.gradle ## @@ -1586,15 +1605,17 @@ project(':streams:test-utils') { archivesBaseName = "kafka-streams-test-utils" dependencies { -compile project(':streams') -compile project(':clients') +implementation project(':streams') Review comment: Yes, agreed. This is even more important since this module is public API and it exposes classes from streams and clients in said API (e.g. `Deserializer` and `TopologyException`).
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r585891911 ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { +val responses = deleteTopics(request.body[DeleteTopicsRequest].data(), + request.context.apiVersion(), + authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME), + names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n), + names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n)) +requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { + val responseData = new DeleteTopicsResponseData(). +setResponses(new DeletableTopicResultCollection(responses.iterator())). +setThrottleTimeMs(throttleTimeMs) + new DeleteTopicsResponse(responseData) +}) + } + + def deleteTopics(request: DeleteTopicsRequestData, + apiVersion: Int, + hasClusterAuth: Boolean, + getDescribableTopics: Iterable[String] => Set[String], + getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = { +if (!config.deleteTopicEnable) { + if (apiVersion < 3) { +throw new InvalidRequestException("Topic deletion is disabled.") + } else { +throw new TopicDeletionDisabledException() + } +} +val responses = new util.ArrayList[DeletableTopicResult] +val duplicatedTopicNames = new util.HashSet[String] +val topicNamesToResolve = new util.HashSet[String] +val topicIdsToResolve = new util.HashSet[Uuid] +val duplicatedTopicIds = new util.HashSet[Uuid] + +def appendResponse(name: String, id: Uuid, error: ApiError): Unit = { + responses.add(new DeletableTopicResult(). +setName(name). +setTopicId(id). +setErrorCode(error.error().code()). 
+setErrorMessage(error.message())) +} + +def maybeAppendToTopicNamesToResolve(name: String): Unit = { + if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) { +appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name.")) +topicNamesToResolve.remove(name) +duplicatedTopicNames.add(name) + } +} + +def maybeAppendToIdsToResolve(id: Uuid): Unit = { + if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) { +appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID.")) +topicIdsToResolve.remove(id) +duplicatedTopicIds.add(id) + } +} + + request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve) + +request.topics().iterator().asScala.foreach { + topic => if (topic.name() == null) { +if (topic.topicId.equals(ZERO_UUID)) { + appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST, +"Neither topic name nor id were specified.")) +} else { + maybeAppendToIdsToResolve(topic.topicId()) +} + } else { +if (topic.topicId().equals(ZERO_UUID)) { + maybeAppendToTopicNamesToResolve(topic.name()) +} else { + appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST, +"You may not specify both topic name and topic id.")) +} + } +} + +val idToName = new util.HashMap[Uuid, String] + +def maybeAppendToIdToName(id: Uuid, name: String): Unit = { + if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) { + appendResponse(name, id, new ApiError(INVALID_REQUEST, + "The same topic was specified by name and by id.")) + idToName.remove(id) + duplicatedTopicIds.add(id) + } +} +controller.findTopicIds(topicNamesToResolve).get().asScala.foreach { + case (name, idOrError) => if (idOrError.isError) { +appendResponse(name, ZERO_UUID, idOrError.error()) + } else { +maybeAppendToIdToName(idOrError.result(), name) + } +} +controller.findTopicNames(topicIdsToResolve).get().asScala.foreach { + case (id, nameOrError) => if (nameOrError.isError) { +appendResponse(null, id, 
nameOrError.error()) + } else { +maybeAppendToIdToName(id, nameOrError.result()) + } +} + +if (!hasClusterAuth) { + val authorizedDescribeTopics = getDescribableTopics(idToName.values().asScala) + val authorizedDeleteTopics = getDeletableTopics(idToName.values().asScala) + val iterator = idToName.entrySet().iterator() + while (iterator.hasNext) { +val entry = iterator.next() +val topicName =
[GitHub] [kafka] ijuma commented on a change in pull request #10203: MINOR: Prepare for Gradle 7.0
ijuma commented on a change in pull request #10203: URL: https://github.com/apache/kafka/pull/10203#discussion_r585891145 ## File path: release.py ## @@ -631,7 +631,7 @@ def select_gpg_key(): contents = f.read() if not user_ok("Going to build and upload mvn artifacts based on these settings:\n" + contents + '\nOK (y/n)?: '): fail("Retry again later") -cmd("Building and uploading archives", "./gradlewAll uploadArchives", cwd=kafka_dir, env=jdk8_env, shell=True) +cmd("Building and uploading archives", "./gradlewAll publish", cwd=kafka_dir, env=jdk8_env, shell=True) Review comment: Good catch. I had searched for this, but somehow missed it.
[GitHub] [kafka] rondagostino opened a new pull request #10249: MINOR: disable round_trip_fault_test system tests for Raft quorums
rondagostino opened a new pull request #10249: URL: https://github.com/apache/kafka/pull/10249 The KIP-500 early access release will not support creating a partition with a manual partition assignment that includes a broker that is not currently online. This patch disables system tests for Raft-based metadata quorums where the test depends on this functionality to pass. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-12403) Broker handling of delete topic events
[ https://issues.apache.org/jira/browse/KAFKA-12403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12403: Labels: kip-500 (was: ) > Broker handling of delete topic events > -- > > Key: KAFKA-12403 > URL: https://issues.apache.org/jira/browse/KAFKA-12403 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Labels: kip-500 > > This issue tracks completion of metadata listener support for the topic > deletion event. When a topic is deleted, the broker needs to stop replicas, > delete log data, and remove cached topic configurations.
[GitHub] [kafka] ableegoldman merged pull request #10163: KAFKA-10357: Extract setup of changelog from Streams partition assignor
ableegoldman merged pull request #10163: URL: https://github.com/apache/kafka/pull/10163
[GitHub] [kafka] ableegoldman commented on pull request #10163: KAFKA-10357: Extract setup of changelog from Streams partition assignor
ableegoldman commented on pull request #10163: URL: https://github.com/apache/kafka/pull/10163#issuecomment-789174791 Java11 failed with unrelated flaky `kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch`, Java15 failed with `Execution failed for task ':streams:test-utils:unitTest'` (this PR does not touch on the test-utils so this failure should be unrelated, however I ran them locally to verify)
[jira] [Created] (KAFKA-12403) Broker handling of delete topic events
Jason Gustafson created KAFKA-12403: --- Summary: Broker handling of delete topic events Key: KAFKA-12403 URL: https://issues.apache.org/jira/browse/KAFKA-12403 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson This issue tracks completion of metadata listener support for the topic deletion event. When a topic is deleted, the broker needs to stop replicas, delete log data, and remove cached topic configurations.
[jira] [Updated] (KAFKA-12254) MirrorMaker 2.0 creates destination topic with default configs
[ https://issues.apache.org/jira/browse/KAFKA-12254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-12254: - Fix Version/s: 2.8.0 > MirrorMaker 2.0 creates destination topic with default configs > -- > > Key: KAFKA-12254 > URL: https://issues.apache.org/jira/browse/KAFKA-12254 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0, 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0 >Reporter: Dhruvil Shah >Assignee: Dhruvil Shah >Priority: Blocker > Fix For: 3.0.0, 2.8.0 > > > `MirrorSourceConnector` implements the logic for replicating data, > configurations, and other metadata between the source and destination > clusters. This includes the tasks below: > # `refreshTopicPartitions` for syncing topics / partitions from source to > destination. > # `syncTopicConfigs` for syncing topic configurations from source to > destination. > A limitation is that `computeAndCreateTopicPartitions` creates topics with > default configurations on the destination cluster. A separate async task > `syncTopicConfigs` is responsible for syncing the topic configs. Before that > sync happens, topic configurations could be out of sync between the two > clusters. > In the worst case, this could lead to data loss, e.g. when a compacted > topic being mirrored between clusters is incorrectly created with the > default configuration of `cleanup.policy = delete` on the destination before > the configurations are synced via `syncTopicConfigs`. > Here is an example of the divergence: > Source Topic: > ``` > Topic: foobar PartitionCount: 1 ReplicationFactor: 1 Configs: > cleanup.policy=compact,segment.bytes=1073741824 > ``` > Destination Topic: > ``` > Topic: A.foobar PartitionCount: 1 ReplicationFactor: 1 Configs: > segment.bytes=1073741824 > ``` > A safer approach is to ensure that the right configurations are set on the > destination cluster before data is replicated to it.
[GitHub] [kafka] ableegoldman commented on a change in pull request #10163: KAFKA-10357: Extract setup of changelog from Streams partition assignor
ableegoldman commented on a change in pull request #10163: URL: https://github.com/apache/kafka/pull/10163#discussion_r585863461 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; +import org.slf4j.Logger; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN; + +public class ChangelogTopics { + +private final InternalTopicManager internalTopicManager; +private final Map topicGroups; +private final Map> tasksForTopicGroup; +private final Map> changelogPartitionsForTask = new HashMap<>(); +private final Map> preExistingChangelogPartitionsForTask = new HashMap<>(); +private final Set preExistingNonSourceTopicBasedChangelogPartitions = new HashSet<>(); +private final Set sourceTopicBasedChangelogTopics = new HashSet<>(); +private final Set sourceTopicBasedChangelogTopicPartitions = new HashSet<>(); +private final Logger log; + +public ChangelogTopics(final InternalTopicManager internalTopicManager, + final Map topicGroups, + final Map> tasksForTopicGroup, + final String logPrefix) { +this.internalTopicManager = internalTopicManager; +this.topicGroups = topicGroups; +this.tasksForTopicGroup = tasksForTopicGroup; +final LogContext logContext = new LogContext(logPrefix); +log = logContext.logger(getClass()); +} + +public void setup() { Review comment: Sounds good
[jira] [Commented] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
[ https://issues.apache.org/jira/browse/KAFKA-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293953#comment-17293953 ]

A. Sophie Blee-Goldman commented on KAFKA-10251:

Merged https://github.com/apache/kafka/pull/10228; please reopen this ticket if failures are seen on builds kicked off after 11:47am PST (March 2nd).

> Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
>
> Key: KAFKA-10251
> URL: https://issues.apache.org/jira/browse/KAFKA-10251
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: A. Sophie Blee-Goldman
> Assignee: Luke Chen
> Priority: Major
>
> h3. Stacktrace
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout instead of the expected 200 records
>   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
>   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
>   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
>   at org.scalatest.Assertions.fail(Assertions.scala:1091)
>   at org.scalatest.Assertions.fail$(Assertions.scala:1087)
>   at org.scalatest.Assertions$.fail(Assertions.scala:1389)
>   at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:842)
>   at kafka.api.TransactionsBounceTest.testWithGroupMetadata(TransactionsBounceTest.scala:109)
>
> The logs are pretty much just this on repeat:
> {code:java}
> [2020-07-08 23:41:04,034] ERROR Error when sending message to topic output-topic with key: 9955, value: 9955 with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52)
> org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
>   at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>   at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>   at java.lang.Thread.run(Thread.java:748)
> [2020-07-08 23:41:04,034] ERROR Error when sending message to topic output-topic with key: 9959, value: 9959 with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52)
> org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
>   at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>   at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>   at java.lang.Thread.run(Thread.java:748)
> {code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman merged pull request #10228: KAFKA-10251: increase timeout for consuming records
ableegoldman merged pull request #10228: URL: https://github.com/apache/kafka/pull/10228
[GitHub] [kafka] ableegoldman commented on pull request #10228: KAFKA-10251: increase timeout for consuming records
ableegoldman commented on pull request #10228: URL: https://github.com/apache/kafka/pull/10228#issuecomment-789166076 Since this test is failing pretty regularly, I think we should go ahead and merge this now. A 30s timeout is pretty standard and if there is a real issue, just bumping the timeout from 15s to 30s should not cover it up. We can reopen the ticket if new failures are seen
[GitHub] [kafka] ableegoldman commented on a change in pull request #10247: MINOR: Fix log format in AbstractCoordinator
ableegoldman commented on a change in pull request #10247:
URL: https://github.com/apache/kafka/pull/10247#discussion_r585856635

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##
@@ -860,7 +860,7 @@ public void onSuccess(ClientResponse resp, RequestFuture future) {
 @Override
 public void onFailure(RuntimeException e, RequestFuture future) {
-    log.debug("FindCoordinator request failed due to {}", e);
+    log.debug("FindCoordinator request failed due to {}", e.getMessage());

Review comment:
@guozhangwang has a PR open in which he fixes this on the side. See https://github.com/apache/kafka/pull/10232/files#r584478802
[GitHub] [kafka] ableegoldman commented on pull request #10228: KAFKA-10251: increase timeout for consuming records
ableegoldman commented on pull request #10228:
URL: https://github.com/apache/kafka/pull/10228#issuecomment-789162142

Three unrelated failures:
```
Build / JDK 15 / org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining
Build / JDK 15 / kafka.api.AuthorizerIntegrationTest.testFetchFollowerRequest()
Build / JDK 15 / kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()
```
[GitHub] [kafka] sknop commented on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string
sknop commented on pull request #9950: URL: https://github.com/apache/kafka/pull/9950#issuecomment-789161208 Yes of course, totally agree.
[GitHub] [kafka] rondagostino commented on a change in pull request #10237: MINOR: fix failing system test delegation_token_test
rondagostino commented on a change in pull request #10237:
URL: https://github.com/apache/kafka/pull/10237#discussion_r585829658

## File path: tests/kafkatest/services/security/security_config.py ##
@@ -374,7 +374,8 @@ def interbroker_sasl_mechanism(self):
     def enabled_sasl_mechanisms(self):
         sasl_mechanisms = []
         if self.is_sasl(self.security_protocol):
-            sasl_mechanisms += [self.client_sasl_mechanism]
+            # .csv is supported so be sure to account for that possibility
+            sasl_mechanisms += self.client_sasl_mechanism.strip().split(',')

Review comment:
> Not sure if we should bother in this PR, but the usages of client_sasl_mechanism could stand to be cleaned up

I agree it needs to be cleaned up. Given we are past code freeze for 2.8, I've opened https://issues.apache.org/jira/browse/KAFKA-12402 for this and we can address it another time.
[jira] [Created] (KAFKA-12402) client_sasl_mechanism should be an explicit list instead of a .csv string
Ron Dagostino created KAFKA-12402:

Summary: client_sasl_mechanism should be an explicit list instead of a .csv string
Key: KAFKA-12402
URL: https://issues.apache.org/jira/browse/KAFKA-12402
Project: Kafka
Issue Type: Improvement
Components: system tests
Reporter: Ron Dagostino

The SecurityConfig and the KafkaService system test classes both accept a client_sasl_mechanism parameter. This is typically a single value (e.g. PLAIN), but DelegationTokenTest sets self.kafka.client_sasl_mechanism = 'GSSAPI,SCRAM-SHA-256'. If we need to support a list of mechanisms then the parameter should be an explicit list instead of a .csv string.
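One way the proposed cleanup could look, sketched in Python under stated assumptions: `normalize_sasl_mechanisms` is a hypothetical helper, not something in kafkatest, and it accepts both the legacy .csv string and an explicit list so that callers could migrate gradually.

```python
from typing import Iterable, List, Union


def normalize_sasl_mechanisms(value: Union[str, Iterable[str]]) -> List[str]:
    """Accept either a .csv string or an explicit list and return a clean list."""
    if isinstance(value, str):
        # Legacy form: a single mechanism or a comma-delimited string.
        value = value.split(',')
    # Drop surrounding whitespace and skip empty entries.
    return [m.strip() for m in value if m.strip()]
```

With such a helper, 'GSSAPI,SCRAM-SHA-256' and ['GSSAPI', 'SCRAM-SHA-256'] would be treated identically by downstream code.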
[jira] [Commented] (KAFKA-12319) Flaky test ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
[ https://issues.apache.org/jira/browse/KAFKA-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293926#comment-17293926 ]

Matthias J. Sax commented on KAFKA-12319:

Failed again.

> Flaky test ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
>
> Key: KAFKA-12319
> URL: https://issues.apache.org/jira/browse/KAFKA-12319
> Project: Kafka
> Issue Type: Test
> Reporter: Justine Olshan
> Priority: Major
> Labels: flaky-test
>
> I've seen this test fail a few times locally. But recently I saw it fail on a PR build on Jenkins.
>
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10041/7/testReport/junit/kafka.network/ConnectionQuotasTest/Build___JDK_11___testListenerConnectionRateLimitWhenActualRateAboveLimit__/]
> h3. Error Message
> java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 37.436825357209706 (600 connections / 16.027 sec) ==> expected: <30.0> but was: <37.436825357209706>
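The figures in the error message above can be checked by hand: 600 connections over 16.027 seconds is roughly 37.44 connections per second, which falls just outside the accepted band of 30 +- 7. A minimal Python sketch of that arithmetic follows; `within_tolerance` is a hypothetical helper and not part of the Kafka test.

```python
def within_tolerance(actual: float, expected: float, epsilon: float) -> bool:
    """Return True when actual lies within expected +/- epsilon."""
    return abs(actual - expected) <= epsilon


# Figures taken from the error message quoted above.
connections = 600
elapsed_sec = 16.027
expected_rate = 30.0
epsilon = 7.0

actual_rate = connections / elapsed_sec  # roughly 37.44 connections per second
print(f"{actual_rate:.2f}", within_tolerance(actual_rate, expected_rate, epsilon))
```

The measured rate misses the upper bound of 37 by less than half a connection per second, which fits a timing-sensitive flaky test rather than a gross rate-limiting failure.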
[jira] [Resolved] (KAFKA-12394) Consider topic id existence and authorization errors
[ https://issues.apache.org/jira/browse/KAFKA-12394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-12394.

Fix Version/s: 2.8.0
Resolution: Fixed

> Consider topic id existence and authorization errors
>
> Key: KAFKA-12394
> URL: https://issues.apache.org/jira/browse/KAFKA-12394
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jason Gustafson
> Priority: Major
> Fix For: 2.8.0
>
> We have historically had logic in the api layer to avoid leaking the existence or non-existence of topics to clients which are not authorized to describe them. The way we have done this is to always authorize the topic name first before checking existence.
>
> Topic ids make this more difficult because the resource (i.e. the topic name) has to be derived. This means we have to check existence of the topic first. If the topic does not exist, then our hands are tied and we have to return UNKNOWN_TOPIC_ID. If the topic does exist, then we need to check if the client is authorized to describe it. The question then is what we should do if the client is not authorized.
>
> The current behavior is to return UNKNOWN_TOPIC_ID. The downside is that this is misleading and forces the client to retry even though they are doomed to hit the same error. However, the client should generally handle this by requesting Metadata using the topic name that they are interested in, which would give them a chance to see the topic authorization error. Basically the fact that you need describe permission in the first place to discover the topic id makes this an unlikely scenario.
>
> There is an argument to be made for TOPIC_AUTHORIZATION_FAILED as well. Basically we could take the stance that we do not care about leaking the existence of topic IDs since they do not reveal anything about the underlying topic. Additionally, there is little likelihood of a user discovering a valid UUID by accident or even through brute force. The benefit of this is that users get a clear error for cases where a topic ID may have been discovered through some external means. For example, an administrator finds a topic ID in the logging and attempts to delete it using the new `deleteTopicsWithIds` Admin API.
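The two options weighed in the ticket can be sketched as follows. This is only an illustration in Python, not the broker code: `resolve_topic_id_error`, the `topic_names_by_id` map and the `can_describe` callback are hypothetical names, and the `leak_existence` flag simply switches between the two behaviours described above.

```python
from typing import Callable, Dict, Optional

UNKNOWN_TOPIC_ID = "UNKNOWN_TOPIC_ID"
TOPIC_AUTHORIZATION_FAILED = "TOPIC_AUTHORIZATION_FAILED"
NONE = "NONE"


def resolve_topic_id_error(
    topic_id: str,
    topic_names_by_id: Dict[str, str],
    can_describe: Callable[[str], bool],
    leak_existence: bool,
) -> str:
    """Pick the error for a request that addresses a topic by id."""
    # The resource (the topic name) must be derived from the id first,
    # so existence is necessarily checked before authorization.
    topic_name: Optional[str] = topic_names_by_id.get(topic_id)
    if topic_name is None:
        return UNKNOWN_TOPIC_ID
    if can_describe(topic_name):
        return NONE
    # Not authorized: either hide the topic or report the failure clearly.
    return TOPIC_AUTHORIZATION_FAILED if leak_existence else UNKNOWN_TOPIC_ID
```

With `leak_existence=False` an unauthorized client cannot tell a hidden topic from a missing one; with `leak_existence=True` it gets the clearer error at the cost of confirming that the id exists.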
[jira] [Commented] (KAFKA-10788) Streamlining Tests in CachingInMemoryKeyValueStoreTest
[ https://issues.apache.org/jira/browse/KAFKA-10788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293907#comment-17293907 ]

Rohit Deshpande commented on KAFKA-10788:

Thanks [~cadonna], makes sense to me. Will post the update here.

> Streamlining Tests in CachingInMemoryKeyValueStoreTest
>
> Key: KAFKA-10788
> URL: https://issues.apache.org/jira/browse/KAFKA-10788
> Project: Kafka
> Issue Type: Improvement
> Components: streams, unit tests
> Reporter: Sagar Rao
> Assignee: Rohit Deshpande
> Priority: Major
> Labels: newbie
>
> While reviewing KIP-614, it was decided that tests for [CachingInMemoryKeyValueStoreTest.java|https://github.com/apache/kafka/pull/9508/files/899b79781d3412658293b918dce16709121accf1#diff-fdfe70d8fa0798642f0ed54785624aa9850d5d86afff2285acdf12f2775c3588] need to be streamlined to use mocked underlyingStore.
[GitHub] [kafka] hachikuji merged pull request #10223: KAFKA-12394: Return `TOPIC_AUTHORIZATION_FAILED` in delete topic response if no describe permission
hachikuji merged pull request #10223: URL: https://github.com/apache/kafka/pull/10223
[GitHub] [kafka] vanhoale commented on pull request #10239: KAFKA-12372: Enhance TimestampConverter Connect transformation to handle multiple timestamp or date fields
vanhoale commented on pull request #10239: URL: https://github.com/apache/kafka/pull/10239#issuecomment-789110876 I ran my tests, checkstyle, and spotbugs successfully on my local machine, but I am not sure why some checks failed here.
[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API
hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r585800691

## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ##
@@ -1813,17 +1813,25 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 producer.beginTransaction()
 producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get
+def assertListTransactionResult(
+  expectedTransactionalIds: Set[String]
+): Unit = {
+  val listTransactionsRequest = new ListTransactionsRequest.Builder(new ListTransactionsRequestData()).build()
+  val listTransactionsResponse = connectAndReceive[ListTransactionsResponse](listTransactionsRequest)
+  assertEquals(Errors.NONE, Errors.forCode(listTransactionsResponse.data.errorCode))
+  assertEquals(expectedTransactionalIds, listTransactionsResponse.data.transactionStates.asScala.map(_.transactionalId).toSet)
+}
+
 // First verify that we can list the transaction
-val listTransactionsRequest = new ListTransactionsRequest.Builder(new ListTransactionsRequestData()).build()
-val authorizedResponse = connectAndReceive[ListTransactionsResponse](listTransactionsRequest)
-assertEquals(Errors.NONE, Errors.forCode(authorizedResponse.data.errorCode))
-assertEquals(Set(transactionalId), authorizedResponse.data.transactionStates.asScala.map(_.transactionalId).toSet)
+assertListTransactionResult(expectedTransactionalIds = Set(transactionalId))
 // Now revoke authorization and verify that the transaction is no longer listable
 removeAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource)
-val unauthorizedResponse = connectAndReceive[ListTransactionsResponse](listTransactionsRequest)
-assertEquals(Errors.NONE, Errors.forCode(unauthorizedResponse.data.errorCode))
-assertEquals(Set(), unauthorizedResponse.data.transactionStates.asScala.map(_.transactionalId).toSet)
+assertListTransactionResult(expectedTransactionalIds = Set())
+
+// The minimum permission needed is `Describe`
+addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource)

Review comment:
Ugh, yes.
[jira] [Comment Edited] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
[ https://issues.apache.org/jira/browse/KAFKA-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293885#comment-17293885 ] Matthias J. Sax edited comment on KAFKA-10251 at 3/2/21, 6:11 PM: -- Two more. was (Author: mjsax): One more.
[jira] [Created] (KAFKA-12401) Flaky Test FeatureCommandTest#testUpgradeAllFeaturesSuccess
Matthias J. Sax created KAFKA-12401:

Summary: Flaky Test FeatureCommandTest#testUpgradeAllFeaturesSuccess
Key: KAFKA-12401
URL: https://issues.apache.org/jira/browse/KAFKA-12401
Project: Kafka
Issue Type: Test
Components: admin, unit tests
Reporter: Matthias J. Sax

{quote}kafka.admin.UpdateFeaturesException: 2 feature updates failed!
  at kafka.admin.FeatureApis.maybeApplyFeatureUpdates(FeatureCommand.scala:289)
  at kafka.admin.FeatureApis.upgradeAllFeatures(FeatureCommand.scala:191)
  at kafka.admin.FeatureCommandTest.$anonfun$testUpgradeAllFeaturesSuccess$3(FeatureCommandTest.scala:134){quote}

STDOUT
{quote}[Add] Feature: feature_1 ExistingFinalizedMaxVersion: - NewFinalizedMaxVersion: 3 Result: OK
[Add] Feature: feature_2 ExistingFinalizedMaxVersion: - NewFinalizedMaxVersion: 5 Result: OK
[Add] Feature: feature_1 ExistingFinalizedMaxVersion: - NewFinalizedMaxVersion: 3 Result: OK
[Add] Feature: feature_2 ExistingFinalizedMaxVersion: - NewFinalizedMaxVersion: 5 Result: OK{quote}
[jira] [Commented] (KAFKA-8003) Flaky Test TransactionsTest #testFencingOnTransactionExpiration
[ https://issues.apache.org/jira/browse/KAFKA-8003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293889#comment-17293889 ]

Matthias J. Sax commented on KAFKA-8003:

A third test method:
{quote}org.opentest4j.AssertionFailedError: Consumed 0 records before timeout instead of the expected 1000 records
  at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39)
  at org.junit.jupiter.api.Assertions.fail(Assertions.java:117)
  at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:852)
  at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1476)
  at kafka.api.TransactionsTest.testMultipleMarkersOneLeader(TransactionsTest.scala:588){quote}
STDOUT
{quote}[2021-03-02 10:31:53,233] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient:74)
[2021-03-02 10:31:53,315] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/kafka5762171763596437663.tmp'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn:1094){quote}
Seems to be the same issue as above?

> Flaky Test TransactionsTest #testFencingOnTransactionExpiration
>
> Key: KAFKA-8003
> URL: https://issues.apache.org/jira/browse/KAFKA-8003
> Project: Kafka
> Issue Type: Bug
> Components: core, unit tests
> Affects Versions: 2.2.0
> Reporter: Matthias J. Sax
> Assignee: Jason Gustafson
> Priority: Critical
> Labels: flaky-test
> Fix For: 2.2.3
>
> To get stable nightly builds for `2.2` release, I create tickets for all observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/34/]
> {quote}java.lang.AssertionError: expected:<1> but was:<0>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at org.junit.Assert.assertEquals(Assert.java:631)
>   at kafka.api.TransactionsTest.testFencingOnTransactionExpiration(TransactionsTest.scala:510){quote}
[GitHub] [kafka] rhauch commented on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string
rhauch commented on pull request #9950: URL: https://github.com/apache/kafka/pull/9950#issuecomment-789100759 If we don't clarify the documentation, then I think users will be very confused. Can you take a stab at improving/expanding the documentation a bit to clarify the input and output types?
[jira] [Commented] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
[ https://issues.apache.org/jira/browse/KAFKA-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293885#comment-17293885 ] Matthias J. Sax commented on KAFKA-10251: - One more.
[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API
chia7712 commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r585784412

## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ##
@@ -1813,17 +1813,25 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 producer.beginTransaction()
 producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get
+def assertListTransactionResult(
+  expectedTransactionalIds: Set[String]
+): Unit = {
+  val listTransactionsRequest = new ListTransactionsRequest.Builder(new ListTransactionsRequestData()).build()
+  val listTransactionsResponse = connectAndReceive[ListTransactionsResponse](listTransactionsRequest)
+  assertEquals(Errors.NONE, Errors.forCode(listTransactionsResponse.data.errorCode))
+  assertEquals(expectedTransactionalIds, listTransactionsResponse.data.transactionStates.asScala.map(_.transactionalId).toSet)
+}
+
 // First verify that we can list the transaction
-val listTransactionsRequest = new ListTransactionsRequest.Builder(new ListTransactionsRequestData()).build()
-val authorizedResponse = connectAndReceive[ListTransactionsResponse](listTransactionsRequest)
-assertEquals(Errors.NONE, Errors.forCode(authorizedResponse.data.errorCode))
-assertEquals(Set(transactionalId), authorizedResponse.data.transactionStates.asScala.map(_.transactionalId).toSet)
+assertListTransactionResult(expectedTransactionalIds = Set(transactionalId))
 // Now revoke authorization and verify that the transaction is no longer listable
 removeAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource)
-val unauthorizedResponse = connectAndReceive[ListTransactionsResponse](listTransactionsRequest)
-assertEquals(Errors.NONE, Errors.forCode(unauthorizedResponse.data.errorCode))
-assertEquals(Set(), unauthorizedResponse.data.transactionStates.asScala.map(_.transactionalId).toSet)
+assertListTransactionResult(expectedTransactionalIds = Set())
+
+// The minimum permission needed is `Describe`
+addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource)

Review comment:
Should permission be describe rather than write?
[GitHub] [kafka] mumrah merged pull request #10224: MINOR: Disable transactional/idempotent system tests for Raft quorums
mumrah merged pull request #10224: URL: https://github.com/apache/kafka/pull/10224
[GitHub] [kafka] mumrah commented on a change in pull request #10237: MINOR: fix failing system test delegation_token_test
mumrah commented on a change in pull request #10237:
URL: https://github.com/apache/kafka/pull/10237#discussion_r585784850

## File path: tests/kafkatest/services/security/security_config.py ##
@@ -374,7 +374,8 @@ def interbroker_sasl_mechanism(self):
     def enabled_sasl_mechanisms(self):
         sasl_mechanisms = []
         if self.is_sasl(self.security_protocol):
-            sasl_mechanisms += [self.client_sasl_mechanism]
+            # .csv is supported so be sure to account for that possibility
+            sasl_mechanisms += self.client_sasl_mechanism.strip().split(',')

Review comment:
Not sure if we should bother in this PR, but the usages of `client_sasl_mechanism` could stand to be cleaned up. In `SecurityConfig.__init__` we default it to a simple string, but as you found here (and as seen in `SecurityConfig.client_config`) we support it being a comma delimited string.

https://github.com/apache/kafka/blob/58b3b1b557e9ba19cffde91bd117a89b947f1fc1/tests/kafkatest/services/security/security_config.py#L240-L253

It's probably a lot safer to declare this as a list in the class and not worry about having to do the `split(",")` everywhere. Though maybe there's a reason why we split it lazily.. not sure.

Either way, this change looks good. However, you might consider doing something like:

```python
sasl_mechanisms += [mechanism.strip() for mechanism in self.client_sasl_mechanism.split(',')]
```

since `self.client_sasl_mechanism.strip()` won't catch spaces in the middle of the string.
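The difference between the two variants in the review comment above shows up as soon as the .csv value has a space after the comma. A small standalone sketch; the function names and the sample value are made up for illustration and do not appear in security_config.py:

```python
def split_with_outer_strip(csv: str) -> list:
    # Strips only the two ends of the whole string before splitting.
    return csv.strip().split(',')


def split_with_element_strip(csv: str) -> list:
    # Splits first, then strips every element individually.
    return [mechanism.strip() for mechanism in csv.split(',')]


value = ' GSSAPI, SCRAM-SHA-256 '
print(split_with_outer_strip(value))    # ['GSSAPI', ' SCRAM-SHA-256']
print(split_with_element_strip(value))  # ['GSSAPI', 'SCRAM-SHA-256']
```

The first variant leaves a leading space on the second mechanism, which would presumably not match a plain 'SCRAM-SHA-256' in a later comparison.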
[GitHub] [kafka] guozhangwang commented on pull request #9717: KAFKA-10766: Unit test cases for RocksDBRangeIterator
guozhangwang commented on pull request #9717: URL: https://github.com/apache/kafka/pull/9717#issuecomment-789095693 LGTM. Merged to trunk and cherry-picked to 2.8 cc @vvcephei
[GitHub] [kafka] guozhangwang merged pull request #9717: KAFKA-10766: Unit test cases for RocksDBRangeIterator
guozhangwang merged pull request #9717: URL: https://github.com/apache/kafka/pull/9717
[GitHub] [kafka] guozhangwang commented on pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore
guozhangwang commented on pull request #10052: URL: https://github.com/apache/kafka/pull/10052#issuecomment-789094844 Cherry-picked to 2.8 as well cc @vvcephei
[GitHub] [kafka] guozhangwang commented on pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore
guozhangwang commented on pull request #10052: URL: https://github.com/apache/kafka/pull/10052#issuecomment-789094199 LGTM. Thanks for the contribution @vamossagar12 !
[GitHub] [kafka] guozhangwang merged pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore
guozhangwang merged pull request #10052: URL: https://github.com/apache/kafka/pull/10052
[GitHub] [kafka] chia7712 opened a new pull request #10248: MINOR: main function of o.a.k.c.p.t.Type does not show all types
chia7712 opened a new pull request #10248: URL: https://github.com/apache/kafka/pull/10248

This PR includes the following changes:

1. rename `UNSIGNED_INT32` to `UINT32` (consistent with `UINT16`)
2. make sure `Type.toHtml` shows `UINT16`, `UINT32` and `COMPACT_RECORDS`

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] chia7712 commented on a change in pull request #10234: MINOR; Clean up LeaderAndIsrResponse construction in `ReplicaManager#becomeLeaderOrFollower`
chia7712 commented on a change in pull request #10234: URL: https://github.com/apache/kafka/pull/10234#discussion_r585723099

## File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java ##
@@ -145,24 +145,22 @@ public LeaderAndIsrResponse getErrorResponse(int throttleTimeMs, Throwable e) {
 .setErrorCode(error.code()));
 }
 responseData.setPartitionErrors(partitions);
-return new LeaderAndIsrResponse(responseData, version());
-}
-
-List topics = new ArrayList<>(data.topicStates().size());
-Map topicIds = topicIds();
-for (LeaderAndIsrTopicState topicState : data.topicStates()) {
-LeaderAndIsrTopicError topicError = new LeaderAndIsrTopicError();
-topicError.setTopicId(topicIds.get(topicState.topicName()));
-List partitions = new ArrayList<>(topicState.partitionStates().size());
-for (LeaderAndIsrPartitionState partition : topicState.partitionStates()) {
-partitions.add(new LeaderAndIsrPartitionError()
+} else {
+Map topicIds = topicIds();
+for (LeaderAndIsrTopicState topicState : data.topicStates()) {
+List partitions = new ArrayList<>(
+topicState.partitionStates().size());
+for (LeaderAndIsrPartitionState partition : topicState.partitionStates()) {
+partitions.add(new LeaderAndIsrPartitionError()
 .setPartitionIndex(partition.partitionIndex())
 .setErrorCode(error.code()));
+}
+responseData.topics().add(new LeaderAndIsrTopicError()
+.setTopicId(topicIds.get(topicState.topicName()))

Review comment:
Could it be replaced by `setTopicId(topicState.topicId())`?
[GitHub] [kafka] chia7712 closed pull request #9128: KAFKA-7540 reduce session timeout to evict dead member in time and so…
chia7712 closed pull request #9128: URL: https://github.com/apache/kafka/pull/9128
[GitHub] [kafka] chia7712 closed pull request #10086: MINOR: expose number of forwarding requests to metrics
chia7712 closed pull request #10086: URL: https://github.com/apache/kafka/pull/10086
[GitHub] [kafka] hachikuji commented on pull request #10240: KAFKA-12381: only return leader not available for internal topic creation
hachikuji commented on pull request #10240: URL: https://github.com/apache/kafka/pull/10240#issuecomment-789032940

Ok, I think I see what is going on now. The failing system test is verifying what happens when inter-broker communication no longer works. This results in different behavior because `AutoTopicCreationManager` relies on the `MetadataCache` in order to determine the number of live brokers while the old logic checked zk directly. That makes the `INVALID_REPLICATION_FACTOR` more dangerous since it is not retriable and the cache may be stale. In particular, when inter-broker communication is down, the cache will be empty and the broker will end up trying to auto-create all topics.

I can think of a few options to address the problem:

1. Bring back the old logic to check Zookeeper for the live brokers. This might be fine for 2.8, but it does not address the problem for KIP-500.
2. Return a retriable error instead. Really `UNKNOWN_TOPIC_OR_PARTITION` would be a better error in this case.
3. Make `INVALID_REPLICATION_FACTOR` a retriable error. I guess we have to understand how clients

My inclination is probably option 2. The downside is that the user would no longer get a clear error when a topic cannot be auto-created. But I feel overall it's the safest and most consistent way to handle this case. There might be other options though.

It's interesting to note that this relates back to some of the discussion in the auto-create PR itself. We had discussed skipping the replication factor check on the broker and sending the request to the controller. But either way, we have to rely on the metadata cache locally at least to determine whether the topic already exists or not, so it might not have really helped.
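As a rough illustration of option 2 in the comment above, the following toy model contrasts the two outcomes when the cached broker count is too low. Everything here is an assumption made for illustration: the `Error` enum, the `auto_create_error` function and its parameters are hypothetical stand-ins and do not reflect the actual `AutoTopicCreationManager` code. Only the error names and their retriability come from the discussion.

```python
from enum import Enum


class Error(Enum):
    """Toy stand-ins for Kafka error codes; the boolean marks retriability."""
    NONE = ("NONE", False)
    INVALID_REPLICATION_FACTOR = ("INVALID_REPLICATION_FACTOR", False)
    UNKNOWN_TOPIC_OR_PARTITION = ("UNKNOWN_TOPIC_OR_PARTITION", True)

    @property
    def retriable(self):
        return self.value[1]


def auto_create_error(live_brokers_in_cache, replication_factor, retriable_on_shortage):
    """Pick the error returned when auto-creating a topic (hypothetical helper).

    live_brokers_in_cache may be stale or zero, e.g. when inter-broker
    communication is down, so a shortage is not necessarily permanent.
    """
    if live_brokers_in_cache >= replication_factor:
        return Error.NONE
    if retriable_on_shortage:
        # Option 2: let the client retry instead of failing hard.
        return Error.UNKNOWN_TOPIC_OR_PARTITION
    # Current behavior described in the comment: a non-retriable error.
    return Error.INVALID_REPLICATION_FACTOR


# An empty cache with a replication factor of 3.
print(auto_create_error(0, 3, retriable_on_shortage=False).retriable)  # False
print(auto_create_error(0, 3, retriable_on_shortage=True).retriable)   # True
```

The trade-off noted in the comment shows up directly: with the retriable variant the client keeps retrying, but it no longer learns that the replication factor was the reason.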
[GitHub] [kafka] wcarlson5 commented on a change in pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down
wcarlson5 commented on a change in pull request #10215: URL: https://github.com/apache/kafka/pull/10215#discussion_r585710809

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##
@@ -463,9 +464,8 @@ private void replaceStreamThread(final Throwable throwable) {
 closeToError();
 }
 final StreamThread deadThread = (StreamThread) Thread.currentThread();
-threads.remove(deadThread);

Review comment:
Since the name doesn't matter, I wonder why we spent so much effort making sure it had the same name?

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##
@@ -1047,9 +1047,15 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
 if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
 log.warn("Thread " + streamThread.getName() + " did not shutdown in the allotted time");
 timeout = true;
+// Don't remove from threads until shutdown is complete. We will trim it from the
+// list once it reaches DEAD, and if for some reason it's hanging indefinitely in the
+// shutdown then we should just consider this thread.id to be burned
+} else {
+threads.remove(streamThread);

Review comment:
I think we can leave it for now; if we see problems, this could be a fix. We don't run a single-thread soak, so we won't see this issue ourselves, but there are many single-thread applications that could start using this, and we should see whether they have problems.
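The pattern in the second hunk above (only drop a thread from the list once it has actually finished, and otherwise leave it in place so its id is not reused) can be sketched outside Kafka Streams as follows. This is a loose analogue using Python's `threading` module, not the `KafkaStreams` implementation; `shutdown_workers` and the worker names are hypothetical.

```python
import threading
import time


def shutdown_workers(workers, timeout_s):
    """Join each worker within a shared deadline; drop only those that finished.

    Returns True if at least one worker was still running at the deadline.
    """
    deadline = time.monotonic() + timeout_s
    timed_out = False
    for worker in list(workers):  # iterate over a copy so removal is safe
        remaining = max(0.0, deadline - time.monotonic())
        worker.join(remaining)
        if worker.is_alive():
            # Still shutting down: keep it listed so its id is not reused.
            timed_out = True
        else:
            workers.remove(worker)
    return timed_out


# Usage: one worker exits at once, the other hangs until released.
release = threading.Event()
quick = threading.Thread(target=lambda: None, name="worker-1", daemon=True)
stuck = threading.Thread(target=release.wait, name="worker-2", daemon=True)
workers = [quick, stuck]
for w in workers:
    w.start()

timed_out = shutdown_workers(workers, timeout_s=0.2)
remaining_names = [w.name for w in workers]
release.set()  # let the hanging worker finish
```

After the call, the worker that did not stop in time is still in `workers`, so any code that picks the next free id from that list would skip it.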