[jira] [Commented] (KAFKA-13607) Cannot use PEM certificate coding when parent defined file-based
[ https://issues.apache.org/jira/browse/KAFKA-13607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17808526#comment-17808526 ]

Sergey Ivanov commented on KAFKA-13607:
---

Hello,

We also faced an issue with DefaultSslEngineFactory.java. In our case we use Kafka Connect with a config provider, and the connector has the following properties:
{code:java}
"producer.override.ssl.truststore.type" : "${saas:topicconfig:ssl.truststore.type}",
"producer.override.bootstrap.servers" : "${saas:topicconfig:bootstrap.servers}",
"producer.override.ssl.truststore.certificates" : "${saas:topicconfig:ssl.truststore.certificates}",
"producer.override.security.protocol" : "${saas:topicconfig:security.protocol}",
"producer.override.ssl.keystore.key" : "${saas:topicconfig:ssl.keystore.key}",
"producer.override.ssl.keystore.type" : "${saas:topicconfig:ssl.keystore.type}",
"producer.override.sasl.jaas.config" : "${saas:topicconfig:sasl.jaas.config}",
"producer.override.sasl.mechanism" : "${saas:topicconfig:sasl.mechanism}",
"producer.override.ssl.keystore.certificate.chain" : "${saas:topicconfig:ssl.keystore.certificate.chain}"
{code}
Based on the real Kafka connection properties, the config provider fills in the corresponding values. For example, for Kafka with TLS but without mTLS (server certificate only), it provides empty values for ssl.keystore.key and ssl.keystore.type (as you know, a config provider cannot remove a property from a connector altogether).

But the code here:
[https://github.com/apache/kafka/blob/3.5/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L274]
checks only for a null key, not for an empty one. So we got an error:
{code:java}
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: SSL private key can be specified only for PEM, but key store type is .
{code}
It looks like the provided PR would help us as well: [https://github.com/apache/kafka/pull/11707]

> Cannot use PEM certificate coding when parent defined file-based
> ----------------------------------------------------------------
>
>                 Key: KAFKA-13607
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13607
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, config, connect
>    Affects Versions: 2.7.1, 3.0.0
>            Reporter: Piotr Smolinski
>            Priority: Major
>
> The problem applies to the situation when we create a Kafka client based on a
> prepopulated config. If we have only partial control over the input, we can
> attempt to reset some values.
> KIP-651 added a new cool feature to use PEM coding of certificates as an
> alternative to file stores. I have observed a problem in Confluent
> Replicator. We have shifted the common configuration to the worker level and
> assumed the connectors define only what is specific to them. The security
> setup is mTLS, i.e. we need both client cert and trusted chain. Our default
> configuration has both in PKCS12 files, but we had to reverse the
> replication direction and redefine the destination coordinates. For these we
> have certificates, and with KIP-651 we could specify them as connector
> params as opposed to changing the worker deployment.
> It turned out that we cannot override {*}ssl.keystore.location{*},
> {*}ssl.keystore.password{*}, etc. simply with empty values, because the code
> in the *DefaultSslEngineFactory* only checks whether the entry is null. We can
> only override it to an empty string.
> *DefaultSslEngineFactory* should treat the unexpected configuration entries
> as absent when they are {*}null{*}, but also when the given entry is an empty
> string.
> For a workaround I have created a hacky patch that fixes the behaviour:
> [https://github.com/piotrsmolinski/kafka-ssl-fix]
>

-- This message was sent by Atlassian Jira (v8.20.10#820010)
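The null-versus-empty distinction discussed in this issue can be illustrated with a small sketch. This is not the code from DefaultSslEngineFactory or from the linked PR: the class name `EmptyAsAbsent` and both helper methods are hypothetical, and only the config key names are taken from the comment. It assumes that a config provider which cannot remove a key resolves it to an empty string, and it treats any non-string value as present.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka: treats null and empty string values alike.
final class EmptyAsAbsent {

    // Returns true when a config value should be treated as "not configured".
    static boolean isAbsent(Object value) {
        if (value == null) {
            return true;
        }
        // Only strings can be "empty" here; any other non-null value counts as present.
        return value instanceof String && ((String) value).trim().isEmpty();
    }

    // Returns a copy of the configs without entries that are null or empty strings.
    static Map<String, Object> dropAbsent(Map<String, ?> configs) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : configs.entrySet()) {
            if (!isAbsent(entry.getValue())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("ssl.truststore.type", "PEM");
        // Assumed provider behaviour: keys that do not apply resolve to "".
        configs.put("ssl.keystore.type", "");
        configs.put("ssl.keystore.key", "");
        System.out.println(dropAbsent(configs).keySet()); // prints [ssl.truststore.type]
    }
}
```

With a check like `isAbsent`, an empty ssl.keystore.key would be handled the same way as a missing one, which is the behaviour both commenters ask for.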
Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]
dajac commented on PR #15183:
URL: https://github.com/apache/kafka/pull/15183#issuecomment-1899919345

For reference, I was thinking about an alternative approach. Instead of keeping track of the offset of the record in the log for every committed offset, we could delete all the pending transactional offsets when a new offset record is replayed. The downside of this approach is that we would need to restructure how pending transactional offsets are stored in order to have an efficient way to find all the pending offsets for a given group, topic and partition tuple. Otherwise, the replay would become less efficient than it is today.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
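The restructuring mentioned in this comment could look roughly like the following. It is only a sketch of the indexing idea, not code from the PR: the class `PendingOffsets`, its method names, and the use of a producer id as the innermost key are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical index of pending transactional offsets, keyed so that everything
// pending for one (group, topic, partition) tuple can be found with direct lookups.
final class PendingOffsets {

    // group -> topic -> partition -> producerId -> pending committed offset
    private final Map<String, Map<String, Map<Integer, Map<Long, Long>>>> index = new HashMap<>();

    void addPending(String group, String topic, int partition, long producerId, long offset) {
        index.computeIfAbsent(group, g -> new HashMap<>())
             .computeIfAbsent(topic, t -> new HashMap<>())
             .computeIfAbsent(partition, p -> new HashMap<>())
             .put(producerId, offset);
    }

    // Meant to be called when a regular offset record for the tuple is replayed.
    // Returns how many pending entries were discarded.
    int clearPending(String group, String topic, int partition) {
        Map<String, Map<Integer, Map<Long, Long>>> topics = index.get(group);
        if (topics == null) return 0;
        Map<Integer, Map<Long, Long>> partitions = topics.get(topic);
        if (partitions == null) return 0;
        Map<Long, Long> removed = partitions.remove(partition);
        // Prune empty parents so the index does not accumulate empty maps.
        if (partitions.isEmpty()) topics.remove(topic);
        if (topics.isEmpty()) index.remove(group);
        return removed == null ? 0 : removed.size();
    }

    boolean hasPending(String group, String topic, int partition) {
        Map<String, Map<Integer, Map<Long, Long>>> topics = index.get(group);
        if (topics == null) return false;
        Map<Integer, Map<Long, Long>> partitions = topics.get(topic);
        return partitions != null && partitions.containsKey(partition);
    }
}
```

The point of the nesting is that `clearPending` costs a few hash lookups per replayed record instead of a scan over all open transactions, which is the efficiency concern raised in the comment.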
Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]
dajac commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1458508459

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -712,13 +712,14 @@ public void run() {
 try {
 // Apply the records to the state machine.
 if (result.replayRecords()) {
-result.records().forEach(record ->
+for (int i = 0; i < result.records().size(); i++) {
 context.coordinator.replay(
+prevLastWrittenOffset + i,

Review Comment:
Sure.
Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]
dajac commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1458504559

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -918,7 +920,7 @@ public void replay(
 groupId,
 topic,
 partition,
-OffsetAndMetadata.fromRecord(value)
+OffsetAndMetadata.fromRecord(offset, value)

Review Comment:
I agree that it is a bit confusing. Let me rename `offset` to `recordOffset` to make it clearer.
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on PR #14612:
URL: https://github.com/apache/kafka/pull/14612#issuecomment-1899907260

Verified the following tests locally:
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress (also fails in another PR: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2588/tests/)
testDescribeQuorumReplicationSuccessful

This PR is mostly new code that uses its own code path, so in theory it will not affect other unit tests. Running the integration tests again after merging the latest master.
Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]
dajac commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1458501802

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java: ##
@@ -92,30 +117,34 @@ public boolean equals(Object o) {
 OffsetAndMetadata that = (OffsetAndMetadata) o;
-if (offset != that.offset) return false;
+if (committedOffset != that.committedOffset) return false;
 if (commitTimestampMs != that.commitTimestampMs) return false;
-if (!leaderEpoch.equals(that.leaderEpoch)) return false;
-if (!metadata.equals(that.metadata)) return false;
-return expireTimestampMs.equals(that.expireTimestampMs);
+if (recordOffset != that.recordOffset) return false;
+if (!Objects.equals(leaderEpoch, that.leaderEpoch)) return false;

Review Comment:
I don't think so. There is a feature to auto-generate equals and hashCode. It has been around for a while now.
Re: [PR] KAFKA-16101: KRaft migration documentation is incorrect [kafka]
showuon commented on code in PR #15193: URL: https://github.com/apache/kafka/pull/15193#discussion_r1458455522 ## docs/ops.html: ## @@ -3992,6 +3979,75 @@ Finalizing the migration # Other configs ... + Reverting to ZooKeeper mode During the Migration + +While the cluster is still in migration mode, it is possible to revert to ZK mode. The process +to follow depends on how far the migration has progressed. In order to find out how to revert, +select the final migration step that you have completed in this table. + + +Note that the directions given here assume that each step was fully completed, and they were +done in order. So, for example, we assume that if "Enabling the migration on the brokers" was completed, +"Provisioning the KRaft controller quorum" was also fully completed previously. + + +If you did not fully complete any step, back out whatever you have done and then follow revert +directions for the last fully completed step. + + + + + +Final Migration Section Completed +Directions for Reverting +Notes + + +Preparing for migration +The prepartion section does not involve leaving ZK mode. So there is nothing to do in +the case of a revert. + + + +Provisioning the KRaft controller quorum +Deprovision the KRaft controller quorum, and then you are done. + + + +Enabling zookeeper.metadata.migration.enable on the brokers +Roll the broker cluster with zookeeper.metadata.migration.enable=false. Then, +deprovision the KRaft controller quorum. Then you are done. Review Comment: In the `Enabling zookeeper.metadata.migration.enable on the brokers` step, the controller znode will be replaced with KRaft controller, we should remove the controller znode to allow zk broker to take over. ## docs/ops.html: ## @@ -3992,6 +3979,75 @@ Finalizing the migration # Other configs ... + Reverting to ZooKeeper mode During the Migration + +While the cluster is still in migration mode, it is possible to revert to ZK mode. 
The process +to follow depends on how far the migration has progressed. In order to find out how to revert, +select the final migration step that you have completed in this table. + + +Note that the directions given here assume that each step was fully completed, and they were +done in order. So, for example, we assume that if "Enabling the migration on the brokers" was completed, +"Provisioning the KRaft controller quorum" was also fully completed previously. + + +If you did not fully complete any step, back out whatever you have done and then follow revert +directions for the last fully completed step. + + + + + +Final Migration Section Completed +Directions for Reverting +Notes + + +Preparing for migration +The prepartion section does not involve leaving ZK mode. So there is nothing to do in +the case of a revert. + + + +Provisioning the KRaft controller quorum +Deprovision the KRaft controller quorum, and then you are done. + + + +Enabling zookeeper.metadata.migration.enable on the brokers +Roll the broker cluster with zookeeper.metadata.migration.enable=false. Then, +deprovision the KRaft controller quorum. Then you are done. + + + +Migrating brokers to KRaft + +Roll the broker cluster with the process.roles configuration omitted, node.id +replaced with broker.id, and the zookeeper.connect configuration set to a valid +value. + + +After this roll is fully complete, perform a second roll where you set +zookeeper.metadata.migration.enable=false. Then, +deprovision the KRaft controller quorum. Then you are done. Review Comment: removing controller znode? ## docs/ops.html: ## @@ -3992,6 +3979,75 @@ Finalizing the migration # Other configs ... + Reverting to ZooKeeper mode During the Migration + +While the cluster is still in migration mode, it is possible to revert to ZK mode. The process +to follow depends on how far the migration has progressed. In order to find out how to revert, +select the final migration step that you have completed in this table. 
+ + +Note that the directions given here assume that each step was fully completed, and they were +done in order. So, for example, we assume that if "Enabling the migration on the brokers" was completed, +"Provisioning the KRaft controller quorum" was also fully completed previously. + + +If you did not fully complete any step, back out whatever you have done and then
Re: [PR] KAFKA-16113: Add committed and commit sensor to record metrics [kafka]
philipnee commented on PR #15210:
URL: https://github.com/apache/kafka/pull/15210#issuecomment-1899847833

Hey @lucasbru - Thanks for your time reviewing this PR. I've made changes according to your suggestion. Let me know what you think of the PR.
Re: [PR] KAFKA-15717: KRaft support in LeaderEpochIntegrationTest [kafka]
appchemist commented on code in PR #15225:
URL: https://github.com/apache/kafka/pull/15225#discussion_r1458326771

## core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala: ##
@@ -277,6 +283,22 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
 producer.close()
 }

+ private def createTopicWithAssignment(topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]]): Unit = {

Review Comment:
@ex172000 In `TestUtils`, how about changing the method names from `createTopicWithAssignment` to `createTopic`, and from `createTopic` to `createTopicWithZkClient`, similar to `createTopicWithAdmin`?
Re: [PR] KAFKA-15717: KRaft support in LeaderEpochIntegrationTest [kafka]
appchemist commented on code in PR #15225:
URL: https://github.com/apache/kafka/pull/15225#discussion_r1458315848

## core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala: ##
@@ -277,6 +283,22 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
 producer.close()
 }

+ private def createTopicWithAssignment(topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]]): Unit = {

Review Comment:
@ex172000 Of course, I did.
Re: [PR] KAFKA-16144: skip checkQuorum for only 1 voter case [kafka]
showuon commented on PR #15235:
URL: https://github.com/apache/kafka/pull/15235#issuecomment-1899645426

@mimaison , please help review. Thanks.
Re: [PR] KAFKA-16144: skip checkQuorum for only 1 voter case [kafka]
showuon commented on code in PR #15235:
URL: https://github.com/apache/kafka/pull/15235#discussion_r1458256739

## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ##
@@ -101,6 +101,10 @@ protected LeaderState(
  * @return the remainingMs before the checkQuorumTimer expired
  */
 public long timeUntilCheckQuorumExpires(long currentTimeMs) {
+// if there's only 1 voter, it should never get expired.
+if (voterStates.size() == 1) {
+return Long.MAX_VALUE;
+}
 checkQuorumTimer.update(currentTimeMs);

Review Comment:
I was thinking we don't need `checkQuorumTimer` for the 1-voter case, but since the next release is going to allow dynamically scaling voters up and down, we should keep the flexibility here.
[PR] KAFKA-16144: skip checkQuorum for only 1 voter case [kafka]
showuon opened a new pull request, #15235:
URL: https://github.com/apache/kafka/pull/15235

When there's only 1 voter, there will be no fetch requests from other voters. In this case, we should not expire the checkQuorum timer, because there's just 1 voter.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15610: Fix `CoreUtils.swallow()` test gaps [kafka]
github-actions[bot] commented on PR #14583:
URL: https://github.com/apache/kafka/pull/14583#issuecomment-1899638097

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[jira] [Assigned] (KAFKA-16144) Controller leader checkQuorum timer should skip only 1 controller case
[ https://issues.apache.org/jira/browse/KAFKA-16144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen reassigned KAFKA-16144:
-

Assignee: Luke Chen

> Controller leader checkQuorum timer should skip only 1 controller case
> --
>
>                 Key: KAFKA-16144
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16144
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Major
>              Labels: newbie, newbie++
>
> In KAFKA-15489, we fixed the potential "split brain" issue by adding the
> check quorum timer. This timer is updated when a follower fetch request
> arrives. When the leader has not received fetch requests from a majority of
> the voters before the timer expires, it resigns the leadership.
> But in KAFKA-15489, we forgot to consider the case where there's only 1
> controller node. If there's only 1 controller node (and no broker node),
> no fetch request will ever arrive, so the timer will expire each time.
> However, if there's only 1 node, we don't have to care about the "check
> quorum" at all. We should skip the check in the 1-controller-node case.
> Currently, this issue happens only when there's 1 controller node and no
> broker node at all (i.e. no fetch request is sent to the controller).

-- This message was sent by Atlassian Jira (v8.20.10#820010)
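The behaviour described in this issue can be sketched with a much simplified stand-in for the leader's bookkeeping. This is not Kafka's LeaderState: the class `CheckQuorumSketch`, its fields and its methods are hypothetical, and the rule for resetting the deadline (restart once a majority, counting the leader itself, has fetched) is an assumption made to keep the example small.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of a leader-side check quorum timer.
final class CheckQuorumSketch {
    private final int localId;
    private final Set<Integer> voters;
    private final long timeoutMs;
    private final Set<Integer> fetchedVoters = new HashSet<>();
    private long deadlineMs;

    CheckQuorumSketch(int localId, Set<Integer> voters, long timeoutMs, long nowMs) {
        this.localId = localId;
        this.voters = new HashSet<>(voters);
        this.timeoutMs = timeoutMs;
        this.deadlineMs = nowMs + timeoutMs;
    }

    // Records a fetch from a follower voter and restarts the timer once enough have fetched.
    void onFetch(int voterId, long nowMs) {
        if (voterId == localId || !voters.contains(voterId)) {
            return; // ignore the leader itself and non-voters
        }
        fetchedVoters.add(voterId);
        // The leader counts towards the majority, so voters.size() / 2 followers suffice.
        if (fetchedVoters.size() >= voters.size() / 2) {
            deadlineMs = nowMs + timeoutMs;
            fetchedVoters.clear();
        }
    }

    // Remaining time before the leader would consider resigning.
    long timeUntilExpires(long nowMs) {
        // With a single voter nobody else can ever fetch, so the timer must never expire.
        if (voters.size() == 1) {
            return Long.MAX_VALUE;
        }
        return Math.max(0L, deadlineMs - nowMs);
    }
}
```

Without the single-voter branch, a quorum of one would hit the deadline every `timeoutMs`, because `onFetch` is never called, which matches the repeated resignations reported in the issue.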
[jira] [Resolved] (KAFKA-16163) Constant resignation/reelection of controller when starting a single node in combined mode
[ https://issues.apache.org/jira/browse/KAFKA-16163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen resolved KAFKA-16163.
---
Resolution: Duplicate

> Constant resignation/reelection of controller when starting a single node in combined mode
> --
>
>                 Key: KAFKA-16163
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16163
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.7.0
>            Reporter: Mickael Maison
>            Priority: Major
>
> When starting a single node in combined mode:
> {noformat}
> $ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
> $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
> $ bin/kafka-server-start.sh config/kraft/server.properties{noformat}
>
> it's constantly spamming the logs with:
> {noformat}
> [2024-01-18 17:37:09,065] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:11,967] INFO [RaftManager id=1] Did not receive fetch request from the majority of the voters within 3000ms. Current fetched voters are []. (org.apache.kafka.raft.LeaderState)
> [2024-01-18 17:37:11,967] INFO [RaftManager id=1] Completed transition to ResignedState(localId=1, epoch=138, voters=[1], electionTimeoutMs=1864, unackedVoters=[], preferredSuccessors=[]) from Leader(localId=1, epoch=138, epochStartOffset=829, highWatermark=Optional[LogOffsetMetadata(offset=835, metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=62788)])], voterStates={1=ReplicaState(nodeId=1, endOffset=Optional[LogOffsetMetadata(offset=835, metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=62788)])], lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)}) (org.apache.kafka.raft.QuorumState)
> [2024-01-18 17:37:13,072] INFO [NodeToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
> [2024-01-18 17:37:13,072] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,123] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,124] INFO [NodeToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
> [2024-01-18 17:37:13,124] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,175] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,176] INFO [NodeToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
> [2024-01-18 17:37:13,176] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,227] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,229] INFO [NodeToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
> [2024-01-18 17:37:13,229] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,279] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread){noformat}
> This did not happen in 3.6.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16111) Implement tests for tricky rebalance callback scenarios
[ https://issues.apache.org/jira/browse/KAFKA-16111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16111: -- Description: There is justified concern that the new threading model may not play well with "tricky" {{ConsumerRebalanceListener}} callbacks. We need to provide some assurance that it will support complicated patterns. # Design and implement test scenarios # Update and document any design changes with the callback sub-system where needed # Provide fix(es) to the {{AsyncKafkaConsumer}} implementation to abide by said design > Implement tests for tricky rebalance callback scenarios > --- > > Key: KAFKA-16111 > URL: https://issues.apache.org/jira/browse/KAFKA-16111 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Kirk True >Assignee: Lucas Brutschy >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > There is justified concern that the new threading model may not play well > with "tricky" {{ConsumerRebalanceListener}} callbacks. We need to provide > some assurance that it will support complicated patterns. > # Design and implement test scenarios > # Update and document any design changes with the callback sub-system where > needed > # Provide fix(es) to the {{AsyncKafkaConsumer}} implementation to abide by > said design -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16141: Fix StreamsStandbyTask system test [kafka]
mjsax commented on PR #15217:
URL: https://github.com/apache/kafka/pull/15217#issuecomment-1899524211

Jenkins looks good, but the previous system test run did not actually start running the test (I assume also because of the compilation error). Retriggered: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/6039/
Re: [PR] KAFKA-16159: MINOR - Correcting logs to debug telemetry reporter [kafka]
mjsax commented on PR #15228:
URL: https://github.com/apache/kafka/pull/15228#issuecomment-1899520079

Thanks @apoorvmittal10! Merged to `trunk` and cherry-picked to `3.7` branch.
Re: [PR] KAFKA-16159: MINOR - Correcting logs to debug telemetry reporter [kafka]
mjsax merged PR #15228:
URL: https://github.com/apache/kafka/pull/15228
[jira] [Assigned] (KAFKA-16111) Implement tests for tricky rebalance callback scenarios
[ https://issues.apache.org/jira/browse/KAFKA-16111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16111: - Assignee: Lucas Brutschy (was: Kirk True) > Implement tests for tricky rebalance callback scenarios > --- > > Key: KAFKA-16111 > URL: https://issues.apache.org/jira/browse/KAFKA-16111 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Kirk True >Assignee: Lucas Brutschy >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458144631

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ##
@@ -2560,41 +2562,6 @@ class ReplicaManagerTest {
 }
 }

- @ParameterizedTest
- @EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", "CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", "COORDINATOR_NOT_AVAILABLE"))
- def testMaybeVerificationErrorConversions(error: Errors): Unit = {

Review Comment:
We also have one for the GroupCoordinator.
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458144452

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ##
@@ -2560,41 +2562,6 @@ class ReplicaManagerTest {
 }
 }

- @ParameterizedTest
- @EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", "CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", "COORDINATOR_NOT_AVAILABLE"))
- def testMaybeVerificationErrorConversions(error: Errors): Unit = {

Review Comment:
We have the test above in this file.
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458143981

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -1188,18 +1073,7 @@ class ReplicaManager(val config: KafkaConfig,
 requestLocal: RequestLocal,
 verificationErrors: Map[TopicPartition, Errors]
 ): Unit = {
- // Map transaction coordinator errors to known errors for the response
- val convertedErrors = verificationErrors.map { case (tp, error) =>
-error match {
- case Errors.CONCURRENT_TRANSACTIONS |
-Errors.COORDINATOR_LOAD_IN_PROGRESS |
-Errors.COORDINATOR_NOT_AVAILABLE |
-Errors.NOT_COORDINATOR => tp -> Errors.NOT_ENOUGH_REPLICAS
- case _ => tp -> error
-}
-
- }

Review Comment:
But yes, we simply pass through concurrent txns which will be fatal to the client.
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan commented on code in PR #15087: URL: https://github.com/apache/kafka/pull/15087#discussion_r1458143630 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1188,18 +1073,7 @@ class ReplicaManager(val config: KafkaConfig, requestLocal: RequestLocal, verificationErrors: Map[TopicPartition, Errors] ): Unit = { - // Map transaction coordinator errors to known errors for the response - val convertedErrors = verificationErrors.map { case (tp, error) => -error match { - case Errors.CONCURRENT_TRANSACTIONS | -Errors.COORDINATOR_LOAD_IN_PROGRESS | -Errors.COORDINATOR_NOT_AVAILABLE | -Errors.NOT_COORDINATOR => tp -> Errors.NOT_ENOUGH_REPLICAS - case _ => tp -> error -} - - } Review Comment: We have separate handling for produce requests and txn offset commit requests. for produce: ``` case Errors.INVALID_TXN_STATE => Some(error.exception("Partition was not added to the transaction")) case Errors.CONCURRENT_TRANSACTIONS | Errors.COORDINATOR_LOAD_IN_PROGRESS | Errors.COORDINATOR_NOT_AVAILABLE | Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException( s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}")) case _ => None ``` for txn offset commit: ``` error match { case Errors.UNKNOWN_TOPIC_OR_PARTITION | Errors.NOT_ENOUGH_REPLICAS | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => Errors.COORDINATOR_NOT_AVAILABLE case Errors.NOT_LEADER_OR_FOLLOWER | Errors.KAFKA_STORAGE_ERROR => Errors.NOT_COORDINATOR case Errors.MESSAGE_TOO_LARGE | Errors.RECORD_LIST_TOO_LARGE | Errors.INVALID_FETCH_SIZE => Errors.INVALID_COMMIT_OFFSET_SIZE // We may see INVALID_TXN_STATE or INVALID_PID_MAPPING here due to transaction verification. // They can be returned without mapping to a new error. case other => other } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
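For readers following the thread, the two mappings quoted in the comment above can be sketched side by side in Java. This is only an illustration under stated assumptions: `Err` is a hypothetical stand-in for the handful of `Errors` constants mentioned in the comment, `VerificationErrorMapping` is an invented name, and the produce path here returns an error code where the quoted Scala returns an exception.

```java
import java.util.Optional;

// Hypothetical stand-in for the Errors constants named in the review comment.
enum Err {
    INVALID_TXN_STATE, INVALID_PID_MAPPING, CONCURRENT_TRANSACTIONS,
    COORDINATOR_LOAD_IN_PROGRESS, COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR,
    NOT_ENOUGH_REPLICAS, NOT_ENOUGH_REPLICAS_AFTER_APPEND, UNKNOWN_TOPIC_OR_PARTITION,
    NOT_LEADER_OR_FOLLOWER, KAFKA_STORAGE_ERROR, MESSAGE_TOO_LARGE,
    RECORD_LIST_TOO_LARGE, INVALID_FETCH_SIZE, INVALID_COMMIT_OFFSET_SIZE
}

// Invented helper class; not part of Kafka.
final class VerificationErrorMapping {

    // Produce path: coordinator-side verification errors are reported as NOT_ENOUGH_REPLICAS.
    // An empty result means "no special handling", like the `case _ => None` branch.
    static Optional<Err> forProduce(Err error) {
        switch (error) {
            case INVALID_TXN_STATE:
                return Optional.of(Err.INVALID_TXN_STATE);
            case CONCURRENT_TRANSACTIONS:
            case COORDINATOR_LOAD_IN_PROGRESS:
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                return Optional.of(Err.NOT_ENOUGH_REPLICAS);
            default:
                return Optional.empty();
        }
    }

    // Txn offset commit path: log-level errors are translated to coordinator errors;
    // everything else, including verification errors, passes through unchanged.
    static Err forTxnOffsetCommit(Err error) {
        switch (error) {
            case UNKNOWN_TOPIC_OR_PARTITION:
            case NOT_ENOUGH_REPLICAS:
            case NOT_ENOUGH_REPLICAS_AFTER_APPEND:
                return Err.COORDINATOR_NOT_AVAILABLE;
            case NOT_LEADER_OR_FOLLOWER:
            case KAFKA_STORAGE_ERROR:
                return Err.NOT_COORDINATOR;
            case MESSAGE_TOO_LARGE:
            case RECORD_LIST_TOO_LARGE:
            case INVALID_FETCH_SIZE:
                return Err.INVALID_COMMIT_OFFSET_SIZE;
            default:
                return error;
        }
    }
}
```

In this sketch `CONCURRENT_TRANSACTIONS` becomes `NOT_ENOUGH_REPLICAS` on the produce path but is returned as-is on the offset commit path, which is the asymmetry the follow-up comment points out.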
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1458141964 ## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ## @@ -64,28 +64,89 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) val expectedListing = Set( - new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)), - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + new ConsumerGroupListing(simpleGroup, true) +.setState(Optional.of(ConsumerGroupState.EMPTY)) +.setType(if (quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()), + new ConsumerGroupListing(group, false) +.setState(Optional.of(ConsumerGroupState.STABLE)) +.setType(if (quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()) +) var foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet + foundListing = service.listConsumerGroupsWithFilters(ConsumerGroupState.values.toSet, Set.empty).toSet expectedListing == foundListing }, s"Expected to show groups $expectedListing, but found $foundListing") -val expectedListingStable = Set( - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) +val expectedListingStable = Set.empty[ConsumerGroupListing] foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet + foundListing = service.listConsumerGroupsWithFilters(Set(ConsumerGroupState.PREPARING_REBALANCE), Set.empty).toSet expectedListingStable == foundListing }, s"Expected to show groups $expectedListingStable, but found $foundListing") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testConsumerGroupStatesFromString(quorum: 
String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithTypes(quorum: String, groupProtocol: String): Unit = { +val simpleGroup = "simple-group" +addSimpleGroupExecutor(group = simpleGroup) +addConsumerGroupExecutor(numConsumers = 1) + +val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--type") +val service = getConsumerGroupService(cgcArgs) + +val expectedListingStable = Set.empty[ConsumerGroupListing] + +val expectedListing = Set( + new ConsumerGroupListing(simpleGroup, true) +.setState(Optional.of(ConsumerGroupState.EMPTY)) +.setType(if(quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()), Review Comment: According to the original way I wrote the code, we were gonna return empty for any group type while using the old GC, this will change with my new commit, was waiting to rebase so I can test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1458133607 ## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ## @@ -64,28 +64,89 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) val expectedListing = Set( - new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)), - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + new ConsumerGroupListing(simpleGroup, true) Review Comment: yeah the changes were made due to the constructor changes to ConsumerGroupListing, I guess we'll change it everywhere if we decide to change it there -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]
kirktrue commented on code in PR #15186: URL: https://github.com/apache/kafka/pull/15186#discussion_r1458125932 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ## @@ -376,25 +376,21 @@ protected Map prepareCloseFetchSessi final Cluster cluster = metadata.fetch(); Map fetchable = new HashMap<>(); -try { -sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> { -// set the session handler to notify close. This will set the next metadata request to send close message. -sessionHandler.notifyClose(); +sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> { Review Comment: I think that's worth exploring, because I don't see that we ever remove any of the `FetchSessionHandler` entries once they're created -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1458121585 ## core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala: ## @@ -116,6 +123,18 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness { } object ConsumerGroupCommandTest { + // We want to test the following combinations: + // * ZooKeeper and the classic group protocol. + // * KRaft and the classic group protocol. + // * KRaft with the new group coordinator enabled and the classic group protocol. + // * KRaft with the new group coordinator enabled and the consumer group protocol. + def getTestQuorumAndGroupProtocolParametersAll: java.util.stream.Stream[Arguments] = { Review Comment: I wasn't able to reuse it, so I added the code here, but I can double-check.
[jira] [Created] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
Kirk True created KAFKA-16167: - Summary: Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup Key: KAFKA-16167 URL: https://issues.apache.org/jira/browse/KAFKA-16167 Project: Kafka Issue Type: Bug Components: clients, consumer, unit tests Affects Versions: 3.7.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1458105856 ## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ## @@ -102,6 +103,15 @@ object ConsumerGroupCommand extends Logging { parsedStates } + def consumerGroupTypesFromString(input: String): Set[ConsumerGroupType] = { +val parsedStates = input.split(',').map(s => ConsumerGroupType.parse(s.trim)).toSet Review Comment: omgg thanks for catching that! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16101: KRaft migration documentation is incorrect [kafka]
lzyLuke commented on code in PR #15193: URL: https://github.com/apache/kafka/pull/15193#discussion_r1458081035 ## docs/ops.html: ## @@ -3992,6 +3979,75 @@ Finalizing the migration # Other configs ... + Reverting to ZooKeeper mode During the Migration + +While the cluster is still in migration mode, it is possible to revert to ZK mode. The process +to follow depends on how far the migration has progressed. In order to find out how to revert, +select the final migration step that you have completed in this table. + + +Note that the directions given here assume that each step was fully completed, and they were +done in order. So, for example, we assume that if "Enabling the migration on the brokers" was completed, +"Provisioning the KRaft controller quorum" was also fully completed previously. + + +If you did not fully complete any step, back out whatever you have done and then follow revert +directions for the last fully completed step. + + + + + +Final Migration Section Completed +Directions for Reverting +Notes + + +Preparing for migration +The prepartion section does not involve leaving ZK mode. So there is nothing to do in +the case of a revert. + + + +Provisioning the KRaft controller quorum +Deprovision the KRaft controller quorum, and then you are done. + + + +Enabling zookeeper.metadata.migration.enable on the brokers +Roll the broker cluster with zookeeper.metadata.migration.enable=false. Then, +deprovision the KRaft controller quorum. Then you are done. + + + +Migrating brokers to KRaft + +Roll the broker cluster with the process.roles configuration omitted, node.id +replaced with broker.id, and the zookeeper.connect configuration set to a valid +value. Review Comment: qq: what is the valid value for zookeeper.connect ? ## docs/ops.html: ## @@ -3992,6 +3979,75 @@ Finalizing the migration # Other configs ... + Reverting to ZooKeeper mode During the Migration + +While the cluster is still in migration mode, it is possible to revert to ZK mode. 
The process +to follow depends on how far the migration has progressed. In order to find out how to revert, Review Comment: Should we explain on which phase we could revert to ZK mode? Or at least say after the first roll? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [8/8] Update offset delete paths [kafka]
dongnuo123 commented on code in PR #15221: URL: https://github.com/apache/kafka/pull/15221#discussion_r1458085202 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -856,7 +899,8 @@ public boolean cleanupExpiredOffsets(String groupId, List records) { }); metrics.record(OFFSET_EXPIRED_SENSOR_NAME, expiredPartitions.size()); -return allOffsetsExpired.get(); +// We don't want to remove the group if there are ongoing transactions. +return allOffsetsExpired.get() && !openTransactionsByGroup.containsKey(groupId); Review Comment: Just want to understand here: if there are ongoing transactions, `hasPendingTransactionalOffsets(groupId, topic, partition)` will be true and `allOffsetsExpired` will be set to false in L888. Why is `!openTransactionsByGroup.containsKey(groupId)` needed?
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1458083463 ## clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java: ## @@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends AbstractOptions states = Collections.emptySet(); +private Set groupTypes = Collections.emptySet(); + /** - * If states is set, only groups in these states will be returned by listConsumerGroups() + * If states is set, only groups in these states will be returned by listConsumerGroups(). * Otherwise, all groups are returned. * This operation is supported by brokers with version 2.6.0 or later. */ public ListConsumerGroupsOptions inStates(Set states) { -this.states = (states == null) ? Collections.emptySet() : new HashSet<>(states); +this.states = (states == null || states.isEmpty()) ? Collections.emptySet() : states; +return this; +} + +/** + * If groupTypes is set, only groups of these groupTypes will be returned by listConsumerGroups(). + * Otherwise, all groups are returned. + * + */ +public ListConsumerGroupsOptions inTypes(Set groupTypes) { Review Comment: Ohh thanks yess I'll change it, sry was just following the method names we used with States for consistency but inTypes doesn't make sense like inStates does -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1458079066 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2229,7 +2229,7 @@ void handleResponse(AbstractResponse abstractResponse) { String topicName = cluster.topicName(topicId); if (topicName == null) { -future.completeExceptionally(new UnknownTopicIdException("TopicId " + topicId + " not found.")); +future.completeExceptionally(new InvalidTopicException("TopicId " + topicId + " not found.")); Review Comment: Thanks for the catch, missed this when I was splitting the PR and merging! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1458077276 ## clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java: ## @@ -21,95 +21,111 @@ import java.util.Optional; Review Comment: Ohh hmm that makes sense thanks for bringing this up! I do feel like the way it exists right now is kinda inefficient that's why I changed it. Out of curiosity if I wanted to make these changes to the file what would we have to do? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1458069279 ## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ## @@ -104,9 +165,27 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupStatesFromString(" , ,")) } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListGroupCommand(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumerGroupTypesFromString(quorum: String, groupProtocol: String): Unit = { +var result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer") +assertEquals(Set(ConsumerGroupType.CONSUMER), result) + +result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer, classic") +assertEquals(Set(ConsumerGroupType.CONSUMER, ConsumerGroupType.CLASSIC), result) + +assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong")) + +assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupTypesFromString("Consumer")) Review Comment: yes, It will work once we get the case insensitive code in -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
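As a rough illustration of what the case-insensitive parsing mentioned above could look like, here is a minimal Java sketch. It is not the code from the PR: `GroupTypeSketch` and `GroupTypeArgs` are hypothetical names that only mirror the shape of the enum and the command-line helper under review, and rejecting any input that maps to `UNKNOWN` is an assumption made for the example.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in that mirrors the shape of the enum in the diff.
enum GroupTypeSketch {
    UNKNOWN("unknown"),
    CONSUMER("consumer"),
    CLASSIC("classic");

    private static final Map<String, GroupTypeSketch> NAME_TO_ENUM = Arrays.stream(values())
        .collect(Collectors.toMap(type -> type.name, Function.identity()));

    private final String name;

    GroupTypeSketch(String name) {
        this.name = name;
    }

    // Case-insensitive lookup; anything unrecognised (or null) maps to UNKNOWN.
    static GroupTypeSketch parse(String name) {
        if (name == null) {
            return UNKNOWN;
        }
        GroupTypeSketch type = NAME_TO_ENUM.get(name.toLowerCase(Locale.ROOT));
        return type == null ? UNKNOWN : type;
    }
}

// Hypothetical helper playing the role of consumerGroupTypesFromString.
final class GroupTypeArgs {
    static Set<GroupTypeSketch> typesFromString(String input) {
        Set<GroupTypeSketch> parsed = Arrays.stream(input.split(","))
            .map(s -> GroupTypeSketch.parse(s.trim()))
            .collect(Collectors.toCollection(LinkedHashSet::new));
        // Assumption for this sketch: any unrecognised entry invalidates the whole list.
        if (parsed.contains(GroupTypeSketch.UNKNOWN)) {
            throw new IllegalArgumentException("Invalid types list '" + input + "'");
        }
        return parsed;
    }
}
```

With this variant, `"Consumer"` and `"consumer, classic"` are both accepted, while `"bad, wrong"` and `" , ,"` still raise `IllegalArgumentException`. One side effect of the assumption above is that the literal string `"unknown"` is rejected too.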
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1458068041 ## clients/src/main/java/org/apache/kafka/common/ConsumerGroupType.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common; + +import java.util.Arrays; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum ConsumerGroupType { +UNKNOWN("unknown"), +CONSUMER("consumer"), +CLASSIC("classic"); + +private final static Map NAME_TO_ENUM = Arrays.stream(values()) +.collect(Collectors.toMap(type -> type.name, Function.identity())); + +private final String name; + +ConsumerGroupType(String name) { +this.name = name; +} + +/** + * Parse a string into a consumer group type. + */ +public static ConsumerGroupType parse(String name) { Review Comment: Yes I have all the changes locally, waiting to rebase the API changes so I can run some tests all together -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1458067667 ## clients/src/main/java/org/apache/kafka/common/ConsumerGroupType.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common; + +import java.util.Arrays; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum ConsumerGroupType { Review Comment: Okie I can change it, I was following precedent with ConsumerGroupState, trying to keep everything consistent -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan commented on code in PR #15087: URL: https://github.com/apache/kafka/pull/15087#discussion_r1458066834 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -717,17 +717,16 @@ class KafkaApis(val requestChannel: RequestChannel, val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID // call the replica manager to append messages to the replicas - replicaManager.appendRecords( + replicaManager.handleProduceAppend( timeout = produceRequest.timeout.toLong, requiredAcks = produceRequest.acks, internalTopicsAllowed = internalTopicsAllowed, origin = AppendOrigin.CLIENT, Review Comment: that's fair. I can do that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]
jolshan commented on code in PR #15212: URL: https://github.com/apache/kafka/pull/15212#discussion_r1457917435 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -875,30 +875,43 @@ private void maybeRemovePartitionEpoch( ConsumerGroupMember oldMember ) { if (oldMember != null) { -removePartitionEpochs(oldMember.assignedPartitions()); -removePartitionEpochs(oldMember.partitionsPendingRevocation()); +removePartitionEpochs(oldMember.assignedPartitions(), oldMember.memberEpoch()); +removePartitionEpochs(oldMember.partitionsPendingRevocation(), oldMember.memberEpoch()); } } /** * Removes the partition epochs based on the provided assignment. * * @param assignmentThe assignment. + * @param expectedEpoch The expected epoch. + * @throws IllegalStateException if the epoch does not match the expected one. + * package-private for testing. */ -private void removePartitionEpochs( -Map> assignment +void removePartitionEpochs( +Map> assignment, +int expectedEpoch ) { assignment.forEach((topicId, assignedPartitions) -> { currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> { if (partitionsOrNull != null) { -assignedPartitions.forEach(partitionsOrNull::remove); +assignedPartitions.forEach(partitionId -> { +Integer prevValue = partitionsOrNull.remove(partitionId); +if (prevValue != expectedEpoch) { +throw new IllegalStateException( +String.format("Cannot remove the epoch %d from %s-%s because the partition is " + +"still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue)); +} +}); if (partitionsOrNull.isEmpty()) { return null; } else { return partitionsOrNull; } } else { -return null; +throw new IllegalStateException( Review Comment: What is the effect of throwing this error? Do we also block removing the rest of the partitions? Just trying to figure out the state we are left in. -- This is an automated message from the Apache Git Service. 
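To make the question about the resulting state concrete, here is a small Java sketch of the same control flow on plain `java.util.HashMap`s. It is an illustration only: `PartitionEpochSketch` is a hypothetical class, topic ids are plain strings, and the real `ConsumerGroup` may use different map types and recovery mechanisms, so the behaviour shown here should not be read as what the coordinator does.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in; plain HashMaps replace whatever structures ConsumerGroup really uses.
final class PartitionEpochSketch {
    // topic -> (partition -> epoch)
    final Map<String, Map<Integer, Integer>> currentPartitionEpoch = new HashMap<>();

    void removePartitionEpochs(Map<String, List<Integer>> assignment, int expectedEpoch) {
        assignment.forEach((topic, partitions) ->
            currentPartitionEpoch.compute(topic, (unused, partitionsOrNull) -> {
                if (partitionsOrNull == null) {
                    throw new IllegalStateException("No partition epochs recorded for " + topic);
                }
                for (Integer partitionId : partitions) {
                    // The entry is removed before the check, so it stays removed if we throw.
                    Integer prevValue = partitionsOrNull.remove(partitionId);
                    if (prevValue == null || prevValue != expectedEpoch) {
                        throw new IllegalStateException(String.format(
                            "Cannot remove the epoch %d from %s-%s because the partition is " +
                            "still owned at a different epoch %s",
                            expectedEpoch, topic, partitionId, prevValue));
                    }
                }
                return partitionsOrNull.isEmpty() ? null : partitionsOrNull;
            }));
    }
}
```

In this sketch an exception stops the iteration: the partition that failed the check and any partitions processed before it are already gone from the inner map, while later partitions and later topics are left untouched. `HashMap.compute` rethrows and keeps the outer mapping for the topic, but that does not undo the removals already made inside the inner map.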
Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]
jolshan commented on code in PR #15183: URL: https://github.com/apache/kafka/pull/15183#discussion_r1458064289 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java: ## @@ -92,30 +117,34 @@ public boolean equals(Object o) { OffsetAndMetadata that = (OffsetAndMetadata) o; -if (offset != that.offset) return false; +if (committedOffset != that.committedOffset) return false; if (commitTimestampMs != that.commitTimestampMs) return false; -if (!leaderEpoch.equals(that.leaderEpoch)) return false; -if (!metadata.equals(that.metadata)) return false; -return expireTimestampMs.equals(that.expireTimestampMs); +if (recordOffset != that.recordOffset) return false; +if (!Objects.equals(leaderEpoch, that.leaderEpoch)) return false; Review Comment: I see. Is this using AI -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
rreddy-22 commented on code in PR #15152: URL: https://github.com/apache/kafka/pull/15152#discussion_r1458061363 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -9616,47 +9618,76 @@ public void testListGroups() { .build())); context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 11)); +// Test list group response without a group state or group type filter. Map actualAllGroupMap = -context.sendListGroups(Collections.emptyList()) - .stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); +context.sendListGroups(Collections.emptyList(), Collections.emptyList()).stream() + .collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); + Map expectAllGroupMap = Stream.of( new ListGroupsResponseData.ListedGroup() .setGroupId(classicGroup.groupId()) -.setProtocolType(classicGroupType) -.setGroupState(EMPTY.toString()), +.setProtocolType("classic") +.setGroupState(EMPTY.toString()) +.setGroupType(Group.GroupType.CLASSIC.toString()), new ListGroupsResponseData.ListedGroup() .setGroupId(consumerGroupId) .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) .setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString()) +.setGroupType(Group.GroupType.CONSUMER.toString()) ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); assertEquals(expectAllGroupMap, actualAllGroupMap); context.commit(); -actualAllGroupMap = context.sendListGroups(Collections.emptyList()).stream() + +// Test list group response to check assigning state in the consumer group. 
+actualAllGroupMap = context.sendListGroups(Collections.singletonList("assigning"), Collections.emptyList()).stream() .collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); expectAllGroupMap = Stream.of( -new ListGroupsResponseData.ListedGroup() -.setGroupId(classicGroup.groupId()) -.setProtocolType(classicGroupType) -.setGroupState(EMPTY.toString()), new ListGroupsResponseData.ListedGroup() .setGroupId(consumerGroupId) .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) .setGroupState(ConsumerGroup.ConsumerGroupState.ASSIGNING.toString()) +.setGroupType(Group.GroupType.CONSUMER.toString()) ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); assertEquals(expectAllGroupMap, actualAllGroupMap); -actualAllGroupMap = context.sendListGroups(Collections.singletonList("Empty")).stream() +// Test list group response with group state filter and no group type filter. +actualAllGroupMap = context.sendListGroups(Collections.singletonList("Empty"), Collections.emptyList()).stream() + .collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); +expectAllGroupMap = Stream.of( +new ListGroupsResponseData.ListedGroup() +.setGroupId(classicGroup.groupId()) +.setProtocolType("classic") +.setGroupState(EMPTY.toString()) +.setGroupType(Group.GroupType.CLASSIC.toString()) + ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); + +assertEquals(expectAllGroupMap, actualAllGroupMap); + +// Test list group response with no group state filter and with group type filter. +actualAllGroupMap = context.sendListGroups(Collections.emptyList(), Collections.singletonList(Group.GroupType.CLASSIC.toString())).stream() Review Comment: Okie I added the case insensitive case to be of Consumer type -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
rreddy-22 commented on code in PR #15152: URL: https://github.com/apache/kafka/pull/15152#discussion_r1458058459 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -452,19 +453,36 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE /** * Get the Group List. * - * @param statesFilter The states of the groups we want to list. - * If empty all groups are returned with their state. - * @param committedOffset A specified committed offset corresponding to this shard + * @param statesFilter The states of the groups we want to list. + * If empty, all groups are returned with their state. + * @param typesFilter The types of the groups we want to list. + * If empty, all groups are returned with their type. + * @param committedOffset A specified committed offset corresponding to this shard. * * @return A list containing the ListGroupsResponseData.ListedGroup */ +public List listGroups( +Set statesFilter, +Set typesFilter, +long committedOffset +) { +Predicate combinedFilter = group -> { +boolean stateCheck = statesFilter.isEmpty() || statesFilter.contains(group.stateAsString(committedOffset)); + +// The type check is case-insensitive. +boolean typeCheck = typesFilter.isEmpty() || +typesFilter.stream() +.map(String::toLowerCase) Review Comment: oya that makes sense, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
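For readers following the review thread, here is a minimal sketch of the combined state/type filter discussed in the diff above. It assumes an empty filter set means "match everything" and that only the type comparison ignores case, as the diff comment states. `GroupInfo`, `combinedFilter` and `listGroupIds` are hypothetical names for illustration, not the actual `GroupMetadataManager` API, and lower-casing the requested types once up front is a choice made for this sketch rather than something taken from the PR.

```java
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class ListGroupsFilterSketch {
    /** Hypothetical stand-in for a coordinator group; not a Kafka class. */
    static final class GroupInfo {
        final String id;
        final String state;
        final String type;

        GroupInfo(String id, String state, String type) {
            this.id = id;
            this.state = state;
            this.type = type;
        }
    }

    /** An empty filter set matches everything; type names are compared ignoring case. */
    static Predicate<GroupInfo> combinedFilter(Set<String> statesFilter, Set<String> typesFilter) {
        // Lower-case the requested types once instead of once per group.
        Set<String> lowerTypes = typesFilter.stream()
            .map(t -> t.toLowerCase(Locale.ROOT))
            .collect(Collectors.toSet());
        return group -> {
            // States are matched exactly in this sketch.
            boolean stateCheck = statesFilter.isEmpty() || statesFilter.contains(group.state);
            boolean typeCheck = lowerTypes.isEmpty()
                || lowerTypes.contains(group.type.toLowerCase(Locale.ROOT));
            return stateCheck && typeCheck;
        };
    }

    /** Returns the sorted ids of the groups that pass both filters. */
    static List<String> listGroupIds(List<GroupInfo> groups, Set<String> states, Set<String> types) {
        return groups.stream()
            .filter(combinedFilter(states, types))
            .map(g -> g.id)
            .sorted()
            .collect(Collectors.toList());
    }
}
```

With a classic group and a consumer group in the list, a types filter of `"CLASSIC"` selects only the classic group even though the stored type is lower-case.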
Re: [PR] KAFKA-15811: Enhance request context with client socket port information (KIP-714) [kafka]
apoorvmittal10 commented on PR #15190: URL: https://github.com/apache/kafka/pull/15190#issuecomment-1899343398 > @apoorvmittal10 : Thanks for the PR. A couple of minor comments. Thanks for reviewing @junrao. I have addressed and replied to the comment.
Re: [PR] KAFKA-15811: Enhance request context with client socket port information (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15190: URL: https://github.com/apache/kafka/pull/15190#discussion_r1458055273 ## server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java: ## @@ -59,6 +60,7 @@ public static RequestContext requestContext() throws UnknownHostException { new RequestHeader(ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS, (short) 0, "producer-1", 0), "1", InetAddress.getLocalHost(), +Optional.of(56078), Review Comment: Done.
[PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]
apoorvmittal10 opened a new pull request, #15234:
URL: https://github.com/apache/kafka/pull/15234

KIP-714 requires a client instance cache in the broker, with a time-based eviction policy so that client instances which are not actively sending metrics are evicted. The KIP states: `This client instance specific state is maintained in broker memory up to MAX(60*1000, PushIntervalMs * 3) milliseconds`

This PR adds support for evicting such instances from the cache. There are multiple possible approaches to eviction:
1. Periodically iterating over all entries in the cache, as every instance can have a different TTL (based on the configured push interval)
2. Using a heap to store instance evictions (delay queues)
3. Using a hashed wheel timer, which updates and evicts entries in O(1) time (with the minor overhead of bucketed delay queues)

I have also moved a wrapper class for SystemTimer from `group-coordinator` to `utils`.

cc: @AndrewJSchofield @junrao

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
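The retention rule quoted from KIP-714 can be illustrated with a small sketch. It implements only the first approach listed in the PR description (a periodic scan of all entries), purely because it is the shortest to show; it is not a claim about which approach the PR uses. `ClientInstanceCacheSketch`, `InstanceState`, `touch` and `evictExpired` are hypothetical names, and treating an entry as expired once its idle time exceeds the TTL is an assumption.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ClientInstanceCacheSketch {
    /** Hypothetical per-instance state; not a Kafka class. */
    static final class InstanceState {
        final int pushIntervalMs;
        final long lastSeenMs;

        InstanceState(int pushIntervalMs, long lastSeenMs) {
            this.pushIntervalMs = pushIntervalMs;
            this.lastSeenMs = lastSeenMs;
        }
    }

    private final Map<String, InstanceState> cache = new ConcurrentHashMap<>();

    /** MAX(60*1000, PushIntervalMs * 3), computed in long arithmetic to avoid int overflow. */
    static long ttlMs(int pushIntervalMs) {
        return Math.max(60 * 1000L, pushIntervalMs * 3L);
    }

    /** Records that the instance was seen at nowMs, which restarts its TTL. */
    void touch(String clientInstanceId, int pushIntervalMs, long nowMs) {
        cache.put(clientInstanceId, new InstanceState(pushIntervalMs, nowMs));
    }

    /** Scans every entry and removes those idle for longer than their own TTL. */
    int evictExpired(long nowMs) {
        int evicted = 0;
        Iterator<Map.Entry<String, InstanceState>> it = cache.entrySet().iterator();
        while (it.hasNext()) {
            InstanceState state = it.next().getValue();
            if (nowMs - state.lastSeenMs > ttlMs(state.pushIntervalMs)) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    int size() {
        return cache.size();
    }
}
```

The scan costs O(n) per pass, which is the trade-off the description contrasts with the O(1) updates of a hashed wheel timer.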
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1458050364 ## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ## @@ -140,6 +142,73 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w } } + /** + * Return topic partition metadata for the given topic, listener and index range. Also, return the next partition + * index that is not included in the result. + * + * @param image The metadata image + * @param topicName The name of the topic. + * @param listenerNameThe listener name. + * @param startIndex The smallest index of the partitions to be included in the result. + * @param upperIndex The upper limit of the index of the partitions to be included in the result. + *Note that, the upper index can be larger than the largest partition index in + *this topic. + * @returnA collection of topic partition metadata and next partition index (-1 means + *no next partition). + */ + private def getPartitionMetadataForDescribeTopicResponse( +image: MetadataImage, +topicName: String, +listenerName: ListenerName, +startIndex: Int, +maxCount: Int + ): (Option[List[DescribeTopicPartitionsResponsePartition]], Int) = { +Option(image.topics().getTopic(topicName)) match { + case None => (None, -1) + case Some(topic) => { +val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]() +val partitions = topic.partitions().keySet() +val upperIndex = topic.partitions().size().min(startIndex + maxCount) +val nextIndex = if (upperIndex < partitions.size()) upperIndex else -1 +for (partitionId <- startIndex until upperIndex) { + topic.partitions().get(partitionId) match { +case partition : PartitionRegistration => { + val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, +listenerName, false) + val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, false) + val offlineReplicas = getOfflineReplicas(image, partition, listenerName) + 
val maybeLeader = getAliveEndpoint(image, partition.leader, listenerName) + maybeLeader match { +case None => + result.append(new DescribeTopicPartitionsResponsePartition() +.setPartitionIndex(partitionId) +.setLeaderId(MetadataResponse.NO_LEADER_ID) +.setLeaderEpoch(partition.leaderEpoch) +.setReplicaNodes(filteredReplicas) +.setIsrNodes(filteredIsr) +.setOfflineReplicas(offlineReplicas) +.setEligibleLeaderReplicas(Replicas.toList(partition.elr)) +.setLastKnownElr(Replicas.toList(partition.lastKnownElr))) +case Some(leader) => + result.append(new DescribeTopicPartitionsResponsePartition() +.setPartitionIndex(partitionId) +.setLeaderId(leader.id()) +.setLeaderEpoch(partition.leaderEpoch) +.setReplicaNodes(filteredReplicas) +.setIsrNodes(filteredIsr) +.setOfflineReplicas(offlineReplicas) +.setEligibleLeaderReplicas(Replicas.toList(partition.elr)) +.setLastKnownElr(Replicas.toList(partition.lastKnownElr))) + } +} +case _ => Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
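The index arithmetic in the Scala diff above (the `upperIndex` and `nextIndex` values) amounts to a simple cursor-style pagination over partition ids. Below is a minimal Java sketch of just that arithmetic, assuming partitions are numbered `0..partitionCount-1`. `PartitionPageSketch`, `Page` and `page` are hypothetical names, and the `long` addition is a defensive choice of this sketch rather than something present in the diff.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionPageSketch {
    /** One page of partition ids plus the cursor for the next page (-1 means no next page). */
    static final class Page {
        final List<Integer> partitionIds;
        final int nextIndex;

        Page(List<Integer> partitionIds, int nextIndex) {
            this.partitionIds = partitionIds;
            this.nextIndex = nextIndex;
        }
    }

    static Page page(int partitionCount, int startIndex, int maxCount) {
        // Add in long so that a very large maxCount cannot overflow int.
        int upperIndex = (int) Math.min((long) partitionCount, (long) startIndex + maxCount);
        // If the page stops before the last partition, the next page starts at upperIndex.
        int nextIndex = upperIndex < partitionCount ? upperIndex : -1;
        List<Integer> ids = new ArrayList<>();
        for (int partitionId = startIndex; partitionId < upperIndex; partitionId++) {
            ids.add(partitionId);
        }
        return new Page(ids, nextIndex);
    }
}
```

For a topic with 10 partitions, `page(10, 0, 4)` covers partitions 0 to 3 with a next index of 4, while `page(10, 8, 4)` covers 8 and 9 and reports -1.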
[jira] [Commented] (KAFKA-16162) New created topics are unavailable after upgrading to 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17808378#comment-17808378 ] Proven Provenzano commented on KAFKA-16162: --- I have a PR [GitHub Pull Request #15232|https://github.com/apache/kafka/pull/15232] that should fix the issue. I have not created tests for it yet though and it really should also add a condition where if a broker sends a heartbeat indicating it has no online directories, it should be fenced. > New created topics are unavailable after upgrading to 3.7 > - > > Key: KAFKA-16162 > URL: https://issues.apache.org/jira/browse/KAFKA-16162 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Luke Chen >Priority: Blocker > > In 3.7, we introduced the KIP-858 JBOD feature, and the brokerRegistration > request will include the `LogDirs` fields with UUID for each log dir in each > broker. This info will be stored in the controller and used to identify if > the log dir is known and online while handling AssignReplicasToDirsRequest > [here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L2093]. > > While upgrading from old version, the kafka cluster will run in 3.7 binary > with old metadata version, and then upgrade to newer version using > kafka-features.sh. That means, while brokers startup and send the > brokerRegistration request, it'll be using older metadata version without > `LogDirs` fields included. And it makes the controller has no log dir info > for all brokers. Later, after upgraded, if new topic is created, the flow > will go like this: > 1. Controller assign replicas and adds in metadata log > 2. brokers fetch the metadata and apply it > 3. ReplicaManager#maybeUpdateTopicAssignment will update topic assignment > 4. 
After sending ASSIGN_REPLICAS_TO_DIRS to controller with replica > assignment, controller will think the log dir in current replica is offline, > so triggering offline handler, and reassign leader to another replica, and > offline, until no more replicas to assign, so assigning leader to -1 (i.e. no > leader) > So, the results will be that new created topics are unavailable (with no > leader) because the controller thinks all log dir are offline. > {code:java} > lukchen@lukchen-mac kafka % bin/kafka-topics.sh --describe --topic > quickstart-events3 --bootstrap-server localhost:9092 > > Topic: quickstart-events3 TopicId: s8s6tEQyRvmjKI6ctNTgPg PartitionCount: > 3 ReplicationFactor: 3Configs: segment.bytes=1073741824 > Topic: quickstart-events3 Partition: 0Leader: none > Replicas: 7,2,6 Isr: 6 > Topic: quickstart-events3 Partition: 1Leader: none > Replicas: 2,6,7 Isr: 6 > Topic: quickstart-events3 Partition: 2Leader: none > Replicas: 6,7,2 Isr: 6 > {code} > The log snippet in the controller : > {code:java} > # handling 1st assignReplicaToDirs request > [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned > partition quickstart-events3:0 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned > partition quickstart-events3:2 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,371] DEBUG [QuorumController id=1] Broker 6 assigned > partition quickstart-events3:1 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] > offline-dir-assignment: changing partition(s): quickstart-events3-0, > quickstart-events3-2, quickstart-events3-1 > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for > 
quickstart-events3-0 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: > [AA, AA, AA] -> > [7K5JBERyyqFFxIXSXYluJA, AA, AA], > partitionEpoch: 0 -> 1 (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] Replayed partition > change PartitionChangeRecord(partitionId=0, topicId=6ZIeidfiSTWRiOAmGEwn_g, > isr=null, leader=-2, replicas=null, removingReplicas=null, > addingReplicas=null, leaderRecoveryState=-1, > directories=[7K5JBERyyqFFxIXSXYluJA, AA, > AA], eligibleLeaderReplicas=null, lastKnownELR=null) for > topic quickstart-events3 > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,372] DEBUG
[jira] [Updated] (KAFKA-16162) New created topics are unavailable after upgrading to 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Proven Provenzano updated KAFKA-16162: -- Affects Version/s: 3.7.0 > New created topics are unavailable after upgrading to 3.7 > - > > Key: KAFKA-16162 > URL: https://issues.apache.org/jira/browse/KAFKA-16162 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Luke Chen >Priority: Blocker > > In 3.7, we introduced the KIP-858 JBOD feature, and the brokerRegistration > request will include the `LogDirs` fields with UUID for each log dir in each > broker. This info will be stored in the controller and used to identify if > the log dir is known and online while handling AssignReplicasToDirsRequest > [here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L2093]. > > While upgrading from old version, the kafka cluster will run in 3.7 binary > with old metadata version, and then upgrade to newer version using > kafka-features.sh. That means, while brokers startup and send the > brokerRegistration request, it'll be using older metadata version without > `LogDirs` fields included. And it makes the controller has no log dir info > for all brokers. Later, after upgraded, if new topic is created, the flow > will go like this: > 1. Controller assign replicas and adds in metadata log > 2. brokers fetch the metadata and apply it > 3. ReplicaManager#maybeUpdateTopicAssignment will update topic assignment > 4. After sending ASSIGN_REPLICAS_TO_DIRS to controller with replica > assignment, controller will think the log dir in current replica is offline, > so triggering offline handler, and reassign leader to another replica, and > offline, until no more replicas to assign, so assigning leader to -1 (i.e. no > leader) > So, the results will be that new created topics are unavailable (with no > leader) because the controller thinks all log dir are offline. 
> {code:java} > lukchen@lukchen-mac kafka % bin/kafka-topics.sh --describe --topic > quickstart-events3 --bootstrap-server localhost:9092 > > Topic: quickstart-events3 TopicId: s8s6tEQyRvmjKI6ctNTgPg PartitionCount: > 3 ReplicationFactor: 3Configs: segment.bytes=1073741824 > Topic: quickstart-events3 Partition: 0Leader: none > Replicas: 7,2,6 Isr: 6 > Topic: quickstart-events3 Partition: 1Leader: none > Replicas: 2,6,7 Isr: 6 > Topic: quickstart-events3 Partition: 2Leader: none > Replicas: 6,7,2 Isr: 6 > {code} > The log snippet in the controller : > {code:java} > # handling 1st assignReplicaToDirs request > [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned > partition quickstart-events3:0 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned > partition quickstart-events3:2 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,371] DEBUG [QuorumController id=1] Broker 6 assigned > partition quickstart-events3:1 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] > offline-dir-assignment: changing partition(s): quickstart-events3-0, > quickstart-events3-2, quickstart-events3-1 > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for > quickstart-events3-0 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: > [AA, AA, AA] -> > [7K5JBERyyqFFxIXSXYluJA, AA, AA], > partitionEpoch: 0 -> 1 (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] Replayed partition > change PartitionChangeRecord(partitionId=0, topicId=6ZIeidfiSTWRiOAmGEwn_g, > isr=null, leader=-2, replicas=null, removingReplicas=null, > addingReplicas=null, 
leaderRecoveryState=-1, > directories=[7K5JBERyyqFFxIXSXYluJA, AA, > AA], eligibleLeaderReplicas=null, lastKnownELR=null) for > topic quickstart-events3 > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for > quickstart-events3-2 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: > [AA, AA, AA] -> > [AA, 7K5JBERyyqFFxIXSXYluJA, AA], > partitionEpoch: 0 -> 1
[jira] [Updated] (KAFKA-16162) New created topics are unavailable after upgrading to 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Proven Provenzano updated KAFKA-16162: -- Priority: Blocker (was: Major) > New created topics are unavailable after upgrading to 3.7 > - > > Key: KAFKA-16162 > URL: https://issues.apache.org/jira/browse/KAFKA-16162 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Priority: Blocker > > In 3.7, we introduced the KIP-858 JBOD feature, and the brokerRegistration > request will include the `LogDirs` fields with UUID for each log dir in each > broker. This info will be stored in the controller and used to identify if > the log dir is known and online while handling AssignReplicasToDirsRequest > [here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L2093]. > > While upgrading from old version, the kafka cluster will run in 3.7 binary > with old metadata version, and then upgrade to newer version using > kafka-features.sh. That means, while brokers startup and send the > brokerRegistration request, it'll be using older metadata version without > `LogDirs` fields included. And it makes the controller has no log dir info > for all brokers. Later, after upgraded, if new topic is created, the flow > will go like this: > 1. Controller assign replicas and adds in metadata log > 2. brokers fetch the metadata and apply it > 3. ReplicaManager#maybeUpdateTopicAssignment will update topic assignment > 4. After sending ASSIGN_REPLICAS_TO_DIRS to controller with replica > assignment, controller will think the log dir in current replica is offline, > so triggering offline handler, and reassign leader to another replica, and > offline, until no more replicas to assign, so assigning leader to -1 (i.e. no > leader) > So, the results will be that new created topics are unavailable (with no > leader) because the controller thinks all log dir are offline. 
[PR] KAFKA-16166: Generify RetryWithToleranceOperator and ErrorReporter classes [kafka]
gharris1727 opened a new pull request, #15233:
URL: https://github.com/apache/kafka/pull/15233

This is a follow-up to #15154 which propagates the generic type for ProcessingContext upward through all its call-sites.

I decided to avoid generifying the WorkerTask and instead separate the common retryWithToleranceOperator field into two fields with different types, in WorkerSinkTask and AbstractWorkerSourceTask, as the only operation which was present in the WorkerTask was the retryWithToleranceOperator.triggerStop() call.

This now makes the type system enforce that the DeadLetterQueueReporter only works for sink tasks, and the LogReporter must be specialized as either LogReporter.Sink or LogReporter.Source.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
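As a rough illustration of how generics let the compiler enforce "dead letter queue reporting only for sinks", here is a minimal sketch. Every type in it (`SourceRec`, `SinkRec`, `Context`, `Reporter`, `DlqReporter`, `CountingLogReporter`, `Operator`) is a hypothetical stand-in invented for this example; it does not reproduce the signatures of the Connect runtime classes named in the PR description.

```java
import java.util.ArrayList;
import java.util.List;

class GenericErrorReportingSketch {
    /** Hypothetical record types standing in for source-side and sink-side records. */
    static final class SourceRec {
        final String value;
        SourceRec(String value) { this.value = value; }
    }

    static final class SinkRec {
        final String topic;
        SinkRec(String topic) { this.topic = topic; }
    }

    /** Context parameterized by the record type it carries. */
    static final class Context<T> {
        final T original;
        final Throwable error;
        Context(T original, Throwable error) {
            this.original = original;
            this.error = error;
        }
    }

    interface Reporter<T> {
        void report(Context<T> context);
    }

    /** Only meaningful for sink records, so it is pinned to SinkRec. */
    static final class DlqReporter implements Reporter<SinkRec> {
        final List<String> deadLetters = new ArrayList<>();

        @Override
        public void report(Context<SinkRec> context) {
            deadLetters.add(context.original.topic + ": " + context.error.getMessage());
        }
    }

    /** Usable on either side, but each instance is specialized to one record type. */
    static final class CountingLogReporter<T> implements Reporter<T> {
        int reported = 0;

        @Override
        public void report(Context<T> context) {
            reported++;
        }
    }

    static final class Operator<T> {
        private final List<? extends Reporter<T>> reporters;

        Operator(List<? extends Reporter<T>> reporters) {
            this.reporters = reporters;
        }

        /** Builds a context for the failed record and hands it to every reporter. */
        void executeFailed(T record, Throwable error) {
            Context<T> context = new Context<>(record, error);
            for (Reporter<T> reporter : reporters) {
                reporter.report(context);
            }
        }
    }
}
```

In this sketch, passing a `DlqReporter` to an `Operator<SourceRec>` is rejected at compile time, so no `instanceof` check is needed at runtime.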
[PR] KAFKA-16162: [WIP] [kafka]
pprovenzano opened a new pull request, #15232: URL: https://github.com/apache/kafka/pull/15232 (no comment)
Re: [PR] KAFKA-16101: KRaft migration documentation is incorrect [kafka]
cmccabe commented on PR #15193: URL: https://github.com/apache/kafka/pull/15193#issuecomment-1899239682 I put this into a table in hopes that it will make it clearer. cc @mumrah
[jira] [Created] (KAFKA-16166) Generify RetryWithToleranceOperator and ErrorReporter
Greg Harris created KAFKA-16166:

Summary: Generify RetryWithToleranceOperator and ErrorReporter
Key: KAFKA-16166
URL: https://issues.apache.org/jira/browse/KAFKA-16166
Project: Kafka
Issue Type: Improvement
Components: connect
Reporter: Greg Harris
Assignee: Greg Harris

The RetryWithToleranceOperator and ErrorReporter instances in connect are only ever used with a single type of ProcessingContext (ProcessingContext<SourceRecord> for sources, ProcessingContext<ConsumerRecord<byte[], byte[]>> for sinks) and currently dynamically decide between these with instanceof checks. Instead, these classes should be generic, and have their implementations accept consistent ProcessingContext objects.
[jira] [Resolved] (KAFKA-16087) Tasks dropping incorrect records when errors.tolerance=all and errors reported asynchronously due to data race
[ https://issues.apache.org/jira/browse/KAFKA-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Harris resolved KAFKA-16087.
Fix Version/s: 3.8.0
Resolution: Fixed

> Tasks dropping incorrect records when errors.tolerance=all and errors reported asynchronously due to data race
>
> Key: KAFKA-16087
> URL: https://issues.apache.org/jira/browse/KAFKA-16087
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.6.0, 3.2.0, 3.7.0
> Reporter: Greg Harris
> Assignee: Greg Harris
> Priority: Major
> Fix For: 3.8.0
>
> The ErrantRecordReporter introduced in KIP-610 (2.6.0) allows sink connectors to push records to the connector DLQ topic. The implementation of this reporter interacts with the ProcessingContext within the per-task RetryWithToleranceOperator. The ProcessingContext stores mutable state about the current operation, such as what error has occurred or what record is being operated on.
>
> The ProcessingContext and RetryWithToleranceOperator are also used by the converter and transformation pipeline of the connector for similar reasons. When the ErrantRecordReporter#report function is called from SinkTask#put, there is no contention over the mutable state, as the thread used for SinkTask#put is also responsible for converting and transforming the record. However, if ErrantRecordReporter#report is called by an extra thread within the SinkTask, there is thread contention on the single mutable ProcessingContext.
>
> This was noticed in https://issues.apache.org/jira/browse/KAFKA-10602 and the synchronized keyword was added to all methods of RetryWithToleranceOperator (RWTO) which interact with the ProcessingContext. However, this solution still allows the RWTO methods to interleave and produce unintended data races. Consider the following interleaving:
> 1. Thread 1 converts and transforms record A successfully.
> 2. Thread 1 calls SinkTask#put(A) and delivers the message to the task.
> 3. Thread 1 schedules another thread, thread 2, to call ErrantRecordReporter#report(A) after some delay.
> 4. Thread 1 returns from SinkTask#put and polls record B from the consumer.
> 5. Thread 1 calls RWTO#execute for a converter or transformation operation. For example, [converting headers|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L539]
> 6. The operation succeeds, and the ProcessingContext is left with error == null, or equivalently failed() == false.
> 7. Thread 2's delay expires, and it calls ErrantRecordReporter#report.
> 8. Thread 2 uses the WorkerErrantRecordReporter implementation, which calls [RWTO executeFailed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L109] and returns.
> 9. The operation leaves ProcessingContext with error != null, or equivalently failed() == true.
> 10. Thread 1 then resumes execution, and calls [RWTO failed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L541] which evaluates to true.
> 11. Thread 1 then drops record B, even though the header conversion succeeded without error.
> 12. Record B is never delivered to the Sink Task, and never delivered to the error reporter for processing, despite having produced no error during processing.
>
> This per-method synchronization for returning nulls and errors separately is insufficient, and either the data sharing should be avoided or a different locking mechanism should be used.
>
> A similar flaw exists in source connectors and asynchronous errors reported by the producer, and was introduced in KIP-779 (3.2.0).
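The interleaving described in this issue can be replayed deterministically in a few lines. The sketch below runs the steps sequentially on one thread, in the problematic order, instead of using real threads; it assumes a single shared context whose methods are each individually synchronized, as the issue describes. `SharedContext`, `RecordContext` and the two `...DropsB` methods are hypothetical names. The per-record variant shows the "avoid data sharing" option mentioned in the issue and is not presented as the fix that was merged.

```java
class SharedContextRaceSketch {
    /** One context shared by every record; each method is synchronized on its own. */
    static final class SharedContext {
        private Throwable error;

        synchronized void markSuccess() { error = null; }
        synchronized void markFailed(Throwable t) { error = t; }
        synchronized boolean failed() { return error != null; }
    }

    /** One context per record, so an error for A can never be observed for B. */
    static final class RecordContext {
        final String record;
        private Throwable error;

        RecordContext(String record) { this.record = record; }
        void markFailed(Throwable t) { error = t; }
        boolean failed() { return error != null; }
    }

    /** Replays steps 5 to 11 of the issue in order; returns true if B would be dropped. */
    static boolean sharedContextDropsB() {
        SharedContext context = new SharedContext();
        context.markSuccess();                                        // steps 5-6: B converts successfully
        context.markFailed(new RuntimeException("sink rejected A"));  // steps 7-9: late report for A
        return context.failed();                                      // steps 10-11: check made on behalf of B
    }

    /** Same order of events, but A and B each own their context. */
    static boolean perRecordContextDropsB() {
        RecordContext contextB = new RecordContext("B");              // B converts successfully
        RecordContext contextA = new RecordContext("A");
        contextA.markFailed(new RuntimeException("sink rejected A")); // late report for A
        return contextB.failed();                                     // check made on B's own state
    }
}
```

Each call on the shared context is atomic, yet the success check for B still reads the failure recorded for A, which is why synchronizing individual methods does not help here.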
Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]
gharris1727 merged PR #15154: URL: https://github.com/apache/kafka/pull/15154
Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]
gharris1727 commented on PR #15154: URL: https://github.com/apache/kafka/pull/15154#issuecomment-1899225769 Test failures appear unrelated, and connect:runtime passes for me locally.
Re: [PR] KAFKA-16159: MINOR - Correcting logs to debug telemetry reporter [kafka]
apoorvmittal10 commented on code in PR #15228:
URL: https://github.com/apache/kafka/pull/15228#discussion_r1457965643

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ##
@@ -362,7 +362,7 @@ public long timeToNextUpdate(long requestTimeoutMs) {
 throw new IllegalStateException("Unknown telemetry state: " + localState);
 }
-log.debug("For telemetry state {}, returning the value {} ms; {}", localState, timeMs, msg);
+log.trace("For telemetry state {}, returning the value {} ms; {}", localState, timeMs, msg);

Review Comment:
I don't think it will make much difference, seems fine to me. Writing the suggestion will involve changing `msg` in each block. I think it's fine the way it is :)
Re: [PR] KAFKA-16159: MINOR - Correcting logs to debug telemetry reporter [kafka]
apoorvmittal10 commented on code in PR #15228:
URL: https://github.com/apache/kafka/pull/15228#discussion_r1457964560

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ##
@@ -379,6 +379,7 @@ public Optional> createRequest() {
 lock.readLock().unlock();
 }
+log.debug("Creating telemetry request. Telemetry state: {}", localState);

Review Comment:
Hmmm, I don't have an opinion there, either seems fine to me. Your wish is my command :). Done.
Re: [PR] KAFKA-14505; [8/8] Update offset delete paths [kafka]
jeffkbkim commented on code in PR #15221: URL: https://github.com/apache/kafka/pull/15221#discussion_r1457909168 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -685,6 +712,20 @@ private boolean hasPendingTransactionalOffsets( return false; } +/** + * @return true iff there is a committed offset in the main offset store for the + * given group, topic and partition. + * + * Package private for testing. + */ +boolean hadCommittedOffset( Review Comment: nit: should this be hasCommittedOffset? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -982,12 +1040,15 @@ public void replayEndTransactionMarker( log.debug("Committed transaction offset commit for producer id {} in group {} " + "with topic {}, partition {}, and offset {}.", producerId, groupId, topicName, partitionId, offsetAndMetadata); -offsets.put( +OffsetAndMetadata previousValue = offsets.put( groupId, topicName, partitionId, offsetAndMetadata ); +if (previousValue == null) { +metrics.incrementNumOffsets(); +} Review Comment: thanks for adding this! ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -604,22 +602,23 @@ public CoordinatorResult deleteOffsets( ) ); } else { -final TimelineHashMap offsetsByPartition = offsetsByTopic == null ? -null : offsetsByTopic.get(topic.name()); -if (offsetsByPartition != null) { -topic.partitions().forEach(partition -> { -if (offsetsByPartition.containsKey(partition.partitionIndex())) { -responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition() -.setPartitionIndex(partition.partitionIndex()) -); - records.add(RecordHelpers.newOffsetCommitTombstoneRecord( -request.groupId(), -topic.name(), -partition.partitionIndex() -)); -} -}); -} +topic.partitions().forEach(partition -> { +// We always add the partition to the response. 
+responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition() +.setPartitionIndex(partition.partitionIndex()) +); + +// A tombstone is written if an offset is present is the main storage or Review Comment: nit: "is present in" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
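The metric change acknowledged in the review above relies on `put` returning the previous value, so the counter is only incremented for a genuinely new key. A minimal sketch of that pattern follows, assuming a plain `HashMap` in place of the coordinator's timeline-backed store; `OffsetCounterSketch`, `commit` and `numOffsets` are hypothetical names, not part of `OffsetMetadataManager`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the offsets store; not the real OffsetMetadataManager.
final class OffsetCounterSketch {
    private final Map<String, Long> offsets = new HashMap<>();
    private long numOffsets = 0;

    // Stores the offset and counts it only when the key was not present before.
    void commit(String groupTopicPartition, long offset) {
        Long previousValue = offsets.put(groupTopicPartition, offset);
        if (previousValue == null) {
            numOffsets++;
        }
    }

    long numOffsets() {
        return numOffsets;
    }
}
```

Overwriting an existing key leaves the counter unchanged, which is the property the `previousValue == null` check in the diff appears to be after.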
[jira] [Updated] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions
[ https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15843: --- Description: Legacy coordinator triggers onPartitionsAssigned with empty assignment (which is not the case when triggering onPartitionsRevoked or Lost). This is the behaviour of the legacy coordinator, and the new consumer implementation maintains the same principle. We should review this to fully understand if it is really needed to call onPartitionsAssigned with empty assignment (or if it should behave consistently with the onRevoke/Lost). Note that the consumer integration tests rely on this call to onPartitionsAssigned to #awaitRebalance (AbstractConsumerTest.scala) was:Legacy coordinator triggers onPartitionsAssigned with empty assignment (which is not the case when triggering onPartitionsRevoked or Lost). This is the behaviour of the legacy coordinator, and the new consumer implementation maintains the same principle. We should review this to fully understand if it is really needed to call onPartitionsAssigned with empty assignment (or if it should behave consistently with the onRevoke/Lost) > Review consumer onPartitionsAssigned called with empty partitions > - > > Key: KAFKA-15843 > URL: https://issues.apache.org/jira/browse/KAFKA-15843 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.8.0 > > > Legacy coordinator triggers onPartitionsAssigned with empty assignment (which > is not the case when triggering onPartitionsRevoked or Lost). This is the > behaviour of the legacy coordinator, and the new consumer implementation > maintains the same principle. 
We should review this to fully understand if it > is really needed to call onPartitionsAssigned with empty assignment (or if it > should behave consistently with the onRevoke/Lost). > Note that the consumer integration tests rely on this call to > onPartitionsAssigned to #awaitRebalance (AbstractConsumerTest.scala) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16159: MINOR - Correcting logs to debug telemetry reporter [kafka]
philipnee commented on code in PR #15228: URL: https://github.com/apache/kafka/pull/15228#discussion_r1457945378 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ## @@ -362,7 +362,7 @@ public long timeToNextUpdate(long requestTimeoutMs) { throw new IllegalStateException("Unknown telemetry state: " + localState); } -log.debug("For telemetry state {}, returning the value {} ms; {}", localState, timeMs, msg); +log.trace("For telemetry state {}, returning the value {} ms; {}", localState, timeMs, msg); Review Comment: So that we can remove the "returning the value 224678 ms" part; it was a bit hard to understand what it actually means.
Re: [PR] KAFKA-16159: MINOR - Correcting logs to debug telemetry reporter [kafka]
philipnee commented on code in PR #15228: URL: https://github.com/apache/kafka/pull/15228#discussion_r1457944304 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ## @@ -362,7 +362,7 @@ public long timeToNextUpdate(long requestTimeoutMs) { throw new IllegalStateException("Unknown telemetry state: " + localState); } -log.debug("For telemetry state {}, returning the value {} ms; {}", localState, timeMs, msg); +log.trace("For telemetry state {}, returning the value {} ms; {}", localState, timeMs, msg); Review Comment: Do you think it makes sense to rephrase the log? For the `msg` can we explicitly say: ...client will wait for {}ms before submitting the next... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16159: MINOR - Correcting logs to debug telemetry reporter [kafka]
philipnee commented on code in PR #15228: URL: https://github.com/apache/kafka/pull/15228#discussion_r1457941137 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ## @@ -379,6 +379,7 @@ public Optional> createRequest() { lock.readLock().unlock(); } +log.debug("Creating telemetry request. Telemetry state: {}", localState); Review Comment: It would be useful to know the type of request the client is sending: either a subscription request or a push request. I guess we can derive it from the current state, but the state is rather an internal detail, so the person looking at the log might not know what it means. My suggestion is to be explicit: "we are sending a push request" or "we are sending a subscription request".
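The suggestion above, naming the request type in the log rather than the internal state, could be sketched roughly as follows. The enum values and the `describe` helper are hypothetical and chosen for illustration only; they are not taken from `ClientTelemetryReporter`.

```java
// Hypothetical sketch: maps an internal state to an explicit log message.
final class TelemetryLogSketch {
    // Illustrative states only; the real reporter defines its own state enum.
    enum State { SUBSCRIPTION_NEEDED, PUSH_NEEDED, TERMINATING_PUSH_NEEDED }

    // Returns the text a debug log line could use for the given state.
    static String describe(State state) {
        switch (state) {
            case SUBSCRIPTION_NEEDED:
                return "Creating telemetry subscription request";
            case PUSH_NEEDED:
            case TERMINATING_PUSH_NEEDED:
                return "Creating telemetry push request";
            default:
                throw new IllegalStateException("Unknown telemetry state: " + state);
        }
    }
}
```

A reader of the log then sees which request is being sent without needing to know how states map to requests.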
Re: [PR] KAFKA-15770: IQv2 must return immutable position [kafka]
mjsax commented on code in PR #15219: URL: https://github.com/apache/kafka/pull/15219#discussion_r1457939699 ## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java: ## @@ -146,7 +146,7 @@ public static QueryResult handleBasicQueries( "Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns" ); } -result.setPosition(position); +result.setPosition(position.copy()); Review Comment: Thanks. I was hoping it would be thread-safe already, but maybe not. Let me double check the code and figure it out.
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
artemlivshits commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1457923250 ## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ## @@ -140,6 +142,73 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w } } + /** + * Return topic partition metadata for the given topic, listener and index range. Also, return the next partition + * index that is not included in the result. + * + * @param image The metadata image + * @param topicName The name of the topic. + * @param listenerNameThe listener name. + * @param startIndex The smallest index of the partitions to be included in the result. + * @param upperIndex The upper limit of the index of the partitions to be included in the result. + *Note that, the upper index can be larger than the largest partition index in + *this topic. + * @returnA collection of topic partition metadata and next partition index (-1 means + *no next partition). + */ + private def getPartitionMetadataForDescribeTopicResponse( +image: MetadataImage, +topicName: String, +listenerName: ListenerName, +startIndex: Int, +maxCount: Int + ): (Option[List[DescribeTopicPartitionsResponsePartition]], Int) = { +Option(image.topics().getTopic(topicName)) match { + case None => (None, -1) + case Some(topic) => { +val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]() +val partitions = topic.partitions().keySet() +val upperIndex = topic.partitions().size().min(startIndex + maxCount) +val nextIndex = if (upperIndex < partitions.size()) upperIndex else -1 +for (partitionId <- startIndex until upperIndex) { + topic.partitions().get(partitionId) match { +case partition : PartitionRegistration => { + val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, +listenerName, false) + val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, false) + val offlineReplicas = getOfflineReplicas(image, partition, listenerName) + val 
maybeLeader = getAliveEndpoint(image, partition.leader, listenerName) + maybeLeader match { +case None => + result.append(new DescribeTopicPartitionsResponsePartition() +.setPartitionIndex(partitionId) +.setLeaderId(MetadataResponse.NO_LEADER_ID) +.setLeaderEpoch(partition.leaderEpoch) +.setReplicaNodes(filteredReplicas) +.setIsrNodes(filteredIsr) +.setOfflineReplicas(offlineReplicas) +.setEligibleLeaderReplicas(Replicas.toList(partition.elr)) +.setLastKnownElr(Replicas.toList(partition.lastKnownElr))) +case Some(leader) => + result.append(new DescribeTopicPartitionsResponsePartition() +.setPartitionIndex(partitionId) +.setLeaderId(leader.id()) +.setLeaderEpoch(partition.leaderEpoch) +.setReplicaNodes(filteredReplicas) +.setIsrNodes(filteredIsr) +.setOfflineReplicas(offlineReplicas) +.setEligibleLeaderReplicas(Replicas.toList(partition.elr)) +.setLastKnownElr(Replicas.toList(partition.lastKnownElr))) + } +} +case _ => Review Comment: Definitely should log an error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
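The paging arithmetic in the diff above (`upperIndex` and `nextIndex`) can be isolated as a small sketch. It is written in Java rather than Scala, and `PartitionPageSketch` and `pageBounds` are hypothetical names. The sketch follows the method signature, which takes `maxCount`, even though the Scaladoc in the diff documents an `upperIndex` parameter. It assumes `startIndex + maxCount` does not overflow.

```java
// Hypothetical sketch of the page-boundary arithmetic; not the KRaftMetadataCache code.
final class PartitionPageSketch {
    // Returns {upperIndex, nextIndex}. upperIndex is exclusive;
    // nextIndex is -1 when no partitions remain after this page.
    static int[] pageBounds(int partitionCount, int startIndex, int maxCount) {
        int upperIndex = Math.min(partitionCount, startIndex + maxCount);
        int nextIndex = upperIndex < partitionCount ? upperIndex : -1;
        return new int[] {upperIndex, nextIndex};
    }
}
```

For a topic with 10 partitions, a request starting at 0 with `maxCount` 4 covers partitions 0 to 3 and points the caller at index 4; a request starting at 8 covers 8 and 9 and reports no next page.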
Re: [PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]
jolshan commented on code in PR #15212: URL: https://github.com/apache/kafka/pull/15212#discussion_r1457917435 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -875,30 +875,43 @@ private void maybeRemovePartitionEpoch( ConsumerGroupMember oldMember ) { if (oldMember != null) { -removePartitionEpochs(oldMember.assignedPartitions()); -removePartitionEpochs(oldMember.partitionsPendingRevocation()); +removePartitionEpochs(oldMember.assignedPartitions(), oldMember.memberEpoch()); +removePartitionEpochs(oldMember.partitionsPendingRevocation(), oldMember.memberEpoch()); } } /** * Removes the partition epochs based on the provided assignment. * * @param assignmentThe assignment. + * @param expectedEpoch The expected epoch. + * @throws IllegalStateException if the epoch does not match the expected one. + * package-private for testing. */ -private void removePartitionEpochs( -Map> assignment +void removePartitionEpochs( +Map> assignment, +int expectedEpoch ) { assignment.forEach((topicId, assignedPartitions) -> { currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> { if (partitionsOrNull != null) { -assignedPartitions.forEach(partitionsOrNull::remove); +assignedPartitions.forEach(partitionId -> { +Integer prevValue = partitionsOrNull.remove(partitionId); +if (prevValue != expectedEpoch) { +throw new IllegalStateException( +String.format("Cannot remove the epoch %d from %s-%s because the partition is " + +"still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue)); +} +}); if (partitionsOrNull.isEmpty()) { return null; } else { return partitionsOrNull; } } else { -return null; +throw new IllegalStateException( Review Comment: What is the effect of throwing this error? Do we also block removing the rest of the partitions?
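As a general Java point relevant to the question above, an exception thrown part-way through a loop stops the iteration, so entries not yet visited stay in place while earlier removals are not undone. The sketch below illustrates this with a flat map and hypothetical names (`EpochRemovalSketch`, `removeEpochs`); it is not the `ConsumerGroup` code. It also checks for a missing entry explicitly, since comparing a null `Integer` with an `int` would fail with a `NullPointerException` on unboxing; treating a missing entry as an error is an assumption of this sketch.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch; a flat partition -> epoch map stands in for the nested structure.
final class EpochRemovalSketch {
    // Removes each partition's epoch and fails fast on the first mismatch.
    static void removeEpochs(Map<Integer, Integer> epochs, List<Integer> partitions, int expectedEpoch) {
        for (Integer partitionId : partitions) {
            Integer prevValue = epochs.remove(partitionId);
            // Explicit null check avoids unboxing a missing entry.
            if (prevValue == null || prevValue != expectedEpoch) {
                throw new IllegalStateException(String.format(
                    "Cannot remove the epoch %d from partition %d because the recorded epoch is %s",
                    expectedEpoch, partitionId, prevValue));
            }
        }
    }
}
```

With partitions 0, 1 and 2 where only partition 1 has a different epoch, partitions 0 and 1 are removed before the exception and partition 2 is left untouched.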
[PR] KAFKA-16164: Pre-Vote RPCs [part 1] [kafka]
ahuang98 opened a new pull request, #15231: URL: https://github.com/apache/kafka/pull/15231
Re: [PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]
jolshan commented on code in PR #15212: URL: https://github.com/apache/kafka/pull/15212#discussion_r1457913561 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -861,19 +861,9 @@ private void maybeUpdatePartitionEpoch( ConsumerGroupMember oldMember, ConsumerGroupMember newMember ) { -if (oldMember == null) { -addPartitionEpochs(newMember.assignedPartitions(), newMember.memberEpoch()); -addPartitionEpochs(newMember.partitionsPendingRevocation(), newMember.memberEpoch()); -} else { -if (!oldMember.assignedPartitions().equals(newMember.assignedPartitions())) { -removePartitionEpochs(oldMember.assignedPartitions()); -addPartitionEpochs(newMember.assignedPartitions(), newMember.memberEpoch()); -} -if (!oldMember.partitionsPendingRevocation().equals(newMember.partitionsPendingRevocation())) { -removePartitionEpochs(oldMember.partitionsPendingRevocation()); -addPartitionEpochs(newMember.partitionsPendingRevocation(), newMember.memberEpoch()); -} -} +maybeRemovePartitionEpoch(oldMember); Review Comment: That was the only thing I could think of as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]
divijvaidya commented on PR #15133: URL: https://github.com/apache/kafka/pull/15133#issuecomment-1899108113 > @divijvaidya , since Satish is busy, could you help review this PR? We'd like to get it into v3.7.0 for the completion of KIP-963. Thanks. Sorry I have been busy with work lately. Will look at this first thing tomorrow.
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
mumrah commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1457908869 ## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ## @@ -140,6 +142,73 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w } } + /** + * Return topic partition metadata for the given topic, listener and index range. Also, return the next partition + * index that is not included in the result. + * + * @param image The metadata image + * @param topicName The name of the topic. + * @param listenerNameThe listener name. + * @param startIndex The smallest index of the partitions to be included in the result. + * @param upperIndex The upper limit of the index of the partitions to be included in the result. + *Note that, the upper index can be larger than the largest partition index in + *this topic. + * @returnA collection of topic partition metadata and next partition index (-1 means + *no next partition). + */ + private def getPartitionMetadataForDescribeTopicResponse( +image: MetadataImage, +topicName: String, +listenerName: ListenerName, +startIndex: Int, +maxCount: Int + ): (Option[List[DescribeTopicPartitionsResponsePartition]], Int) = { +Option(image.topics().getTopic(topicName)) match { + case None => (None, -1) + case Some(topic) => { +val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]() +val partitions = topic.partitions().keySet() +val upperIndex = topic.partitions().size().min(startIndex + maxCount) +val nextIndex = if (upperIndex < partitions.size()) upperIndex else -1 +for (partitionId <- startIndex until upperIndex) { + topic.partitions().get(partitionId) match { +case partition : PartitionRegistration => { + val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, +listenerName, false) + val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, false) + val offlineReplicas = getOfflineReplicas(image, partition, listenerName) + val 
maybeLeader = getAliveEndpoint(image, partition.leader, listenerName) + maybeLeader match { +case None => + result.append(new DescribeTopicPartitionsResponsePartition() +.setPartitionIndex(partitionId) +.setLeaderId(MetadataResponse.NO_LEADER_ID) +.setLeaderEpoch(partition.leaderEpoch) +.setReplicaNodes(filteredReplicas) +.setIsrNodes(filteredIsr) +.setOfflineReplicas(offlineReplicas) +.setEligibleLeaderReplicas(Replicas.toList(partition.elr)) +.setLastKnownElr(Replicas.toList(partition.lastKnownElr))) +case Some(leader) => + result.append(new DescribeTopicPartitionsResponsePartition() +.setPartitionIndex(partitionId) +.setLeaderId(leader.id()) +.setLeaderEpoch(partition.leaderEpoch) +.setReplicaNodes(filteredReplicas) +.setIsrNodes(filteredIsr) +.setOfflineReplicas(offlineReplicas) +.setEligibleLeaderReplicas(Replicas.toList(partition.elr)) +.setLastKnownElr(Replicas.toList(partition.lastKnownElr))) + } +} +case _ => Review Comment: Should we throw an ISE here rather than silently continue? Maybe we could just log an error -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15807) Add support for compression/decompression of metrics
[ https://issues.apache.org/jira/browse/KAFKA-15807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apoorv Mittal resolved KAFKA-15807. --- Resolution: Done > Add support for compression/decompression of metrics > > > Key: KAFKA-15807 > URL: https://issues.apache.org/jira/browse/KAFKA-15807 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-5863) Potential null dereference in DistributedHerder#reconfigureConnector()
[ https://issues.apache.org/jira/browse/KAFKA-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-5863: --- Fix Version/s: 3.8.0 > Potential null dereference in DistributedHerder#reconfigureConnector() > -- > > Key: KAFKA-5863 > URL: https://issues.apache.org/jira/browse/KAFKA-5863 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Ted Yu >Assignee: Greg Harris >Priority: Minor > Fix For: 3.8.0 > > > Here is the call chain: > {code} > RestServer.httpRequest(reconfigUrl, "POST", > taskProps, null); > {code} > In httpRequest(): > {code} > } else if (responseCode >= 200 && responseCode < 300) { > InputStream is = connection.getInputStream(); > T result = JSON_SERDE.readValue(is, responseFormat); > {code} > For readValue(): > {code} > public T readValue(InputStream src, TypeReference valueTypeRef) > throws IOException, JsonParseException, JsonMappingException > { > return (T) _readMapAndClose(_jsonFactory.createParser(src), > _typeFactory.constructType(valueTypeRef)); > {code} > Then there would be NPE in constructType(): > {code} > public JavaType constructType(TypeReference typeRef) > { > // 19-Oct-2015, tatu: Simpler variant like so should work > return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS); > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
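The call chain above fails because a null response format reaches the deserializer. One way to guard against that is to skip deserialization when the caller did not ask for a typed result. The sketch below uses a plain `Function` in place of the JSON serde, and `NoContentResponseSketch` and `readBody` are hypothetical names. It is an assumption about the general shape of such a guard, not the merged fix.

```java
import java.util.function.Function;

// Hypothetical sketch; a Function stands in for the JSON deserializer.
final class NoContentResponseSketch {
    // Returns the decoded body, or null when the caller expects no content.
    static <T> T readBody(int responseCode, String body, Function<String, T> responseFormat) {
        if (responseCode >= 200 && responseCode < 300) {
            if (responseFormat == null) {
                // Caller expects no content: ignore any body instead of decoding it.
                return null;
            }
            return responseFormat.apply(body);
        }
        throw new IllegalStateException("Unexpected response code: " + responseCode);
    }
}
```

A call with a null format and a non-empty body then returns null rather than failing inside the decoder.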
Re: [PR] KAFKA-5863: Avoid NPE when RestClient calls expecting no-content receive content. [kafka]
gharris1727 merged PR #13294: URL: https://github.com/apache/kafka/pull/13294
Re: [PR] KAFKA-15811: Enhance request context with client socket port information (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15190: URL: https://github.com/apache/kafka/pull/15190#discussion_r145796 ## core/src/main/scala/kafka/network/SocketServer.scala: ## @@ -1140,9 +1141,9 @@ private[kafka] class Processor( expiredConnectionsKilledCount.record(null, 1, 0) } else { val connectionId = receive.source -val context = new RequestContext(header, connectionId, channel.socketAddress, - channel.principal, listenerName, securityProtocol, - channel.channelMetadataRegistry.clientInformation, isPrivilegedListener, channel.principalSerde) +val context = new RequestContext(header, connectionId, channel.socketAddress, Optional.of(channel.socketPort()), Review Comment: I find that RequestContext is created either in SocketServer or when forwarding requests. We currently don't need the client port information outside KIP-714, and the push telemetry request is not marked forwardable, so wiring the client port information through elsewhere does not seem useful at this point. Hence I marked it as Optional.
Re: [PR] KAFKA-15770: IQv2 must return immutable position [kafka]
vvcephei commented on code in PR #15219: URL: https://github.com/apache/kafka/pull/15219#discussion_r1457894664 ## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java: ## @@ -146,7 +146,7 @@ public static QueryResult handleBasicQueries( "Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns" ); } -result.setPosition(position); +result.setPosition(position.copy()); Review Comment: Better yet, we could make the copy at the beginning of this method or even on the caller side. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
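The defensive copy discussed above can be illustrated with a small stand-in class. `PositionSketch` and its methods are hypothetical and much simpler than the Streams `Position` type; the point is only that a snapshot taken with `copy()` no longer changes when the live object is updated.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a mutable position; not the Kafka Streams Position class.
final class PositionSketch {
    private final Map<String, Long> offsets = new HashMap<>();

    // Records the latest offset seen for a topic.
    void update(String topic, long offset) {
        offsets.put(topic, offset);
    }

    // Returns an independent snapshot of the current offsets.
    PositionSketch copy() {
        PositionSketch snapshot = new PositionSketch();
        snapshot.offsets.putAll(offsets);
        return snapshot;
    }

    // Returns the recorded offset, or -1 when the topic is unknown.
    long offset(String topic) {
        return offsets.getOrDefault(topic, -1L);
    }
}
```

Handing the snapshot to the query result, whether at the end of the method or earlier on the caller side as suggested, keeps later store updates from showing up in a result that was already returned.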
Re: [PR] KAFKA-5863: Avoid NPE when RestClient calls expecting no-content receive content. [kafka]
gharris1727 commented on PR #13294: URL: https://github.com/apache/kafka/pull/13294#issuecomment-1899079016 Test failures appear unrelated, and the connect and mirror tests pass locally for me.
Re: [PR] KAFKA-16159: MINOR - Removed debug log [kafka]
apoorvmittal10 commented on PR #15228: URL: https://github.com/apache/kafka/pull/15228#issuecomment-1899066505 > Thanks @apoorvmittal10 - Could you elaborate on the purpose of this log line? how important is it for the user to know about the "next update time"? I wonder if we could just log it when update is happening. @philipnee The purpose of the log line was to check whether the reporter is actually working and how much time remains until the next update while testing (it was helpful in development). Rather than removing the line completely, I have moved it to trace so there is still a way to debug later. I have also added another debug log line that is only logged when a telemetry request is created, in accordance with the push interval, so it will be minimal and will still give a developer an idea of whether telemetry is working and in which state (get subscription or push telemetry).
[jira] [Created] (KAFKA-16165) Consumer invalid transition on expired poll interval
Lianet Magrans created KAFKA-16165: -- Summary: Consumer invalid transition on expired poll interval Key: KAFKA-16165 URL: https://issues.apache.org/jira/browse/KAFKA-16165 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans Running system tests with the new async consumer revealed an invalid transition related to the consumer not being polled on the interval in some kind of scenario (maybe relates to consumer close, as the transition is leaving->stale) Log trace: [2024-01-17 19:45:07,379] WARN [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. 
(org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) [2024-01-17 19:45:07,379] ERROR [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] Unexpected error caught in consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91) java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303) at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739) at org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88) -- This message was sent by Atlassian Jira (v8.20.10#820010)
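The `IllegalStateException` in the trace above comes from a transition check that rejects moving from LEAVING to STALE. A minimal sketch of that kind of check follows. The three states and the table of allowed previous states are hypothetical and chosen only to reproduce the message in the log; the real transition rules live in the consumer's membership manager and are not reproduced here.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch of a guarded state machine; not MembershipManagerImpl.
final class MemberStateSketch {
    enum State {
        STABLE, LEAVING, STALE;

        // Illustrative rule table: which states may precede this one.
        Set<State> validPrevious() {
            switch (this) {
                case LEAVING:
                    return EnumSet.of(STABLE);
                case STALE:
                    return EnumSet.of(STABLE);
                default:
                    return EnumSet.allOf(State.class);
            }
        }
    }

    private State state = State.STABLE;

    // Moves to the next state, or throws when the transition is not allowed.
    void transitionTo(State next) {
        if (!next.validPrevious().contains(state)) {
            throw new IllegalStateException(
                "Invalid state transition from " + state + " to " + next);
        }
        state = next;
    }

    State state() {
        return state;
    }
}
```

Under this illustrative table, a member that is already LEAVING when the poll timer expires hits the same failure as in the trace, which suggests the caller either needs to skip the transition in that state or the table needs to allow it.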
Re: [PR] KAFKA-16159: MINOR - Removed debug log [kafka]
apoorvmittal10 commented on code in PR #15228: URL: https://github.com/apache/kafka/pull/15228#discussion_r1457875345 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ## @@ -362,7 +362,6 @@ public long timeToNextUpdate(long requestTimeoutMs) { throw new IllegalStateException("Unknown telemetry state: " + localState); } -log.debug("For telemetry state {}, returning the value {} ms; {}", localState, timeMs, msg); Review Comment: I think emitting metrics would have been helpful if we wanted to derive some meaningful information about the running reporter, but here the log was helpful for checking whether the reporter is working correctly while testing/debugging. Rather than removing it, I moved it to trace so there is still a way to see the state of the reporter if we need to debug the application.
[PR] KAFKA-14616: Fix stray replica of recreated topics in KRaft mode [kafka]
cmccabe opened a new pull request, #15230: URL: https://github.com/apache/kafka/pull/15230 When a broker is down, and a topic is deleted, this will result in that broker seeing "stray replicas" the next time it starts up. These replicas contain data that used to be important, but which now needs to be deleted. Stray replica deletion is handled during the initial metadata publishing step on the broker. Previously, we deleted these stray replicas after starting up BOTH LogManager and ReplicaManager. However, this wasn't quite correct. The presence of the stray replicas confused ReplicaManager. Instead, we should delete the stray replicas BEFORE starting ReplicaManager. This bug was triggered when a topic was deleted and re-created while a broker was down, and some of the replicas of the re-created topic landed on that broker. The impact was that the stray replicas were deleted, but the new replicas for the next iteration of the topic never got created. This, in turn, led to persistent under-replication until the next time the broker was restarted.
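For readers skimming the archive, the ordering argument in the PR description can be illustrated with a toy model. This is only a sketch under stated assumptions: `StrayReplicaSketch`, `findStrays` and `startUp` are hypothetical names, replicas are modelled as a map from partition name to topic ID, and none of this is the actual LogManager or ReplicaManager code.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical stand-ins for illustration only; not Kafka's broker classes.
class StrayReplicaSketch {
    // A replica on disk is stray when its topic ID does not match the topic ID
    // that current metadata assigns to the same partition (or none is assigned).
    static Set<String> findStrays(Map<String, String> onDisk, Map<String, String> assigned) {
        Set<String> strays = new TreeSet<>();
        for (Map.Entry<String, String> e : onDisk.entrySet()) {
            if (!e.getValue().equals(assigned.get(e.getKey()))) {
                strays.add(e.getKey());
            }
        }
        return strays;
    }

    // Simulates the initial metadata publishing step with the ordering the PR argues for.
    static Map<String, String> startUp(Map<String, String> onDisk, Map<String, String> assigned) {
        Map<String, String> hosted = new TreeMap<>(onDisk);
        // Step 1: delete stray replicas BEFORE the replica manager looks at the disk.
        hosted.keySet().removeAll(findStrays(onDisk, assigned));
        // Step 2: only now create replicas for the partitions assigned to this broker.
        for (Map.Entry<String, String> e : assigned.entrySet()) {
            hosted.putIfAbsent(e.getKey(), e.getValue());
        }
        return hosted;
    }
}
```

With a leftover `foo-0` from a deleted topic and a re-created `foo-0` assigned to the same broker, `startUp` ends up hosting the replica with the new topic ID, which is the outcome the description says was missing before the fix.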
Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]
OmniaGM commented on code in PR #15158: URL: https://github.com/apache/kafka/pull/15158#discussion_r1457856799 ## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.transaction; + +public class TransactionLogConfig { Review Comment: It would be a very small one, which I don't believe is worth it.
Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]
jolshan commented on code in PR #15183: URL: https://github.com/apache/kafka/pull/15183#discussion_r1457832081 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -712,13 +712,14 @@ public void run() { try { // Apply the records to the state machine. if (result.replayRecords()) { -result.records().forEach(record -> +for (int i = 0; i < result.records().size(); i++) { context.coordinator.replay( +prevLastWrittenOffset + i, Review Comment: could we maybe leave a comment about that?
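The hunk under review swaps a `forEach` for an indexed loop so that each record can be replayed together with an offset. A minimal sketch of that pattern follows, assuming (as the diff suggests, but not confirmed here) that the i-th record of a batch is given the offset `prevLastWrittenOffset + i`. `ReplayOffsetSketch` and `replayAll` are hypothetical names, and the "state machine" is just a list of strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not the CoordinatorRuntime implementation.
class ReplayOffsetSketch {
    // Replays a batch and returns "offset:record" entries in place of real state changes.
    static List<String> replayAll(long prevLastWrittenOffset, List<String> records) {
        List<String> applied = new ArrayList<>();
        // An indexed loop is used instead of forEach because the position of a
        // record in the batch is needed to compute its offset.
        for (int i = 0; i < records.size(); i++) {
            // Assumption of this sketch: record i is assigned prevLastWrittenOffset + i.
            long offset = prevLastWrittenOffset + i;
            applied.add(offset + ":" + records.get(i));
        }
        return applied;
    }
}
```

A comment like the one inside the loop is the kind of note the reviewer is asking for: it states why the index matters, which the bare arithmetic does not.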
Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]
jolshan commented on code in PR #15183: URL: https://github.com/apache/kafka/pull/15183#discussion_r1457831330 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -918,7 +920,7 @@ public void replay( groupId, topic, partition, -OffsetAndMetadata.fromRecord(value) +OffsetAndMetadata.fromRecord(offset, value) Review Comment: I wonder whether this is easy to get confused, and whether there is a way to make it easier to know what the parameters mean.
Re: [PR] KAFKA-15594: Add version 3.6 to Kafka Streams system tests [kafka]
mjsax commented on PR #15151: URL: https://github.com/apache/kafka/pull/15151#issuecomment-1898982326 `trunk` was broken -- rebased. Also re-triggered system tests just to be sure: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/6038/
[jira] [Commented] (KAFKA-16163) Constant resignation/reelection of controller when starting a single node in combined mode
[ https://issues.apache.org/jira/browse/KAFKA-16163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17808336#comment-17808336 ] Mickael Maison commented on KAFKA-16163: It looks like this behavior was introduced in https://github.com/apache/kafka/commit/37416e1aebae33d01d5059ba906ec8e0e1107284 > Constant resignation/reelection of controller when starting a single node in > combined mode > -- > > Key: KAFKA-16163 > URL: https://issues.apache.org/jira/browse/KAFKA-16163 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Mickael Maison >Priority: Major > > When starting a single node in combined mode: > {noformat} > $ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)" > $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c > config/kraft/server.properties > $ bin/kafka-server-start.sh config/kraft/server.properties{noformat} > > it's constantly spamming the logs with: > {noformat} > [2024-01-18 17:37:09,065] INFO > [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, > from now on will use node localhost:9093 (id: 1 rack: null) > (kafka.server.NodeToControllerRequestThread) > [2024-01-18 17:37:11,967] INFO [RaftManager id=1] Did not receive fetch > request from the majority of the voters within 3000ms. Current fetched voters > are []. 
(org.apache.kafka.raft.LeaderState) > [2024-01-18 17:37:11,967] INFO [RaftManager id=1] Completed transition to > ResignedState(localId=1, epoch=138, voters=[1], electionTimeoutMs=1864, > unackedVoters=[], preferredSuccessors=[]) from Leader(localId=1, epoch=138, > epochStartOffset=829, highWatermark=Optional[LogOffsetMetadata(offset=835, > metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=62788)])], > voterStates={1=ReplicaState(nodeId=1, > endOffset=Optional[LogOffsetMetadata(offset=835, > metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=62788)])], > lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, > hasAcknowledgedLeader=true)}) (org.apache.kafka.raft.QuorumState) > [2024-01-18 17:37:13,072] INFO [NodeToControllerChannelManager id=1 > name=heartbeat] Client requested disconnect from node 1 > (org.apache.kafka.clients.NetworkClient) > [2024-01-18 17:37:13,072] INFO > [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, > from now on will use node localhost:9093 (id: 1 rack: null) > (kafka.server.NodeToControllerRequestThread) > [2024-01-18 17:37:13,123] INFO > [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, > from now on will use node localhost:9093 (id: 1 rack: null) > (kafka.server.NodeToControllerRequestThread) > [2024-01-18 17:37:13,124] INFO [NodeToControllerChannelManager id=1 > name=heartbeat] Client requested disconnect from node 1 > (org.apache.kafka.clients.NetworkClient) > [2024-01-18 17:37:13,124] INFO > [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, > from now on will use node localhost:9093 (id: 1 rack: null) > (kafka.server.NodeToControllerRequestThread) > [2024-01-18 17:37:13,175] INFO > [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, > from now on will use node localhost:9093 (id: 1 rack: null) > (kafka.server.NodeToControllerRequestThread) > [2024-01-18 17:37:13,176] INFO [NodeToControllerChannelManager 
id=1 > name=heartbeat] Client requested disconnect from node 1 > (org.apache.kafka.clients.NetworkClient) > [2024-01-18 17:37:13,176] INFO > [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, > from now on will use node localhost:9093 (id: 1 rack: null) > (kafka.server.NodeToControllerRequestThread) > [2024-01-18 17:37:13,227] INFO > [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, > from now on will use node localhost:9093 (id: 1 rack: null) > (kafka.server.NodeToControllerRequestThread) > [2024-01-18 17:37:13,229] INFO [NodeToControllerChannelManager id=1 > name=heartbeat] Client requested disconnect from node 1 > (org.apache.kafka.clients.NetworkClient) > [2024-01-18 17:37:13,229] INFO > [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, > from now on will use node localhost:9093 (id: 1 rack: null) > (kafka.server.NodeToControllerRequestThread) > [2024-01-18 17:37:13,279] INFO > [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, > from now on will use node localhost:9093 (id: 1 rack: null) > (kafka.server.NodeToControllerRequestThread){noformat} > This did not happen in 3.6. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16164) Pre-Vote
Alyssa Huang created KAFKA-16164: Summary: Pre-Vote Key: KAFKA-16164 URL: https://issues.apache.org/jira/browse/KAFKA-16164 Project: Kafka Issue Type: Improvement Reporter: Alyssa Huang Implementing pre-vote as described in https://cwiki.apache.org/confluence/display/KAFKA/KIP-996%3A+Pre-Vote
[jira] [Created] (KAFKA-16163) Constant resignation/reelection of controller when starting a single node in combined mode
Mickael Maison created KAFKA-16163: -- Summary: Constant resignation/reelection of controller when starting a single node in combined mode Key: KAFKA-16163 URL: https://issues.apache.org/jira/browse/KAFKA-16163 Project: Kafka Issue Type: Bug Affects Versions: 3.7.0 Reporter: Mickael Maison When starting a single node in combined mode: {noformat} $ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)" $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties $ bin/kafka-server-start.sh config/kraft/server.properties{noformat} it's constantly spamming the logs with: {noformat} [2024-01-18 17:37:09,065] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread) [2024-01-18 17:37:11,967] INFO [RaftManager id=1] Did not receive fetch request from the majority of the voters within 3000ms. Current fetched voters are []. (org.apache.kafka.raft.LeaderState) [2024-01-18 17:37:11,967] INFO [RaftManager id=1] Completed transition to ResignedState(localId=1, epoch=138, voters=[1], electionTimeoutMs=1864, unackedVoters=[], preferredSuccessors=[]) from Leader(localId=1, epoch=138, epochStartOffset=829, highWatermark=Optional[LogOffsetMetadata(offset=835, metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=62788)])], voterStates={1=ReplicaState(nodeId=1, endOffset=Optional[LogOffsetMetadata(offset=835, metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=62788)])], lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)}) (org.apache.kafka.raft.QuorumState) [2024-01-18 17:37:13,072] INFO [NodeToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient) [2024-01-18 17:37:13,072] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: 
null) (kafka.server.NodeToControllerRequestThread) [2024-01-18 17:37:13,123] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread) [2024-01-18 17:37:13,124] INFO [NodeToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient) [2024-01-18 17:37:13,124] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread) [2024-01-18 17:37:13,175] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread) [2024-01-18 17:37:13,176] INFO [NodeToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient) [2024-01-18 17:37:13,176] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread) [2024-01-18 17:37:13,227] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread) [2024-01-18 17:37:13,229] INFO [NodeToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient) [2024-01-18 17:37:13,229] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread) [2024-01-18 17:37:13,279] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread){noformat} This did not happen in 3.6.
Re: [PR] KAFKA-16131: Only update directoryIds if the metadata version supports DirectoryAssignment [kafka]
mjsax commented on PR #15197: URL: https://github.com/apache/kafka/pull/15197#issuecomment-1898958611 Thanks. The fix was pushed after I left my comment. Glad it's resolved. Yeah, I did expect that two overlapping PRs got merged simultaneously.
[jira] [Updated] (KAFKA-16092) Queues for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-16092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Schofield updated KAFKA-16092: - Labels: queues-for-kafka (was: ) > Queues for Kafka > > > Key: KAFKA-16092 > URL: https://issues.apache.org/jira/browse/KAFKA-16092 > Project: Kafka > Issue Type: New Feature >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Labels: queues-for-kafka > > This Jira tracks the development of KIP-932: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
Re: [PR] KAFKA-15727: Added KRaft support in AlterUserScramCredentialsRequestNotAuthorizedTest [kafka]
adixitconfluent commented on PR #15224: URL: https://github.com/apache/kafka/pull/15224#issuecomment-1898856832 As shown in the CLI screenshot in the PR description, the tests that have been changed are passing. However, the build is failing. Adding a screenshot showing no new test failures from the build. I can confirm that the 2 tests I have changed, `testAlterNothingNotAuthorized` and `testAlterSomethingNotAuthorized`, are not among the failing tests in the build. Test failures - https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15224/1/tests
Re: [PR] KAFKA-16113: Add committed and commit sensor to record metrics [kafka]
philipnee commented on code in PR #15210: URL: https://github.com/apache/kafka/pull/15210#discussion_r1457711259 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -1547,6 +1553,29 @@ public String toString() { } } +private class ConsumerCoordinatorMetrics { Review Comment: I split the implementation into two classes because there's no need to pass a reference to this entire object to the request manager just for the commitSensor (see the addCommitSensor method). Instead, I think it would be a lot easier to pass the Metrics object to the manager and let it create its own sensors (essentially these metrics objects just hold a bunch of sensors referenced from Metrics).
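The design choice described in this comment (hand the manager the shared registry and let it create the sensors it needs, rather than handing it a whole metrics holder for one sensor) can be sketched as below. Everything here is a hypothetical stand-in: `Registry` is not Kafka's `Metrics` class, a "sensor" is reduced to a single counter, and `CommitManager` is an invented name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the ownership pattern; not Kafka's metrics API.
class SensorOwnershipSketch {
    // Stand-in for a shared metrics registry that hands out named counters.
    static class Registry {
        private final Map<String, long[]> sensors = new HashMap<>();

        // Returns the counter registered under this name, creating it on first use.
        long[] sensor(String name) {
            return sensors.computeIfAbsent(name, n -> new long[1]);
        }

        long count(String name) {
            return sensors.containsKey(name) ? sensors.get(name)[0] : 0L;
        }
    }

    // Stand-in for a request manager: it receives only the registry and
    // creates the one sensor it records to.
    static class CommitManager {
        private final long[] commitSensor;

        CommitManager(Registry registry) {
            this.commitSensor = registry.sensor("commit");
        }

        void onCommitCompleted() {
            commitSensor[0]++;
        }
    }
}
```

The manager then depends on the registry alone, and any other component that asks the same registry for `"commit"` sees the same counter.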
Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]
nizhikov commented on code in PR #15158: URL: https://github.com/apache/kafka/pull/15158#discussion_r1457709089 ## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.transaction; + +public class TransactionLogConfig { Review Comment: Can we introduce the new module and config in a separate PR?
Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]
nizhikov commented on code in PR #15158: URL: https://github.com/apache/kafka/pull/15158#discussion_r1457704867 ## checkstyle/import-control.xml: ## @@ -261,6 +261,10 @@ + + Review Comment: Do we really need this empty block?
Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]
nizhikov commented on code in PR #15158: URL: https://github.com/apache/kafka/pull/15158#discussion_r1457704365 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java: ## @@ -20,6 +20,7 @@ import org.apache.kafka.common.record.CompressionType; public class OffsetConfig { + Review Comment: Do we really need this empty line?
Re: [PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]
C0urante commented on PR #15218: URL: https://github.com/apache/kafka/pull/15218#issuecomment-1898819047 @anurag-harness This looks like an accidentally-opened PR and it copies the title from https://github.com/apache/kafka/pull/15215, which may lead to some confusion. I've closed it for now; please feel free to reopen if there are legitimate changes to the code base you'd like to propose.
Re: [PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]
C0urante closed pull request #15218: KAFKA-16107: Stop fetching while onPartitionsAssign completes URL: https://github.com/apache/kafka/pull/15218
Re: [PR] DO NOT MERGE: Isolate Connect OffsetsApiIntegrationTest [kafka]
C0urante commented on PR #15226: URL: https://github.com/apache/kafka/pull/15226#issuecomment-1898810463 Looks like isolating a single test suite removes the conditions that lead to flakiness. Closing in favor of https://github.com/apache/kafka/pull/15229