[jira] [Comment Edited] (KAFKA-16603) Data loss when kafka connect sending data to Kafka
[ https://issues.apache.org/jira/browse/KAFKA-16603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840652#comment-17840652 ] Anil Dasari edited comment on KAFKA-16603 at 4/25/24 5:04 AM: -- Issue is because of out of order acks. So, Not sure if this is kafka connect issue or connector implementation. Please find the detailed conversation [here|https://debezium.zulipchat.com/#narrow/stream/348249-community-postgresql/topic/Data.20loss.20on.20connector.20restart]. This [doc|https://github.com/apache/kafka/blob/864744ffd4ddc3b0d216a3049ee0c61e9c0d3ad1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L901] says that acks order is guaranteed for records that are send to same partition. IIUC, KafkaProducer#send doesn't the guarantee the order of acks for the records that belong to different partitions. Could someone confirm or clarify ? thanks in advance. was (Author: JIRAUSER283879): Issue is because of out of order acks. So, Not sure if this is kafka connect issue or connector implementation. Please find the detailed conversation [here|https://debezium.zulipchat.com/#narrow/stream/348249-community-postgresql/topic/Data.20loss.20on.20connector.20restart]. This [doc|https://github.com/apache/kafka/blob/864744ffd4ddc3b0d216a3049ee0c61e9c0d3ad1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L901] says that acks order is guaranteed for records that are send to same partition. IIUC, KafkaProducer#send doesn't the guarantee the order of acks for the records that belong to different partitions. Could someone confirm or clarify ? thanks in advance. > Data loss when kafka connect sending data to Kafka > -- > > Key: KAFKA-16603 > URL: https://issues.apache.org/jira/browse/KAFKA-16603 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 3.3.1 >Reporter: Anil Dasari >Priority: Major > > We are experiencing a data loss when Kafka Source connector is failed to send > data to Kafka topic and offset topic. > Kafka cluster and Kafka connect details: > # Kafka connect version i.e client : Confluent community version 7.3.1 i.e > Kafka 3.3.1 > # Kafka version: 0.11.0 (server) > # Cluster size : 3 brokers > # Number of partitions in all topics = 3 > # Replication factor = 3 > # Min ISR set 2 > # Uses no transformations in Kafka connector > # Use default error tolerance i.e None. > Our connector checkpoints the offsets info received in > SourceTask#commitRecord and resume the data process from the persisted > checkpoint. > The data loss is noticed when broker is unresponsive for few mins due to high > load and kafka connector was restarted. Also, Kafka connector graceful > shutdown failed. > Logs: > > {code:java} > [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group > coordinator 10.75.100.176:31000 (id: 2147483647 rack: null) > Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator > 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due > to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be > attempted. > Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from > last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null) > Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected. > Apr 22, 2024 @ 15:56:16.708 [Producer > clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 > disconnected. > Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 > disconnected. > Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator > 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due > to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be > attempted. > Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log > **) > Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was > unreachable for 3000ms. Revoking previous assignment Assignment{error=0, > leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', > leaderUrl='http://10.75.100.46:8083/', offset=4, > connectorIds=[d094a5d7bbb046b99d62398cb84d648c], > taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], > revokedTaskIds=[], delay=0} to avoid running tasks while
[jira] [Commented] (KAFKA-16603) Data loss when kafka connect sending data to Kafka
[ https://issues.apache.org/jira/browse/KAFKA-16603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840652#comment-17840652 ] Anil Dasari commented on KAFKA-16603: - Issue is because of out of order acks. So, Not sure if this is kafka connect issue or connector implementation. Please find the detailed conversation [here|https://debezium.zulipchat.com/#narrow/stream/348249-community-postgresql/topic/Data.20loss.20on.20connector.20restart]. This [doc|https://github.com/apache/kafka/blob/864744ffd4ddc3b0d216a3049ee0c61e9c0d3ad1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L901] says that acks order is guaranteed for records that are send to same partition. IIUC, KafkaProducer#send doesn't the guarantee the order of acks for the records that belong to different partitions. Could someone confirm or clarify ? thanks in advance. > Data loss when kafka connect sending data to Kafka > -- > > Key: KAFKA-16603 > URL: https://issues.apache.org/jira/browse/KAFKA-16603 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 3.3.1 >Reporter: Anil Dasari >Priority: Major > > We are experiencing a data loss when Kafka Source connector is failed to send > data to Kafka topic and offset topic. > Kafka cluster and Kafka connect details: > # Kafka connect version i.e client : Confluent community version 7.3.1 i.e > Kafka 3.3.1 > # Kafka version: 0.11.0 (server) > # Cluster size : 3 brokers > # Number of partitions in all topics = 3 > # Replication factor = 3 > # Min ISR set 2 > # Uses no transformations in Kafka connector > # Use default error tolerance i.e None. > Our connector checkpoints the offsets info received in > SourceTask#commitRecord and resume the data process from the persisted > checkpoint. > The data loss is noticed when broker is unresponsive for few mins due to high > load and kafka connector was restarted. Also, Kafka connector graceful > shutdown failed. > Logs: > > {code:java} > [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group > coordinator 10.75.100.176:31000 (id: 2147483647 rack: null) > Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator > 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due > to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be > attempted. > Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from > last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null) > Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected. > Apr 22, 2024 @ 15:56:16.708 [Producer > clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 > disconnected. > Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 > disconnected. > Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator > 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due > to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be > attempted. > Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log > **) > Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was > unreachable for 3000ms. Revoking previous assignment Assignment{error=0, > leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', > leaderUrl='http://10.75.100.46:8083/', offset=4, > connectorIds=[d094a5d7bbb046b99d62398cb84d648c], > taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], > revokedTaskIds=[], delay=0} to avoid running tasks while not being a member > the group > Apr 22, 2024 @ 15:56:19.866 Stopping connector > d094a5d7bbb046b99d62398cb84d648c > Apr 22, 2024 @ 15:56:19.874 Stopping task d094a5d7bbb046b99d62398cb84d648c-0 > Apr 22, 2024 @ 15:56:19.880 Scheduled shutdown for > WorkerConnectorWorkerConnector{id=d094a5d7bbb046b99d62398cb84d648c} > Apr 22, 2024 @ 15:56:24.105 Connector 'd094a5d7bbb046b99d62398cb84d648c' > failed to properly shut down, has become unresponsive, and may be consuming > external resources. Correct the configuration for this connector or remove > the connector. After fixing the connector, it may be necessary to restart > this worker to release any consumed resources. > Apr 22, 2024 @ 15:56:24.110 [Producer > clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0]
[jira] [Commented] (KAFKA-16584) Make log processing summary configurable or debug
[ https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840639#comment-17840639 ] Matthias J. Sax commented on KAFKA-16584: - Yes, it does required to write code. – It does also require to write a KIP: [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] > Make log processing summary configurable or debug > - > > Key: KAFKA-16584 > URL: https://issues.apache.org/jira/browse/KAFKA-16584 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.2 >Reporter: Andras Hatvani >Priority: Major > Labels: needs-kip, newbie > > Currently *every two minutes for every stream thread* statistics will be > logged on INFO log level. > {code} > 2024-04-18T09:18:23.790+02:00 INFO 33178 --- [service] [-StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread > [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 > total records, ran 0 punctuators, and committed 0 total tasks since the last > update {code} > This is absolutely unnecessary and even harmful since it fills the logs and > thus storage space with unwanted and useless data. Otherwise the INFO logs > are useful and helpful, therefore it's not an option to raise the log level > to WARN. > Please make the logProcessingSummary > * either to a DEBUG level log or > * make it configurable so that it can be disabled. > This is the relevant code: > https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Enable Gradle Remote Build Cache [kafka]
github-actions[bot] commented on PR #15109: URL: https://github.com/apache/kafka/pull/15109#issuecomment-2076286592 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] HOTFIX: make sure all ConsumerGroupServices get closed [kafka]
showuon commented on code in PR #15801: URL: https://github.com/apache/kafka/pull/15801#discussion_r1578771823 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -171,8 +162,8 @@ private String[] getArgs(String group, String topic) { }; } -private void setupConsumerGroupService(String[] args) { -consumerGroupService = new ConsumerGroupCommand.ConsumerGroupService( +private static ConsumerGroupCommand.ConsumerGroupService consumerGroupService(String[] args) { Review Comment: Why should we use static method here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16584) Make log processing summary configurable or debug
[ https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840631#comment-17840631 ] dujian commented on KAFKA-16584: hello [~mjsax] I would like to confirm whether this problem requires code modification. If so, can assign it to me? > Make log processing summary configurable or debug > - > > Key: KAFKA-16584 > URL: https://issues.apache.org/jira/browse/KAFKA-16584 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.2 >Reporter: Andras Hatvani >Priority: Major > Labels: needs-kip, newbie > > Currently *every two minutes for every stream thread* statistics will be > logged on INFO log level. > {code} > 2024-04-18T09:18:23.790+02:00 INFO 33178 --- [service] [-StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread > [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 > total records, ran 0 punctuators, and committed 0 total tasks since the last > update {code} > This is absolutely unnecessary and even harmful since it fills the logs and > thus storage space with unwanted and useless data. Otherwise the INFO logs > are useful and helpful, therefore it's not an option to raise the log level > to WARN. > Please make the logProcessingSummary > * either to a DEBUG level log or > * make it configurable so that it can be disabled. > This is the relevant code: > https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] HOTFIX: make sure all ConsumerGroupServices get closed [kafka]
FrankYang0529 commented on PR #15801: URL: https://github.com/apache/kafka/pull/15801#issuecomment-2076141433 @chia7712, thanks for the hotfix. I'm very sorry for the inconvenient. The fix LGTM. It's better to use try-with-resource to close resource. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
chia7712 commented on PR #15786: URL: https://github.com/apache/kafka/pull/15786#issuecomment-2076135599 > It looks like the previous CI build had an issue with the Java 8/Scala 2.12 pipeline. I rekicked a build. The root cause is that some services are not closed. I file a PR (#15801) to fix that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] HOTFIX: make sure all ConsumerGroupServices get closed [kafka]
chia7712 opened a new pull request, #15801: URL: https://github.com/apache/kafka/pull/15801 Some services are not closed, so our CI print following error. ``` org.opentest4j.AssertionFailedError: Found 16 unexpected threads during @BeforeAll: `kafka-admin-client-thread | adminclient-287,kafka-admin-client-thread | adminclient-276,kafka-admin-client-thread | adminclient-271,kafka-admin-client-thread | adminclient-293,kafka-admin-client-thread | adminclient-281,kafka-admin-client-thread | adminclient-302,kafka-admin-client-thread | adminclient-334,kafka-admin-client-thread | adminclient-323,kafka-admin-client-thread | adminclient-257,kafka-admin-client-thread | adminclient-336,kafka-admin-client-thread | adminclient-308,kafka-admin-client-thread | adminclient-263,kafka-admin-client-thread | adminclient-273,kafka-admin-client-thread | adminclient-278,kafka-admin-client-thread | adminclient-283,kafka-admin-client-thread | adminclient-317` ==> expected: but was: ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16610) Replace "Map#entrySet#forEach" by "Map#forEach"
[ https://issues.apache.org/jira/browse/KAFKA-16610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez resolved KAFKA-16610. - Resolution: Resolved > Replace "Map#entrySet#forEach" by "Map#forEach" > --- > > Key: KAFKA-16610 > URL: https://issues.apache.org/jira/browse/KAFKA-16610 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Minor > Fix For: 3.8.0 > > > {quote} > Targets > Occurrences of 'entrySet().forEach' in Project > Found occurrences in Project (16 usages found) > Unclassified (16 usages found) > kafka.core.main (9 usages found) > kafka.server (4 usages found) > ControllerApis.scala (2 usages found) > ControllerApis (2 usages found) > handleIncrementalAlterConfigs (1 usage found) > 774 controllerResults.entrySet().forEach(entry => > response.responses().add( > handleLegacyAlterConfigs (1 usage found) > 533 controllerResults.entrySet().forEach(entry => > response.responses().add( > ControllerConfigurationValidator.scala (2 usages found) > ControllerConfigurationValidator (2 usages found) > validate (2 usages found) > 99 config.entrySet().forEach(e => { > 114 config.entrySet().forEach(e => > properties.setProperty(e.getKey, e.getValue)) > kafka.server.metadata (5 usages found) > AclPublisher.scala (1 usage found) > AclPublisher (1 usage found) > onMetadataUpdate (1 usage found) > 73 aclsDelta.changes().entrySet().forEach(e => > ClientQuotaMetadataManager.scala (3 usages found) > ClientQuotaMetadataManager (3 usages found) > handleIpQuota (1 usage found) > 119 quotaDelta.changes().entrySet().forEach { e => > update (2 usages found) > 54 quotasDelta.changes().entrySet().forEach { e => > 99 quotaDelta.changes().entrySet().forEach { e => > KRaftMetadataCache.scala (1 usage found) > KRaftMetadataCache (1 usage found) > getClusterMetadata (1 usage found) > 491 topic.partitions().entrySet().forEach { entry > => > kafka.core.test (1 usage found) > unit.kafka.integration (1 usage found) > KafkaServerTestHarness.scala (1 usage found) > KafkaServerTestHarness (1 usage found) > getTopicNames (1 usage found) > 349 > controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().entrySet().forEach > { > kafka.metadata.main (3 usages found) > org.apache.kafka.controller (2 usages found) > QuorumFeatures.java (1 usage found) > toString() (1 usage found) > 144 localSupportedFeatures.entrySet().forEach(f -> > features.add(f.getKey() + ": " + f.getValue())); > ReplicationControlManager.java (1 usage found) > createTopic(ControllerRequestContext, CreatableTopic, > List, Map, > List, boolean) (1 usage found) > 732 newParts.entrySet().forEach(e -> > assignments.put(e.getKey(), > org.apache.kafka.metadata.properties (1 usage found) > MetaPropertiesEnsemble.java (1 usage found) > toString() (1 usage found) > 610 logDirProps.entrySet().forEach( > kafka.metadata.test (1 usage found) > org.apache.kafka.controller (1 usage found) > ReplicationControlManagerTest.java (1 usage found) > createTestTopic(String, int[][], Map, > short) (1 usage found) > 307 configs.entrySet().forEach(e -> > topic.configs().add( > kafka.streams.main (1 usage found) > org.apache.kafka.streams.processor.internals (1 usage found) > StreamsMetadataState.java (1 usage found) > onChange(Map>, > Map>, Map) (1 > usage found) > 317 topicPartitionInfo.entrySet().forEach(entry -> > this.partitionsByTopic > kafka.tools.main (1 usage found) > org.apache.kafka.tools (1 usage found) > LeaderElectionCommand.java (1 usage found) > electLeaders(Admin, ElectionType, >
[jira] [Updated] (KAFKA-16610) Replace "Map#entrySet#forEach" by "Map#forEach"
[ https://issues.apache.org/jira/browse/KAFKA-16610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-16610: Fix Version/s: 3.8.0 > Replace "Map#entrySet#forEach" by "Map#forEach" > --- > > Key: KAFKA-16610 > URL: https://issues.apache.org/jira/browse/KAFKA-16610 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Minor > Fix For: 3.8.0 > > > {quote} > Targets > Occurrences of 'entrySet().forEach' in Project > Found occurrences in Project (16 usages found) > Unclassified (16 usages found) > kafka.core.main (9 usages found) > kafka.server (4 usages found) > ControllerApis.scala (2 usages found) > ControllerApis (2 usages found) > handleIncrementalAlterConfigs (1 usage found) > 774 controllerResults.entrySet().forEach(entry => > response.responses().add( > handleLegacyAlterConfigs (1 usage found) > 533 controllerResults.entrySet().forEach(entry => > response.responses().add( > ControllerConfigurationValidator.scala (2 usages found) > ControllerConfigurationValidator (2 usages found) > validate (2 usages found) > 99 config.entrySet().forEach(e => { > 114 config.entrySet().forEach(e => > properties.setProperty(e.getKey, e.getValue)) > kafka.server.metadata (5 usages found) > AclPublisher.scala (1 usage found) > AclPublisher (1 usage found) > onMetadataUpdate (1 usage found) > 73 aclsDelta.changes().entrySet().forEach(e => > ClientQuotaMetadataManager.scala (3 usages found) > ClientQuotaMetadataManager (3 usages found) > handleIpQuota (1 usage found) > 119 quotaDelta.changes().entrySet().forEach { e => > update (2 usages found) > 54 quotasDelta.changes().entrySet().forEach { e => > 99 quotaDelta.changes().entrySet().forEach { e => > KRaftMetadataCache.scala (1 usage found) > KRaftMetadataCache (1 usage found) > getClusterMetadata (1 usage found) > 491 topic.partitions().entrySet().forEach { entry > => > kafka.core.test (1 usage found) > unit.kafka.integration (1 usage found) > KafkaServerTestHarness.scala (1 usage found) > KafkaServerTestHarness (1 usage found) > getTopicNames (1 usage found) > 349 > controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().entrySet().forEach > { > kafka.metadata.main (3 usages found) > org.apache.kafka.controller (2 usages found) > QuorumFeatures.java (1 usage found) > toString() (1 usage found) > 144 localSupportedFeatures.entrySet().forEach(f -> > features.add(f.getKey() + ": " + f.getValue())); > ReplicationControlManager.java (1 usage found) > createTopic(ControllerRequestContext, CreatableTopic, > List, Map, > List, boolean) (1 usage found) > 732 newParts.entrySet().forEach(e -> > assignments.put(e.getKey(), > org.apache.kafka.metadata.properties (1 usage found) > MetaPropertiesEnsemble.java (1 usage found) > toString() (1 usage found) > 610 logDirProps.entrySet().forEach( > kafka.metadata.test (1 usage found) > org.apache.kafka.controller (1 usage found) > ReplicationControlManagerTest.java (1 usage found) > createTestTopic(String, int[][], Map, > short) (1 usage found) > 307 configs.entrySet().forEach(e -> > topic.configs().add( > kafka.streams.main (1 usage found) > org.apache.kafka.streams.processor.internals (1 usage found) > StreamsMetadataState.java (1 usage found) > onChange(Map>, > Map>, Map) (1 > usage found) > 317 topicPartitionInfo.entrySet().forEach(entry -> > this.partitionsByTopic > kafka.tools.main (1 usage found) > org.apache.kafka.tools (1 usage found) > LeaderElectionCommand.java (1 usage found) > electLeaders(Admin, ElectionType, > Optional>)
Re: [PR] KAFKA-16610 Replace "Map#entrySet#forEach" by "Map#forEach" [kafka]
soarez merged PR #15795: URL: https://github.com/apache/kafka/pull/15795 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-6527:Enable DynamicBrokerReconfigurationTest.testDefaultTopicConfig [kafka]
showuon commented on PR #15796: URL: https://github.com/apache/kafka/pull/15796#issuecomment-2076114639 @TaiJuWu , thanks for the PR. One comment: Could you left a comment in this PR to mention you're going to take it over? https://github.com/apache/kafka/pull/13953 . Otherwise, it's a duplicated PR in the community. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16617) Add KRaft info for the `advertised.listeners` doc description
Luke Chen created KAFKA-16617: - Summary: Add KRaft info for the `advertised.listeners` doc description Key: KAFKA-16617 URL: https://issues.apache.org/jira/browse/KAFKA-16617 Project: Kafka Issue Type: Improvement Reporter: Luke Chen Currently, we only write ZK handler in the `advertised.listeners` doc description: > Listeners to publish to ZooKeeper for clients to use, if different than the > listeners config property. We should also add KRaft handler info in the doc ref: https://kafka.apache.org/documentation/#brokerconfigs_advertised.listeners -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16571: reassign_partitions_test.bounce_brokers should wait for messages to be sent to every partition [kafka]
jolshan merged PR #15739: URL: https://github.com/apache/kafka/pull/15739 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on PR #15685: URL: https://github.com/apache/kafka/pull/15685#issuecomment-2076088235 Separately I need to fix the update tool. It will always say the finalized version is 0 if the tool knows of the feature, even if the broker doesn't include it in the list of finalized features. ``` Feature: metadata.versionSupportedMinVersion: 3.0-IV1 SupportedMaxVersion: 3.7-IV4FinalizedVersionLevel: 3.7-IV4 Epoch: 17 Feature: test.feature.versionSupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 0Epoch: 17 ``` ``` "supportedFeatures":[{"name":"metadata.version","minVersion":1,"maxVersion":19},{"name":"test.feature.version","minVersion":0,"maxVersion":1}],"finalizedFeaturesEpoch":17,"finalizedFeatures":[{"name":"metadata.version","maxVersionLevel":19,"minVersionLevel":19}]} ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on PR #15685: URL: https://github.com/apache/kafka/pull/15685#issuecomment-2076086916 I've cleaned up the code to not set the record in the storage tool when the version is 0. I also cleaned up the log since it is not always the case that the controller doesn't know the version. For now, 0 is a reasonable default. Here are the controller logs when i set the test version to 0 and to 1. ``` [2024-04-24 17:15:05,566] INFO [QuorumController id=1] Creating new QuorumController with clusterId F0eUpBlzS4yJYcsM9DNfXA. (org.apache.kafka.controller.QuorumController) [2024-04-24 17:15:05,568] INFO [QuorumController id=1] Becoming the active controller at epoch 1, next write offset 1. (org.apache.kafka.controller.QuorumController) [2024-04-24 17:15:05,570] WARN [QuorumController id=1] Performing controller activation. The metadata log appears to be empty. Appending 1 bootstrap record(s) in metadata transaction at metadata.version 3.7-IV4 from bootstrap source 'the binary bootstrap metadata file: /tmp/kraft-combined-logs/bootstrap.checkpoint'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster. (org.apache.kafka.controller.QuorumController) [2024-04-24 17:15:05,571] INFO [QuorumController id=1] Replayed BeginTransactionRecord(name='Bootstrap records') at offset 1. (org.apache.kafka.controller.OffsetControlManager) [2024-04-24 17:15:05,571] INFO [QuorumController id=1] Replayed a FeatureLevelRecord setting metadata.version to 3.7-IV4 (org.apache.kafka.controller.FeatureControlManager) [2024-04-24 17:15:05,571] INFO [QuorumController id=1] Replayed EndTransactionRecord() at offset 4. (org.apache.kafka.controller.OffsetControlManager) [2024-04-24 17:15:05,645] INFO [QuorumController id=1] Replayed RegisterControllerRecord contaning ControllerRegistration(id=1, incarnationId=h3WYlEEtTUCG6nOjFnIQxQ, zkMigrationReady=false, listeners=[Endpoint(listenerName='CONTROLLER', securityProtocol=PLAINTEXT, host='10.200.4.27', port=9093)], supportedFeatures={metadata.version: 1-19, test.feature.version: 0-1}). (org.apache.kafka.controller.ClusterControlManager) [2024-04-24 17:15:05,686] INFO [QuorumController id=1] Replayed initial RegisterBrokerRecord for broker 1: RegisterBrokerRecord(brokerId=1, isMigratingZkBroker=false, incarnationId=Ivj6roa7QnmcRbx_P_hg0A, brokerEpoch=6, endPoints=[BrokerEndpoint(name='PLAINTEXT', host='localhost', port=9092, securityProtocol=0)], features=[BrokerFeature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=19), BrokerFeature(name='test.feature.version', minSupportedVersion=0, maxSupportedVersion=1)], rack=null, fenced=true, inControlledShutdown=false, logDirs=[j6NFYHN2xQ8wG1rUVXf7LA]) (org.apache.kafka.controller.ClusterControlManager) [2024-04-24 17:15:05,745] INFO [QuorumController id=1] Replayed RegisterBrokerRecord modifying the registration for broker 1: RegisterBrokerRecord(brokerId=1, isMigratingZkBroker=false, incarnationId=Ivj6roa7QnmcRbx_P_hg0A, brokerEpoch=7, endPoints=[BrokerEndpoint(name='PLAINTEXT', host='localhost', port=9092, securityProtocol=0)], features=[BrokerFeature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=19), BrokerFeature(name='test.feature.version', minSupportedVersion=0, maxSupportedVersion=1)], rack=null, fenced=true, inControlledShutdown=false, logDirs=[j6NFYHN2xQ8wG1rUVXf7LA]) (org.apache.kafka.controller.ClusterControlManager) [2024-04-24 17:15:05,786] INFO [QuorumController id=1] The request from broker 1 to unfence has been granted because it has caught up with the offset of its register broker record 7. (org.apache.kafka.controller.BrokerHeartbeatManager) [2024-04-24 17:15:05,788] INFO [QuorumController id=1] Replayed BrokerRegistrationChangeRecord modifying the registration for broker 1: BrokerRegistrationChangeRecord(brokerId=1, brokerEpoch=7, fenced=-1, inControlledShutdown=0, logDirs=[]) (org.apache.kafka.controller.ClusterControlManager) ``` ``` 24-04-24 17:16:29,048] INFO [QuorumController id=1] Creating new QuorumController with clusterId F0eUpBlzS4yJYcsM9DNfXA. (org.apache.kafka.controller.QuorumController) [2024-04-24 17:16:29,050] INFO [QuorumController id=1] Becoming the active controller at epoch 1, next write offset 1. (org.apache.kafka.controller.QuorumController) [2024-04-24 17:16:29,051] WARN [QuorumController id=1] Performing controller activation. The metadata log appears to be empty. Appending 2 bootstrap record(s) in metadata transaction at metadata.version 3.7-IV4 from bootstrap source 'the binary bootstrap metadata file: /tmp/kraft-combined-logs/bootstrap.checkpoint'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster. (org.apache.kafka.controller.QuorumController) [2024-04-24 17:16:29,052] INFO [QuorumController id=1] Replayed BeginTransactionRecord(name='Bootstrap records') at offset 1.
Re: [PR] KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned [kafka]
kirktrue commented on code in PR #15737: URL: https://github.com/apache/kafka/pull/15737#discussion_r1578634103 ## tests/kafkatest/services/verifiable_consumer.py: ## @@ -140,22 +150,32 @@ class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler): def __init__(self, node, verify_offsets, idx): super().__init__(node, verify_offsets, idx) -def handle_partitions_revoked(self, event): +def handle_partitions_revoked(self, event, node, logger): self.revoked_count += 1 self.state = ConsumerState.Rebalancing self.position = {} +revoked = [] + for topic_partition in event["partitions"]: -topic = topic_partition["topic"] -partition = topic_partition["partition"] -self.assignment.remove(TopicPartition(topic, partition)) +tp = _create_partition_from_dict(topic_partition) -def handle_partitions_assigned(self, event): +if tp in self.assignment: +self.assignment.remove(tp) +revoked.append(tp) +else: +logger.warn("Could not remove topic partition %s from assignment as it was not previously assigned to %s" % (tp, node.account.hostname)) Review Comment: @lianetm—I will file a JIra on this in the next day or two. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16164) Pre-Vote
[ https://issues.apache.org/jira/browse/KAFKA-16164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alyssa Huang reassigned KAFKA-16164: Assignee: Alyssa Huang > Pre-Vote > > > Key: KAFKA-16164 > URL: https://issues.apache.org/jira/browse/KAFKA-16164 > Project: Kafka > Issue Type: Improvement >Reporter: Alyssa Huang >Assignee: Alyssa Huang >Priority: Major > > Implementing pre-vote as described in > https://cwiki.apache.org/confluence/display/KAFKA/KIP-996%3A+Pre-Vote -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16616) refactor mergeWith in MetadataSnapshot
Alyssa Huang created KAFKA-16616: Summary: refactor mergeWith in MetadataSnapshot Key: KAFKA-16616 URL: https://issues.apache.org/jira/browse/KAFKA-16616 Project: Kafka Issue Type: Improvement Affects Versions: 3.7.0 Reporter: Alyssa Huang Right now we keep track of topic ids and partition metadata to add/update separately in mergeWith (e.g. two maps passed as arguments). This means we iterate over topic metadata twice which could be costly when we're dealing with a large number of updates. `updatePartitionLeadership` which calls `mergeWith` does something similarly (generates map of topic ids to update in a loop separate from the list of partition metadata to update) and should be refactored as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16614:Disallow @ClusterTemplate("") [kafka]
TaiJuWu commented on PR #15800: URL: https://github.com/apache/kafka/pull/15800#issuecomment-2076019523 > @TaiJuWu thanks for this contribution. btw, is this check ( > > https://github.com/apache/kafka/blob/81c222e9779c3339aa139ab930a74aba2c7c8685/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java#L92 > > ) unnecessary? Yes, I believe it’s unnecessary. If we give default value (code on truncated branch), the size must be greater zero, we just need to select one. Here we have two solutions, one is current version, the other is you look last time. I don’t take any position, both are great for me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15924) Flaky test - QuorumControllerTest.testFatalMetadataReplayErrorOnActive
[ https://issues.apache.org/jira/browse/KAFKA-15924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840596#comment-17840596 ] Igor Soarez commented on KAFKA-15924: - Another instance: {code:java} org.opentest4j.AssertionFailedError: expected: but was:at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1145) at app//org.apache.kafka.controller.QuorumControllerTest.testFatalMetadataReplayErrorOnActive(QuorumControllerTest.java:1274) {code} https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15795/1/tests/ > Flaky test - QuorumControllerTest.testFatalMetadataReplayErrorOnActive > -- > > Key: KAFKA-15924 > URL: https://issues.apache.org/jira/browse/KAFKA-15924 > Project: Kafka > Issue Type: Bug >Reporter: Haruki Okada >Priority: Major > Labels: flaky-test > Attachments: stdout.log > > > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14242/15/tests] > > {code:java} > Error > org.opentest4j.AssertionFailedError: expected: > but was: > Stacktrace > org.opentest4j.AssertionFailedError: expected: > but was: > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at > app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) > at > app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) > at > app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) > at > app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141) > at > app//org.apache.kafka.controller.QuorumControllerTest.testFatalMetadataReplayErrorOnActive(QuorumControllerTest.java:1132) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > app//org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218) > at >
[jira] [Commented] (KAFKA-15146) Flaky test ConsumerBounceTest.testConsumptionWithBrokerFailures
[ https://issues.apache.org/jira/browse/KAFKA-15146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840595#comment-17840595 ] Igor Soarez commented on KAFKA-15146: - Another instance https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15795/1/tests/ > Flaky test ConsumerBounceTest.testConsumptionWithBrokerFailures > --- > > Key: KAFKA-15146 > URL: https://issues.apache.org/jira/browse/KAFKA-15146 > Project: Kafka > Issue Type: Test > Components: unit tests >Reporter: Divij Vaidya >Priority: Major > Labels: flaky-test > > Flaky test that fails with the following error. Example build - > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13953/2] > {noformat} > Gradle Test Run :core:integrationTest > Gradle Test Executor 177 > > ConsumerBounceTest > testConsumptionWithBrokerFailures() FAILED > org.apache.kafka.clients.consumer.CommitFailedException: Offset commit > cannot be completed since the consumer is not part of an active group for > auto partition assignment; it is likely that the consumer was kicked out of > the group. > at > app//org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1351) > at > app//org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1188) > at > app//org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1518) > at > app//org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1417) > at > app//org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1374) > at > app//kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:109) > at > app//kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:81){noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16614:Disallow @ClusterTemplate("") [kafka]
chia7712 commented on code in PR #15800: URL: https://github.com/apache/kafka/pull/15800#discussion_r1578597252 ## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ## @@ -123,8 +123,7 @@ private void processClusterTemplate(ExtensionContext context, ClusterTemplate an if (!annot.value().isEmpty()) { generateClusterConfigurations(context, annot.value(), generatedClusterConfigs::add); } else { -// Ensure we have at least one cluster config - generatedClusterConfigs.add(ClusterConfig.defaultClusterBuilder().build()); +throw new IllegalStateException("Annotation value can't be empty string."); Review Comment: Could we check `annot.value().isEmpty()` first? for example: ```java if (annot.value().isEmpty()) throw exception; ``` With above change, we can simplify the if-else -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16614:Disallow @ClusterTemplate("") [kafka]
TaiJuWu opened a new pull request, #15800: URL: https://github.com/apache/kafka/pull/15800 As title. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16609: Update parse_describe_topic to support new topic describe output [kafka]
kirktrue commented on PR #15799: URL: https://github.com/apache/kafka/pull/15799#issuecomment-2075814568 @lucasbru—would you mind taking a look at this change? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16609: Update parse_describe_topic to support new topic describe output [kafka]
kirktrue opened a new pull request, #15799: URL: https://github.com/apache/kafka/pull/15799 The format of the 'describe topic' output was changed as part of KAFKA-15585 which required an update in the parsing logic used by system tests. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Rename RaftConfig to QuorumConfigs [kafka]
chia7712 commented on code in PR #15797: URL: https://github.com/apache/kafka/pull/15797#discussion_r1578498362 ## raft/src/main/java/org/apache/kafka/raft/QuorumConfigs.java: ## @@ -47,7 +47,7 @@ * controller should be able to transition from standby to active without reloading all of * the metadata. The standby is a "hot" standby, not a "cold" one. */ -public class RaftConfig { +public class QuorumConfigs { Review Comment: It seems to me QuorumConfig`s` implies that we should not have instance of it. However, we do instantiate it, and so could we rename it to `QuorumConfig`? Another similar case is `LogConfig`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16615) JoinGroup API for upgrading ConsumerGroup
Dongnuo Lyu created KAFKA-16615: --- Summary: JoinGroup API for upgrading ConsumerGroup Key: KAFKA-16615 URL: https://issues.apache.org/jira/browse/KAFKA-16615 Project: Kafka Issue Type: Sub-task Reporter: Dongnuo Lyu -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16465: Fix consumer sys test revocation validation [kafka]
lianetm commented on code in PR #15778: URL: https://github.com/apache/kafka/pull/15778#discussion_r1578363393 ## tests/kafkatest/tests/client/consumer_test.py: ## @@ -242,16 +242,15 @@ def test_static_consumer_bounce(self, clean_shutdown, static_membership, bounce_ self.rolling_bounce_consumers(consumer, keep_alive=num_keep_alive, num_bounces=num_bounces) num_revokes_after_bounce = consumer.num_revokes_for_alive() - num_revokes_before_bounce - -check_condition = num_revokes_after_bounce != 0 + # under static membership, the live consumer shall not revoke any current running partitions, # since there is no global rebalance being triggered. if static_membership: -check_condition = num_revokes_after_bounce == 0 - -assert check_condition, \ -"Total revoked count %d does not match the expectation of having 0 revokes as %d" % \ -(num_revokes_after_bounce, check_condition) +assert num_revokes_after_bounce == 0, \ +"Unexpected revocation triggered when bouncing static member. Expecting 0 but had %d revocations" % num_revokes_after_bounce +elif consumer.is_eager(): +assert num_revokes_after_bounce != 0, \ Review Comment: This test was not testing the cooperative case before because it was using the base [setup_consumer](https://github.com/apache/kafka/blob/b8b2415d5e006cf91c0f74dcf60b764933c9c1d0/tests/kafkatest/tests/verifiable_consumer_test.py#L56) that has a range assignor by default. We do have a way to detect rebalances (number of calls to partitions assigned), but that does not allow any special check for dynamic only, since static members will get those same calls to partitions assigned (the static membership is only making that the group assignment is not re-computed, so partition is not re-assigned while the session lasts, expecting that the member might come back, but from the callbacks POV, the static member that leaves gets the same as the dynamic one). I agree that the value of the dynamic + cooperative is questionable, but there is value in verifying the message delivery semantics further down, when checking the total consumed. The `test_consumer_bounce` does a very similar check for dynamic members btw, and again, focusing on the delivery semantics, but both tests are not exactly the same (mainly keeping 1 node alive or not), so I would lean towards not removing the combination from this static test, WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16465: Fix consumer sys test revocation validation [kafka]
lianetm commented on code in PR #15778: URL: https://github.com/apache/kafka/pull/15778#discussion_r1578363393 ## tests/kafkatest/tests/client/consumer_test.py: ## @@ -242,16 +242,15 @@ def test_static_consumer_bounce(self, clean_shutdown, static_membership, bounce_ self.rolling_bounce_consumers(consumer, keep_alive=num_keep_alive, num_bounces=num_bounces) num_revokes_after_bounce = consumer.num_revokes_for_alive() - num_revokes_before_bounce - -check_condition = num_revokes_after_bounce != 0 + # under static membership, the live consumer shall not revoke any current running partitions, # since there is no global rebalance being triggered. if static_membership: -check_condition = num_revokes_after_bounce == 0 - -assert check_condition, \ -"Total revoked count %d does not match the expectation of having 0 revokes as %d" % \ -(num_revokes_after_bounce, check_condition) +assert num_revokes_after_bounce == 0, \ +"Unexpected revocation triggered when bouncing static member. Expecting 0 but had %d revocations" % num_revokes_after_bounce +elif consumer.is_eager(): +assert num_revokes_after_bounce != 0, \ Review Comment: This test was not testing the cooperative case before because it was using the base [setup_consumer](https://github.com/apache/kafka/blob/b8b2415d5e006cf91c0f74dcf60b764933c9c1d0/tests/kafkatest/tests/verifiable_consumer_test.py#L56) that has a range assignor by default. We do have a way to detect rebalances (number of calls to partitions assigned), but that does not allow any special check for dynamic only, since static members will get those same calls to partitions assigned (the static membership is only making that the group assignment is not re-computed, so partition is not re-assigned while the session lasts, expecting that the member might come back, but from the callbacks POV the static member that leaves gets the same as the dynamic one). I agree that the value of the dynamic + cooperative is questionable, but there is value in verifying the message delivery semantics further down, when checking the total consumed. The `test_consumer_bounce` does a very similar check for dynamic members btw, and again, focusing on the delivery semantics, but both tests are not exactly the same (mainly keeping 1 node alive or not), so I would lean towards not removing the combination from this static test, WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16465: Fix consumer sys test revocation validation [kafka]
lianetm commented on code in PR #15778: URL: https://github.com/apache/kafka/pull/15778#discussion_r1578363393 ## tests/kafkatest/tests/client/consumer_test.py: ## @@ -242,16 +242,15 @@ def test_static_consumer_bounce(self, clean_shutdown, static_membership, bounce_ self.rolling_bounce_consumers(consumer, keep_alive=num_keep_alive, num_bounces=num_bounces) num_revokes_after_bounce = consumer.num_revokes_for_alive() - num_revokes_before_bounce - -check_condition = num_revokes_after_bounce != 0 + # under static membership, the live consumer shall not revoke any current running partitions, # since there is no global rebalance being triggered. if static_membership: -check_condition = num_revokes_after_bounce == 0 - -assert check_condition, \ -"Total revoked count %d does not match the expectation of having 0 revokes as %d" % \ -(num_revokes_after_bounce, check_condition) +assert num_revokes_after_bounce == 0, \ +"Unexpected revocation triggered when bouncing static member. Expecting 0 but had %d revocations" % num_revokes_after_bounce +elif consumer.is_eager(): +assert num_revokes_after_bounce != 0, \ Review Comment: This test was not testing the cooperative case before because it was using the base [setup_consumer](https://github.com/apache/kafka/blob/b8b2415d5e006cf91c0f74dcf60b764933c9c1d0/tests/kafkatest/tests/verifiable_consumer_test.py#L56) that has a range assignor by default. We do have a way to detect rebalances (number of calls to partitions assigned), but that does not allow any special check for dynamic only, since static members will get those same calls to partitions assigned (the static membership is only making that the group assignment is not re-computed, so partition is not re-assigned while the session lasts, expecting that the member might come back). I agree that the value of the dynamic + cooperative is questionable, but there is value in verifying the message delivery semantics further down, when checking the total consumed. The `test_consumer_bounce` does a very similar check for dynamic members btw, and again, focusing on the delivery semantics, but both tests are not exactly the same (mainly keeping 1 node alive or not), so I would lean towards not removing the combination from this static test, WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16465: Fix consumer sys test revocation validation [kafka]
lianetm commented on code in PR #15778: URL: https://github.com/apache/kafka/pull/15778#discussion_r1578363393 ## tests/kafkatest/tests/client/consumer_test.py: ## @@ -242,16 +242,15 @@ def test_static_consumer_bounce(self, clean_shutdown, static_membership, bounce_ self.rolling_bounce_consumers(consumer, keep_alive=num_keep_alive, num_bounces=num_bounces) num_revokes_after_bounce = consumer.num_revokes_for_alive() - num_revokes_before_bounce - -check_condition = num_revokes_after_bounce != 0 + # under static membership, the live consumer shall not revoke any current running partitions, # since there is no global rebalance being triggered. if static_membership: -check_condition = num_revokes_after_bounce == 0 - -assert check_condition, \ -"Total revoked count %d does not match the expectation of having 0 revokes as %d" % \ -(num_revokes_after_bounce, check_condition) +assert num_revokes_after_bounce == 0, \ +"Unexpected revocation triggered when bouncing static member. Expecting 0 but had %d revocations" % num_revokes_after_bounce +elif consumer.is_eager(): +assert num_revokes_after_bounce != 0, \ Review Comment: This test was not testing the cooperative case before because it was using the base [setup_consumer](https://github.com/apache/kafka/blob/b8b2415d5e006cf91c0f74dcf60b764933c9c1d0/tests/kafkatest/tests/verifiable_consumer_test.py#L56) that has a range assignor by default. We do have a way to detect rebalances (number of calls to partitions assigned), but that does not allow any special check for dynamic only, since static members will get those same calls to partitions assigned (the static membership is only making that the partition is not re-assigned while the session lasts). I agree that the value of the dynamic + cooperative is questionable, but there is value in verifying the message delivery semantics further down, when checking the total consumed. The `test_consumer_bounce` does a very similar check for dynamic members btw, and again, focusing on the delivery semantics, but both tests are not exactly the same (mainly keeping 1 node alive or not), so I would lean towards not removing the combination from this static test, WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16614) Disallow `@ClusterTemplate("")`
[ https://issues.apache.org/jira/browse/KAFKA-16614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16614: -- Assignee: TaiJuWu (was: Chia-Ping Tsai) > Disallow `@ClusterTemplate("")` > --- > > Key: KAFKA-16614 > URL: https://issues.apache.org/jira/browse/KAFKA-16614 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TaiJuWu >Priority: Minor > > `@ClusterTemplate` enable us to create dynamic configs, and it expect to > accept a method name which can create server configs at runtime. It throws > error when we pass a nonexistent method name, but it works if we pass an > empty name -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16614) Disallow `@ClusterTemplate("")`
[ https://issues.apache.org/jira/browse/KAFKA-16614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840545#comment-17840545 ] TaiJuWu commented on KAFKA-16614: - Hello [~chia7712] , Could you assign this ticket to me if you don't work currently? > Disallow `@ClusterTemplate("")` > --- > > Key: KAFKA-16614 > URL: https://issues.apache.org/jira/browse/KAFKA-16614 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > `@ClusterTemplate` enable us to create dynamic configs, and it expect to > accept a method name which can create server configs at runtime. It throws > error when we pass a nonexistent method name, but it works if we pass an > empty name -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned [kafka]
lianetm commented on code in PR #15737: URL: https://github.com/apache/kafka/pull/15737#discussion_r1578381540 ## tests/kafkatest/services/verifiable_consumer.py: ## @@ -140,22 +150,32 @@ class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler): def __init__(self, node, verify_offsets, idx): super().__init__(node, verify_offsets, idx) -def handle_partitions_revoked(self, event): +def handle_partitions_revoked(self, event, node, logger): self.revoked_count += 1 self.state = ConsumerState.Rebalancing self.position = {} +revoked = [] + for topic_partition in event["partitions"]: -topic = topic_partition["topic"] -partition = topic_partition["partition"] -self.assignment.remove(TopicPartition(topic, partition)) +tp = _create_partition_from_dict(topic_partition) -def handle_partitions_assigned(self, event): +if tp in self.assignment: +self.assignment.remove(tp) +revoked.append(tp) +else: +logger.warn("Could not remove topic partition %s from assignment as it was not previously assigned to %s" % (tp, node.account.hostname)) Review Comment: thanks! better I believe. Do we have a Jira to investigate the failure leading to this? it's concerning (and even more if the case is that is happening with the new protocol only??) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16614) Disallow `@ClusterTemplate("")`
Chia-Ping Tsai created KAFKA-16614: -- Summary: Disallow `@ClusterTemplate("")` Key: KAFKA-16614 URL: https://issues.apache.org/jira/browse/KAFKA-16614 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai `@ClusterTemplate` enable us to create dynamic configs, and it expect to accept a method name which can create server configs at runtime. It throws error when we pass a nonexistent method name, but it works if we pass an empty name -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16465: Fix consumer sys test revocation validation [kafka]
lianetm commented on code in PR #15778: URL: https://github.com/apache/kafka/pull/15778#discussion_r1578363393 ## tests/kafkatest/tests/client/consumer_test.py: ## @@ -242,16 +242,15 @@ def test_static_consumer_bounce(self, clean_shutdown, static_membership, bounce_ self.rolling_bounce_consumers(consumer, keep_alive=num_keep_alive, num_bounces=num_bounces) num_revokes_after_bounce = consumer.num_revokes_for_alive() - num_revokes_before_bounce - -check_condition = num_revokes_after_bounce != 0 + # under static membership, the live consumer shall not revoke any current running partitions, # since there is no global rebalance being triggered. if static_membership: -check_condition = num_revokes_after_bounce == 0 - -assert check_condition, \ -"Total revoked count %d does not match the expectation of having 0 revokes as %d" % \ -(num_revokes_after_bounce, check_condition) +assert num_revokes_after_bounce == 0, \ +"Unexpected revocation triggered when bouncing static member. Expecting 0 but had %d revocations" % num_revokes_after_bounce +elif consumer.is_eager(): +assert num_revokes_after_bounce != 0, \ Review Comment: This test was not testing the cooperative case before because it was using the base [setup_consumer](https://github.com/apache/kafka/blob/b8b2415d5e006cf91c0f74dcf60b764933c9c1d0/tests/kafkatest/tests/verifiable_consumer_test.py#L56) that has a range assignor by default. We do have a way to detect rebalances (number of calls to partitions assigned), but that does not allow any special check for dynamic only, since static members will get those same calls to partitions assigned (the static membership is only making that the partition is not re-assigned/revoked while the session lasts). I agree that the value of the dynamic + cooperative is questionable, but there is value in verifying the message delivery semantics further down, when checking the total consumed. The `test_consumer_bounce` does a very similar check for dynamic members btw, and again, focusing on the delivery semantics, but both tests are not exactly the same (mainly keeping 1 node alive or not), so I would lean towards not removing the combination from this static test, WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
chia7712 commented on PR #15786: URL: https://github.com/apache/kafka/pull/15786#issuecomment-2075606243 > It looks like the previous CI build had an issue with the Java 8/Scala 2.12 pipeline. I rekicked a build. oh, I rekicked it too :_ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Rename RaftConfig to QuorumConfigs [kafka]
OmniaGM opened a new pull request, #15797: URL: https://github.com/apache/kafka/pull/15797 For context https://github.com/apache/kafka/pull/15775#issuecomment-2075340243 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KRAFT configs out of KafkaConfig [kafka]
OmniaGM commented on PR #15775: URL: https://github.com/apache/kafka/pull/15775#issuecomment-2075595461 > > > Maybe we should rename RaftConfig to QuorumConfig as all configs in QuorumConfig have prefix QUORUM_ > > > This is good point, maybe this can be a followup PR > > After renaming `RaftConfig` to `QuorumConfig`, `KRaftConfigs` created by this PR is a acceptable naming I think Raised here https://github.com/apache/kafka/pull/15797 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] :Enable DynamicBrokerReconfigurationTest.testDefaultTopicConfig [kafka]
TaiJuWu opened a new pull request, #15796: URL: https://github.com/apache/kafka/pull/15796 As title ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [No Review] Kafka-14563 [kafka]
jolshan commented on code in PR #15657: URL: https://github.com/apache/kafka/pull/15657#discussion_r1578348187 ## clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java: ## @@ -40,6 +40,7 @@ import static org.apache.kafka.common.requests.ProduceResponse.INVALID_OFFSET; public class ProduceRequest extends AbstractRequest { +private static final short TRANSACTION_V2_MINIMAL_VERSION = 12; Review Comment: nit: should we have the last version before v2 to avoid the -1s in the methods? For isTransactionalV2Requested, we can have strictly greater than the version. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [No Review] Kafka-14563 [kafka]
jolshan commented on code in PR #15657: URL: https://github.com/apache/kafka/pull/15657#discussion_r1578346148 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -979,6 +1013,13 @@ void handleCoordinatorReady() { null; this.coordinatorSupportsBumpingEpoch = initProducerIdVersion != null && initProducerIdVersion.maxVersion() >= 3; + +// TODO(caliu) use feature version. +ApiVersion produceVersion = nodeApiVersions != null ? +nodeApiVersions.apiVersion(ApiKeys.PRODUCE) : +null; +this.coordinatorSupportsTransactionV2 = produceVersion != null && Review Comment: Sorry I think I used some unclear wording. When I said "checking TV", I meant using the nodeApiVersions to check the TV on requests -- this will be in ApiVersionsResponseData.finalizedFeatures. We will also want to check ApiVersionsResponseData.finalizedFeaturesEpoch when we update the feature version. I was thinking the transaction manager could have some sort of central mechanism that checks the api version and updates on the various requests. This could be done in a helper. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
mimaison commented on PR #15786: URL: https://github.com/apache/kafka/pull/15786#issuecomment-2075580384 It looks like the previous CI build had an issue with the Java 8/Scala 2.12 pipeline. I rekicked a build. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16613) Remove TestUtils#subscribeAndWaitForRecords
[ https://issues.apache.org/jira/browse/KAFKA-16613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16613. Fix Version/s: 3.8.0 Resolution: Fixed > Remove TestUtils#subscribeAndWaitForRecords > --- > > Key: KAFKA-16613 > URL: https://issues.apache.org/jira/browse/KAFKA-16613 > Project: Kafka > Issue Type: Test >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Minor > Fix For: 3.8.0 > > > After KAFKA-16483, we remove most of usage of > TestUtils#subscribeAndWaitForRecords. The only remaining case uses it is > PlaintextAdminIntegrationTest#testDeleteConsumerGroupOffsets. We can also > remove it because Consumer#poll already has timeout input. We don't need > TestUtils#subscribeAndWaitForRecords to give another waiting wrapper. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16613: remove TestUtils#subscribeAndWaitForRecords [kafka]
chia7712 merged PR #15794: URL: https://github.com/apache/kafka/pull/15794 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [No Review] Kafka-14563 [kafka]
jolshan commented on PR #15657: URL: https://github.com/apache/kafka/pull/15657#issuecomment-2075537428 Thanks @CalvinConfluent I will try to take a look sometime today -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [No Review] Kafka-14563 [kafka]
CalvinConfluent commented on PR #15657: URL: https://github.com/apache/kafka/pull/15657#issuecomment-2075521654 @jolshan Updated the PR 1. Reverted the KafkaApi changes. Now if the produce request with the TransactionV2 supported version, the server will process it no matter what Transaction version is in use. 2. Now the client will use the correct version depending on whether Transaction V2 is enabled. 3. Added Produce request UT and corrected send produce request logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [No Review] Kafka-14563 [kafka]
CalvinConfluent commented on code in PR #15657: URL: https://github.com/apache/kafka/pull/15657#discussion_r1578298683 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -109,7 +109,7 @@ class AddPartitionsToTxnManager( .setTransactionalId(transactionalId) .setProducerId(producerId) .setProducerEpoch(producerEpoch) -.setVerifyOnly(true) +.setVerifyOnly(supportedOperation != addPartition) Review Comment: Reverted the kafka api changes, anywhere else needs to use supportedOperation? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [No Review] Kafka-14563 [kafka]
CalvinConfluent commented on code in PR #15657: URL: https://github.com/apache/kafka/pull/15657#discussion_r1578296803 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -402,6 +412,30 @@ public synchronized void maybeAddPartition(TopicPartition topicPartition) { } } +public synchronized void maybeHandlePartitionAdded(TopicPartition topicPartition) { Review Comment: Refactored this part, and removed the previous handling. This method is called as a part of the produce callback and if there is any error in the produceResponse, it will not be called. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KRAFT configs out of KafkaConfig [kafka]
chia7712 commented on PR #15775: URL: https://github.com/apache/kafka/pull/15775#issuecomment-2075392820 >> Maybe we should rename RaftConfig to QuorumConfig as all configs in QuorumConfig have prefix QUORUM_ > This is good point, maybe this can be a followup PR After renaming `RaftConfig` to `QuorumConfig`, `KRaftConfigs` created by this PR is a acceptable naming I think -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] cherrypick KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification [kafka]
dajac commented on PR #15755: URL: https://github.com/apache/kafka/pull/15755#issuecomment-2075368110 @jolshan That makes sense. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KRAFT configs out of KafkaConfig [kafka]
OmniaGM commented on PR #15775: URL: https://github.com/apache/kafka/pull/15775#issuecomment-2075357578 > Maybe we should rename RaftConfig to QuorumConfig as all configs in QuorumConfig have prefix QUORUM_ This is good point, maybe this can be a followup PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KRAFT configs out of KafkaConfig [kafka]
OmniaGM commented on PR #15775: URL: https://github.com/apache/kafka/pull/15775#issuecomment-2075356299 > It seems org.apache.kafka.raft.KafkaRaftManager needs org.apache.kafka.server.config.KafkaConfig won't happen if the 5 getters are moved, right? Sorry I maybe wan't clear `org.apache.kafka.raft.KafkaRaftManage` this was an example for depend circle if we start to move quorum raft and Kraft code into `raft` module. `KafkaRaftManage` at the moment is in core under `kafka.raft` package and not `raft` module and it need `KafkaConfig` to initialise `org.apache.kafka.raft.RaftConfig` (which is waiting for `AbstractConfig` and not `KafkaConfig` and to access the following getters `nodeId`, `ProcessRoles`, `metadataLogDir`, `quorumRequestTimeoutMs`, `controllerListenerNames`, `saslMechanismControllerProtocol`, `saslInterBrokerHandshakeRequestEnable`. In the future if we decided to move `kafka.raft.KafkaRaftManage` it is most likely going to be moved into server and not raft -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KRAFT configs out of KafkaConfig [kafka]
chia7712 commented on PR #15775: URL: https://github.com/apache/kafka/pull/15775#issuecomment-2075340243 > I don't think we should merge them with RaftConfig as RaftConfig should be separate as it is only for the quorum raft and not KRAFT mode which are a bit different in my option. Maybe we should rename `RaftConfig` to `QuorumConfig` as all configs in `QuorumConfig` have prefix `QUORUM_` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] cherrypick KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification [kafka]
jolshan commented on PR #15755: URL: https://github.com/apache/kafka/pull/15755#issuecomment-2075323626 Hey @dajac thanks for taking a look. We didn't include https://github.com/apache/kafka/commit/a8203f9c7a2c449ee59ee29d775409b2f1e00d07 in 3.7 so I feel like adding this code is extraneous. Let me know what you think. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close
[ https://issues.apache.org/jira/browse/KAFKA-16217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840502#comment-17840502 ] Calvin Liu commented on KAFKA-16217: [~kirktrue] The cherry-pick for the 3.7 is merged. For the 3.6.3 [~chia7712] mentioned we may not have the 3.6.3 release. https://github.com/apache/kafka/pull/15791 > Transactional producer stuck in IllegalStateException during close > -- > > Key: KAFKA-16217 > URL: https://issues.apache.org/jira/browse/KAFKA-16217 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 3.7.0, 3.6.1 >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > Labels: transactions > Fix For: 3.8.0, 3.7.1, 3.6.3 > > > The producer is stuck during the close. It keeps retrying to abort the > transaction but it never succeeds. > {code:java} > [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | > producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > org.apache.kafka.clients.producer.internals.Sender run - [Producer > clientId=producer-transaction-ben > ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, > transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > Error in kafka producer I/O thread while aborting transaction: > java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` > because the previous call to `commitTransaction` timed out and must be retried > at > org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138) > at > org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) > at java.base/java.lang.Thread.run(Thread.java:1583) > at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) > {code} > With the additional log, I found the root cause. If the producer is in a bad > transaction state(in my case, the TransactionManager.pendingTransition was > set to commitTransaction and did not get cleaned), then the producer calls > close and tries to abort the existing transaction, the producer will get > stuck in the transaction abortion. It is related to the fix > [https://github.com/apache/kafka/pull/13591]. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-15743) KRaft support in ReplicationQuotasTest
[ https://issues.apache.org/jira/browse/KAFKA-15743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Proven Provenzano reopened KAFKA-15743: --- The {{registerBroker}} function in this test file needs to use the directory UUIDs assigned to the broker and not random UUIDs. If the directory UUIDs do not match then when the broker reregisters as part of startup, any topics with replicas on the broker which were created with the random UUIDs will actually be offline. [~soarez] can comment more here. > KRaft support in ReplicationQuotasTest > -- > > Key: KAFKA-15743 > URL: https://issues.apache.org/jira/browse/KAFKA-15743 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Sameer Tejani >Assignee: Dmitry Werner >Priority: Minor > Labels: kraft, kraft-test, newbie > Fix For: 3.8.0 > > > The following tests in ReplicationQuotasTest in > core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala need to be > updated to support KRaft > 59 : def shouldBootstrapTwoBrokersWithLeaderThrottle(): Unit = { > 64 : def shouldBootstrapTwoBrokersWithFollowerThrottle(): Unit = { > 171 : def shouldThrottleOldSegments(): Unit = { > Scanned 240 lines. Found 0 KRaft tests out of 3 tests -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15853: Move KRAFT configs out of KafkaConfig [kafka]
chia7712 commented on PR #15775: URL: https://github.com/apache/kafka/pull/15775#issuecomment-2075295917 > KafkaConfig will move out of core and into server, however, for the time being I don't plan to make raft depend on server for now as far as I can see the only case I might need to do so is to use the getters from KafkaConfig but they are only 5 getters as far as I can see so don't worth it. I am planning to just remove these getters from KafkaConfig. It seems `org.apache.kafka.raft.KafkaRaftManager needs org.apache.kafka.server.config.KafkaConfig` won't happen if the 5 getters are moved, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KRAFT configs out of KafkaConfig [kafka]
OmniaGM commented on PR #15775: URL: https://github.com/apache/kafka/pull/15775#issuecomment-2075270699 > > If we start to move related kraft classes to raft module like KafkaRaftManager this will be tricky as now org.apache.kafka.raft.KafkaRaftManager needs org.apache.kafka.server.config.KafkaConfig, org.apache.kafka.server.config.KafkaConfig needs org.apache.kafka.raft.RaftConfig and org.apache.kafka.server.BrokerServer needs org.apache.kafka.raft.KafkaRaftManager > > `org.apache.kafka.server.config.KafkaConfig` -> this means `KafkaConfig` will be moved to server module in the future? If so, is it weird to make raft module depend on server module? Please correct me if I misunderstand anything. KafkaConfig will move out of core and into server, however, for the time being I don't plan to make raft depend on server for now as far as I can see the only case I might need to do so is to use the getters from KafkaConfig but they are only 5 getters as far as I can see so don't worth it. I am planning to just remove these getters from KafkaConfig. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16211: Inconsistent config values in CreateTopicsResult and DescribeConfigsResult [kafka]
chia7712 commented on PR #15696: URL: https://github.com/apache/kafka/pull/15696#issuecomment-2075243743 > Thanks for the reply. I have tried the following code and received the results. Each time I run the code, the DescribeTopicsResult gives a different configurations of the brokers in my cluster. That is great. Could you include that in this PR to be the test to prove your fix gets work -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1578083329 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new UniformAssignor()); + +private final PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +/** + * The assignment type is decided based on whether all the members are assigned partitions + * for the first time (full), or incrementally when a rebalance is triggered. + */ +public enum AssignmentType { +FULL, INCREMENTAL +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"FULL", "INCREMENTAL"}) +private AssignmentType assignmentType; + +private PartitionAssignor partitionAssignor; + +private static final int NUMBER_OF_RACKS = 3; + +private static final int MAX_BUCKET_COUNT = 5; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1578083329 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new UniformAssignor()); + +private final PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +/** + * The assignment type is decided based on whether all the members are assigned partitions + * for the first time (full), or incrementally when a rebalance is triggered. + */ +public enum AssignmentType { +FULL, INCREMENTAL +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"FULL", "INCREMENTAL"}) +private AssignmentType assignmentType; + +private PartitionAssignor partitionAssignor; + +private static final int NUMBER_OF_RACKS = 3; + +private static final int MAX_BUCKET_COUNT = 5; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1578074445 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new UniformAssignor()); + +private final PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +/** + * The assignment type is decided based on whether all the members are assigned partitions + * for the first time (full), or incrementally when a rebalance is triggered. + */ +public enum AssignmentType { +FULL, INCREMENTAL +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"FULL", "INCREMENTAL"}) +private AssignmentType assignmentType; + +private PartitionAssignor partitionAssignor; + +private static final int NUMBER_OF_RACKS = 3; + +private static final int MAX_BUCKET_COUNT = 5; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1578074445 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new UniformAssignor()); + +private final PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +/** + * The assignment type is decided based on whether all the members are assigned partitions + * for the first time (full), or incrementally when a rebalance is triggered. + */ +public enum AssignmentType { +FULL, INCREMENTAL +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"FULL", "INCREMENTAL"}) +private AssignmentType assignmentType; + +private PartitionAssignor partitionAssignor; + +private static final int NUMBER_OF_RACKS = 3; + +private static final int MAX_BUCKET_COUNT = 5; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new
Re: [PR] KAFKA-15853: Move KRAFT configs out of KafkaConfig [kafka]
chia7712 commented on PR #15775: URL: https://github.com/apache/kafka/pull/15775#issuecomment-2075158081 > If we start to move related kraft classes to raft module like KafkaRaftManager this will be tricky as now org.apache.kafka.raft.KafkaRaftManager needs org.apache.kafka.server.config.KafkaConfig, org.apache.kafka.server.config.KafkaConfig needs org.apache.kafka.raft.RaftConfig and org.apache.kafka.server.BrokerServer needs org.apache.kafka.raft.KafkaRaftManager `org.apache.kafka.server.config.KafkaConfig` -> this means `KafkaConfig` will be moved to server module in the future? If so, is it weird to make raft module depend on server module? Please correct me if I misunderstand anything. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
[ https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840472#comment-17840472 ] Jakub Scholz commented on KAFKA-16606: -- Ok, thanks [~soarez] and [~mimaison]. > JBOD support in KRaft does not seem to be gated by the metadata version > --- > > Key: KAFKA-16606 > URL: https://issues.apache.org/jira/browse/KAFKA-16606 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz >Assignee: Igor Soarez >Priority: Major > > JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka > [source > code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] > suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. > However, it seems to be possible to run KRaft cluster with JBOD even with > older metadata versions such as {{{}3.6{}}}. For example, I have a cluster > using the {{3.6}} metadata version: > {code:java} > bin/kafka-features.sh --bootstrap-server localhost:9092 describe > Feature: metadata.version SupportedMinVersion: 3.0-IV1 > SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 > {code} > Yet a KRaft cluster with JBOD seems to run fine: > {code:java} > bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe > Querying brokers for log directories information > Received log directory information from brokers 2000,3000,1000 >
Re: [PR] KAFKA-16217: Stop the abort transaction try loop when closing producers (#15541) [kafka]
chia7712 merged PR #15792: URL: https://github.com/apache/kafka/pull/15792 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KRAFT configs out of KafkaConfig [kafka]
OmniaGM commented on PR #15775: URL: https://github.com/apache/kafka/pull/15775#issuecomment-2075131518 > just curios. Why moving to raft module can cause circle dependencies? Currently raft doesn't depend of core or server but I just fear that mixing quorum raft and KRaft mode related classes in same module might led to situations where we hit this issue when we start to move things more and more out of core. One example I have in mind is `KafkaRaftManager` - `KafkaConfig` depends on `RaftConfig` for raft configs - `KafkaRaftManager` depend on `KafkaConfig` and `RaftConfig` and it is used by `BrokerServer` and `ControllerServer` - If we start to move related kraft classes to raft module like `KafkaRaftManager` this will be tricky as now `org.apache.kafka.raft.KafkaRaftManager` needs `org.apache.kafka.server.config.KafkaConfig`, `org.apache.kafka.server.config.KafkaConfig` needs `org.apache.kafka.raft.RaftConfig` and `org.apache.kafka.server.BrokerServer` needs `org.apache.kafka.raft.KafkaRaftManager` Also one other thing to notice is at the moment `RaftConfig` signature accept `AbstractConfig` instead of `KafkaConfig` and it redefine the getters for all raft configs instead of using `KafkaConfig.quorum*` methods which at the moment aren't used at all except `KafkaConfig.quorumRequestTimeoutMs`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16217: Stop the abort transaction try loop when closing producers (#15541) [kafka]
chia7712 commented on PR #15791: URL: https://github.com/apache/kafka/pull/15791#issuecomment-2075129365 not sure whether we have 3.6.3 release. It seems we don't have `.3` usually :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Reopened] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
[ https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reopened KAFKA-16606: - Assignee: Igor Soarez > JBOD support in KRaft does not seem to be gated by the metadata version > --- > > Key: KAFKA-16606 > URL: https://issues.apache.org/jira/browse/KAFKA-16606 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz >Assignee: Igor Soarez >Priority: Major > > JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka > [source > code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] > suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. > However, it seems to be possible to run KRaft cluster with JBOD even with > older metadata versions such as {{{}3.6{}}}. For example, I have a cluster > using the {{3.6}} metadata version: > {code:java} > bin/kafka-features.sh --bootstrap-server localhost:9092 describe > Feature: metadata.version SupportedMinVersion: 3.0-IV1 > SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 > {code} > Yet a KRaft cluster with JBOD seems to run fine: > {code:java} > bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe > Querying brokers for log directories information > Received log directory information from brokers 2000,3000,1000 >
[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
[ https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840469#comment-17840469 ] Igor Soarez commented on KAFKA-16606: - I'm inclined to agree, I think we can still do this. We are already checking that the MV is high enough before allowing migration from JBOD ZK. Even if someone is already running KRaft in JBOD this change could still be useful: Preventing the broker from starting could be disruptive, but that's better than having an outage at a random moment in the future. I'll re-open this and submit a PR. > JBOD support in KRaft does not seem to be gated by the metadata version > --- > > Key: KAFKA-16606 > URL: https://issues.apache.org/jira/browse/KAFKA-16606 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz >Priority: Major > > JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka > [source > code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] > suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. > However, it seems to be possible to run KRaft cluster with JBOD even with > older metadata versions such as {{{}3.6{}}}. For example, I have a cluster > using the {{3.6}} metadata version: > {code:java} > bin/kafka-features.sh --bootstrap-server localhost:9092 describe > Feature: metadata.version SupportedMinVersion: 3.0-IV1 > SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 > {code} > Yet a KRaft cluster with JBOD seems to run fine: > {code:java} > bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe > Querying brokers for log directories information > Received log directory information from brokers 2000,3000,1000 >
Re: [PR] KAFKA-16605: Fix the flaky LogCleanerParameterizedIntegrationTest [kafka]
chia7712 merged PR #15793: URL: https://github.com/apache/kafka/pull/15793 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16463 System test for reverting migration to ZK [kafka]
mumrah commented on code in PR #15754: URL: https://github.com/apache/kafka/pull/15754#discussion_r1577998958 ## tests/kafkatest/tests/core/zookeeper_migration_test.py: ## @@ -86,10 +87,35 @@ def do_migration(self, roll_controller = False, downgrade_to_zk = False): controller.stop_node(node) controller.start_node(node) +if downgrade_to_zk: +self.logger.info("Shutdown brokers to avoid waiting on unclean shutdown") +for node in self.kafka.nodes: +self.kafka.stop_node(node) +metadata_log_dir = KafkaService.METADATA_LOG_DIR + "/__cluster_metadata-0" +for node in self.kafka.nodes: +assert path_exists(node, metadata_log_dir), "Should still have a metadata log on the brokers." + +self.logger.info("Shutdown KRaft quorum") +for node in controller.nodes: +controller.stop_node(node) + +self.logger.info("Deleting controller ZNode") +self.zk.delete(path="/controller", recursive=True) + +self.logger.info("Rolling brokers back to ZK mode") +self.kafka.downgrade_kraft_broker_to_zk(controller) +for node in self.kafka.nodes: +self.kafka.start_node(node) + +# This blocks until all brokers have a full ISR +self.wait_until_rejoin() Review Comment: No, it doesn't require a full shutdown. Let me see if I can fix this here. Good catch -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16463 System test for reverting migration to ZK [kafka]
mumrah commented on code in PR #15754: URL: https://github.com/apache/kafka/pull/15754#discussion_r1577997204 ## tests/kafkatest/services/kafka/kafka.py: ## @@ -463,6 +463,18 @@ def reconfigure_zk_for_migration(self, kraft_quorum): # This is not added to "advertised.listeners" because of configured_for_zk_migration=True self.port_mappings[kraft_quorum.controller_listener_names] = kraft_quorum.port_mappings.get(kraft_quorum.controller_listener_names) +def downgrade_kraft_broker_to_zk(self, kraft_quorum): +self.configured_for_zk_migration = True +self.quorum_info = quorum.ServiceQuorumInfo(quorum.zk, self) +self.controller_quorum = kraft_quorum + +# Set the migration properties +self.server_prop_overrides.extend([ +["zookeeper.metadata.migration.enable", "true"], +["controller.quorum.voters", kraft_quorum.controller_quorum_voters], +["controller.listener.names", kraft_quorum.controller_listener_names] +]) Review Comment: Well, as long as `process.roles` is not set, a broker is in ZK mode. When reverting back to ZK mode, we can still have the migration configs set. Since the behavior added in KAFKA-16463 is only enabled when `zookeeper.metadata.migration.enable` is set, we include it in this test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15203) Remove dependency on Reflections
[ https://issues.apache.org/jira/browse/KAFKA-15203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Owen C.H. Leung reassigned KAFKA-15203: --- Assignee: (was: Owen C.H. Leung) > Remove dependency on Reflections > - > > Key: KAFKA-15203 > URL: https://issues.apache.org/jira/browse/KAFKA-15203 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Divij Vaidya >Priority: Major > Labels: newbie > > We currently depend on reflections library which is EOL. Quoting from the > GitHub site: > _> Please note: Reflections library is currently NOT under active development > or maintenance_ > > This poses a supply chain risk for our project where the security fixes and > other major bugs in underlying dependency may not be addressed timely. > Hence, we should plan to remove this dependency. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15853: Move KRAFT configs out of KafkaConfig [kafka]
chia7712 commented on PR #15775: URL: https://github.com/apache/kafka/pull/15775#issuecomment-2075082750 > Personally I think the first option (which keep the pr as it is) is easier to navigate as we know where all the config used by Kafka Raft server (maybe renaming this to KafkaRaftServerConfigs instead of KRaftConfigs would be better) are and where all the config used by Quorum Raft. WDYT? I'm ok with it. BTW, `KafkaRaftServerConfigs` is a bit verbose to me. Maybe `KRaftServerConfigs` is good enough as `KRaft` is a term in kafka world. > I am concerned that this will create other circle dependencies later when we move the raft package and KafkaRaftServer out of core just curios. Why moving to raft module can cause circle dependencies? It seems raft module depends on only `clients` and `server-common`. I feel `KafkaRaftServer` won't be in either `clients` or `server-common` in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15203) Remove dependency on Reflections
[ https://issues.apache.org/jira/browse/KAFKA-15203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Owen C.H. Leung reassigned KAFKA-15203: --- Assignee: Owen C.H. Leung > Remove dependency on Reflections > - > > Key: KAFKA-15203 > URL: https://issues.apache.org/jira/browse/KAFKA-15203 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Divij Vaidya >Assignee: Owen C.H. Leung >Priority: Major > Labels: newbie > > We currently depend on reflections library which is EOL. Quoting from the > GitHub site: > _> Please note: Reflections library is currently NOT under active development > or maintenance_ > > This poses a supply chain risk for our project where the security fixes and > other major bugs in underlying dependency may not be addressed timely. > Hence, we should plan to remove this dependency. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]
johnnychhsu commented on PR #15788: URL: https://github.com/apache/kafka/pull/15788#issuecomment-2075021540 thanks for the review and suggestions @chia7712 @OmniaGM let me check and address it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KRAFT configs out of KafkaConfig [kafka]
OmniaGM commented on PR #15775: URL: https://github.com/apache/kafka/pull/15775#issuecomment-2074952186 > > Note: We have already RaftConfig but it seems to contain limited amount of configs that only configure controller raft and shouldn't include configs shared by both broker/controller in KRAFT mode or only for broker in KRAFT mode. > > What about moving `KRaftConfig` to raft module? and we can moving all configs from `RaftConfig` to `KRaftConfig`. With those changes, all raft-related configs are in `KRaftConfig`. `RaftConfig` can be a POJO. We can move it to raft module but 1. I am concerned that this will create other circle dependencies later when we move the raft package and KafkaRaftServer out of core 2. I don't think we should merge them with RaftConfig as RaftConfig should be separate as it is only for the quorum raft and not KRAFT mode which are a bit different in my option. Most of the configs in KRaftConfigs are used by SharedServer, KafkaRaftServer or BrokerLifeCycle. I think we either keep the pr as proposed or maybe we can have in server-common module the following 1. MetadataLogConfigs for anything with prefix `metadata` 2. KafkaRaftServerConfigs for the rest anything 3. Keep RaftServer separated but move it to server-common Personally I think the first option is easier to navigate as we know where all the config used by Kafka Raft server (maybe renaming this to `KafkaRaftServerConfigs` instead of `KRaftConfigs` would be better) are and where all the config used by Quorum Raft. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15749) KRaft support in KafkaMetricReporterClusterIdTest
[ https://issues.apache.org/jira/browse/KAFKA-15749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840439#comment-17840439 ] Walter Hernandez commented on KAFKA-15749: -- PR is officially stale: [https://github.com/apache/kafka/pull/15181] [~anishlukk123] can I assign this to you? Please review his PR and other Jira tickets with the "KRaft support in" in their name that were resolved for example PR's. > KRaft support in KafkaMetricReporterClusterIdTest > - > > Key: KAFKA-15749 > URL: https://issues.apache.org/jira/browse/KAFKA-15749 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Sameer Tejani >Assignee: Chirag Wadhwa >Priority: Minor > Labels: kraft, kraft-test, newbie > > The following tests in KafkaMetricReporterClusterIdTest in > core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala > need to be updated to support KRaft > 96 : def testClusterIdPresent(): Unit = { > Scanned 119 lines. Found 0 KRaft tests out of 1 tests -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1577874954 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerRunnable.java: ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.consumer.group; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.time.Duration; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; + +import static java.util.Collections.singleton; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG; +import static org.apache.kafka.common.GroupType.CONSUMER; + +public class ConsumerRunnable implements Runnable { +final String broker; +final String groupId; +final Optional customPropsOpt; +final boolean syncCommit; +final String topic; +final String groupProtocol; +final String assignmentStrategy; +final Optional remoteAssignor; +final Properties props = new Properties(); +volatile boolean isShutdown = false; +KafkaConsumer consumer; + +boolean configured = false; + +public ConsumerRunnable(String broker, +String groupId, +String groupProtocol, +String topic, +String assignmentStrategy, +Optional remoteAssignor, +Optional customPropsOpt, +boolean syncCommit) { +this.broker = broker; +this.groupId = groupId; +this.customPropsOpt = customPropsOpt; +this.syncCommit = syncCommit; + +this.topic = topic; +this.groupProtocol = groupProtocol; +this.assignmentStrategy = assignmentStrategy; +this.remoteAssignor = remoteAssignor; +} + +void configure() { +configured = true; +configure(props); +customPropsOpt.ifPresent(props::putAll); +consumer = new KafkaConsumer<>(props); +} + +void configure(Properties props) { +props.put("bootstrap.servers", broker); +props.put("group.id", groupId); +props.put("key.deserializer", StringDeserializer.class.getName()); +props.put("value.deserializer", StringDeserializer.class.getName()); +props.put(GROUP_PROTOCOL_CONFIG, groupProtocol); +if (Objects.equals(groupProtocol, CONSUMER.toString())) { +remoteAssignor.ifPresent(assignor -> props.put(GROUP_REMOTE_ASSIGNOR_CONFIG, assignor)); +} else { +props.put("partition.assignment.strategy", assignmentStrategy); +} +} + +void subscribe() { Review Comment: We don't need this method as it is used by `run` only ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerRunnable.java: ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.consumer.group; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.time.Duration;
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on PR #15761: URL: https://github.com/apache/kafka/pull/15761#issuecomment-2074923817 @brandboat Could you rebase code to trigger QA again? I run those tests on my local. They pass -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16592) ConfigKey constructor update can break clients using it
[ https://issues.apache.org/jira/browse/KAFKA-16592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16592. Fix Version/s: 3.8.0 Resolution: Fixed > ConfigKey constructor update can break clients using it > --- > > Key: KAFKA-16592 > URL: https://issues.apache.org/jira/browse/KAFKA-16592 > Project: Kafka > Issue Type: Bug >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Labels: clients > Fix For: 3.8.0 > > > In [KAFKA-14957|https://issues.apache.org/jira/browse/KAFKA-14957], the > constructor of ConfigDef.ConfigKey was updated to add a new argument called > {*}alternativeString{*}. As part of the PR, new *define* methods were also > added which makes sense. However, since the constructor of > *ConfigDef.ConfigKey* itself can be used directly by other clients which > import the dependency, this can break all clients who were using the older > constructor w/o the *alternativeString* argument. > I bumped into this when I was testing > the[kafka-connect-redis|[https://github.com/jcustenborder/kafka-connect-redis/tree/master]] > connector. It starts up correctly against the official 3.7 release, but > fails with the following error when run against a 3.8 snapshot > > > {code:java} > Caused by: java.lang.NoSuchMethodError: > org.apache.kafka.common.config.ConfigDef$ConfigKey.(Ljava/lang/String;Lorg/apache/kafka/common/config/ConfigDef$Type;Ljava/lang/Object;Lorg/apache/kafka/common/config/ConfigDef$Validator;Lorg/apache/kafka/common/config/ConfigDef$Importance;Ljava/lang/String;Ljava/lang/String;ILorg/apache/kafka/common/config/ConfigDef$Width;Ljava/lang/String;Ljava/util/List;Lorg/apache/kafka/common/config/ConfigDef$Recommender;Z)V > at > com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder.build(ConfigKeyBuilder.java:62) > at > com.github.jcustenborder.kafka.connect.redis.RedisConnectorConfig.config(RedisConnectorConfig.java:133) > at > com.github.jcustenborder.kafka.connect.redis.RedisSinkConnectorConfig.config(RedisSinkConnectorConfig.java:46) > at > com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector.config(RedisSinkConnector.java:73) > at > org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:538) > at > org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$3(AbstractHerder.java:412) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ... 1 more > > {code} > > The reason for that is that the connector uses another library called > connect-utils which invokes the old constructor > [directly|https://github.com/jcustenborder/connect-utils/blob/master/connect-utils/src/main/java/com/github/jcustenborder/kafka/connect/utils/config/ConfigKeyBuilder.java#L62] > It is not expected for connector invocations to fail across versions so this > would cause confusion. > We could argue that why is the constructor being invoked directly instead of > using the *define* method, but there might be other clients doing the same. > We should add the old constructor back which calls the new one by setting the > *alternativeString* to null. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16592: Add a new constructor which invokes the existing constructor with default value for alternativeString [kafka]
chia7712 commented on PR #15762: URL: https://github.com/apache/kafka/pull/15762#issuecomment-2074861414 I don't see any related failure, so I'm going to merge it to trunk and branch 3.7 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KRAFT configs out of KafkaConfig [kafka]
chia7712 commented on PR #15775: URL: https://github.com/apache/kafka/pull/15775#issuecomment-2074832899 What about moving `KRaftConfig` to raft module? and we can moving all configs from `RaftConfig` to `KRaftConfig`. With those changes, all raft-related configs are in `KRaftConfig`. `RaftConfig` can be a POJO. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16613: remove TestUtils#subscribeAndWaitForRecords [kafka]
FrankYang0529 opened a new pull request, #15794: URL: https://github.com/apache/kafka/pull/15794 After https://github.com/apache/kafka/pull/15679, we remove most of usage of `TestUtils#subscribeAndWaitForRecords`. The only remaining case uses it is `PlaintextAdminIntegrationTest#testDeleteConsumerGroupOffsets`. We can also remove it because `Consumer#poll` already has timeout input. We don't need `TestUtils#subscribeAndWaitForRecords` to give another waiting wrapper. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16613) Remove TestUtils#subscribeAndWaitForRecords
PoAn Yang created KAFKA-16613: - Summary: Remove TestUtils#subscribeAndWaitForRecords Key: KAFKA-16613 URL: https://issues.apache.org/jira/browse/KAFKA-16613 Project: Kafka Issue Type: Test Reporter: PoAn Yang Assignee: PoAn Yang After KAFKA-16483, we remove most of usage of TestUtils#subscribeAndWaitForRecords. The only remaining case uses it is PlaintextAdminIntegrationTest#testDeleteConsumerGroupOffsets. We can also remove it because Consumer#poll already has timeout input. We don't need TestUtils#subscribeAndWaitForRecords to give another waiting wrapper. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16612) Talking to controllers via AdminClient requires reconfiguring controller listener
[ https://issues.apache.org/jira/browse/KAFKA-16612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840402#comment-17840402 ] Luke Chen commented on KAFKA-16612: --- [~cmccabe] , any thoughts about this? > Talking to controllers via AdminClient requires reconfiguring controller > listener > - > > Key: KAFKA-16612 > URL: https://issues.apache.org/jira/browse/KAFKA-16612 > Project: Kafka > Issue Type: Improvement >Reporter: Gantigmaa Selenge >Priority: Major > > After KIP-919, Kafka controllers register themselves with the active > controller once they start up. This registration includes information about > the endpoints which the controller listener is configured with. This endpoint > is then sent to admin clients (via DescribeClusterResponse) so that clients > send requests to the active controller. If the controller listener is > configured with "CONTROLLER://0.0.0.0:9093" , this will result in admin > clients requests failing (trying to connect to localhost). This was not > clearly stated in the KIP or the documentation. > When clients talking to brokers, advertised.listeners is used, however > advertised.listener is forbidden for controllers. Should we allow > advertised.listeners for controllers so that admin client can use it to talk > to controllers, in the same way it uses it to talk to brokers? Or should the > endpoints provided in controller.quorum.voters, be returned to admin client? > If the intention is to use the regular "listeners" configuration of > controller for clients, this should be clearly documented. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16612) Talking to controllers via AdminClient requires reconfiguring controller listener
[ https://issues.apache.org/jira/browse/KAFKA-16612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-16612: - Assignee: (was: Luke Chen) > Talking to controllers via AdminClient requires reconfiguring controller > listener > - > > Key: KAFKA-16612 > URL: https://issues.apache.org/jira/browse/KAFKA-16612 > Project: Kafka > Issue Type: Improvement >Reporter: Gantigmaa Selenge >Priority: Major > > After KIP-919, Kafka controllers register themselves with the active > controller once they start up. This registration includes information about > the endpoints which the controller listener is configured with. This endpoint > is then sent to admin clients (via DescribeClusterResponse) so that clients > send requests to the active controller. If the controller listener is > configured with "CONTROLLER://0.0.0.0:9093" , this will result in admin > clients requests failing (trying to connect to localhost). This was not > clearly stated in the KIP or the documentation. > When clients talking to brokers, advertised.listeners is used, however > advertised.listener is forbidden for controllers. Should we allow > advertised.listeners for controllers so that admin client can use it to talk > to controllers, in the same way it uses it to talk to brokers? Or should the > endpoints provided in controller.quorum.voters, be returned to admin client? > If the intention is to use the regular "listeners" configuration of > controller for clients, this should be clearly documented. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16612) Talking to controllers via AdminClient requires reconfiguring controller listener
[ https://issues.apache.org/jira/browse/KAFKA-16612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-16612: - Assignee: Luke Chen > Talking to controllers via AdminClient requires reconfiguring controller > listener > - > > Key: KAFKA-16612 > URL: https://issues.apache.org/jira/browse/KAFKA-16612 > Project: Kafka > Issue Type: Improvement >Reporter: Gantigmaa Selenge >Assignee: Luke Chen >Priority: Major > > After KIP-919, Kafka controllers register themselves with the active > controller once they start up. This registration includes information about > the endpoints which the controller listener is configured with. This endpoint > is then sent to admin clients (via DescribeClusterResponse) so that clients > send requests to the active controller. If the controller listener is > configured with "CONTROLLER://0.0.0.0:9093" , this will result in admin > clients requests failing (trying to connect to localhost). This was not > clearly stated in the KIP or the documentation. > When clients talking to brokers, advertised.listeners is used, however > advertised.listener is forbidden for controllers. Should we allow > advertised.listeners for controllers so that admin client can use it to talk > to controllers, in the same way it uses it to talk to brokers? Or should the > endpoints provided in controller.quorum.voters, be returned to admin client? > If the intention is to use the regular "listeners" configuration of > controller for clients, this should be clearly documented. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16612) Talking to controllers via AdminClient requires reconfiguring controller listener
Gantigmaa Selenge created KAFKA-16612: - Summary: Talking to controllers via AdminClient requires reconfiguring controller listener Key: KAFKA-16612 URL: https://issues.apache.org/jira/browse/KAFKA-16612 Project: Kafka Issue Type: Improvement Reporter: Gantigmaa Selenge After KIP-919, Kafka controllers register themselves with the active controller once they start up. This registration includes information about the endpoints which the controller listener is configured with. This endpoint is then sent to admin clients (via DescribeClusterResponse) so that clients send requests to the active controller. If the controller listener is configured with "CONTROLLER://0.0.0.0:9093" , this will result in admin clients requests failing (trying to connect to localhost). This was not clearly stated in the KIP or the documentation. When clients talking to brokers, advertised.listeners is used, however advertised.listener is forbidden for controllers. Should we allow advertised.listeners for controllers so that admin client can use it to talk to controllers, in the same way it uses it to talk to brokers? Or should the endpoints provided in controller.quorum.voters, be returned to admin client? If the intention is to use the regular "listeners" configuration of controller for clients, this should be clearly documented. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 merged PR #15679: URL: https://github.com/apache/kafka/pull/15679 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16483) Apply `ClusterTestExtensions` to DeleteOffsetsConsumerGroupCommandIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-16483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16483. Fix Version/s: 3.8.0 Resolution: Fixed > Apply `ClusterTestExtensions` to > DeleteOffsetsConsumerGroupCommandIntegrationTest > - > > Key: KAFKA-16483 > URL: https://issues.apache.org/jira/browse/KAFKA-16483 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: PoAn Yang >Priority: Minor > Fix For: 3.8.0 > > > By using ClusterTestExtensions, > DeleteOffsetsConsumerGroupCommandIntegrationTest get get away from > KafkaServerTestHarness dependency -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16610) Replace "Map#entrySet#forEach" by "Map#forEach"
[ https://issues.apache.org/jira/browse/KAFKA-16610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16610: -- Assignee: TengYao Chi (was: Chia-Ping Tsai) > Replace "Map#entrySet#forEach" by "Map#forEach" > --- > > Key: KAFKA-16610 > URL: https://issues.apache.org/jira/browse/KAFKA-16610 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Minor > > {quote} > Targets > Occurrences of 'entrySet().forEach' in Project > Found occurrences in Project (16 usages found) > Unclassified (16 usages found) > kafka.core.main (9 usages found) > kafka.server (4 usages found) > ControllerApis.scala (2 usages found) > ControllerApis (2 usages found) > handleIncrementalAlterConfigs (1 usage found) > 774 controllerResults.entrySet().forEach(entry => > response.responses().add( > handleLegacyAlterConfigs (1 usage found) > 533 controllerResults.entrySet().forEach(entry => > response.responses().add( > ControllerConfigurationValidator.scala (2 usages found) > ControllerConfigurationValidator (2 usages found) > validate (2 usages found) > 99 config.entrySet().forEach(e => { > 114 config.entrySet().forEach(e => > properties.setProperty(e.getKey, e.getValue)) > kafka.server.metadata (5 usages found) > AclPublisher.scala (1 usage found) > AclPublisher (1 usage found) > onMetadataUpdate (1 usage found) > 73 aclsDelta.changes().entrySet().forEach(e => > ClientQuotaMetadataManager.scala (3 usages found) > ClientQuotaMetadataManager (3 usages found) > handleIpQuota (1 usage found) > 119 quotaDelta.changes().entrySet().forEach { e => > update (2 usages found) > 54 quotasDelta.changes().entrySet().forEach { e => > 99 quotaDelta.changes().entrySet().forEach { e => > KRaftMetadataCache.scala (1 usage found) > KRaftMetadataCache (1 usage found) > getClusterMetadata (1 usage found) > 491 topic.partitions().entrySet().forEach { entry > => > kafka.core.test (1 usage found) > unit.kafka.integration (1 usage found) > KafkaServerTestHarness.scala (1 usage found) > KafkaServerTestHarness (1 usage found) > getTopicNames (1 usage found) > 349 > controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().entrySet().forEach > { > kafka.metadata.main (3 usages found) > org.apache.kafka.controller (2 usages found) > QuorumFeatures.java (1 usage found) > toString() (1 usage found) > 144 localSupportedFeatures.entrySet().forEach(f -> > features.add(f.getKey() + ": " + f.getValue())); > ReplicationControlManager.java (1 usage found) > createTopic(ControllerRequestContext, CreatableTopic, > List, Map, > List, boolean) (1 usage found) > 732 newParts.entrySet().forEach(e -> > assignments.put(e.getKey(), > org.apache.kafka.metadata.properties (1 usage found) > MetaPropertiesEnsemble.java (1 usage found) > toString() (1 usage found) > 610 logDirProps.entrySet().forEach( > kafka.metadata.test (1 usage found) > org.apache.kafka.controller (1 usage found) > ReplicationControlManagerTest.java (1 usage found) > createTestTopic(String, int[][], Map, > short) (1 usage found) > 307 configs.entrySet().forEach(e -> > topic.configs().add( > kafka.streams.main (1 usage found) > org.apache.kafka.streams.processor.internals (1 usage found) > StreamsMetadataState.java (1 usage found) > onChange(Map>, > Map>, Map) (1 > usage found) > 317 topicPartitionInfo.entrySet().forEach(entry -> > this.partitionsByTopic > kafka.tools.main (1 usage found) > org.apache.kafka.tools (1 usage found) > LeaderElectionCommand.java (1 usage found) > electLeaders(Admin, ElectionType, >
[jira] [Commented] (KAFKA-16610) Replace "Map#entrySet#forEach" by "Map#forEach"
[ https://issues.apache.org/jira/browse/KAFKA-16610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840392#comment-17840392 ] TengYao Chi commented on KAFKA-16610: - I can handle this issue :) > Replace "Map#entrySet#forEach" by "Map#forEach" > --- > > Key: KAFKA-16610 > URL: https://issues.apache.org/jira/browse/KAFKA-16610 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > {quote} > Targets > Occurrences of 'entrySet().forEach' in Project > Found occurrences in Project (16 usages found) > Unclassified (16 usages found) > kafka.core.main (9 usages found) > kafka.server (4 usages found) > ControllerApis.scala (2 usages found) > ControllerApis (2 usages found) > handleIncrementalAlterConfigs (1 usage found) > 774 controllerResults.entrySet().forEach(entry => > response.responses().add( > handleLegacyAlterConfigs (1 usage found) > 533 controllerResults.entrySet().forEach(entry => > response.responses().add( > ControllerConfigurationValidator.scala (2 usages found) > ControllerConfigurationValidator (2 usages found) > validate (2 usages found) > 99 config.entrySet().forEach(e => { > 114 config.entrySet().forEach(e => > properties.setProperty(e.getKey, e.getValue)) > kafka.server.metadata (5 usages found) > AclPublisher.scala (1 usage found) > AclPublisher (1 usage found) > onMetadataUpdate (1 usage found) > 73 aclsDelta.changes().entrySet().forEach(e => > ClientQuotaMetadataManager.scala (3 usages found) > ClientQuotaMetadataManager (3 usages found) > handleIpQuota (1 usage found) > 119 quotaDelta.changes().entrySet().forEach { e => > update (2 usages found) > 54 quotasDelta.changes().entrySet().forEach { e => > 99 quotaDelta.changes().entrySet().forEach { e => > KRaftMetadataCache.scala (1 usage found) > KRaftMetadataCache (1 usage found) > getClusterMetadata (1 usage found) > 491 topic.partitions().entrySet().forEach { entry > => > kafka.core.test (1 usage found) > unit.kafka.integration (1 usage found) > KafkaServerTestHarness.scala (1 usage found) > KafkaServerTestHarness (1 usage found) > getTopicNames (1 usage found) > 349 > controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().entrySet().forEach > { > kafka.metadata.main (3 usages found) > org.apache.kafka.controller (2 usages found) > QuorumFeatures.java (1 usage found) > toString() (1 usage found) > 144 localSupportedFeatures.entrySet().forEach(f -> > features.add(f.getKey() + ": " + f.getValue())); > ReplicationControlManager.java (1 usage found) > createTopic(ControllerRequestContext, CreatableTopic, > List, Map, > List, boolean) (1 usage found) > 732 newParts.entrySet().forEach(e -> > assignments.put(e.getKey(), > org.apache.kafka.metadata.properties (1 usage found) > MetaPropertiesEnsemble.java (1 usage found) > toString() (1 usage found) > 610 logDirProps.entrySet().forEach( > kafka.metadata.test (1 usage found) > org.apache.kafka.controller (1 usage found) > ReplicationControlManagerTest.java (1 usage found) > createTestTopic(String, int[][], Map, > short) (1 usage found) > 307 configs.entrySet().forEach(e -> > topic.configs().add( > kafka.streams.main (1 usage found) > org.apache.kafka.streams.processor.internals (1 usage found) > StreamsMetadataState.java (1 usage found) > onChange(Map>, > Map>, Map) (1 > usage found) > 317 topicPartitionInfo.entrySet().forEach(entry -> > this.partitionsByTopic > kafka.tools.main (1 usage found) > org.apache.kafka.tools (1 usage found) > LeaderElectionCommand.java (1 usage found) > electLeaders(Admin, ElectionType,
[jira] [Created] (KAFKA-16611) Consider adding test name to "client.id" of Admin in testing
Chia-Ping Tsai created KAFKA-16611: -- Summary: Consider adding test name to "client.id" of Admin in testing Key: KAFKA-16611 URL: https://issues.apache.org/jira/browse/KAFKA-16611 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai I observed following errors many times. {quote} org.opentest4j.AssertionFailedError: Found 16 unexpected threads during @BeforeAll: `kafka-admin-client-thread | adminclient-287,kafka-admin-client-thread | adminclient-276,kafka-admin-client-thread | adminclient-271,kafka-admin-client-thread | adminclient-293,kafka-admin-client-thread | adminclient-281,kafka-admin-client-thread | adminclient-302,kafka-admin-client-thread | adminclient-334,kafka-admin-client-thread | adminclient-323,kafka-admin-client-thread | adminclient-257,kafka-admin-client-thread | adminclient-336,kafka-admin-client-thread | adminclient-308,kafka-admin-client-thread | adminclient-263,kafka-admin-client-thread | adminclient-273,kafka-admin-client-thread | adminclient-278,kafka-admin-client-thread | adminclient-283,kafka-admin-client-thread | adminclient-317` ==> expected: but was: {quote} That could be caused by exceptional shutdown. Or we do have resource leaks in some failed tests. Adding the test name to "client.id" can give hints about that -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16610) Replace "Map#entrySet#forEach" by "Map#forEach"
Chia-Ping Tsai created KAFKA-16610: -- Summary: Replace "Map#entrySet#forEach" by "Map#forEach" Key: KAFKA-16610 URL: https://issues.apache.org/jira/browse/KAFKA-16610 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai {quote} Targets Occurrences of 'entrySet().forEach' in Project Found occurrences in Project (16 usages found) Unclassified (16 usages found) kafka.core.main (9 usages found) kafka.server (4 usages found) ControllerApis.scala (2 usages found) ControllerApis (2 usages found) handleIncrementalAlterConfigs (1 usage found) 774 controllerResults.entrySet().forEach(entry => response.responses().add( handleLegacyAlterConfigs (1 usage found) 533 controllerResults.entrySet().forEach(entry => response.responses().add( ControllerConfigurationValidator.scala (2 usages found) ControllerConfigurationValidator (2 usages found) validate (2 usages found) 99 config.entrySet().forEach(e => { 114 config.entrySet().forEach(e => properties.setProperty(e.getKey, e.getValue)) kafka.server.metadata (5 usages found) AclPublisher.scala (1 usage found) AclPublisher (1 usage found) onMetadataUpdate (1 usage found) 73 aclsDelta.changes().entrySet().forEach(e => ClientQuotaMetadataManager.scala (3 usages found) ClientQuotaMetadataManager (3 usages found) handleIpQuota (1 usage found) 119 quotaDelta.changes().entrySet().forEach { e => update (2 usages found) 54 quotasDelta.changes().entrySet().forEach { e => 99 quotaDelta.changes().entrySet().forEach { e => KRaftMetadataCache.scala (1 usage found) KRaftMetadataCache (1 usage found) getClusterMetadata (1 usage found) 491 topic.partitions().entrySet().forEach { entry => kafka.core.test (1 usage found) unit.kafka.integration (1 usage found) KafkaServerTestHarness.scala (1 usage found) KafkaServerTestHarness (1 usage found) getTopicNames (1 usage found) 349 controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().entrySet().forEach { kafka.metadata.main (3 usages found) org.apache.kafka.controller (2 usages found) QuorumFeatures.java (1 usage found) toString() (1 usage found) 144 localSupportedFeatures.entrySet().forEach(f -> features.add(f.getKey() + ": " + f.getValue())); ReplicationControlManager.java (1 usage found) createTopic(ControllerRequestContext, CreatableTopic, List, Map, List, boolean) (1 usage found) 732 newParts.entrySet().forEach(e -> assignments.put(e.getKey(), org.apache.kafka.metadata.properties (1 usage found) MetaPropertiesEnsemble.java (1 usage found) toString() (1 usage found) 610 logDirProps.entrySet().forEach( kafka.metadata.test (1 usage found) org.apache.kafka.controller (1 usage found) ReplicationControlManagerTest.java (1 usage found) createTestTopic(String, int[][], Map, short) (1 usage found) 307 configs.entrySet().forEach(e -> topic.configs().add( kafka.streams.main (1 usage found) org.apache.kafka.streams.processor.internals (1 usage found) StreamsMetadataState.java (1 usage found) onChange(Map>, Map>, Map) (1 usage found) 317 topicPartitionInfo.entrySet().forEach(entry -> this.partitionsByTopic kafka.tools.main (1 usage found) org.apache.kafka.tools (1 usage found) LeaderElectionCommand.java (1 usage found) electLeaders(Admin, ElectionType, Optional>) (1 usage found) 178 failed.entrySet().forEach(entry -> { {quote} origin discussion: [https://github.com/apache/kafka/pull/15786#discussion_r1577656938] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Various cleanups in core [kafka]
chia7712 commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1577684664 ## core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala: ## @@ -347,7 +347,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { if (isKRaftTest()) { val result = new util.HashMap[Uuid, String]() controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().entrySet().forEach { Review Comment: I file https://issues.apache.org/jira/browse/KAFKA-16610 to trace it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org