[GitHub] [kafka] hachikuji commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed
hachikuji commented on a change in pull request #9579: URL: https://github.com/apache/kafka/pull/9579#discussion_r571362924 ## File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala ## @@ -169,6 +169,8 @@ class DefaultAutoTopicCreationManager( } } +clearInflightRequests(creatableTopics) Review comment: Can you use a `try/finally` here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
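For context, a minimal sketch of the try/finally cleanup the reviewer is asking for. It is written in Java with invented names (InflightTopicTracker, sendCreateTopicsRequest); the actual AutoTopicCreationManager is Scala and its send path is asynchronous, so this only illustrates the pattern, not the real code.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the inflight-topic bookkeeping discussed in the review.
public class InflightTopicTracker {
    private final Set<String> inflightTopics = ConcurrentHashMap.newKeySet();

    public void createTopics(Set<String> creatableTopics) {
        inflightTopics.addAll(creatableTopics);
        try {
            sendCreateTopicsRequest(creatableTopics); // may throw
        } finally {
            // Clear the inflight markers even if sending fails, so a failed attempt
            // does not permanently block future auto-creation of the same topics.
            inflightTopics.removeAll(creatableTopics);
        }
    }

    private void sendCreateTopicsRequest(Set<String> topics) {
        // Placeholder for forwarding the CreateTopicsRequest to the controller.
    }
}
```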
[GitHub] [kafka] showuon commented on pull request #10002: MINOR: remove the indent in security doc
showuon commented on pull request #10002: URL: https://github.com/apache/kafka/pull/10002#issuecomment-774401866 @omkreddy, thanks for the comments. I've updated.
[GitHub] [kafka] showuon commented on a change in pull request #10002: MINOR: remove the indent in security doc
showuon commented on a change in pull request #10002: URL: https://github.com/apache/kafka/pull/10002#discussion_r571361102 ## File path: docs/security.html ## @@ -649,22 +649,22 @@ Configuring Kafka Brokers Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example: - KafkaServer { -org.apache.kafka.common.security.plain.PlainLoginModule required -username="admin" -password="admin-secret" -user_admin="admin-secret" -user_alice="alice-secret"; -}; +KafkaServer { +org.apache.kafka.common.security.plain.PlainLoginModule required +username="admin" +password="admin-secret" +user_admin="admin-secret" +user_alice="alice-secret"; +}; This configuration defines two users (admin and alice). The properties username and password in the KafkaServer section are used by the broker to initiate connections to other brokers. In this example, admin is the user for inter-broker communication. The set of properties user_userName defines the passwords for all users that connect to the broker and the broker validates all client connections including those from other brokers using these properties. Pass the JAAS config file location as JVM parameter to each Kafka broker: - -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf + -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf Configure SASL port and SASL mechanisms in server.properties as described here. For example: -listeners=SASL_SSL://host.name:port +listeners=SASL_SSL://host.name:port Review comment: Wow, I've reviewed it twice but didn't find it! Thanks for pointing it out. Updated.
[GitHub] [kafka] C0urante opened a new pull request #10074: KAFKA-12305: Fix Flatten SMT for array types
C0urante opened a new pull request #10074: URL: https://github.com/apache/kafka/pull/10074 [Jira](https://issues.apache.org/jira/browse/KAFKA-12305) (Copied from Jira): The `Flatten` SMT fails for array types. A sophisticated approach that tries to flatten arrays might be desirable in some cases, and may have been punted during the early design phase of the transform, but in the interim, it's probably not worth it to make array data and the SMT mutually exclusive. A naive approach that preserves arrays as-are and doesn't attempt to flatten them seems fair for now. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
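A rough sketch of the "naive" behavior described above, for schemaless data only; this is not the actual Connect Flatten implementation, just an illustration of flattening nested maps while passing arrays and primitives through untouched.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: nested maps are flattened into dotted field names, arrays are kept as-is.
public class NaiveFlatten {
    public static Map<String, Object> flatten(Map<String, Object> value) {
        Map<String, Object> flattened = new HashMap<>();
        applyFlatten("", value, flattened);
        return flattened;
    }

    private static void applyFlatten(String prefix, Map<String, Object> value, Map<String, Object> out) {
        for (Map.Entry<String, Object> entry : value.entrySet()) {
            String name = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object fieldValue = entry.getValue();
            if (fieldValue instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) fieldValue;
                applyFlatten(name, nested, out);   // recurse into nested maps
            } else {
                out.put(name, fieldValue);         // lists and primitives are preserved, not rejected
            }
        }
    }
}
```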
[jira] [Updated] (KAFKA-12305) Flatten SMT fails on arrays
[ https://issues.apache.org/jira/browse/KAFKA-12305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-12305: -- Description: The {{Flatten}} SMT fails for array types. A sophisticated approach that tries to flatten arrays might be desirable in some cases, and may have been punted during the early design phase of the transform, but in the interim, it's probably not worth it to make array data and the SMT mutually exclusive. A naive approach that preserves arrays as-are and doesn't attempt to flatten them seems fair for now. > Flatten SMT fails on arrays > --- > > Key: KAFKA-12305 > URL: https://issues.apache.org/jira/browse/KAFKA-12305 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.1, 2.1.1, 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.7.0, 2.6.1, > 2.8.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > The {{Flatten}} SMT fails for array types. A sophisticated approach that > tries to flatten arrays might be desirable in some cases, and may have been > punted during the early design phase of the transform, but in the interim, > it's probably not worth it to make array data and the SMT mutually exclusive. > A naive approach that preserves arrays as-are and doesn't attempt to flatten > them seems fair for now. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12305) Flatten SMT fails on arrays
Chris Egerton created KAFKA-12305: - Summary: Flatten SMT fails on arrays Key: KAFKA-12305 URL: https://issues.apache.org/jira/browse/KAFKA-12305 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.6.1, 2.7.0, 2.5.1, 2.4.1, 2.3.1, 2.2.2, 2.1.1, 2.0.1, 2.8.0 Reporter: Chris Egerton Assignee: Chris Egerton -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] C0urante opened a new pull request #10073: KAFKA-12303: Fix handling of null values by Flatten SMT
C0urante opened a new pull request #10073: URL: https://github.com/apache/kafka/pull/10073 [Jira](https://issues.apache.org/jira/browse/KAFKA-12303) Using `return` instead of `continue` when encountering null fields causes the remainder of the fields to be skipped. This PR addresses that, and adds a lightweight unit test that is able to reproduce the error on current versions of Connect, and verify the accuracy of the fix. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
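The bug is easy to see in a toy version of the field loop (not the actual Flatten code): an early return on a null value silently drops every field that comes after it, whereas continue skips only the null entry.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NullFieldLoop {
    // Buggy variant: bails out of the whole loop on the first null value.
    static Map<String, Object> buggy(Map<String, Object> fields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (e.getValue() == null) {
                return out; // BUG: abandons all remaining entries
            }
            out.put(e.getKey(), e.getValue());
        }
        return out;
    }

    // Fixed variant: skips only the null entry and keeps iterating.
    static Map<String, Object> fixed(Map<String, Object> fields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (e.getValue() == null) {
                continue;
            }
            out.put(e.getKey(), e.getValue());
        }
        return out;
    }
}
```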
[jira] [Created] (KAFKA-12304) Improve topic validation in auto topic creation
Boyang Chen created KAFKA-12304: --- Summary: Improve topic validation in auto topic creation Key: KAFKA-12304 URL: https://issues.apache.org/jira/browse/KAFKA-12304 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen The topic validation path should have a higher priority than other follow-up processes. Basically we should move it right after the topic authorization is done in metadata request handling. -- This message was sent by Atlassian Jira (v8.3.4#803005)
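Sketch of the ordering the ticket asks for, with invented helper names; the point is simply to validate the topic name immediately after authorization succeeds, before any auto-creation work is attempted.

```java
// Hypothetical flow for an unknown topic in a Metadata request; error names mirror Kafka's,
// but isValidTopicName is a simplified stand-in for the real Topic.validate() rules.
public class MetadataTopicFlow {
    static String handleUnknownTopic(String topic, boolean authorizedToCreate) {
        if (!authorizedToCreate) {
            return "TOPIC_AUTHORIZATION_FAILED";
        }
        if (!isValidTopicName(topic)) {
            return "INVALID_TOPIC_EXCEPTION"; // fail fast, before forwarding a CreateTopicsRequest
        }
        return autoCreate(topic);
    }

    static boolean isValidTopicName(String topic) {
        // Simplified: non-empty, at most 249 chars, legal character set.
        return !topic.isEmpty() && topic.length() <= 249 && topic.matches("[a-zA-Z0-9._-]+");
    }

    static String autoCreate(String topic) {
        return "UNKNOWN_TOPIC_OR_PARTITION"; // creation is asynchronous; the client retries
    }
}
```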
[jira] [Updated] (KAFKA-12303) Flatten SMT drops some fields when null values are present
[ https://issues.apache.org/jira/browse/KAFKA-12303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-12303: -- Summary: Flatten SMT drops some fields when null values are present (was: Flatten SMT drops some nested fields) > Flatten SMT drops some fields when null values are present > -- > > Key: KAFKA-12303 > URL: https://issues.apache.org/jira/browse/KAFKA-12303 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.1, 2.1.1, 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.7.0, 2.6.1, > 2.8.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > [This > line|https://github.com/apache/kafka/blob/0bc394cc1d19f1e41dd6646e9ac0e09b91fb1398/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java#L109] > should be {{continue}} instead of {{return}}; otherwise, the rest of the > entries in the currently-being-iterated map are skipped unnecessarily. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12303) Flatten SMT drops some nested fields
Chris Egerton created KAFKA-12303: - Summary: Flatten SMT drops some nested fields Key: KAFKA-12303 URL: https://issues.apache.org/jira/browse/KAFKA-12303 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.6.1, 2.7.0, 2.5.1, 2.4.1, 2.3.1, 2.2.2, 2.1.1, 2.0.1, 2.8.0 Reporter: Chris Egerton Assignee: Chris Egerton [This line|https://github.com/apache/kafka/blob/0bc394cc1d19f1e41dd6646e9ac0e09b91fb1398/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java#L109] should be {{continue}} instead of {{return}}; otherwise, the rest of the entries in the currently-being-iterated map are skipped unnecessarily. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax merged pull request #10000: KAFKA-9274: handle TimeoutException on task reset
mjsax merged pull request #10000: URL: https://github.com/apache/kafka/pull/10000
[GitHub] [kafka] mjsax commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out
mjsax commented on a change in pull request #10072: URL: https://github.com/apache/kafka/pull/10072#discussion_r571331549 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java ## @@ -32,6 +33,7 @@ public final int topicGroupId; /** The ID of the partition. */ public final int partition; +public Task task; Review comment: I was not happy about it either... Any good suggestions on how to do better? I could not come up with a better solution quickly, unfortunately. :( We could add it to the constructor and make it mandatory, but the "blast radius" would be quite large...
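One possible alternative floated here, sketched only as an idea and not as what the PR ultimately does: keep TaskId a pure value object and resolve the live task from a registry keyed by id at the point where it is actually needed.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry; ID would be TaskId and TASK the Streams Task in this discussion.
public class TaskRegistry<ID, TASK> {
    private final Map<ID, TASK> tasksById = new ConcurrentHashMap<>();

    public void register(ID id, TASK task) {
        tasksById.put(id, task);
    }

    public void unregister(ID id) {
        tasksById.remove(id);
    }

    // Empty if the task is not (or no longer) owned by this instance.
    public Optional<TASK> lookup(ID id) {
        return Optional.ofNullable(tasksById.get(id));
    }
}
```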
[GitHub] [kafka] cmccabe merged pull request #10049: Refactor MetadataCache for Raft metadata
cmccabe merged pull request #10049: URL: https://github.com/apache/kafka/pull/10049 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10000: KAFKA-9274: handle TimeoutException on task reset
ableegoldman commented on a change in pull request #10000: URL: https://github.com/apache/kafka/pull/10000#discussion_r571317298 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -229,17 +231,22 @@ public void initializeIfNeeded() { } } +public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) { +mainConsumer.pause(partitionsForOffsetReset); +resetOffsetsForPartitions.addAll(partitionsForOffsetReset); +} + /** * @throws TimeoutException if fetching committed offsets timed out */ @Override -public void completeRestoration() { +public void completeRestoration(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) { switch (state()) { case RUNNING: return; case RESTORING: -initializeMetadata(); +resetOffsetsIfNeededAndInitializeMetadata(offsetResetter); Review comment: cool, thanks, this seems much cleaner to me
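Simplified sketch of the callback wiring visible in this diff; the real logic lives in StreamTask/StreamThread and the types are trimmed here. Partitions that need an offset reset are paused when registered and handed to an injected resetter once restoration completes.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

public class OffsetResetTracker<P> {
    private final Set<P> resetOffsetsForPartitions = new HashSet<>();
    private final Consumer<Set<P>> pauser;

    public OffsetResetTracker(Consumer<Set<P>> pauser) {
        this.pauser = pauser;
    }

    public void addPartitionsForOffsetReset(Set<P> partitions) {
        pauser.accept(partitions);                 // stop fetching until the reset has happened
        resetOffsetsForPartitions.addAll(partitions);
    }

    public void completeRestoration(Consumer<Set<P>> offsetResetter) {
        if (!resetOffsetsForPartitions.isEmpty()) {
            offsetResetter.accept(resetOffsetsForPartitions);
            resetOffsetsForPartitions.clear();
        }
    }
}
```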
[GitHub] [kafka] ableegoldman commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out
ableegoldman commented on a change in pull request #10072: URL: https://github.com/apache/kafka/pull/10072#discussion_r571317046 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java ## @@ -32,6 +33,7 @@ public final int topicGroupId; /** The ID of the partition. */ public final int partition; +public Task task; Review comment: I have to say, it makes me a little uncomfortable to stick the actual `Task` object inside the basic `TaskId` container class. Especially if 99% of the time it will be null, given that we use `TaskId` all over the place and only call `setTask` a handful of times. It will only become increasingly difficult to know whether it's safe to assume a given `TaskId` object has an actual non-null `Task` or not, and therefore whether it's safe to use 😬
[jira] [Commented] (KAFKA-12213) Kafka Streams aggregation Initializer to accept record key
[ https://issues.apache.org/jira/browse/KAFKA-12213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280044#comment-17280044 ] Matthias J. Sax commented on KAFKA-12213: - Works for me. Just wanted to point out, that [~bbejeck] also started to look into this (just learned about it), breaking it down into similar PRs. You should sync with him to avoid duplicate work. From my understanding, he started to prepare a PR for the `ValueJoiner` interface. > Kafka Streams aggregation Initializer to accept record key > -- > > Key: KAFKA-12213 > URL: https://issues.apache.org/jira/browse/KAFKA-12213 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Piotr Fras >Assignee: Piotr Fras >Priority: Minor > Labels: needs-kip > > Sometimes Kafka record key contains useful information for creating a zero > object in aggregation Initializer. This feature is to add kafka record key to > Initializer. > There were two approaches I considered to implement this feature, one > respecting backwards compatibility for internal and external APIs and the > other one which is not. I chose the latter one as it was more strait-forward. > We may want to validate this approach tho. -- This message was sent by Atlassian Jira (v8.3.4#803005)
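For readers unfamiliar with the feature, a hypothetical shape of a key-aware initializer along the lines discussed here; InitializerWithKey is an invented name, not an existing Kafka Streams interface (today's Initializer takes no arguments).

```java
@FunctionalInterface
public interface InitializerWithKey<K, VA> {
    // Build the "zero" aggregate for a given record key, e.g. copying an id out of the key.
    VA apply(K readOnlyKey);
}
```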
[GitHub] [kafka] mjsax opened a new pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out
mjsax opened a new pull request #10072: URL: https://github.com/apache/kafka/pull/10072 Part of KIP-572: follow-up work to PR #9800. It's not safe to retry a TX commit after a timeout, because it's unclear if the commit was successful or not, and thus on retry we might get an IllegalStateException. Instead, we throw a TaskCorruptedException so the task is reset and the TX can be retried if the commit did not actually go through. Call for review @ableegoldman @vvcephei @abbccdda @guozhangwang
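Sketch of the control flow described above, heavily simplified relative to the real Streams code: a timed-out transactional commit is ambiguous, so instead of retrying it blindly the error is converted into a task-corruption signal that forces the task to be reinitialized. TaskCorruptedException is stubbed here to keep the example self-contained.

```java
import org.apache.kafka.common.errors.TimeoutException;

public class TxCommitExample {
    // Stand-in for Streams' internal TaskCorruptedException.
    static class TaskCorruptedException extends RuntimeException {
        TaskCorruptedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    interface TxProducer {
        void commitTransaction(); // may throw TimeoutException
    }

    static void commit(TxProducer producer, String taskId) {
        try {
            producer.commitTransaction();
        } catch (TimeoutException timeout) {
            // Retrying here could hit IllegalStateException if the first commit actually went
            // through, so hand the ambiguity to the task-corruption path instead.
            throw new TaskCorruptedException("Timed out committing transaction for task " + taskId, timeout);
        }
    }
}
```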
[GitHub] [kafka] rondagostino commented on a change in pull request #10069: MINOR: Add RaftReplicaManager
rondagostino commented on a change in pull request #10069: URL: https://github.com/apache/kafka/pull/10069#discussion_r571309365 ## File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala ## @@ -0,0 +1,538 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util +import java.util.concurrent.atomic.AtomicBoolean + +import kafka.cluster.Partition +import kafka.log.LogManager +import kafka.server.QuotaFactory.QuotaManagers +import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpoints} +import kafka.server.metadata.{ConfigRepository, MetadataBroker, MetadataBrokers, MetadataImageBuilder, MetadataPartition} +import kafka.utils.Implicits.MapExtensionMethods +import kafka.utils.Scheduler +import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.utils.Time + +import scala.collection.{Map, Set, mutable} + +class RaftReplicaManager(config: KafkaConfig, + metrics: Metrics, + time: Time, + scheduler: Scheduler, + logManager: LogManager, + isShuttingDown: AtomicBoolean, + quotaManagers: QuotaManagers, + brokerTopicStats: BrokerTopicStats, + metadataCache: MetadataCache, + logDirFailureChannel: LogDirFailureChannel, + delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce], + delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch], + delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords], + delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader], + threadNamePrefix: Option[String], + configRepository: ConfigRepository, + alterIsrManager: AlterIsrManager) extends ReplicaManager( + config, metrics, time, None, scheduler, logManager, isShuttingDown, quotaManagers, + brokerTopicStats, metadataCache, logDirFailureChannel, delayedProducePurgatory, delayedFetchPurgatory, + delayedDeleteRecordsPurgatory, delayedElectLeaderPurgatory, threadNamePrefix, configRepository, alterIsrManager) { + + if (config.requiresZookeeper) { +throw new IllegalStateException(s"Cannot use ${getClass.getSimpleName} when using ZooKeeper") + } + + // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not + // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease. + // Changes are never deferred when using ZooKeeper. When true, this indicates that we should transition + // online partitions to the deferred state if we see a metadata update for that partition. 
+ private var deferringMetadataChanges: Boolean = true + stateChangeLogger.debug(s"Metadata changes are initially being deferred") + + def beginMetadataChangeDeferral(): Unit = { +replicaStateChangeLock synchronized { + deferringMetadataChanges = true + stateChangeLogger.info(s"Metadata changes are now being deferred") +} + } + + def endMetadataChangeDeferral(): Unit = { +val startMs = time.milliseconds() +replicaStateChangeLock synchronized { + stateChangeLogger.info(s"Applying deferred metadata changes") + val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints) + val partitionsMadeFollower = mutable.Set[Partition]() + val partitionsMadeLeader = mutable.Set[Partition]() + val leadershipChangeCallbacks = +mutable.Map[(Iterable[Partition], Iterable[Partition]) => Unit, (mutable.Set[Partition], mutable.Set[Partition])]() + try { +val leaderPartitionStates = mutable.Map[Partition, MetadataPartition]() +val followerPartitionStates = mutable.Map[Partition, MetadataPartition]() +val partitionsAlreadyExisting = mutable.Set[MetadataPartition]() +val
[GitHub] [kafka] rondagostino commented on a change in pull request #10069: MINOR: Add RaftReplicaManager
rondagostino commented on a change in pull request #10069: URL: https://github.com/apache/kafka/pull/10069#discussion_r571237871 ## File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala ## @@ -0,0 +1,538 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util +import java.util.concurrent.atomic.AtomicBoolean + +import kafka.cluster.Partition +import kafka.log.LogManager +import kafka.server.QuotaFactory.QuotaManagers +import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpoints} +import kafka.server.metadata.{ConfigRepository, MetadataBroker, MetadataBrokers, MetadataImageBuilder, MetadataPartition} +import kafka.utils.Implicits.MapExtensionMethods +import kafka.utils.Scheduler +import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.utils.Time + +import scala.collection.{Map, Set, mutable} + +class RaftReplicaManager(config: KafkaConfig, + metrics: Metrics, + time: Time, + scheduler: Scheduler, + logManager: LogManager, + isShuttingDown: AtomicBoolean, + quotaManagers: QuotaManagers, + brokerTopicStats: BrokerTopicStats, + metadataCache: MetadataCache, + logDirFailureChannel: LogDirFailureChannel, + delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce], + delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch], + delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords], + delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader], + threadNamePrefix: Option[String], + configRepository: ConfigRepository, + alterIsrManager: AlterIsrManager) extends ReplicaManager( + config, metrics, time, None, scheduler, logManager, isShuttingDown, quotaManagers, + brokerTopicStats, metadataCache, logDirFailureChannel, delayedProducePurgatory, delayedFetchPurgatory, + delayedDeleteRecordsPurgatory, delayedElectLeaderPurgatory, threadNamePrefix, configRepository, alterIsrManager) { + + if (config.requiresZookeeper) { +throw new IllegalStateException(s"Cannot use ${getClass.getSimpleName} when using ZooKeeper") + } + + // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not + // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease. + // Changes are never deferred when using ZooKeeper. When true, this indicates that we should transition + // online partitions to the deferred state if we see a metadata update for that partition. 
+ private var deferringMetadataChanges: Boolean = true + stateChangeLogger.debug(s"Metadata changes are initially being deferred") + + def beginMetadataChangeDeferral(): Unit = { +replicaStateChangeLock synchronized { + deferringMetadataChanges = true + stateChangeLogger.info(s"Metadata changes are now being deferred") +} + } + + def endMetadataChangeDeferral(): Unit = { +val startMs = time.milliseconds() +replicaStateChangeLock synchronized { + stateChangeLogger.info(s"Applying deferred metadata changes") + val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints) + val partitionsMadeFollower = mutable.Set[Partition]() + val partitionsMadeLeader = mutable.Set[Partition]() + val leadershipChangeCallbacks = +mutable.Map[(Iterable[Partition], Iterable[Partition]) => Unit, (mutable.Set[Partition], mutable.Set[Partition])]() + try { +val leaderPartitionStates = mutable.Map[Partition, MetadataPartition]() +val followerPartitionStates = mutable.Map[Partition, MetadataPartition]() +val partitionsAlreadyExisting = mutable.Set[MetadataPartition]() +val
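Bare-bones illustration of the deferral pattern used in the quoted code; the real RaftReplicaManager defers per-partition metadata state rather than Runnables and synchronizes on replicaStateChangeLock, so this only shows the general shape.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ChangeDeferral {
    private final Deque<Runnable> deferred = new ArrayDeque<>();
    private boolean deferring = true; // changes start out deferred until the broker is ready

    public synchronized void submit(Runnable change) {
        if (deferring) {
            deferred.addLast(change); // remember the change and apply it later, in arrival order
        } else {
            change.run();
        }
    }

    public synchronized void beginDeferral() {
        deferring = true;
    }

    public synchronized void endDeferral() {
        while (!deferred.isEmpty()) {
            deferred.pollFirst().run(); // apply everything that accumulated while deferring
        }
        deferring = false;
    }
}
```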
[jira] [Assigned] (KAFKA-9195) Use Randomized State Directory Names in Streams System Tests
[ https://issues.apache.org/jira/browse/KAFKA-9195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-9195: -- Assignee: Matthias J. Sax > Use Randomized State Directory Names in Streams System Tests > - > > Key: KAFKA-9195 > URL: https://issues.apache.org/jira/browse/KAFKA-9195 > Project: Kafka > Issue Type: Improvement > Components: streams, system tests >Reporter: Bruno Cadonna >Assignee: Matthias J. Sax >Priority: Major > > Currently, the state directory property in streams' system tests is set to > the {{PERSISTENT_ROOT}} variable. Since Streams applications in different > tests have the same application ID and the state directory path consists of > state directory property + application ID + task ID, it might happen that a > dirty state directory of one test is re-used by another test if the state > directory is not properly cleaned up. This may lead to unexpected results and > false positive and/or flaky failures. > The state directory property shall be set to a randomized path inside > {{PERSISTENT_ROOT}} to ensure that tests may not interfere with each other in > the case of missing state clean-ups. -- This message was sent by Atlassian Jira (v8.3.4#803005)
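The change the ticket proposes amounts to something like the following; the actual fix belongs in the Python system-test harness, so this Java snippet is only an illustration, and PERSISTENT_ROOT here is just a placeholder for the harness constant.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

public class RandomStateDir {
    // Put the Streams state directory under a random subdirectory of the test's persistent root
    // so that two tests reusing the same application ID cannot see each other's leftover state.
    static Path randomizedStateDir(String persistentRoot) {
        return Paths.get(persistentRoot, "streams-state-" + UUID.randomUUID());
    }
}
```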
[jira] [Comment Edited] (KAFKA-12213) Kafka Streams aggregation Initializer to accept record key
[ https://issues.apache.org/jira/browse/KAFKA-12213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280034#comment-17280034 ] Piotr Fras edited comment on KAFKA-12213 at 2/5/21, 11:09 PM: -- I am going to break down KIP-149 into five streams of work and submit individual pull request for each of the following interfaces, for both Java and Scala API: - ValueTransformer - ValueMapper - ValueJoiner - Initializer (attempt made to address in [https://github.com/apache/kafka/pull/9908]) - Reducer Let me know if that works for you. was (Author: moncalamari): I am going to break down KIP-149 into five streams of work and submit individual pull request for each of the following interfaces, for both Java and Scala API: - ValueTransformer - ValueMapper - ValueJoiner - Initializer (already addressed in [https://github.com/apache/kafka/pull/9908]) - Reducer Let me know if that works for you. > Kafka Streams aggregation Initializer to accept record key > -- > > Key: KAFKA-12213 > URL: https://issues.apache.org/jira/browse/KAFKA-12213 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Piotr Fras >Assignee: Piotr Fras >Priority: Minor > Labels: needs-kip > > Sometimes Kafka record key contains useful information for creating a zero > object in aggregation Initializer. This feature is to add kafka record key to > Initializer. > There were two approaches I considered to implement this feature, one > respecting backwards compatibility for internal and external APIs and the > other one which is not. I chose the latter one as it was more strait-forward. > We may want to validate this approach tho. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12213) Kafka Streams aggregation Initializer to accept record key
[ https://issues.apache.org/jira/browse/KAFKA-12213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280034#comment-17280034 ] Piotr Fras commented on KAFKA-12213: I am going to break down KIP-149 into five streams of work and submit individual pull request for each of the following interfaces: - ValueTransformer - ValueMapper - ValueJoiner - Initializer (already addressed in [https://github.com/apache/kafka/pull/9908]) - Reducer Let me know if that works for you. > Kafka Streams aggregation Initializer to accept record key > -- > > Key: KAFKA-12213 > URL: https://issues.apache.org/jira/browse/KAFKA-12213 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Piotr Fras >Assignee: Piotr Fras >Priority: Minor > Labels: needs-kip > > Sometimes Kafka record key contains useful information for creating a zero > object in aggregation Initializer. This feature is to add kafka record key to > Initializer. > There were two approaches I considered to implement this feature, one > respecting backwards compatibility for internal and external APIs and the > other one which is not. I chose the latter one as it was more strait-forward. > We may want to validate this approach tho. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-12213) Kafka Streams aggregation Initializer to accept record key
[ https://issues.apache.org/jira/browse/KAFKA-12213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280034#comment-17280034 ] Piotr Fras edited comment on KAFKA-12213 at 2/5/21, 11:02 PM: -- I am going to break down KIP-149 into five streams of work and submit individual pull request for each of the following interfaces, for both Java and Scala API: - ValueTransformer - ValueMapper - ValueJoiner - Initializer (already addressed in [https://github.com/apache/kafka/pull/9908]) - Reducer Let me know if that works for you. was (Author: moncalamari): I am going to break down KIP-149 into five streams of work and submit individual pull request for each of the following interfaces: - ValueTransformer - ValueMapper - ValueJoiner - Initializer (already addressed in [https://github.com/apache/kafka/pull/9908]) - Reducer Let me know if that works for you. > Kafka Streams aggregation Initializer to accept record key > -- > > Key: KAFKA-12213 > URL: https://issues.apache.org/jira/browse/KAFKA-12213 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Piotr Fras >Assignee: Piotr Fras >Priority: Minor > Labels: needs-kip > > Sometimes Kafka record key contains useful information for creating a zero > object in aggregation Initializer. This feature is to add kafka record key to > Initializer. > There were two approaches I considered to implement this feature, one > respecting backwards compatibility for internal and external APIs and the > other one which is not. I chose the latter one as it was more strait-forward. > We may want to validate this approach tho. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10678) Re-deploying Streams app causes rebalance and task migration
[ https://issues.apache.org/jira/browse/KAFKA-10678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-10678. Assignee: A. Sophie Blee-Goldman Resolution: Fixed Resolved via [https://github.com/apache/kafka/pull/9978] [~thebearmayor] this should be fixed in the upcoming 2.8.0 release and 2.6.2 releases which are currently in progress (and in 2.7.1 but I'm not sure of the schedule for that yet). If/when you're able to upgrade to one of these, please verify that the task shuffling due to redeployment has been mitigated. And obviously, reopen this ticket if not – thanks! > Re-deploying Streams app causes rebalance and task migration > > > Key: KAFKA-10678 > URL: https://issues.apache.org/jira/browse/KAFKA-10678 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0, 2.6.1 >Reporter: Bradley Peterson >Assignee: A. Sophie Blee-Goldman >Priority: Major > Fix For: 2.8.0, 2.7.1, 2.6.2 > > Attachments: after, before, broker > > > Re-deploying our Streams app causes a rebalance, even when using static group > membership. Worse, the rebalance creates standby tasks, even when the > previous task assignment was balanced and stable. > Our app is currently using Streams 2.6.1-SNAPSHOT (due to [KAFKA-10633]) but > we saw the same behavior in 2.6.0. The app runs on 4 EC2 instances, each with > 4 streams threads, and data stored on persistent EBS volumes.. During a > redeploy, all EC2 instances are stopped, new instances are launched, and the > EBS volumes are attached to the new instances. We do not use interactive > queries. {{session.timeout.ms}} is set to 30 minutes, and the deployment > finishes well under that. {{num.standby.replicas}} is 0. > h2. Expected Behavior > Given a stable and balanced task assignment prior to deploying, we expect to > see the same task assignment after deploying. Even if a rebalance is > triggered, we do not expect to see new standby tasks. > h2. Observed Behavior > Attached are the "Assigned tasks to clients" log lines from before and after > deploying. The "before" is from over 24 hours ago, the task assignment is > well balanced and "Finished stable assignment of tasks, no followup > rebalances required." is logged. The "after" log lines show the same > assignment of active tasks, but some additional standby tasks. There are > additional log lines about adding and removing active tasks, which I don't > quite understand. > I've also included logs from the broker showing the rebalance was triggered > for "Updating metadata". -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10678) Re-deploying Streams app causes rebalance and task migration
[ https://issues.apache.org/jira/browse/KAFKA-10678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-10678: --- Fix Version/s: 2.6.2 2.7.1 2.8.0 > Re-deploying Streams app causes rebalance and task migration > > > Key: KAFKA-10678 > URL: https://issues.apache.org/jira/browse/KAFKA-10678 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0, 2.6.1 >Reporter: Bradley Peterson >Priority: Major > Fix For: 2.8.0, 2.7.1, 2.6.2 > > Attachments: after, before, broker > > > Re-deploying our Streams app causes a rebalance, even when using static group > membership. Worse, the rebalance creates standby tasks, even when the > previous task assignment was balanced and stable. > Our app is currently using Streams 2.6.1-SNAPSHOT (due to [KAFKA-10633]) but > we saw the same behavior in 2.6.0. The app runs on 4 EC2 instances, each with > 4 streams threads, and data stored on persistent EBS volumes.. During a > redeploy, all EC2 instances are stopped, new instances are launched, and the > EBS volumes are attached to the new instances. We do not use interactive > queries. {{session.timeout.ms}} is set to 30 minutes, and the deployment > finishes well under that. {{num.standby.replicas}} is 0. > h2. Expected Behavior > Given a stable and balanced task assignment prior to deploying, we expect to > see the same task assignment after deploying. Even if a rebalance is > triggered, we do not expect to see new standby tasks. > h2. Observed Behavior > Attached are the "Assigned tasks to clients" log lines from before and after > deploying. The "before" is from over 24 hours ago, the task assignment is > well balanced and "Finished stable assignment of tasks, no followup > rebalances required." is logged. The "after" log lines show the same > assignment of active tasks, but some additional standby tasks. There are > additional log lines about adding and removing active tasks, which I don't > quite understand. > I've also included logs from the broker showing the rebalance was triggered > for "Updating metadata". -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12213) Kafka Streams aggregation Initializer to accept record key
[ https://issues.apache.org/jira/browse/KAFKA-12213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280030#comment-17280030 ] Matthias J. Sax commented on KAFKA-12213: - It's totally fine to implement it incrementally across multiple releases. IIRC, when KIP-149 was done, we did not have a Scala API yet. If there are issues, we should discuss solutions on the dev mailing list as follow up, and update the KIP accordingly. > Kafka Streams aggregation Initializer to accept record key > -- > > Key: KAFKA-12213 > URL: https://issues.apache.org/jira/browse/KAFKA-12213 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Piotr Fras >Assignee: Piotr Fras >Priority: Minor > Labels: needs-kip > > Sometimes Kafka record key contains useful information for creating a zero > object in aggregation Initializer. This feature is to add kafka record key to > Initializer. > There were two approaches I considered to implement this feature, one > respecting backwards compatibility for internal and external APIs and the > other one which is not. I chose the latter one as it was more strait-forward. > We may want to validate this approach tho. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12288) Remove task-level filesystem locks
[ https://issues.apache.org/jira/browse/KAFKA-12288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-12288: Component/s: streams > Remove task-level filesystem locks > -- > > Key: KAFKA-12288 > URL: https://issues.apache.org/jira/browse/KAFKA-12288 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Major > > Since you actually can't run multiple instances of a Kafka Streams > application on the same physical state directory, there's really no benefit > to the file locks we obtain as part of the task directory locking. The only > safety measures we need are to protect between threads within a process, not > across processes. The in-memory map of locks should be sufficient for the > StateDirectory to be safe. > So, we should take out all of the file-based locking of task directories. > This should allow us to greatly simplify the StateDirectory code, and > eliminate the source of some problems we've faced in the past, particularly > those finicky FS/OS dependent issues -- This message was sent by Atlassian Jira (v8.3.4#803005)
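A toy version of the in-memory locking the ticket says is sufficient once file locks are gone; the real StateDirectory tracks owning threads with more bookkeeping than this.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TaskDirLocks {
    private final Map<String, Thread> lockOwners = new ConcurrentHashMap<>();

    // Returns true if the calling thread acquired (or already holds) the lock for this task.
    public boolean lock(String taskId) {
        Thread owner = lockOwners.computeIfAbsent(taskId, id -> Thread.currentThread());
        return owner == Thread.currentThread();
    }

    // Releases the lock only if the calling thread is the owner.
    public void unlock(String taskId) {
        lockOwners.remove(taskId, Thread.currentThread());
    }
}
```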
[GitHub] [kafka] hachikuji commented on pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed
hachikuji commented on pull request #9579: URL: https://github.com/apache/kafka/pull/9579#issuecomment-774326674 @abbccdda Thanks for the updates. I opened a PR with a few fixes to speed this along since we're trying to get it checked in today: https://github.com/abbccdda/kafka/pull/6. The tests that were previously failing now seem to be passing (at least when testing locally). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #10039: MINOR: Defer log recovery until LogManager startup
rondagostino commented on a change in pull request #10039: URL: https://github.com/apache/kafka/pull/10039#discussion_r571283864 ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -352,13 +348,15 @@ class LogManager(logDirs: Seq[File], val numLogsLoaded = new AtomicInteger(0) numTotalLogs += logsToLoad.length -val jobsForDir = logsToLoad.map { logDir => +val jobsForDir = logsToLoad + .filter(logDir => Log.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic) Review comment: We could get rid of the parameter in `startup()` and create a `TopicLogConfigOverrideRetriever` class that held the `ConfigRepository` and would retrieve any topic configs in a thread-safe manner -- then we just pass that to `loadLog()` and it asks for the overrides, and the class would retrieve iff they haven't been retrieved before. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
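The "retrieve iff they haven't been retrieved before" idea is essentially a lazily initialized, thread-safe cache. A hypothetical sketch (the names are invented here and the real proposal targets the Scala LogManager/ConfigRepository):

```java
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;

public class TopicLogConfigOverrideRetriever {
    private final Supplier<Map<String, Properties>> loader; // e.g. a ConfigRepository lookup
    private volatile Map<String, Properties> overrides;     // null until first requested

    public TopicLogConfigOverrideRetriever(Supplier<Map<String, Properties>> loader) {
        this.loader = loader;
    }

    // Returns per-topic log config overrides, loading them from the repository at most once.
    public Map<String, Properties> overridesByTopic() {
        Map<String, Properties> local = overrides;
        if (local == null) {
            synchronized (this) {
                if (overrides == null) {
                    overrides = loader.get();
                }
                local = overrides;
            }
        }
        return local;
    }
}
```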
[GitHub] [kafka] rondagostino commented on a change in pull request #10039: MINOR: Defer log recovery until LogManager startup
rondagostino commented on a change in pull request #10039: URL: https://github.com/apache/kafka/pull/10039#discussion_r571278884 ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -352,13 +348,15 @@ class LogManager(logDirs: Seq[File], val numLogsLoaded = new AtomicInteger(0) numTotalLogs += logsToLoad.length -val jobsForDir = logsToLoad.map { logDir => +val jobsForDir = logsToLoad + .filter(logDir => Log.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic) Review comment: Yeah, we use the list of topics just to know what to check for log config overrides. This will see anything in the log dir. Now that I realize it, we could probably use this to know what topics we need to look at for log config overrides. But I don't want to change too much at this point. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #10039: MINOR: Defer log recovery until LogManager startup
rondagostino commented on a change in pull request #10039: URL: https://github.com/apache/kafka/pull/10039#discussion_r571276941 ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File], /** * Start the background threads to flush logs and do log cleanup */ - def startup(): Unit = { + def startup(retrieveTopicNames: => Set[String]): Unit = { Review comment: Actually, this isn't the list of topics to load -- it's the list of topics for which we will look for log config overrides. `retrieveTopicsForLogConfigOverrideCheck` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #10039: MINOR: Defer log recovery until LogManager startup
rondagostino commented on a change in pull request #10039: URL: https://github.com/apache/kafka/pull/10039#discussion_r571275654 ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File], /** * Start the background threads to flush logs and do log cleanup */ - def startup(): Unit = { + def startup(retrieveTopicNames: => Set[String]): Unit = { Review comment: Would like to keep it as a verb: `retrieveTopicsToLoad` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #10039: MINOR: Defer log recovery until LogManager startup
rondagostino commented on a change in pull request #10039: URL: https://github.com/apache/kafka/pull/10039#discussion_r571274481 ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File], /** * Start the background threads to flush logs and do log cleanup */ - def startup(): Unit = { + def startup(retrieveTopicNames: => Set[String]): Unit = { + startupWithTopicLogConfigOverrides(generateTopicLogConfigs(retrieveTopicNames)) Review comment: Yes, `startup()` is invoked immediately after the instance is constructed when we are using ZooKeeper, and we can get the configs at that point since we can just query ZK. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10039: MINOR: Defer log recovery until LogManager startup
mumrah commented on a change in pull request #10039: URL: https://github.com/apache/kafka/pull/10039#discussion_r571268386 ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File], /** * Start the background threads to flush logs and do log cleanup */ - def startup(): Unit = { + def startup(retrieveTopicNames: => Set[String]): Unit = { + startupWithTopicLogConfigOverrides(generateTopicLogConfigs(retrieveTopicNames)) Review comment: We are able to load the topic configs right away because they are coming from ZK, right? ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File], /** * Start the background threads to flush logs and do log cleanup */ - def startup(): Unit = { + def startup(retrieveTopicNames: => Set[String]): Unit = { + startupWithTopicLogConfigOverrides(generateTopicLogConfigs(retrieveTopicNames)) + } + + // visible for testing + private[log] def generateTopicLogConfigs(topicNames: Set[String]): Map[String, LogConfig] = { +val topicLogConfigs: mutable.Map[String, LogConfig] = mutable.Map() Review comment: nit: can move type info to right hand side and just have `val topicLogConfigs = ...` ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File], /** * Start the background threads to flush logs and do log cleanup */ - def startup(): Unit = { + def startup(retrieveTopicNames: => Set[String]): Unit = { Review comment: Feels slightly odd to pass in the set of topics to load here, but I can't think of a good way to avoid it. Perhaps we could pass MetadataCache into LogManager and let startup call MetadataCache#getAllTopics? That might be more risky though since it changes the startup order in KafkaServer, maybe we can look into this as a follow-up. Besides that, the name here seems strange. Maybe something like "topicsToLoad"? ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -352,13 +348,15 @@ class LogManager(logDirs: Seq[File], val numLogsLoaded = new AtomicInteger(0) numTotalLogs += logsToLoad.length -val jobsForDir = logsToLoad.map { logDir => +val jobsForDir = logsToLoad + .filter(logDir => Log.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic) Review comment: Will the metadata topic get passed into LogManager? I would guess not since it's a Raft topic and not a regular Kafka topic. Also style nit: you can do `logsToLoad.filter { logDir => ... }` similar to the `map` below This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
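The filtering being reviewed boils down to skipping log directories that belong to the Raft metadata topic when enumerating what to load. A rough Java illustration with invented names; the real code is Scala inside LogManager, uses Log.parseTopicPartitionName, and the metadata topic constant comes from KafkaRaftServer.MetadataTopic (assumed to be "@metadata" here).

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MetadataTopicFilter {
    static final String METADATA_TOPIC = "@metadata"; // assumption: stand-in for the real constant

    static List<File> regularTopicDirs(File[] logDirs) {
        return Arrays.stream(logDirs)
            .filter(dir -> !topicOf(dir).equals(METADATA_TOPIC))
            .collect(Collectors.toList());
    }

    // A topic-partition directory is named "<topic>-<partition>"; strip the trailing partition id.
    // (The real parser also handles "-delete" and "-future" suffixes.)
    static String topicOf(File dir) {
        String name = dir.getName();
        return name.substring(0, name.lastIndexOf('-'));
    }
}
```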
[jira] [Created] (KAFKA-12302) Flaky Test SaslApiVersionsRequestTest#testApiVersionsRequestWithUnsupportedVersion
Matthias J. Sax created KAFKA-12302: --- Summary: Flaky Test SaslApiVersionsRequestTest#testApiVersionsRequestWithUnsupportedVersion Key: KAFKA-12302 URL: https://issues.apache.org/jira/browse/KAFKA-12302 Project: Kafka Issue Type: Test Components: core, unit tests Reporter: Matthias J. Sax [https://github.com/apache/kafka/pull/1/checks?check_run_id=1836684124] {quote}{{org.opentest4j.AssertionFailedError: Topic [__consumer_offsets] metadata not propagated after 6 ms at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39) at org.junit.jupiter.api.Assertions.fail(Assertions.java:117) at kafka.utils.TestUtils$.waitForAllPartitionsMetadata(TestUtils.scala:852) at kafka.utils.TestUtils$.createTopic(TestUtils.scala:367) at kafka.utils.TestUtils$.createOffsetsTopic(TestUtils.scala:429) at kafka.api.IntegrationTestHarness.doSetup(IntegrationTestHarness.scala:109) at kafka.api.IntegrationTestHarness.setUp(IntegrationTestHarness.scala:84) at kafka.server.SaslApiVersionsRequestTest.setUp(SaslApiVersionsRequestTest.scala:41)}} {quote} {{}} {{}} STDOUT {quote}{{[2021-02-05 05ː26ː04,270] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/kafka2215259725436410019.tmp'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn:1094) [2021-02-05 05ː26ː04,280] ERROR [ZooKeeperClient] Auth failed. (kafka.zookeeper.ZooKeeperClient:74) [2021-02-05 05ː26ː04,304] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/kafka2215259725436410019.tmp'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn:1094) [2021-02-05 05ː26ː04,331] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient:74) [2021-02-05 05ː26ː05,337] ERROR [RequestSendThread controllerId=0] Controller 0 fails to send a request to broker localhost:36921 (id: 0 rack: null) (kafka.controller.RequestSendThread:76) java.lang.InterruptedException at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1367) at java.base/java.util.concurrent.CountDownLatch.await(CountDownLatch.java:278) at kafka.utils.ShutdownableThread.pause(ShutdownableThread.scala:82) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:234) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) [2021-02-05 05ː27ː09,650] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/kafka12601197135790806695.tmp'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn:1094) [2021-02-05 05ː27ː09,650] ERROR [ZooKeeperClient] Auth failed. (kafka.zookeeper.ZooKeeperClient:74) [2021-02-05 05ː27ː09,654] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/kafka12601197135790806695.tmp'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn:1094) [2021-02-05 05ː27ː09,654] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient:74) [2021-02-05 05ː27ː12,865] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/kafka15543931612251532592.tmp'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn:1094) [2021-02-05 05ː27ː12,865] ERROR [ZooKeeperClient] Auth failed. (kafka.zookeeper.ZooKeeperClient:74) [2021-02-05 05ː27ː12,868] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/kafka15543931612251532592.tmp'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn:1094) [2021-02-05 05ː27ː12,869] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient:74) [2021-02-05 06ː07ː42,597] WARN SASL configuration failed: javax.security.auth.login.Log
[jira] [Commented] (KAFKA-12284) Flaky Test MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync
[ https://issues.apache.org/jira/browse/KAFKA-12284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280008#comment-17280008 ] Matthias J. Sax commented on KAFKA-12284: - Failed again: https://github.com/apache/kafka/pull/1/checks?check_run_id=1836563325 > Flaky Test > MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync > - > > Key: KAFKA-12284 > URL: https://issues.apache.org/jira/browse/KAFKA-12284 > Project: Kafka > Issue Type: Test > Components: mirrormaker, unit tests >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > [https://github.com/apache/kafka/pull/9997/checks?check_run_id=1820178470] > {quote} {{java.lang.RuntimeException: > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TopicExistsException: Topic > 'primary.test-topic-2' already exists. > at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:366) > at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:341) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOneWayReplicationWithAutoOffsetSync(MirrorConnectorsIntegrationBaseTest.java:419)}} > [...] > > {{Caused by: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TopicExistsException: Topic > 'primary.test-topic-2' already exists. > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:364) > ... 92 more > Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic > 'primary.test-topic-2' already exists.}} > {quote} > STDOUT > {quote} {{2021-02-03 04ː19ː15,975] ERROR [MirrorHeartbeatConnector|task-0] > WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to > heartbeats: (org.apache.kafka.connect.runtime.WorkerSourceTask:354) > org.apache.kafka.common.KafkaException: Producer is closed forcefully. > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282) > at java.lang.Thread.run(Thread.java:748)}}{quote} > {quote} {{[2021-02-03 04ː19ː36,767] ERROR Could not check connector state > info. > (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420) > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not > read connector state. 
Error response: \{"error_code":404,"message":"No status > found for connector MirrorSourceConnector"} > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:466) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:458) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:225)}} > {{}} > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mumrah commented on a change in pull request #9986: JUnit extensions for integration tests
mumrah commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r571258754 ## File path: core/src/test/java/kafka/test/ClusterInstance.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.network.SocketServer; +import kafka.test.annotation.ClusterTest; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.network.ListenerName; + +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +public interface ClusterInstance { + +enum ClusterType { +Zk, +// Raft +} + +/** + * Cluster type. For now, only ZK is supported. + */ +ClusterType clusterType(); + +/** + * The cluster configuration used to create this cluster. Changing data in this instance through this accessor will + * have no affect on the cluster since it is already provisioned. + */ +ClusterConfig config(); + +/** + * The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. If + * unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT + */ +ListenerName listener(); + +/** + * The broker connect string which can be used by clients for bootstrapping + */ +String brokerList(); + +/** + * A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is + * acting as the controller (since ZK controllers serve both broker and controller roles). + */ +Collection brokers(); + +/** + * A collection of all controllers in the cluster. For ZK-based clusters, this will return the broker which is also + * currently the active controller. For Raft-based clusters, this will return all controller servers. + */ +Collection controllers(); + +/** + * Any one of the broker servers. + */ +Optional anyBroker(); + +/** + * Any one of the controller servers. + */ +Optional anyController(); + +/** + * The underlying object which is responsible for setting up and tearing down the cluster. + */ +Object getUnderlying(); + +default T getUnderlying(Class asClass) { +return asClass.cast(getUnderlying()); +} + +Admin createAdminClient(Properties configOverrides); + +default Admin createAdminClient() { +return createAdminClient(new Properties()); +} + +void start(); + +void stop(); Review comment: Originally I left these out of the interface because the framework handle starting and stopping the cluster. However, after @ijuma's feedback, I added `boolean autoStart` to ClusterConfig along with these methods so that a test could control cluster startup/shutdown explicitly. If a test does not call `stop`, the framework will stop the cluster. Stopping it a second time does nothing (per the implementation). 
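To make the lifecycle semantics concrete, here is a minimal usage sketch; it assumes a `ClusterConfig` built with `autoStart` disabled and uses only methods that appear in the `ClusterInstance` interface under review. The class and test names are illustrative, not part of this PR.
```java
// Hypothetical usage sketch of the explicit start/stop lifecycle discussed above.
import kafka.test.ClusterInstance;
import org.apache.kafka.clients.admin.Admin;

public class ExplicitClusterLifecycleExample {

    // Assumes the test's ClusterConfig was built with autoStart disabled,
    // so the test owns startup and shutdown explicitly.
    void exerciseCluster(ClusterInstance cluster) throws Exception {
        cluster.start();
        try (Admin admin = cluster.createAdminClient()) {
            admin.listTopics().names().get();  // any client-side call against the cluster
        } finally {
            // stop() is idempotent per the comment above; the framework would
            // also stop the cluster at the end of the test if we forgot to.
            cluster.stop();
        }
    }
}
```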
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #10069: MINOR: Add RaftReplicaManager
ijuma commented on a change in pull request #10069: URL: https://github.com/apache/kafka/pull/10069#discussion_r571257684 ## File path: core/src/main/scala/kafka/server/DelayedDeleteRecords.scala ## @@ -84,7 +84,7 @@ class DelayedDeleteRecords(delayMs: Long, (false, Errors.NOT_LEADER_OR_FOLLOWER, DeleteRecordsResponse.INVALID_LOW_WATERMARK) } - case HostedPartition.Deferred(_) => + case HostedPartition.Deferred(_, _, _, _, _) => Review comment: It's cleaner to use `case _: HostedPartition.Deferred`. Otherwise, every time a parameter is added, you have to update this for no good reason. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9986: JUnit extensions for integration tests
mumrah commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r571257075 ## File path: core/src/test/java/kafka/test/ClusterInstance.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.network.SocketServer; +import kafka.test.annotation.ClusterTest; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.network.ListenerName; + +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +public interface ClusterInstance { + +enum ClusterType { +Zk, +// Raft +} + +/** + * Cluster type. For now, only ZK is supported. + */ +ClusterType clusterType(); + +/** + * The cluster configuration used to create this cluster. Changing data in this instance through this accessor will + * have no affect on the cluster since it is already provisioned. + */ +ClusterConfig config(); + +/** + * The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. If + * unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT + */ Review comment: Yes, multiple listeners can be defined. This is something to solve/fix in this new framework. Only one security protocol and listener are exposed here in order to stay similar to how KafkaServerTestHarness works. It defines one security protocol and listener as members: ``` protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT protected def listenerName: ListenerName = ListenerName.forSecurityProtocol(securityProtocol) ``` Of course, more listeners can be defined through configuration, but this one is exposed by the test harness and used by many tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571253465 ## File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala ## @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server.metadata + +import kafka.api.LeaderAndIsr +import kafka.controller.StateChangeLogger +import kafka.server.MetadataCache +import kafka.utils.CoreUtils.inLock +import kafka.utils.Logging +import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic} +import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} +import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataPartitionState} +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} + +import java.util +import java.util.Collections +import java.util.concurrent.locks.ReentrantLock +import scala.collection.{Seq, Set, mutable} +import scala.jdk.CollectionConverters._ + +object RaftMetadataCache { + def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], + topic: String, partitionId: Int): Boolean = { +partitionStates.get(topic).exists { infos => + infos.remove(partitionId) + if (infos.isEmpty) partitionStates.remove(topic) + true +} + } + + def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], + topic: String, + partitionId: Int, + stateInfo: UpdateMetadataPartitionState): Unit = { +val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty) +infos(partitionId) = stateInfo + } +} + + +class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging { + this.logIdent = s"[MetadataCache brokerId=$brokerId] " + + private val lock = new ReentrantLock() + + //this is the cache state. every MetadataImage instance is immutable, and updates (performed under a lock) + //replace the value with a completely new one. this means reads (which are not under any lock) need to grab + //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation. + //multiple reads of this value risk getting different snapshots. + @volatile private var _currentImage: MetadataImage = new MetadataImage() + + private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None) + + // This method is the main hotspot when it comes to the performance of metadata requests, + // we should be careful about adding additional logic here. 
Relatedly, `brokers` is + // `List[Integer]` instead of `List[Int]` to avoid a collection copy. + // filterUnavailableEndpoints exists to support v0 MetadataResponses + private def maybeFilterAliveReplicas(image: MetadataImage, + brokers: java.util.List[Integer], + listenerName: ListenerName, + filterUnavailableEndpoints: Boolean): java.util.List[Integer] = { +if (!filterUnavailableEndpoints) { + brokers +} else { + val res = new util.ArrayList[Integer](math.min(image.brokers.aliveBrokers().size, brokers.size)) + for (brokerId <- brokers.asScala) { +if (hasAliveEndpoint(image, brokerId, listenerName)) + res.add(brokerId) + } + res +} + } + + def currentImage(): MetadataImage = _currentImage + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker. + // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below). + private def getPartitionMetadata(image: MetadataImage, topic: St
[jira] [Commented] (KAFKA-9527) Application Reset Tool Returns NPE when --to-datetime or --by-duration are run on --input-topics with empty partitions
[ https://issues.apache.org/jira/browse/KAFKA-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1727#comment-1727 ] Matthias J. Sax commented on KAFKA-9527: Thanks everyone. I'll try to review the PR soon – but we have 2.8.0 deadline and we want to do a 2.6.2 release that I need to take care of first. – We can discuss more details on the PR. And yes, we have flaky tests so don't worry about it. > Application Reset Tool Returns NPE when --to-datetime or --by-duration are > run on --input-topics with empty partitions > --- > > Key: KAFKA-9527 > URL: https://issues.apache.org/jira/browse/KAFKA-9527 > Project: Kafka > Issue Type: Bug > Components: streams, tools >Affects Versions: 2.3.0 >Reporter: jbfletch >Assignee: Marco Lotz >Priority: Minor > > When running the streams application reset tool with --by-duration or > --to-datetime if any partitions for a given input topic are empty a NPE is > thrown. I tested this with a topic with 3 partitions, I received a NPE until > all 3 partitions had at least one message. The behavior was the same for > both --to-datetime and --by-duration. > Error below: > Reset-offsets for input topics [sample-cdc-topic]Reset-offsets for input > topics [sample-cdc-topic]Following input topics offsets will be reset to (for > consumer group des-demo-stream)ERROR: > java.lang.NullPointerExceptionjava.lang.NullPointerException at > kafka.tools.StreamsResetter.resetToDatetime(StreamsResetter.java:496) at > kafka.tools.StreamsResetter.maybeReset(StreamsResetter.java:426) at > kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:374) > at kafka.tools.StreamsResetter.run(StreamsResetter.java:164) at > kafka.tools.StreamsResetter.run(StreamsResetter.java:131) at > kafka.tools.StreamsResetter.main(StreamsResetter.java:678) > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
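The NPE in the quoted trace comes from dereferencing a per-partition lookup result that is null for empty partitions. Below is a minimal sketch of the defensive pattern, assuming the tool uses `Consumer#offsetsForTimes` (whose per-partition value is null when no matching record exists); this is illustrative, not the actual patch.
```java
// Illustrative null-safe handling of timestamp-based offset lookups; not the
// actual StreamsResetter fix. offsetsForTimes() returns null values for
// partitions that have no record at or after the requested timestamp.
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public final class ResetToDatetimeSketch {
    static void seekToTimestamp(Consumer<byte[], byte[]> consumer,
                                Map<TopicPartition, Long> timestampsToSearch) {
        Map<TopicPartition, OffsetAndTimestamp> offsets =
            consumer.offsetsForTimes(timestampsToSearch);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
            OffsetAndTimestamp found = entry.getValue();
            if (found != null) {
                consumer.seek(entry.getKey(), found.offset());
            } else {
                // Empty partition: fall back to the end offset instead of
                // dereferencing the null result.
                consumer.seekToEnd(Collections.singleton(entry.getKey()));
            }
        }
    }
}
```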
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r571246502 ## File path: core/src/test/java/kafka/test/ClusterInstance.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.network.SocketServer; +import kafka.test.annotation.ClusterTest; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.network.ListenerName; + +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +public interface ClusterInstance { + +enum ClusterType { +Zk, +// Raft +} + +/** + * Cluster type. For now, only ZK is supported. + */ +ClusterType clusterType(); + +/** + * The cluster configuration used to create this cluster. Changing data in this instance through this accessor will + * have no affect on the cluster since it is already provisioned. + */ +ClusterConfig config(); + +/** + * The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. If + * unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT + */ +ListenerName listener(); + +/** + * The broker connect string which can be used by clients for bootstrapping + */ +String brokerList(); + +/** + * A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is + * acting as the controller (since ZK controllers serve both broker and controller roles). + */ +Collection brokers(); + +/** + * A collection of all controllers in the cluster. For ZK-based clusters, this will return the broker which is also + * currently the active controller. For Raft-based clusters, this will return all controller servers. + */ +Collection controllers(); + +/** + * Any one of the broker servers. + */ +Optional anyBroker(); + +/** + * Any one of the controller servers. + */ +Optional anyController(); + +/** + * The underlying object which is responsible for setting up and tearing down the cluster. + */ +Object getUnderlying(); + +default T getUnderlying(Class asClass) { +return asClass.cast(getUnderlying()); +} + +Admin createAdminClient(Properties configOverrides); + +default Admin createAdminClient() { +return createAdminClient(new Properties()); +} + +void start(); + +void stop(); Review comment: should this implement `AutoCloseable`? Does code need to remember to close this cluster? Or is that handled automatically by the framework... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji merged pull request #10045: MINOR: Allow KafkaApis to be configured for Raft controller quorums
hachikuji merged pull request #10045: URL: https://github.com/apache/kafka/pull/10045 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r571245971 ## File path: core/src/test/java/kafka/test/ClusterInstance.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.network.SocketServer; +import kafka.test.annotation.ClusterTest; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.network.ListenerName; + +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +public interface ClusterInstance { + +enum ClusterType { +Zk, +// Raft +} + +/** + * Cluster type. For now, only ZK is supported. + */ +ClusterType clusterType(); + +/** + * The cluster configuration used to create this cluster. Changing data in this instance through this accessor will + * have no affect on the cluster since it is already provisioned. + */ +ClusterConfig config(); + +/** + * The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. If + * unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT + */ +ListenerName listener(); + +/** + * The broker connect string which can be used by clients for bootstrapping + */ +String brokerList(); + +/** + * A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is + * acting as the controller (since ZK controllers serve both broker and controller roles). + */ +Collection brokers(); Review comment: I would prefer to call this method something like `brokerSocketServers`. After all, the `SocketServer` is just one small part of the broker, not the broker itself. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12301) Support for enum validation in configuration
Jeremy Custenborder created KAFKA-12301: --- Summary: Support for enum validation in configuration Key: KAFKA-12301 URL: https://issues.apache.org/jira/browse/KAFKA-12301 Project: Kafka Issue Type: Improvement Components: config Reporter: Jeremy Custenborder Assignee: Jeremy Custenborder Several configuration elements are mapped to internal enums. A typo in configuration will yield error messages that are not descriptive and require the user to find valid values. For example: {code:java} Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:479) at org.apache.kafka.clients.admin.Admin.create(Admin.java:61) at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39) ... Caused by: java.lang.IllegalArgumentException: No enum constant org.apache.kafka.common.security.auth.SecurityProtocol.SASL_PLAINTEXTA at java.lang.Enum.valueOf(Enum.java:238) at org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26) at org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72) at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:103) at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:454) ... 7 more {code} This is easier to troubleshoot. {code:java} Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:479) at org.apache.kafka.clients.admin.Admin.create(Admin.java:61) at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39) ... Caused by: org.apache.kafka.common.config.ConfigException: Invalid value SASL_PLAINTEXTA for security.protocol. Enum value not found. Valid values are: PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, SSL at java.lang.Enum.valueOf(Enum.java:238) at org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26) at org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72) at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:103) at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:454) ... 7 more {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
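One way to surface the friendlier message is to attach a validator when the config is defined, so a typo is rejected with the list of valid names before any enum lookup happens. A sketch using the existing `ConfigDef.ValidString` helper follows; the ticket may end up choosing a different mechanism.
```java
// Illustrative only: reject an invalid security.protocol value at config
// parse time with a message that lists the valid names. Not the actual
// patch for KAFKA-12301.
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.security.auth.SecurityProtocol;

public final class EnumValidatedConfigExample {
    static ConfigDef defineSecurityProtocol(ConfigDef def) {
        return def.define(
            "security.protocol",
            Type.STRING,
            SecurityProtocol.PLAINTEXT.name,
            ConfigDef.ValidString.in(SecurityProtocol.names().toArray(new String[0])),
            Importance.MEDIUM,
            "Protocol used to communicate with brokers.");
    }
}
```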
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r571245403 ## File path: core/src/test/java/kafka/test/ClusterInstance.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.network.SocketServer; +import kafka.test.annotation.ClusterTest; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.network.ListenerName; + +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +public interface ClusterInstance { + +enum ClusterType { +Zk, +// Raft +} + +/** + * Cluster type. For now, only ZK is supported. + */ +ClusterType clusterType(); + +/** + * The cluster configuration used to create this cluster. Changing data in this instance through this accessor will + * have no affect on the cluster since it is already provisioned. + */ +ClusterConfig config(); + +/** + * The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. If + * unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT + */ Review comment: A cluster can have multiple listeners, right? Is this intended to be the listener that should be used by clients? If so, calling it the client listener makes sense. (Technically we could have multiple client listeners too, but that's much less common) Many clusters run with 3 listeners: * client * inter-broker NON-REPLICATION * replication and we are going to add a 4th: * controller This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan opened a new pull request #10071: KAFKA-12298: Create LeaderAndIsrRequestBenchmark
jolshan opened a new pull request #10071: URL: https://github.com/apache/kafka/pull/10071 I thought it would be useful to check how the changes to LeaderAndIsrRequests (for KIP-516) affect the handling of the request. This benchmark builds a LeaderAndIsrRequest and calls `kafkaApis.handleLeaderAndIsrRequest`. Other benchmarks for this type of request could be added here as well. I also slightly changed the MetadataRequestBenchmark since I noticed that the MetadataCache used in that benchmark did not add topic IDs. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r571244427 ## File path: core/src/test/java/kafka/test/ClusterInstance.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.network.SocketServer; +import kafka.test.annotation.ClusterTest; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.network.ListenerName; + +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +public interface ClusterInstance { + +enum ClusterType { Review comment: Enum values should be capitalized This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r571242927 ## File path: core/src/test/java/kafka/test/ClusterConfig.java ## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.test.annotation.Type; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.io.File; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Represents a requested configuration of a Kafka cluster for integration testing + */ +public class ClusterConfig { + +private final Type type; +private final int brokers; +private final int controllers; +private final String name; +private final boolean autoStart; + +private final String securityProtocol; Review comment: What's the advantage of making this a string rather than using the SecurityProtocol enum? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r571241978 ## File path: core/src/test/java/kafka/test/annotation/Type.java ## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test.annotation; + +public enum Type { Review comment: Java Enums should be capitalized. Also can we add a JavaDoc for each value describing what it is? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
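A small sketch of the style being asked for, purely illustrative (the real constants live in `kafka.test.annotation.Type` and may differ):
```java
// Illustrative sketch of the requested convention: upper-case constants,
// each with a JavaDoc describing what it selects. Not the actual enum.
public enum Type {
    /** A cluster whose metadata is stored in ZooKeeper. */
    ZK,

    /** A KIP-500 cluster whose metadata is stored in a Raft quorum. */
    RAFT
}
```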
[jira] [Closed] (KAFKA-10716) Streams processId is unstable across restarts resulting in task mass migration
[ https://issues.apache.org/jira/browse/KAFKA-10716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman closed KAFKA-10716. -- > Streams processId is unstable across restarts resulting in task mass migration > -- > > Key: KAFKA-10716 > URL: https://issues.apache.org/jira/browse/KAFKA-10716 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Critical > Fix For: 2.8.0, 2.7.1, 2.6.2 > > > The new high availability feature of KIP-441 relies on deterministic > assignment to produce an eventually-stable assignment. The > HighAvailabilityTaskAssignor assigns tasks based on the unique processId > assigned to each client, so if the same set of Kafka Streams applications > participate in a rebalance it should generate the same task assignment every > time. > Unfortunately the processIds aren't stable across restarts. We generate a > random UUID in the KafkaStreams constructor, so each time the process starts > up it would be assigned a completely different processId. Unless this new > processId happens to be in exactly the same order as the previous one, a > single bounce or crash/restart can result in a large scale shuffling of tasks > based on a completely different eventual assignment. > Ultimately we should fix this via KAFKA-10121, but that's a nontrivial > undertaking and this bug merits some immediate relief if we don't intend to > tackle the larger problem in the upcoming releases -- This message was sent by Atlassian Jira (v8.3.4#803005)
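The fix in the linked PRs persists the generated id so that a restarted instance reuses it. A minimal sketch of that idea is below; the file name and on-disk format are assumptions for illustration rather than the real implementation.
```java
// Sketch of a stable process id: reuse the UUID written by a previous run so
// the HighAvailabilityTaskAssignor sees the same client across restarts.
// The "process-id" file name and plain-text format are assumptions.
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

public final class StableProcessIdSketch {
    static UUID readOrCreateProcessId(Path stateDir) throws IOException {
        Path idFile = stateDir.resolve("process-id");
        if (Files.exists(idFile)) {
            String stored = new String(Files.readAllBytes(idFile), StandardCharsets.UTF_8).trim();
            return UUID.fromString(stored);
        }
        UUID processId = UUID.randomUUID();
        Files.createDirectories(stateDir);
        Files.write(idFile, processId.toString().getBytes(StandardCharsets.UTF_8));
        return processId;
    }
}
```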
[GitHub] [kafka] ableegoldman commented on pull request #10060: KAFKA-10716: persist UUID in state directory for stable processId across restarts - 2.7
ableegoldman commented on pull request #10060: URL: https://github.com/apache/kafka/pull/10060#issuecomment-774266721 Merged to 2.7 and cherry-picked to 2.6 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
cmccabe commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571208259 ## File path: core/src/main/scala/kafka/server/metadata/MetadataBrokers.scala ## @@ -37,7 +36,15 @@ object MetadataBroker { endPoint.name() -> new Node(record.brokerId, endPoint.host, endPoint.port, record.rack) }.toMap, - true) + fenced = true) + } + + def apply(broker: Broker): MetadataBroker = { +new MetadataBroker(broker.id, broker.rack.orNull, + broker.endPoints.map { endpoint => +endpoint.listenerName.value -> new Node(broker.id, endpoint.host, endpoint.port, broker.rack.orNull) + }.toMap, + fenced = false) Review comment: as long as this is just used by the ZK code path there is no harm This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12300) Log source broker when handling metadata response
alex wang created KAFKA-12300: - Summary: Log source broker when handling metadata response Key: KAFKA-12300 URL: https://issues.apache.org/jira/browse/KAFKA-12300 Project: Kafka Issue Type: Improvement Reporter: alex wang Sometimes it's helpful to log the broker id when an error occurs during a metadata request; this can yield insight into the possible cause of the error, such as a metadata propagation delay. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9527) Application Reset Tool Returns NPE when --to-datetime or --by-duration are run on --input-topics with empty partitions
[ https://issues.apache.org/jira/browse/KAFKA-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279941#comment-17279941 ] jbfletch commented on KAFKA-9527: - Thanks for taking this on [~marcolotz] > Application Reset Tool Returns NPE when --to-datetime or --by-duration are > run on --input-topics with empty partitions > --- > > Key: KAFKA-9527 > URL: https://issues.apache.org/jira/browse/KAFKA-9527 > Project: Kafka > Issue Type: Bug > Components: streams, tools >Affects Versions: 2.3.0 >Reporter: jbfletch >Assignee: Marco Lotz >Priority: Minor > > When running the streams application reset tool with --by-duration or > --to-datetime if any partitions for a given input topic are empty a NPE is > thrown. I tested this with a topic with 3 partitions, I received a NPE until > all 3 partitions had at least one message. The behavior was the same for > both --to-datetime and --by-duration. > Error below: > Reset-offsets for input topics [sample-cdc-topic]Reset-offsets for input > topics [sample-cdc-topic]Following input topics offsets will be reset to (for > consumer group des-demo-stream)ERROR: > java.lang.NullPointerExceptionjava.lang.NullPointerException at > kafka.tools.StreamsResetter.resetToDatetime(StreamsResetter.java:496) at > kafka.tools.StreamsResetter.maybeReset(StreamsResetter.java:426) at > kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:374) > at kafka.tools.StreamsResetter.run(StreamsResetter.java:164) at > kafka.tools.StreamsResetter.run(StreamsResetter.java:131) at > kafka.tools.StreamsResetter.main(StreamsResetter.java:678) > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman merged pull request #10060: KAFKA-10716: persist UUID in state directory for stable processId across restarts - 2.7
ableegoldman merged pull request #10060: URL: https://github.com/apache/kafka/pull/10060 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12279) Kafka 2.7 stream app issue
[ https://issues.apache.org/jira/browse/KAFKA-12279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-12279. Resolution: Not A Bug > Kafka 2.7 stream app issue > -- > > Key: KAFKA-12279 > URL: https://issues.apache.org/jira/browse/KAFKA-12279 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 >Reporter: prabhu biradar >Priority: Critical > > After starting the stream application below exception is thrown and threads > stop processing. > 2021-02-02T22:29:10.416::AccountId ::Partition ::Offset > ::[com.xx.xx.xx.xx.xx-client-StreamThread-1]::ERROR::o.a.k.s.processor.internals.StreamThread::stream-thread > [com.xx.xx.xx.xx.xx-client-StreamThread-1] Encountered the following > exception during processing and the thread is going to shut down: > 2021-02-02T22:29:10.416::AccountId ::Partition ::Offset > ::[com.xx.xx.xx.xx.xx-client-StreamThread-1]::ERROR::o.a.k.s.processor.internals.StreamThread::stream-thread > [com.xx.xx.xx.xx.xx-client-StreamThread-1] Encountered the following > exception during processing and the thread is going to shut down: > java.lang.IllegalStateException: Tried to lookup lag for unknown task 0_13 at > org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:318) > at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511) > at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216) > at java.util.TreeMap.put(TreeMap.java:552) at > java.util.TreeSet.add(TreeSet.java:255) at > java.util.AbstractCollection.addAll(AbstractCollection.java:344) at > java.util.TreeSet.addAll(TreeSet.java:312) at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1299) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1213) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:939) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:393) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:111) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1164) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1139) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) > at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1292) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:801) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:763) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:614) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)2021-02-02T22:29:10.416::AccountId > ::Partition ::Offset > ::[com.xx.xx.xx.xx.xxx-client-StreamThread-
[jira] [Commented] (KAFKA-12279) Kafka 2.7 stream app issue
[ https://issues.apache.org/jira/browse/KAFKA-12279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279938#comment-17279938 ] A. Sophie Blee-Goldman commented on KAFKA-12279: Root cause: the source topic was deleted and recreated with a smaller number of partitions. However, the app was not reset and the local state was not cleared, which left the task directory for 0_13 on disk. Since the new input topics had fewer than 13 partitions, this task was no longer recognized. Since decreasing the partition number on-the-fly without resetting the application is not supported, I'm going to close this as "Not a Bug" > Kafka 2.7 stream app issue > -- > > Key: KAFKA-12279 > URL: https://issues.apache.org/jira/browse/KAFKA-12279 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 >Reporter: prabhu biradar >Priority: Critical > > After starting the stream application below exception is thrown and threads > stop processing. > 2021-02-02T22:29:10.416::AccountId ::Partition ::Offset > ::[com.xx.xx.xx.xx.xx-client-StreamThread-1]::ERROR::o.a.k.s.processor.internals.StreamThread::stream-thread > [com.xx.xx.xx.xx.xx-client-StreamThread-1] Encountered the following > exception during processing and the thread is going to shut down: > 2021-02-02T22:29:10.416::AccountId ::Partition ::Offset > ::[com.xx.xx.xx.xx.xx-client-StreamThread-1]::ERROR::o.a.k.s.processor.internals.StreamThread::stream-thread > [com.xx.xx.xx.xx.xx-client-StreamThread-1] Encountered the following > exception during processing and the thread is going to shut down: > java.lang.IllegalStateException: Tried to lookup lag for unknown task 0_13 at > org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:318) > at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511) > at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216) > at java.util.TreeMap.put(TreeMap.java:552) at > java.util.TreeSet.add(TreeSet.java:255) at > java.util.AbstractCollection.addAll(AbstractCollection.java:344) at > java.util.TreeSet.addAll(TreeSet.java:312) at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1299) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1213) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:939) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:393) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:111) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1164) > at > 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1139) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1292) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:801) > at > org.apach
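For anyone recovering from the same state, the usual sequence is to reset the application's committed offsets with the streams application reset tool and then clear local state before restarting. A hedged sketch of the local-state part is below; the application id, properties, and topology are placeholders supplied by the caller.
```java
// Illustrative only: clearing this instance's local state directory so stale
// task directories (such as 0_13 above) are not reported to the assignor.
// Committed offsets and internal topics still need the application reset tool.
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

public final class CleanLocalStateSketch {
    static KafkaStreams resetAndStart(Topology topology, Properties streamsConfig) {
        KafkaStreams streams = new KafkaStreams(topology, streamsConfig);
        streams.cleanUp();   // must be called before start() (or after close())
        streams.start();
        return streams;
    }
}
```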
[GitHub] [kafka] rondagostino commented on a change in pull request #10045: MINOR: Allow KafkaApis to be configured for Raft controller quorums
rondagostino commented on a change in pull request #10045: URL: https://github.com/apache/kafka/pull/10045#discussion_r571174825 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -3511,4 +3501,14 @@ object KafkaApis { FetchResponse.sizeOf(versionId, unconvertedResponse.responseData.entrySet .iterator.asScala.filter(element => quota.isThrottled(element.getKey)).asJava) } + + // visible for testing + private[server] def shouldNeverReceive(request: RequestChannel.Request): Exception = { Review comment: Ah, I hadn't tested for those, and I moved based on rewriting tests. Will move. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #10045: MINOR: Allow KafkaApis to be configured for Raft controller quorums
hachikuji commented on a change in pull request #10045: URL: https://github.com/apache/kafka/pull/10045#discussion_r571174163 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -3511,4 +3501,14 @@ object KafkaApis { FetchResponse.sizeOf(versionId, unconvertedResponse.responseData.entrySet .iterator.asScala.filter(element => quota.isThrottled(element.getKey)).asJava) } + + // visible for testing + private[server] def shouldNeverReceive(request: RequestChannel.Request): Exception = { Review comment: Why move these and not `unsupported`/`notYetSupported`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571173703 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( +topics: collection.Set[String], +listenerName: ListenerName, +errorUnavailableEndpoints: Boolean = false, +errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic] + + def getAllTopics(): collection.Set[String] + + def getAllPartitions(): collection.Set[TopicPartition] + + def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + + def getAliveBroker(brokerId: Int): Option[MetadataBroker] + + def getAliveBrokers: collection.Seq[MetadataBroker] + + def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] + + def numPartitions(topic: String): Option[Int] + + // if the leader is not known, return None; + // if the leader is known and corresponding node is available, return Some(node) + // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) + def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] + + def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] + + def getControllerId: Option[Int] + + def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster + + // This method returns the deleted TopicPartitions received from UpdateMetadataRequest + def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition] + + def contains(topic: String): Boolean + + def contains(tp: TopicPartition): Boolean +} + +object MetadataCache { + def zkMetadataCache(brokerId: Int): ZkMetadataCache = { +new ZkMetadataCache(brokerId) + } + + def raftMetadataCache(brokerId: Int): RaftMetadataCache = { +new RaftMetadataCache(brokerId) + } +} + /** * A cache for the state (e.g., current leader) of each partition. This cache is updated through * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously. */ -class MetadataCache(brokerId: Int) extends Logging { +class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging { Review comment: KAFKA-12299 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12299) Follow ups from MetadataCache changes
[ https://issues.apache.org/jira/browse/KAFKA-12299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-12299: - Description: When merging the Raft MetadataCache changes for 2.8, we identified a few follow up items during the code review. See: * https://github.com/apache/kafka/pull/10049 * https://github.com/apache/kafka/pull/10018 was: When merging the Raft MetadataCache changes for 2.8, we identified a few follow up items during the code review. See: https://github.com/apache/kafka/pull/10049 > Follow ups from MetadataCache changes > - > > Key: KAFKA-12299 > URL: https://issues.apache.org/jira/browse/KAFKA-12299 > Project: Kafka > Issue Type: Task >Reporter: David Arthur >Assignee: David Arthur >Priority: Minor > > When merging the Raft MetadataCache changes for 2.8, we identified a few > follow up items during the code review. > See: > * https://github.com/apache/kafka/pull/10049 > * https://github.com/apache/kafka/pull/10018 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571173506 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( +topics: collection.Set[String], +listenerName: ListenerName, +errorUnavailableEndpoints: Boolean = false, +errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic] + + def getAllTopics(): collection.Set[String] Review comment: KAFKA-12299 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12299) Follow ups from MetadataCache changes
David Arthur created KAFKA-12299: Summary: Follow ups from MetadataCache changes Key: KAFKA-12299 URL: https://issues.apache.org/jira/browse/KAFKA-12299 Project: Kafka Issue Type: Task Reporter: David Arthur Assignee: David Arthur When merging the Raft MetadataCache changes for 2.8, we identified a few follow up items during the code review. See: https://github.com/apache/kafka/pull/10049 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe opened a new pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe opened a new pull request #10070: URL: https://github.com/apache/kafka/pull/10070 The quorum controller stores metadata in the KIP-500 metadata log, not in Apache ZooKeeper. Each controller node is a voter in the metadata quorum. The leader of the quorum is the active controller, which processes write requests. The followers are standby controllers, which replay the operations written to the log. If the active controller goes away, a standby controller can take its place. Like the ZooKeeper-based controller, the quorum controller is based on an event queue backed by a single-threaded executor. However, unlike the ZK-based controller, the quorum controller can have multiple operations in flight; it does not need to wait for one operation to be finished before starting another. Therefore, calls into the QuorumController return CompletableFuture objects which are completed with either a result or an error when the operation is done. The QuorumController will also time out operations that have been sitting on the queue too long without being processed. In this case, the future is completed with a TimeoutException. The controller uses timeline data structures to store multiple "versions" of its in-memory state simultaneously. "Read operations" read the latest committed state; essentially, they are reading from a slightly older in-memory snapshot of the state. "Write operations" read and write the latest state. However, we cannot return a successful result for a write operation until its state has been committed to the log. Therefore, if a client receives an RPC response, it knows that the requested operation has been performed and cannot be undone by a controller failover. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
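The description above is dense, so here is a minimal, hypothetical Scala sketch of the event-queue pattern it describes: operations are handed to a single-threaded executor and the caller receives a CompletableFuture that completes with a result, an error, or a TimeoutException if the event waits on the queue too long. Class and method names are illustrative only and are not taken from the QuorumController code.

```scala
import java.util.concurrent.{CompletableFuture, Executors, TimeUnit, TimeoutException}

// Illustrative event-queue controller; not the actual QuorumController implementation.
class EventQueueSketch(maxQueueTimeMs: Long) {
  private val executor = Executors.newSingleThreadScheduledExecutor()

  /** Enqueue an operation and return a future that completes once it has run,
   *  or completes exceptionally if it waits on the queue longer than maxQueueTimeMs. */
  def append[T](op: () => T): CompletableFuture[T] = {
    val future = new CompletableFuture[T]()
    // Arm a timeout; completing an already-completed future is a no-op, so this
    // only has an effect if the operation is still waiting when the delay elapses.
    executor.schedule(new Runnable {
      override def run(): Unit =
        future.completeExceptionally(new TimeoutException(s"waited more than $maxQueueTimeMs ms"))
    }, maxQueueTimeMs, TimeUnit.MILLISECONDS)
    // Run the operation on the single controller thread.
    executor.submit(new Runnable {
      override def run(): Unit = {
        if (!future.isDone) {
          try future.complete(op())
          catch { case e: Throwable => future.completeExceptionally(e) }
        }
      }
    })
    future
  }

  def shutdown(): Unit = executor.shutdown()
}
```

Under this shape, a caller that observes a successfully completed future knows its operation was processed; the real controller additionally waits for the result to be committed to the metadata log before completing the future.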
[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
hachikuji commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571169638 ## File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala ## @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server.metadata + +import kafka.api.LeaderAndIsr +import kafka.controller.StateChangeLogger +import kafka.server.MetadataCache +import kafka.utils.CoreUtils.inLock +import kafka.utils.Logging +import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic} +import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} +import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataPartitionState} +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} + +import java.util +import java.util.Collections +import java.util.concurrent.locks.ReentrantLock +import scala.collection.{Seq, Set, mutable} +import scala.jdk.CollectionConverters._ + +object RaftMetadataCache { + def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], + topic: String, partitionId: Int): Boolean = { +partitionStates.get(topic).exists { infos => + infos.remove(partitionId) + if (infos.isEmpty) partitionStates.remove(topic) + true +} + } + + def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], + topic: String, + partitionId: Int, + stateInfo: UpdateMetadataPartitionState): Unit = { +val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty) +infos(partitionId) = stateInfo + } +} + + +class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging { + this.logIdent = s"[MetadataCache brokerId=${brokerId}] " + + private val lock = new ReentrantLock() + + //this is the cache state. every MetadataImage instance is immutable, and updates (performed under a lock) + //replace the value with a completely new one. this means reads (which are not under any lock) need to grab + //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation. + //multiple reads of this value risk getting different snapshots. 
+ @volatile private var _currentImage: MetadataImage = new MetadataImage() + + private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None) + + // This method is the main hotspot when it comes to the performance of metadata requests, + // we should be careful about adding additional logic here. Relatedly, `brokers` is + // `List[Integer]` instead of `List[Int]` to avoid a collection copy. + // filterUnavailableEndpoints exists to support v0 MetadataResponses + private def maybeFilterAliveReplicas(image: MetadataImage, + brokers: java.util.List[Integer], + listenerName: ListenerName, + filterUnavailableEndpoints: Boolean): java.util.List[Integer] = { +if (!filterUnavailableEndpoints) { + brokers +} else { + val res = new util.ArrayList[Integer](math.min(image.brokers.aliveBrokers().size, brokers.size)) + for (brokerId <- brokers.asScala) { +if (hasAliveEndpoint(image, brokerId, listenerName)) + res.add(brokerId) + } + res +} + } + + def currentImage(): MetadataImage = _currentImage + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker. + // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below). + private def getPartitionMetadata(image: MetadataImage, topi
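The code comment on `_currentImage` quoted above describes a copy-on-write snapshot pattern: writers build a completely new immutable image under the lock and publish it with a single volatile write, while readers take one volatile read and keep using that snapshot for the whole operation. A condensed, hypothetical sketch of the idea follows; the real MetadataImage carries brokers, partitions, topic IDs, and more.

```scala
import java.util.concurrent.locks.ReentrantLock

// Toy stand-in for MetadataImage; immutable, so a published instance never changes.
final case class ImageSketch(topics: Set[String], controllerId: Option[Int])

class SnapshotCacheSketch {
  private val lock = new ReentrantLock()
  @volatile private var currentImage: ImageSketch = ImageSketch(Set.empty, None)

  // Reader: grab the volatile var ONCE and use only that snapshot afterwards.
  def containsTopic(topic: String): Boolean = {
    val image = currentImage       // single volatile read
    image.topics.contains(topic)   // consistent view, even if a writer swaps the image now
  }

  // Writer: build a brand-new image and replace the reference under the lock.
  def addTopic(topic: String): Unit = {
    lock.lock()
    try {
      val image = currentImage
      currentImage = image.copy(topics = image.topics + topic)
    } finally {
      lock.unlock()
    }
  }
}
```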
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571163776 ## File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ## @@ -485,7 +494,7 @@ class MetadataCacheTest { @Test Review comment: This test is failing in Raft mode. I'll investigate This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571163226 ## File path: core/src/main/scala/kafka/server/metadata/MetadataBrokers.scala ## @@ -37,7 +36,15 @@ object MetadataBroker { endPoint.name() -> new Node(record.brokerId, endPoint.host, endPoint.port, record.rack) }.toMap, - true) + fenced = true) + } + + def apply(broker: Broker): MetadataBroker = { +new MetadataBroker(broker.id, broker.rack.orNull, + broker.endPoints.map { endpoint => +endpoint.listenerName.value -> new Node(broker.id, endpoint.host, endpoint.port, broker.rack.orNull) + }.toMap, + fenced = false) Review comment: I wanted to go ahead and conform to MetadataBroker for both implementations. One side-effect is we are exposing the fenced flag to ZK-based clusters. I've set it to false here since we don't have any notion of broker fencing in the ZK-based metadata. Is there any harm in this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
hachikuji commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571162641 ## File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala ## @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server.metadata + +import kafka.api.LeaderAndIsr +import kafka.controller.StateChangeLogger +import kafka.server.MetadataCache +import kafka.utils.CoreUtils.inLock +import kafka.utils.Logging +import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic} +import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} +import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataPartitionState} +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} + +import java.util +import java.util.Collections +import java.util.concurrent.locks.ReentrantLock +import scala.collection.{Seq, Set, mutable} +import scala.jdk.CollectionConverters._ + +object RaftMetadataCache { + def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], + topic: String, partitionId: Int): Boolean = { +partitionStates.get(topic).exists { infos => + infos.remove(partitionId) + if (infos.isEmpty) partitionStates.remove(topic) + true +} + } + + def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], + topic: String, + partitionId: Int, + stateInfo: UpdateMetadataPartitionState): Unit = { +val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty) +infos(partitionId) = stateInfo + } +} + + +class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging { + this.logIdent = s"[MetadataCache brokerId=${brokerId}] " + + private val lock = new ReentrantLock() + + //this is the cache state. every MetadataImage instance is immutable, and updates (performed under a lock) + //replace the value with a completely new one. this means reads (which are not under any lock) need to grab + //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation. + //multiple reads of this value risk getting different snapshots. 
+ @volatile private var _currentImage: MetadataImage = new MetadataImage() + + private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None) + + // This method is the main hotspot when it comes to the performance of metadata requests, + // we should be careful about adding additional logic here. Relatedly, `brokers` is + // `List[Integer]` instead of `List[Int]` to avoid a collection copy. + // filterUnavailableEndpoints exists to support v0 MetadataResponses + private def maybeFilterAliveReplicas(image: MetadataImage, + brokers: java.util.List[Integer], + listenerName: ListenerName, + filterUnavailableEndpoints: Boolean): java.util.List[Integer] = { +if (!filterUnavailableEndpoints) { + brokers +} else { + val res = new util.ArrayList[Integer](math.min(image.brokers.aliveBrokers().size, brokers.size)) + for (brokerId <- brokers.asScala) { +if (hasAliveEndpoint(image, brokerId, listenerName)) + res.add(brokerId) + } + res +} + } + + def currentImage(): MetadataImage = _currentImage + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker. + // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below). + private def getPartitionMetadata(image: MetadataImage, topi
[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
hachikuji commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571151499 ## File path: core/src/main/scala/kafka/server/metadata/MetadataImage.scala ## @@ -118,5 +118,10 @@ case class MetadataImage(partitions: MetadataPartitions, def topicIdToName(uuid: Uuid): Option[String] = { partitions.topicIdToName(uuid) } + + def topicNameToId(name: String): Option[Uuid] = { + Review comment: nit: remove newline ## File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala ## @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server.metadata + +import kafka.api.LeaderAndIsr +import kafka.controller.StateChangeLogger +import kafka.server.MetadataCache +import kafka.utils.CoreUtils.inLock +import kafka.utils.Logging +import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic} +import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} +import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataPartitionState} +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} + +import java.util +import java.util.Collections +import java.util.concurrent.locks.ReentrantLock +import scala.collection.{Seq, Set, mutable} +import scala.jdk.CollectionConverters._ + +object RaftMetadataCache { + def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], + topic: String, partitionId: Int): Boolean = { +partitionStates.get(topic).exists { infos => + infos.remove(partitionId) + if (infos.isEmpty) partitionStates.remove(topic) + true +} + } + + def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], + topic: String, + partitionId: Int, + stateInfo: UpdateMetadataPartitionState): Unit = { +val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty) +infos(partitionId) = stateInfo + } +} + + +class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging { + this.logIdent = s"[MetadataCache brokerId=${brokerId}] " Review comment: nit: unneeded braces ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( +topics: collection.Set[String], +listenerName: ListenerName, +errorUnavailableEndpoints: Boolean = false, +errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic] + + def getAllTopics(): collection.Set[String] Review comment: nit: We can save it for a follow-up, but it would be nice to drop all the `get` prefixes here ## File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala ## @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571161027 ## File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ## @@ -16,20 +16,20 @@ */ package kafka.server +import org.apache.kafka.common.{Node, TopicPartition, Uuid} + import java.util -import java.util.Collections import util.Arrays.asList - -import org.apache.kafka.common.{Node, TopicPartition, Uuid} import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.UpdateMetadataRequest import org.apache.kafka.common.security.auth.SecurityProtocol -import org.junit.jupiter.api.Test import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test +import java.util.Collections import scala.jdk.CollectionConverters._ class MetadataCacheTest { Review comment: Yea easy enough. I added `@ParameterizedTest` to MetadataCacheTest. One is failing with the Raft metadata cache, so I left that for ZK-only now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
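For readers following along, a rough sketch of what running MetadataCacheTest against both implementations with JUnit 5 could look like; the actual parameterization in the PR may be organized differently, and the test body here is only illustrative.

```scala
package kafka.server

import java.util.stream.Stream
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource

object MetadataCacheSketchTest {
  // Supplies one instance of each implementation to every parameterized test.
  def cacheProvider(): Stream[MetadataCache] =
    Stream.of[MetadataCache](MetadataCache.zkMetadataCache(1),
                             MetadataCache.raftMetadataCache(1))
}

class MetadataCacheSketchTest {
  @ParameterizedTest
  @MethodSource(Array("cacheProvider"))
  def testCacheStartsEmpty(cache: MetadataCache): Unit = {
    // A freshly created cache should report no topics or partitions.
    assertTrue(cache.getAllTopics().isEmpty)
    assertTrue(cache.getAllPartitions().isEmpty)
  }
}
```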
[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
hachikuji commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571149211 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( +topics: collection.Set[String], +listenerName: ListenerName, +errorUnavailableEndpoints: Boolean = false, +errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic] + + def getAllTopics(): collection.Set[String] + + def getAllPartitions(): collection.Set[TopicPartition] + + def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + + def getAliveBroker(brokerId: Int): Option[MetadataBroker] + + def getAliveBrokers: collection.Seq[MetadataBroker] + + def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] + + def numPartitions(topic: String): Option[Int] + + // if the leader is not known, return None; + // if the leader is known and corresponding node is available, return Some(node) + // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) + def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] + + def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] + + def getControllerId: Option[Int] + + def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster + + // This method returns the deleted TopicPartitions received from UpdateMetadataRequest + def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition] + + def contains(topic: String): Boolean + + def contains(tp: TopicPartition): Boolean +} + +object MetadataCache { + def zkMetadataCache(brokerId: Int): ZkMetadataCache = { Review comment: Ok. Probably not a ton of work either way, but will leave it up to you. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
hachikuji commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571148172 ## File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ## @@ -16,20 +16,20 @@ */ package kafka.server +import org.apache.kafka.common.{Node, TopicPartition, Uuid} + import java.util -import java.util.Collections import util.Arrays.asList - -import org.apache.kafka.common.{Node, TopicPartition, Uuid} import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.UpdateMetadataRequest import org.apache.kafka.common.security.auth.SecurityProtocol -import org.junit.jupiter.api.Test import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test +import java.util.Collections import scala.jdk.CollectionConverters._ class MetadataCacheTest { Review comment: Should be easy to do with all the fancy junit 5 stuff. Since there are no separate tests for `RaftMetadataCache` (unless I'm missing some), I think we should do it here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571145320 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( +topics: collection.Set[String], +listenerName: ListenerName, +errorUnavailableEndpoints: Boolean = false, +errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic] + + def getAllTopics(): collection.Set[String] + + def getAllPartitions(): collection.Set[TopicPartition] + + def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + + def getAliveBroker(brokerId: Int): Option[MetadataBroker] + + def getAliveBrokers: collection.Seq[MetadataBroker] + + def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] + + def numPartitions(topic: String): Option[Int] + + // if the leader is not known, return None; + // if the leader is known and corresponding node is available, return Some(node) + // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) + def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] + + def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] + + def getControllerId: Option[Int] + + def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster + + // This method returns the deleted TopicPartitions received from UpdateMetadataRequest + def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition] + + def contains(topic: String): Boolean + + def contains(tp: TopicPartition): Boolean +} + +object MetadataCache { + def zkMetadataCache(brokerId: Int): ZkMetadataCache = { Review comment: I was thinking it might be easier to refactor in the future if we only need to rename the factory method rather than changing all the `new Class`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
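To make the trade-off being discussed concrete, here is a hypothetical call-site sketch: with the factory methods, broker start-up code only refers to the `MetadataCache` object, so a later rename of the concrete classes touches a single place. The `usesSelfManagedQuorum` flag is invented for illustration and is not a real config property.

```scala
// Hypothetical broker start-up helper; usesSelfManagedQuorum is illustrative only.
def createMetadataCache(brokerId: Int, usesSelfManagedQuorum: Boolean): MetadataCache = {
  if (usesSelfManagedQuorum)
    MetadataCache.raftMetadataCache(brokerId) // backed by the KIP-500 metadata log
  else
    MetadataCache.zkMetadataCache(brokerId)   // classic ZooKeeper-driven cache
}
```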
[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
hachikuji commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571142951 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( +topics: collection.Set[String], +listenerName: ListenerName, +errorUnavailableEndpoints: Boolean = false, +errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic] + + def getAllTopics(): collection.Set[String] + + def getAllPartitions(): collection.Set[TopicPartition] + + def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + + def getAliveBroker(brokerId: Int): Option[MetadataBroker] + + def getAliveBrokers: collection.Seq[MetadataBroker] + + def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] + + def numPartitions(topic: String): Option[Int] + + // if the leader is not known, return None; + // if the leader is known and corresponding node is available, return Some(node) + // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) + def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] + + def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] + + def getControllerId: Option[Int] + + def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster + + // This method returns the deleted TopicPartitions received from UpdateMetadataRequest + def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition] + + def contains(topic: String): Boolean + + def contains(tp: TopicPartition): Boolean +} + +object MetadataCache { + def zkMetadataCache(brokerId: Int): ZkMetadataCache = { +new ZkMetadataCache(brokerId) + } + + def raftMetadataCache(brokerId: Int): RaftMetadataCache = { +new RaftMetadataCache(brokerId) + } +} + /** * A cache for the state (e.g., current leader) of each partition. This cache is updated through * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously. */ -class MetadataCache(brokerId: Int) extends Logging { +class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging { Review comment: Perhaps we can do this as a follow-up? It is nice at the moment to see the diffs clearly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571140218 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( +topics: collection.Set[String], +listenerName: ListenerName, +errorUnavailableEndpoints: Boolean = false, +errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic] + + def getAllTopics(): collection.Set[String] + + def getAllPartitions(): collection.Set[TopicPartition] + + def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + + def getAliveBroker(brokerId: Int): Option[MetadataBroker] + + def getAliveBrokers: collection.Seq[MetadataBroker] + + def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] + + def numPartitions(topic: String): Option[Int] + + // if the leader is not known, return None; + // if the leader is known and corresponding node is available, return Some(node) + // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) + def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] Review comment: Yea, this just got pulled up from the class when I extracted the trait. I'll fix up these comments This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571139784 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( +topics: collection.Set[String], +listenerName: ListenerName, +errorUnavailableEndpoints: Boolean = false, +errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic] + + def getAllTopics(): collection.Set[String] + + def getAllPartitions(): collection.Set[TopicPartition] + + def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + + def getAliveBroker(brokerId: Int): Option[MetadataBroker] + + def getAliveBrokers: collection.Seq[MetadataBroker] + + def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] + + def numPartitions(topic: String): Option[Int] + + // if the leader is not known, return None; + // if the leader is known and corresponding node is available, return Some(node) + // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) + def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] + + def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] + + def getControllerId: Option[Int] + + def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster + + // This method returns the deleted TopicPartitions received from UpdateMetadataRequest + def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition] + + def contains(topic: String): Boolean + + def contains(tp: TopicPartition): Boolean +} + +object MetadataCache { + def zkMetadataCache(brokerId: Int): ZkMetadataCache = { +new ZkMetadataCache(brokerId) + } + + def raftMetadataCache(brokerId: Int): RaftMetadataCache = { +new RaftMetadataCache(brokerId) + } +} + /** * A cache for the state (e.g., current leader) of each partition. This cache is updated through * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously. */ -class MetadataCache(brokerId: Int) extends Logging { +class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging { Review comment: I left the ZK implementation in place since it's really the only production implementation for now. It also reduces the size of the diff for this change. I don't feel very strongly about it either way, so I'm happy to relocate it to a separate file This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
hachikuji commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571139392 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( +topics: collection.Set[String], +listenerName: ListenerName, +errorUnavailableEndpoints: Boolean = false, +errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic] + + def getAllTopics(): collection.Set[String] + + def getAllPartitions(): collection.Set[TopicPartition] + + def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + + def getAliveBroker(brokerId: Int): Option[MetadataBroker] + + def getAliveBrokers: collection.Seq[MetadataBroker] + + def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] + + def numPartitions(topic: String): Option[Int] + + // if the leader is not known, return None; + // if the leader is known and corresponding node is available, return Some(node) + // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) + def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] + + def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] + + def getControllerId: Option[Int] + + def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster + + // This method returns the deleted TopicPartitions received from UpdateMetadataRequest + def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition] + + def contains(topic: String): Boolean + + def contains(tp: TopicPartition): Boolean +} + +object MetadataCache { + def zkMetadataCache(brokerId: Int): ZkMetadataCache = { Review comment: nit: I don't think these factories are providing much over the constructors `MetdataCache.zkMetadataCache` vs `new ZkMetadataCache` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571138962 ## File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ## @@ -16,20 +16,20 @@ */ package kafka.server +import org.apache.kafka.common.{Node, TopicPartition, Uuid} + import java.util -import java.util.Collections import util.Arrays.asList - -import org.apache.kafka.common.{Node, TopicPartition, Uuid} import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.UpdateMetadataRequest import org.apache.kafka.common.security.auth.SecurityProtocol -import org.junit.jupiter.api.Test import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test +import java.util.Collections import scala.jdk.CollectionConverters._ class MetadataCacheTest { Review comment: Yea, I think we can probably do some parameterization thing or possibly even use test templating similar to #9986 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy commented on a change in pull request #10002: MINOR: remove the indent in security doc
omkreddy commented on a change in pull request #10002: URL: https://github.com/apache/kafka/pull/10002#discussion_r571127207 ## File path: docs/security.html ## @@ -649,22 +649,22 @@ Configuring Kafka Brokers Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example: - KafkaServer { -org.apache.kafka.common.security.plain.PlainLoginModule required -username="admin" -password="admin-secret" -user_admin="admin-secret" -user_alice="alice-secret"; -}; +KafkaServer { +org.apache.kafka.common.security.plain.PlainLoginModule required +username="admin" +password="admin-secret" +user_admin="admin-secret" +user_alice="alice-secret"; +}; This configuration defines two users (admin and alice). The properties username and password in the KafkaServer section are used by the broker to initiate connections to other brokers. In this example, admin is the user for inter-broker communication. The set of properties user_userName defines the passwords for all users that connect to the broker and the broker validates all client connections including those from other brokers using these properties. Pass the JAAS config file location as JVM parameter to each Kafka broker: - -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf + -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf Configure SASL port and SASL mechanisms in server.properties as described here. For example: -listeners=SASL_SSL://host.name:port +listeners=SASL_SSL://host.name:port Review comment: @showuon Thanks for the updates. Can you update below lines also? thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #10045: MINOR: Allow KafkaApis to be configured for Raft controller quorums
rondagostino commented on a change in pull request #10045: URL: https://github.com/apache/kafka/pull/10045#discussion_r571124965 ## File path: core/src/main/scala/kafka/server/MetadataSupport.scala ## @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.controller.KafkaController +import kafka.network.RequestChannel +import kafka.zk.{AdminZkClient, KafkaZkClient} +import org.apache.kafka.common.requests.AbstractResponse + +sealed trait MetadataSupport { + /** + * Provide a uniform way of getting to the ForwardingManager, which is a shared concept + * despite being optional when using ZooKeeper and required when using Raft + */ + val forwardingManager: Option[ForwardingManager] + + /** + * Return this instance downcast for use with ZooKeeper + * + * @param createException function to create an exception to throw + * @return this instance downcast for use with ZooKeeper + * @throws Exception if this instance is not for ZooKeeper + */ + def requireZkOrThrow(createException: => Exception): ZkSupport Review comment: Yeah, I prefer it here only because this has worked out so well that I would not be surprised if we tried to use the same approach somewhere else. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
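As a rough illustration of how a request handler might use the trait quoted above; the handler name and the chosen exception are assumptions, not code from the PR.

```scala
import org.apache.kafka.common.errors.UnsupportedVersionException

// Hypothetical ZooKeeper-only handler; requireZkOrThrow comes from the quoted MetadataSupport trait.
def handleZkOnlyRequest(metadataSupport: MetadataSupport): Unit = {
  // Downcast to the ZooKeeper-specific support, or fail fast on a Raft-based broker.
  val zkSupport = metadataSupport.requireZkOrThrow(
    new UnsupportedVersionException("This API is only supported with ZooKeeper-based metadata"))
  // ... proceed using the ZooKeeper-specific members of zkSupport (e.g. its ZK clients) ...
}
```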
[GitHub] [kafka] mumrah commented on a change in pull request #9986: JUnit extensions for integration tests
mumrah commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r571122210 ## File path: core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala ## @@ -16,83 +16,94 @@ */ package kafka.server +import integration.kafka.server.IntegrationTestHelper + import java.net.Socket import java.util.Collections import kafka.api.{KafkaSasl, SaslSetup} import kafka.utils.JaasTestUtils import org.apache.kafka.common.message.SaslHandshakeRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse} -import org.apache.kafka.common.security.auth.SecurityProtocol +import kafka.test.annotation.{ClusterTest, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.test.{ClusterConfig, ClusterInstance} import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.api.{AfterEach, BeforeEach} +import org.junit.jupiter.api.extension.ExtendWith -class SaslApiVersionsRequestTest extends AbstractApiVersionsRequestTest with SaslSetup { - override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT - private val kafkaClientSaslMechanism = "PLAIN" - private val kafkaServerSaslMechanisms = List("PLAIN") - protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) - protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) - override def brokerCount = 1 +import scala.jdk.CollectionConverters._ - @BeforeEach - override def setUp(): Unit = { -startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName)) -super.setUp() - } - @AfterEach - override def tearDown(): Unit = { -super.tearDown() -closeSasl() +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +class SaslApiVersionsRequestTest(helper: IntegrationTestHelper, + cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(helper, cluster) { + + val kafkaClientSaslMechanism = "PLAIN" + val kafkaServerSaslMechanisms = List("PLAIN") + + private var sasl: SaslSetup = _ + + @BeforeEach + def setupSasl(config: ClusterConfig): Unit = { +sasl = new SaslSetup() {} +sasl.startSasl(sasl.jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName)) + config.saslServerProperties().putAll(sasl.kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) + config.saslClientProperties().putAll(sasl.kafkaClientSaslProperties(kafkaClientSaslMechanism)) +super.brokerPropertyOverrides(config.serverProperties()) } - @Test + @ClusterTest(securityProtocol = "SASL_PLAINTEXT", clusterType = Type.Zk) def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = { -val socket = connect() +val socket = helper.connect(cluster.brokers().asScala.head, cluster.listener()) try { - val apiVersionsResponse = sendAndReceive[ApiVersionsResponse]( + val apiVersionsResponse = helper.sendAndReceive[ApiVersionsResponse]( new ApiVersionsRequest.Builder().build(0), socket) - validateApiVersionsResponse(apiVersionsResponse) + validateApiVersionsResponse(apiVersionsResponse, cluster.listener()) sendSaslHandshakeRequestValidateResponse(socket) } finally { socket.close() } } - @Test + @ClusterTest(securityProtocol = "SASL_PLAINTEXT", clusterType = Type.Zk) 
Review comment: We could do it in a method specified by `@ClusterTemplate`, or we could add `securityProtocol ` to the `@ClusterTestDefaults` class-level annotation. I'll leave this for a follow-up PR This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
cmccabe commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571119868 ## File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ## @@ -16,20 +16,20 @@ */ package kafka.server +import org.apache.kafka.common.{Node, TopicPartition, Uuid} + import java.util -import java.util.Collections import util.Arrays.asList - -import org.apache.kafka.common.{Node, TopicPartition, Uuid} import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.UpdateMetadataRequest import org.apache.kafka.common.security.auth.SecurityProtocol -import org.junit.jupiter.api.Test import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test +import java.util.Collections import scala.jdk.CollectionConverters._ class MetadataCacheTest { Review comment: we should have some way of running these tests on the raft metadata cache as well as the zk metadata cache. I guess we can do that in a follow-up PR, though This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
cmccabe commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571118214 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( +topics: collection.Set[String], +listenerName: ListenerName, +errorUnavailableEndpoints: Boolean = false, +errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic] + + def getAllTopics(): collection.Set[String] + + def getAllPartitions(): collection.Set[TopicPartition] + + def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + + def getAliveBroker(brokerId: Int): Option[MetadataBroker] + + def getAliveBrokers: collection.Seq[MetadataBroker] + + def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] + + def numPartitions(topic: String): Option[Int] + + // if the leader is not known, return None; + // if the leader is known and corresponding node is available, return Some(node) + // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) + def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] + + def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] + + def getControllerId: Option[Int] + + def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster + + // This method returns the deleted TopicPartitions received from UpdateMetadataRequest + def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition] + + def contains(topic: String): Boolean + + def contains(tp: TopicPartition): Boolean +} + +object MetadataCache { + def zkMetadataCache(brokerId: Int): ZkMetadataCache = { +new ZkMetadataCache(brokerId) + } + + def raftMetadataCache(brokerId: Int): RaftMetadataCache = { +new RaftMetadataCache(brokerId) + } +} + /** * A cache for the state (e.g., current leader) of each partition. This cache is updated through * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously. */ -class MetadataCache(brokerId: Int) extends Logging { +class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging { Review comment: This is kind of a lot of code so why not have it in its own file? The JavaDoc should also explain that this is for brokers using ZK and not for brokers using the metadata quorum. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
cmccabe commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571116863 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( +topics: collection.Set[String], +listenerName: ListenerName, +errorUnavailableEndpoints: Boolean = false, +errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic] + + def getAllTopics(): collection.Set[String] + + def getAllPartitions(): collection.Set[TopicPartition] + + def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + + def getAliveBroker(brokerId: Int): Option[MetadataBroker] + + def getAliveBrokers: collection.Seq[MetadataBroker] + + def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] + + def numPartitions(topic: String): Option[Int] + + // if the leader is not known, return None; + // if the leader is known and corresponding node is available, return Some(node) + // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) + def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] Review comment: can you put this in javadoc format ? i.e. `@returns` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
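For illustration, the existing three-case comment could be folded into a `@return` tag along these lines; the wording is a suggestion, not text from the PR:

```scala
import org.apache.kafka.common.Node
import org.apache.kafka.common.network.ListenerName

trait MetadataCache {
  /**
   * Look up the endpoint of a partition's leader for the given listener.
   *
   * @return None if the leader is not known; Some(node) if the leader is known and the
   *         corresponding node is available; Some(NO_NODE) if the leader is known but no
   *         node with the requested listener name is available.
   */
  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node]
}
```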
[GitHub] [kafka] cmccabe commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
cmccabe commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571116386 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( Review comment: The formatting is kind of weird here. Why the blank line after `getTopicMetadata(`? Also, it would be good to have Javadoc for this function This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
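One possible shape for the declaration once the stray blank line is removed and a doc comment is added; only the `errorUnavailableEndpoints` description comes from the original comment, the other parameter descriptions are assumptions made for illustration:

```scala
import org.apache.kafka.common.message.MetadataResponseData
import org.apache.kafka.common.network.ListenerName

trait MetadataCache {
  /**
   * Build the per-topic entries of a MetadataResponse.
   *
   * @param topics                    the topics to describe
   * @param listenerName              the listener whose broker endpoints should be returned
   * @param errorUnavailableEndpoints exists to support v0 MetadataResponses
   * @param errorUnavailableListeners whether to report an error when a broker lacks the listener
   * @return one MetadataResponseTopic per requested topic
   */
  def getTopicMetadata(topics: collection.Set[String],
                       listenerName: ListenerName,
                       errorUnavailableEndpoints: Boolean = false,
                       errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
}
```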
[GitHub] [kafka] cmccabe merged pull request #10008: MINOR: Remove ZK dependency for coordinator topics' partition counts
cmccabe merged pull request #10008: URL: https://github.com/apache/kafka/pull/10008 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 closed pull request #10050: MINOR: Don't assume number of dns results (stabilize ClusterConnectio…
chia7712 closed pull request #10050: URL: https://github.com/apache/kafka/pull/10050 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #10050: MINOR: Don't assume number of dns results (stabilize ClusterConnectio…
chia7712 commented on pull request #10050: URL: https://github.com/apache/kafka/pull/10050#issuecomment-774149259 @mumrah Thanks for the kind reminder. Closing this now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12298) Create LeaderAndIsrRequestBenchmark
Justine Olshan created KAFKA-12298: -- Summary: Create LeaderAndIsrRequestBenchmark Key: KAFKA-12298 URL: https://issues.apache.org/jira/browse/KAFKA-12298 Project: Kafka Issue Type: Task Reporter: Justine Olshan Assignee: Justine Olshan Since KIP-516 is making some changes to LeaderAndIsrRequests, I thought it would be useful to have a benchmark to compare implementations. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rondagostino opened a new pull request #10069: MINOR: Add RaftReplicaManager
rondagostino opened a new pull request #10069: URL: https://github.com/apache/kafka/pull/10069 This adds the logic to apply partition metadata when consuming from the Raft-based metadata log. `RaftReplicaManager` extends `ReplicaManager` for now to minimize changes to existing code for the 2.8 release. We will likely adjust this hierarchy at a later time (e.g. introducing a trait and adding a helper to refactor common code). For now, we expose the necessary fields and methods in `ReplicaManager` by changing their scope from `private` to `protected`, and we refactor out a couple of pieces of logic that are shared between the two implementations (stopping replicas and adding log dir fetchers). Existing tests are sufficient to expose regressions in the current `ReplicaManager`. We intend to exercise the new `RaftReplicaManager` code via system tests and unit/integration tests (both to come in later PRs). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
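The refactoring described in the PR body, reduced to a self-contained sketch; the member and method names below are simplified stand-ins, not the actual fields of `ReplicaManager`:

```scala
// Illustration of widening scope from private to protected so a Raft-specific
// subclass can reuse shared logic; these are not the real Kafka classes.
class ReplicaManager {
  // formerly private; protected so that RaftReplicaManager can reuse it
  protected val hostedPartitions = scala.collection.mutable.Set.empty[String]

  // shared logic factored out of the ZK-driven update path
  protected def stopPartitions(partitions: Set[String]): Unit =
    partitions.foreach(hostedPartitions.remove)
}

class RaftReplicaManager extends ReplicaManager {
  // applies partition deletions consumed from the Raft-based metadata log
  def applyMetadataDelta(deletedPartitions: Set[String]): Unit =
    stopPartitions(deletedPartitions)
}
```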
[GitHub] [kafka] dajac commented on pull request #10068: MINOR: StopReplicaResp and StopReplicaReq Test should cover all available version
dajac commented on pull request #10068: URL: https://github.com/apache/kafka/pull/10068#issuecomment-774140916 I've re-triggered Jenkins. I will merge once we have a clean run. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #10008: MINOR: Remove ZK dependency for coordinator topics' partition counts
cmccabe commented on pull request #10008: URL: https://github.com/apache/kafka/pull/10008#issuecomment-774136720 Failing tests are `org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining` and `org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()`, which are not related to this change. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12193) Re-resolve IPs when a client is disconnected
[ https://issues.apache.org/jira/browse/KAFKA-12193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-12193: Fix Version/s: 2.6.2 > Re-resolve IPs when a client is disconnected > > > Key: KAFKA-12193 > URL: https://issues.apache.org/jira/browse/KAFKA-12193 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.7.0 >Reporter: Bob Barrett >Assignee: Bob Barrett >Priority: Major > Fix For: 2.8.0, 2.7.1, 2.6.2 > > > If `client.dns.lookup` is set to `use_all_dns_ips` or > `resolve_canonical_bootstrap_servers_only`, the NetworkClient can store > multiple IPs for each node, and currently it tries each IP in the list when > connecting before re-resolving the IPs. This is useful when first > establishing a connection because it ensures that the client exhausts all > possible IPs. However, in the case where the IPs changed after a connection > was already established, this would cause a reconnecting client to try > several invalid IPs before re-resolving and trying a valid one. Instead, we > should re-resolve DNS when a client disconnects from an established > connection, rather than assuming that all previously-resolved IPs are still > valid. -- This message was sent by Atlassian Jira (v8.3.4#803005)
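A minimal sketch of the intended behaviour, using hypothetical helper names rather than the actual NetworkClient code: once an established connection drops, the cached addresses are discarded and re-resolved instead of being retried one by one.

{code:scala}
import java.net.InetAddress

// Hypothetical helper that tracks the resolved addresses of one node.
class NodeAddresses(host: String, resolve: String => Seq[InetAddress]) {
  private var addresses: Seq[InetAddress] = Seq.empty
  private var index = 0

  // Called when a connection to this node is lost.
  def connectionLost(wasEstablished: Boolean): Unit = {
    // Losing an established connection suggests the node's IPs may have changed,
    // so drop the cached list rather than walking through possibly stale entries.
    if (wasEstablished) addresses = Seq.empty
  }

  // Returns the next address to try, re-resolving when the cache is exhausted or cleared.
  def nextAddress(): InetAddress = {
    if (index >= addresses.size) {
      addresses = resolve(host)
      index = 0
    }
    val address = addresses(index)
    index += 1
    address
  }
}
{code}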
[GitHub] [kafka] dajac commented on pull request #10061: KAFKA-12193: Re-resolve IPs after a client disconnects
dajac commented on pull request #10061: URL: https://github.com/apache/kafka/pull/10061#issuecomment-774097673 @ableegoldman FYI - We have backported this fix to 2.6. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac merged pull request #10061: KAFKA-12193: Re-resolve IPs after a client disconnects
dajac merged pull request #10061: URL: https://github.com/apache/kafka/pull/10061 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12274) Transactional operation fails when broker is replaced using the same broker ID.
[ https://issues.apache.org/jira/browse/KAFKA-12274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michał Łukowicz updated KAFKA-12274: Description: Hello Team! One of our clusters is being used to: * process transactional writes * had ack set to all We are using java client and followed all recommendation regarding avoiding dead fencing issues, etc. We spotted the problem during upgrading kafka hosts to stronger machines: * stop old broker * start a new clean broker node (a different hostname) reusing the same broker.id During the operation we found that while kafka is normally replicating partitions to recover after very short period of time (1 - 3 mins) we start to see error on kafka broker: {code:java} broker=13] Error processing append operation on partition org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producerId 51119 at offset 16878080903 in partition : 2930356 (incoming seq. number), 2930213 (current end sequence number){code} And we are starting to see records buffered on the Producer side, and eventually, the producer send requests failed with:: {code:java} Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for :120892 ms has passed since batch creation{code} The only additional thing we observed is that for some reason couple of paritions ISR had been reduced to 1 and then back to 3 when broker finished up replication. The same situation can be observed when adding new brokers to cluster and performing rebalacing (using kafka cruise control) and setting concurrent partition and leader movements to higher value. This does not happen if broker is just stopped - even for longer period of time or restarted - this only happens during host replace. Can you please let me know if this is a bug ... or we are doing something wrong? Kafka 2.6.0 min.insync.replica for topics is set to 1 (tested with set to 2 - no change) replication.factor is 3 all transaction settings are currently default. was: Hello Team! One of our clusters is being used to: * process transactional writes * had ack set to all We are using java client and followed all recommendation regarding avoiding dead fencing issues, etc. We spotted the problem during upgrading kafka hosts to stronger machines: * stop old broker * start a new clean broker node (a different hostname) reusing the same broker.id During the operation we found that while kafka is normally replicating partitions to recover after very short period of time (1 - 3 mins) we start to see error on kafka broker: {code:java} broker=13] Error processing append operation on partition org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producerId 51119 at offset 16878080903 in partition : 2930356 (incoming seq. number), 2930213 (current end sequence number){code} And we are starting to see records buffered on the Producer side, and eventually, the producer send requests failed with:: {code:java} Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for :120892 ms has passed since batch creation{code} The only additional thing we observed is that for some reason couple of paritions ISR had been reduced to 1 and then back to 3 when broker finished up replication. The same situation can be observed when adding new brokers to cluster and performing rebalacing (using kafka cruise control) and setting concurrent partition and leader movements to higher value. 
This does not happen if broker is just stopped - even for longer period of time or restarted - this only happens during host replace. Can you please let me know if this is a bug ... or we are doing something wrong? Kafka 2.6.0 min.insync.replica for topics is set to 1 replication.factor is 3 all transaction settings are currently default. > Transactional operation fails when broker is replaced using the same broker > ID. > --- > > Key: KAFKA-12274 > URL: https://issues.apache.org/jira/browse/KAFKA-12274 > Project: Kafka > Issue Type: Bug > Components: controller, producer >Affects Versions: 2.6.0 >Reporter: Michał Łukowicz >Priority: Critical > > Hello Team! > One of our clusters is being used to: > * process transactional writes > * had ack set to all > We are using java client and followed all recommendation regarding avoiding > dead fencing issues, etc. > We spotted the problem during upgrading kafka hosts to stronger machines: > * stop old broker > * start a new clean broker node (a different hostname) reusing the same > broker.id > During the operation we found that while kafka is normally replicating > partitions to recover after very short period of time (1 - 3 mins
[GitHub] [kafka] mumrah commented on pull request #10050: MINOR: Don't assume number of dns results (stabilize ClusterConnectio…
mumrah commented on pull request #10050: URL: https://github.com/apache/kafka/pull/10050#issuecomment-774082101 @chia7712 I think this got fixed yesterday on trunk with the addition of a new HostResolver interface. Now we can precisely control what IPs are resolved during testing. https://github.com/apache/kafka/commit/131d4753cfed65ed6dee0a8c754765c97c3d513f This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
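Roughly how a pluggable resolver helps here; the trait below is a local stand-in for the new interface (the real one lives in the Java clients module and may differ in detail), so a test can decide exactly which addresses each resolution returns:

```scala
import java.net.InetAddress

// Local stand-in for the resolver abstraction, for illustration only.
trait HostResolver {
  def resolve(host: String): Array[InetAddress]
}

// A test resolver that always returns a fixed set of addresses, regardless of host.
class FixedHostResolver(addresses: Array[InetAddress]) extends HostResolver {
  override def resolve(host: String): Array[InetAddress] = addresses
}

object FixedHostResolverExample extends App {
  val resolver = new FixedHostResolver(Array(
    InetAddress.getByAddress(Array[Byte](127, 0, 0, 1)),
    InetAddress.getByAddress(Array[Byte](127, 0, 0, 2))
  ))
  // Every lookup yields the same two addresses, so assertions on connection
  // attempts no longer depend on the machine's real DNS configuration.
  println(resolver.resolve("kafka.example.com").mkString(", "))
}
```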
[GitHub] [kafka] dajac commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed
dajac commented on a change in pull request #9579: URL: https://github.com/apache/kafka/pull/9579#discussion_r571004759 ## File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.concurrent.ConcurrentHashMap + +import kafka.controller.KafkaController +import kafka.utils.Logging +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.message.CreateTopicsRequestData +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.requests.CreateTopicsRequest +import org.apache.kafka.common.utils.Time + +import scala.collection.Map + +trait AutoTopicCreationManager { + + def createTopics( +topicNames: Set[CreatableTopic], +controllerMutationQuota: ControllerMutationQuota + ): Unit + + def start(): Unit = {} + + def shutdown(): Unit = {} +} + +object AutoTopicCreationManager { + + def apply( +config: KafkaConfig, +metadataCache: MetadataCache, +time: Time, +metrics: Metrics, +threadNamePrefix: Option[String], +adminManager: ZkAdminManager, +controller: KafkaController, +enableForwarding: Boolean + ): AutoTopicCreationManager = { + +val channelManager = + if (enableForwarding) +Some(new BrokerToControllerChannelManager( + controllerNodeProvider = MetadataCacheControllerNodeProvider( +config, metadataCache), + time = time, + metrics = metrics, + config = config, + channelName = "autoTopicCreationChannel", + threadNamePrefix = threadNamePrefix, + retryTimeoutMs = config.requestTimeoutMs.longValue +)) + else +None +new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs) + } +} + +class AutoTopicCreationManagerImpl( + channelManager: Option[BrokerToControllerChannelManager], + adminManager: ZkAdminManager, + controller: KafkaController, + requestTimeout: Int +) extends AutoTopicCreationManager with Logging { + + private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic] + + override def start(): Unit = { +channelManager.foreach(_.start()) + } + + override def shutdown(): Unit = { +channelManager.foreach(_.shutdown()) + } + + override def createTopics(topics: Set[CreatableTopic], +controllerMutationQuota: ControllerMutationQuota): Unit = { Review comment: @hachikuji Sorry for my late reply. I've missed the notification. We have to enforce the quota on the controller exclusively. It is a global quota and we can't really distribute it fairly in the cluster. In this case, it would be great if we could propagate the principal and clientId to the controller to enforce the quota. 
However, I wonder how we could propagate the error and the delay to the client if the topic creation is throttled. Perhaps we could reply with `UNKNOWN_TOPIC_OR_PARTITION` until the topic can be created. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
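To make the suggestion concrete, a purely hypothetical sketch of the broker-side mapping; the helper name is illustrative and not part of the PR:

```scala
import org.apache.kafka.common.protocol.Errors

object AutoCreateResponseSketch {
  // Whether the topic is still being auto-created or its creation is throttled by
  // the controller mutation quota, the client receives the same retriable error
  // and keeps retrying its metadata request until the topic exists.
  def metadataErrorFor(topicExists: Boolean): Errors =
    if (topicExists) Errors.NONE
    else Errors.UNKNOWN_TOPIC_OR_PARTITION
}
```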