[GitHub] [kafka] showuon commented on pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed
showuon commented on PR #13348: URL: https://github.com/apache/kafka/pull/13348#issuecomment-1471333242 @robobario , nice catch! You're welcome to submit a PR to fix it. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] garyparrot opened a new pull request, #13398: MINOR: Fix typos in valid range of socket buffer size
garyparrot opened a new pull request, #13398: URL: https://github.com/apache/kafka/pull/13398 Fix the following typos in Kafka Connect configs: [send.buffer.bytes](https://kafka.apache.org/documentation/#connectconfigs_send.buffer.bytes) and [receive.buffer.bytes](https://kafka.apache.org/documentation/#connectconfigs_receive.buffer.bytes) ![image](https://user-images.githubusercontent.com/39105714/225489012-9b6f2205-fbcb-4cf9-939c-d6c96cbe6afb.png) Currently, the valid values start from `0`, although the documentation states `-1` is also a valid value.
[GitHub] [kafka] robobario commented on pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed
robobario commented on PR #13348: URL: https://github.com/apache/kafka/pull/13348#issuecomment-1471191098

Hi, I think this introduced a race condition that can cause the producer-perf-test to log a lot of exceptions to the console if num records > 50. Since `iteration` is incremented from the callback, multiple threads may see the same value when deciding whether to update `latencies`, so the index can go out of bounds when updating `latencies`.

```
bin/kafka-producer-perf-test.sh --topic perf-test --throughput -1 --num-records 100 --producer-props acks=all bootstrap.servers=localhost:9092 --record-size 50
```

Sometimes it logs:

```
[2023-03-16 15:30:57,432] ERROR Error executing user-provided callback on message for topic-partition 'perf-test-0' (org.apache.kafka.clients.producer.internals.ProducerBatch)
java.lang.ArrayIndexOutOfBoundsException: Index 51 out of bounds for length 51
	at org.apache.kafka.tools.ProducerPerformance$Stats.record(ProducerPerformance.java:38
```

It doesn't kill the test run, but it looks alarming to see a couple of hundred errors in the console.
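For illustration only, here is a minimal sketch of one way to keep the "should I sample?" check and the array write consistent when `record` can be called from more than one thread. `SampledLatencies`, its fields, and `runConcurrently` are hypothetical names made up for this sketch; this is not the `ProducerPerformance.Stats` class and not necessarily the fix that should go into Kafka. The idea is simply that the check, the write, and both increments happen under one lock, with a bounds guard as a second line of defense.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a stats holder; NOT Kafka's ProducerPerformance.Stats.
final class SampledLatencies {
    private final int[] latencies;
    private final int sampling;
    private long iteration = 0;
    private int index = 0;

    SampledLatencies(int numRecords, int sampling) {
        this.sampling = sampling;
        // One extra slot, mirroring the "length = samples + 1" sizing seen in the report.
        this.latencies = new int[numRecords / sampling + 1];
    }

    // The check, the write, and the increments all happen under the same lock,
    // so two threads can never act on the same iteration or index value.
    synchronized void record(int latencyMs) {
        if (iteration % sampling == 0 && index < latencies.length) {
            latencies[index++] = latencyMs;
        }
        iteration++;
    }

    synchronized int sampleCount() {
        return index;
    }

    synchronized long iterations() {
        return iteration;
    }

    // Drives record() from several threads to mimic concurrent callbacks.
    static SampledLatencies runConcurrently(int numRecords, int sampling, int threads) {
        SampledLatencies stats = new SampledLatencies(numRecords, sampling);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < numRecords; i++) {
            final int latency = i;
            pool.submit(() -> stats.record(latency));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callbacks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return stats;
    }

    public static void main(String[] args) {
        SampledLatencies stats = runConcurrently(100, 2, 4);
        System.out.println(stats.iterations() + " records, " + stats.sampleCount() + " samples");
    }
}
```

A lock is the bluntest option; confining all updates to a single thread would avoid the contention altogether, at the cost of restructuring where the stats are recorded.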
[jira] [Resolved] (KAFKA-13884) KRaft Obsever are not required to flush on every append
[ https://issues.apache.org/jira/browse/KAFKA-13884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

José Armando García Sancio resolved KAFKA-13884.
Fix Version/s: 3.5.0
Resolution: Fixed

> KRaft Obsever are not required to flush on every append
> ---
>
> Key: KAFKA-13884
> URL: https://issues.apache.org/jira/browse/KAFKA-13884
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Jose Armando Garcia Sancio
> Assignee: José Armando García Sancio
> Priority: Major
> Fix For: 3.5.0
>
> The current implementation of the KRaft Client flushes to disk when observers
> append to the log. This is not required since observers don't participate in
> leader election or the advancement of the high-watermark.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: WIP: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1137901882 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} +import org.apache.kafka.common.{InvalidRecordException, Node, TopicPartition} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.Time + +import java.util.Collections +import scala.collection.mutable + +object AddPartitionsToTxnManager { + type AppendCallback = Map[TopicPartition, Errors] => Unit +} + + +class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection, + val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) + + +class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) + extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { + + private val inflightNodes = mutable.HashSet[Node]() + private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + + def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { +// Check if we have already (either node or individual transaction). 
+val currentNodeAndTransactionDataOpt = nodesToTransactions.get(node) +currentNodeAndTransactionDataOpt match { + case None => +nodesToTransactions.put(node, + new TransactionDataAndCallbacks(new AddPartitionsToTxnTransactionCollection(Collections.singletonList(transactionData).iterator()), +mutable.Map(transactionData.transactionalId() -> callback))) + case Some(currentNodeAndTransactionData) => +// Check if we already have txn ID -- this should only happen in epoch bump case. If so, we should return error for old entry and remove from queue. +val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) +if (currentTransactionData != null) { + if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) { +val topicPartitionsToError = mutable.Map[TopicPartition, Errors]() +currentTransactionData.topics().forEach { topic => + topic.partitions().forEach { partition => +topicPartitionsToError.put(new TopicPartition(topic.name(), partition), Errors.INVALID_PRODUCER_EPOCH) + } +} +val callback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId()) + currentNodeAndTransactionData.transactionData.remove(transactionData.transactionalId()) +callback(topicPartitionsToError.toMap) + } else { +// We should never see a request on the same epoch since we haven't finished handling the one in queue +throw new InvalidRecordException("Received a second request from the same connection without finishing the first.") + } +} +currentNodeAndTransactionData.transactionData.add(transactionData) + currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback) +} +wakeup() + } + + private class AddPartitionsToTxnHandler(node: Node, transactionDataAndCallbacks: TransactionDataAndCallbacks) extends RequestCompletionHandler { +override def onComplete(response: ClientResponse): Unit = { + inflightNodes.synchronized(inflightNodes.remove(node)) + if 
(response.authenticationException() != null) { +error(s"AddPartitionsToTxnRequest failed for broker
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: WIP: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1137856849 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} +import org.apache.kafka.common.{InvalidRecordException, Node, TopicPartition} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.Time + +import java.util.Collections +import scala.collection.mutable + +object AddPartitionsToTxnManager { + type AppendCallback = Map[TopicPartition, Errors] => Unit +} + + +class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection, + val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) + + +class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) + extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { + + private val inflightNodes = mutable.HashSet[Node]() + private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + + def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { +// Check if we have already (either node or individual transaction). 
+val currentNodeAndTransactionDataOpt = nodesToTransactions.get(node) +currentNodeAndTransactionDataOpt match { + case None => +nodesToTransactions.put(node, + new TransactionDataAndCallbacks(new AddPartitionsToTxnTransactionCollection(Collections.singletonList(transactionData).iterator()), +mutable.Map(transactionData.transactionalId() -> callback))) + case Some(currentNodeAndTransactionData) => +// Check if we already have txn ID -- this should only happen in epoch bump case. If so, we should return error for old entry and remove from queue. +val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) +if (currentTransactionData != null) { + if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) { +val topicPartitionsToError = mutable.Map[TopicPartition, Errors]() +currentTransactionData.topics().forEach { topic => + topic.partitions().forEach { partition => +topicPartitionsToError.put(new TopicPartition(topic.name(), partition), Errors.INVALID_PRODUCER_EPOCH) + } +} +val callback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId()) + currentNodeAndTransactionData.transactionData.remove(transactionData.transactionalId()) +callback(topicPartitionsToError.toMap) + } else { +// We should never see a request on the same epoch since we haven't finished handling the one in queue +throw new InvalidRecordException("Received a second request from the same connection without finishing the first.") + } +} +currentNodeAndTransactionData.transactionData.add(transactionData) + currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback) +} +wakeup() + } + + private class AddPartitionsToTxnHandler(node: Node, transactionDataAndCallbacks: TransactionDataAndCallbacks) extends RequestCompletionHandler { +override def onComplete(response: ClientResponse): Unit = { + inflightNodes.synchronized(inflightNodes.remove(node)) + if 
(response.authenticationException() != null) { +error(s"AddPartitionsToTxnRequest failed for broker
[GitHub] [kafka] cmccabe merged pull request #13384: KAFKA-14801: Handle sensitive configs during ZK migration
cmccabe merged PR #13384: URL: https://github.com/apache/kafka/pull/13384
[GitHub] [kafka] dajac commented on pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
dajac commented on PR #13323: URL: https://github.com/apache/kafka/pull/13323#issuecomment-1470710416 @CalvinConfluent Could you rebase as well?
[GitHub] [kafka] jsancio merged pull request #13396: KAFKA-13884; Only voters flush on Fetch response
jsancio merged PR #13396: URL: https://github.com/apache/kafka/pull/13396
[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
CalvinConfluent commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1137608147

## clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java: ##
@@ -198,6 +204,35 @@ public void testForgottenTopics(short version) {
 }
 }
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+public void testFetchRequestSimpleBuilderDowngrade(short version) {

Review Comment: ReplicaStateDowngrade?
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13380: KAFKA-14468: Committed API
guozhangwang commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1137588681 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -235,4 +373,105 @@ public void ack(final long currentTimeMs) { this.timer.update(currentTimeMs); } } + +private class FetchCommittedOffsetResponseHandler { Review Comment: I saw that we do retry on just the `COORDINATOR_LOAD_IN_PROGRESS` and `NOT_COORDINATOR` and `unstableTxnOffsetTopicPartitions`. Just confirming with you are they the only possible retriable errors for this response (saw `OffsetFetchResponse` there are others)? What about `COORDINATOR_NOT_AVAILABLE`? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -235,4 +373,105 @@ public void ack(final long currentTimeMs) { this.timer.update(currentTimeMs); } } + +private class FetchCommittedOffsetResponseHandler { +private final UnsentOffsetFetchRequestState request; + +private FetchCommittedOffsetResponseHandler(final UnsentOffsetFetchRequestState request) { +this.request = request; +} + +public void onResponse( +final long currentTimeMs, +final OffsetFetchResponse response) { +Errors responseError = response.groupLevelError(groupState.groupId); +if (responseError != Errors.NONE) { +onFailure(currentTimeMs, responseError); +return; +} + +onSuccess(currentTimeMs, response); +} +private void onFailure(final long currentTimeMs, + final Errors responseError) { +log.debug("Offset fetch failed: {}", responseError.message()); + +if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) { +retry(currentTimeMs); +} else if (responseError == Errors.NOT_COORDINATOR) { +// re-discover the coordinator and retry + coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), Time.SYSTEM.milliseconds()); +retry(currentTimeMs); +} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) { +// TODO: I'm not sure if we should retry here. 
Sounds like we should propagate the error to let the +// user to fix the permission + request.future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId)); +} else { +request.future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response: " + responseError.message())); +} +return; +} + +private void retry(final long currentTimeMs) { +this.request.onFailedAttempt(currentTimeMs); +unsentOffsetFetchRequests.enqueue(this.request); +} + +private void onSuccess(final long currentTimeMs, + final OffsetFetchResponse response) { +Set unauthorizedTopics = null; +Map responseData = +response.partitionDataMap(groupState.groupId); +Map offsets = new HashMap<>(responseData.size()); +Set unstableTxnOffsetTopicPartitions = new HashSet<>(); +for (Map.Entry entry : responseData.entrySet()) { +TopicPartition tp = entry.getKey(); +OffsetFetchResponse.PartitionData partitionData = entry.getValue(); +if (partitionData.hasError()) { +Errors error = partitionData.error; +log.debug("Failed to fetch offset for partition {}: {}", tp, error.message()); + +if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { +request.future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does not " + +"exist")); +return; +} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { +if (unauthorizedTopics == null) { +unauthorizedTopics = new HashSet<>(); +} +unauthorizedTopics.add(tp.topic()); +} else if (error == Errors.UNSTABLE_OFFSET_COMMIT) { +System.out.println("asdkljlsadjflksajfdlk"); +unstableTxnOffsetTopicPartitions.add(tp); +} else { +request.future.completeExceptionally(new KafkaException("Unexpected error in fetch offset " + +"response for partition " + tp + ": " + error.message())); +return; +} +} else if
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13380: KAFKA-14468: Committed API
guozhangwang commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1137549960

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##
@@ -235,4 +373,105 @@ public void ack(final long currentTimeMs) {
 this.timer.update(currentTimeMs);
 }
 }
+
+private class FetchCommittedOffsetResponseHandler {

Review Comment: Ah thanks for pointing it out, will check.
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13380: KAFKA-14468: Committed API
guozhangwang commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1137547234 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -207,6 +266,85 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { } } +static class UnsentOffsetFetchRequestState extends RequestState { +public final Set requestedPartitions; +public final GroupState.Generation requestedGeneration; +public CompletableFuture> future; + +public UnsentOffsetFetchRequestState(final Set partitions, + final GroupState.Generation generation, + final CompletableFuture> future, + final long retryBackoffMs) { +super(retryBackoffMs); +this.requestedPartitions = partitions; +this.requestedGeneration = generation; +this.future = future; +} + +public boolean sameRequest(final UnsentOffsetFetchRequestState request) { +return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions); +} +} + +/** + * This is used to support the committed() API. Here we use a Java Collections, {@code unsentRequests}, to + * track + * the OffsetFetchRequests that haven't been sent, to prevent sending the same requests in the same batch. + * + * If the request is new. It will be enqueued to the {@code unsentRequest}, and will be sent upon the next + * poll. + * + * If the same request has been sent, the request's {@code CompletableFuture} will be completed upon the + * completion of the existing one. + * + * TODO: There is an optimization to present duplication to the sent but incompleted requests. I'm not sure if we + * need that. + */ +class UnsentOffsetFetchRequests { Review Comment: Maybe it's not that common; but the current flag we used is to cover both periods, and I wonder if was conducted that way to avoid some surprising cases indeed.. So my rationale is just to keep the behavior consistent with the old code logic. 
[GitHub] [kafka] cmccabe commented on a diff in pull request #13368: KAFKA-14796 Migrate ACLs from AclAuthorizor to KRaft
cmccabe commented on code in PR #13368: URL: https://github.com/apache/kafka/pull/13368#discussion_r1137526482

## core/src/main/scala/kafka/zk/ZkMigrationClient.scala: ##
@@ -211,12 +214,38 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
 }
 }
+ def migrateAcls(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+// This is probably fairly inefficient, but it preserves the semantics from AclAuthorizer (which is non-trivial)
+var allAcls = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering)
+def updateAcls(resourcePattern: ResourcePattern, versionedAcls: VersionedAcls): Unit = {
+ allAcls = allAcls.updated(resourcePattern, versionedAcls)
+}
+
+AclAuthorizer.loadAllAcls(zkClient, this, updateAcls)
+allAcls.foreach { case (resourcePattern, versionedAcls) =>

Review Comment: The main issue that I see here is that batches could become too big. Could you add some code to limit batches to 100 records or so? We had a similar bug in snapshot generation until we decoupled batching from record generation. So we should probably do the same thing here. In other words, recordConsumer should take individual records and do the batching itself. Since we'll be in a metadata transaction, the batching is not meaningful here.
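To make the "recordConsumer takes individual records and does the batching itself" suggestion concrete, here is a rough sketch in plain Java. `BatchingConsumer` and its parameters are hypothetical names for illustration; they are not part of the PR or of any Kafka API, and the 100-record cap is just the figure mentioned in the comment. The producer side hands over one record at a time, and the wrapper decides when a batch is full.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: accepts single records and forwards them downstream
// in batches of at most maxBatchSize. Not part of Kafka.
final class BatchingConsumer<T> implements Consumer<T> {
    private final Consumer<List<T>> downstream;
    private final int maxBatchSize;
    private List<T> buffer;

    BatchingConsumer(Consumer<List<T>> downstream, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.downstream = downstream;
        this.maxBatchSize = maxBatchSize;
        this.buffer = new ArrayList<>(maxBatchSize);
    }

    // Record generation only ever calls this; it never sees batch boundaries.
    @Override
    public void accept(T record) {
        buffer.add(record);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Must be called once at the end to hand over a trailing partial batch.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>(maxBatchSize);
        downstream.accept(batch);
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchingConsumer<String> consumer =
            new BatchingConsumer<>(batch -> batchSizes.add(batch.size()), 100);
        for (int i = 0; i < 250; i++) {
            consumer.accept("record-" + i);
        }
        consumer.flush();
        System.out.println(batchSizes); // prints [100, 100, 50]
    }
}
```

With this shape, code such as `migrateAcls` would only need to emit records one by one, and the batch size limit lives in a single place.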
[GitHub] [kafka] cmccabe commented on pull request #13372: MINOR: Improved error handling in ZK migration
cmccabe commented on PR #13372: URL: https://github.com/apache/kafka/pull/13372#issuecomment-1470482252

@mumrah : `KRaftMigrationDriverTest.testMigrationWithClientException` is failing for me with this PR. Can you take a look?

```
Gradle Test Run :metadata:test > Gradle Test Executor 3 > KRaftMigrationDriverTest > testMigrationWithClientException(boolean) > org.apache.kafka.metadata.migration.KRaftMigrationDriverTest.testMigrationWithClientException(boolean)[1] FAILED
    org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
        at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
        at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
        at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
        at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
        at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
        at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180)
        at app//org.apache.kafka.metadata.migration.KRaftMigrationDriverTest.testMigrationWithClientException(KRaftMigrationDriverTest.java:358)
```

(Also Jenkins did something silly again, but hopefully the next build will work.)
[jira] [Created] (KAFKA-14811) The forwarding requests are discarded when network client is changed to/from zk/Kraft
Chia-Ping Tsai created KAFKA-14811:
--

Summary: The forwarding requests are discarded when network client is changed to/from zk/Kraft
Key: KAFKA-14811
URL: https://issues.apache.org/jira/browse/KAFKA-14811
Project: Kafka
Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai

We don't check the in-flight requests when closing a stale network client. If the in-flight requests are related to a metadata request from a client, the client will get a timeout exception. If the in-flight requests are related to ISR/leader, the partition can't be written to because it can't meet the min ISR.
[jira] [Commented] (KAFKA-14802) topic deletion bug
[ https://issues.apache.org/jira/browse/KAFKA-14802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17700698#comment-17700698 ] Divij Vaidya commented on KAFKA-14802: -- Have you checked the value of the setting [auto.create.topics.enable|https://kafka.apache.org/documentation.html#brokerconfigs_auto.create.topics.enable] on the broker and the value of [allow.auto.create.topics|https://kafka.apache.org/documentation.html#consumerconfigs_allow.auto.create.topics] on the consumer? Enabling these values could lead to auto creation of topic even if it is deleted. > topic deletion bug > -- > > Key: KAFKA-14802 > URL: https://issues.apache.org/jira/browse/KAFKA-14802 > Project: Kafka > Issue Type: Bug > Components: controller, replication >Affects Versions: 3.3.2 > Environment: AWS m5.xlarge EC2 instance >Reporter: Behavox >Priority: Major > Attachments: server.properties > > > topic deletion doesn't work as expected when attempting to delete topic(s), > after successful deletion topic is recreated in a multi-controller > environment with 3 controllers and ReplicationFactor: 2 > How to reproduce - attempt to delete topic. Topic is removed successfully and > recreated right after removal. Example below shows a single topic named > example-topic. We have a total count of 17000 topics in the affected cluster. > > Our config is attached. > Run 1 > [2023-03-10 16:16:45,625] INFO [Controller 1] Removed topic example-topic > with ID fh_aQcc3Sf2yVBTMrltBlQ. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:19:04,722] INFO [Controller 1] Created topic example-topic > with topic ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:16:45,730] INFO [Controller 2] Removed topic example-topic > with ID fh_aQcc3Sf2yVBTMrltBlQ. 
> (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:19:04,851] INFO [Controller 2] Created topic example-topic > with topic ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:16:45,837] INFO [Controller 3] Removed topic example-topic > with ID fh_aQcc3Sf2yVBTMrltBlQ. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:19:04,833] INFO [Controller 3] Created topic example-topic > with topic ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > Run 2 > [2023-03-10 16:20:22,469] INFO [Controller 1] Removed topic example-topic > with ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:22:19,711] INFO [Controller 1] Created topic example-topic > with topic ID xxlJlIe_SvqQHtfgbX2eLA. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:20:22,674] INFO [Controller 2] Removed topic example-topic > with ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:22:20,022] INFO [Controller 2] Created topic example-topic > with topic ID xxlJlIe_SvqQHtfgbX2eLA. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:20:22,674] INFO [Controller 3] Removed topic example-topic > with ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:22:20,020] INFO [Controller 3] Created topic example-topic > with topic ID xxlJlIe_SvqQHtfgbX2eLA. > (org.apache.kafka.controller.ReplicationControlManager) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] hudeqi commented on pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed
hudeqi commented on PR #13348: URL: https://github.com/apache/kafka/pull/13348#issuecomment-1470060354 Thanks. Would you be interested in taking a look at this [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms) and discussing it? @chia7712 @showuon
[GitHub] [kafka] chia7712 merged pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed
chia7712 merged PR #13348: URL: https://github.com/apache/kafka/pull/13348
[GitHub] [kafka] Hangleton commented on pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9
Hangleton commented on PR #13240: URL: https://github.com/apache/kafka/pull/13240#issuecomment-1469956438 Many thanks, David. I will try to get to this in the next couple of days. Apologies for the delay; I wish I could get to this sooner.
[GitHub] [kafka] Hangleton commented on pull request #13378: KAFKA-14793: Propagate Topic Ids to the Group Coordinator during Offsets Commit
Hangleton commented on PR #13378: URL: https://github.com/apache/kafka/pull/13378#issuecomment-1469954905 Thanks for the merge, David.
[GitHub] [kafka] dajac commented on pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9
dajac commented on PR #13240: URL: https://github.com/apache/kafka/pull/13240#issuecomment-1469825143 @Hangleton I just merged https://github.com/apache/kafka/pull/13378. We can update this PR now.
[GitHub] [kafka] dajac merged pull request #13378: KAFKA-14793: Propagate Topic Ids to the Group Coordinator during Offsets Commit
dajac merged PR #13378: URL: https://github.com/apache/kafka/pull/13378
[GitHub] [kafka] dajac commented on pull request #13378: KAFKA-14793: Propagate Topic Ids to the Group Coordinator during Offsets Commit
dajac commented on PR #13378: URL: https://github.com/apache/kafka/pull/13378#issuecomment-1469817830

That's weird. The build is still reported `in progress` in the PR but the build has completed. The last build results are here: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13378/21/tests. Here are the failed tests:

```
Build / JDK 11 and Scala 2.13 / shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets() – org.apache.kafka.streams.integration.NamedTopologyIntegrationTest 16s
Build / JDK 17 and Scala 2.13 / testReturnRecordsDuringRebalance() – org.apache.kafka.clients.consumer.KafkaConsumerTest <1s
Build / JDK 17 and Scala 2.13 / testTrustStoreAlter(String).quorum=kraft – kafka.server.DynamicBrokerReconfigurationTest 12s
Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest 2m 0s
Existing failures - 6
Build / JDK 11 and Scala 2.13 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest 1m 13s
Build / JDK 11 and Scala 2.13 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest 1m 25s
Build / JDK 17 and Scala 2.13 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest 1m 18s
Build / JDK 17 and Scala 2.13 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest 1m 27s
Build / JDK 8 and Scala 2.12 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest 1m 12s
Build / JDK 8 and Scala 2.12 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest
```

None of those are related to this PR. I am going to merge the PR.
[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
dajac commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1136820680

## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ##
@@ -130,10 +131,35 @@ private static Optional optionalEpoch(int rawEpochValue) {
 }
 }
+// It is only used by KafkaRaftClient for downgrading the FetchRequest. Notice that, it will throw
+// UnsupportedOperationException if it is used for upgrading.
+public static class SimpleBuilder extends AbstractRequest.Builder {
+private final FetchRequestData fetchRequestData;
+public SimpleBuilder(FetchRequestData fetchRequestData) {

Review Comment: nit: Let's add an empty line before this one.

## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ##
@@ -130,10 +131,35 @@ private static Optional optionalEpoch(int rawEpochValue) {
 }
 }
+// It is only used by KafkaRaftClient for downgrading the FetchRequest. Notice that, it will throw
+// UnsupportedOperationException if it is used for upgrading.

Review Comment: nit: I would remove the second sentence because I find it unclear. What does `upgrading` mean here?

## clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java: ##
@@ -198,6 +204,35 @@ public void testForgottenTopics(short version) {
 }
 }
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+public void testFetchRequestSimpleBuilderDowngrade(short version) {

Review Comment: nit: `...FetchStateDowngrade`?

## core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala: ##
@@ -159,6 +162,28 @@ class KafkaNetworkChannelTest {
 }
 }
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+ def testFetchRequestDowngrade(version: Short): Unit = {
+val destinationId = 2
+val destinationNode = new Node(destinationId, "127.0.0.1", 9092)
+channel.updateEndpoint(destinationId, new InetAddressSpec(
+ new InetSocketAddress(destinationNode.host, destinationNode.port)))
+sendTestRequest(ApiKeys.FETCH, destinationId)
+channel.pollOnce()
+
+assertEquals(1, client.requests().size())
+val request = client.requests().peek().requestBuilder().build(version)
+
+if (version < 15) {
+ assertTrue(request.asInstanceOf[FetchRequest].data().replicaId() == 1)

Review Comment: small nit: In Scala, we usually don't put the `()` for accessors. In this test, you can remove them for `data`, `replicaId`, `size`, `replicaState`, etc.

## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ##
@@ -983,16 +988,16 @@ private CompletableFuture handleFetchRequest(
 Errors error = Errors.forException(cause);
 if (error != Errors.REQUEST_TIMED_OUT) {
 logger.debug("Failed to handle fetch from {} at {} due to {}",
-request.replicaId(), fetchPartition.fetchOffset(), error);
+replicaId, fetchPartition.fetchOffset(), error);

Review Comment: nit: Let's keep the original indentation please.

## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ##
@@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception {
 context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
 }
+// This test mainly focuses on whether the leader state is correctly updated under different fetch version.
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short version) throws Exception {
+int localId = 0;
+int otherNodeId = 1;
+int epoch = 5;
+Set voters = Utils.mkSet(localId, otherNodeId);
+
+RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+// First, test with a correct fetch request.
+FetchRequestData fetchRequestData = context.fetchRequest(epoch, otherNodeId, 1L, epoch, 0);
+FetchRequestData downgradedRequest = new FetchRequest.SimpleBuilder(fetchRequestData).build(version).data();
+context.deliverRequest(downgradedRequest);
+context.client.poll();
+context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
+assertEquals(1L, context.log.highWatermark().offset);

Review Comment: nit: Other tests use `context.client.highWatermark()`. Should we also use this one?

## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ##
@@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception {
 context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
 }
+// This test
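
For readers following the review thread above, the version-dependent downgrade it discusses can be illustrated with a minimal Java sketch. Everything here is a hypothetical stand-in rather than Kafka's actual code: `FetchDowngradeSketch`, `RequestData`, and `build` are invented names, the threshold of 15 is taken only from the `if (version < 15)` check in the Scala test in the diff, and the condition that throws `UnsupportedOperationException` is one possible reading of the "upgrading" sentence that the reviewer found unclear.

```java
// Hypothetical stand-in types for illustration only; these are NOT Kafka's real classes.
final class FetchDowngradeSketch {

    // Assumption taken from the Scala test in the diff (`if (version < 15)`):
    // versions below 15 carry the replica id in a top-level field.
    static final short REPLICA_STATE_MIN_VERSION = 15;

    /** Simplified payload holding both possible locations of the replica id. */
    static final class RequestData {
        int replicaId = -1;      // top-level field (older layout); -1 means unset
        Integer replicaStateId;  // id inside the nested replica state (newer layout); null means unset
    }

    /**
     * Builds a payload for the requested version from a source that uses the newer layout.
     * For older versions the id is moved to the top-level field (the "downgrade").
     */
    static RequestData build(RequestData source, short version) {
        if (source.replicaStateId == null) {
            // Assumed interpretation of "upgrading": the source is not in the newer
            // layout, so this sketch refuses to convert it.
            throw new UnsupportedOperationException("source must use the replica-state layout");
        }
        RequestData out = new RequestData();
        if (version < REPLICA_STATE_MIN_VERSION) {
            out.replicaId = source.replicaStateId;      // older layout: top-level id only
        } else {
            out.replicaStateId = source.replicaStateId; // newer layout: keep the nested id
        }
        return out;
    }

    public static void main(String[] args) {
        RequestData source = new RequestData();
        source.replicaStateId = 1;

        RequestData v14 = build(source, (short) 14);
        RequestData v15 = build(source, (short) 15);
        System.out.println("v14: replicaId=" + v14.replicaId + ", replicaStateId=" + v14.replicaStateId);
        System.out.println("v15: replicaId=" + v15.replicaId + ", replicaStateId=" + v15.replicaStateId);
    }
}
```

The sketch returns a fresh object instead of mutating its input so that the same source can be built for several versions, which mirrors how the parameterized tests in the diff exercise every version.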
[GitHub] [kafka] sionsmith commented on pull request #10233: KAFKA-9413: Auditing in Kafka
sionsmith commented on PR #10233: URL: https://github.com/apache/kafka/pull/10233#issuecomment-1469564517 It would be great to consider this, as more companies are requiring audit functionality.
[GitHub] [kafka] badaiaqrandista closed pull request #13397: short circuiting reads during fetch computation if enough bytes have …
badaiaqrandista closed pull request #13397: short circuiting reads during fetch computation if enough bytes have … URL: https://github.com/apache/kafka/pull/13397
[GitHub] [kafka] badaiaqrandista opened a new pull request, #13397: short circuiting reads during fetch computation if enough bytes have …
badaiaqrandista opened a new pull request, #13397: URL: https://github.com/apache/kafka/pull/13397

…been collected

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)