[GitHub] [kafka] showuon opened a new pull request, #13037: MINOR: make sure all partition info is propagated
showuon opened a new pull request, #13037:
URL: https://github.com/apache/kafka/pull/13037

Recently, some flaky tests have failed with:
```
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
	at kafka.utils.TestUtils$.describeTopic(TestUtils.scala:475)
	at kafka.utils.TestUtils$.topicHasSameNumPartitionsAndReplicationFactor(TestUtils.scala:484)
```

The failing line runs after we tried to create a topic but got `TopicExistsException`, at which point we want to verify that the existing topic has the expected partition count and replication factor. Describing the topic then failed with `UnknownTopicOrPartitionException`. That happens when the topic has been created but its metadata hasn't propagated to all brokers yet.

So, fix it by explicitly waiting for the partition info to propagate to all brokers before verifying `topicHasSameNumPartitionsAndReplicationFactor`.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
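The "wait for propagation before verifying" idea in this PR description can be sketched as a small polling helper. This is only an illustration under stated assumptions: `PropagationWait` and its `waitUntilTrue` method are hypothetical names written for this sketch, not the `TestUtils` code touched by the PR, and the condition is a stand-in for "every broker reports the topic's partitions".

```java
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper for illustration; this is not Kafka's TestUtils. */
final class PropagationWait {

    /**
     * Polls the condition until it holds or timeoutMs elapses.
     * Returns true if the condition held in time, false on timeout or interrupt.
     */
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pauseMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for "all brokers know the topic": true from the third poll onward.
        int[] polls = {0};
        boolean propagated = waitUntilTrue(() -> ++polls[0] >= 3, 1_000, 10);
        System.out.println("propagated=" + propagated + ", polls=" + polls[0]);
    }
}
```

Only once such a wait has returned true would the describe-and-compare step run, so a slow metadata update shows up as a timeout rather than as `UnknownTopicOrPartitionException`.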
[GitHub] [kafka] chia7712 commented on a diff in pull request #12979: KAFKA-14544 The "is-future" should be removed from metrics tags after future log becomes current log
chia7712 commented on code in PR #12979:
URL: https://github.com/apache/kafka/pull/12979#discussion_r1055146924

## core/src/main/scala/kafka/log/LogManager.scala: ##
@@ -1079,6 +1079,9 @@ class LogManager(logDirs: Seq[File],
         if (destLog == null)
           throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
+        // the metrics tags still contain "future", so we have to remove it.
+        // we will add metrics back after sourceLog remove the metrics
+        destLog.removeLogMetrics()
         destLog.renameDir(UnifiedLog.logDirName(topicPartition), true)
         destLog.updateHighWatermark(sourceLog.highWatermark)

Review Comment:
> it'll be helpful to have metrics reporting it

That makes sense. Will copy that.
[jira] [Comment Edited] (KAFKA-14470) Move log layer to storage module
[ https://issues.apache.org/jira/browse/KAFKA-14470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651098#comment-17651098 ]

Satish Duggana edited comment on KAFKA-14470 at 12/22/22 6:54 AM:
--
[~ijuma] [~mimaison] `LogOffsetMetadata` refactoring to move to storage module is being done as part of https://issues.apache.org/jira/browse/KAFKA-14480. Glad to raise PR against https://issues.apache.org/jira/browse/KAFKA-14543 for that specific change if you have not yet started working on that.

was (Author: satish.duggana):
[~ijuma] [~mimaison] `LogOffsetMetadata` refactoring to move to storage module is being done as part of https://issues.apache.org/jira/browse/KAFKA-14480. Let me know if I can raise PR against https://issues.apache.org/jira/browse/KAFKA-14543 for that specific change.

> Move log layer to storage module
>
> Key: KAFKA-14470
> URL: https://issues.apache.org/jira/browse/KAFKA-14470
> Project: Kafka
> Issue Type: Improvement
> Reporter: Ismael Juma
> Assignee: Ismael Juma
> Priority: Major
>
> We introduced the `storage` module as part of KIP-405, but the existing log layer remains in the `core` module. Moving the log layer to the `storage` module would be another step towards improved modularity and build times (similar to `metadata`, `raft` and `group-coordinator`).
> We should do this in an incremental manner to make the code review process easier. I will create separate tasks, each one mapping to one pull request.
> In order to understand the feasibility, I tackled a few of the tasks myself. Help from the community is appreciated for the unassigned tasks, but it probably makes sense to do that after the initial PRs have been submitted.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14470) Move log layer to storage module
[ https://issues.apache.org/jira/browse/KAFKA-14470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651098#comment-17651098 ]

Satish Duggana commented on KAFKA-14470:

[~ijuma] [~mimaison] `LogOffsetMetadata` refactoring to move to storage module is being done as part of https://issues.apache.org/jira/browse/KAFKA-14480. Let me know if I can raise PR against https://issues.apache.org/jira/browse/KAFKA-14543 for that specific change.
[GitHub] [kafka] emilnkrastev commented on pull request #11818: KAFKA-12558: Do not prematurely mutate partiton state and provide con…
emilnkrastev commented on PR #11818:
URL: https://github.com/apache/kafka/pull/11818#issuecomment-1362465204

@gharris1727 The PR is updated. Could you please take a look?
[jira] [Comment Edited] (KAFKA-14470) Move log layer to storage module
[ https://issues.apache.org/jira/browse/KAFKA-14470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650947#comment-17650947 ]

Ismael Juma edited comment on KAFKA-14470 at 12/22/22 6:03 AM:
---
[~mimaison] Dependencies of the log layer classes would have to be moved too, yes. In some cases, it may warrant a separate ticket while in others it can be bundled with one of the tickets that already exists (left to the discretion of the contributor).

Regarding kraft, yes there are some opportunities once the log layer has been completely moved to the storage layer. I was having a discussion with Jose yesterday about this and will file a ticket to capture it.

was (Author: ijuma):
[~mimaison] Dependencies of the log layer classes would have to be moved too, yes. In some cases, it may warrant a separate ticket while in others it can be bundled with one of the tickets that already exists (left to the discretion of the contributor).

Regarding kraft, yes there are some opportunities once the log layer has been completed moved to the storage layer. I was having a discussion with Jose yesterday about this and will file a ticket to capture it.
[GitHub] [kafka] finaut commented on pull request #8066: KAFKA-4090: Validate SSL connection in client
finaut commented on PR #8066:
URL: https://github.com/apache/kafka/pull/8066#issuecomment-1362433187

> I documented my work on the ticket as well. It got very little attention from the Kafka Dev Team and seems to be pretty out of date now (merge conflicts). Will need someone internal to the project to take this baton and run with it.
>
> https://issues.apache.org/jira/browse/KAFKA-4090

It's hard to understand why such a serious error is being ignored. It's been reported many times in different forms. Without a fix for this, I can't see how it's possible to consider Kafka an enterprise-ready technology.
[GitHub] [kafka] showuon commented on a diff in pull request #13036: KAFKA-14534: Reduce flakiness in TransactionsExpirationTest
showuon commented on code in PR #13036:
URL: https://github.com/apache/kafka/pull/13036#discussion_r1055097820

## core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala: ##
@@ -128,10 +128,13 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
     producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "2", "2", willBeCommitted = false))
     producer.flush()
+    TestUtils.waitUntilTrue(() => producerState.nonEmpty, s"Producer IDs for topic1 did not propagate quickly")
+
     // Ensure producer IDs are added.
     val pState = producerState
-    assertEquals(1, pState.size)
-    val oldProducerId = pState(0).producerId
+    assertEquals(1, pState.size, s"No producer visible via admin api")

Review Comment:
I don't think the error message makes sense here, since we already wait until `producerState` is not empty. The failure here would be that the size is different, not that no producer is visible via the admin API. Does that make sense?

## core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala: ##
@@ -149,13 +152,18 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
     producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, "3", "3", willBeCommitted = true))
     producer.commitTransaction()
+    TestUtils.waitUntilTrue(() => producerState.nonEmpty, s"Producer IDs for topic1 did not propagate quickly")
+
     // Producer IDs should repopulate.
     val pState2 = producerState
-    assertEquals(1, pState2.size)
-    val newProducerId = pState2(0).producerId
+    assertEquals(1, pState2.size, "No producer visible via admin api")
+    val newProducerId = pState2.head.producerId
+    val newProducerEpoch = pState2.head.producerEpoch

-    // Producer IDs should be the same.
+    // Because the transaction IDs outlive the producer IDs, creating a producer with the same transactional id
+    // soon after the first will re-use the same producerId, while bumping the epoch to indicate that they are distinct.
     assertEquals(oldProducerId, newProducerId)
+    assertEquals(oldProducerEpoch + 1, newProducerEpoch)

Review Comment:
Thanks for adding one more verification and the comment.
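The behaviour described in the new test comment (same producer ID, epoch bumped by one when the transactional id is reused) can be illustrated with a toy model. Everything here is an assumption made for illustration: `ToyTxnCoordinator`, `ProducerIdentity` and `initProducer` are hypothetical names, and the sketch ignores expiration, fencing and epoch overflow, all of which the real coordinator has to handle.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model only; the real coordinator logic in Kafka is far more involved. */
final class ToyTxnCoordinator {

    static final class ProducerIdentity {
        final long producerId;
        final short epoch;

        ProducerIdentity(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    private final Map<String, ProducerIdentity> byTransactionalId = new HashMap<>();
    private long nextProducerId = 0L;

    /** First call per transactional id allocates a new id; later calls keep it and bump the epoch. */
    ProducerIdentity initProducer(String transactionalId) {
        ProducerIdentity previous = byTransactionalId.get(transactionalId);
        ProducerIdentity current = (previous == null)
                ? new ProducerIdentity(nextProducerId++, (short) 0)
                : new ProducerIdentity(previous.producerId, (short) (previous.epoch + 1));
        byTransactionalId.put(transactionalId, current);
        return current;
    }

    public static void main(String[] args) {
        ToyTxnCoordinator coordinator = new ToyTxnCoordinator();
        ProducerIdentity first = coordinator.initProducer("txn-a");
        ProducerIdentity second = coordinator.initProducer("txn-a");
        System.out.println("same id: " + (first.producerId == second.producerId));
        System.out.println("epoch delta: " + (second.epoch - first.epoch));
    }
}
```

In this model the two assertions from the diff (`oldProducerId == newProducerId` and `oldProducerEpoch + 1 == newProducerEpoch`) hold by construction, which is the invariant the test is checking against the real broker.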
[GitHub] [kafka] showuon commented on a diff in pull request #12979: KAFKA-14544 The "is-future" should be removed from metrics tags after future log becomes current log
showuon commented on code in PR #12979:
URL: https://github.com/apache/kafka/pull/12979#discussion_r1055056767

## core/src/main/scala/kafka/log/LogManager.scala: ##
@@ -1079,6 +1079,9 @@ class LogManager(logDirs: Seq[File],
         if (destLog == null)
           throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
+        // the metrics tags still contain "future", so we have to remove it.
+        // we will add metrics back after sourceLog remove the metrics
+        destLog.removeLogMetrics()
         destLog.renameDir(UnifiedLog.logDirName(topicPartition), true)
         destLog.updateHighWatermark(sourceLog.highWatermark)

Review Comment:
Do you think we should remove the log metrics after destLog is renamed? I'm thinking that if we get stuck for some time during `renameDir`, it'll be helpful to have metrics reporting it. WDYT?
[GitHub] [kafka] showuon merged pull request #13031: MINOR: increase connectionMaxIdleMs to make test reliable
showuon merged PR #13031:
URL: https://github.com/apache/kafka/pull/13031
[GitHub] [kafka] ijuma merged pull request #13012: KAFKA-14477: Move LogValidator and related to storage module
ijuma merged PR #13012:
URL: https://github.com/apache/kafka/pull/13012
[GitHub] [kafka] ijuma commented on pull request #13012: KAFKA-14477: Move LogValidator and related to storage module
ijuma commented on PR #13012:
URL: https://github.com/apache/kafka/pull/13012#issuecomment-1362265880

JDK 17 build passed, other build failures are unrelated.
[GitHub] [kafka] vvcephei merged pull request #13034: MINOR: remove onChange call in stream assignor assign method
vvcephei merged PR #13034:
URL: https://github.com/apache/kafka/pull/13034
[GitHub] [kafka] vvcephei commented on pull request #13034: MINOR: remove onChange call in stream assignor assign method
vvcephei commented on PR #13034:
URL: https://github.com/apache/kafka/pull/13034#issuecomment-1362253175

None of the failing tests were from Streams:
```
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplication() | 2 min 11 sec | 1
Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsExpirationTest.testTransactionAfterProducerIdExpires(String).quorum=kraft | 10 sec | 1
Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft | 4.5 sec | 1
Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.network.Tls12SelectorTest.testCloseOldestConnection() | 0.59 sec | 1
Build / JDK 17 and Scala 2.13 / kafka.admin.ReassignPartitionsIntegrationTest.testCancellation(String).quorum=kraft | 16 sec | 1
Build / JDK 17 and Scala 2.13 / kafka.api.ProducerIdExpirationTest.testProducerIdExpirationWithNoTransactions(String).quorum=zk | 5.4 sec | 1
Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsTest.testSendOffsetsWithGroupId(String).quorum=kraft
```

I'll go ahead and merge.
[GitHub] [kafka] mumrah commented on a diff in pull request #13027: MINOR Send ZK broker epoch in registration
mumrah commented on code in PR #13027:
URL: https://github.com/apache/kafka/pull/13027#discussion_r1054964559

## core/src/main/scala/kafka/server/KafkaServer.scala: ##
@@ -359,6 +354,11 @@ class KafkaServer(
         val brokerInfo = createBrokerInfo
         val brokerEpoch = zkClient.registerBroker(brokerInfo)
+        lifecycleManager = new BrokerLifecycleManager(config,
+          time,
+          threadNamePrefix,
+          zkBrokerEpoch = Some(brokerEpoch))

Review Comment:
Good catch. We do get new broker epochs after the ZK session re-initializes. I'll change this to use the same pattern we use in AlterPartitionManager.
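The concern raised here is that an epoch captured once at construction goes stale when the broker re-registers. One way to avoid that, assuming the "pattern" referred to is passing a supplier instead of a value (the comment does not spell it out), is sketched below. `SnapshotManager` and `SupplierManager` are hypothetical classes for this illustration, not Kafka code.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

final class BrokerEpochSketch {

    /** Captures the epoch once; goes stale if the broker re-registers. */
    static final class SnapshotManager {
        private final long epoch;

        SnapshotManager(long epoch) {
            this.epoch = epoch;
        }

        long epochForNextRequest() {
            return epoch;
        }
    }

    /** Asks for the epoch on every use, so a re-registration is picked up. */
    static final class SupplierManager {
        private final LongSupplier epochSupplier;

        SupplierManager(LongSupplier epochSupplier) {
            this.epochSupplier = epochSupplier;
        }

        long epochForNextRequest() {
            return epochSupplier.getAsLong();
        }
    }

    public static void main(String[] args) {
        AtomicLong registeredEpoch = new AtomicLong(5L);
        SnapshotManager snapshot = new SnapshotManager(registeredEpoch.get());
        SupplierManager live = new SupplierManager(registeredEpoch::get);

        registeredEpoch.set(9L); // simulated session re-initialization with a new epoch

        System.out.println("snapshot sees " + snapshot.epochForNextRequest());
        System.out.println("supplier sees " + live.epochForNextRequest());
    }
}
```

The supplier variant costs one extra indirection per request but removes the need to rebuild the manager whenever the epoch changes.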
[jira] [Created] (KAFKA-14546) Allow Partitioner to return -1 to indicate default partitioning
James Olsen created KAFKA-14546:
---

Summary: Allow Partitioner to return -1 to indicate default partitioning
Key: KAFKA-14546
URL: https://issues.apache.org/jira/browse/KAFKA-14546
Project: Kafka
Issue Type: Improvement
Components: producer
Affects Versions: 3.3.1
Reporter: James Olsen

Prior to KIP-794 it was possible to create a custom Partitioner that could delegate to the DefaultPartitioner. DefaultPartitioner has been deprecated, so we can now only delegate to BuiltInPartitioner.partitionForKey, which does not handle a non-keyed message. Hence there is now no mechanism for a custom Partitioner to fall back to default partitioning, e.g. for the non-keyed sticky case.

I would like to propose that KafkaProducer.partition(...) not throw IllegalArgumentException if the Partitioner returns RecordMetadata.UNKNOWN_PARTITION and instead continue with the default behaviour. Maybe with a configuration flag to enable this behaviour so as not to break existing expectations?

Why was Partitioner delegation with default fallback useful?
# A single Producer can be used to write to multiple Topics where each Topic may have different partitioning requirements. The Producer can only have a single Partitioner, so the Partitioner needs to be able to switch behaviour based on the Topic, including the need to fall back to default behaviour if a given Topic does not have a custom requirement.
# Multiple services may need to produce to the same Topic and these services may be authored by different teams. A single custom Partitioner that encapsulates all Topic-specific partitioning logic can be used by all teams at all times for all Topics, ensuring that mistakes are not made.
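The proposal can be sketched without the Kafka client on the classpath. The model below is a simplification under stated assumptions: `TopicRoutingSketch`, `KeyPartitioner`, `register` and `resolve` are hypothetical names for this illustration, the constant mirrors the `-1` value named in the ticket title, and `resolve` stands in for the producer-side behaviour the ticket asks for rather than anything `KafkaProducer` does today.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntSupplier;

final class TopicRoutingSketch {

    /** Sentinel meaning "no opinion, use default partitioning"; -1 as in the ticket title. */
    static final int UNKNOWN_PARTITION = -1;

    /** Hypothetical per-topic strategy; not the Kafka Partitioner interface. */
    interface KeyPartitioner {
        int partition(String key, int numPartitions);
    }

    private final Map<String, KeyPartitioner> perTopic = new HashMap<>();

    void register(String topic, KeyPartitioner partitioner) {
        perTopic.put(topic, partitioner);
    }

    /** Custom-partitioner side: topics without a registered strategy return the sentinel. */
    int partition(String topic, String key, int numPartitions) {
        KeyPartitioner custom = perTopic.get(topic);
        return custom == null ? UNKNOWN_PARTITION : custom.partition(key, numPartitions);
    }

    /** Producer side under the proposal: the sentinel falls through to default partitioning. */
    static int resolve(int requested, IntSupplier defaultPartitioning) {
        return requested == UNKNOWN_PARTITION ? defaultPartitioning.getAsInt() : requested;
    }

    public static void main(String[] args) {
        TopicRoutingSketch router = new TopicRoutingSketch();
        router.register("orders", (key, n) -> Math.floorMod(key.hashCode(), n));

        int custom = resolve(router.partition("orders", "customer-42", 6), () -> 0);
        int fallback = resolve(router.partition("audit-log", null, 6), () -> 0);
        System.out.println("orders -> " + custom + ", audit-log -> " + fallback);
    }
}
```

This covers the first use case in the ticket: one partitioner object shared across topics, with custom logic only where a topic needs it.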
[GitHub] [kafka] akhileshchg commented on a diff in pull request #13028: KAFKA-14458: Introduce RPC support during ZK migration
akhileshchg commented on code in PR #13028:
URL: https://github.com/apache/kafka/pull/13028#discussion_r1054933363

## metadata/src/main/java/org/apache/kafka/image/ClusterImage.java: ##
@@ -34,9 +34,15 @@ public final class ClusterImage {
     public static final ClusterImage EMPTY = new ClusterImage(Collections.emptyMap());
     private final Map brokers;
+    private final Map zkBrokers;
     public ClusterImage(Map brokers) {
         this.brokers = Collections.unmodifiableMap(brokers);
+        this.zkBrokers = Collections.unmodifiableMap(brokers

Review Comment:
I thought about this. I have no strong preference, since methods based on zkBrokers should not be called often. Will change.

## core/src/main/scala/kafka/controller/ControllerChannelManager.scala: ##
@@ -359,13 +400,15 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
         s"new one. Some UpdateMetadata state changes to brokers $updateMetadataRequestBrokerSet with partition info " +
         s"$updateMetadataRequestPartitionInfoMap might be lost ")
+    metadataInstance = metadataProvider()

Review Comment:
Yes.

## metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java: ##
@@ -206,6 +206,9 @@ public void replay(ApiMessage record) {
                  * updating the highest offset and epoch.
                  */
                 break;
+            case ZK_MIGRATION_STATE_RECORD:
+                // TODO handle this
+                break;

Review Comment:
Spillover. Will remove this.
[GitHub] [kafka] cmccabe merged pull request #13029: MINOR: Add zk migration field to the ApiVersionsResponse
cmccabe merged PR #13029:
URL: https://github.com/apache/kafka/pull/13029
[GitHub] [kafka] jolshan commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
jolshan commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1054893924

## clients/src/main/java/org/apache/kafka/common/errors/FencedMemberEpoch.java: ##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class FencedMemberEpoch extends ApiException {

Review Comment:
I feel like we typically do that for classes that extend ApiException. (Same on other classes changed in this file.)
[GitHub] [kafka] jolshan commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
jolshan commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1054893630

## clients/src/main/java/org/apache/kafka/common/errors/FencedMemberEpoch.java: ##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class FencedMemberEpoch extends ApiException {

Review Comment:
Do we want to add the word Exception to the end of the class name and file?
[GitHub] [kafka] jolshan commented on pull request #12901: KAFKA-14367; Add `TnxOffsetCommit` to the new `GroupCoordinator` interface
jolshan commented on PR #12901:
URL: https://github.com/apache/kafka/pull/12901#issuecomment-1362182388

reminder to rebase here :)
[GitHub] [kafka] jolshan commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
jolshan commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054880470

## clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java: ##
@@ -227,6 +227,24 @@ public boolean requireStable() {
         return data.requireStable();
     }
+    public List groups() {
+        if (version() >= 8) {
+            return data.groups();
+        } else {
+            OffsetFetchRequestData.OffsetFetchRequestGroup group =
+                new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId(data.groupId());
+
+            data.topics().forEach(topic -> {

Review Comment:
I feel like the streams + collection to list is extraneous and I'm not sure it's much more intuitive.
[GitHub] [kafka] jolshan commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
jolshan commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1054880470 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java: ## @@ -227,6 +227,24 @@ public boolean requireStable() { return data.requireStable(); } +public List groups() { +if (version() >= 8) { +return data.groups(); +} else { +OffsetFetchRequestData.OffsetFetchRequestGroup group = +new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId(data.groupId()); + +data.topics().forEach(topic -> { Review Comment: Is this doing the same thing?
[jira] [Commented] (KAFKA-9087) ReplicaAlterLogDirs stuck and restart fails with java.lang.IllegalStateException: Offset mismatch for the future replica
[ https://issues.apache.org/jira/browse/KAFKA-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651012#comment-17651012 ] Jun Rao commented on KAFKA-9087: [~chia7712]: Thanks for the update. About the race condition: I am still wondering how we got into that state. Let's say ReplicaAlterLogDirsThread is about to append the fetched data to a future log when the log's dir is being changed. In this case, we will first remove the partition from the partitionState in ReplicaAlterLogDirsThread, recreate the future log and add the partition to ReplicaAlterLogDirsThread again. If ReplicaAlterLogDirsThread tries to append old fetched data, it should fail the following test, since the fetch offset in the fetch request and the one in currentFetchState will be different. [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L347] So, ReplicaAlterLogDirsThread is supposed to ignore the old fetched data and fetch again using the new fetch offset. I am wondering why that didn't happen. > ReplicaAlterLogDirs stuck and restart fails with > java.lang.IllegalStateException: Offset mismatch for the future replica > > > Key: KAFKA-9087 > URL: https://issues.apache.org/jira/browse/KAFKA-9087 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0 >Reporter: Gregory Koshelev >Priority: Major > > I've started multiple replica movements between log directories and some > partitions were stuck.
After the restart of the broker I've got exception in > server.log: > {noformat} > [2019-06-11 17:58:46,304] ERROR [ReplicaAlterLogDirsThread-1]: Error due to > (kafka.server.ReplicaAlterLogDirsThread) > org.apache.kafka.common.KafkaException: Error processing data for partition > metrics_timers-35 offset 4224887 > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:342) > at scala.Option.foreach(Option.scala:274) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:300) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:299) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:299) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:299) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) > at scala.Option.foreach(Option.scala:274) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Caused by: java.lang.IllegalStateException: Offset mismatch for the future > replica metrics_timers-35: fetched offset = 4224887, log end offset = 0. 
> at > kafka.server.ReplicaAlterLogDirsThread.processPartitionData(ReplicaAlterLogDirsThread.scala:107) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:311) > ... 16 more > [2019-06-11 17:58:46,305] INFO [ReplicaAlterLogDirsThread-1]: Stopped > (kafka.server.ReplicaAlterLogDirsThread) > {noformat} > Also, ReplicaAlterLogDirsThread has been stopped. Further restarts do not fix > the problem. To fix it I've stopped the broker and remove all the stuck > future partitions. > Detailed log below > {noformat} > [2019-06-11 12:09:52,833] INFO [Log partition=metrics_timers-35, > dir=/storage2/kafka/data] Truncating to 4224887 has no effect as the largest > offset in the log is 4224886 (kafka.log.Log) > [2019-06-11 12:21:34,979] INFO [Log partition=metrics_timers-35, > dir=/storage2/kafka/data] Loading producer state till offset 4224887 with > message format version 2 (kafka.log.Log) > [2019-06-11 12:21:34,980] INFO [ProducerStateManager > partition=metrics_timers-35] Loading producer state from snapshot file > '/storage2/kafka/data/metrics_timers-35/04224887.snapshot' > (kafka.log.ProducerStateManager) > [2019-06-11 12:21:34,980] INFO [Log partition=metrics_timers-35, > dir=/storage2/kafka/data] Completed load of log with 1
[jira] [Commented] (KAFKA-14545) MirrorCheckpointTask throws NullPointerException when group hasn't consumed from some partitions
[ https://issues.apache.org/jira/browse/KAFKA-14545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651011#comment-17651011 ] Chris Solidum commented on KAFKA-14545: --- Adding a check for null values in [MirrorCheckpointTask.checkpointsForGroup|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L172] seems like the simplest fix for this issue. > MirrorCheckpointTask throws NullPointerException when group hasn't consumed > from some partitions > > > Key: KAFKA-14545 > URL: https://issues.apache.org/jira/browse/KAFKA-14545 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.3.0 >Reporter: Chris Solidum >Assignee: Chris Solidum >Priority: Major > > MirrorTaskConnector looks like it's throwing a NullPointerException when a > consumer group hasn't consumed from all partitions of a topic. This blocks > the syncing of consumer group offsets to the target cluster. The stack trace > and error message are as follows: > {code:java} > WARN Failure polling consumer state for checkpoints.
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask) > at java.base/java.lang.Thread.run(Thread.java:829)Dec 20 > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at > org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244) > at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189) > at > org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:346) > at > org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:452) > at > org.apache.kafka.connect.mirror.MirrorCheckpointTask.poll(MirrorCheckpointTask.java:142) > at > org.apache.kafka.connect.mirror.MirrorCheckpointTask.sourceRecordsForGroup(MirrorCheckpointTask.java:160) > at > org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpointsForGroup(MirrorCheckpointTask.java:177) > at > java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) > at > java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764) > at > java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) > at > java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > 
org.apache.kafka.connect.mirror.MirrorCheckpointTask.lambda$checkpointsForGroup$2(MirrorCheckpointTask.java:174) > at > org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpoint(MirrorCheckpointTask.java:191) > java.lang.NullPointerException > {code} > This seems to happen if the OffsetFetch call returns an > OffsetFetchResponsePartition with a negative committedOffset. > MirrorMaker should handle this case more gracefully and still sync > consumer offsets for the partitions whose committed offset is non-negative. > {code:java} > TRACE [AdminClient clientId=adminclient-55] > Call(callName=offsetFetch(api=OFFSET_FETCH), deadlineMs=1671657869539, > tries=0, nextAllowedTryMs=0) got response > OffsetFetchResponseData(throttleTimeMs=0, > topics=[OffsetFetchResponseTopic(name='XXX', > partitions=[OffsetFetchResponsePartition(partitionIndex=1, > committedOffset=866, committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=0, committedOffset=865, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=9, committedOffset=868, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=14, committedOffset=870, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=5, committedOffset=803, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=8, committedOffset=881, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=11,
[jira] [Updated] (KAFKA-14545) MirrorCheckpointTask throws NullPointerException when group hasn't consumed from some partitions
[ https://issues.apache.org/jira/browse/KAFKA-14545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Solidum updated KAFKA-14545: -- Description: MirrorTaskConnector looks like it's throwing a NullPointerException when a consumer group hasn't consumed from all partitions of a topic. This blocks the syncing of consumer group offsets to the target cluster. The stack trace and error message are as follows: {code:java} WARN Failure polling consumer state for checkpoints. (org.apache.kafka.connect.mirror.MirrorCheckpointTask) at java.base/java.lang.Thread.run(Thread.java:829)Dec 20 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:346) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:452) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.poll(MirrorCheckpointTask.java:142) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.sourceRecordsForGroup(MirrorCheckpointTask.java:160) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpointsForGroup(MirrorCheckpointTask.java:177) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.lambda$checkpointsForGroup$2(MirrorCheckpointTask.java:174) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpoint(MirrorCheckpointTask.java:191) java.lang.NullPointerException {code} This seems to happen if the OffsetFetch call returns an OffsetFetchResponsePartition with a negative committedOffset. MirrorMaker should handle this case more gracefully and still sync consumer offsets for the partitions whose committed offset is non-negative. {code:java} TRACE [AdminClient clientId=adminclient-55] Call(callName=offsetFetch(api=OFFSET_FETCH), deadlineMs=1671657869539, tries=0, nextAllowedTryMs=0) got response OffsetFetchResponseData(throttleTimeMs=0, topics=[OffsetFetchResponseTopic(name='XXX', partitions=[OffsetFetchResponsePartition(partitionIndex=1, committedOffset=866, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=0, committedOffset=865, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=9, committedOffset=868, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=14, committedOffset=870, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=5, committedOffset=803, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=8, committedOffset=881, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=11, committedOffset=-1, 
committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=4, committedOffset=872, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=7, committedOffset=863, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=10, committedOffset=835, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=13, committedOffset=860, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=12, committedOffset=885, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=3, committedOffset=771, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=6, committedOffset=859, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=2,
[jira] [Created] (KAFKA-14545) MirrorCheckpointTask throws NullPointerException when group hasn't consumed from some partitions
Chris Solidum created KAFKA-14545: - Summary: MirrorCheckpointTask throws NullPointerException when group hasn't consumed from some partitions Key: KAFKA-14545 URL: https://issues.apache.org/jira/browse/KAFKA-14545 Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 3.3.0 Reporter: Chris Solidum Assignee: Chris Solidum MirrorTaskConnector looks like it's throwing a NullPointerException when a consumer group hasn't consumed from all partitions of a topic. This blocks the syncing of consumer group offsets to the target cluster. The stack trace and error message are as follows: {code:java} WARN Failure polling consumer state for checkpoints. (org.apache.kafka.connect.mirror.MirrorCheckpointTask) at java.base/java.lang.Thread.run(Thread.java:829)Dec 20 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:346) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:452) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.poll(MirrorCheckpointTask.java:142) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.sourceRecordsForGroup(MirrorCheckpointTask.java:160) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpointsForGroup(MirrorCheckpointTask.java:177) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.lambda$checkpointsForGroup$2(MirrorCheckpointTask.java:174) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpoint(MirrorCheckpointTask.java:191) java.lang.NullPointerException {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
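For reference, the failure mode reported in KAFKA-14545 can be sketched in isolation. The snippet below is a minimal, self-contained illustration and not the actual `MirrorCheckpointTask` code: `NullOffsetFilter`, `checkpointsFor`, and the `Map<String, Long>` shape are hypothetical stand-ins, and it assumes that partitions without a committed offset show up as `null` values in the offsets map.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class NullOffsetFilter {
    // Hypothetical stand-in for the per-partition checkpoint computation.
    // Entries with a null value (assumed to mean "no committed offset") are
    // skipped before mapping, so nothing dereferences a null.
    public static List<String> checkpointsFor(Map<String, Long> committedOffsets) {
        return committedOffsets.entrySet().stream()
            .filter(e -> e.getValue() != null)
            .map(e -> e.getKey() + "@" + e.getValue())
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("XXX-0", 865L);
        offsets.put("XXX-1", 866L);
        offsets.put("XXX-11", null); // partition the group never consumed from
        System.out.println(checkpointsFor(offsets));
    }
}
```

Without the `filter` step, the `map` lambda would call a method on the `null` value for `XXX-11` and throw a NullPointerException inside the stream pipeline, while the remaining partitions would never be processed.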
[GitHub] [kafka] gharris1727 commented on pull request #13036: KAFKA-14534: Reduce flakiness in TransactionsExpirationTest
gharris1727 commented on PR #13036: URL: https://github.com/apache/kafka/pull/13036#issuecomment-1362127508 @jolshan Could you take a look at this test stabilization?
[GitHub] [kafka] gharris1727 opened a new pull request, #13036: KAFKA-14534: Reduce flakiness in TransactionsExpirationTest
gharris1727 opened a new pull request, #13036: URL: https://github.com/apache/kafka/pull/13036 This test asserts that after a producerId expires and before a transactionId expires, the producerId is reused in a subsequent epoch. The transactionId expiration time used in this test was too short, and a race condition between the two expirations was occasionally causing a new producerId to be returned without an epoch bump. Signed-off-by: Greg Harris ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] philipnee commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
philipnee commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1054847935 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java: ## @@ -227,6 +227,24 @@ public boolean requireStable() { return data.requireStable(); } +public List groups() { +if (version() >= 8) { +return data.groups(); +} else { Review Comment: The else block is not needed. ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1338,27 +1337,22 @@ class KafkaApis(val requestChannel: RequestChannel, /** * Handle an offset fetch request */ - def handleOffsetFetchRequest(request: RequestChannel.Request): Unit = { + def handleOffsetFetchRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val version = request.header.apiVersion if (version == 0) { - // reading offsets from ZK - handleOffsetFetchRequestV0(request) -} else if (version >= 1 && version <= 7) { - // reading offsets from Kafka - handleOffsetFetchRequestBetweenV1AndV7(request) + handleOffsetFetchRequestFromZookeeper(request) Review Comment: I think it would be more idiomatic and cleaner if we rewrote this using match/case: ``` version match { case 0 => handle... case ... } ``` ## clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java: ## @@ -227,6 +227,24 @@ public boolean requireStable() { return data.requireStable(); } +public List groups() { +if (version() >= 8) { +return data.groups(); +} else { +OffsetFetchRequestData.OffsetFetchRequestGroup group = +new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId(data.groupId()); + +data.topics().forEach(topic -> { Review Comment: Do you think this can be a bit more intuitive? 
``` group.topics().addAll( data.topics().stream().map(new Offset...).collect(Collectors.toList()) ) ``` ## clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java: ## @@ -208,6 +203,56 @@ public OffsetFetchResponse(int throttleTimeMs, this.error = null; } +public OffsetFetchResponse(List groups, short version) { +super(ApiKeys.OFFSET_FETCH); +data = new OffsetFetchResponseData(); + +if (version >= 8) { +data.setGroups(groups); +error = null; + +for (OffsetFetchResponseGroup group : data.groups()) { +this.groupLevelErrors.put(group.groupId(), Errors.forCode(group.errorCode())); +} Review Comment: We could just return here to avoid the else, which also saves one level of indentation.
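The two suggestions in this review (build the topic list with a stream, and return early instead of nesting an else) can be sketched outside Kafka roughly as follows. This is only an illustration under assumptions: `GroupsSketch`, `LegacyTopic`, and `Group` are hypothetical stand-ins for the generated `OffsetFetchRequestData` classes, not the real API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class GroupsSketch {
    // Hypothetical stand-in for a topic entry in the pre-v8 (single-group) layout.
    public static final class LegacyTopic {
        public final String name;
        public final List<Integer> partitions;
        public LegacyTopic(String name, List<Integer> partitions) {
            this.name = name;
            this.partitions = partitions;
        }
    }

    // Hypothetical stand-in for a group entry in the v8+ (batched) layout.
    public static final class Group {
        public final String groupId;
        public final List<String> topicNames;
        public Group(String groupId, List<String> topicNames) {
            this.groupId = groupId;
            this.topicNames = topicNames;
        }
    }

    public static List<Group> groups(int version, List<Group> batched,
                                     String legacyGroupId, List<LegacyTopic> legacyTopics) {
        if (version >= 8) {
            return batched; // early return, so no else branch is needed
        }
        // Older versions carry a single group: convert its topics with a stream.
        List<String> names = legacyTopics.stream()
            .map(t -> t.name)
            .collect(Collectors.toList());
        return Collections.singletonList(new Group(legacyGroupId, names));
    }

    public static void main(String[] args) {
        List<LegacyTopic> legacy = Arrays.asList(
            new LegacyTopic("foo", Arrays.asList(0, 1)),
            new LegacyTopic("bar", Arrays.asList(0)));
        List<Group> result = groups(7, Collections.emptyList(), "my-group", legacy);
        System.out.println(result.get(0).groupId + " " + result.get(0).topicNames);
    }
}
```

Whether the stream form reads better than a plain `forEach` is a matter of taste, which is the point debated in the thread; the early return is the less contentious of the two changes.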
[GitHub] [kafka] philipnee commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer
philipnee commented on code in PR #12590: URL: https://github.com/apache/kafka/pull/12590#discussion_r1054670620 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java: ## @@ -1933,11 +1943,79 @@ private Map topicPartitionTags(TopicPartition tp) { } } +// Visible for testing +void maybeCloseFetchSessions(final Timer timer) { +final Cluster cluster = metadata.fetch(); +final List> requestFutures = new ArrayList<>(); +for (final Map.Entry entry : sessionHandlers.entrySet()) { +final FetchSessionHandler sessionHandler = entry.getValue(); +// set the session handler to notify close. This will set the next metadata request to send close message. +sessionHandler.notifyClose(); + +final int sessionId = sessionHandler.sessionId(); +final Integer fetchTargetNodeId = entry.getKey(); +// FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will +// skip sending the close request. +final Node fetchTarget = cluster.nodeById(fetchTargetNodeId); +if (fetchTarget == null || client.isUnavailable(fetchTarget)) { +log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget); +continue; +} + +final RequestFuture responseFuture = sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget); +responseFuture.addListener(new RequestFutureListener() { +@Override +public void onSuccess(ClientResponse value) { +log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget); Review Comment: do we really need to log all of the successful close? I think logging the failure ones could be sufficient. 
## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ## @@ -2403,17 +2404,40 @@ private ClusterResourceListeners configureClusterResourceListeners(Deserializer< return clusterResourceListeners; } -private void close(long timeoutMs, boolean swallowException) { +private void close(Duration timeout, boolean swallowException) { log.trace("Closing the Kafka consumer"); AtomicReference firstException = new AtomicReference<>(); -try { -if (coordinator != null) -coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs))); -} catch (Throwable t) { -firstException.compareAndSet(null, t); -log.error("Failed to close coordinator", t); + +final Timer closeTimer = (time == null) ? new SystemTime().timer(Math.min(timeout.toMillis(), requestTimeoutMs)) : time.timer(Math.min(timeout.toMillis(), requestTimeoutMs)); +// Close objects with a timeout. The timeout is required because fetcher makes request to the server in the +// process of closing which may not respect the overall timeout defined for closing the consumer. +if (coordinator != null) { +try { +coordinator.close(closeTimer); Review Comment: note: I think it also tries blocking commits the offset during the close. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java: ## @@ -446,6 +429,33 @@ private RequestFuture sendMetadataRequest(MetadataRequest.Builde return client.send(node, request); } +/** + * Send Fetch Request to Kafka cluster asynchronously. + * + * This method is visible for testing. + * + * @return A future that indicates result of sent Fetch request + */ +RequestFuture sendFetchRequestToNode(final FetchSessionHandler.FetchRequestData requestData, Review Comment: can this be private? 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java: ## @@ -1933,11 +1943,79 @@ private Map topicPartitionTags(TopicPartition tp) { } } +// Visible for testing +void maybeCloseFetchSessions(final Timer timer) { +final Cluster cluster = metadata.fetch(); +final List> requestFutures = new ArrayList<>(); +for (final Map.Entry entry : sessionHandlers.entrySet()) { +final FetchSessionHandler sessionHandler = entry.getValue(); +// set the session handler to notify close. This will set the next metadata request to send close message. +sessionHandler.notifyClose(); + +final int sessionId = sessionHandler.sessionId(); +final Integer fetchTargetNodeId = entry.getKey(); +// FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will +// skip sending the close request. +final Node
[GitHub] [kafka] junrao commented on a diff in pull request #13012: KAFKA-14477: Move LogValidator and related to storage module
junrao commented on code in PR #13012: URL: https://github.com/apache/kafka/pull/13012#discussion_r1054846959 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -2185,6 +2188,27 @@ object UnifiedLog extends Logging { private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = { LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset) } + + // Visible for benchmarking + def validatorMetricsRecorder(allTopicsStats: BrokerTopicMetrics): LogValidator.MetricsRecorder = { +new LogValidator.MetricsRecorder { + def recordInvalidMagic(): Unit = +allTopicsStats.invalidMagicNumberRecordsPerSec.mark() + + def recordInvalidOffset(): Unit = Review Comment: Sounds good. We can keep this as it is then.
[jira] [Updated] (KAFKA-14271) Topic recreation fails in KRaft mode when topic contains collidable characters
[ https://issues.apache.org/jira/browse/KAFKA-14271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14271: -- Fix Version/s: (was: 3.4.0) (was: 3.3.2) > Topic recreation fails in KRaft mode when topic contains collidable characters > -- > > Key: KAFKA-14271 > URL: https://issues.apache.org/jira/browse/KAFKA-14271 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.3.0 >Reporter: Jeffrey Tolar >Priority: Major > Attachments: topic.with.dots.log, topicwithoutdots.log > > > We recently updated one of our clusters from 3.2.0 to 3.3.0 (primarily to get > the fix for KAFKA-13909). This cluster is running KRaft mode. > This is a cluster used for some integration tests - each test deletes the > topics it uses before the test to ensure a clean slate for the test; the > brokers get restarted in-between tests, but the broker data isn't deleted. > With 3.3.0, this semi-crashes Kafka. The brokers stay running, but the topic > creation fails: > {noformat} > [2022-09-30 17:17:59,216] WARN [Controller 1] createTopics: failed with > unknown server exception NoSuchElementException at epoch 1 in 601 us. > Renouncing leadership and reverting to the last committed offset 18. 
> (org.apache.kafka.controller.QuorumController) > java.util.NoSuchElementException > at > org.apache.kafka.timeline.SnapshottableHashTable$CurrentIterator.next(SnapshottableHashTable.java:167) > at > org.apache.kafka.timeline.SnapshottableHashTable$CurrentIterator.next(SnapshottableHashTable.java:139) > at > org.apache.kafka.timeline.TimelineHashSet$ValueIterator.next(TimelineHashSet.java:120) > at > org.apache.kafka.controller.ReplicationControlManager.validateNewTopicNames(ReplicationControlManager.java:799) > at > org.apache.kafka.controller.ReplicationControlManager.createTopics(ReplicationControlManager.java:567) > at > org.apache.kafka.controller.QuorumController.lambda$createTopics$7(QuorumController.java:1832) > at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:767) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173) > at java.base/java.lang.Thread.run(Thread.java:829) > {noformat} > This appears to be because our topic names contain {{.}}'s. 
Here's a quick > reproducer script: > {noformat} > #!/bin/bash > VERSION=3.3.0 > TOPIC=$1 > set -x > rm -rf -- kafka_2.13-${VERSION} kafka_2.13-${VERSION}.tgz > /tmp/kraft-combined-logs > trap 'kill -- "-$$" && wait' EXIT > curl -O https://dlcdn.apache.org/kafka/$VERSION/kafka_2.13-${VERSION}.tgz > tar -xzf kafka_2.13-${VERSION}.tgz > cd kafka_2.13-${VERSION} > id=$(./bin/kafka-storage.sh random-uuid) > ./bin/kafka-storage.sh format -t $id -c ./config/kraft/server.properties > ./bin/kafka-server-start.sh config/kraft/server.properties > broker.log 2>&1 & > sleep 1 > ./bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server > localhost:9092 > ./bin/kafka-topics.sh --delete --topic "$TOPIC" --bootstrap-server > localhost:9092 > ./bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server > localhost:9092 > sleep 1 > {noformat} > Running {{./test-kafka.sh topic.with.dots}} exhibits the failure; using > {{topicwithoutdots}} works as expected. > I'll attach the broker logs from each run. -- This message was sent by Atlassian Jira (v8.20.10#820010)
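For readers unfamiliar with the term, "collidable characters" refers to `.` and `_` in topic names: Kafka treats two names as colliding when they are equal after every `.` is replaced with `_`, because metric names make that substitution. The sketch below illustrates that rule in plain Java. `TopicCollision` and its methods are hypothetical names written for this note rather than Kafka's own implementation, and the sketch does not reproduce the controller bug itself.

```java
public class TopicCollision {
    // True if the name contains a character that can take part in a collision.
    public static boolean hasCollisionChars(String topic) {
        return topic.contains("_") || topic.contains(".");
    }

    // Normalize a name the way metric names do: '.' becomes '_'.
    public static String normalize(String topic) {
        return topic.replace('.', '_');
    }

    // Two names collide when their normalized forms are identical.
    public static boolean collides(String a, String b) {
        return normalize(a).equals(normalize(b));
    }

    public static void main(String[] args) {
        System.out.println(hasCollisionChars("topic.with.dots"));
        System.out.println(hasCollisionChars("topicwithoutdots"));
        System.out.println(collides("topic.with.dots", "topic_with_dots"));
    }
}
```

This matches the reproducer's observation that only the dotted name triggers the failure: a name without `.` or `_` never enters the collision bookkeeping that the stack trace points at.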
[GitHub] [kafka] jolshan commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
jolshan commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1054835003 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -3405,6 +3405,341 @@ class KafkaApisTest { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error) } + @ParameterizedTest Review Comment: Ah I missed this. Thanks for sharing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface
jolshan commented on code in PR #12886: URL: https://github.com/apache/kafka/pull/12886#discussion_r1054834094 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java: ## @@ -52,6 +54,7 @@ CompletableFuture joinGroup( ); /** +<<< HEAD Review Comment: nit: remove
[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface
jolshan commented on code in PR #12886: URL: https://github.com/apache/kafka/pull/12886#discussion_r1054833397 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel, /** * Handle an offset commit request */ - def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { -val header = request.header + def handleOffsetCommitRequest( +request: RequestChannel.Request, +requestLocal: RequestLocal + ): CompletableFuture[Unit] = { val offsetCommitRequest = request.body[OffsetCommitRequest] -val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() -val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() -// the callback for sending an offset commit response -def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = { - val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors - if (isDebugEnabled) -combinedCommitStatus.forKeyValue { (topicPartition, error) => - if (error != Errors.NONE) { -debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + - s"on partition $topicPartition failed due to ${error.exceptionName}") - } -} - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => -new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava)) -} - - // reject the request if not authorized to the group +// Reject the request if not authorized to the group if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) { - val error = Errors.GROUP_AUTHORIZATION_FAILED - val responseTopicList = OffsetCommitRequest.getErrorResponseTopics( -offsetCommitRequest.data.topics, -error) - - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse( -new OffsetCommitResponseData() -.setTopics(responseTopicList) -.setThrottleTimeMs(requestThrottleMs) - )) + 
requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + CompletableFuture.completedFuture[Unit](()) } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. - val errorMap = new mutable.HashMap[TopicPartition, Errors] - for (topicData <- offsetCommitRequest.data.topics.asScala) { -for (partitionData <- topicData.partitions.asScala) { - val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex) - errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION -} - } - sendResponseCallback(errorMap.toMap) + requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) } else { - val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition] + val authorizedTopics = authHelper.filterByAuthorized( +request.context, +READ, +TOPIC, +offsetCommitRequest.data.topics.asScala + )(_.name) + + val responseBuilder = new OffsetCommitResponse.Builder() + val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]() + offsetCommitRequest.data.topics.forEach { topic => +if (!authorizedTopics.contains(topic.name)) { + // If the topic is not authorized, we add the topic and all its partitions + // to the response with TOPIC_AUTHORIZATION_FAILED. 
+ responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( +topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED) +} else if (!metadataCache.contains(topic.name)) { + // If the topic is unknown, we add the topic and all its partitions + // to the response with UNKNOWN_TOPIC_OR_PARTITION. + responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( +topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) +} else { + // Otherwise, we check all partitions to ensure that they all
[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface
jolshan commented on code in PR #12886: URL: https://github.com/apache/kafka/pull/12886#discussion_r1054830892 ## core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala: ## @@ -234,4 +240,78 @@ class GroupCoordinatorAdapter( } CompletableFuture.completedFuture(results) } + + override def commitOffsets( +context: RequestContext, +request: OffsetCommitRequestData, +bufferSupplier: BufferSupplier + ): CompletableFuture[OffsetCommitResponseData] = { +val future = new CompletableFuture[OffsetCommitResponseData]() + +def callback(commitStatus: Map[TopicPartition, Errors]): Unit = { + val response = new OffsetCommitResponseData() + val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]() + + commitStatus.forKeyValue { (tp, error) => +var topic = byTopics(tp.topic) +if (topic == null) { + topic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic) + byTopics += tp.topic -> topic + response.topics.add(topic) +} +topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(tp.partition) + .setErrorCode(error.code)) + } + + future.complete(response) +} + +// "default" expiration timestamp is now + retention (and retention may be overridden if v2) +// expire timestamp is computed differently for v1 and v2. +// - If v1 and no explicit commit timestamp is provided we treat it the same as v5. +// - If v1 and explicit retention time is provided we calculate expiration timestamp based on that Review Comment: I realize this comment was copy-pasted, but we can clean it up I think :)
[GitHub] [kafka] ijuma commented on pull request #13012: KAFKA-14477: Move LogValidator and related to storage module
ijuma commented on PR #13012: URL: https://github.com/apache/kafka/pull/13012#issuecomment-1362078506 @junrao Pushed a change that addresses two of the review comments. I left a comment for the other review comment.
[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface
dajac commented on code in PR #12886: URL: https://github.com/apache/kafka/pull/12886#discussion_r1054822770 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel, /** * Handle an offset commit request */ - def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { -val header = request.header + def handleOffsetCommitRequest( +request: RequestChannel.Request, +requestLocal: RequestLocal + ): CompletableFuture[Unit] = { val offsetCommitRequest = request.body[OffsetCommitRequest] -val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() -val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() -// the callback for sending an offset commit response -def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = { - val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors - if (isDebugEnabled) -combinedCommitStatus.forKeyValue { (topicPartition, error) => - if (error != Errors.NONE) { -debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + - s"on partition $topicPartition failed due to ${error.exceptionName}") - } -} - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => -new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava)) -} - - // reject the request if not authorized to the group +// Reject the request if not authorized to the group if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) { - val error = Errors.GROUP_AUTHORIZATION_FAILED - val responseTopicList = OffsetCommitRequest.getErrorResponseTopics( -offsetCommitRequest.data.topics, -error) - - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse( -new OffsetCommitResponseData() -.setTopics(responseTopicList) -.setThrottleTimeMs(requestThrottleMs) - )) + 
requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + CompletableFuture.completedFuture[Unit](()) } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. - val errorMap = new mutable.HashMap[TopicPartition, Errors] - for (topicData <- offsetCommitRequest.data.topics.asScala) { -for (partitionData <- topicData.partitions.asScala) { - val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex) - errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION -} - } - sendResponseCallback(errorMap.toMap) + requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) } else { - val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition] + val authorizedTopics = authHelper.filterByAuthorized( +request.context, +READ, +TOPIC, +offsetCommitRequest.data.topics.asScala + )(_.name) + + val responseBuilder = new OffsetCommitResponse.Builder() + val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]() + offsetCommitRequest.data.topics.forEach { topic => +if (!authorizedTopics.contains(topic.name)) { + // If the topic is not authorized, we add the topic and all its partitions + // to the response with TOPIC_AUTHORIZATION_FAILED. 
+ responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( +topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED) +} else if (!metadataCache.contains(topic.name)) { + // If the topic is unknown, we add the topic and all its partitions + // to the response with UNKNOWN_TOPIC_OR_PARTITION. + responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( +topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) +} else { + // Otherwise, we check all partitions to ensure that they all
[GitHub] [kafka] ijuma commented on a diff in pull request #13012: KAFKA-14477: Move LogValidator and related to storage module
ijuma commented on code in PR #13012: URL: https://github.com/apache/kafka/pull/13012#discussion_r1054822697 ## clients/src/test/java/org/apache/kafka/common/utils/PrimitiveRefTest.java: ## @@ -14,15 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.kafka.common.utils; -package kafka.common +import org.junit.jupiter.api.Test; -import org.apache.kafka.common.errors.ApiException -import org.apache.kafka.common.requests.ProduceResponse.RecordError +import static org.junit.jupiter.api.Assertions.assertEquals; -import scala.collection.Seq - -class RecordValidationException(val invalidException: ApiException, -val recordErrors: Seq[RecordError]) - extends RuntimeException(invalidException) { +public class PrimitiveRefTest { +@Test +public void testLongRef() { Review Comment: `IntRef` was there before, but yes, I can take the chance to improve coverage. ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -846,21 +842,26 @@ class UnifiedLog(@volatile var logStartOffset: Long, config.messageTimestampDifferenceMaxMs, leaderEpoch, origin, -interBrokerProtocolVersion, -brokerTopicStats, +interBrokerProtocolVersion + ) + validator.validateMessagesAndAssignOffsets(offset, +validatorMetricsRecorder(brokerTopicStats.allTopicsStats), Review Comment: Good idea.
[GitHub] [kafka] ijuma commented on a diff in pull request #13012: KAFKA-14477: Move LogValidator and related to storage module
ijuma commented on code in PR #13012: URL: https://github.com/apache/kafka/pull/13012#discussion_r1054822187 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -2185,6 +2188,27 @@ object UnifiedLog extends Logging { private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = { LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset) } + + // Visible for benchmarking + def validatorMetricsRecorder(allTopicsStats: BrokerTopicMetrics): LogValidator.MetricsRecorder = { +new LogValidator.MetricsRecorder { + def recordInvalidMagic(): Unit = +allTopicsStats.invalidMagicNumberRecordsPerSec.mark() + + def recordInvalidOffset(): Unit = Review Comment: It seemed a bit arbitrary that we combined these into the same metric. I thought it may be cleaner to define the interface in a general way and then have the implementation match the current behavior. I don't feel too strongly though, so if you still prefer to combine these into the same method, I can do it.
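The design choice discussed in this comment (a general recorder interface whose implementation keeps today's combined metric) can be sketched in Java as follows. This is an illustration under assumptions, not the PR's code: `recordInvalidSequence`, the `Counters` class, and the use of plain counters in place of broker meters are all hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

final class RecorderSketch {
    // General interface: one method per kind of validation failure.
    interface MetricsRecorder {
        void recordInvalidMagic();
        void recordInvalidOffset();
        void recordInvalidSequence(); // assumed third method, not shown in the quoted diff
    }

    // Hypothetical stand-in for broker metrics; plain counters instead of meters.
    static final class Counters {
        final AtomicLong invalidMagic = new AtomicLong();
        final AtomicLong invalidOffsetOrSequence = new AtomicLong();
    }

    // The implementation decides that two distinct events feed one combined counter.
    static MetricsRecorder recorderFor(Counters c) {
        return new MetricsRecorder() {
            @Override public void recordInvalidMagic() {
                c.invalidMagic.incrementAndGet();
            }
            @Override public void recordInvalidOffset() {
                c.invalidOffsetOrSequence.incrementAndGet();
            }
            @Override public void recordInvalidSequence() {
                c.invalidOffsetOrSequence.incrementAndGet();
            }
        };
    }

    // Records one event of each kind and returns {invalidMagic, invalidOffsetOrSequence}.
    static long[] demo() {
        Counters c = new Counters();
        MetricsRecorder r = recorderFor(c);
        r.recordInvalidMagic();
        r.recordInvalidOffset();
        r.recordInvalidSequence();
        return new long[] { c.invalidMagic.get(), c.invalidOffsetOrSequence.get() };
    }
}
```

Keeping the methods separate means the combined metric could later be split without touching the callers; only `recorderFor` would change.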
[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface
jolshan commented on code in PR #12886: URL: https://github.com/apache/kafka/pull/12886#discussion_r1054819922 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel, /** * Handle an offset commit request */ - def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { -val header = request.header + def handleOffsetCommitRequest( +request: RequestChannel.Request, +requestLocal: RequestLocal + ): CompletableFuture[Unit] = { val offsetCommitRequest = request.body[OffsetCommitRequest] -val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() -val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() -// the callback for sending an offset commit response -def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = { - val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors - if (isDebugEnabled) -combinedCommitStatus.forKeyValue { (topicPartition, error) => - if (error != Errors.NONE) { -debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + - s"on partition $topicPartition failed due to ${error.exceptionName}") - } -} - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => -new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava)) -} - - // reject the request if not authorized to the group +// Reject the request if not authorized to the group if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) { - val error = Errors.GROUP_AUTHORIZATION_FAILED - val responseTopicList = OffsetCommitRequest.getErrorResponseTopics( -offsetCommitRequest.data.topics, -error) - - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse( -new OffsetCommitResponseData() -.setTopics(responseTopicList) -.setThrottleTimeMs(requestThrottleMs) - )) + 
requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + CompletableFuture.completedFuture[Unit](()) } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. - val errorMap = new mutable.HashMap[TopicPartition, Errors] - for (topicData <- offsetCommitRequest.data.topics.asScala) { -for (partitionData <- topicData.partitions.asScala) { - val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex) - errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION -} - } - sendResponseCallback(errorMap.toMap) + requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) } else { - val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition] + val authorizedTopics = authHelper.filterByAuthorized( +request.context, +READ, +TOPIC, +offsetCommitRequest.data.topics.asScala + )(_.name) + + val responseBuilder = new OffsetCommitResponse.Builder() + val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]() + offsetCommitRequest.data.topics.forEach { topic => +if (!authorizedTopics.contains(topic.name)) { + // If the topic is not authorized, we add the topic and all its partitions + // to the response with TOPIC_AUTHORIZATION_FAILED. 
+ responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( +topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED) +} else if (!metadataCache.contains(topic.name)) { + // If the topic is unknown, we add the topic and all its partitions + // to the response with UNKNOWN_TOPIC_OR_PARTITION. + responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( +topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) +} else { + // Otherwise, we check all partitions to ensure that they all
[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface
jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054814682

## core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala: ##
@@ -234,4 +240,78 @@ class GroupCoordinatorAdapter(
 }
 CompletableFuture.completedFuture(results)
 }
+
+ override def commitOffsets(
+context: RequestContext,
+request: OffsetCommitRequestData,
+bufferSupplier: BufferSupplier
+ ): CompletableFuture[OffsetCommitResponseData] = {
+val future = new CompletableFuture[OffsetCommitResponseData]()
+
+def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+ val response = new OffsetCommitResponseData()
+ val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+ commitStatus.forKeyValue { (tp, error) =>
+var topic = byTopics(tp.topic)
+if (topic == null) {
+ topic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+ byTopics += tp.topic -> topic
+ response.topics.add(topic)
+}
+topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(tp.partition)
+ .setErrorCode(error.code))
+ }
+
+ future.complete(response)
+}
+
+// "default" expiration timestamp is now + retention (and retention may be overridden if v2)
+// expire timestamp is computed differently for v1 and v2.
+// - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
+// - If v1 and explicit retention time is provided we calculate expiration timestamp based on that

Review Comment:
This comment is a little confusing. I think I understand the v1 semantics -- we use the commit timestamp, if provided, for "now". For v2 and beyond, I'm a bit confused about the last two bullets. They make it seem like there is no difference between v2-v4 and v5+, but I think the difference is that the retention can no longer be overridden in v5+. That part is unclear in the last bullet, as it says "partition expiration" but "RetentionTimeMs" is the field name.

This is my understanding based on the code:
```
version: (can define commit time aka "now"), (can define retention time)
1        yes                                 no
2        no                                  yes
3        no                                  yes
4        no                                  yes
5+       no                                  no
```
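The version table in this comment can be written out as a small Java sketch. It encodes only the reviewer's stated understanding ("default" expiration is now + retention, v1 may override "now", v2-v4 may override retention, v5+ may override neither); the broker may represent the "no override" case differently. `OffsetExpiry`, `UNSET`, and the parameter names are hypothetical.

```java
final class OffsetExpiry {
    // Hypothetical sentinel meaning "the client did not provide this field".
    static final long UNSET = -1L;

    // Returns the expiration timestamp under the rules in the table above.
    static long expireTimestamp(int version,
                                long nowMs,
                                long defaultRetentionMs,
                                long commitTimestampMs,
                                long retentionTimeMs) {
        long base = nowMs;
        long retention = defaultRetentionMs;
        if (version == 1 && commitTimestampMs != UNSET) {
            // v1: an explicit commit timestamp replaces "now".
            base = commitTimestampMs;
        } else if (version >= 2 && version <= 4 && retentionTimeMs != UNSET) {
            // v2-v4: an explicit retention time replaces the default retention.
            retention = retentionTimeMs;
        }
        // v5+ (and any version without overrides): now + default retention.
        return base + retention;
    }
}
```

For example, with `nowMs = 1000` and `defaultRetentionMs = 500`, a v1 request carrying commit timestamp 200 expires at 700, a v3 request carrying retention 50 expires at 1050, and a v5 request expires at 1500 regardless of what it carries.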
[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
dajac commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1054793572 ## core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala: ## @@ -150,42 +149,6 @@ class OffsetFetchRequestTest extends BaseRequestTest { } } - @Test - def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = { Review Comment: I need to look into other APIs to see how we usually handle this. At first glance, it seems that we are not consistent.
[GitHub] [kafka] chia7712 opened a new pull request, #13035: KAFKA-9087 The changed future log causes that ReplicaAlterLogDirsThre…
chia7712 opened a new pull request, #13035:
URL: https://github.com/apache/kafka/pull/13035

We also encountered this error. The root cause is a race condition:

1. ReplicaAlterLogDirsThread has fetched the data for the topic partition.
1. ReplicaManager#alterReplicaLogDirs changes the future log (the start offset is reset to 0).
1. ReplicaManager#alterReplicaLogDirs calls AbstractFetcherManager#addFetcherForPartitions to add the topic partition (it only changes the partition state in the ReplicaAlterLogDirsThread).
1. ReplicaAlterLogDirsThread starts to process the fetched data and throws IllegalStateException because the future log has been renewed and its start offset is zero.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
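The four steps in this description can be replayed deterministically in a small Java model. It is only a sketch of the interleaving: `FutureLog`, `Fetched`, and `append` are hypothetical stand-ins, and the offset check is an assumption about why the exception is raised (the fetched data no longer lines up with the recreated log).

```java
final class FutureLogRace {
    // Hypothetical stand-in for a future log; it only tracks the end offset.
    static final class FutureLog {
        long logEndOffset;
        FutureLog(long logEndOffset) {
            this.logEndOffset = logEndOffset;
        }
    }

    // Hypothetical fetched batch, tagged with the offset it was fetched at.
    static final class Fetched {
        final long fetchOffset;
        final int recordCount;
        Fetched(long fetchOffset, int recordCount) {
            this.fetchOffset = fetchOffset;
            this.recordCount = recordCount;
        }
    }

    // Appends only if the batch starts exactly at the current end of the log.
    static void append(FutureLog log, Fetched data) {
        if (data.fetchOffset != log.logEndOffset) {
            throw new IllegalStateException(
                "Offset mismatch: fetched at " + data.fetchOffset
                    + " but log end offset is " + log.logEndOffset);
        }
        log.logEndOffset += data.recordCount;
    }

    // Replays the four steps in order; returns true if the stale data is rejected.
    static boolean replayRace() {
        FutureLog log = new FutureLog(100L);
        Fetched inFlight = new Fetched(100L, 10); // step 1: data fetched at offset 100
        log = new FutureLog(0L);                  // step 2: future log recreated, offset reset to 0
        // step 3: partition state is re-added, but the in-flight data is not discarded
        try {
            append(log, inFlight);                // step 4: stale data meets the new log
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}
```

In this model the failure goes away if step 3 also discards or invalidates the in-flight batch. That is one possible guard and is not necessarily what the PR implements.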
[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054782481

## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ##
@@ -3405,6 +3405,341 @@ class KafkaApisTest {
 assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
 }
+ @ParameterizedTest

Review Comment:
```
val groups = Map(
  "group-1" -> List(
    new TopicPartition("foo", 0),
    new TopicPartition("foo", 1)
  ).asJava,
  "group-2" -> null,
  "group-3" -> null,
).asJava
```
With `null`, it fetches all offsets.
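The "null means all offsets" convention from this comment can be illustrated with a self-contained Java sketch. It does not use Kafka classes: partitions are plain strings such as `foo-0`, and `OffsetFetchSketch`, `fetch`, and `example` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class OffsetFetchSketch {
    // requested: group -> partitions to fetch, or null to fetch every committed offset.
    // committed: group -> (partition -> committed offset).
    static Map<String, Map<String, Long>> fetch(
            Map<String, List<String>> requested,
            Map<String, Map<String, Long>> committed) {
        Map<String, Map<String, Long>> result = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : requested.entrySet()) {
            Map<String, Long> all =
                committed.getOrDefault(entry.getKey(), Collections.<String, Long>emptyMap());
            Map<String, Long> selected = new TreeMap<>();
            if (entry.getValue() == null) {
                selected.putAll(all); // null: return everything the group has committed
            } else {
                for (String partition : entry.getValue()) {
                    if (all.containsKey(partition)) {
                        selected.put(partition, all.get(partition));
                    }
                }
            }
            result.put(entry.getKey(), selected);
        }
        return result;
    }

    static Map<String, Map<String, Long>> example() {
        Map<String, Long> group1 = new HashMap<>();
        group1.put("foo-0", 10L);
        group1.put("foo-1", 20L);
        group1.put("bar-0", 30L);
        Map<String, Long> group2 = new HashMap<>();
        group2.put("bar-0", 5L);
        group2.put("bar-1", 6L);
        Map<String, Map<String, Long>> committed = new HashMap<>();
        committed.put("group-1", group1);
        committed.put("group-2", group2);

        // HashMap rather than Map.of, because Map.of rejects null values.
        Map<String, List<String>> requested = new HashMap<>();
        requested.put("group-1", Arrays.asList("foo-0", "foo-1"));
        requested.put("group-2", null);
        return fetch(requested, committed);
    }
}
```

Here `group-1` gets back only `foo-0` and `foo-1`, while `group-2`, requested with `null`, gets back both of its committed partitions.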
[GitHub] [kafka] cmccabe commented on pull request #13033: KAFKA-14538: Implement KRaft metadata transactions
cmccabe commented on PR #13033: URL: https://github.com/apache/kafka/pull/13033#issuecomment-1362010708 @mumrah : you're right that we need to be checking MV before generating these records. On second thought, I also think we should be using abort transaction rather than checking record epochs to see if a transaction was aborted -- it will just make it easier for tools to process the log. I will make that change.
[GitHub] [kafka] dajac commented on pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface
dajac commented on PR #12886: URL: https://github.com/apache/kafka/pull/12886#issuecomment-1362010257 @jolshan @OmniaGM Thanks for your comments. I just updated the PR. I have tried to simplify the code in KafkaApis as much as possible. Let me know what you think.
[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface
dajac commented on code in PR #12886: URL: https://github.com/apache/kafka/pull/12886#discussion_r1054778967 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -407,136 +416,200 @@ class KafkaApis(val requestChannel: RequestChannel, * Handle an offset commit request */ def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { -val header = request.header val offsetCommitRequest = request.body[OffsetCommitRequest] -val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() -val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() -// the callback for sending an offset commit response -def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = { - val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors - if (isDebugEnabled) -combinedCommitStatus.forKeyValue { (topicPartition, error) => - if (error != Errors.NONE) { -debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + - s"on partition $topicPartition failed due to ${error.exceptionName}") +def sendResponse(response: OffsetCommitResponse): Unit = { + trace(s"Sending offset commit response $response for correlation id ${request.header.correlationId} to " + +s"client ${request.header.clientId}.") + + if (isDebugEnabled) { +response.data.topics.forEach { topic => + topic.partitions.forEach { partition => +if (partition.errorCode != Errors.NONE.code) { + debug(s"Offset commit request with correlation id ${request.header.correlationId} from client ${request.header.clientId} " + +s"on partition ${topic.name}-${partition.partitionIndex} failed due to ${Errors.forCode(partition.errorCode)}") +} } } - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => -new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava)) + } + + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { 
+response.maybeSetThrottleTimeMs(requestThrottleMs) +response + }) } - // reject the request if not authorized to the group +// Reject the request if not authorized to the group if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) { - val error = Errors.GROUP_AUTHORIZATION_FAILED - val responseTopicList = OffsetCommitRequest.getErrorResponseTopics( -offsetCommitRequest.data.topics, -error) - - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse( -new OffsetCommitResponseData() -.setTopics(responseTopicList) -.setThrottleTimeMs(requestThrottleMs) - )) + sendResponse(offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. 
- val errorMap = new mutable.HashMap[TopicPartition, Errors] - for (topicData <- offsetCommitRequest.data.topics.asScala) { -for (partitionData <- topicData.partitions.asScala) { - val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex) - errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION + sendResponse(offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) +} else { + val offsetCommitResponseData = new OffsetCommitResponseData() + val topicsPendingPartitions = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]() + + val authorizedTopics = authHelper.filterByAuthorized( +request.context, +READ, +TOPIC, +offsetCommitRequest.data.topics.asScala + )(_.name) + + def makePartitionResponse( +partitionIndex: Int, +error: Errors + ): OffsetCommitResponseData.OffsetCommitResponsePartition = { +new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(partitionIndex) + .setErrorCode(error.code) + } + + def addTopicToResponse( +topic: OffsetCommitRequestData.OffsetCommitRequestTopic, +error: Errors + ): Unit = { +val topicResponse = new
[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
dajac commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1054778185 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -3405,6 +3405,341 @@ class KafkaApisTest { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error) } + @ParameterizedTest Review Comment: Yes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions
cmccabe commented on code in PR #13033: URL: https://github.com/apache/kafka/pull/13033#discussion_r1054777972 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1071,7 +1136,11 @@ public ControllerResult generateRecordsAndResult() throws Exception { log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) " + "at metadata.version {} from {}.", bootstrapMetadata.records().size(), bootstrapMetadata.metadataVersion(), bootstrapMetadata.source()); +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("initial transaction"), (short) 0)); records.addAll(bootstrapMetadata.records()); +records.add(new ApiMessageAndVersion( +new EndTransactionRecord(), (short) 0)); Review Comment: It's not needed, but I wanted to get some testing on this. Thinking about it more, we should only begin a transaction here when we're doing upgrade-from-zk and are in premigration (but we need to have that code in first...)
[GitHub] [kafka] cmccabe commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions
cmccabe commented on code in PR #13033: URL: https://github.com/apache/kafka/pull/13033#discussion_r1054777011 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -76,6 +77,21 @@ class BrokerMetadataListener( */ private var _highestTimestamp = -1L + /** + * The log epoch of the current transaction, or -1 if there is none. + */ + private var _transactionEpoch = -1; + + /** + * The log offset of the current transaction, or -1L if there is none. + */ + private var _transactionOffset = -1L; + + /** + * Pending records in the current transaction, or null if there is none such. + */ + private var _transactionRecords: util.ArrayList[ApiMessageAndVersion] = _ + Review Comment: Yeah, this file will be removed, but that's too ambitious for 3.4, I think.
[GitHub] [kafka] cmccabe commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions
cmccabe commented on code in PR #13033: URL: https://github.com/apache/kafka/pull/13033#discussion_r1054776436 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -860,7 +886,7 @@ public void handleCommit(BatchReader reader) { int i = 1; for (ApiMessageAndVersion message : messages) { try { -replay(message.message(), Optional.empty(), offset); +replay(message.message(), Optional.empty(), offset + i - 1, epoch); Review Comment: Previously it was sending the end offset of the batch. It now sends the actual record offset.
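The offset arithmetic in the review comment above can be illustrated with a small standalone sketch. It assumes that `offset` is the offset of the first record in the batch and that `i` is a 1-based counter incremented once per record, as the diff suggests; the class and method names are hypothetical and are not part of Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not Kafka code: shows how "offset + i - 1" yields one
// distinct offset per record when i starts at 1.
final class RecordOffsets {
    static List<Long> perRecordOffsets(long baseOffset, int numRecords) {
        List<Long> offsets = new ArrayList<>();
        int i = 1; // 1-based record counter, mirroring the diff
        for (int n = 0; n < numRecords; n++) {
            offsets.add(baseOffset + i - 1); // offset of the i-th record
            i++;
        }
        return offsets;
    }
}
```

Under that assumption, every record in a batch is replayed with its own offset instead of all records sharing a single batch-level offset.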
[GitHub] [kafka] cmccabe commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions
cmccabe commented on code in PR #13033: URL: https://github.com/apache/kafka/pull/13033#discussion_r1054775292 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -289,6 +316,59 @@ class BrokerMetadataListener( BatchLoadResults(numBatches, numRecords, NANOSECONDS.toMicros(elapsedNs), numBytes) } + private def applyRecordToDelta( +delta: MetadataDelta, +messageAndVersion: ApiMessageAndVersion, +baseOffset: Long, +index: Int, +snapshotName: Option[String] + ): Unit = { +try { + delta.replay(messageAndVersion.message()) +} catch { + case e: Throwable => snapshotName match { +case None => metadataLoadingFaultHandler.handleFault( + s"Error replaying metadata log record at offset ${baseOffset + index}", e) +case Some(name) => metadataLoadingFaultHandler.handleFault( + s"Error replaying record ${index} from snapshot ${name} at offset ${_highestOffset}", e) + } +} + } + + private def beginTransaction(newTransactionEpoch: Int, newTransactionOffset: Long): Unit = { +if (_transactionEpoch != -1) { + abortTransaction("a new begin transaction record appeared", newTransactionOffset) +} +_transactionEpoch = newTransactionEpoch +_transactionOffset = newTransactionOffset +_transactionRecords = new util.ArrayList[ApiMessageAndVersion] +log.debug("Beginning metadata transaction {}_{}.", _transactionOffset, _transactionEpoch) + } + + private def endTransaction(delta: MetadataDelta, endOffset: Long): Unit = { +log.debug("Ending metadata transaction {}_{} at offset {}", _transactionOffset, _transactionEpoch, endOffset) +var index = 0 +_transactionRecords.forEach(record => { Review Comment: Yes, I will check if there is a transaction and raise a fault if so.
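The begin/end bookkeeping visible in the diff above can be sketched in isolation. This is only an illustration under stated assumptions: records are buffered between a begin and an end marker, a second begin aborts the transaction already in progress, and ending without an open transaction is treated as a fault. `TransactionBuffer` and its methods are hypothetical names, and records are modelled as plain strings rather than `ApiMessageAndVersion`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the BrokerMetadataListener implementation.
final class TransactionBuffer {
    private int epoch = -1;              // -1 means no transaction in progress
    private long offset = -1L;           // offset of the begin marker
    private List<String> pending = null; // records buffered in the open transaction
    private int aborted = 0;

    boolean inTransaction() { return epoch != -1; }

    int abortedCount() { return aborted; }

    void begin(int newEpoch, long newOffset) {
        if (inTransaction()) {
            abort(); // a new begin marker appeared before the previous end marker
        }
        epoch = newEpoch;
        offset = newOffset;
        pending = new ArrayList<>();
    }

    void add(String record) {
        if (!inTransaction()) {
            throw new IllegalStateException("no transaction in progress");
        }
        pending.add(record);
    }

    // Returns the buffered records so the caller can apply them together.
    List<String> end() {
        if (!inTransaction()) {
            throw new IllegalStateException("end marker without an open transaction");
        }
        List<String> committed = pending;
        reset();
        return committed;
    }

    void abort() {
        aborted++;
        reset(); // buffered records are discarded
    }

    private void reset() {
        epoch = -1;
        offset = -1L;
        pending = null;
    }
}
```

Buffering until the end marker means a partially written transaction never becomes visible, which is the property the review thread is concerned with.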
[GitHub] [kafka] cmccabe commented on a diff in pull request #13029: MINOR: Add zk migration field to the ApiVersionsResponse
cmccabe commented on code in PR #13029: URL: https://github.com/apache/kafka/pull/13029#discussion_r1054764792 ## clients/src/main/resources/common/message/ApiVersionsResponse.json: ## @@ -69,6 +69,9 @@ {"name": "MinVersionLevel", "type": "int16", "versions": "3+", "about": "The cluster-wide finalized min version level for the feature."} ] -} +}, +{ "name": "ZkMigrationReady", "type": "bool", "versions": "4+", "taggedVersions": "4+", Review Comment: The version should be 3, right?
[GitHub] [kafka] cmccabe merged pull request #13001: KAFKA-14446: code style improvements for broker-to-controller forwarding
cmccabe merged PR #13001: URL: https://github.com/apache/kafka/pull/13001
[jira] [Commented] (KAFKA-9087) ReplicaAlterLogDirs stuck and restart fails with java.lang.IllegalStateException: Offset mismatch for the future replica
[ https://issues.apache.org/jira/browse/KAFKA-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650979#comment-17650979 ] Chia-Ping Tsai commented on KAFKA-9087: --- We also encountered this error. The root cause is a race condition:
# ReplicaAlterLogDirsThread has fetched the data for the topic partition.
# ReplicaManager#alterReplicaLogDirs changes the future log (the start offset is reset to 0).
# ReplicaManager#alterReplicaLogDirs calls AbstractFetcherManager#addFetcherForPartitions to add the topic partition (it just changes the partition state in the ReplicaAlterLogDirsThread).
# ReplicaAlterLogDirsThread starts to process the fetched data and throws IllegalStateException because the future log has been renewed and its start offset is zero.
Because the topic partition is then marked as failed, the future log of that topic partition can never be synced. It seems to me that we should return None instead of throwing IllegalStateException when the start offset of the future log is zero. [~junrao] WDYT? > ReplicaAlterLogDirs stuck and restart fails with > java.lang.IllegalStateException: Offset mismatch for the future replica > > > Key: KAFKA-9087 > URL: https://issues.apache.org/jira/browse/KAFKA-9087 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0 >Reporter: Gregory Koshelev >Priority: Major > > I've started multiple replica movements between log directories and some > partitions were stuck.
After the restart of the broker I've got exception in > server.log: > {noformat} > [2019-06-11 17:58:46,304] ERROR [ReplicaAlterLogDirsThread-1]: Error due to > (kafka.server.ReplicaAlterLogDirsThread) > org.apache.kafka.common.KafkaException: Error processing data for partition > metrics_timers-35 offset 4224887 > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:342) > at scala.Option.foreach(Option.scala:274) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:300) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:299) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:299) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:299) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) > at scala.Option.foreach(Option.scala:274) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Caused by: java.lang.IllegalStateException: Offset mismatch for the future > replica metrics_timers-35: fetched offset = 4224887, log end offset = 0. 
> at > kafka.server.ReplicaAlterLogDirsThread.processPartitionData(ReplicaAlterLogDirsThread.scala:107) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:311) > ... 16 more > [2019-06-11 17:58:46,305] INFO [ReplicaAlterLogDirsThread-1]: Stopped > (kafka.server.ReplicaAlterLogDirsThread) > {noformat} > Also, ReplicaAlterLogDirsThread has been stopped. Further restarts do not fix > the problem. To fix it I've stopped the broker and remove all the stuck > future partitions. > Detailed log below > {noformat} > [2019-06-11 12:09:52,833] INFO [Log partition=metrics_timers-35, > dir=/storage2/kafka/data] Truncating to 4224887 has no effect as the largest > offset in the log is 4224886 (kafka.log.Log) > [2019-06-11 12:21:34,979] INFO [Log partition=metrics_timers-35, > dir=/storage2/kafka/data] Loading producer state till offset 4224887 with > message format version 2 (kafka.log.Log) > [2019-06-11 12:21:34,980] INFO [ProducerStateManager > partition=metrics_timers-35] Loading producer state from snapshot file > '/storage2/kafka/data/metrics_timers-35/04224887.snapshot' > (kafka.log.ProducerStateManager) > [2019-06-11 12:21:34,980] INFO [Log partition=metrics_timers-35, > dir=/storage2/kafka/data] Completed load of log with 1 segments, log start > offset
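The suggestion in the KAFKA-9087 comment above (skip the stale fetch instead of failing the partition) might look roughly like the following. This is a sketch of the idea only, not the actual fix: it assumes that a log end offset of 0 on the future log signals that the log was recreated after the fetch was issued, and `FutureReplicaCheck` and `validateFetchOffset` are hypothetical names.

```java
import java.util.Optional;

// Hypothetical illustration of "return None instead of throwing"; not Kafka code.
final class FutureReplicaCheck {
    // Returns the offset to append at, or empty if the fetched data should be
    // dropped because the future log was recreated in the meantime.
    static Optional<Long> validateFetchOffset(long fetchOffset, long futureLogEndOffset) {
        if (fetchOffset == futureLogEndOffset) {
            return Optional.of(fetchOffset); // normal case: offsets line up
        }
        if (futureLogEndOffset == 0L) {
            // Assumption: the future log was renewed, so this fetch is stale.
            // Skipping it lets the next fetch start again from offset 0.
            return Optional.empty();
        }
        throw new IllegalStateException(
            "Offset mismatch for the future replica: fetched offset = " + fetchOffset
                + ", log end offset = " + futureLogEndOffset);
    }
}
```

Any other mismatch still throws, so genuine corruption would continue to surface as an error.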
[GitHub] [kafka] C0urante commented on a diff in pull request #12802: KAFKA-14311: Connect Worker clean shutdown does not cleanly stop connectors/tasks
C0urante commented on code in PR #12802: URL: https://github.com/apache/kafka/pull/12802#discussion_r1054700417 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1658,11 +1659,20 @@ private void backoff(long ms) { backoffRetries = BACKOFF_RETRIES; } -private void startAndStop(Collection> callables) { +// Visible for testing +void startAndStop(Collection> callables) { try { startAndStopExecutor.invokeAll(callables); } catch (InterruptedException e) { // ignore +} catch (RejectedExecutionException e) { +// Shutting down. Just log the exception +if (stopping.get()) { +log.debug("RejectedExecutionException thrown while herder is shutting down. This could be " + +"because startAndStopExecutor is either already shutdown or is full."); Review Comment: The [Executors::newFixedThreadPool Javadocs](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int-java.util.concurrent.ThreadFactory-) state that the executor operates off of an "unbounded queue", so it's misleading to state that this error could arise because the executor is full. It would also probably help to be clear to users that this is not a sign that something's wrong with the worker. ```suggestion log.debug("Ignoring RejectedExecutionException thrown while starting/stopping connectors/tasks en masse " + "as the herder is already in the process of shutting down. 
This is not indicative of a problem and is normal behavior"); ``` ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -271,6 +275,9 @@ public DistributedHerder(DistributedConfig config, this.workerGroupId = config.getString(DistributedConfig.GROUP_ID_CONFIG); this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG); this.workerTasksShutdownTimeoutMs = config.getLong(DistributedConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG); +// Timeout for herderExecutor to gracefully terminate is set to a value to accommodate +// reading to the end of the config topic + successfully attempting to stop all connectors and tasks and a buffer of 10s +this.herderExecutorTerminationTimeoutMs = this.workerSyncTimeoutMs + this.workerTasksShutdownTimeoutMs + Worker.CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS + 1; Review Comment: Do these have to be stored as instance variables? It clutters up the class a bit to keep having so many private fields that are only used once. Maybe we could remove the `workerTasksShutdownTimeoutMs` and `herderExecutorTerminationTimeoutMs` fields, and inline the logic responsible for computing them and move it into `stop`? 
## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ## @@ -3629,6 +3630,64 @@ public void testPollDurationOnSlowConnectorOperations() { PowerMock.verifyAll(); } +@Test(expected = RejectedExecutionException.class) +@SuppressWarnings("unchecked") +public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionAndHerderNotStopping() throws InterruptedException { +ExecutorService startAndStopExecutor = EasyMock.mock(ExecutorService.class); +herder.startAndStopExecutor = startAndStopExecutor; + +Callable connectorStartingCallable = () -> null; + + EasyMock.expect(startAndStopExecutor.invokeAll(EasyMock.anyObject(Collection.class))).andThrow(new RejectedExecutionException()); + +PowerMock.replayAll(startAndStopExecutor); + + herder.startAndStop(Collections.singletonList(connectorStartingCallable)); + +} + +@Test +public void shouldHaltCleanlyWhenHerderStartsAndStopsAndConfigTopicReadTimesOut() throws TimeoutException { +connectProtocolVersion = CONNECT_PROTOCOL_V1; +EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(connectProtocolVersion); +final int rebalanceDelayMs = 2; + +// Assign the connector to this worker, and have it start +expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), Collections.emptyList(), rebalanceDelayMs); + +member.wakeup(); +PowerMock.expectLastCall(); +member.requestRejoin(); +PowerMock.expectLastCall(); +member.maybeLeaveGroup(anyString()); +PowerMock.expectLastCall(); +worker.stopAndAwaitConnectors(); +PowerMock.expectLastCall(); +worker.stopAndAwaitTasks(); +
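The behaviour under review in this thread (ignore a `RejectedExecutionException` only while shutting down, otherwise rethrow it) can be reproduced with plain `java.util.concurrent` classes. The sketch below is a simplified stand-in for the herder, with hypothetical class and method names; unlike the diff, it restores the interrupt flag rather than ignoring the interruption.

```java
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the herder's start/stop logic; not Kafka code.
final class StartAndStopSketch {
    private final ExecutorService executor = Executors.newFixedThreadPool(2);
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    // Returns true if the tasks were run, false if they were skipped.
    boolean startAndStop(Collection<Callable<Void>> callables) {
        try {
            executor.invokeAll(callables);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } catch (RejectedExecutionException e) {
            if (stopping.get()) {
                // Expected during shutdown: the executor no longer accepts work.
                return false;
            }
            throw e; // not shutting down, so this is a real problem
        }
    }

    void stop() {
        stopping.set(true);
        executor.shutdown();
    }
}
```

A fixed thread pool uses an unbounded queue, so in this sketch the only way to see the rejection is to submit work after `shutdown()`, which matches the point made in the review.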
[GitHub] [kafka] junrao commented on a diff in pull request #13012: KAFKA-14477: Move LogValidator and related to storage module
junrao commented on code in PR #13012: URL: https://github.com/apache/kafka/pull/13012#discussion_r1054705599 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -2185,6 +2188,27 @@ object UnifiedLog extends Logging { private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = { LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset) } + + // Visible for benchmarking + def validatorMetricsRecorder(allTopicsStats: BrokerTopicMetrics): LogValidator.MetricsRecorder = { +new LogValidator.MetricsRecorder { + def recordInvalidMagic(): Unit = +allTopicsStats.invalidMagicNumberRecordsPerSec.mark() + + def recordInvalidOffset(): Unit = Review Comment: recordInvalidOffset() and recordInvalidSequence() do the same thing. Should we just have a single method? ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -846,21 +842,26 @@ class UnifiedLog(@volatile var logStartOffset: Long, config.messageTimestampDifferenceMaxMs, leaderEpoch, origin, -interBrokerProtocolVersion, -brokerTopicStats, +interBrokerProtocolVersion + ) + validator.validateMessagesAndAssignOffsets(offset, +validatorMetricsRecorder(brokerTopicStats.allTopicsStats), Review Comment: Could we just create a single instance of MetricsRecorder and reuse it for all appends? ## clients/src/test/java/org/apache/kafka/common/utils/PrimitiveRefTest.java: ## @@ -14,15 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +package org.apache.kafka.common.utils; -package kafka.common +import org.junit.jupiter.api.Test; -import org.apache.kafka.common.errors.ApiException -import org.apache.kafka.common.requests.ProduceResponse.RecordError +import static org.junit.jupiter.api.Assertions.assertEquals; -import scala.collection.Seq - -class RecordValidationException(val invalidException: ApiException, -val recordErrors: Seq[RecordError]) - extends RuntimeException(invalidException) { +public class PrimitiveRefTest { +@Test +public void testLongRef() { Review Comment: Should we add a similar test for IntRef() too?
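One of the review points above asks whether a single `MetricsRecorder` could be created once and reused for all appends. A minimal sketch of that pattern follows; the interface here is a cut-down, hypothetical version with two methods, and plain counters stand in for the broker's meters.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of reusing one recorder instance; not the UnifiedLog code.
final class AppendPath {
    interface MetricsRecorder {
        void recordInvalidMagic();
        void recordInvalidOffset();
    }

    final AtomicLong invalidMagic = new AtomicLong();
    final AtomicLong invalidOffset = new AtomicLong();
    int recordersCreated = 0;

    // Created once per AppendPath instead of once per append call.
    private final MetricsRecorder recorder = newRecorder();

    private MetricsRecorder newRecorder() {
        recordersCreated++;
        return new MetricsRecorder() {
            @Override public void recordInvalidMagic() { invalidMagic.incrementAndGet(); }
            @Override public void recordInvalidOffset() { invalidOffset.incrementAndGet(); }
        };
    }

    void append(boolean badMagic, boolean badOffset) {
        if (badMagic) recorder.recordInvalidMagic();
        if (badOffset) recorder.recordInvalidOffset();
    }
}
```

Since the recorder holds no per-append state in this sketch, sharing it avoids one allocation per append without changing behaviour.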
[GitHub] [kafka] chia7712 commented on pull request #12979: KAFKA-14544 The "is-future" should be removed from metrics tags after future log becomes current log
chia7712 commented on PR #12979: URL: https://github.com/apache/kafka/pull/12979#issuecomment-1361909104 Filed a Jira, as this is related to a real bug.
[jira] [Created] (KAFKA-14544) The "is-future" should be removed from metrics tags after future log becomes current log
Chia-Ping Tsai created KAFKA-14544: -- Summary: The "is-future" should be removed from metrics tags after future log becomes current log Key: KAFKA-14544 URL: https://issues.apache.org/jira/browse/KAFKA-14544 Project: Kafka Issue Type: Bug Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai We don't remove the "is-future=true" tag from the future log after the future log becomes the "current" log. This causes two potential issues:
# Metrics monitors can't get the Log metrics if they don't track the "is-future=true" property.
# All Log metrics of a specific partition get removed if the partition is moved to another folder again.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
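The tag problem described in KAFKA-14544 can be modelled with a toy registry keyed by tag maps. This is an assumption-laden sketch rather than the LogManager code: `LogMetricsSketch` and its methods are hypothetical, and promotion is modelled as removing both the future-tagged metrics and the old current-log metrics before registering the promoted log again without the tag.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical toy registry; not Kafka's metrics implementation.
final class LogMetricsSketch {
    private final Set<Map<String, String>> registered = new HashSet<>();

    static Map<String, String> tags(String topic, int partition, boolean isFuture) {
        Map<String, String> t = new TreeMap<>();
        t.put("topic", topic);
        t.put("partition", Integer.toString(partition));
        if (isFuture) {
            t.put("is-future", "true"); // only future logs carry this tag
        }
        return t;
    }

    void register(String topic, int partition, boolean isFuture) {
        registered.add(tags(topic, partition, isFuture));
    }

    void remove(String topic, int partition, boolean isFuture) {
        registered.remove(tags(topic, partition, isFuture));
    }

    boolean isRegistered(String topic, int partition, boolean isFuture) {
        return registered.contains(tags(topic, partition, isFuture));
    }

    // When the future log replaces the current log, drop the metrics that are
    // still tagged is-future=true and register the log again without the tag.
    void promoteFutureToCurrent(String topic, int partition) {
        remove(topic, partition, true);
        remove(topic, partition, false);
        register(topic, partition, false);
    }
}
```

After promotion, a monitor that looks up the partition without the "is-future" tag finds the metrics again, which addresses the first issue listed above.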
[GitHub] [kafka] mimaison commented on pull request #12936: KAFKA-13881: Add Streams package infos
mimaison commented on PR #12936: URL: https://github.com/apache/kafka/pull/12936#issuecomment-1361845545 @mjsax @ableegoldman Can you take a look? Thanks
[jira] [Commented] (KAFKA-13881) Add package.java for public package javadoc
[ https://issues.apache.org/jira/browse/KAFKA-13881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650968#comment-17650968 ] Mickael Maison commented on KAFKA-13881: [~gharris1727] Are you planning to open a PR for the org.apache.kafka.server.log.remote.storage package located in the storage:api module? See the javadoc for this package: https://kafka.apache.org/33/javadoc/org/apache/kafka/server/log/remote/storage/package-frame.html > Add package.java for public package javadoc > --- > > Key: KAFKA-13881 > URL: https://issues.apache.org/jira/browse/KAFKA-13881 > Project: Kafka > Issue Type: Task >Reporter: Tom Bentley >Assignee: Greg Harris >Priority: Trivial > Fix For: 3.5.0 > > > Our public javadoc ([https://kafka.apache.org/31/javadoc/index.html]) doesn't > have any package descriptions, which is a bit intimidating for anyone who is > new to the project.
[GitHub] [kafka] lihaosky opened a new pull request, #13034: MINOR: remove onChange call in stream assignor assign method
lihaosky opened a new pull request, #13034: URL: https://github.com/apache/kafka/pull/13034 ## Description Remove unnecessary calls in assign method ## Test Existing tests ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-13467) Clients never refresh cached bootstrap IPs
[ https://issues.apache.org/jira/browse/KAFKA-13467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650964#comment-17650964 ] Ivan Yurchenko commented on KAFKA-13467: Probably related: https://issues.apache.org/jira/browse/KAFKA-8206 > Clients never refresh cached bootstrap IPs > -- > > Key: KAFKA-13467 > URL: https://issues.apache.org/jira/browse/KAFKA-13467 > Project: Kafka > Issue Type: Improvement > Components: clients, network >Reporter: Matthias J. Sax >Priority: Minor > > Follow up ticket to https://issues.apache.org/jira/browse/KAFKA-13405. > For certain broker rolling upgrade scenarios, it would be beneficial to > expire cached bootstrap server IP addresses and re-resolve those IPs to > allow clients to re-connect to the cluster without the need to restart the > client.
[GitHub] [kafka] C0urante merged pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic
C0urante merged PR #12947: URL: https://github.com/apache/kafka/pull/12947
[jira] [Created] (KAFKA-14543) Move LogOffsetMetadata to storage module
Mickael Maison created KAFKA-14543: -- Summary: Move LogOffsetMetadata to storage module Key: KAFKA-14543 URL: https://issues.apache.org/jira/browse/KAFKA-14543 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison Assignee: Mickael Maison
[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface
jolshan commented on code in PR #12886: URL: https://github.com/apache/kafka/pull/12886#discussion_r1054648954 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -407,136 +416,200 @@ class KafkaApis(val requestChannel: RequestChannel, * Handle an offset commit request */ def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { -val header = request.header val offsetCommitRequest = request.body[OffsetCommitRequest] -val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() -val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() -// the callback for sending an offset commit response -def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = { - val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors - if (isDebugEnabled) -combinedCommitStatus.forKeyValue { (topicPartition, error) => - if (error != Errors.NONE) { -debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + - s"on partition $topicPartition failed due to ${error.exceptionName}") +def sendResponse(response: OffsetCommitResponse): Unit = { + trace(s"Sending offset commit response $response for correlation id ${request.header.correlationId} to " + +s"client ${request.header.clientId}.") + + if (isDebugEnabled) { +response.data.topics.forEach { topic => + topic.partitions.forEach { partition => +if (partition.errorCode != Errors.NONE.code) { + debug(s"Offset commit request with correlation id ${request.header.correlationId} from client ${request.header.clientId} " + +s"on partition ${topic.name}-${partition.partitionIndex} failed due to ${Errors.forCode(partition.errorCode)}") +} } } - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => -new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava)) + } + + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { 
+response.maybeSetThrottleTimeMs(requestThrottleMs) +response + }) } - // reject the request if not authorized to the group +// Reject the request if not authorized to the group if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) { - val error = Errors.GROUP_AUTHORIZATION_FAILED - val responseTopicList = OffsetCommitRequest.getErrorResponseTopics( -offsetCommitRequest.data.topics, -error) - - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse( -new OffsetCommitResponseData() -.setTopics(responseTopicList) -.setThrottleTimeMs(requestThrottleMs) - )) + sendResponse(offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. 
- val errorMap = new mutable.HashMap[TopicPartition, Errors] - for (topicData <- offsetCommitRequest.data.topics.asScala) { -for (partitionData <- topicData.partitions.asScala) { - val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex) - errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION + sendResponse(offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) +} else { + val offsetCommitResponseData = new OffsetCommitResponseData() + val topicsPendingPartitions = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]() + + val authorizedTopics = authHelper.filterByAuthorized( +request.context, +READ, +TOPIC, +offsetCommitRequest.data.topics.asScala + )(_.name) + + def makePartitionResponse( +partitionIndex: Int, +error: Errors + ): OffsetCommitResponseData.OffsetCommitResponsePartition = { +new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(partitionIndex) + .setErrorCode(error.code) + } + + def addTopicToResponse( +topic: OffsetCommitRequestData.OffsetCommitRequestTopic, +error: Errors + ): Unit = { +val topicResponse = new
[GitHub] [kafka] cmccabe commented on a diff in pull request #13029: MINOR: Add zk migration field to the ApiVersionsResponse
cmccabe commented on code in PR #13029: URL: https://github.com/apache/kafka/pull/13029#discussion_r1054647289 ## clients/src/main/resources/common/message/ApiVersionsResponse.json: ## @@ -25,9 +25,12 @@ // not in the header. The length of the header must not change in order to guarantee the // backward compatibility. // + // Version 4 adds zk to kraft migration field to the response used to determine if the controller + // is ready for migration. + // // Starting from Apache Kafka 2.4 (KIP-511), ApiKeys field is populated with the supported // versions of the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned. - "validVersions": "0-3", + "validVersions": "0-4", Review Comment: It's not necessary to increment the version when adding a tagged field
[GitHub] [kafka] jolshan commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
jolshan commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1054640668 ## core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala: ## @@ -150,42 +149,6 @@ class OffsetFetchRequestTest extends BaseRequestTest { } } - @Test - def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = { Review Comment: Are we deciding to actually throw the error? If so we should document that. Otherwise, maybe we can add a comment in the request file that requesting the group multiple times in the request will also give us the response multiple times as of this release.
[GitHub] [kafka] jolshan commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
jolshan commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1054638304 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -3405,6 +3405,341 @@ class KafkaApisTest { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error) } + @ParameterizedTest Review Comment: Do we have a test here for the fetchAllOffsets path?
[jira] [Commented] (KAFKA-14470) Move log layer to storage module
[ https://issues.apache.org/jira/browse/KAFKA-14470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650947#comment-17650947 ] Ismael Juma commented on KAFKA-14470: - [~mimaison] Dependencies of the log layer classes would have to be moved too, yes. In some cases, it may warrant a separate ticket while in others it can be bundled with one of the tickets that already exists (left to the discretion of the contributor). Regarding kraft, yes there are some opportunities once the log layer has been completely moved to the storage layer. I was having a discussion with Jose yesterday about this and will file a ticket to capture it. > Move log layer to storage module > > > Key: KAFKA-14470 > URL: https://issues.apache.org/jira/browse/KAFKA-14470 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > > We introduced the `storage` module as part of KIP-405, but the existing log > layer remains in the `core` module. Moving the log layer to the `storage` > module would be another step towards improved modularity and build times > (similar to `metadata`, `raft` and `group-coordinator`). > We should do this in an incremental manner to make the code review process > easier. I will create separate tasks, each one mapping to one pull request. > In order to understand the feasibility, I tackled a few of the tasks myself. > Help from the community is appreciated for the unassigned tasks, but it > probably makes sense to do that after the initial PRs have been submitted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
jolshan commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1054636234 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1395,79 +1389,123 @@ class KafkaApis(val requestChannel: RequestChannel, offsetFetchResponse } requestHelper.sendResponseMaybeThrottle(request, createResponse) +CompletableFuture.completedFuture[Unit](()) } - private def handleOffsetFetchRequestBetweenV1AndV7(request: RequestChannel.Request): Unit = { -val header = request.header + private def handleOffsetFetchRequestFromCoordinator(request: RequestChannel.Request): CompletableFuture[Unit] = { Review Comment: Cool. Thanks for simplifying
[GitHub] [kafka] jolshan commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
jolshan commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1054632844 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java: ## @@ -19,6 +19,7 @@ import java.util.Map.Entry; Review Comment: sounds like a good plan. Thanks!
[jira] [Updated] (KAFKA-13933) Stuck SSL/TLS unit tests in case of authentication failure
[ https://issues.apache.org/jira/browse/KAFKA-13933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Federico Valeri updated KAFKA-13933: Description: When there is an authentication error after the initial TCP connection, the Selector never becomes READY, and its SSL/TLS tests wait forever for readiness. This is actually what happened to me while running SSL/TLS Selector unit tests using an OpenJDK build that does not support the required cipher suites. was: When there is an authentication error after the initial TCP connection, the Selector never becomes READY, and its SSL/TLS tests wait forever waiting for readiness. This is actually what happened to me while running SSL/TLS Selector unit tests using an OpenJDK build that does not support the required cipher suites. > Stuck SSL/TLS unit tests in case of authentication failure > -- > > Key: KAFKA-13933 > URL: https://issues.apache.org/jira/browse/KAFKA-13933 > Project: Kafka > Issue Type: Bug > Components: unit tests >Affects Versions: 3.2.0 >Reporter: Federico Valeri >Assignee: Federico Valeri >Priority: Minor > Fix For: 3.3.0 > > > When there is an authentication error after the initial TCP connection, the > Selector never becomes READY, and its SSL/TLS tests wait forever for > readiness. > This is actually what happened to me while running SSL/TLS Selector unit > tests using an OpenJDK build that does not support the required cipher suites. -- This message was sent by Atlassian Jira (v8.20.10#820010)
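One general way to keep a test from hanging when readiness never arrives is to bound the wait with a deadline and fail once it passes. The sketch below illustrates that idea only; `BoundedWaitSketch` and `waitUntil` are hypothetical names, and this is not necessarily how KAFKA-13933 was fixed.

```java
import java.util.function.BooleanSupplier;

public class BoundedWaitSketch {
    // Hypothetical helper: polls the condition until it holds or timeoutMs elapses.
    // Returns true if the condition became true in time, false otherwise.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // give up instead of waiting forever
            }
            try {
                Thread.sleep(10); // short pause between polls
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // A condition that never holds stands in for a channel that never becomes READY.
        boolean ready = waitUntil(() -> false, 50);
        System.out.println("ready = " + ready);
    }
}
```

A test built on such a helper can assert on the returned flag, so an authentication failure surfaces as a test failure rather than a stuck build.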
[GitHub] [kafka] michaeljmarshall commented on a diff in pull request #13032: KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer
michaeljmarshall commented on code in PR #13032: URL: https://github.com/apache/kafka/pull/13032#discussion_r1054618672 ## clients/src/test/java/org/apache/kafka/common/protocol/DataOutputStreamWritableTest.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.protocol; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.DataOutputStream; +import java.nio.ByteBuffer; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.junit.jupiter.api.Test; + +public class DataOutputStreamWritableTest { +@Test +public void testWritingSlicedByteBuffer() { +ByteBuffer expectedBuffer = ByteBuffer.wrap(new byte[]{2, 3}); +ByteBuffer originalBuffer = ByteBuffer.allocate(4); +for (int i = 0; i < originalBuffer.limit(); i++) { +originalBuffer.put(i, (byte) i); +} +// Move position forward to ensure slice is not whole buffer +originalBuffer.position(2); +ByteBuffer slicedBuffer = originalBuffer.slice(); + Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14132: -- Description: {color:#de350b}Some of the tests below use EasyMock as well. For those migrate both PowerMock and EasyMock to Mockito.{color} Unless stated in brackets the tests are in the connect module. A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}InReview{color} {color:#00875a}Merged{color} # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) # WorkerSinkTaskTest (owner: Divij) *WIP* # WorkerSinkTaskThreadedTest (owner: Divij) *WIP* # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven]) # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) ([https://github.com/apache/kafka/pull/12728]) # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven]) # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) ([https://github.com/apache/kafka/pull/12418]) # {color:#ff8b00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven]) # RetryUtilTest (owner: [~mdedetrich-aiven] ) # {color:#ff8b00}RepartitionTopicTest{color} (streams) (owner: Christo) # {color:#ff8b00}StateManagerUtilTest{color} (streams) (owner: Christo) *The coverage report for the above tests after the change should be >= to what the coverage is now.* was: {color:#de350b}Some of the tests below use EasyMock as well. 
For those migrate both PowerMock and EasyMock to Mockito.{color} Unless stated in brackets the tests are in the connect module. A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}InReview{color} {color:#00875a}Merged{color} # {color:#ff8b00}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) # WorkerSinkTaskTest (owner: Divij) *WIP* # WorkerSinkTaskThreadedTest (owner: Divij) *WIP* # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven]) # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) ([https://github.com/apache/kafka/pull/12728]) # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven]) # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) ([https://github.com/apache/kafka/pull/12418]) # {color:#FF8B00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven]) # RetryUtilTest (owner: [~mdedetrich-aiven] ) # {color:#FF8B00}RepartitionTopicTest{color} (streams) (owner: Christo) # {color:#FF8B00}StateManagerUtilTest{color} (streams) (owner: Christo) *The coverage report for the above tests after the change should be >= to what the coverage is now.* > Remaining PowerMock to Mockito tests > > > Key: KAFKA-14132 > URL: https://issues.apache.org/jira/browse/KAFKA-14132 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > > {color:#de350b}Some of the tests below use EasyMock as well. 
For those > migrate both PowerMock and EasyMock to Mockito.{color} > Unless stated in brackets the tests are in the connect module. > A list of tests which still require to be moved from PowerMock to Mockito as > of 2nd of August 2022 which do not have a Jira issue and do not have pull > requests I am aware of which are opened: > {color:#ff8b00}InReview{color} > {color:#00875a}Merged{color} > # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) > # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) > # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) > # WorkerSinkTaskTest (owner: Divij) *WIP* > # WorkerSinkTaskThreadedTest (owner: Divij) *WIP* > # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) > #
[GitHub] [kafka] C0urante merged pull request #12735: KAFKA-14132: Replace EasyMock and PowerMock with Mockito in connect/runtime/ErrorHandlingTaskTest
C0urante merged PR #12735: URL: https://github.com/apache/kafka/pull/12735
[GitHub] [kafka] C0urante commented on pull request #12735: KAFKA-14132: Replace EasyMock and PowerMock with Mockito in connect/runtime/ErrorHandlingTaskTest
C0urante commented on PR #12735: URL: https://github.com/apache/kafka/pull/12735#issuecomment-1361648543 FYI, I've changed the Jira ticket in the title from KAFKA-14059 (which refers to a different test class) to KAFKA-14132, which is the umbrella issue for migration from EasyMock/PowerMock to Mockito and mentions this test case explicitly.
[GitHub] [kafka] michaeljmarshall commented on a diff in pull request #13032: KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer
michaeljmarshall commented on code in PR #13032: URL: https://github.com/apache/kafka/pull/13032#discussion_r1054597086 ## clients/src/test/java/org/apache/kafka/common/protocol/DataOutputStreamWritableTest.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.protocol; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.DataOutputStream; +import java.nio.ByteBuffer; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.junit.jupiter.api.Test; + +public class DataOutputStreamWritableTest { +@Test +public void testWritingSlicedByteBuffer() { +ByteBuffer expectedBuffer = ByteBuffer.wrap(new byte[]{2, 3}); +ByteBuffer originalBuffer = ByteBuffer.allocate(4); +for (int i = 0; i < originalBuffer.limit(); i++) { +originalBuffer.put(i, (byte) i); +} +// Move position forward to ensure slice is not whole buffer +originalBuffer.position(2); +ByteBuffer slicedBuffer = originalBuffer.slice(); + Review Comment: You are right. You provided the solution in the other comment. -- This is an automated message from the Apache Git Service. 
[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface
dajac commented on code in PR #12886: URL: https://github.com/apache/kafka/pull/12886#discussion_r1054592382 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -407,136 +416,200 @@ class KafkaApis(val requestChannel: RequestChannel, * Handle an offset commit request */ def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { -val header = request.header val offsetCommitRequest = request.body[OffsetCommitRequest] -val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() -val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() -// the callback for sending an offset commit response -def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = { - val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors - if (isDebugEnabled) -combinedCommitStatus.forKeyValue { (topicPartition, error) => - if (error != Errors.NONE) { -debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + - s"on partition $topicPartition failed due to ${error.exceptionName}") +def sendResponse(response: OffsetCommitResponse): Unit = { + trace(s"Sending offset commit response $response for correlation id ${request.header.correlationId} to " + +s"client ${request.header.clientId}.") + + if (isDebugEnabled) { +response.data.topics.forEach { topic => + topic.partitions.forEach { partition => +if (partition.errorCode != Errors.NONE.code) { + debug(s"Offset commit request with correlation id ${request.header.correlationId} from client ${request.header.clientId} " + +s"on partition ${topic.name}-${partition.partitionIndex} failed due to ${Errors.forCode(partition.errorCode)}") +} } } - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => -new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava)) + } + + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { 
+response.maybeSetThrottleTimeMs(requestThrottleMs) +response + }) } - // reject the request if not authorized to the group +// Reject the request if not authorized to the group if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) { - val error = Errors.GROUP_AUTHORIZATION_FAILED - val responseTopicList = OffsetCommitRequest.getErrorResponseTopics( -offsetCommitRequest.data.topics, -error) - - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse( -new OffsetCommitResponseData() -.setTopics(responseTopicList) -.setThrottleTimeMs(requestThrottleMs) - )) + sendResponse(offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. 
- val errorMap = new mutable.HashMap[TopicPartition, Errors] - for (topicData <- offsetCommitRequest.data.topics.asScala) { -for (partitionData <- topicData.partitions.asScala) { - val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex) - errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION + sendResponse(offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) +} else { + val offsetCommitResponseData = new OffsetCommitResponseData() + val topicsPendingPartitions = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]() + + val authorizedTopics = authHelper.filterByAuthorized( +request.context, +READ, +TOPIC, +offsetCommitRequest.data.topics.asScala + )(_.name) + + def makePartitionResponse( +partitionIndex: Int, +error: Errors + ): OffsetCommitResponseData.OffsetCommitResponsePartition = { +new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(partitionIndex) + .setErrorCode(error.code) + } + + def addTopicToResponse( +topic: OffsetCommitRequestData.OffsetCommitRequestTopic, +error: Errors + ): Unit = { +val topicResponse = new
[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface
dajac commented on code in PR #12886: URL: https://github.com/apache/kafka/pull/12886#discussion_r1054582822 ## core/src/main/java/kafka/server/builders/KafkaApisBuilder.java: ## @@ -178,6 +179,7 @@ public KafkaApis build() { metadataSupport, replicaManager, groupCoordinator, + new GroupCoordinatorAdapter(groupCoordinator, time), Review Comment: GroupCoordinatorAdapter is an internal change so we don't have to expose it in the builder. It only wraps the GroupCoordinator passed.
[GitHub] [kafka] michaeljmarshall commented on a diff in pull request #13032: KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer
michaeljmarshall commented on code in PR #13032: URL: https://github.com/apache/kafka/pull/13032#discussion_r1054547725 ## clients/src/main/java/org/apache/kafka/common/protocol/DataOutputStreamWritable.java: ## @@ -99,7 +99,7 @@ public void writeUnsignedVarint(int i) { public void writeByteBuffer(ByteBuffer buf) { try { if (buf.hasArray()) { -out.write(buf.array(), buf.position(), buf.limit()); +out.write(buf.array(), buf.arrayOffset(), buf.limit()); Review Comment: Great catch, thank you for your reviews. I'll fix the implementation and make sure the missed cases are covered by tests.
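The diff above still passes `buf.limit()` as the length and ignores `buf.position()`, which appears to be what the review flagged. For an array-backed buffer, the readable bytes start at `arrayOffset() + position()` in the backing array and span `remaining()` bytes. A minimal sketch of that indexing follows, using only `java.nio` and `java.io`; the names `SliceWriteSketch`, `writeRemaining` and `demo` are hypothetical, and this is not necessarily the code that was eventually merged.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

public class SliceWriteSketch {
    // Hypothetical helper: writes the bytes between position and limit,
    // leaving the buffer's own position untouched.
    static void writeRemaining(OutputStream out, ByteBuffer buf) throws IOException {
        if (buf.hasArray()) {
            // Offset into the backing array = slice offset + current position.
            out.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
        } else {
            // Direct buffers expose no array; copy through a duplicate instead.
            byte[] bytes = new byte[buf.remaining()];
            buf.duplicate().get(bytes);
            out.write(bytes);
        }
    }

    // Mirrors the sliced-buffer setup from the test quoted in this thread.
    static byte[] demo() {
        ByteBuffer original = ByteBuffer.allocate(4);
        for (int i = 0; i < original.limit(); i++) {
            original.put(i, (byte) i);
        }
        original.position(2);
        ByteBuffer sliced = original.slice(); // arrayOffset() == 2, position() == 0, limit() == 2
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            writeRemaining(out, sliced);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [2, 3]
    }
}
```

With the original `buf.position(), buf.limit()` arguments the same slice would write bytes 0 and 1 of the backing array, and with `buf.arrayOffset(), buf.limit()` a buffer whose position is non-zero would write from the wrong start and with the wrong length.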
[jira] [Commented] (KAFKA-14470) Move log layer to storage module
[ https://issues.apache.org/jira/browse/KAFKA-14470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650927#comment-17650927 ] Mickael Maison commented on KAFKA-14470: Should we also move classes like LogOffsetMetadata? It's in kafka.server but it's used quite a bit in LocalLog, UnifiedLog, LogSegment. There's also a similar LogOffsetMetadata class in the raft module. I'm not very familiar with the raft copy but both seem to represent pretty much the same thing so we could potentially merge them. > Move log layer to storage module > > > Key: KAFKA-14470 > URL: https://issues.apache.org/jira/browse/KAFKA-14470 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > > We introduced the `storage` module as part of KIP-405, but the existing log > layer remains in the `core` module. Moving the log layer to the `storage` > module would be another step towards improved modularity and build times > (similar to `metadata`, `raft` and `group-coordinator`). > We should do this in an incremental manner to make the code review process > easier. I will create separate tasks, each one mapping to one pull request. > In order to understand the feasibility, I tackled a few of the tasks myself. > Help from the community is appreciated for the unassigned tasks, but it > probably makes sense to do that after the initial PRs have been submitted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14542) Deprecate OffsetFetch/Commit version 0 and remove them in 4.0
David Jacot created KAFKA-14542: --- Summary: Deprecate OffsetFetch/Commit version 0 and remove them in 4.0 Key: KAFKA-14542 URL: https://issues.apache.org/jira/browse/KAFKA-14542 Project: Kafka Issue Type: Improvement Reporter: David Jacot Assignee: David Jacot We should deprecate version 0 of the OffsetFetch/Commit APIs and remove it in AK 4.0. Version 0 of those two APIs is used by old clients to write offsets to and read offsets from ZK. We need a small KIP for this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
dajac commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1054496553 ## core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala: ## @@ -150,42 +149,6 @@ class OffsetFetchRequestTest extends BaseRequestTest { } } - @Test - def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = { Review Comment: In my opinion, it is pretty bad to just ignore one group passed in the request like this.
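One way to make the behavior explicit, rather than silently dropping a repeated group, is to detect duplicate group ids up front so the handler can reject the request or answer each occurrence deliberately. The sketch below is purely illustrative: `DuplicateGroupCheck` and `findDuplicates` are hypothetical names, not part of the Kafka code base, and how the broker should respond is exactly the open question in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DuplicateGroupCheck {
    // Hypothetical helper: returns the group ids that occur more than once,
    // each listed once, in the order their first repeat was seen.
    static List<String> findDuplicates(List<String> groupIds) {
        Set<String> seen = new LinkedHashSet<>();
        Set<String> duplicates = new LinkedHashSet<>();
        for (String id : groupIds) {
            if (!seen.add(id)) { // add() returns false when the id was already present
                duplicates.add(id);
            }
        }
        return new ArrayList<>(duplicates);
    }

    public static void main(String[] args) {
        List<String> requested = Arrays.asList("g1", "g2", "g1", "g3", "g2");
        System.out.println(findDuplicates(requested)); // prints [g1, g2]
    }
}
```

An empty result means every group in the request is unique; a non-empty one gives the handler the exact ids to mention in an error or a log line.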
[jira] [Updated] (KAFKA-14541) Profile produce workload for Apache Kafka
[ https://issues.apache.org/jira/browse/KAFKA-14541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-14541: - Description: I have been profiling Kafka (3.4.0 / trunk right now) for a produce only workload and the [OpenMessaging|https://openmessaging.cloud/docs/benchmarks/] workloads. The goal is to get a better understanding of CPU usage profile for Kafka and eliminate potential overheads to reduce CPU consumption. h2. *Setup* R6i.16xl (64 cores) OS: Amazon Linux 2 Single broker, One topic, One partition Plaintext Prometheus Java agent attached {code:java} -XX:+PreserveFramePointer -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints{code} {code:java} queued.max.requests=1 num.network.threads=32 num.io.threads=128 socket.request.max.bytes=104857600{code} h3. Producer setup: {code:java} batch.size=9000 buffer.memory=33554432 enable.idempotence=false linger.ms=0 receive.buffer.bytes=-1 send.buffer.bytes=-1 max.in.flight.requests.per.connection=10{code} h3. Profiler setup: [async-profiler|https://github.com/jvm-profiling-tools/async-profiler] (this profiler + -XX:+DebugNonSafepoints ensure that profiling doesn't suffer from safepoint bias). Note that flamegraph can be generated in "cpu" mode or "wall" mode (wall clock time) or "cycles" mode (used for better kernel call stack) h2. Observations # Processor.run > Processor#updateRequestMetrics() is a very expensive call. We should revisit whether we want to pre-compute histograms or not. Maybe upgrading to latest dropwizard will improve this? (JIRA: https://issues.apache.org/jira/browse/KAFKA-14423) # (Processor > Selector.poll), (Processor > Selector.write) and many other places - Accumulative cost of Sensor#recordInternal is high. # Processor threads are consuming more CPU than Handler threads?!! 
Perhaps because handler threads spend a lot of time waiting for partition lock at UnifiedLog.scala # HandleProduceRequest > RequestSizeInBytes - Unnecessary call to calculate size in bytes here. Low hanging opportunity to improve CPU utilisation for a request heavy workload. (Fixed in https://issues.apache.org/jira/browse/KAFKA-14414) # UnifiedLog#append > HeapByteBuffer.duplicate() - Why do we duplicate the buffer here? Ideally we shouldn't be making copies of buffer during the produce workflow. We should be using the same buffer after reading from socket to writing in a file. # Processor > Selector.select - Why is epoll consuming CPU cycles? It should have the thread in a timed_waiting state and hence, shouldn't consume CPU at all. # In a produce workload writing to socket is more CPU intensive than reading from the socket. This is surprising because reading would read more data from the socket (produce records) whereas writing would only write the response back which doesn't contain record data. # RequestChannel#sendResponse > wakeup - This is the call which wakes up the selector by writing to a file descriptor. Why is this so expensive? # The bottleneck in throughput is thread contention in UnifiedLog.scala, where handler threads wait for 80-90% of the time trying to acquire a lock. This observation was recorded from separate profiling. Can we remove this bottleneck by using fine grained lock for highwatermark update and log append as the current coarse grained lock causes contention? I am still analysing the flamegraph (cpu mode attached here). Please feel free to comment on any of the observations or add your own observations here. was: I have been profiling Kafka (3.4.0 / trunk right now) for a produce only workload and the [OpenMessaging|https://openmessaging.cloud/docs/benchmarks/] workloads. The goal is to get a better understanding of CPU usage profile for Kafka and eliminate potential overheads to reduce CPU consumption. h2. 
*Setup* R6i.16xl (64 cores) OS: Amazon Linux 2 Single broker, One topic, One partition Plaintext Prometheus Java agent attached {code:java} -XX:+PreserveFramePointer -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints{code} {code:java} queued.max.requests=1 num.network.threads=32 num.io.threads=128 socket.request.max.bytes=104857600{code} h3. Producer setup: {code:java} batch.size=9000 buffer.memory=33554432 enable.idempotence=false linger.ms=0 receive.buffer.bytes=-1 send.buffer.bytes=-1 max.in.flight.requests.per.connection=10{code} h3. Profiler setup: [async-profiler|https://github.com/jvm-profiling-tools/async-profiler] (this profiler + -XX:+DebugNonSafepoints ensure that profiling doesn't suffer from safepoint bias). Note that flamegraph can be generated in "cpu" mode or "wall" mode (wall clock time) or "cycles" mode (used for better kernel call stack) h2. Observations # Processor.run > Processor#updateRequestMetrics() is a very expensive call. We should revisit whether we want to pre-compute histograms or not. Maybe upgrading to
[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
dajac commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1054494916 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1397,77 +1397,172 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendResponseMaybeThrottle(request, createResponse) } - private def handleOffsetFetchRequestBetweenV1AndV7(request: RequestChannel.Request): Unit = { -val header = request.header + private def handleOffsetFetchRequestBetweenV1AndV7(request: RequestChannel.Request): CompletableFuture[Unit] = { Review Comment: I am not sure. We had a lot of complex logic before. I have tried to simplify as much as possible.
[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
dajac commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1054494189 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1338,11 +1337,12 @@ class KafkaApis(val requestChannel: RequestChannel, /** * Handle an offset fetch request */ - def handleOffsetFetchRequest(request: RequestChannel.Request): Unit = { + def handleOffsetFetchRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val version = request.header.apiVersion if (version == 0) { // reading offsets from ZK handleOffsetFetchRequestV0(request) + CompletableFuture.completedFuture[Unit](()) Review Comment: It does not matter where it is. We just need it because this is what the method returns.
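The point in the comment above, that a branch which still runs synchronously only needs to hand back an already-completed future because of the method's new return type, can be sketched in Java. This is a minimal illustration, not the Kafka code under review: the class `OffsetFetchDispatchSketch` and the methods `handleV0` and `handleFromCoordinator` are hypothetical stand-ins, and `CompletableFuture<Void>` plays the role of Scala's `CompletableFuture[Unit]`.

```java
import java.util.concurrent.CompletableFuture;

final class OffsetFetchDispatchSketch {
    // Hypothetical stand-in for the synchronous v0 path.
    static void handleV0(StringBuilder log) {
        log.append("v0;");
    }

    // Hypothetical stand-in for a path that completes asynchronously.
    static CompletableFuture<Void> handleFromCoordinator(StringBuilder log) {
        return CompletableFuture.runAsync(() -> log.append("coordinator;"));
    }

    static CompletableFuture<Void> handle(int version, StringBuilder log) {
        if (version == 0) {
            handleV0(log);
            // The work is already done, so return a future that is already complete.
            return CompletableFuture.completedFuture(null);
        }
        return handleFromCoordinator(log);
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        handle(0, log).join(); // returns immediately
        handle(1, log).join(); // waits for the asynchronous branch
        System.out.println(log);
    }
}
```

Callers can then treat both branches uniformly, for example by chaining a completion callback on whatever future comes back.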
[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
dajac commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1054493387 ## core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala: ## @@ -150,42 +149,6 @@ class OffsetFetchRequestTest extends BaseRequestTest { } } - @Test - def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = { Review Comment: I think that it is a quirk of the implementation. It is because we used HashMaps before this patch so the last one wins.
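The "last one wins" behaviour mentioned above is a general property of `java.util.HashMap`: a second `put` with the same key replaces the earlier value. A minimal sketch, assuming a hypothetical request shape of (group id, topic) pairs; the class `LastOneWinsSketch` and its `collect` method are illustrative only and do not come from the patch.

```java
import java.util.HashMap;
import java.util.Map;

final class LastOneWinsSketch {
    // Each pair is {groupId, topic}; a group id may appear more than once.
    static Map<String, String> collect(String[][] pairs) {
        Map<String, String> byGroup = new HashMap<>();
        for (String[] pair : pairs) {
            // put() overwrites any value previously stored under the same key.
            byGroup.put(pair[0], pair[1]);
        }
        return byGroup;
    }

    public static void main(String[] args) {
        String[][] request = {{"g1", "topicA"}, {"g2", "topicX"}, {"g1", "topicB"}};
        Map<String, String> result = collect(request);
        // Only the later entry for g1 survives.
        System.out.println(result.get("g1"));
    }
}
```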
[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
dajac commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1054492204 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java: ## @@ -19,6 +19,7 @@ import java.util.Map.Entry; Review Comment: As a follow-up, I would like to clean up this class. There are way too many ways to construct this object and the logic is pretty complicated. It is the same for the request. Let's do this separately.
[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
dajac commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1054490900 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -3405,6 +3405,341 @@ class KafkaApisTest { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error) } + @ParameterizedTest Review Comment: I added a few unit tests here.
[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
dajac commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1054490432 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1395,79 +1389,123 @@ class KafkaApis(val requestChannel: RequestChannel, offsetFetchResponse } requestHelper.sendResponseMaybeThrottle(request, createResponse) +CompletableFuture.completedFuture[Unit](()) } - private def handleOffsetFetchRequestBetweenV1AndV7(request: RequestChannel.Request): Unit = { -val header = request.header + private def handleOffsetFetchRequestFromCoordinator(request: RequestChannel.Request): CompletableFuture[Unit] = { Review Comment: I have simplified this code even further by pushing the handling of single vs. multiple groups into the request/response.
[GitHub] [kafka] mumrah commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions
mumrah commented on code in PR #13033: URL: https://github.com/apache/kafka/pull/13033#discussion_r1054485418 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1071,7 +1136,11 @@ public ControllerResult generateRecordsAndResult() throws Exception { log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) " + "at metadata.version {} from {}.", bootstrapMetadata.records().size(), bootstrapMetadata.metadataVersion(), bootstrapMetadata.source()); +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("initial transaction"), (short) 0)); records.addAll(bootstrapMetadata.records()); +records.add(new ApiMessageAndVersion( +new EndTransactionRecord(), (short) 0)); Review Comment: Why do we need the bootstrap records wrapped in a transaction? Aren't they written as an atomic batch?
[GitHub] [kafka] mumrah commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions
mumrah commented on code in PR #13033: URL: https://github.com/apache/kafka/pull/13033#discussion_r1054455358 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -289,6 +316,59 @@ class BrokerMetadataListener( BatchLoadResults(numBatches, numRecords, NANOSECONDS.toMicros(elapsedNs), numBytes) } + private def applyRecordToDelta( +delta: MetadataDelta, +messageAndVersion: ApiMessageAndVersion, +baseOffset: Long, +index: Int, +snapshotName: Option[String] + ): Unit = { +try { + delta.replay(messageAndVersion.message()) +} catch { + case e: Throwable => snapshotName match { +case None => metadataLoadingFaultHandler.handleFault( + s"Error replaying metadata log record at offset ${baseOffset + index}", e) +case Some(name) => metadataLoadingFaultHandler.handleFault( + s"Error replaying record ${index} from snapshot ${name} at offset ${_highestOffset}", e) + } +} + } + + private def beginTransaction(newTransactionEpoch: Int, newTransactionOffset: Long): Unit = { +if (_transactionEpoch != -1) { + abortTransaction("a new begin transaction record appeared", newTransactionOffset) +} +_transactionEpoch = newTransactionEpoch +_transactionOffset = newTransactionOffset +_transactionRecords = new util.ArrayList[ApiMessageAndVersion] +log.debug("Beginning metadata transaction {}_{}.", _transactionOffset, _transactionEpoch) + } + + private def endTransaction(delta: MetadataDelta, endOffset: Long): Unit = { +log.debug("Ending metadata transaction {}_{} at offset {}", _transactionOffset, _transactionEpoch, endOffset) +var index = 0 +_transactionRecords.forEach(record => { Review Comment: We should guard against a null `_transactionRecords` here. If two EndTransactionRecords appeared (somehow), it would raise an NPE. The same comment applies to this logic in MetadataLoader.
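One way to read the suggested guard is sketched below in Java. It is an illustration under assumptions, not the patch: `TransactionBufferSketch` is a hypothetical class, records are plain strings, and the choice to fail with a descriptive `IllegalStateException` (rather than ignore the stray end marker or route it to a fault handler) is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

final class TransactionBufferSketch {
    // null means "no transaction is currently open".
    private List<String> pending;

    void begin() {
        pending = new ArrayList<>();
    }

    void add(String record) {
        if (pending == null) {
            throw new IllegalStateException("No open transaction to add a record to");
        }
        pending.add(record);
    }

    // Returns the buffered records and closes the transaction.
    List<String> end() {
        if (pending == null) {
            // Guard: a second end marker fails with a clear message instead of an NPE.
            throw new IllegalStateException("Tried to end a transaction but none is open");
        }
        List<String> records = pending;
        pending = null;
        return records;
    }

    public static void main(String[] args) {
        TransactionBufferSketch buffer = new TransactionBufferSketch();
        buffer.begin();
        buffer.add("record-1");
        System.out.println(buffer.end().size());
        try {
            buffer.end(); // second end marker with no open transaction
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```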
[GitHub] [kafka] mumrah commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions
mumrah commented on code in PR #13033: URL: https://github.com/apache/kafka/pull/13033#discussion_r1054483117 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -76,6 +77,21 @@ class BrokerMetadataListener( */ private var _highestTimestamp = -1L + /** + * The log epoch of the current transaction, or -1 if there is none. + */ + private var _transactionEpoch = -1; + + /** + * The log offset of the current transaction, or -1L if there is none. + */ + private var _transactionOffset = -1L; + + /** + * Pending records in the current transaction, or null if there is none such. + */ + private var _transactionRecords: util.ArrayList[ApiMessageAndVersion] = _ + Review Comment: I'm guessing all of this logic will go away once we refactor the broker to use MetadataLoader?
[GitHub] [kafka] mumrah commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions
mumrah commented on code in PR #13033: URL: https://github.com/apache/kafka/pull/13033#discussion_r1054455358 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -289,6 +316,59 @@ class BrokerMetadataListener( BatchLoadResults(numBatches, numRecords, NANOSECONDS.toMicros(elapsedNs), numBytes) } + private def applyRecordToDelta( +delta: MetadataDelta, +messageAndVersion: ApiMessageAndVersion, +baseOffset: Long, +index: Int, +snapshotName: Option[String] + ): Unit = { +try { + delta.replay(messageAndVersion.message()) +} catch { + case e: Throwable => snapshotName match { +case None => metadataLoadingFaultHandler.handleFault( + s"Error replaying metadata log record at offset ${baseOffset + index}", e) +case Some(name) => metadataLoadingFaultHandler.handleFault( + s"Error replaying record ${index} from snapshot ${name} at offset ${_highestOffset}", e) + } +} + } + + private def beginTransaction(newTransactionEpoch: Int, newTransactionOffset: Long): Unit = { +if (_transactionEpoch != -1) { + abortTransaction("a new begin transaction record appeared", newTransactionOffset) +} +_transactionEpoch = newTransactionEpoch +_transactionOffset = newTransactionOffset +_transactionRecords = new util.ArrayList[ApiMessageAndVersion] +log.debug("Beginning metadata transaction {}_{}.", _transactionOffset, _transactionEpoch) + } + + private def endTransaction(delta: MetadataDelta, endOffset: Long): Unit = { +log.debug("Ending metadata transaction {}_{} at offset {}", _transactionOffset, _transactionEpoch, endOffset) +var index = 0 +_transactionRecords.forEach(record => { Review Comment: We should guard against a null `_transactionRecords` here. If two EndTransactionRecord-s appeared (somehow..) 
it would raise an NPE ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -995,6 +1024,42 @@ private void appendRaftEvent(String name, Runnable runnable) { } } +private void beginTransaction( +long newTransactionOffset, +int newTransactionEpoch +) { +if (transactionEpoch != -1) { +abortTransaction("a new begin transaction record appeared", newTransactionOffset); +} +transactionOffset = newTransactionOffset; +transactionEpoch = newTransactionEpoch; +snapshotRegistry.getOrCreateSnapshot(transactionOffset); +log.debug("Beginning metadata transaction {}_{}.", transactionOffset, transactionEpoch); +} + +private void endTransaction(long offset) { +if (transactionEpoch == -1) { +throw fatalFaultHandler.handleFault("Tried to end a transaction at offset " + offset + +" but there was no current transaction."); +} +transactionOffset = -1L; +transactionEpoch = -1; +log.debug("Completing metadata transaction {}_{}.", transactionOffset, transactionEpoch); +} + +private void abortTransaction(String reason, long offset) { Review Comment: maybe give "offset" a more explicit name? ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -254,6 +270,9 @@ class BrokerMetadataListener( while (iterator.hasNext) { val batch = iterator.next() + if (_transactionEpoch != -1 && _transactionEpoch != batch.epoch) { +abortTransaction("the log epoch changed to " + batch.epoch, batch.baseOffset) Review Comment: Since we're in scala here, we can use s-strings ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -860,7 +886,7 @@ public void handleCommit(BatchReader reader) { int i = 1; for (ApiMessageAndVersion message : messages) { try { -replay(message.message(), Optional.empty(), offset); +replay(message.message(), Optional.empty(), offset + i - 1, epoch); Review Comment: The `offset` -> `offset + i - 1` change, was that a bug previously? Were we just sending the batch offset to replay before? 
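The `offset + i - 1` expression questioned in the last comment can be checked with a small worked example. The sketch below assumes, as the comment's question suggests but the thread does not confirm, that `offset` is the base offset of the batch and that records in a batch occupy consecutive offsets; `RecordOffsetSketch` is a hypothetical name.

```java
final class RecordOffsetSketch {
    // Computes the offset of each record in a batch, using a 1-based counter
    // like the `int i = 1` loop in the quoted diff.
    static long[] recordOffsets(long baseOffset, int numRecords) {
        long[] offsets = new long[numRecords];
        for (int i = 1; i <= numRecords; i++) {
            // The first record (i == 1) sits at the base offset itself.
            offsets[i - 1] = baseOffset + i - 1;
        }
        return offsets;
    }

    public static void main(String[] args) {
        for (long offset : recordOffsets(100L, 3)) {
            System.out.println(offset);
        }
    }
}
```

Under these assumptions, passing the bare `offset` to every `replay` call would give all records in a batch the same offset, which is the difference the reviewer is asking about.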
[jira] [Updated] (KAFKA-14541) Profile produce workload for Apache Kafka
[ https://issues.apache.org/jira/browse/KAFKA-14541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-14541: - Description: I have been profiling Kafka (3.4.0 / trunk right now) for a produce only workload and the [OpenMessaging|https://openmessaging.cloud/docs/benchmarks/] workloads. The goal is to get a better understanding of CPU usage profile for Kafka and eliminate potential overheads to reduce CPU consumption. h2. *Setup* R6i.16xl (64 cores) OS: Amazon Linux 2 Single broker, One topic, One partition Plaintext Prometheus Java agent attached {code:java} -XX:+PreserveFramePointer -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints{code} {code:java} queued.max.requests=1 num.network.threads=32 num.io.threads=128 socket.request.max.bytes=104857600{code} h3. Producer setup: {code:java} batch.size=9000 buffer.memory=33554432 enable.idempotence=false linger.ms=0 receive.buffer.bytes=-1 send.buffer.bytes=-1 max.in.flight.requests.per.connection=10{code} h3. Profiler setup: [async-profiler|https://github.com/jvm-profiling-tools/async-profiler] (this profiler + -XX:+DebugNonSafepoints ensure that profiling doesn't suffer from safepoint bias). Note that flamegraph can be generated in "cpu" mode or "wall" mode (wall clock time) or "cycles" mode (used for better kernel call stack) h2. Observations # Processor.run > Processor#updateRequestMetrics() is a very expensive call. We should revisit whether we want to pre-compute histograms or not. Maybe upgrading to latest dropwizard will improve this? (JIRA: https://issues.apache.org/jira/browse/KAFKA-14423) # (Processor > Selector.poll), (Processor > Selector.write) and many other places - Accumulative cost of Sensor#recordInternal is high. # Processor threads are consuming more CPU than Handler threads?!! 
Perhaps because handler threads spend a lot of time waiting for partition lock at UnifiedLog.scala # HandleProduceRequest > RequestSizeInBytes - Unnecessary call to calculate size in bytes here. Low hanging opportunity to improve CPU utilisation for a request heavy workload. (Fixed in https://issues.apache.org/jira/browse/KAFKA-14414) # UnifiedLog#append > HeapByteBuffer.duplicate() - Why do we duplicate the buffer here? Ideally we shouldn't be making copies of buffer during the produce workflow. We should be using the same buffer after reading from socket to writing in a file. # Processor > Selector.select - Why is epoll consuming CPU cycles? It should have the thread in a timed_waiting state and hence, shouldn't consume CPU at all. # In a produce workload writing to socket is more CPU intensive than reading from the socket. This is surprising because reading would read more data from the socket (produce records) whereas writing would only write the response back which doesn't contain record data. # RequestChannel#sendResponse > wakeup - This is the call which wakes up the selector by writing to a file descriptor. Why is this so expensive? # The bottleneck in throughput is thread contention at UnifiedLog.scala, where handler threads wait 80-90% of the time trying to acquire a lock. This observation was recorded from separate profiling. I am still analysing the flamegraph (cpu mode attached here). Please feel free to comment on any of the observations or add your own observations here. was: I have been profiling Kafka (3.4.0 / trunk right now) for a produce only workload and the [OpenMessaging|https://openmessaging.cloud/docs/benchmarks/] workloads. The goal is to get a better understanding of CPU usage profile for Kafka and eliminate potential overheads to reduce CPU consumption. h2. 
*Setup* R6i.16xl (64 cores) OS: Amazon Linux 2 Single broker, One topic, One partition Plaintext Prometheus Java agent attached {code:java} -XX:+PreserveFramePointer -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints{code} {code:java} queued.max.requests=1 num.network.threads=32 num.io.threads=128 socket.request.max.bytes=104857600{code} h3. Producer setup: {code:java} batch.size=9000 buffer.memory=33554432 enable.idempotence=false linger.ms=0 receive.buffer.bytes=-1 send.buffer.bytes=-1 max.in.flight.requests.per.connection=10{code} h3. Profiler setup: [async-profiler|https://github.com/jvm-profiling-tools/async-profiler] (this profiler + -XX:+DebugNonSafepoints ensure that profiling doesn't suffer from safepoint bias). Note that flamegraph can be generated in "cpu" mode or "wall" mode (wall clock time) or "cycles" mode (used for better kernel call stack) h2. Observations # Processor.run > Processor#updateRequestMetrics() is a very expensive call. We should revisit whether we want to pre-compute histograms or not. Maybe upgrading to latest dropwizard will improve this? # (Processor > Selector.poll), (Processor > Selector.write) and many other places - Accumulative cost of