[jira] [Comment Edited] (KAFKA-14934) KafkaClusterTestKit makes FaultHandler accessible
[ https://issues.apache.org/jira/browse/KAFKA-14934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726127#comment-17726127 ] Owen C.H. Leung edited comment on KAFKA-14934 at 5/27/23 2:49 AM: -- Hi [~mumrah], I think it is already exposed? https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java#L518-L524 was (Author: JIRAUSER300460): Hi [~mumrah], Can I give this a try? I'm new to contributing to Kafka and want to get my hands dirty with it. > KafkaClusterTestKit makes FaultHandler accessible > - > > Key: KAFKA-14934 > URL: https://issues.apache.org/jira/browse/KAFKA-14934 > Project: Kafka > Issue Type: Improvement > Components: unit tests > Reporter: David Arthur > Priority: Trivial > Labels: good-first-issue > > In KafkaClusterTestKit, we use a mock fault handler to avoid exiting the > process during tests. It would be useful to expose this fault handler so > tests could verify certain fault conditions (like a broker/controller failing > to start) -- This message was sent by Atlassian Jira (v8.20.10#820010)
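For reference, a hedged sketch of how a test might consume the exposed handler. The accessor names used here (fatalFaultHandler(), firstException()) are assumptions based on the linked KafkaClusterTestKit source, not verified API:

{code:java}
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;

// Illustrative only: builder and accessor names are assumed from the linked source.
public class FaultHandlerUsageSketch {
    public static void main(String[] args) throws Exception {
        TestKitNodes nodes = new TestKitNodes.Builder()
                .setNumBrokerNodes(1)
                .setNumControllerNodes(1)
                .build();
        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).build()) {
            cluster.format();
            cluster.startup();
            // A test could assert that no fatal fault was recorded during startup.
            if (cluster.fatalFaultHandler().firstException() != null) {
                throw new AssertionError("a broker/controller failed to start");
            }
        }
    }
}
{code}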
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.
vamossagar12 commented on code in PR #13504: URL: https://github.com/apache/kafka/pull/13504#discussion_r1207577607

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ##

@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
 @Override
 public void onPartitionsAssigned(Collection partitions) {
 log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+Set deletedTopics = new HashSet<>();
 for (TopicPartition tp : partitions) {
-long pos = consumer.position(tp);
+if (deletedTopics.contains(tp.topic())) {

Review Comment: This block of code uses existing logic within `TopicAdmin` to figure out whether a topic exists: https://github.com/apache/kafka/blob/6d72c26731fe69955127a90e3d43f6d9eb41e2d3/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L503-L511. I am caching the result because once we have established that a topic is deleted for partition p1, we don't need to check again for the other partitions of the same topic within this flow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
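A compact sketch of the caching pattern the comment describes; `topicExists` is a hypothetical stand-in for the linked `TopicAdmin` check, and the surrounding types are simplified:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Sketch of per-rebalance caching: once a topic is known to be deleted for one
// of its partitions, skip the existence check for its remaining partitions.
final class DeletedTopicCacheSketch {
    static Set<String> findDeletedTopics(List<String> assignedTopics, Predicate<String> topicExists) {
        Set<String> deletedTopics = new HashSet<>();
        for (String topic : assignedTopics) {
            if (deletedTopics.contains(topic)) {
                continue; // cached result: no second lookup for the same topic
            }
            if (!topicExists.test(topic)) { // stand-in for TopicAdmin's existence check
                deletedTopics.add(topic);
            }
        }
        return deletedTopics;
    }
}
```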
[GitHub] [kafka] jolshan commented on pull request #13768: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll
jolshan commented on PR #13768: URL: https://github.com/apache/kafka/pull/13768#issuecomment-1565102014 Thanks for the PR! This looks promising. As Ismael said, let's share in trunk first. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 opened a new pull request, #13771: MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult
gharris1727 opened a new pull request, #13771: URL: https://github.com/apache/kafka/pull/13771

The DelegatingClassLoader has a large number of fields and methods for keeping track of known PluginDesc objects. It has this in common with the PluginScanResult data object, which has a similar set of fields and methods. On trunk, the PluginScanResult is only used as a return value of one internal function, before its results are accumulated by the DCL and discarded. To simplify the DCL, we should use the PluginScanResult object as a container to store and accumulate PluginDesc objects, and return it to the caller to allow them to inspect the scanned plugins without interacting with the DCL. Using the PluginScanResult as an accumulator, we can delay writing scan results to the pluginLoaders and aliases fields until after all scanning has taken place. This is done via the new installDiscoveredPlugins method. This prevents plugins being scanned from having an inconsistent delegation path: previously, because the accumulators were updated as scanning proceeded, plugins may or may not have been able to see one another via the DelegatingClassLoader::loadClass method. Now, plugins will _not_ be able to see one another before installDiscoveredPlugins is applied and scanning is finished. This is a first-pass refactor, before pulling the scanning logic out of the DCL to further simplify it. Once external scanning is complete, the caller will call installDiscoveredPlugins to finish initialization of the DCL. In the trunk implementation, there is some order-sensitivity to the scanning process because of the accumulator fields. In particular:

1. Because the classpath is scanned last, plugins on the classpath automatically take precedence over isolated plugins. In order to replicate this effect, PluginDesc objects now explicitly compare their isolated/non-isolated nature, and order classpath plugins first.
2. The alias mechanism attempts to determine if aliases are unique, but allows the _first_ usage of an alias to be applied before denying later ones. The new mechanism still chooses an arbitrary plugin to get the alias, while denying others. I've left a TODO in case we want to discard all conflicted aliases instead of allowing an arbitrary choice.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
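A hedged sketch of the accumulate-then-install shape described above; the types and method bodies are illustrative stand-ins, not the PR's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Scanning only fills an immutable result object; nothing becomes visible to
// plugin class loading until installDiscoveredPlugins is called.
final class PluginScanSketch {
    static final class ScanResult {
        private final List<String> plugins; // stand-in for PluginDesc collections
        ScanResult(List<String> plugins) { this.plugins = List.copyOf(plugins); }
        List<String> plugins() { return plugins; }
    }

    private final List<String> installedLoaders = new ArrayList<>();

    ScanResult scan(List<String> pluginPath) {
        // Accumulate into the result object instead of mutating loader state mid-scan.
        return new ScanResult(new ArrayList<>(pluginPath));
    }

    void installDiscoveredPlugins(ScanResult result) {
        // Only now do scanned plugins become visible to one another.
        installedLoaders.addAll(result.plugins());
    }
}
```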
[jira] [Created] (KAFKA-15031) Add plugin.discovery worker configuration
Greg Harris created KAFKA-15031: --- Summary: Add plugin.discovery worker configuration Key: KAFKA-15031 URL: https://issues.apache.org/jira/browse/KAFKA-15031 Project: Kafka Issue Type: New Feature Components: KafkaConnect Reporter: Greg Harris Assignee: Greg Harris Add the worker configuration plugin.discovery as described in KIP-898. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15030) Add connect-plugin-path command line tool
Greg Harris created KAFKA-15030: --- Summary: Add connect-plugin-path command line tool Key: KAFKA-15030 URL: https://issues.apache.org/jira/browse/KAFKA-15030 Project: Kafka Issue Type: New Feature Components: KafkaConnect, tools Reporter: Greg Harris Assignee: Greg Harris Add the connect-plugin-path command line script as described in KIP-898 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1207484112 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, validateAndAssignOffsets = false, leaderEpoch = -1, requestLocal = None, + verificationState = Optional.empty(), Review Comment: We don't do verification on non-client origin requests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1207484112 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, validateAndAssignOffsets = false, leaderEpoch = -1, requestLocal = None, + verificationState = Optional.empty(), Review Comment: I suppose I should confirm we won't have an issue when the verification state is blank -- I will make sure of this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-14991) Improving Producer's record timestamp validation
[ https://issues.apache.org/jira/browse/KAFKA-14991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mehari Beyene reassigned KAFKA-14991: - Assignee: Mehari Beyene > Improving Producer's record timestamp validation > > > Key: KAFKA-14991 > URL: https://issues.apache.org/jira/browse/KAFKA-14991 > Project: Kafka > Issue Type: Improvement > Components: core, log > Reporter: Mehari Beyene > Assignee: Mehari Beyene > Priority: Minor > > When time-based retention is configured, the timestamp provided by the > producer is used by default to determine the retention period of the log. > Customers have the option of changing the configuration to use the broker's > timestamp by overriding the configuration for "log.message.timestamp.type", > but by default, the producer's timestamp is used. The producer's record > timestamp can be in the past or future. Kafka determines the retention time > of the log by comparing the broker's time with the record's time. > Arguably, there can be use cases for a producer to send records with > timestamps that are in the past (for example, for replaying old data), but it > is inaccurate for records to have a timestamp that is far in the future > compared to the broker's current time. > There is a configurable property called "message.timestamp.difference.max.ms" > that customers can use to control the allowed time difference between the > broker's current time and the record timestamp. However, the validation on > the Kafka side can be improved by rejecting records with future > timestamps before they are written in the first place. > Customers have run into this issue in the past where a producer was configured > erroneously to set the record timestamp in nanoseconds instead of > milliseconds, resulting in a record timestamp that is in the future, and the > time-based retention policy did not kick in as expected. > The improvement I am proposing is to add basic validation in > org.apache.kafka.storage.internals.log.LogValidator to reject record > timestamps that are in the future compared to the broker's current timestamp, > after accounting for a sensible tolerance for potential clock skew. -- This message was sent by Atlassian Jira (v8.20.10#820010)
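A minimal sketch of the proposed check; the class name is illustrative and the one-minute skew tolerance is an assumed placeholder, not a value from the ticket:

{code:java}
import java.util.concurrent.TimeUnit;

// Reject record timestamps that are ahead of the broker clock by more than an
// allowed skew. Names and the tolerance value are illustrative assumptions.
public final class FutureTimestampValidatorSketch {
    private static final long MAX_CLOCK_SKEW_MS = TimeUnit.MINUTES.toMillis(1); // assumed tolerance

    public static void validate(long recordTimestampMs, long brokerNowMs) {
        if (recordTimestampMs > brokerNowMs + MAX_CLOCK_SKEW_MS) {
            throw new IllegalArgumentException(
                "Record timestamp " + recordTimestampMs + " is further in the future than the allowed "
                + MAX_CLOCK_SKEW_MS + " ms of clock skew relative to broker time " + brokerNowMs);
        }
    }
}
{code}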
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1207324170 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, validateAndAssignOffsets = false, leaderEpoch = -1, requestLocal = None, + verificationState = Optional.empty(), Review Comment: We currently don't. ~I guess there is an argument that we could have a marker come in before a fetch response.~ EDIT: if we don't replicate in order we have a problem, so I don't think we need to cover this. I'm not sure how this would be implemented. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #13767: KAFKA-15004: Fix configuration dual-write during migration
mumrah commented on code in PR #13767: URL: https://github.com/apache/kafka/pull/13767#discussion_r1207482435 ## core/src/main/scala/kafka/zk/ZkMigrationClient.scala: ## @@ -145,44 +144,47 @@ class ZkMigrationClient( topicClient.iterateTopics( util.EnumSet.allOf(classOf[TopicVisitorInterest]), new TopicVisitor() { - override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = { -if (!topicBatch.isEmpty) { - recordConsumer.accept(topicBatch) - topicBatch = new util.ArrayList[ApiMessageAndVersion]() -} +override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = { + if (!topicBatch.isEmpty) { +recordConsumer.accept(topicBatch) +topicBatch = new util.ArrayList[ApiMessageAndVersion]() + } -topicBatch.add(new ApiMessageAndVersion(new TopicRecord() - .setName(topicName) - .setTopicId(topicId), 0.toShort)) - } + topicBatch.add(new ApiMessageAndVersion(new TopicRecord() +.setName(topicName) +.setTopicId(topicId), 0.toShort)) - override def visitPartition(topicIdPartition: TopicIdPartition, partitionRegistration: PartitionRegistration): Unit = { -val record = new PartitionRecord() - .setTopicId(topicIdPartition.topicId()) - .setPartitionId(topicIdPartition.partition()) - .setReplicas(partitionRegistration.replicas.map(Integer.valueOf).toList.asJava) - .setAddingReplicas(partitionRegistration.addingReplicas.map(Integer.valueOf).toList.asJava) - .setRemovingReplicas(partitionRegistration.removingReplicas.map(Integer.valueOf).toList.asJava) - .setIsr(partitionRegistration.isr.map(Integer.valueOf).toList.asJava) - .setLeader(partitionRegistration.leader) - .setLeaderEpoch(partitionRegistration.leaderEpoch) - .setPartitionEpoch(partitionRegistration.partitionEpoch) - .setLeaderRecoveryState(partitionRegistration.leaderRecoveryState.value()) -partitionRegistration.replicas.foreach(brokerIdConsumer.accept(_)) - partitionRegistration.addingReplicas.foreach(brokerIdConsumer.accept(_)) -topicBatch.add(new ApiMessageAndVersion(record, 0.toShort)) - } + // This breaks the abstraction a bit, but the topic configs belong in the topic batch Review Comment: I was considering the fact that we don't atomically apply the migration records during the migration. I think it's possible for the controller or broker to publish the migration metadata before it's all committed. In this case, I think it's probably safer to include the config records with the topic batch. This won't be an issue once we implement KIP-868. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan opened a new pull request, #13770: MINOR: Add config to producerStateManager config
jolshan opened a new pull request, #13770: URL: https://github.com/apache/kafka/pull/13770 Originally part of https://github.com/apache/kafka/pull/13608/files. Since there are so many files changed, I decided to just pull this out into its own PR. I have moved this config into producer state manager so it can be checked easily under the log lock when we are about to append. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan opened a new pull request, #13769: MINOR: Covering all epoch cases in add partitions to txn manager
jolshan opened a new pull request, #13769: URL: https://github.com/apache/kafka/pull/13769 Originally part of https://github.com/apache/kafka/pull/13608, Artem made a good point that this change was unrelated, so I'm making a minor PR to cover it. Cleaning up the AddPartitionsToTxnManager and covering the 3 epoch cases more clearly. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13759: KAFKA-15019: Improve handling of overload situations in the kcontroller
cmccabe commented on code in PR #13759: URL: https://github.com/apache/kafka/pull/13759#discussion_r1207395237

## metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java: ##

@@ -223,6 +283,21 @@ public BrokerHeartbeatState next() {
 }
 }

+/**
+ * The maximum number of timed out heartbeats to count.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_MAX = 1000;
+
+/**
+ * The time period over which to track timed out heartbeats.
+ */
+static final long DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_WINDOW_NS = TimeUnit.MINUTES.toNanos(5);
+
+/**
+ * The number of heartbeats to notice missing before we go into overload.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_OVERLOAD_THRESHOLD = 3;
+

Review Comment: I agree with you that 5 minutes is way too long to "mute" broker removals. That was a miss in the original PR. (As a side note, the broker session is not 18 seconds. It is 9 seconds.) I thought about this more and I think we may not need the overload state I originally wanted to introduce at all. We can simply do some basic processing on the heartbeat when we time it out. Specifically, we can update the "last seen time" of the broker. This will avoid the "congestion meltdown" behavior without introducing a new state. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
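A minimal sketch of the "update the last seen time" idea from the comment, with illustrative names (the real BrokerHeartbeatManager state is richer):

```java
import java.util.HashMap;
import java.util.Map;

// Even when a heartbeat times out before it is processed, record that the broker
// was alive when it sent the request, so controller overload does not cascade
// into spurious session expirations and broker removals.
final class HeartbeatTrackerSketch {
    private static final class BrokerState { long lastSeenNs; }
    private final Map<Integer, BrokerState> brokers = new HashMap<>();

    void onHeartbeatTimedOut(int brokerId, long enqueueTimeNs) {
        BrokerState state = brokers.computeIfAbsent(brokerId, id -> new BrokerState());
        state.lastSeenNs = Math.max(state.lastSeenNs, enqueueTimeNs);
    }
}
```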
[GitHub] [kafka] cmccabe commented on a diff in pull request #13759: KAFKA-15019: Improve handling of overload situations in the kcontroller
cmccabe commented on code in PR #13759: URL: https://github.com/apache/kafka/pull/13759#discussion_r1207382044

## server-common/src/main/java/org/apache/kafka/server/metrics/WindowedEventCounter.java: ##

@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.metrics;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Counts the number of events that happen within a given time window. Not thread safe.
+ */
+public final class WindowedEventCounter {
+    private final long[] eventTimes;
+
+    private final long window;
+
+    private int startIndex;
+
+    private int endIndex;
+
+    public WindowedEventCounter(
+        int maxCount,
+        long window
+    ) {

Review Comment: You can use any units you'd like here. For example, it would be possible to use either ms or ns. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
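The hunk above cuts off at the constructor. Below is a hedged sketch of how such a ring-buffer counter can be completed; it is illustrative, not the PR's actual implementation, and deliberately unit-agnostic, matching the review comment that either ms or ns would work:

```java
import java.util.Arrays;

// Record event timestamps in a fixed-size circular buffer and count how many
// fall inside the trailing window. Time units are whatever the caller uses.
final class WindowedEventCounterSketch {
    private final long[] eventTimes;
    private final long window;
    private int nextIndex;

    WindowedEventCounterSketch(int maxCount, long window) {
        this.eventTimes = new long[maxCount];
        Arrays.fill(eventTimes, Long.MIN_VALUE); // sentinel: slot unused
        this.window = window;
    }

    void record(long now) {
        eventTimes[nextIndex] = now;               // overwrite the oldest slot
        nextIndex = (nextIndex + 1) % eventTimes.length;
    }

    int countWithin(long now) {
        int inWindow = 0;
        for (long t : eventTimes) {
            if (t != Long.MIN_VALUE && now - t <= window) inWindow++;
        }
        return inWindow;
    }
}
```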
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1207368803 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -45,35 +45,48 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { nodesToTransactions.synchronized { // Check if we have already have either node or individual transaction. Add the Node if it isn't there. - val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node, + val existingNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node, new TransactionDataAndCallbacks( new AddPartitionsToTxnTransactionCollection(1), mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]())) - val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) + val existingTransactionData = existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId) - // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and - // reconnected so return the retriable network exception. - if (currentTransactionData != null) { -val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) - Errors.INVALID_PRODUCER_EPOCH -else - Errors.NETWORK_EXCEPTION -val topicPartitionsToError = mutable.Map[TopicPartition, Errors]() -currentTransactionData.topics().forEach { topic => - topic.partitions().forEach { partition => -topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error) - } + // There are 3 cases if we already have existing data + // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH for existing data since it is fenced + // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for existing data, since the client is likely retrying and we want another retriable exception + // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add incoming data to verify + if (existingTransactionData != null) { +if (existingTransactionData.producerEpoch() <= transactionData.producerEpoch()) { +val error = if (existingTransactionData.producerEpoch() < transactionData.producerEpoch()) + Errors.INVALID_PRODUCER_EPOCH +else + Errors.NETWORK_EXCEPTION + val oldCallback = existingNodeAndTransactionData.callbacks(transactionData.transactionalId()) + existingNodeAndTransactionData.transactionData.remove(transactionData) + oldCallback(topicPartitionsToError(existingTransactionData, error)) +} else { + // If the incoming transactionData's epoch is lower, we can return with INVALID_PRODUCER_EPOCH immediately. + callback(topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH)) + return } -val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId()) -currentNodeAndTransactionData.transactionData.remove(transactionData) -oldCallback(topicPartitionsToError.toMap) } - currentNodeAndTransactionData.transactionData.add(transactionData) - currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback) + Review Comment: I'll separate this out into a new PR as I'm already splitting this up. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1207333185 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -671,6 +671,7 @@ class ReplicaManager(val config: KafkaConfig, val sTime = time.milliseconds val transactionalProducerIds = mutable.HashSet[Long]() + var verificationState: Optional[VerificationState] = Optional.empty() Review Comment: I don't think we would get a new one. We only need one per transaction, right? So either we succeed and no longer need to worry about the verification, or the partition is retried and gets the same verification object from the first time. We do have a test with multiple partitions, but I would have to check if it also checks the verification state. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1207330097 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -45,35 +45,48 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { nodesToTransactions.synchronized { // Check if we have already have either node or individual transaction. Add the Node if it isn't there. - val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node, + val existingNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node, new TransactionDataAndCallbacks( new AddPartitionsToTxnTransactionCollection(1), mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]())) - val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) + val existingTransactionData = existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId) - // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and - // reconnected so return the retriable network exception. - if (currentTransactionData != null) { -val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) - Errors.INVALID_PRODUCER_EPOCH -else - Errors.NETWORK_EXCEPTION -val topicPartitionsToError = mutable.Map[TopicPartition, Errors]() -currentTransactionData.topics().forEach { topic => - topic.partitions().forEach { partition => -topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error) - } + // There are 3 cases if we already have existing data + // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH for existing data since it is fenced + // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for existing data, since the client is likely retrying and we want another retriable exception + // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add incoming data to verify + if (existingTransactionData != null) { +if (existingTransactionData.producerEpoch() <= transactionData.producerEpoch()) { +val error = if (existingTransactionData.producerEpoch() < transactionData.producerEpoch()) + Errors.INVALID_PRODUCER_EPOCH +else + Errors.NETWORK_EXCEPTION + val oldCallback = existingNodeAndTransactionData.callbacks(transactionData.transactionalId()) + existingNodeAndTransactionData.transactionData.remove(transactionData) + oldCallback(topicPartitionsToError(existingTransactionData, error)) +} else { + // If the incoming transactionData's epoch is lower, we can return with INVALID_PRODUCER_EPOCH immediately. + callback(topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH)) + return } -val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId()) -currentNodeAndTransactionData.transactionData.remove(transactionData) -oldCallback(topicPartitionsToError.toMap) } - currentNodeAndTransactionData.transactionData.add(transactionData) - currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback) + Review Comment: yes - we didn't really cover these epoch cases before, and I thought it would be good to include them for completeness. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
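For reference, a hedged Java sketch of the three epoch cases discussed in this thread (the real logic is Scala, in AddPartitionsToTxnManager.addTxnData; names here are illustrative):

```java
// Sketch of the epoch comparison described above; the Decision values stand in
// for the Errors returned by the real code.
final class EpochCaseSketch {
    enum Decision { FENCE_EXISTING, RETRY_EXISTING, FENCE_INCOMING }

    static Decision decide(short existingEpoch, short incomingEpoch) {
        if (existingEpoch < incomingEpoch) {
            // Case 1: incoming epoch is higher -> existing data was fenced (INVALID_PRODUCER_EPOCH).
            return Decision.FENCE_EXISTING;
        } else if (existingEpoch == incomingEpoch) {
            // Case 2: same epoch -> likely a client retry; fail existing data with retriable NETWORK_EXCEPTION.
            return Decision.RETRY_EXISTING;
        } else {
            // Case 3: incoming epoch is lower -> the incoming request itself is fenced; do not add it.
            return Decision.FENCE_INCOMING;
        }
    }
}
```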
[GitHub] [kafka] cmccabe closed pull request #13766: KAFKA-14996: Limit partition count in Create Topic and Create Partitions
cmccabe closed pull request #13766: KAFKA-14996: Limit partition count in Create Topic and Create Partitions URL: https://github.com/apache/kafka/pull/13766 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #13766: KAFKA-14996: Limit partition count in Create Topic and Create Partitions
cmccabe commented on PR #13766: URL: https://github.com/apache/kafka/pull/13766#issuecomment-1564905334 Thanks @edoardocomar. Closing as duplicate of #13742 . If you're interested in doing more here, check out the discussion on the other PR about a possible KIP we could have (to add some configurable limits) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] artemlivshits commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
artemlivshits commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1206199912

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##

@@ -579,10 +579,28 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 result
 }

+ def transactionNeedsVerifying(producerId: Long, producerEpoch: Short, baseSequence: Int): Optional[VerificationState] = lock synchronized {
+val entry = producerStateManager.entryForVerification(producerId, producerEpoch, baseSequence)
+if (entry.currentTxnFirstOffset.isPresent) {

Review Comment: Maybe add a comment why we don't need verification if currentTxnFirstOffset.isPresent.

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java: ##

@@ -103,28 +115,57 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
 return false;
 }
 }
+
+public void maybeAddVerificationState() {
+// If we already have a verification state, we can reuse it. This is because we know this is the same transaction
+// as the state is cleared upon writing a control marker.
+if (!this.verificationState.isPresent())
+this.verificationState = Optional.of(new VerificationState());

Review Comment: This creates an object of class Optional that points to an object of class VerificationState, so we get an extra object for every producer entry. We just need a plain value of an Object to avoid extra overhead.

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##

@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 validateAndAssignOffsets = false,
 leaderEpoch = -1,
 requestLocal = None,
+ verificationState = Optional.empty(),

Review Comment: Do we ever go through verification logic for followers?

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##

@@ -980,6 +1002,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 if (duplicateBatch.isPresent) {
 return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
 }
+
+ // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state

Review Comment: Maybe add more comments on how we validate transaction state. Also, maybe not here, but at least somewhere we should have a detailed comment about the race condition we're addressing and specifically how the verificationState solves the ABA problem.

## storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationState.java: ##

@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class serves as a unique object to ensure the same transaction is being verified.
+ * When verification starts, this object is created and checked before append to ensure the producer state entry
+ * is not modified (via ending the transaction) before the record is appended.
+ */
+public class VerificationState {

Review Comment: Do we need a separate class for that? I think we could just use Object, because we just compare references, not values.

## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ##

@@ -45,35 +45,48 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
 def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
 nodesToTransactions.synchronized {
 // Check if we have already have either node or individual transaction. Add the Node if it isn't there.
- val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
+ val existingNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
 new TransactionDataAndCallbacks(
 new AddPartitionsToTxnTransactionCollection(1),
 mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
- val currentTransactionData =
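A hedged sketch of the reference-comparison idea in the last review comment; the class and method names are illustrative, not the actual ProducerStateEntry API:

```java
// A plain Object serves as a unique token: verification passes only if the token
// captured at verification time is still the one installed on the producer entry.
final class ProducerEntrySketch {
    private Object verificationToken; // null when no verification is in flight

    Object beginVerification() {
        if (verificationToken == null) {
            verificationToken = new Object(); // unique per ongoing transaction
        }
        return verificationToken;
    }

    boolean stillValid(Object tokenSeenEarlier) {
        // Reference comparison: ending the transaction clears the token, so a stale
        // token from an older attempt can never match a later one (no ABA problem).
        return verificationToken == tokenSeenEarlier;
    }

    void onTransactionEnd() {
        verificationToken = null; // cleared when the control marker is written
    }
}
```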
[GitHub] [kafka] ijuma commented on pull request #13768: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll
ijuma commented on PR #13768: URL: https://github.com/apache/kafka/pull/13768#issuecomment-1564903842 We typically make changes to master first. Would you be willing to submit a PR for that instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller
cmccabe commented on PR #13742: URL: https://github.com/apache/kafka/pull/13742#issuecomment-1564904075 Thanks for all the reviews, and thanks @mumrah for the LGTM. Since this is a 3.5 blocker I am getting it in today so that it will be in the next RC. As I said before, this doesn't add any new limits, but just prevents damage to the controller when the existing limits are exceeded. However, the discussion about limits here was good. I think we should consider a follow-on KIP to make the maximum number of records per user operation configurable, and possibly add a configurable partitions_max as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller
cmccabe merged PR #13742: URL: https://github.com/apache/kafka/pull/13742 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-9693) Kafka latency spikes caused by log segment flush on roll
[ https://issues.apache.org/jira/browse/KAFKA-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726728#comment-17726728 ] Ruslan Scherbakov edited comment on KAFKA-9693 at 5/26/23 8:05 PM: ---

Related pull request: [https://github.com/apache/kafka/pull/13768]

The issue with repeating latency spikes during Kafka log segment rolling is still reproduced on the latest versions, including kafka_2.13-3.4.0. It was found that flushing the Kafka snapshot file during segment rolling blocks the producer request handling thread for some time: [https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/ProducerStateManager.scala#L452]

{code:java}
private def writeSnapshot(file: File, entries: mutable.Map[Long, ProducerStateEntry]): Unit = {
  ...
  val fileChannel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
  try {
    fileChannel.write(buffer)
    fileChannel.force(true)  <- here
  } finally {
    fileChannel.close()
  }
  ...
{code}

More partitions - more cumulative latency effect observed.

The suggested fix offloads the flush (fileChannel.force) operation to a background thread, similar to (but not exactly) how it is done in UnifiedLog.scala:

{code:java}
def roll(
  ...
  // Schedule an asynchronous flush of the old segment
  scheduler.schedule("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
}
{code}

Benchmarking with this fix shows a significant reduction in the repeating latency spikes:

*Test config:*
AWS 3 node cluster (i3en.2xlarge) zulu11.62.17-ca-jdk11.0.18-linux_x64, heap 6G per broker
1 loadgen (m5n.8xlarge) - OpenMessaging benchmark ([OMB|https://github.com/openmessaging/benchmark])
1 zookeeper (t2.small)
acks=all batchSize=1048510 consumers=4 insyncReplicas=2 lingerMs=1 mlen=1024 producers=4 rf=3 subscriptions=1 targetRate=200k time=12m topics=1 warmup=1m

*variation 1:* partitions=10

||metric||kafka_2.13-3.4.0||kafka_2.13-3.4.0 patched||
|endToEnd service_time (ms) p50 max|2.00|2.00|
|endToEnd service_time (ms) p75 max|3.00|2.00|
|endToEnd service_time (ms) p95 max|94.0|3.00|
|endToEnd service_time (ms) p99 max|290|6.00|
|endToEnd service_time (ms) p99.9 max|355|21.0|
|endToEnd service_time (ms) p99.99 max|372|34.0|
|endToEnd service_time (ms) p100 max|374|36.0|
|publish service_time (ms) p50 max|1.70|1.67|
|publish service_time (ms) p75 max|2.23|2.09|
|publish service_time (ms) p95 max|90.7|2.82|
|publish service_time (ms) p99 max|287|4.69|
|publish service_time (ms) p99.9 max|353|19.6|
|publish service_time (ms) p99.99 max|369|31.3|
|publish service_time (ms) p100 max|371|33.5|

||kafka||endToEnd chart||
|kafka_2.13-3.4.0|[!https://user-images.githubusercontent.com/6793713/241306935-ec329711-47d4-459f-92d7-06310b770023.png!|https://user-images.githubusercontent.com/6793713/241306935-ec329711-47d4-459f-92d7-06310b770023.png]|
|kafka_2.13-3.4.0 patched|[!https://user-images.githubusercontent.com/6793713/241307047-e5aeb6a6-d33a-4d57-be80-c916fa1f05be.png!|https://user-images.githubusercontent.com/6793713/241307047-e5aeb6a6-d33a-4d57-be80-c916fa1f05be.png]|

Latency improved up to 10x in the high percentiles ^^^; the spikes are almost invisible.

*variation 2:* partitions=100

||metric||kafka_2.13-3.4.0||kafka_2.13-3.4.0 patched||
|endToEnd service_time (ms) p50 max|91.0|2.00|
|endToEnd service_time (ms) p75 max|358|3.00|
|endToEnd service_time (ms) p95 max|1814|4.00|
|endToEnd service_time (ms) p99 max|2777|21.0|
|endToEnd service_time (ms) p99.9 max|3643|119|
|endToEnd service_time (ms) p99.99 max|3724|141|
|endToEnd service_time (ms) p100 max|3726|143|
|publish service_time (ms) p50 max|77.4|1.92|
|publish service_time (ms) p75 max|352|2.35|
|publish service_time (ms) p95 max|1748|3.80|
|publish service_time (ms) p99 max|2740|18.9|
|publish service_time (ms) p99.9 max|3619|116|
|publish service_time (ms) p99.99 max|3720|139|
|publish service_time (ms) p100 max|3722|141|

||kafka||endToEnd chart||
|kafka_2.13-3.4.0|[!https://user-images.githubusercontent.com/6793713/241307517-dc6e9820-c3b7-4bd0-8dac-bce9f3886d91.png!|https://user-images.githubusercontent.com/6793713/241307517-dc6e9820-c3b7-4bd0-8dac-bce9f3886d91.png]|
|kafka_2.13-3.4.0 patched|[!https://user-images.githubusercontent.com/6793713/241307546-113b4480-97a4-4dd5-8d5c-f7dc87a3d7a5.png!|https://user-images.githubusercontent.com/6793713/241307546-113b4480-97a4-4dd5-8d5c-f7dc87a3d7a5.png]|

Latency improved up to 25x in the high percentiles ^^^.

The fix was done for the 3.4 branch - the Scala version of ProducerStateManager. Trunk needs a corresponding fix for ProducerStateManager.java.

was (Author: novosibman): Related pull request: [https://github.com/apache/kafka/pull/13768] The issue with repeating latency spikes during Kafka log segments rolling still reproduced on the latest versions including
[GitHub] [kafka] novosibman opened a new pull request, #13768: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll
novosibman opened a new pull request, #13768: URL: https://github.com/apache/kafka/pull/13768 Related issue: https://issues.apache.org/jira/browse/KAFKA-9693

The issue with repeating latency spikes during Kafka log segment rolling is still reproducible on the latest versions, including kafka_2.13-3.4.0. It was found that flushing the Kafka snapshot file during segment rolling blocks the producer request handling thread for some time: https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/ProducerStateManager.scala#L452

```
private def writeSnapshot(file: File, entries: mutable.Map[Long, ProducerStateEntry]): Unit = {
  ...
  val fileChannel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
  try {
    fileChannel.write(buffer)
    fileChannel.force(true)  // <- here
  } finally {
    fileChannel.close()
  }
  ...
```

The more partitions, the greater the cumulative latency effect observed. The suggested fix offloads the flush (fileChannel.force) operation to a background thread, similar to (but not exactly) how it is done in UnifiedLog.scala:

```
def roll( ...
  // Schedule an asynchronous flush of the old segment
  scheduler.schedule("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
}
```

Benchmarking with this fix shows a significant reduction in the repeating latency spikes.

test config:
- AWS 3 node cluster (i3en.2xlarge), zulu11.62.17-ca-jdk11.0.18-linux_x64, heap 6G per broker
- 1 loadgen (m5n.8xlarge) - OpenMessaging benchmark ([OMB](https://github.com/openmessaging/benchmark))
- 1 zookeeper (t2.small)
- acks=all batchSize=1048510 consumers=4 insyncReplicas=2 lingerMs=1 mlen=1024 producers=4 rf=3 subscriptions=1 targetRate=200k time=12m topics=1 warmup=1m

### variation 1: partitions=10

metric | kafka_2.13-3.4.0 | kafka_2.13-3.4.0 patched
-- | -- | --
endToEnd service_time (ms) p50 max | 2.00 | 2.00
endToEnd service_time (ms) p75 max | 3.00 | 2.00
endToEnd service_time (ms) p95 max | 94.0 | 3.00
endToEnd service_time (ms) p99 max | 290 | 6.00
endToEnd service_time (ms) p99.9 max | 355 | 21.0
endToEnd service_time (ms) p99.99 max | 372 | 34.0
endToEnd service_time (ms) p100 max | 374 | 36.0
publish service_time (ms) p50 max | 1.70 | 1.67
publish service_time (ms) p75 max | 2.23 | 2.09
publish service_time (ms) p95 max | 90.7 | 2.82
publish service_time (ms) p99 max | 287 | 4.69
publish service_time (ms) p99.9 max | 353 | 19.6
publish service_time (ms) p99.99 max | 369 | 31.3
publish service_time (ms) p100 max | 371 | 33.5

kafka | endToEnd chart
-- | --
kafka_2.13-3.4.0 | ![image](https://github.com/apache/kafka/assets/6793713/ec329711-47d4-459f-92d7-06310b770023)
kafka_2.13-3.4.0 patched | ![image](https://github.com/apache/kafka/assets/6793713/e5aeb6a6-d33a-4d57-be80-c916fa1f05be)

Latency improved up to 10x in the high percentiles ^^^; spikes are almost invisible.

### variation 2: partitions=100

metric | kafka_2.13-3.4.0 | kafka_2.13-3.4.0 patched
-- | -- | --
endToEnd service_time (ms) p50 max | 91.0 | 2.00
endToEnd service_time (ms) p75 max | 358 | 3.00
endToEnd service_time (ms) p95 max | 1814 | 4.00
endToEnd service_time (ms) p99 max | 2777 | 21.0
endToEnd service_time (ms) p99.9 max | 3643 | 119
endToEnd service_time (ms) p99.99 max | 3724 | 141
endToEnd service_time (ms) p100 max | 3726 | 143
publish service_time (ms) p50 max | 77.4 | 1.92
publish service_time (ms) p75 max | 352 | 2.35
publish service_time (ms) p95 max | 1748 | 3.80
publish service_time (ms) p99 max | 2740 | 18.9
publish service_time (ms) p99.9 max | 3619 | 116
publish service_time (ms) p99.99 max | 3720 | 139
publish service_time (ms) p100 max | 3722 | 141

endToEnd service_time
kafka | endToEnd chart
-- | --
kafka_2.13-3.4.0 | ![image](https://github.com/apache/kafka/assets/6793713/dc6e9820-c3b7-4bd0-8dac-bce9f3886d91)
kafka_2.13-3.4.0 patched | ![image](https://github.com/apache/kafka/assets/6793713/113b4480-97a4-4dd5-8d5c-f7dc87a3d7a5)

Latency improved up to 25x in the high percentiles ^^^.

The fix was made on the 3.4 branch (the Scala version of ProducerStateManager). Trunk needs a corresponding fix in ProducerStateManager.java. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
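As the PR notes, trunk will need an equivalent change in the Java ProducerStateManager. A rough Java illustration of the offloading idea follows — the class name, method shape, and use of a plain ExecutorService in place of Kafka's scheduler are all assumptions, not the PR's actual code:

```
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncSnapshotFlush {
    // Single background thread that takes the expensive fsync off the request path.
    private final ExecutorService flushExecutor =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "producer-snapshot-flush"));

    // Write the snapshot bytes synchronously, but hand the fsync
    // (FileChannel.force) to the background thread so the request handler
    // thread is not blocked while the segment rolls.
    public void writeSnapshot(File file, ByteBuffer buffer) throws IOException {
        FileChannel channel = FileChannel.open(
            file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        boolean handedOff = false;
        try {
            channel.write(buffer);
            flushExecutor.execute(() -> {
                try (FileChannel ch = channel) {
                    ch.force(true); // the blocking call, now off the hot path
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            handedOff = true;
        } finally {
            if (!handedOff) {
                channel.close(); // close here only if the flush task was never scheduled
            }
        }
    }
}
```

The key design point mirrors the UnifiedLog.roll approach quoted above: the write itself stays synchronous, and only the durability barrier moves off the hot path.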
[GitHub] [kafka] cmccabe commented on a diff in pull request #13767: KAFKA-15004: Fix configuration dual-write during migration
cmccabe commented on code in PR #13767: URL: https://github.com/apache/kafka/pull/13767#discussion_r1207234024 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java: ## @@ -194,23 +195,44 @@ void handleTopicsDelta(Function deletedTopicNameResolver, TopicsDe } void handleConfigsSnapshot(ConfigurationsImage configsImage) { -Set brokersToUpdate = new HashSet<>(); +Set newResources = new HashSet<>(configsImage.resourceData().keySet()) Review Comment: I think we should throw an exception if we get a ConfigResource whose type isn't BROKER or TOPIC. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
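A rough sketch of the guard being suggested (a hypothetical helper, not the PR's code):

```
import org.apache.kafka.common.config.ConfigResource;

public class ConfigResourceChecks {
    // Reject config resource types the ZK dual-write path does not handle.
    static void checkSupportedType(ConfigResource resource) {
        switch (resource.type()) {
            case BROKER:
            case TOPIC:
                break; // the only types written back to ZooKeeper during migration
            default:
                throw new RuntimeException("Unsupported config resource type " +
                    resource.type() + " in migration dual-write");
        }
    }
}
```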
[GitHub] [kafka] cmccabe commented on a diff in pull request #13767: KAFKA-15004: Fix configuration dual-write during migration
cmccabe commented on code in PR #13767: URL: https://github.com/apache/kafka/pull/13767#discussion_r1207225622 ## core/src/main/scala/kafka/zk/ZkMigrationClient.scala: ## @@ -145,44 +144,47 @@ class ZkMigrationClient( topicClient.iterateTopics( util.EnumSet.allOf(classOf[TopicVisitorInterest]), new TopicVisitor() { - override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = { -if (!topicBatch.isEmpty) { - recordConsumer.accept(topicBatch) - topicBatch = new util.ArrayList[ApiMessageAndVersion]() -} +override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = { + if (!topicBatch.isEmpty) { +recordConsumer.accept(topicBatch) +topicBatch = new util.ArrayList[ApiMessageAndVersion]() + } -topicBatch.add(new ApiMessageAndVersion(new TopicRecord() - .setName(topicName) - .setTopicId(topicId), 0.toShort)) - } + topicBatch.add(new ApiMessageAndVersion(new TopicRecord() +.setName(topicName) +.setTopicId(topicId), 0.toShort)) - override def visitPartition(topicIdPartition: TopicIdPartition, partitionRegistration: PartitionRegistration): Unit = { -val record = new PartitionRecord() - .setTopicId(topicIdPartition.topicId()) - .setPartitionId(topicIdPartition.partition()) - .setReplicas(partitionRegistration.replicas.map(Integer.valueOf).toList.asJava) - .setAddingReplicas(partitionRegistration.addingReplicas.map(Integer.valueOf).toList.asJava) - .setRemovingReplicas(partitionRegistration.removingReplicas.map(Integer.valueOf).toList.asJava) - .setIsr(partitionRegistration.isr.map(Integer.valueOf).toList.asJava) - .setLeader(partitionRegistration.leader) - .setLeaderEpoch(partitionRegistration.leaderEpoch) - .setPartitionEpoch(partitionRegistration.partitionEpoch) - .setLeaderRecoveryState(partitionRegistration.leaderRecoveryState.value()) -partitionRegistration.replicas.foreach(brokerIdConsumer.accept(_)) - partitionRegistration.addingReplicas.foreach(brokerIdConsumer.accept(_)) -topicBatch.add(new ApiMessageAndVersion(record, 0.toShort)) - } + // This breaks the abstraction a bit, but the topic configs belong in the topic batch Review Comment: It's not really required for the topic config records to come right after the topics. It would be OK to do it in a separate section as we do with snapshots. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #13757: [WIP] Fix snapshot load during dual write.
mumrah commented on PR #13757: URL: https://github.com/apache/kafka/pull/13757#issuecomment-1564750482 The jira for this is KAFKA-15017 btw -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #13766: KAFKA-14996: Limit partition count in Create Topic and Create Partitions
mumrah commented on PR #13766: URL: https://github.com/apache/kafka/pull/13766#issuecomment-1564748396 @edoardocomar, thanks for taking a look at this. This issue is a little bit tricky since PartitionRecords are not the only thing inside the topic creation batch. There's also a TopicRecord and an unbounded number of ConfigRecords. @cmccabe has a related PR #13742 which deals with overly large batches. Once we implement KIP-868 (which will hopefully be very soon), I don't think we want to impose any limits on the partition count except by what is enforced by a user-supplied CreateTopicPolicy. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller
cmccabe commented on PR #13742: URL: https://github.com/apache/kafka/pull/13742#issuecomment-1564712814 Hi all, Thanks for the reviews and comments.

> @divijvaidya : But I was wondering if an additional guard could be to have a default TopicCreationPolicy with a MaxValue of X number of topics per requests (X is probably <= 10K). On every topic creation request, we will apply the policy and ensure that the request gets rejected upfront before entering the system. We could extend this pattern for other use cases where we would like to restrict range of certain parameters in the requests. What do you think?

I like this idea, but it would require a KIP to implement.

> @divijvaidya : I am concerned about the user facing aspect of this change. If I am a user and get this exception, what am I expected to do to resolve it? The documentation does not call out any limitation on max topics that I can create in one API call. How do I know what the limit is? The alternative approach I proposed above (topic policy limitation) solves this. We can document policies with the constraints and follow a similar pattern for other out of bound configuration/request params.

I think the thing to keep in mind is that this PR doesn't make any request fail that wouldn't have already failed.

> @mumrah : Colin can correct me if I'm mistaken, but I believe this patch is mainly about closing an existing edge case until we implement KIP-868 (metadata transactions). Once we have transactions in the controller, we can allow arbitrarily large batches of records.

Yes, that's correct.

> @mumrah : I also wondered if we could solve this in ControllerResult rather than in each control manager separately.

I think the issue is that people can request truly gigantic, garbage-collector killing lists of records to be constructed. You want to cut them off before they build the giant list, not after.

> @mumrah : Will we remove this logic once transactions are implemented?

I think we'll need some kind of limit even with metadata transactions in place. But it will be a limit not set by the implementation, but by our policy.

> @edoardocomar : I had just done an alternative smaller PR for the same issue

Thank you, Edoardo, I guess we were thinking along the same lines. One thing to keep in mind is that the problem is bigger than just CreateTopics. Any operation can be too big and cause implementation problems. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
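The "cut them off before they build the giant list" point is the essence of the bounded-list approach. Kafka has a real BoundedList class for this; the sketch below only illustrates the idea, with made-up names:

```
import java.util.ArrayList;
import java.util.List;

// Sketch of the bounded-accumulator idea: fail fast while records are being
// generated, instead of materializing a huge list and rejecting it afterwards.
public class BoundedRecordList<E> {
    private final int maxLength;
    private final List<E> underlying = new ArrayList<>();

    public BoundedRecordList(int maxLength) {
        this.maxLength = maxLength;
    }

    public void add(E element) {
        if (underlying.size() >= maxLength) {
            // Thrown while the operation is still building its records,
            // before an oversized batch is ever assembled in memory.
            throw new IllegalStateException(
                "Cannot add element: list would exceed " + maxLength + " records");
        }
        underlying.add(element);
    }

    public List<E> asList() {
        return underlying;
    }
}
```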
[GitHub] [kafka] cmccabe commented on a diff in pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller
cmccabe commented on code in PR #13742: URL: https://github.com/apache/kafka/pull/13742#discussion_r1207123242 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -457,9 +466,13 @@ private Throwable handleEventException(String name, long endProcessingTime = time.nanoseconds(); long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong(); long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS); -if (exception instanceof ApiException) { +if ((exception instanceof ApiException) || +(exception instanceof BoundedListTooLongException)) { log.info("{}: failed with {} in {} us. Reason: {}", name, exception.getClass().getSimpleName(), deltaUs, exception.getMessage()); +if (exception instanceof BoundedListTooLongException) { +exception = new UnknownServerException(exception.getMessage()); Review Comment: yeah that's fair. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller
cmccabe commented on code in PR #13742: URL: https://github.com/apache/kafka/pull/13742#discussion_r1207117766 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -160,6 +161,14 @@ * the controller can fully initialize. */ public final class QuorumController implements Controller { +/** + * The maximum records any user-initiated operation is allowed to generate. + */ +final static int MAX_RECORDS_PER_USER_OP = 10000; Review Comment: It was chosen to be the same as MAX_RECORDS_PER_BATCH, which is also 10000. I will make this clearer by initializing one to the other. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #13767: KAFKA-15004: Fix configuration dual-write during migration
mumrah commented on code in PR #13767: URL: https://github.com/apache/kafka/pull/13767#discussion_r1207096649 ## core/src/main/scala/kafka/zk/ZkMigrationClient.scala: ## @@ -145,44 +144,47 @@ class ZkMigrationClient( topicClient.iterateTopics( util.EnumSet.allOf(classOf[TopicVisitorInterest]), new TopicVisitor() { - override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = { Review Comment: Just fixing the indentation here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah opened a new pull request, #13767: KAFKA-15004: Fix configuration dual-write during migration
mumrah opened a new pull request, #13767: URL: https://github.com/apache/kafka/pull/13767 This PR builds on top of #13736. Fixes the following:
* Topic configs are not synced while handling snapshot.
* New broker/topic configs in KRaft that did not exist in ZK will not be sync'd to ZK.
* The sensitive configs are not encoded while writing them to Zookeeper.
* Handle topic configs in ConfigMigrationClient and KRaftMigrationZkWriter#handleConfigsSnapshot

Added tests to ensure we no longer have the above mentioned issues. Co-Authored-By: Akhilesh C -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14996) The KRaft controller should properly handle overly large user operations
[ https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726688#comment-17726688 ] Edoardo Comar commented on KAFKA-14996: --- Opened a PR to allow responding gracefully with an INVALID_PARTITIONS error to the clients: https://github.com/apache/kafka/pull/13766 > The KRaft controller should properly handle overly large user operations > > > Key: KAFKA-14996 > URL: https://issues.apache.org/jira/browse/KAFKA-14996 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 3.5.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Blocker > > If an attempt is made to create a topic with > num partitions >= QuorumController.MAX_RECORDS_PER_BATCH (10000) > the client receives an UnknownServerException - it could rather receive a > better error. > The controller logs > {{2023-05-12 19:25:10,018] WARN [QuorumController id=1] createTopics: failed > with unknown server exception IllegalStateException at epoch 2 in 21956 us. > Renouncing leadership and reverting to the last committed offset 174. > (org.apache.kafka.controller.QuorumController)}} > {{java.lang.IllegalStateException: Attempted to atomically commit 10001 > records, but maxRecordsPerBatch is 10000}} > {{ at > org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:812)}} > {{ at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:719)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)}} > {{ at java.base/java.lang.Thread.run(Thread.java:829)}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given
erikvanoosten commented on PR #13678: URL: https://github.com/apache/kafka/pull/13678#issuecomment-1564684675 > @philipnee I will try to review it this week. Thanks! Hi @dajac, did you already get the chance to look at this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] edoardocomar opened a new pull request, #13766: KAFKA-14996: Limit partition count in Create Topic and Create Partitions
edoardocomar opened a new pull request, #13766: URL: https://github.com/apache/kafka/pull/13766 Limit partition count in Create Topic and Create Partitions to avoid exceeding the QuorumController's MAX_RECORDS_PER_BATCH, and respond gracefully to the client. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
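A minimal sketch of what such an upfront check could look like (a hypothetical helper; the real limit lives in QuorumController as MAX_RECORDS_PER_BATCH and is duplicated here only for illustration):

```
import org.apache.kafka.common.errors.InvalidPartitionsException;

public class CreateTopicsValidation {
    // Assumed visible limit; mirrors QuorumController.MAX_RECORDS_PER_BATCH.
    static final int MAX_RECORDS_PER_BATCH = 10000;

    // A topic creation produces one TopicRecord plus one PartitionRecord per
    // partition (plus ConfigRecords), so reject clearly oversized requests
    // before any records are generated, returning a meaningful client error.
    static void validatePartitionCount(String topic, int numPartitions) {
        if (numPartitions >= MAX_RECORDS_PER_BATCH) {
            throw new InvalidPartitionsException("Topic " + topic + " would require " +
                (numPartitions + 1) + " records, exceeding the batch limit of " +
                MAX_RECORDS_PER_BATCH);
        }
    }
}
```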
[GitHub] [kafka] yashmayya commented on pull request #13465: KAFKA-14368: Connect offset write REST API
yashmayya commented on PR #13465: URL: https://github.com/apache/kafka/pull/13465#issuecomment-1564677170 Thanks Chris! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a diff in pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink
splett2 commented on code in PR #13765: URL: https://github.com/apache/kafka/pull/13765#discussion_r1207073563 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -1087,12 +1087,14 @@ class Partition(val topicPartition: TopicPartition, // avoid unnecessary collection generation val leaderLogEndOffset = leaderLog.logEndOffsetMetadata var newHighWatermark = leaderLogEndOffset -remoteReplicasMap.values.foreach { replica => +remoteReplicasMap.foreachEntry { (replicaId, replica) => Review Comment: Should we have a test in `PartitionTest` to assert that the HWM is incremented when there is a replica that is fenced but caught up? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante merged pull request #13334: MINOR: Move plugin path parsing from DelegatingClassLoader to PluginUtils
C0urante merged PR #13334: URL: https://github.com/apache/kafka/pull/13334 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
hachikuji commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1207028405 ## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ## @@ -113,38 +142,113 @@ class ProducerIdManagerTest { } @ParameterizedTest - @ValueSource(ints = Array(1, 2, 10)) - def testContiguousIds(idBlockLen: Int): Unit = { + @ValueSource(ints = Array(1, 2, 10, 100)) + def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = { +// Send concurrent generateProducerId requests. Ensure that the generated producer id is unique. +// For each block (total 3 blocks), only "idBlockLen" number of requests should go through. +// All other requests should fail immediately. + +val numThreads = 5 +val latch = new CountDownLatch(idBlockLen * 3) val manager = new MockProducerIdManager(0, 0, idBlockLen) - -IntStream.range(0, idBlockLen * 3).forEach { i => - assertEquals(i, manager.generateProducerId()) +val pidMap = mutable.Map[Long, Int]() +val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads) + +for ( _ <- 0 until numThreads) { + requestHandlerThreadPool.submit(() => { +while(latch.getCount > 0) { + val result = manager.generateProducerId() + result match { +case Success(pid) => + pidMap synchronized { +if (latch.getCount != 0) { + val counter = pidMap.getOrElse(pid, 0) + pidMap += pid -> (counter + 1) + latch.countDown() +} + } + +case Failure(exception) => + assertEquals(classOf[CoordinatorLoadInProgressException], exception.getClass) + } + Thread.sleep(100) +} + }, 0) +} +assertTrue(latch.await(15000, TimeUnit.MILLISECONDS)) Review Comment: I would keep it a bit higher so that it does not become flaky. Have you run it a few times on your own to make sure it is not flaky already? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13334: MINOR: Move plugin path parsing from DelegatingClassLoader to PluginUtils
C0urante commented on code in PR #13334: URL: https://github.com/apache/kafka/pull/13334#discussion_r1207028289 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java: ## @@ -188,11 +191,34 @@ public static boolean isClassFile(Path path) { return path.toString().toLowerCase(Locale.ROOT).endsWith(".class"); } -public static List pluginLocations(Path topPath) throws IOException { +public static List pluginLocations(String pluginPath) { +if (pluginPath == null) { +return Collections.emptyList(); +} +String[] pluginPathElements = COMMA_WITH_WHITESPACE.split(pluginPath.trim(), -1); +List pluginLocations = new ArrayList<>(); +for (String path : pluginPathElements) { +try { +Path pluginPathElement = Paths.get(path).toAbsolutePath(); +// Currently 'plugin.paths' property is a list of top-level directories +// containing plugins +if (Files.isDirectory(pluginPathElement)) { +pluginLocations.addAll(pluginLocations(pluginPathElement)); +} else if (isArchive(pluginPathElement)) { +pluginLocations.add(pluginPathElement); +} Review Comment: Thanks, LGTM -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante merged pull request #13356: KAFKA-14789: Prevent mis-attributing classpath plugins, allow discovery of classpath RestExtension and ConfigProvider
C0urante merged PR #13356: URL: https://github.com/apache/kafka/pull/13356 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a diff in pull request #13759: KAFKA-15019: Improve handling of overload situations in the kcontroller
rondagostino commented on code in PR #13759: URL: https://github.com/apache/kafka/pull/13759#discussion_r1207013139 ## metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java: ## @@ -223,6 +283,21 @@ public BrokerHeartbeatState next() { } } +/** + * The maximum number of timed out heartbeats to count. + */ +static final int DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_MAX = 1000; + +/** + * The time period over which to track timed out heartbeats. + */ +static final long DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_WINDOW_NS = TimeUnit.MINUTES.toNanos(5); + +/** + * The number of heartbeats to notice missing before we go into overload. + */ +static final int DEFAULT_TIMED_OUT_HEARTBEAT_OVERLOAD_THRESHOLD = 3; + Review Comment: Thinking about this some more, I believe the key requirement is to accurately understand when we do not have a correct view of the cluster. This allows us to handle 2 important cases: not fencing a broker if its session times out but we think we could have missed enough heartbeats that the decision to fence might be wrong; and fencing a broker if its session times out whenever we think we have accurate information. I think I addressed the second part (detect and fence a crashed broker as quickly as possible) in the comment above: I believe we have accurate information if we see a contiguous series of N successfully-processed heartbeats with no intervening timed-out heartbeats, where N is perhaps the cluster size. For the first part (don't fence if we think the decision to do so could be wrong), assume the broker session is 18 seconds and the heartbeat interval is 2 seconds. That means we would need to miss 9 heartbeats for a broker in order to incorrectly fence it. Maybe we keep track of the number of contiguous successful heartbeats with no intervening misses (which, if we aren't missing any, would always be a very high number). But then as soon as we miss one we increment the missed count and reset the contiguous count to 0. When we successfully process a heartbeat we increment the contiguous count and, if it reaches the necessary threshold N (which is on the order of the cluster size), we reset the missed count to 0. We can fence brokers only while the missed count is less than the session/interval ratio (i.e. 18/2 = 9 by default). We can tweak this to be a bit more conservative. Maybe we need N (the number of contiguous heartbeats seen to assure us we have good visibility) to be 1.5 or 2 times the broker count instead of just the broker count. Maybe the missed count only has to exceed half the session/interval ratio (so only missing 5 heartbeats without seeing N successfully-processed ones in a row instead of 9 by default) to prevent fencing. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
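A compact sketch of the bookkeeping described above, with illustrative names and thresholds (this is one interpretation of the proposal, not code from the PR):

```
// Sketch of the tracking described above: fencing decisions are trusted only
// while the controller believes it has been seeing heartbeats reliably.
public class HeartbeatVisibilityTracker {
    private final int contiguousThreshold;   // N, on the order of the cluster size
    private final int maxTolerableMisses;    // e.g. session timeout / heartbeat interval
    private int contiguousSuccesses = 0;
    private int missedSinceLastReset = 0;

    public HeartbeatVisibilityTracker(int contiguousThreshold, int maxTolerableMisses) {
        this.contiguousThreshold = contiguousThreshold;
        this.maxTolerableMisses = maxTolerableMisses;
    }

    public void onHeartbeatProcessed() {
        contiguousSuccesses++;
        if (contiguousSuccesses >= contiguousThreshold) {
            // A long unbroken run of processed heartbeats restores confidence
            // in our view of the cluster, so forget earlier misses.
            missedSinceLastReset = 0;
        }
    }

    public void onHeartbeatTimedOut() {
        contiguousSuccesses = 0;
        missedSinceLastReset++;
    }

    // Fence only while we believe our view of the cluster is accurate.
    public boolean fencingAllowed() {
        return missedSinceLastReset < maxTolerableMisses;
    }
}
```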
[GitHub] [kafka] C0urante merged pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante merged PR #13465: URL: https://github.com/apache/kafka/pull/13465 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch
jolshan commented on code in PR #13493: URL: https://github.com/apache/kafka/pull/13493#discussion_r1206991344 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int, * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either * returns the current offset or it begins to sync the cache from the log (and returns an error code). */ - def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = { -trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId)) + def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = { +trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId)) val group = groupMetadataCache.get(groupId) if (group == null) { - topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition => + topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition => val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) -topicPartition -> partitionData +topicIdPartition -> partitionData }.toMap } else { group.inLock { if (group.is(Dead)) { - topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition => + topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition => val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) -topicPartition -> partitionData +topicIdPartition -> partitionData }.toMap } else { - val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet) - - topicPartitions.map { topicPartition => -if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) { - topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET, + def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = { +if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) { + new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT) } else { - val partitionData = group.offset(topicPartition) match { + group.offset(topicIdPartition) match { case None => new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) case Some(offsetAndMetadata) => new PartitionData(offsetAndMetadata.offset, offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE) } - topicPartition -> partitionData } - }.toMap + } + + topicIdPartitionsOpt match { +case Some(topicIdPartitions) => + topicIdPartitions.map { topicIdPartition => +topicIdPartition -> resolvePartitionData(topicIdPartition) + }.toMap + +case None => + val topicIds = replicaManager.metadataCache.topicNamesToIds() + group.allOffsets.keySet.map { topicPartition => +Option(topicIds.get(topicPartition.topic())) match { + case Some(topicId) => +val topicIdPartition = new TopicIdPartition(topicId, topicPartition) +topicIdPartition -> resolvePartitionData(topicIdPartition) + case None => +val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition) +zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION +} + }.toMap Review Comment: did we decide that topicIDPartition when the request 
is old would just have a 0 id? Could you give a brief outline of how old vs. new request versions are handled (i.e. the representation in memory when handling, plus what we return in the response for the happy path and error cases)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.
C0urante commented on code in PR #13504: URL: https://github.com/apache/kafka/pull/13504#discussion_r1206885931 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ## @@ -695,9 +705,28 @@ private class HandleRebalance implements ConsumerRebalanceListener { @Override public void onPartitionsAssigned(Collection partitions) { log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions); - +Set deletedTopics = new HashSet<>(); for (TopicPartition tp : partitions) { -long pos = consumer.position(tp); +if (deletedTopics.contains(tp.topic())) { +log.debug("Not assigning offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic()); +continue; +} +long pos; +try { +pos = consumer.position(tp); +} catch (TimeoutException e) { +log.error("TimeoutException occurred when fetching position for topic partition {}. " + +"Checking if the topic {} exists", tp, tp.topic()); +Map topic = topicAdmin.describeTopics(tp.topic()); Review Comment: This adds new ACL requirements for sink connectors' admin clients, which is a breaking change and cannot be done until the next major release. We also need to handle generic exceptions thrown during this call. Probably safest to assume that the topic exists if we fail to describe it. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ## @@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener { @Override public void onPartitionsAssigned(Collection partitions) { log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions); - +Set deletedTopics = new HashSet<>(); for (TopicPartition tp : partitions) { -long pos = consumer.position(tp); +if (deletedTopics.contains(tp.topic())) { Review Comment: We should not be relying on undocumented behavior like this without confirmation from someone familiar with the clients library that it's intentional, or at the very least, that it won't change in the future without a KIP. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -1367,9 +1367,14 @@ public WorkerTask doBuild(Task task, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK); KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); +Map adminOverrides = adminConfigs(id.connector(), "connector-worker-adminclient-" + id.connector(), Review Comment: We already construct an admin client for sink connectors [if they use a DLQ topic](https://github.com/apache/kafka/blob/6d72c26731fe69955127a90e3d43f6d9eb41e2d3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L947-L951). We should create at most one admin client per sink task, which means probably refactoring the `sinkTaskReporters` method to accept an admin client if we're going to be creating one unconditionally here. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ## @@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener { @Override public void onPartitionsAssigned(Collection partitions) { log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions); - +Set deletedTopics = new HashSet<>(); for (TopicPartition tp : partitions) { -long pos = consumer.position(tp); +if (deletedTopics.contains(tp.topic())) { Review Comment: Also, this entire block relies on racy logic to detect deleted topics that may fail if a topic is deleted after this check takes place. 
Again, we should reach out to someone familiar with the clients library. It's unlikely that Kafka Connect is the only consumer application that is impacted by this scenario and it's better to fix this at the clients level instead of implementing Connect-specific workarounds. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
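The "assume the topic exists if we fail to describe it" suggestion might look roughly like the following (a hypothetical helper around the existing TopicAdmin.describeTopics call):

```
import java.util.Map;

import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.connect.util.TopicAdmin;

public class TopicExistenceCheck {
    // Sketch of the defensive behavior suggested above: treat any failure to
    // describe the topic as "the topic exists", so that transient admin-client
    // problems (missing ACLs, timeouts) never cause offsets to be silently skipped.
    static boolean topicProbablyExists(TopicAdmin admin, String topic) {
        try {
            Map<String, TopicDescription> described = admin.describeTopics(topic);
            return described.containsKey(topic);
        } catch (Exception e) {
            // Could not describe the topic; assume it exists and let the
            // consumer surface any real error on the next poll.
            return true;
        }
    }
}
```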
[GitHub] [kafka] ijuma commented on a diff in pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller
ijuma commented on code in PR #13742: URL: https://github.com/apache/kafka/pull/13742#discussion_r1206985698 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -457,9 +466,13 @@ private Throwable handleEventException(String name, long endProcessingTime = time.nanoseconds(); long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong(); long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS); -if (exception instanceof ApiException) { +if ((exception instanceof ApiException) || +(exception instanceof BoundedListTooLongException)) { log.info("{}: failed with {} in {} us. Reason: {}", name, exception.getClass().getSimpleName(), deltaUs, exception.getMessage()); +if (exception instanceof BoundedListTooLongException) { +exception = new UnknownServerException(exception.getMessage()); Review Comment: `UnknownServerException` is pretty uninformative. Can we use `PolicyViolationException` with a string indicating the specifics? Not perfect, but the custom string sent back to the user can clarify what's going on. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
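For illustration, the suggested translation could be as small as this (a sketch; PolicyViolationException is an existing ApiException subtype whose message is returned to the client):

```
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.PolicyViolationException;

public class ControllerErrors {
    // Translate the internal bounded-list failure into an API exception whose
    // message (e.g. which operation generated too many records) reaches the client.
    static ApiException boundedListFailure(Throwable cause) {
        return new PolicyViolationException(cause.getMessage());
    }
}
```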
[GitHub] [kafka] machi1990 commented on pull request #13611: MINOR: remove unused variable from QuorumMetaLogListener#handleCommit method
machi1990 commented on PR #13611: URL: https://github.com/apache/kafka/pull/13611#issuecomment-1564562097 Thanks @jsancio for the review! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio merged pull request #13611: MINOR: remove unused variable from QuorumMetaLogListener#handleCommit method
jsancio merged PR #13611: URL: https://github.com/apache/kafka/pull/13611 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller
mumrah commented on PR #13742: URL: https://github.com/apache/kafka/pull/13742#issuecomment-1564483362 @divijvaidya Colin can correct me if I'm mistaken, but I believe this patch is mainly about closing an existing edge case until we implement KIP-868 (metadata transactions). Once we have transactions in the controller, we can allow arbitrarily large batches of records.

> I am concerned about the user facing aspect of this change. If I am a user and get this exception, what am I expected to do to resolve it?

Right now, if you create a topic with more than ~10000 partitions, you'll get a server error anyway. The controller fails to commit the batch, throws an exception, and then renounces leadership. Here's what happens on the controller:

```
[2023-05-26 10:24:28,308] DEBUG [QuorumController id=1] Got exception while running createTopics(1813420413). Invoking handleException. (org.apache.kafka.queue.KafkaEventQueue)
java.lang.IllegalStateException: Attempted to atomically commit 20001 records, but maxRecordsPerBatch is 10000
at org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:812)
at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:719)
at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
at java.lang.Thread.run(Thread.java:750)
[2023-05-26 10:24:28,314] INFO [RaftManager id=1] Received user request to resign from the current epoch 3 (org.apache.kafka.raft.KafkaRaftClient)
[2023-05-26 10:24:28,323] INFO [RaftManager id=1] Failed to handle fetch from 2 at 142 due to NOT_LEADER_OR_FOLLOWER (org.apache.kafka.raft.KafkaRaftClient)
```

And the client sees:

```
[2023-05-26 10:24:28,351] ERROR org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request. (kafka.admin.TopicCommand$)
```

So, really this patch isn't changing anything from the client's perspective. It just prevents the controller from renouncing leadership (which is the real problem). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei merged pull request #13455: KAFKA-14841 Handle callbacks to ConsumerRebalanceListener in MockConsumer
vvcephei merged PR #13455: URL: https://github.com/apache/kafka/pull/13455 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] machi1990 commented on pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test
machi1990 commented on PR #13664: URL: https://github.com/apache/kafka/pull/13664#issuecomment-1564402596 Thanks @philipnee @vvcephei for the review and merge! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei merged pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test
vvcephei merged PR #13664: URL: https://github.com/apache/kafka/pull/13664 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Hangleton commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch
Hangleton commented on code in PR #13493: URL: https://github.com/apache/kafka/pull/13493#discussion_r1206778975 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int, * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either * returns the current offset or it begins to sync the cache from the log (and returns an error code). */ - def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = { -trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId)) + def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = { +trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId)) val group = groupMetadataCache.get(groupId) if (group == null) { - topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition => + topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition => val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) -topicPartition -> partitionData +topicIdPartition -> partitionData }.toMap } else { group.inLock { if (group.is(Dead)) { - topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition => + topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition => val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) -topicPartition -> partitionData +topicIdPartition -> partitionData }.toMap } else { - val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet) - - topicPartitions.map { topicPartition => -if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) { - topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET, + def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = { +if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) { + new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT) } else { - val partitionData = group.offset(topicPartition) match { + group.offset(topicIdPartition) match { case None => new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) case Some(offsetAndMetadata) => new PartitionData(offsetAndMetadata.offset, offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE) } - topicPartition -> partitionData } - }.toMap + } + + topicIdPartitionsOpt match { +case Some(topicIdPartitions) => + topicIdPartitions.map { topicIdPartition => +topicIdPartition -> resolvePartitionData(topicIdPartition) + }.toMap + +case None => + val topicIds = replicaManager.metadataCache.topicNamesToIds() + group.allOffsets.keySet.map { topicPartition => +Option(topicIds.get(topicPartition.topic())) match { + case Some(topicId) => +val topicIdPartition = new TopicIdPartition(topicId, topicPartition) +topicIdPartition -> resolvePartitionData(topicIdPartition) + case None => +val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition) +zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION +} + }.toMap Review Comment: Sure that's fine! 
I will go ahead and start with the approach discussed last time.
[GitHub] [kafka] dajac commented on pull request #13764: KAFKA-14691; [1/N] Add new fields to OffsetFetchRequest and OffsetFetchResponse
dajac commented on PR #13764:
URL: https://github.com/apache/kafka/pull/13764#issuecomment-1564350280

I have a few comments/questions:
* I am not really comfortable with merging this without the server-side implementation. @clolov Is there a strong reason not to do them together?
* I agree with @Hangleton that it may be better to start with adding the TopicId only. This is complicated enough on its own. We can add the other fields afterwards.
* I agree with @jolshan that we should set `"latestVersionUnstable": true` while in development.
[GitHub] [kafka] dajac commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch
dajac commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1206732981

## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+      topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
         val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
           Optional.empty(), "", Errors.NONE)
-        topicPartition -> partitionData
+        topicIdPartition -> partitionData
       }.toMap
     } else {
       group.inLock {
         if (group.is(Dead)) {
-          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+          topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
             val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
               Optional.empty(), "", Errors.NONE)
-            topicPartition -> partitionData
+            topicIdPartition -> partitionData
           }.toMap
         } else {
-          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-          topicPartitions.map { topicPartition =>
-            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-              topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+          def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = {
+            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+              new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
             } else {
-              val partitionData = group.offset(topicPartition) match {
+              group.offset(topicIdPartition) match {
                 case None => new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                   Optional.empty(), "", Errors.NONE)
                 case Some(offsetAndMetadata) => new PartitionData(offsetAndMetadata.offset, offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
               }
-              topicPartition -> partitionData
             }
-          }.toMap
+          }
+
+          topicIdPartitionsOpt match {
+            case Some(topicIdPartitions) =>
+              topicIdPartitions.map { topicIdPartition =>
+                topicIdPartition -> resolvePartitionData(topicIdPartition)
+              }.toMap
+
+            case None =>
+              val topicIds = replicaManager.metadataCache.topicNamesToIds()
+              group.allOffsets.keySet.map { topicPartition =>
+                Option(topicIds.get(topicPartition.topic())) match {
+                  case Some(topicId) =>
+                    val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
+                    topicIdPartition -> resolvePartitionData(topicIdPartition)
+                  case None =>
+                    val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition)
+                    zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+                }
+              }.toMap

Review Comment:
I don't recall the details now.
I have to get back to it...
[GitHub] [kafka] Hangleton commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch
Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1206702136

## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+      topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
         val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
           Optional.empty(), "", Errors.NONE)
-        topicPartition -> partitionData
+        topicIdPartition -> partitionData
       }.toMap
     } else {
      group.inLock {
         if (group.is(Dead)) {
-          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+          topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
             val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
               Optional.empty(), "", Errors.NONE)
-            topicPartition -> partitionData
+            topicIdPartition -> partitionData
           }.toMap
         } else {
-          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-          topicPartitions.map { topicPartition =>
-            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-              topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+          def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = {
+            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+              new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
             } else {
-              val partitionData = group.offset(topicPartition) match {
+              group.offset(topicIdPartition) match {
                 case None => new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                   Optional.empty(), "", Errors.NONE)
                 case Some(offsetAndMetadata) => new PartitionData(offsetAndMetadata.offset, offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
               }
-              topicPartition -> partitionData
             }
-          }.toMap
+          }
+
+          topicIdPartitionsOpt match {
+            case Some(topicIdPartitions) =>
+              topicIdPartitions.map { topicIdPartition =>
+                topicIdPartition -> resolvePartitionData(topicIdPartition)
+              }.toMap
+
+            case None =>
+              val topicIds = replicaManager.metadataCache.topicNamesToIds()
+              group.allOffsets.keySet.map { topicPartition =>
+                Option(topicIds.get(topicPartition.topic())) match {
+                  case Some(topicId) =>
+                    val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
+                    topicIdPartition -> resolvePartitionData(topicIdPartition)
+                  case None =>
+                    val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition)
+                    zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+                }
+              }.toMap

Review Comment:
Hi David and Justine, coming back to this,
apologies for the delay. Are you comfortable with using `TopicIdPartition` as suggested above, or would you rather keep `TopicPartition` and do the resolution to `TopicIdPartition` in `KafkaApis`? What do you think?
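For context on the question above: a minimal sketch of the name-to-id resolution whose placement is being debated (in `GroupMetadataManager`, as in the diff, versus in `KafkaApis`). It uses the public client classes; the `resolve` helper and the map argument are illustrative names, not code from the PR.

    import java.util.Map;
    import org.apache.kafka.common.TopicIdPartition;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.Uuid;

    public class TopicIdResolutionSketch {

        // Resolve a TopicPartition to a TopicIdPartition from a name -> id map
        // (in the PR this map comes from replicaManager.metadataCache.topicNamesToIds()).
        // Unknown topics keep Uuid.ZERO_UUID so the caller can answer UNKNOWN_PARTITION.
        static TopicIdPartition resolve(Map<String, Uuid> topicNamesToIds, TopicPartition tp) {
            Uuid topicId = topicNamesToIds.getOrDefault(tp.topic(), Uuid.ZERO_UUID);
            return new TopicIdPartition(topicId, tp);
        }

        public static void main(String[] args) {
            Map<String, Uuid> ids = Map.of("orders", Uuid.randomUuid());
            System.out.println(resolve(ids, new TopicPartition("orders", 0))); // resolved id
            System.out.println(resolve(ids, new TopicPartition("gone", 0)));   // zero id
        }
    }

Either placement computes the same mapping; the trade-off under discussion is whether the group coordinator's API should deal in `TopicIdPartition` throughout or receive already-resolved ids from `KafkaApis`.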
[GitHub] [kafka] Hangleton commented on pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker
Hangleton commented on PR #13558:
URL: https://github.com/apache/kafka/pull/13558#issuecomment-1564157619

Hi Igor, thanks for the review. I added the changes you reminded me about above. I am running additional passes of this integration test to ensure there is no intermittent failure. Thanks.
[GitHub] [kafka] mimaison commented on pull request #13748: [BUGFIX] Bugfixed in KAFKA-8713, but it doesn't work properly.
mimaison commented on PR #13748:
URL: https://github.com/apache/kafka/pull/13748#issuecomment-1564113409

Good catch! Yes, it would be good to have this in 3.5. @krespo, can you update your PR with the suggestions from @gharris1727? Thanks
[jira] [Commented] (KAFKA-14953) Add metrics for tiered storage
[ https://issues.apache.org/jira/browse/KAFKA-14953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726550#comment-17726550 ]

Luke Chen commented on KAFKA-14953:
-----------------------------------

Sounds good! Thank you!

> Add metrics for tiered storage
> ------------------------------
>
>                 Key: KAFKA-14953
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14953
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Divij Vaidya
>            Assignee: Abhijeet Kumar
>            Priority: Minor
>
> Not just for expired fetch. We also need to add all the metrics described in KIP-405:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-NewMetrics
>
> ref: https://github.com/apache/kafka/pull/13535#discussion_r1180286031
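Not the implementation tracked by this ticket, just a rough sketch of registering and recording one such metric with the `org.apache.kafka.common.metrics` API. The sensor, metric, and group names below are placeholders; the actual names are those listed in the KIP-405 "New Metrics" table, and the broker-side implementation may expose them through a different metrics library.

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;
    import org.apache.kafka.common.metrics.stats.Rate;

    public class RemoteStorageMetricsSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();

            // Placeholder sensor counting reads served from the remote tier.
            Sensor fetchRate = metrics.sensor("remote-fetch-rate");
            fetchRate.add(
                metrics.metricName("remote-fetch-rate", "remote-storage-sketch",
                    "Rate of reads served from tiered storage"),
                new Rate());

            // Placeholder sensor tracking remote read latency.
            Sensor fetchLatency = metrics.sensor("remote-fetch-latency");
            fetchLatency.add(
                metrics.metricName("remote-fetch-latency-avg", "remote-storage-sketch",
                    "Average tiered-storage read latency in ms"),
                new Avg());

            // On each read served from the remote tier:
            fetchRate.record();        // counts one fetch
            fetchLatency.record(42.0); // latency of this fetch in ms

            metrics.close();
        }
    }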
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1206322259

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
@@ -0,0 +1,876 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response
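The quoted class comment is truncated above after its first point; it goes on to describe a second kind of method that applies committed records back to the state. As a rough illustration of that handler/replay split, with entirely hypothetical names and a String standing in for the coordinator's generated records:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Toy model: handlers compute a response plus the records encoding the
    // state change; replay is the only path that mutates state, so replaying
    // the log rebuilds the soft state deterministically.
    public class CoordinatorSketch {

        static final class Result {
            final List<String> records; // records to append to the coordinator log
            final String response;      // response returned once records commit
            Result(List<String> records, String response) {
                this.records = records;
                this.response = response;
            }
        }

        private final List<String> state = new ArrayList<>();

        // 1) Request handler: does not touch state; it only generates records.
        Result handleHeartbeat(String memberId) {
            return new Result(Arrays.asList("member-subscription:" + memberId), "OK");
        }

        // 2) Replay method: applies a committed record to the soft state.
        void replay(String record) {
            state.add(record);
        }

        public static void main(String[] args) {
            CoordinatorSketch manager = new CoordinatorSketch();
            Result result = manager.handleHeartbeat("member-1");
            result.records.forEach(manager::replay); // simulate commit + replay
            System.out.println(result.response + " state=" + manager.state);
        }
    }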
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1206317753

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.GroupMetadataManagerTest;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerGroupTest {
+
+    private ConsumerGroup createConsumerGroup(String groupId) {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        return new ConsumerGroup(snapshotRegistry, groupId);
+    }
+
+    @Test
+    public void testGetOrCreateMember() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+        ConsumerGroupMember member;
+
+        // Create a member.
+        member = consumerGroup.getOrMaybeCreateMember("member-id", true);
+        assertEquals("member-id", member.memberId());
+
+        // Get that member back.
+        member = consumerGroup.getOrMaybeCreateMember("member-id", false);
+        assertEquals("member-id", member.memberId());
+
+        assertThrows(UnknownMemberIdException.class, () ->
+            consumerGroup.getOrMaybeCreateMember("does-not-exist", false));
+    }
+
+    @Test
+    public void testUpdateMember() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+        ConsumerGroupMember member;
+
+        member = consumerGroup.getOrMaybeCreateMember("member", true);
+
+        member = new ConsumerGroupMember.Builder(member)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .build();
+
+        consumerGroup.updateMember(member);
+
+        assertEquals(member, consumerGroup.getOrMaybeCreateMember("member", false));
+    }
+
+    @Test
+    public void testRemoveMember() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+        consumerGroup.getOrMaybeCreateMember("member", true);
+        assertTrue(consumerGroup.hasMember("member"));
+
+        consumerGroup.removeMember("member");
+        assertFalse(consumerGroup.hasMember("member"));
+    }
+
+    @Test
+    public void testUpdatingMemberUpdatesPartitionEpoch() {
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+        Uuid zarTopicId = Uuid.randomUuid();
+
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+        ConsumerGroupMember member;
+
+        member = new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 1, 2, 3)))
+            .setPartitionsPendingRevocation(mkAssignment(
+                mkTopicAssignment(barTopicId, 4, 5, 6)))
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(zarTopicId, 7, 8, 9)))
+            .build();
+
+        consumerGroup.updateMember(member);
+
+        assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
+        assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 2));
+        assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 3));
+        assertEquals(10,