[jira] [Created] (KAFKA-16153) kraft_upgrade_test system test is broken
Mickael Maison created KAFKA-16153: -- Summary: kraft_upgrade_test system test is broken Key: KAFKA-16153 URL: https://issues.apache.org/jira/browse/KAFKA-16153 Project: Kafka Issue Type: Bug Components: system tests Reporter: Mickael Maison I get the following failure from all `from_kafka_version` versions: Command '/opt/kafka-dev/bin/kafka-features.sh --bootstrap-server ducker05:9092,ducker06:9092,ducker07:9092 upgrade --metadata 3.8' returned non-zero exit status 1. Remote error message: b'SLF4J: Class path contains multiple SLF4J bindings.\nSLF4J: Found binding in [jar:file:/opt/kafka-dev/tools/build/dependant-libs-2.13.12/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J: Found binding in [jar:file:/opt/kafka-dev/trogdor/build/dependant-libs-2.13.12/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.\nSLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]\nUnsupported metadata version 3.8. Supported metadata versions are 3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3, 3.4-IV0, 3.5-IV0, 3.5-IV1, 3.5-IV2, 3.6-IV0, 3.6-IV1, 3.6-IV2, 3.7-IV0, 3.7-IV1, 3.7-IV2, 3.7-IV3, 3.7-IV4, 3.8-IV0\n' -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15853: Move ClientQuotaManagerConfig outside of core [kafka]
mimaison commented on code in PR #15159: URL: https://github.com/apache/kafka/pull/15159#discussion_r1454953584 ## checkstyle/import-control-core.xml: ## @@ -94,6 +94,7 @@ + Review Comment: I think we can remove that line too -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR Removed unused CommittedOffsetsFile class. [kafka]
satishd closed pull request #15209: MINOR Removed unused CommittedOffsetsFile class. URL: https://github.com/apache/kafka/pull/15209 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
AndrewJSchofield commented on code in PR #15000: URL: https://github.com/apache/kafka/pull/15000#discussion_r1455084554 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -367,6 +372,42 @@ public void testAutocommitEnsureOnlyOneInflightRequest() { assertPoll(1, commitRequestManger); } +@Test Review Comment: I think they should. Looks to me like the legacy consumer called the interceptors when closing the consumer coordinator (`ConsumerCoordinator.maybeAutoCommitOffsetsSync`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-4759) Add support for subnet masks in SimpleACLAuthorizer
[ https://issues.apache.org/jira/browse/KAFKA-4759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807680#comment-17807680 ] Szymon Scharmach commented on KAFKA-4759: - Updated PR is ready: [https://github.com/apache/kafka/pull/9937] Some tests are failing but it seems that failures are related to other issues present on trunk as well. > Add support for subnet masks in SimpleACLAuthorizer > --- > > Key: KAFKA-4759 > URL: https://issues.apache.org/jira/browse/KAFKA-4759 > Project: Kafka > Issue Type: Improvement > Components: security >Reporter: Shun Takebayashi >Assignee: Shun Takebayashi >Priority: Major > > SimpleACLAuthorizer currently accepts only single IP addresses. > Supporting subnet masks with SimpleACLAuthorizer can make ACL configurations > simpler. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16095: Update list group state type filter to include the states for the new consumer group type [kafka]
DL1231 opened a new pull request, #15211: URL: https://github.com/apache/kafka/pull/15211 While using —list —state the current accepted values correspond to the classic group type states. This PR include support for the new group type states. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16095: Update list group state type filter to include the states for the new consumer group type [kafka]
DL1231 commented on PR #15211: URL: https://github.com/apache/kafka/pull/15211#issuecomment-1895572781 @rreddy-22 PTAL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16147) Partition is assigned to two members at the same time
[ https://issues.apache.org/jira/browse/KAFKA-16147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot reassigned KAFKA-16147: --- Assignee: David Jacot > Partition is assigned to two members at the same time > - > > Key: KAFKA-16147 > URL: https://issues.apache.org/jira/browse/KAFKA-16147 > Project: Kafka > Issue Type: Sub-task >Reporter: Emanuele Sabellico >Assignee: David Jacot >Priority: Major > Attachments: broker1.log, broker2.log, broker3.log, librdkafka.log, > server.properties, server1.properties, server2.properties > > > While running [test 0113 of > librdkafka|https://github.com/confluentinc/librdkafka/blob/8b6357f872efe2a5a3a2fd2828e4133f85e6b023/tests/0113-cooperative_rebalance.cpp#L2384], > subtest _u_multiple_subscription_changes_ have received this error saying > that a partition is assigned to two members at the same time. > {code:java} > Error: C_6#consumer-9 is assigned rdkafkatest_rnd550f20623daba04c_0113u_2 [0] > which is already assigned to consumer C_5#consumer-8 {code} > I've reconstructed this sequence: > C_5 SUBSCRIBES TO T1 > {noformat} > %7|1705403451.561|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id > "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 6, group instance id > "(null)", current assignment "", subscribe topics > "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"{noformat} > C_5 ASSIGNMENT CHANGES TO T1-P7, T1-P8, T1-P12 > {noformat} > [2024-01-16 12:10:51,562] INFO [GroupCoordinator id=1 > topic=__consumer_offsets partition=7] [GroupId > rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw > transitioned from CurrentAssignment(memberEpoch=6, previousMemberEpoch=0, > targetMemberEpoch=6, state=assigning, assignedPartitions={}, > partitionsPendingRevocation={}, > partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[3, 12]}) to > CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, > targetMemberEpoch=14, state=stable, > assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, > partitionsPendingRevocation={}, partitionsPendingAssignment={}). > (org.apache.kafka.coordinator.group.GroupMetadataManager){noformat} > > C_5 RECEIVES TARGET ASSIGNMENT > {noformat} > %7|1705403451.565|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat response received target assignment > "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], > (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat} > > C_5 ACKS TARGET ASSIGNMENT > {noformat} > %7|1705403451.566|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id > "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id > "NULL", current assignment > "rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[7], > rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[8], > rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[12]", > subscribe topics "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]" > %7|1705403451.567|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat response received target assignment > "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], > (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat} > > C_5 SUBSCRIBES TO T1,T2: T1 partitions are revoked, 5 T2 partitions are > pending > {noformat} > %7|1705403452.612|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id > "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id > "NULL", current assignment "NULL", subscribe topics > "rdkafkatest_rnd550f20623daba04c_0113u_2((null))[-1], > rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]" > [2024-01-16 12:10:52,615] INFO [GroupCoordinator id=1 > topic=__consumer_offsets partition=7] [GroupId > rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw updated > its subscribed topics to: [rdkafkatest_rnd550f20623daba04c_0113u_2, > rdkafkatest_rnd5a91902462d61c2e_0113u_1]. > (org.apache.kafka.coordinator.group.GroupMetadataManager) > [2024-01-16 12:10:52,616] INFO [GroupCoordinator id=1 > topic=__consumer_offsets partition=7] [GroupId > rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw > transitioned from CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, > targetMemberEpoch=14, state=stable, > assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, > partitionsPendingRevocation={}, partitionsPendingAssignment={}) to > CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, > targetMemberEpoch=16, state=revoking, assignedPartitions={}, > partitionsPendingRevocation={IKXGrFR1
[PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]
dajac opened a new pull request, #15212: URL: https://github.com/apache/kafka/pull/15212 We had a case where a partition got assigned to two members and we found a bug in the partition epochs bookkeeping. Basically, when a member has a partition pending revocation re-assigned to him before the revocation is completed, the partition epoch is lost. Here is an example of such transition: ``` [2024-01-16 12:10:52,613] INFO [GroupCoordinator id=1 topic=__consumer_offsets partition=7] [GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member M2 transitioned from CurrentAssignment(memberEpoch=11, previousMemberEpoch=9, targetMemberEpoch=14, state=revoking, assignedPartitions={}, partitionsPendingRevocation={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 3, 4, 5, 6, 7]}, partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[0, 5]}) to CurrentAssignment(memberEpoch=15, previousMemberEpoch=11, targetMemberEpoch=15, state=stable, assignedPartitions={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 3, 4, 5, 6, 7]}, partitionsPendingRevocation={}, partitionsPendingAssignment={}). (org.apache.kafka.coordinator.group.GroupMetadataManager) ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]
DL1231 commented on PR #15067: URL: https://github.com/apache/kafka/pull/15067#issuecomment-1895664514 @dajac, PTAL, thanks in advance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16154) Make broker changes to return an offset for LATEST_TIERED_TIMESTAMP
Christo Lolov created KAFKA-16154: - Summary: Make broker changes to return an offset for LATEST_TIERED_TIMESTAMP Key: KAFKA-16154 URL: https://issues.apache.org/jira/browse/KAFKA-16154 Project: Kafka Issue Type: Sub-task Reporter: Christo Lolov Assignee: Christo Lolov Fix For: 3.8.0 A broker should start returning offsets when given a timestamp of -5, which signifies a LATEST_TIERED_TIMESTAMP. There are 3 cases. Tiered Storage is not enabled. In such a situation asking for LATEST_TIERED_TIMESTAMP should always return no offset. Tiered Storage is enabled and there is nothing in remote storage. In such a situation the offset returned should be 0. Tiered Storage is enabled and there is something in remote storage. In such a situation the offset returned should be the highest offset the broker is aware of. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]
clolov opened a new pull request, #15213: URL: https://github.com/apache/kafka/pull/15213 ### Summary This is the first part of the implementation of [KIP-1005](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1005%3A+Expose+EarliestLocalOffset+and+TieredOffset) The purpose of this pull request is for the broker to start returning the correct offset when it receives a -5 as a timestamp in a ListOffsets API request -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]
clolov commented on code in PR #15213: URL: https://github.com/apache/kafka/pull/15213#discussion_r1455525955 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1300,18 +1303,27 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) { val curLocalLogStartOffset = localLogStartOffset() -val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => { +val epochOpt: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => { val epoch = cache.epochForOffset(curLocalLogStartOffset) - if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]() + if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]() }) -val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset) - Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch) -else Optional.empty[Integer]() - Review Comment: I didn't really see a point in this check `earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset`. As far as I can tell the `cache.epochForOffset` already carries it out. Let me know in case I have misunderstood something. ## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ## @@ -2126,6 +2126,94 @@ class UnifiedLogTest { log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) } + @Test + def testFetchLatestTieredTimestampNoRemoteStorage(): Unit = { +val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) +val log = createLog(logDir, logConfig) + +assertEquals(None, log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) + +val firstTimestamp = mockTime.milliseconds +val leaderEpoch = 0 +log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = firstTimestamp), + leaderEpoch = leaderEpoch) + +val secondTimestamp = firstTimestamp + 1 +log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = secondTimestamp), + leaderEpoch = leaderEpoch) + +log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = firstTimestamp), + leaderEpoch = leaderEpoch) + +assertEquals(None, log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) + } + + @Test + def testFetchLatestTieredTimestampWithRemoteStorage(): Unit = { Review Comment: This test could be combined with `testFetchOffsetByTimestampFromRemoteStorage` as the only difference it has are lines 2167, 2193, 2203 and 2204. Let me know your thoughts! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16155) Investigate testAutoCommitIntercept
Lucas Brutschy created KAFKA-16155: -- Summary: Investigate testAutoCommitIntercept Key: KAFKA-16155 URL: https://issues.apache.org/jira/browse/KAFKA-16155 Project: Kafka Issue Type: Bug Components: clients, consumer Reporter: Lucas Brutschy Even with KAFKA-15942, the test PlaintextConsumerTest.testAutoCommitIntercept flakes on the the initial setup (before using interceptors, so interceptors are unrelated here, except for being used later in the test). The problem is that we are seeking two topic partitions to offset 10 and 20, respectively, but when we commit, we seem to have lost one of the offsets, likely due to a race condition. When I output `subscriptionState.allConsumed` repeatedly, I get this output: {code:java} allConsumed: {topic-0=OffsetAndMetadata{offset=100, leaderEpoch=0, metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}} seeking topic-0 to FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:58298 (id: 0 rack: null)], epoch=0}} seeking topic-1 to FetchPosition{offset=20, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:58301 (id: 1 rack: null)], epoch=0}} allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, metadata=''}, topic-1=OffsetAndMetadata{offset=20, leaderEpoch=null, metadata=''}} allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, metadata=''}} allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}} autocommit start {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}} {code} So we after we seek to 10 / 20, we lose one of the offsets, maybe because we haven't reconciled the assignment yet. Later, we get the second topic partition assigned, but the offset is initialized to 0. We should investigate whether this can be made more like the behavior in the original consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16155) Investigate testAutoCommitIntercept
[ https://issues.apache.org/jira/browse/KAFKA-16155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16155: --- Description: Even with KAFKA-15942, the test PlaintextConsumerTest.testAutoCommitIntercept flakes on the the initial setup (before using interceptors, so interceptors are unrelated here, except for being used later in the test). The problem is that we are seeking two topic partitions to offset 10 and 20, respectively, but when we commit, we seem to have lost one of the offsets, likely due to a race condition. When I output `subscriptionState.allConsumed` repeatedly, I get this output: {noformat} allConsumed: {topic-0=OffsetAndMetadata{offset=100, leaderEpoch=0, metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}} seeking topic-0 to FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:58298 (id: 0 rack: null)], epoch=0}} seeking topic-1 to FetchPosition{offset=20, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:58301 (id: 1 rack: null)], epoch=0}} allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, metadata=''}, topic-1=OffsetAndMetadata{offset=20, leaderEpoch=null, metadata=''}} allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, metadata=''}} allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}} autocommit start {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}} {noformat} So we after we seek to 10 / 20, we lose one of the offsets, maybe because we haven't reconciled the assignment yet. Later, we get the second topic partition assigned, but the offset is initialized to 0. We should investigate whether this can be made more like the behavior in the original consumer. was: Even with KAFKA-15942, the test PlaintextConsumerTest.testAutoCommitIntercept flakes on the the initial setup (before using interceptors, so interceptors are unrelated here, except for being used later in the test). The problem is that we are seeking two topic partitions to offset 10 and 20, respectively, but when we commit, we seem to have lost one of the offsets, likely due to a race condition. When I output `subscriptionState.allConsumed` repeatedly, I get this output: {code:java} allConsumed: {topic-0=OffsetAndMetadata{offset=100, leaderEpoch=0, metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}} seeking topic-0 to FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:58298 (id: 0 rack: null)], epoch=0}} seeking topic-1 to FetchPosition{offset=20, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:58301 (id: 1 rack: null)], epoch=0}} allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, metadata=''}, topic-1=OffsetAndMetadata{offset=20, leaderEpoch=null, metadata=''}} allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, metadata=''}} allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}} autocommit start {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}} {code} So we after we seek to 10 / 20, we lose one of the offsets, maybe because we haven't reconciled the assignment yet. Later, we get the second topic partition assigned, but the offset is initialized to 0. We should investigate whether this can be made more like the behavior in the original consumer. > Investigate testAutoCommitIntercept > --- > > Key: KAFKA-16155 > URL: https://issues.apache.org/jira/browse/KAFKA-16155 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lucas Brutschy >Priority: Major > Labels: consumer-threading-refactor > > Even with KAFKA-15942, the test PlaintextConsumerTest.testAutoCommitIntercept > flakes on the the initial setup (before using interceptors, so interceptors > are unrelated here, except for being used later in the test). > The problem is that we are seeking two topic partitions to offset 10 and 20, > respectively, but when we commit, we seem to have lost one of the offsets, > likely due to a race condition. > When I output `subscriptionState.allConsumed` repeatedly, I get this output: > {noformat} > allConsumed: {topic-0=OffsetAndMetadata{offset=100, leaderEpoch=0, > metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, > metadata=''}} > seeking to
Re: [PR] KAFKA-15853: Move ClientQuotaManagerConfig outside of core [kafka]
OmniaGM commented on code in PR #15159: URL: https://github.com/apache/kafka/pull/15159#discussion_r1455645909 ## checkstyle/import-control-core.xml: ## @@ -94,6 +94,7 @@ + Review Comment: remove it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
lucasbru commented on code in PR #15000: URL: https://github.com/apache/kafka/pull/15000#discussion_r1455660640 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -367,6 +372,42 @@ public void testAutocommitEnsureOnlyOneInflightRequest() { assertPoll(1, commitRequestManger); } +@Test Review Comment: Yes, they should. This is tested in the integration test `PlaintextConsumer.testAutoCommitIntercept` - which works in terms of interceptors, but I have to keep disabled in this PR because of KAFKA-16155. This PR does call the interceptors after closing the network thread (I pinged you about it above). I can add a little unit test to `AsyncKafkaConsumerTest`. I don't think we can add a unit test for it in `CommitRequestManagerTest`, because the autocommit on close is triggered from the application thread, and the interceptors are run from the application thread. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
lucasbru commented on code in PR #15000: URL: https://github.com/apache/kafka/pull/15000#discussion_r1455670090 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -93,9 +94,11 @@ public CommitRequestManager( final SubscriptionState subscriptions, final ConsumerConfig config, final CoordinatorRequestManager coordinatorRequestManager, +final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, Review Comment: This is very mixed across the clients codebase. Sometimes you put the final sometimes you don't. In the streams module there is a strict rule to do. Not sure, but as long as there is no guideline around this, and we are not completely repulsed by it, I'd suggest to just stick with whatever the existing code is doing for consistency to not mess up git blame too much. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
lucasbru commented on code in PR #15000: URL: https://github.com/apache/kafka/pull/15000#discussion_r1455660640 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -367,6 +372,42 @@ public void testAutocommitEnsureOnlyOneInflightRequest() { assertPoll(1, commitRequestManger); } +@Test Review Comment: Yes, they should. This is tested in the integration test `PlaintextConsumer.testAutoCommitIntercept` - which works in terms of interceptors, but I have to keep disabled in this PR because of KAFKA-16155. This PR does call the interceptors after closing the network thread (I pinged you about it above). I can add a little unit test to `AsyncKafkaConsumerTest`. I don't think we can add a unit test for it in `CommitRequestManagerTest`, because the autocommit on close is triggered from the application thread, and the interceptors are run from the application thread, so in this class it does look very much like any normal commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
lucasbru commented on code in PR #15000: URL: https://github.com/apache/kafka/pull/15000#discussion_r1455660640 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -367,6 +372,42 @@ public void testAutocommitEnsureOnlyOneInflightRequest() { assertPoll(1, commitRequestManger); } +@Test Review Comment: Yes, they should. This is tested in the integration test `PlaintextConsumer.testAutoCommitIntercept` - which works in terms of interceptors, but I have to keep disabled in this PR because of KAFKA-16155. This PR does call the interceptors after closing the network thread (I pinged you about it above). I can add a little unit test to `AsyncKafkaConsumerTest`. I don't think we can add a unit test for it in `CommitRequestManagerTest`, because the autocommit on close is triggered from the application thread, so in this class it does look very much like any normal commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16131: Only update directoryIds if the metadata version supports DirectoryAssignment [kafka]
pprovenzano commented on code in PR #15197: URL: https://github.com/apache/kafka/pull/15197#discussion_r1455684322 ## metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java: ## @@ -46,6 +46,18 @@ public final class FeaturesImage { ZkMigrationState.NONE ); +public static final FeaturesImage LATEST = new FeaturesImage( Review Comment: Done! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16097: Disable state updater in trunk (#15146) [kafka]
lucasbru merged PR #15204: URL: https://github.com/apache/kafka/pull/15204 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16156) System test failing on endOffsets with negative timestamps
Lianet Magrans created KAFKA-16156: -- Summary: System test failing on endOffsets with negative timestamps Key: KAFKA-16156 URL: https://issues.apache.org/jira/browse/KAFKA-16156 Project: Kafka Issue Type: Sub-task Components: clients Reporter: Lianet Magrans TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid negative timestamp". Trace: [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event loop (org.apache.kafka.tools.TransactionalMessageCopier) org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Invalid negative timestamp at org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234) at org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212) at org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44) at org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113) at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134) at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113) at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108) at org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651) at org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246) at org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342) at org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16156) System test failing on endOffsets with negative timestamps
[ https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16156: --- Description: TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid negative timestamp". Trace: [2024-01-15 07:42:33,932] TRACE [Consumer clientId=consumer-transactions-test-consumer-group-1, groupId=transactions-test-consumer-group] Received ListOffsetResponse ListOffsetsResponseData(throttleTimeMs=0, topics=[ListOffsetsTopicResponse(name='input-topic', partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from broker worker2:9092 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) [2024-01-15 07:42:33,932] DEBUG [Consumer clientId=consumer-transactions-test-consumer-group-1, groupId=transactions-test-consumer-group] Handling ListOffsetResponse response for input-topic-0. Fetched offset 42804, timestamp -1 (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) [2024-01-15 07:42:33,932] TRACE [Consumer clientId=consumer-transactions-test-consumer-group-1, groupId=transactions-test-consumer-group] Updating last stable offset for partition input-topic-0 to 42804 (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) [2024-01-15 07:42:33,933] DEBUG [Consumer clientId=consumer-transactions-test-consumer-group-1, groupId=transactions-test-consumer-group] Fetch offsets completed successfully for partitions and timestamps {input-topic-0=-1}. Result org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862 (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) [2024-01-15 07:42:33,933] TRACE [Consumer clientId=consumer-transactions-test-consumer-group-1, groupId=transactions-test-consumer-group] No events to process (org.apache.kafka.clients.consumer.internals.events.EventProcessor) [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event loop (org.apache.kafka.tools.TransactionalMessageCopier) org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Invalid negative timestamp at org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234) at org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212) at org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44) at org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113) at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134) at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113) at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108) at org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651) at org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246) at org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342) at org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292) Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp at org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:39) at org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253) at org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$buildListOffsetsRequests$3(OffsetsRequestManager.java:305) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.kafka.clients.consumer.internals.OffsetsRequestManager$MultiNodeRequest.addPartialResult(OffsetsRequestMana
[jira] [Updated] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps
[ https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16156: --- Summary: System test failing for new consumer on endOffsets with negative timestamps (was: System test failing on endOffsets with negative timestamps) > System test failing for new consumer on endOffsets with negative timestamps > --- > > Key: KAFKA-16156 > URL: https://issues.apache.org/jira/browse/KAFKA-16156 > Project: Kafka > Issue Type: Sub-task > Components: clients >Reporter: Lianet Magrans >Priority: Major > > TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid > negative timestamp". > Trace: > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Received ListOffsetResponse > ListOffsetsResponseData(throttleTimeMs=0, > topics=[ListOffsetsTopicResponse(name='input-topic', > partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, > oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from > broker worker2:9092 (id: 2 rack: null) > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,932] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Handling ListOffsetResponse > response for input-topic-0. Fetched offset 42804, timestamp -1 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Updating last stable offset for > partition input-topic-0 to 42804 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,933] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Fetch offsets completed > successfully for partitions and timestamps {input-topic-0=-1}. Result > org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862 > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,933] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] No events to process > (org.apache.kafka.clients.consumer.internals.events.EventProcessor) > [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event > loop (org.apache.kafka.tools.TransactionalMessageCopier) > org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: > Invalid negative timestamp > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234) > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212) > at > org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108) > at > org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651) > at > org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246) > at > org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342) > at > org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292) > Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp > at > org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:39) > at > org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253) > at > org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.complete(Comple
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
lucasbru commented on code in PR #15000: URL: https://github.com/apache/kafka/pull/15000#discussion_r1455787900 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -844,6 +849,54 @@ public void testWakeupCommitted() { assertNull(consumer.wakeupTrigger().getPendingTask()); } +@Test Review Comment: Added a test. Interceptors are called. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java: ## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.FencedInstanceIdException; + +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Utility class that helps the application thread to invoke user registered {@link OffsetCommitCallback} amd + * {@link org.apache.kafka.clients.consumer.ConsumerInterceptor}s. This is + * achieved by having the background thread register a {@link OffsetCommitCallbackTask} to the invoker upon the + * future completion, and execute the callbacks when user polls/commits/closes the consumer. + */ +public class OffsetCommitCallbackInvoker { +private final ConsumerInterceptors interceptors; + +OffsetCommitCallbackInvoker(ConsumerInterceptors interceptors) { +this.interceptors = interceptors; +} + +// Thread-safe queue to store user-defined callbacks and interceptors to be executed +private final BlockingQueue callbackQueue = new LinkedBlockingQueue<>(); + +public void submitCommitInterceptors(final Map offsets) { +if (!interceptors.isEmpty()) { +callbackQueue.add(new OffsetCommitCallbackTask( +(innerOffsets, exception) -> interceptors.onCommit(innerOffsets), +offsets, +null +)); +} +} + +public void submitUserCallback(final OffsetCommitCallback callback, + final Map offsets, + final Exception exception) { +callbackQueue.add(new OffsetCommitCallbackTask(callback, offsets, exception)); +} + +/** + * @return true if an offset commit was fenced. + */ +public boolean executeCallbacks() { Review Comment: Done ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -93,9 +94,11 @@ public CommitRequestManager( final SubscriptionState subscriptions, final ConsumerConfig config, final CoordinatorRequestManager coordinatorRequestManager, +final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, final String groupId, final Optional groupInstanceId) { -this(time, logContext, subscriptions, config, coordinatorRequestManager, groupId, +this(time, logContext, subscriptions, config, coordinatorRequestManager, Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
lucasbru commented on code in PR #15000: URL: https://github.com/apache/kafka/pull/15000#discussion_r1455790243 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1902,65 +1912,14 @@ private void maybeThrowFencedInstanceException() { } private void maybeInvokeCommitCallbacks() { -if (callbacks() > 0) { -invoker.executeCallbacks(); +if (offsetCommitCallbackInvoker.executeCallbacks()) { Review Comment: Good point, done. I could actually simplify the code by moving `isFenced` inside the invoker. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16134) kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky
[ https://issues.apache.org/jira/browse/KAFKA-16134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807767#comment-17807767 ] Lianet Magrans commented on KAFKA-16134: Might be related > kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer is flaky > -- > > Key: KAFKA-16134 > URL: https://issues.apache.org/jira/browse/KAFKA-16134 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Stanislav Kozlovski >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > The following test is very flaky. It failed 3 times consecutively in Jenkins > runs for the 3.7 release candidate. > kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16134) kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky
[ https://issues.apache.org/jira/browse/KAFKA-16134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807767#comment-17807767 ] Lianet Magrans edited comment on KAFKA-16134 at 1/17/24 3:21 PM: - Might be related/due to https://issues.apache.org/jira/browse/KAFKA-16107 was (Author: JIRAUSER300183): Might be related > kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer is flaky > -- > > Key: KAFKA-16134 > URL: https://issues.apache.org/jira/browse/KAFKA-16134 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Stanislav Kozlovski >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > The following test is very flaky. It failed 3 times consecutively in Jenkins > runs for the 3.7 release candidate. > kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16150) Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-16150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807772#comment-17807772 ] Lianet Magrans commented on KAFKA-16150: [~kirktrue] this seems the same as https://issues.apache.org/jira/browse/KAFKA-16134 reported as part of the 3.7 RC validation. > Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe > > > Key: KAFKA-16150 > URL: https://issues.apache.org/jira/browse/KAFKA-16150 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16151) Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-16151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807773#comment-17807773 ] Lianet Magrans commented on KAFKA-16151: [~kirktrue] isn't this the same as https://issues.apache.org/jira/browse/KAFKA-16135? > Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe > - > > Key: KAFKA-16151 > URL: https://issues.apache.org/jira/browse/KAFKA-16151 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16137: Add missing RPC field descriptions [kafka]
AndrewJSchofield opened a new pull request, #15214: URL: https://github.com/apache/kafka/pull/15214 The `ListClientMetricsResourcesResponse` definition is missing several `"about"` descriptions. The main effect of this is that the Kafka protocol documentation misses the descriptions of these fields which are blank. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15538) Client support for java regex based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15538: --- Description: When using subscribe with a java regex (Pattern), we need to resolve it on the client side to send the broker a list of topic names to subscribe to. Context: The new consumer group protocol uses [Google RE2/J|https://github.com/google/re2j] for regular expressions and introduces new methods in the consumer API to subscribe using a `SubscribePattern`. The subscribe using a java `Pattern` will be still supported for a while but eventually removed. * When the subscribe with SubscriptionPattern is used, the client should just send the regex to the broker and it will be resolved on the server side. * In the case of the subscribe with Pattern, the regex should be resolved on the client side. As part of this task, we should re-enable all integration tests defined in the PlainTextAsyncConsumer that relate to subscription with pattern and that are currently disabled for the new consumer + new protocol was: When using subscribe with a java regex (Pattern), we need to resolve it on the client side to send the broker a list of topic names to subscribe to. Context: The new consumer group protocol uses [Google RE2/J|https://github.com/google/re2j] for regular expressions and introduces new methods in the consumer API to subscribe using a `SubscribePattern`. The subscribe using a java `Pattern` will be still supported for a while but eventually removed. * When the subscribe with SubscriptionPattern is used, the client should just send the regex to the broker and it will be resolved on the server side. * In the case of the subscribe with Pattern, the regex should be resolved on the client side. > Client support for java regex based subscription > > > Key: KAFKA-15538 > URL: https://issues.apache.org/jira/browse/KAFKA-15538 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848, kip-848-client-support > Fix For: 3.8.0 > > > When using subscribe with a java regex (Pattern), we need to resolve it on > the client side to send the broker a list of topic names to subscribe to. > Context: > The new consumer group protocol uses [Google > RE2/J|https://github.com/google/re2j] for regular expressions and introduces > new methods in the consumer API to subscribe using a `SubscribePattern`. The > subscribe using a java `Pattern` will be still supported for a while but > eventually removed. > * When the subscribe with SubscriptionPattern is used, the client should > just send the regex to the broker and it will be resolved on the server side. > * In the case of the subscribe with Pattern, the regex should be resolved on > the client side. > As part of this task, we should re-enable all integration tests defined in > the PlainTextAsyncConsumer that relate to subscription with pattern and that > are currently disabled for the new consumer + new protocol -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807783#comment-17807783 ] Lianet Magrans commented on KAFKA-15538: Hey [~phuctran], you're right that the intention is that this section [here|https://github.com/apache/kafka/blob/dd0916ef9a6276d191196f79176bcb725e1ff9e6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L537] where the regex is sent in the HB request, should apply only to the SubscriptionPattern, but that's not what the current implementation achieves. With the current code, it would apply the section also to the Pattern, because the check is done based on #hasPatternSubscription (the moment we call [subscribe(Pattern..)|https://github.com/apache/kafka/blob/26465c64092868c972e2a0e4d9a4fc0ed13a7a39/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1719] the subscription state will have subscription type AUTO_PATTERN so #hasPatternSubscription will be true). > Client support for java regex based subscription > > > Key: KAFKA-15538 > URL: https://issues.apache.org/jira/browse/KAFKA-15538 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848, kip-848-client-support > Fix For: 3.8.0 > > > When using subscribe with a java regex (Pattern), we need to resolve it on > the client side to send the broker a list of topic names to subscribe to. > Context: > The new consumer group protocol uses [Google > RE2/J|https://github.com/google/re2j] for regular expressions and introduces > new methods in the consumer API to subscribe using a `SubscribePattern`. The > subscribe using a java `Pattern` will be still supported for a while but > eventually removed. > * When the subscribe with SubscriptionPattern is used, the client should > just send the regex to the broker and it will be resolved on the server side. > * In the case of the subscribe with Pattern, the regex should be resolved on > the client side. > As part of this task, we should re-enable all integration tests defined in > the PlainTextAsyncConsumer that relate to subscription with pattern and that > are currently disabled for the new consumer + new protocol -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16104: Enable additional PlaintextConsumerTest tests for new consumer [kafka]
kirktrue closed pull request #15206: KAFKA-16104: Enable additional PlaintextConsumerTest tests for new consumer URL: https://github.com/apache/kafka/pull/15206 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16104: Enable additional PlaintextConsumerTest tests for new consumer [kafka]
kirktrue commented on PR #15206: URL: https://github.com/apache/kafka/pull/15206#issuecomment-1896159822 Closing and reopening to kick off another test run -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16157) Topic recreation with offline disk doesn't update leadership/shrink ISR correctly
Gaurav Narula created KAFKA-16157: - Summary: Topic recreation with offline disk doesn't update leadership/shrink ISR correctly Key: KAFKA-16157 URL: https://issues.apache.org/jira/browse/KAFKA-16157 Project: Kafka Issue Type: Bug Components: jbod, kraft Affects Versions: 3.7.1 Reporter: Gaurav Narula In a cluster with 4 brokers, `broker-1..broker-4` with 2 disks `d1` and `d2` in each broker, we perform the following operations: # Create a topic `foo.test` with 10 replicas and RF 4. Let's assume the topic was created with id `rAujIqcjRbu_-E4UxgQT8Q`. # Start a producer in the background to produce to `foo.test`. # Break disk `d1` in `broker-1`. We simulate this by marking the log dir read-only. # Delete topic `foo.test` # Recreate topic `foo.test`. Let's assume the topic was created with id `bgdrsv-1QjCLFEqLOzVCHg`. # Wait for 5 minutes # Describe the recreated topic `foo.test`. We observe that `broker-1` is the leader and in-sync for few partitions {code:java} Topic: foo.test TopicId: bgdrsv-1QjCLFEqLOzVCHg PartitionCount: 10 ReplicationFactor: 4 Configs: min.insync.replicas=1,unclean.leader.election.enable=false Topic: foo.test Partition: 0 Leader: 101 Replicas: 101,102,103,104 Isr: 101,102,103,104 Topic: foo.test Partition: 1 Leader: 102 Replicas: 102,103,104,101 Isr: 102,103,104 Topic: foo.test Partition: 2 Leader: 103 Replicas: 103,104,101,102 Isr: 103,104,102 Topic: foo.test Partition: 3 Leader: 104 Replicas: 104,101,102,103 Isr: 104,102,103 Topic: foo.test Partition: 4 Leader: 104 Replicas: 104,102,101,103 Isr: 104,102,103 Topic: foo.test Partition: 5 Leader: 102 Replicas: 102,101,103,104 Isr: 102,103,104 Topic: foo.test Partition: 6 Leader: 101 Replicas: 101,103,104,102 Isr: 101,103,104,102 Topic: foo.test Partition: 7 Leader: 103 Replicas: 103,104,102,101 Isr: 103,104,102 Topic: foo.test Partition: 8 Leader: 101 Replicas: 101,102,104,103 Isr: 101,102,104,103 Topic: foo.test Partition: 9 Leader: 102 Replicas: 102,104,103,101 Isr: 102,104,103 {code} In this example, it is the leader of partitions `0, 6 and 8`. Consider `foo.test-8`. It is present in the following brokers/disks: {code:java} $ fd foo.test-8 broker-1/d1/foo.test-8/ broker-2/d2/foo.test-8/ broker-3/d2/foo.test-8/ broker-4/d1/foo.test-8/{code} `broker-1/d1` still refers to the topic id which is pending deletion because the log dir is marked offline. {code:java} $ cat broker-1/d1/foo.test-8/partition.metadata version: 0 topic_id: rAujIqcjRbu_-E4UxgQT8Q{code} However, other brokers have the correct topic-id {code:java} $ cat broker-2/d2/foo.test-8/partition.metadata version: 0 topic_id: bgdrsv-1QjCLFEqLOzVCHg%{code} Now, let's consider `foo.test-0`. We observe that the replica isn't present in `broker-1`: {code:java} $ fd foo.test-0 broker-2/d1/foo.test-0/ broker-3/d1/foo.test-0/ broker-4/d2/foo.test-0/{code} In both cases, `broker-1` shouldn't be the leader or in-sync replica for the partitions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16157) Topic recreation with offline disk doesn't update leadership/shrink ISR correctly
[ https://issues.apache.org/jira/browse/KAFKA-16157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gaurav Narula updated KAFKA-16157: -- Attachment: broker.log broker.log.1 broker.log.2 broker.log.3 broker.log.4 broker.log.5 broker.log.6 broker.log.7 broker.log.8 broker.log.9 broker.log.10 > Topic recreation with offline disk doesn't update leadership/shrink ISR > correctly > - > > Key: KAFKA-16157 > URL: https://issues.apache.org/jira/browse/KAFKA-16157 > Project: Kafka > Issue Type: Bug > Components: jbod, kraft >Affects Versions: 3.7.1 >Reporter: Gaurav Narula >Priority: Major > Attachments: broker.log, broker.log.1, broker.log.10, broker.log.2, > broker.log.3, broker.log.4, broker.log.5, broker.log.6, broker.log.7, > broker.log.8, broker.log.9 > > > In a cluster with 4 brokers, `broker-1..broker-4` with 2 disks `d1` and `d2` > in each broker, we perform the following operations: > > # Create a topic `foo.test` with 10 replicas and RF 4. Let's assume the > topic was created with id `rAujIqcjRbu_-E4UxgQT8Q`. > # Start a producer in the background to produce to `foo.test`. > # Break disk `d1` in `broker-1`. We simulate this by marking the log dir > read-only. > # Delete topic `foo.test` > # Recreate topic `foo.test`. Let's assume the topic was created with id > `bgdrsv-1QjCLFEqLOzVCHg`. > # Wait for 5 minutes > # Describe the recreated topic `foo.test`. > > We observe that `broker-1` is the leader and in-sync for few partitions > > > {code:java} > > Topic: foo.test TopicId: bgdrsv-1QjCLFEqLOzVCHg PartitionCount: 10 > ReplicationFactor: 4 Configs: > min.insync.replicas=1,unclean.leader.election.enable=false > Topic: foo.test Partition: 0 Leader: 101 Replicas: > 101,102,103,104 Isr: 101,102,103,104 > Topic: foo.test Partition: 1 Leader: 102 Replicas: > 102,103,104,101 Isr: 102,103,104 > Topic: foo.test Partition: 2 Leader: 103 Replicas: > 103,104,101,102 Isr: 103,104,102 > Topic: foo.test Partition: 3 Leader: 104 Replicas: > 104,101,102,103 Isr: 104,102,103 > Topic: foo.test Partition: 4 Leader: 104 Replicas: > 104,102,101,103 Isr: 104,102,103 > Topic: foo.test Partition: 5 Leader: 102 Replicas: > 102,101,103,104 Isr: 102,103,104 > Topic: foo.test Partition: 6 Leader: 101 Replicas: > 101,103,104,102 Isr: 101,103,104,102 > Topic: foo.test Partition: 7 Leader: 103 Replicas: > 103,104,102,101 Isr: 103,104,102 > Topic: foo.test Partition: 8 Leader: 101 Replicas: > 101,102,104,103 Isr: 101,102,104,103 > Topic: foo.test Partition: 9 Leader: 102 Replicas: > 102,104,103,101 Isr: 102,104,103 > {code} > > > In this example, it is the leader of partitions `0, 6 and 8`. > > Consider `foo.test-8`. It is present in the following brokers/disks: > > > {code:java} > $ fd foo.test-8 > broker-1/d1/foo.test-8/ > broker-2/d2/foo.test-8/ > broker-3/d2/foo.test-8/ > broker-4/d1/foo.test-8/{code} > > > `broker-1/d1` still refers to the topic id which is pending deletion because > the log dir is marked offline. > > > {code:java} > $ cat broker-1/d1/foo.test-8/partition.metadata > version: 0 > topic_id: rAujIqcjRbu_-E4UxgQT8Q{code} > > > However, other brokers have the correct topic-id > > > {code:java} > $ cat broker-2/d2/foo.test-8/partition.metadata > version: 0 > topic_id: bgdrsv-1QjCLFEqLOzVCHg%{code} > > > Now, let's consider `foo.test-0`. We observe that the replica isn't present > in `broker-1`: > {code:java} > $ fd foo.test-0 > broker-2/d1/foo.test-0/ > broker-3/d1/foo.test-0/ > broker-4/d2/foo.test-0/{code} > In both cases, `broker-1` shouldn't be the leader or in-sync replica for the > partitions. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]
mimaison commented on code in PR #14856: URL: https://github.com/apache/kafka/pull/14856#discussion_r1456011084 ## tools/src/main/java/org/apache/kafka/tools/consumergroup/PartitionAssignmentState.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumergroup; + +import org.apache.kafka.common.Node; + +import java.util.Optional; + +class PartitionAssignmentState { +public final String group; +public final Optional coordinator; +public final Optional topic; +public final Optional partition; +public final Optional offset; +public final Optional lag; +public final Optional consumerId; +public final Optional host; +public final Optional clientId; +public final Optional logEndOffset; + +public PartitionAssignmentState(String group, Optional coordinator, Optional topic, +Optional partition, Optional offset, Optional lag, +Optional consumerId, Optional host, Optional clientId, +Optional logEndOffset) { +this.group = group; +this.coordinator = coordinator; +this.topic = topic; +this.partition = partition; +this.offset = offset; +this.lag = lag; +this.consumerId = consumerId; +this.host = host; +this.clientId = clientId; +this.logEndOffset = logEndOffset; +} +} Review Comment: Nit: Can we add a newline? ## tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java: ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumergroup; + +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class ConsumerGroupCommandOptions extends CommandDefaultOptions { +public static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroupCommandOptions.class); + +public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to."; +public static final String GROUP_DOC = "The consumer group we wish to act on."; +public static final String TOPIC_DOC = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. " + +"In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " + +"Reset-offsets also supports multiple topic inputs."; +public static final String ALL_TOPICS_DOC = "Consider all topics assigned to a group in the `reset-offsets` process."; +public static final String LIST_DOC = "List all consumer groups."; +public static final String DESCRIBE_DOC = "Describe consumer group and list offset lag (number of messages not yet processed) related to given group."; +public static final String ALL_GROUPS_DOC = "Apply to all consumer groups."; +public static final String NL = System.lineSeparator(); +public static final String DELETE_DOC = "Pass in groups to delete topic part
[jira] [Commented] (KAFKA-16157) Topic recreation with offline disk doesn't update leadership/shrink ISR correctly
[ https://issues.apache.org/jira/browse/KAFKA-16157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807814#comment-17807814 ] Mickael Maison commented on KAFKA-16157: Did you hit this issue while testing 3.7.0 RC2 or with trunk? > Topic recreation with offline disk doesn't update leadership/shrink ISR > correctly > - > > Key: KAFKA-16157 > URL: https://issues.apache.org/jira/browse/KAFKA-16157 > Project: Kafka > Issue Type: Bug > Components: jbod, kraft >Affects Versions: 3.7.1 >Reporter: Gaurav Narula >Priority: Major > Attachments: broker.log, broker.log.1, broker.log.10, broker.log.2, > broker.log.3, broker.log.4, broker.log.5, broker.log.6, broker.log.7, > broker.log.8, broker.log.9 > > > In a cluster with 4 brokers, `broker-1..broker-4` with 2 disks `d1` and `d2` > in each broker, we perform the following operations: > > # Create a topic `foo.test` with 10 replicas and RF 4. Let's assume the > topic was created with id `rAujIqcjRbu_-E4UxgQT8Q`. > # Start a producer in the background to produce to `foo.test`. > # Break disk `d1` in `broker-1`. We simulate this by marking the log dir > read-only. > # Delete topic `foo.test` > # Recreate topic `foo.test`. Let's assume the topic was created with id > `bgdrsv-1QjCLFEqLOzVCHg`. > # Wait for 5 minutes > # Describe the recreated topic `foo.test`. > > We observe that `broker-1` is the leader and in-sync for few partitions > > > {code:java} > > Topic: foo.test TopicId: bgdrsv-1QjCLFEqLOzVCHg PartitionCount: 10 > ReplicationFactor: 4 Configs: > min.insync.replicas=1,unclean.leader.election.enable=false > Topic: foo.test Partition: 0 Leader: 101 Replicas: > 101,102,103,104 Isr: 101,102,103,104 > Topic: foo.test Partition: 1 Leader: 102 Replicas: > 102,103,104,101 Isr: 102,103,104 > Topic: foo.test Partition: 2 Leader: 103 Replicas: > 103,104,101,102 Isr: 103,104,102 > Topic: foo.test Partition: 3 Leader: 104 Replicas: > 104,101,102,103 Isr: 104,102,103 > Topic: foo.test Partition: 4 Leader: 104 Replicas: > 104,102,101,103 Isr: 104,102,103 > Topic: foo.test Partition: 5 Leader: 102 Replicas: > 102,101,103,104 Isr: 102,103,104 > Topic: foo.test Partition: 6 Leader: 101 Replicas: > 101,103,104,102 Isr: 101,103,104,102 > Topic: foo.test Partition: 7 Leader: 103 Replicas: > 103,104,102,101 Isr: 103,104,102 > Topic: foo.test Partition: 8 Leader: 101 Replicas: > 101,102,104,103 Isr: 101,102,104,103 > Topic: foo.test Partition: 9 Leader: 102 Replicas: > 102,104,103,101 Isr: 102,104,103 > {code} > > > In this example, it is the leader of partitions `0, 6 and 8`. > > Consider `foo.test-8`. It is present in the following brokers/disks: > > > {code:java} > $ fd foo.test-8 > broker-1/d1/foo.test-8/ > broker-2/d2/foo.test-8/ > broker-3/d2/foo.test-8/ > broker-4/d1/foo.test-8/{code} > > > `broker-1/d1` still refers to the topic id which is pending deletion because > the log dir is marked offline. > > > {code:java} > $ cat broker-1/d1/foo.test-8/partition.metadata > version: 0 > topic_id: rAujIqcjRbu_-E4UxgQT8Q{code} > > > However, other brokers have the correct topic-id > > > {code:java} > $ cat broker-2/d2/foo.test-8/partition.metadata > version: 0 > topic_id: bgdrsv-1QjCLFEqLOzVCHg%{code} > > > Now, let's consider `foo.test-0`. We observe that the replica isn't present > in `broker-1`: > {code:java} > $ fd foo.test-0 > broker-2/d1/foo.test-0/ > broker-3/d1/foo.test-0/ > broker-4/d2/foo.test-0/{code} > In both cases, `broker-1` shouldn't be the leader or in-sync replica for the > partitions. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
philipnee commented on code in PR #15000: URL: https://github.com/apache/kafka/pull/15000#discussion_r1456105594 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -1336,8 +1336,9 @@ class PlaintextConsumerTest extends BaseConsumerTest { MockProducerInterceptor.resetCounters() } + // This is disabled for the the consumer group until KAFKA-16155 is resolved. Review Comment: thanks for reporting this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16157) Topic recreation with offline disk doesn't update leadership/shrink ISR correctly
[ https://issues.apache.org/jira/browse/KAFKA-16157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Proven Provenzano updated KAFKA-16157: -- Priority: Blocker (was: Major) > Topic recreation with offline disk doesn't update leadership/shrink ISR > correctly > - > > Key: KAFKA-16157 > URL: https://issues.apache.org/jira/browse/KAFKA-16157 > Project: Kafka > Issue Type: Bug > Components: jbod, kraft >Affects Versions: 3.7.1 >Reporter: Gaurav Narula >Priority: Blocker > Attachments: broker.log, broker.log.1, broker.log.10, broker.log.2, > broker.log.3, broker.log.4, broker.log.5, broker.log.6, broker.log.7, > broker.log.8, broker.log.9 > > > In a cluster with 4 brokers, `broker-1..broker-4` with 2 disks `d1` and `d2` > in each broker, we perform the following operations: > > # Create a topic `foo.test` with 10 partitions and RF 4. Let's assume the > topic was created with id `rAujIqcjRbu_-E4UxgQT8Q`. > # Start a producer in the background to produce to `foo.test`. > # Break disk `d1` in `broker-1`. We simulate this by marking the log dir > read-only. > # Delete topic `foo.test` > # Recreate topic `foo.test`. Let's assume the topic was created with id > `bgdrsv-1QjCLFEqLOzVCHg`. > # Wait for 5 minutes > # Describe the recreated topic `foo.test`. > > We observe that `broker-1` is the leader and in-sync for few partitions > > > {code:java} > > Topic: foo.test TopicId: bgdrsv-1QjCLFEqLOzVCHg PartitionCount: 10 > ReplicationFactor: 4 Configs: > min.insync.replicas=1,unclean.leader.election.enable=false > Topic: foo.test Partition: 0 Leader: 101 Replicas: > 101,102,103,104 Isr: 101,102,103,104 > Topic: foo.test Partition: 1 Leader: 102 Replicas: > 102,103,104,101 Isr: 102,103,104 > Topic: foo.test Partition: 2 Leader: 103 Replicas: > 103,104,101,102 Isr: 103,104,102 > Topic: foo.test Partition: 3 Leader: 104 Replicas: > 104,101,102,103 Isr: 104,102,103 > Topic: foo.test Partition: 4 Leader: 104 Replicas: > 104,102,101,103 Isr: 104,102,103 > Topic: foo.test Partition: 5 Leader: 102 Replicas: > 102,101,103,104 Isr: 102,103,104 > Topic: foo.test Partition: 6 Leader: 101 Replicas: > 101,103,104,102 Isr: 101,103,104,102 > Topic: foo.test Partition: 7 Leader: 103 Replicas: > 103,104,102,101 Isr: 103,104,102 > Topic: foo.test Partition: 8 Leader: 101 Replicas: > 101,102,104,103 Isr: 101,102,104,103 > Topic: foo.test Partition: 9 Leader: 102 Replicas: > 102,104,103,101 Isr: 102,104,103 > {code} > > > In this example, it is the leader of partitions `0, 6 and 8`. > > Consider `foo.test-8`. It is present in the following brokers/disks: > > > {code:java} > $ fd foo.test-8 > broker-1/d1/foo.test-8/ > broker-2/d2/foo.test-8/ > broker-3/d2/foo.test-8/ > broker-4/d1/foo.test-8/{code} > > > `broker-1/d1` still refers to the topic id which is pending deletion because > the log dir is marked offline. > > > {code:java} > $ cat broker-1/d1/foo.test-8/partition.metadata > version: 0 > topic_id: rAujIqcjRbu_-E4UxgQT8Q{code} > > > However, other brokers have the correct topic-id > > > {code:java} > $ cat broker-2/d2/foo.test-8/partition.metadata > version: 0 > topic_id: bgdrsv-1QjCLFEqLOzVCHg%{code} > > > Now, let's consider `foo.test-0`. We observe that the replica isn't present > in `broker-1`: > {code:java} > $ fd foo.test-0 > broker-2/d1/foo.test-0/ > broker-3/d1/foo.test-0/ > broker-4/d2/foo.test-0/{code} > In both cases, `broker-1` shouldn't be the leader or in-sync replica for the > partitions. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16157) Topic recreation with offline disk doesn't update leadership/shrink ISR correctly
[ https://issues.apache.org/jira/browse/KAFKA-16157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gaurav Narula updated KAFKA-16157: -- Description: In a cluster with 4 brokers, `broker-1..broker-4` with 2 disks `d1` and `d2` in each broker, we perform the following operations: # Create a topic `foo.test` with 10 partitions and RF 4. Let's assume the topic was created with id `rAujIqcjRbu_-E4UxgQT8Q`. # Start a producer in the background to produce to `foo.test`. # Break disk `d1` in `broker-1`. We simulate this by marking the log dir read-only. # Delete topic `foo.test` # Recreate topic `foo.test`. Let's assume the topic was created with id `bgdrsv-1QjCLFEqLOzVCHg`. # Wait for 5 minutes # Describe the recreated topic `foo.test`. We observe that `broker-1` is the leader and in-sync for few partitions {code:java} Topic: foo.test TopicId: bgdrsv-1QjCLFEqLOzVCHg PartitionCount: 10 ReplicationFactor: 4 Configs: min.insync.replicas=1,unclean.leader.election.enable=false Topic: foo.test Partition: 0 Leader: 101 Replicas: 101,102,103,104 Isr: 101,102,103,104 Topic: foo.test Partition: 1 Leader: 102 Replicas: 102,103,104,101 Isr: 102,103,104 Topic: foo.test Partition: 2 Leader: 103 Replicas: 103,104,101,102 Isr: 103,104,102 Topic: foo.test Partition: 3 Leader: 104 Replicas: 104,101,102,103 Isr: 104,102,103 Topic: foo.test Partition: 4 Leader: 104 Replicas: 104,102,101,103 Isr: 104,102,103 Topic: foo.test Partition: 5 Leader: 102 Replicas: 102,101,103,104 Isr: 102,103,104 Topic: foo.test Partition: 6 Leader: 101 Replicas: 101,103,104,102 Isr: 101,103,104,102 Topic: foo.test Partition: 7 Leader: 103 Replicas: 103,104,102,101 Isr: 103,104,102 Topic: foo.test Partition: 8 Leader: 101 Replicas: 101,102,104,103 Isr: 101,102,104,103 Topic: foo.test Partition: 9 Leader: 102 Replicas: 102,104,103,101 Isr: 102,104,103 {code} In this example, it is the leader of partitions `0, 6 and 8`. Consider `foo.test-8`. It is present in the following brokers/disks: {code:java} $ fd foo.test-8 broker-1/d1/foo.test-8/ broker-2/d2/foo.test-8/ broker-3/d2/foo.test-8/ broker-4/d1/foo.test-8/{code} `broker-1/d1` still refers to the topic id which is pending deletion because the log dir is marked offline. {code:java} $ cat broker-1/d1/foo.test-8/partition.metadata version: 0 topic_id: rAujIqcjRbu_-E4UxgQT8Q{code} However, other brokers have the correct topic-id {code:java} $ cat broker-2/d2/foo.test-8/partition.metadata version: 0 topic_id: bgdrsv-1QjCLFEqLOzVCHg%{code} Now, let's consider `foo.test-0`. We observe that the replica isn't present in `broker-1`: {code:java} $ fd foo.test-0 broker-2/d1/foo.test-0/ broker-3/d1/foo.test-0/ broker-4/d2/foo.test-0/{code} In both cases, `broker-1` shouldn't be the leader or in-sync replica for the partitions. was: In a cluster with 4 brokers, `broker-1..broker-4` with 2 disks `d1` and `d2` in each broker, we perform the following operations: # Create a topic `foo.test` with 10 replicas and RF 4. Let's assume the topic was created with id `rAujIqcjRbu_-E4UxgQT8Q`. # Start a producer in the background to produce to `foo.test`. # Break disk `d1` in `broker-1`. We simulate this by marking the log dir read-only. # Delete topic `foo.test` # Recreate topic `foo.test`. Let's assume the topic was created with id `bgdrsv-1QjCLFEqLOzVCHg`. # Wait for 5 minutes # Describe the recreated topic `foo.test`. We observe that `broker-1` is the leader and in-sync for few partitions {code:java} Topic: foo.test TopicId: bgdrsv-1QjCLFEqLOzVCHg PartitionCount: 10 ReplicationFactor: 4 Configs: min.insync.replicas=1,unclean.leader.election.enable=false Topic: foo.test Partition: 0 Leader: 101 Replicas: 101,102,103,104 Isr: 101,102,103,104 Topic: foo.test Partition: 1 Leader: 102 Replicas: 102,103,104,101 Isr: 102,103,104 Topic: foo.test Partition: 2 Leader: 103 Replicas: 103,104,101,102 Isr: 103,104,102 Topic: foo.test Partition: 3 Leader: 104 Replicas: 104,101,102,103 Isr: 104,102,103 Topic: foo.test Partition: 4 Leader: 104 Replicas: 104,102,101,103 Isr: 104,102,103 Topic: foo.test Partition: 5 Leader: 102 Replicas: 102,101,103,104 Isr: 102,103,104 Topic: foo.test Partition: 6 Leader: 101 Replicas: 101,103,104,102 Isr: 101,103,104,102 Topic: foo.test Partition: 7 Leader: 103 Replicas: 103,104,102,101 Isr: 103,104,102 Topic: foo.test Partition: 8 Leader: 101 Replicas: 101,102,104,103 Isr: 101,102,104,103 Topic: foo.
Re: [PR] KAFKA-16113: Add committed and commit sensor to record metrics [kafka]
philipnee commented on PR #15210: URL: https://github.com/apache/kafka/pull/15210#issuecomment-1896275067 @lucasbru - Would you have time to review this? Seems like the failed tests aren't necessary related. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]
lianetm opened a new pull request, #15215: URL: https://github.com/apache/kafka/pull/15215 This ensures that no records are fetched in the background thread while the onPartitionsAssigned callback completes running in the Application thread. This is achieved by pausing the partitions before triggering the callback, and resuming them when the callback successfully completes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16157) Topic recreation with offline disk doesn't update leadership/shrink ISR correctly
[ https://issues.apache.org/jira/browse/KAFKA-16157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-16157: --- Affects Version/s: 3.7.0 (was: 3.7.1) > Topic recreation with offline disk doesn't update leadership/shrink ISR > correctly > - > > Key: KAFKA-16157 > URL: https://issues.apache.org/jira/browse/KAFKA-16157 > Project: Kafka > Issue Type: Bug > Components: jbod, kraft >Affects Versions: 3.7.0 >Reporter: Gaurav Narula >Priority: Blocker > Attachments: broker.log, broker.log.1, broker.log.10, broker.log.2, > broker.log.3, broker.log.4, broker.log.5, broker.log.6, broker.log.7, > broker.log.8, broker.log.9 > > > In a cluster with 4 brokers, `broker-1..broker-4` with 2 disks `d1` and `d2` > in each broker, we perform the following operations: > > # Create a topic `foo.test` with 10 partitions and RF 4. Let's assume the > topic was created with id `rAujIqcjRbu_-E4UxgQT8Q`. > # Start a producer in the background to produce to `foo.test`. > # Break disk `d1` in `broker-1`. We simulate this by marking the log dir > read-only. > # Delete topic `foo.test` > # Recreate topic `foo.test`. Let's assume the topic was created with id > `bgdrsv-1QjCLFEqLOzVCHg`. > # Wait for 5 minutes > # Describe the recreated topic `foo.test`. > > We observe that `broker-1` is the leader and in-sync for few partitions > > > {code:java} > > Topic: foo.test TopicId: bgdrsv-1QjCLFEqLOzVCHg PartitionCount: 10 > ReplicationFactor: 4 Configs: > min.insync.replicas=1,unclean.leader.election.enable=false > Topic: foo.test Partition: 0 Leader: 101 Replicas: > 101,102,103,104 Isr: 101,102,103,104 > Topic: foo.test Partition: 1 Leader: 102 Replicas: > 102,103,104,101 Isr: 102,103,104 > Topic: foo.test Partition: 2 Leader: 103 Replicas: > 103,104,101,102 Isr: 103,104,102 > Topic: foo.test Partition: 3 Leader: 104 Replicas: > 104,101,102,103 Isr: 104,102,103 > Topic: foo.test Partition: 4 Leader: 104 Replicas: > 104,102,101,103 Isr: 104,102,103 > Topic: foo.test Partition: 5 Leader: 102 Replicas: > 102,101,103,104 Isr: 102,103,104 > Topic: foo.test Partition: 6 Leader: 101 Replicas: > 101,103,104,102 Isr: 101,103,104,102 > Topic: foo.test Partition: 7 Leader: 103 Replicas: > 103,104,102,101 Isr: 103,104,102 > Topic: foo.test Partition: 8 Leader: 101 Replicas: > 101,102,104,103 Isr: 101,102,104,103 > Topic: foo.test Partition: 9 Leader: 102 Replicas: > 102,104,103,101 Isr: 102,104,103 > {code} > > > In this example, it is the leader of partitions `0, 6 and 8`. > > Consider `foo.test-8`. It is present in the following brokers/disks: > > > {code:java} > $ fd foo.test-8 > broker-1/d1/foo.test-8/ > broker-2/d2/foo.test-8/ > broker-3/d2/foo.test-8/ > broker-4/d1/foo.test-8/{code} > > > `broker-1/d1` still refers to the topic id which is pending deletion because > the log dir is marked offline. > > > {code:java} > $ cat broker-1/d1/foo.test-8/partition.metadata > version: 0 > topic_id: rAujIqcjRbu_-E4UxgQT8Q{code} > > > However, other brokers have the correct topic-id > > > {code:java} > $ cat broker-2/d2/foo.test-8/partition.metadata > version: 0 > topic_id: bgdrsv-1QjCLFEqLOzVCHg%{code} > > > Now, let's consider `foo.test-0`. We observe that the replica isn't present > in `broker-1`: > {code:java} > $ fd foo.test-0 > broker-2/d1/foo.test-0/ > broker-3/d1/foo.test-0/ > broker-4/d2/foo.test-0/{code} > In both cases, `broker-1` shouldn't be the leader or in-sync replica for the > partitions. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]
jolshan commented on code in PR #15176: URL: https://github.com/apache/kafka/pull/15176#discussion_r1456228457 ## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala: ## @@ -935,8 +935,9 @@ private[group] class GroupCoordinator( producerId, producerEpoch, RecordBatch.NO_SEQUENCE, - requestLocal, - postVerificationCallback + // Wrap the callback to be handled on an arbitrary request handler thread + // when transaction verification is complete. + KafkaRequestHandler.wrapAsyncCallback(postVerificationCallback, requestLocal) Review Comment: Oh sorry. I guess I was just confused I didn't see it in the replica manager flow. (for produce) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]
jolshan commented on code in PR #14856: URL: https://github.com/apache/kafka/pull/14856#discussion_r1456242441 ## tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java: ## @@ -0,0 +1,263 @@ +/* Review Comment: Is the tools/consumergroup a new folder? I wonder if there is a name that is more consistent with the other folders. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]
jolshan commented on PR #14856: URL: https://github.com/apache/kafka/pull/14856#issuecomment-1896365557 Did we want to delete the old files in this PR or a follow up? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15878: KIP-768 - Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER [kafka]
kirktrue commented on PR #14818: URL: https://github.com/apache/kafka/pull/14818#issuecomment-1896409297 @jcme—Can you trigger a rebuild of the CI job? It looks like the last run didn't work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16115: Adding missing heartbeat metrics [kafka]
philipnee opened a new pull request, #15216: URL: https://github.com/apache/kafka/pull/15216 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]
jolshan commented on code in PR #15212: URL: https://github.com/apache/kafka/pull/15212#discussion_r1456314640 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -861,19 +861,9 @@ private void maybeUpdatePartitionEpoch( ConsumerGroupMember oldMember, ConsumerGroupMember newMember ) { -if (oldMember == null) { -addPartitionEpochs(newMember.assignedPartitions(), newMember.memberEpoch()); -addPartitionEpochs(newMember.partitionsPendingRevocation(), newMember.memberEpoch()); -} else { -if (!oldMember.assignedPartitions().equals(newMember.assignedPartitions())) { -removePartitionEpochs(oldMember.assignedPartitions()); -addPartitionEpochs(newMember.assignedPartitions(), newMember.memberEpoch()); -} -if (!oldMember.partitionsPendingRevocation().equals(newMember.partitionsPendingRevocation())) { -removePartitionEpochs(oldMember.partitionsPendingRevocation()); -addPartitionEpochs(newMember.partitionsPendingRevocation(), newMember.memberEpoch()); -} -} +maybeRemovePartitionEpoch(oldMember); Review Comment: This logic seems simpler. Why didn't we just use it before? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16131: Only update directoryIds if the metadata version supports DirectoryAssignment [kafka]
pprovenzano commented on PR #15197: URL: https://github.com/apache/kafka/pull/15197#issuecomment-1896448532 Test failures are not related. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15811: Enhance request context with client socket port information (KIP-714) [kafka]
junrao commented on code in PR #15190: URL: https://github.com/apache/kafka/pull/15190#discussion_r1456303419 ## core/src/main/scala/kafka/network/SocketServer.scala: ## @@ -1140,9 +1141,9 @@ private[kafka] class Processor( expiredConnectionsKilledCount.record(null, 1, 0) } else { val connectionId = receive.source -val context = new RequestContext(header, connectionId, channel.socketAddress, - channel.principal, listenerName, securityProtocol, - channel.channelMetadataRegistry.clientInformation, isPrivilegedListener, channel.principalSerde) +val context = new RequestContext(header, connectionId, channel.socketAddress, Optional.of(channel.socketPort()), Review Comment: Hmm, channel.socketPort() is never null. Why do we need to convert it to an Optional? ## server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java: ## @@ -59,6 +60,7 @@ public static RequestContext requestContext() throws UnknownHostException { new RequestHeader(ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS, (short) 0, "producer-1", 0), "1", InetAddress.getLocalHost(), +Optional.of(56078), Review Comment: Could we define a constant for 56078 and reuse? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15804: Close SocketServer channels when calling shutdown before enableRequestProcessing [kafka]
gharris1727 commented on PR #14729: URL: https://github.com/apache/kafka/pull/14729#issuecomment-1896468099 @chia7712 @showuon @mimaison Are any of you able to review this resource leak fix? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15826: Close consumer when sink task is cancelled [kafka]
gharris1727 closed pull request #14762: KAFKA-15826: Close consumer when sink task is cancelled URL: https://github.com/apache/kafka/pull/14762 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15826: Close consumer when sink task is cancelled [kafka]
gharris1727 commented on PR #14762: URL: https://github.com/apache/kafka/pull/14762#issuecomment-1896474948 I think due to the locking concerns I raised earlier, and that we can resolve this resource leak in our tests, this PR is not viable to merge. We can revisit this in the future if the locking model of the consumer can be changed, or if this becomes a demonstrable problem in real situations and not just tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps
[ https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807858#comment-17807858 ] Justine Olshan commented on KAFKA-16156: The transactional copier is pretty old. We can probably update it. If I get time I can take a look. > System test failing for new consumer on endOffsets with negative timestamps > --- > > Key: KAFKA-16156 > URL: https://issues.apache.org/jira/browse/KAFKA-16156 > Project: Kafka > Issue Type: Sub-task > Components: clients >Reporter: Lianet Magrans >Priority: Major > > TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid > negative timestamp". > Trace: > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Received ListOffsetResponse > ListOffsetsResponseData(throttleTimeMs=0, > topics=[ListOffsetsTopicResponse(name='input-topic', > partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, > oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from > broker worker2:9092 (id: 2 rack: null) > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,932] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Handling ListOffsetResponse > response for input-topic-0. Fetched offset 42804, timestamp -1 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Updating last stable offset for > partition input-topic-0 to 42804 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,933] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Fetch offsets completed > successfully for partitions and timestamps {input-topic-0=-1}. Result > org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862 > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,933] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] No events to process > (org.apache.kafka.clients.consumer.internals.events.EventProcessor) > [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event > loop (org.apache.kafka.tools.TransactionalMessageCopier) > org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: > Invalid negative timestamp > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234) > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212) > at > org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108) > at > org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651) > at > org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246) > at > org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342) > at > org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292) > Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp > at > org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:39) > at > org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253) > at > org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.complete(Comp
[jira] [Updated] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics
[ https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16116: -- Parent: (was: KAFKA-14246) Issue Type: Improvement (was: Sub-task) > AsyncKafkaConsumer: Add missing rebalance metrics > - > > Key: KAFKA-16116 > URL: https://issues.apache.org/jira/browse/KAFKA-16116 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The following metrics are missing: > |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]| > |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]| > |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]| > |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]| > |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]| > |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]| > |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15878) KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER
[ https://issues.apache.org/jira/browse/KAFKA-15878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807866#comment-17807866 ] Kirk True commented on KAFKA-15878: --- [~philomathanuj]—for this to work, I assume the user would have to implement custom callback handlers that use a non-JWT validation approach, correct? > KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER > > > Key: KAFKA-15878 > URL: https://issues.apache.org/jira/browse/KAFKA-15878 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Anuj Sharma >Priority: Major > Labels: oauth > Fix For: 3.8.0 > > > {code:java} > // code placeholder > {code} > h1. Overview > * This issue pertains to > [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] > mechanism of Kafka authentication. > * Kafka clients can use [SASL/OAUTHBEARER > |https://kafka.apache.org/documentation/#security_sasl_oauthbearer]mechanism > by overriding the [custom call back > handlers|https://kafka.apache.org/documentation/#security_sasl_oauthbearer_prod] > . > * > [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575] > available from v3.1 further extends the mechanism with a production grade > implementation. > * Kafka's > [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] > mechanism currently {*}rejects the non-JWT (i.e. opaque) tokens{*}. This is > because of a more restrictive set of characters than what > [RFC-6750|https://datatracker.ietf.org/doc/html/rfc6750#section-2.1] > recommends. > * This JIRA can be considered an extension of > [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575] > to support the opaque tokens as well apart from the JWT tokens. > > In summary the following character set should be supported as per the RFC - > {code:java} > 1*( ALPHA / DIGIT / >"-" / "." / "_" / "~" / "+" / "/" ) *"=" > {code} > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16150) Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-16150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True resolved KAFKA-16150. --- Resolution: Duplicate > Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe > > > Key: KAFKA-16150 > URL: https://issues.apache.org/jira/browse/KAFKA-16150 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16151) Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-16151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True resolved KAFKA-16151. --- Resolution: Duplicate > Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe > - > > Key: KAFKA-16151 > URL: https://issues.apache.org/jira/browse/KAFKA-16151 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Metadata schema checker [kafka]
cmccabe commented on code in PR #14389: URL: https://github.com/apache/kafka/pull/14389#discussion_r1456440526 ## tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java: ## @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.SchemaChecker; + + + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.eclipse.jgit.api.CheckoutCommand; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.InitCommand; +import org.eclipse.jgit.api.errors.GitAPIException; +import org.eclipse.jgit.internal.storage.file.FileRepository; +import org.eclipse.jgit.lib.*; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.revwalk.RevTree; +import org.eclipse.jgit.revwalk.RevWalk; +import org.eclipse.jgit.storage.file.FileRepositoryBuilder; +import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider; +import org.eclipse.jgit.treewalk.TreeWalk; +import org.eclipse.jgit.treewalk.filter.PathFilter; + + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; + +public class MetadataSchemaChecker { + +static int latestTag = -1; +static int latestTagVersion = -1; +static int oldLatestVersion = -1; +static int oldFirstVersion = -1; +static int newLatestVersion = -1; +static int newFirstVersion = -1; + +static String[] filesCheckMetadata = {"AccessControlEntryRecord.json", "BrokerRegistrationChangeRecord.json", "ClientQuotaRecord.json", +"ConfigRecord.json", "DelegationTokenRecord.json", "FeatureLevelRecord.json", "FenceBrokerRecord.json", "NoOpRecord.json", +"PartitionChangeRecord.json", "PartitionRecord.json", "ProducerIdsRecord.json", "RegisterBrokerRecord.json", +"RemoveAccessControlEntryRecord.json", "RemoveTopicRecord.json", "RemoveUserScramCredentialRecord.json", "TopicRecord.json", +"UnfenceBrokerRecord.json", "UnregisterBrokerRecord.json", "UserScramCredentialRecord.json", "ZkMigrationRecord.json"}; +public static void main(String[] args) throws Exception { + +try { +List localContent = new ArrayList<>(); +for(String jsonSchema: filesCheckMetadata) { +final String dir = System.getProperty("user.dir"); +String path = dir + "/metadata/src/main/resources/common/metadata/" + jsonSchema; +BufferedReader reader = new BufferedReader(new FileReader(path)); +for (int i = 0; i < 15; i++) { Review Comment: Please change this not to use a raw 15, but to use an appropriate constant. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Metadata schema checker [kafka]
cmccabe commented on code in PR #14389: URL: https://github.com/apache/kafka/pull/14389#discussion_r1456442184 ## tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java: ## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.SchemaChecker; + + + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.errors.GitAPIException; + +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.ObjectLoader; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.revwalk.RevTree; +import org.eclipse.jgit.revwalk.RevWalk; +import org.eclipse.jgit.treewalk.TreeWalk; +import org.eclipse.jgit.treewalk.filter.PathFilter; + + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + + +public class MetadataSchemaChecker { + +static int latestTag = -1; +static int latestTagVersion = -1; +static int oldLatestVersion = -1; +static int oldFirstVersion = -1; +static int newLatestVersion = -1; +static int newFirstVersion = -1; +static String[] filesCheckMetadata = new File(System.getProperty("user.dir") + "/metadata/src/main/resources/common/metadata/").list(); Review Comment: please change this to use something like `Paths.get` so that it's not dependent on the system path separator (which is different on Windows) ## tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java: ## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.SchemaChecker; + + + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.errors.GitAPIException; + +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.ObjectLoader; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.revwalk.RevTree; +import org.eclipse.jgit.revwalk.RevWalk; +import org.eclipse.jgit.treewalk.TreeWalk; +import org.eclipse.jgit.treewalk.filter.PathFilter; + + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + + +public class MetadataSchemaChecker { + +static int latestTag = -1; +static int latestTagVersion = -1; +static int oldLatestVersion = -1; +static int oldFirstVersion = -1; +static int newLatestVersion = -1; +static int newFirstVersion = -1; +static String[] filesCheckMetadata = new File(System.getProperty("user.dir") + "/metadata/src/main/resources/common/metadata/").list(); +public static void main(String[] args) throws Exception { + +
Re: [PR] Metadata schema checker [kafka]
cmccabe commented on code in PR #14389: URL: https://github.com/apache/kafka/pull/14389#discussion_r1456441308 ## tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java: ## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.SchemaChecker; + + + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.errors.GitAPIException; + +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.ObjectLoader; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.revwalk.RevTree; +import org.eclipse.jgit.revwalk.RevWalk; +import org.eclipse.jgit.treewalk.TreeWalk; +import org.eclipse.jgit.treewalk.filter.PathFilter; + + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + + +public class MetadataSchemaChecker { + +static int latestTag = -1; +static int latestTagVersion = -1; +static int oldLatestVersion = -1; +static int oldFirstVersion = -1; +static int newLatestVersion = -1; +static int newFirstVersion = -1; +static String[] filesCheckMetadata = new File(System.getProperty("user.dir") + "/metadata/src/main/resources/common/metadata/").list(); +public static void main(String[] args) throws Exception { + +try { +List localContent = new ArrayList<>(); +for(String fileName: filesCheckMetadata) { +final String dir = System.getProperty("user.dir"); +String path = dir + "/metadata/src/main/resources/common/metadata/" + fileName; +BufferedReader reader = new BufferedReader(new FileReader(path)); +for (int i = 0; i < 15; i++) { +reader.readLine(); +} +StringBuilder content = new StringBuilder(); +boolean print = false; +for (String line = reader.readLine(); line != null; line = reader.readLine()) { +if (line.charAt(0) == '{') { +print = true; +} +if (print && !line.contains("//")) { Review Comment: It's not enough for the line to contain `//`. It needs to start with `//` (after whitespace has been trimmed) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Metadata schema checker [kafka]
cmccabe commented on code in PR #14389: URL: https://github.com/apache/kafka/pull/14389#discussion_r1456447370 ## tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java: ## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.SchemaChecker; + + + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.errors.GitAPIException; + +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.ObjectLoader; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.revwalk.RevTree; +import org.eclipse.jgit.revwalk.RevWalk; +import org.eclipse.jgit.treewalk.TreeWalk; +import org.eclipse.jgit.treewalk.filter.PathFilter; + + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + + +public class MetadataSchemaChecker { + +static int latestTag = -1; +static int latestTagVersion = -1; +static int oldLatestVersion = -1; +static int oldFirstVersion = -1; +static int newLatestVersion = -1; +static int newFirstVersion = -1; +static String[] filesCheckMetadata = new File(System.getProperty("user.dir") + "/metadata/src/main/resources/common/metadata/").list(); +public static void main(String[] args) throws Exception { + +try { +List localContent = new ArrayList<>(); +for(String fileName: filesCheckMetadata) { +final String dir = System.getProperty("user.dir"); +String path = dir + "/metadata/src/main/resources/common/metadata/" + fileName; +BufferedReader reader = new BufferedReader(new FileReader(path)); +for (int i = 0; i < 15; i++) { +reader.readLine(); +} +StringBuilder content = new StringBuilder(); +boolean print = false; +for (String line = reader.readLine(); line != null; line = reader.readLine()) { +if (line.charAt(0) == '{') { +print = true; +} +if (print && !line.contains("//")) { +content.append(line); +} +} +localContent.add(content.toString()); +} + +List gitContent = GetDataFromGit(); +if (localContent.size() != gitContent.size()) { +throw new IllegalStateException("missing schemas"); +} +for(int i = 0; i < localContent.size(); i++) { +if (Objects.equals(localContent.get(i), gitContent.get(i))) { +continue; +} + +ObjectMapper objectMapper = new ObjectMapper(); +JsonNode jsonNode1 = objectMapper.readTree(gitContent.get(i)); +JsonNode jsonNode2 = objectMapper.readTree(localContent.get(i)); + +checkApiTypeVersions(jsonNode1, jsonNode2); +parser((ArrayNode) jsonNode1.get("fields"), (ArrayNode) jsonNode2.get("fields")); +} + +} catch (IOException e) { +throw new RuntimeException(e); +} +} + +private static List GetDataFromGit() throws IOException, GitAPIException { +List gitSchemas = new ArrayList<>(); + +Git git = Git.open(new File(System.getProperty("user.dir") + "/.git")); Review Comment: you don't need the slash here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Metadata schema checker [kafka]
cmccabe commented on code in PR #14389: URL: https://github.com/apache/kafka/pull/14389#discussion_r1456448643 ## tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java: ## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.SchemaChecker; + + + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.errors.GitAPIException; + +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.ObjectLoader; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.revwalk.RevTree; +import org.eclipse.jgit.revwalk.RevWalk; +import org.eclipse.jgit.treewalk.TreeWalk; +import org.eclipse.jgit.treewalk.filter.PathFilter; + + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + + +public class MetadataSchemaChecker { + +static int latestTag = -1; +static int latestTagVersion = -1; +static int oldLatestVersion = -1; +static int oldFirstVersion = -1; +static int newLatestVersion = -1; +static int newFirstVersion = -1; +static String[] filesCheckMetadata = new File(System.getProperty("user.dir") + "/metadata/src/main/resources/common/metadata/").list(); +public static void main(String[] args) throws Exception { + +try { +List localContent = new ArrayList<>(); +for(String fileName: filesCheckMetadata) { +final String dir = System.getProperty("user.dir"); +String path = dir + "/metadata/src/main/resources/common/metadata/" + fileName; +BufferedReader reader = new BufferedReader(new FileReader(path)); +for (int i = 0; i < 15; i++) { +reader.readLine(); +} +StringBuilder content = new StringBuilder(); +boolean print = false; +for (String line = reader.readLine(); line != null; line = reader.readLine()) { +if (line.charAt(0) == '{') { +print = true; +} +if (print && !line.contains("//")) { +content.append(line); +} +} +localContent.add(content.toString()); +} + +List gitContent = GetDataFromGit(); +if (localContent.size() != gitContent.size()) { +throw new IllegalStateException("missing schemas"); +} +for(int i = 0; i < localContent.size(); i++) { +if (Objects.equals(localContent.get(i), gitContent.get(i))) { +continue; +} + +ObjectMapper objectMapper = new ObjectMapper(); +JsonNode jsonNode1 = objectMapper.readTree(gitContent.get(i)); +JsonNode jsonNode2 = objectMapper.readTree(localContent.get(i)); + +checkApiTypeVersions(jsonNode1, jsonNode2); +parser((ArrayNode) jsonNode1.get("fields"), (ArrayNode) jsonNode2.get("fields")); +} + +} catch (IOException e) { +throw new RuntimeException(e); +} +} + +private static List GetDataFromGit() throws IOException, GitAPIException { +List gitSchemas = new ArrayList<>(); + +Git git = Git.open(new File(System.getProperty("user.dir") + "/.git")); +Repository repository = git.getRepository(); +Ref head = git.getRepository().getRefDatabase().firstExactRef("refs/heads/trunk"); + +try (RevWalk revWalk = new RevWalk(repository)) { +RevCommit commit = revWalk.parseCommit(head.getObjectId()); +RevTree tree = commit.getTree(); +for (String fileName : filesCheckMetadata)
Re: [PR] Metadata schema checker [kafka]
cmccabe commented on code in PR #14389: URL: https://github.com/apache/kafka/pull/14389#discussion_r1456448973 ## tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java: ## @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.SchemaChecker; + + + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.eclipse.jgit.api.CheckoutCommand; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.InitCommand; +import org.eclipse.jgit.api.errors.GitAPIException; +import org.eclipse.jgit.internal.storage.file.FileRepository; +import org.eclipse.jgit.lib.*; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.revwalk.RevTree; +import org.eclipse.jgit.revwalk.RevWalk; +import org.eclipse.jgit.storage.file.FileRepositoryBuilder; +import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider; +import org.eclipse.jgit.treewalk.TreeWalk; +import org.eclipse.jgit.treewalk.filter.PathFilter; + + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; + +public class MetadataSchemaChecker { + +static int latestTag = -1; +static int latestTagVersion = -1; +static int oldLatestVersion = -1; +static int oldFirstVersion = -1; +static int newLatestVersion = -1; +static int newFirstVersion = -1; + +static String[] filesCheckMetadata = {"AccessControlEntryRecord.json", "BrokerRegistrationChangeRecord.json", "ClientQuotaRecord.json", +"ConfigRecord.json", "DelegationTokenRecord.json", "FeatureLevelRecord.json", "FenceBrokerRecord.json", "NoOpRecord.json", +"PartitionChangeRecord.json", "PartitionRecord.json", "ProducerIdsRecord.json", "RegisterBrokerRecord.json", +"RemoveAccessControlEntryRecord.json", "RemoveTopicRecord.json", "RemoveUserScramCredentialRecord.json", "TopicRecord.json", +"UnfenceBrokerRecord.json", "UnregisterBrokerRecord.json", "UserScramCredentialRecord.json", "ZkMigrationRecord.json"}; +public static void main(String[] args) throws Exception { + +try { +List localContent = new ArrayList<>(); +for(String jsonSchema: filesCheckMetadata) { +final String dir = System.getProperty("user.dir"); +String path = dir + "/metadata/src/main/resources/common/metadata/" + jsonSchema; +BufferedReader reader = new BufferedReader(new FileReader(path)); +for (int i = 0; i < 15; i++) { +reader.readLine(); +} +StringBuilder content = new StringBuilder(); +for (String line = reader.readLine(); line != null; line = reader.readLine()) { +content.append(line); +} +localContent.add(content.toString()); +} + +List gitContent = GetDataFromGit(); +if (localContent.size() != gitContent.size()) { +throw new IllegalStateException("missing schemas"); +} +for(int i = 0; i < localContent.size(); i++) { +if (Objects.equals(localContent.get(i), gitContent.get(i))) { +continue; +} + +ObjectMapper objectMapper = new ObjectMapper(); +JsonNode jsonNode1 = objectMapper.readTree(gitContent.get(i)); +JsonNode jsonNode2 = objectMapper.readTree(localContent.get(i)); + +checkApiTypeVersions(jsonNode1, jsonNode2); +parser((ArrayNode) jsonNode1.get("fields"), (ArrayNode) jsonNode2.get("fields")); +} + +} catch (FileNotFoundException e) { +System.out.println("An error occurred."); +e.printStackTrace(); +} catch (IOException e) { +throw new RuntimeException(e); +} +} + +private static List GetDataFromGit() throws IOExcepti
Re: [PR] Metadata schema checker [kafka]
cmccabe commented on code in PR #14389: URL: https://github.com/apache/kafka/pull/14389#discussion_r1456451709 ## tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java: ## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.SchemaChecker; + + + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.errors.GitAPIException; + +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.ObjectLoader; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.revwalk.RevTree; +import org.eclipse.jgit.revwalk.RevWalk; +import org.eclipse.jgit.treewalk.TreeWalk; +import org.eclipse.jgit.treewalk.filter.PathFilter; + + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + + +public class MetadataSchemaChecker { + +static int latestTag = -1; +static int latestTagVersion = -1; +static int oldLatestVersion = -1; +static int oldFirstVersion = -1; +static int newLatestVersion = -1; +static int newFirstVersion = -1; +static String[] filesCheckMetadata = new File(System.getProperty("user.dir") + "/metadata/src/main/resources/common/metadata/").list(); +public static void main(String[] args) throws Exception { + +try { +List localContent = new ArrayList<>(); +for(String fileName: filesCheckMetadata) { +final String dir = System.getProperty("user.dir"); +String path = dir + "/metadata/src/main/resources/common/metadata/" + fileName; +BufferedReader reader = new BufferedReader(new FileReader(path)); +for (int i = 0; i < 15; i++) { +reader.readLine(); +} +StringBuilder content = new StringBuilder(); +boolean print = false; +for (String line = reader.readLine(); line != null; line = reader.readLine()) { +if (line.charAt(0) == '{') { +print = true; +} +if (print && !line.contains("//")) { +content.append(line); +} +} +localContent.add(content.toString()); +} + +List gitContent = GetDataFromGit(); +if (localContent.size() != gitContent.size()) { +throw new IllegalStateException("missing schemas"); +} +for(int i = 0; i < localContent.size(); i++) { +if (Objects.equals(localContent.get(i), gitContent.get(i))) { +continue; +} + +ObjectMapper objectMapper = new ObjectMapper(); +JsonNode jsonNode1 = objectMapper.readTree(gitContent.get(i)); +JsonNode jsonNode2 = objectMapper.readTree(localContent.get(i)); + +checkApiTypeVersions(jsonNode1, jsonNode2); +parser((ArrayNode) jsonNode1.get("fields"), (ArrayNode) jsonNode2.get("fields")); +} + +} catch (IOException e) { +throw new RuntimeException(e); +} +} + +private static List GetDataFromGit() throws IOException, GitAPIException { +List gitSchemas = new ArrayList<>(); + +Git git = Git.open(new File(System.getProperty("user.dir") + "/.git")); +Repository repository = git.getRepository(); +Ref head = git.getRepository().getRefDatabase().firstExactRef("refs/heads/trunk"); + +try (RevWalk revWalk = new RevWalk(repository)) { +RevCommit commit = revWalk.parseCommit(head.getObjectId()); +RevTree tree = commit.getTree(); +for (String fileName : filesCheckMetadata)
[jira] [Assigned] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-16141: --- Assignee: Matthias J. Sax (was: Almog Gavra) > StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails > consistently in 3.7 > > > Key: KAFKA-16141 > URL: https://issues.apache.org/jira/browse/KAFKA-16141 > Project: Kafka > Issue Type: Test >Affects Versions: 3.7.0 >Reporter: Stanislav Kozlovski >Assignee: Matthias J. Sax >Priority: Blocker > > {code:java} > kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} > TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from > ubuntu@worker26") > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py", > line 79, in test_standby_tasks_rebalance > self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py", > line 96, in wait_for_verification > err_msg="Did expect to read '%s' from %s" % (message, > processor.node.account)) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py", > line 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]' from ubuntu@worker26 > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807890#comment-17807890 ] Matthias J. Sax commented on KAFKA-16141: - After discussion with Almog (and testing it) implementing `WrappedStateStore` won't do the trick. Given that this is a regression, it seems best (most pragmatic) to just revert the change on `KeyValueToTimestampedKeyValueByteStoreAdapter` and don't let it implement `TimestampedBytesStore` for now (even if this still seems to be the right thing to do – however, it might require some larger changes to make it work – will file a follow up ticket for this). > StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails > consistently in 3.7 > > > Key: KAFKA-16141 > URL: https://issues.apache.org/jira/browse/KAFKA-16141 > Project: Kafka > Issue Type: Test >Affects Versions: 3.7.0 >Reporter: Stanislav Kozlovski >Assignee: Matthias J. Sax >Priority: Blocker > > {code:java} > kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} > TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from > ubuntu@worker26") > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py", > line 79, in test_standby_tasks_rebalance > self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py", > line 96, in wait_for_verification > err_msg="Did expect to read '%s' from %s" % (message, > processor.node.account)) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py", > line 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]' from ubuntu@worker26 > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16141: Fix StreamsStandbyTask system test [kafka]
mjsax opened a new pull request, #15217: URL: https://github.com/apache/kafka/pull/15217 KAFKA-15629 added `TimestampedByteStore` interface to `KeyValueToTimestampedKeyValueByteStoreAdapter` which break the restore code path and thus some system tests. This PR reverts this change for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16141: Fix StreamsStandbyTask system test [kafka]
mjsax commented on PR #15217: URL: https://github.com/apache/kafka/pull/15217#issuecomment-1896696469 \cc @agavra @stanislavkozlovski -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16141: Fix StreamsStandbyTask system test [kafka]
mjsax commented on code in PR #15217: URL: https://github.com/apache/kafka/pull/15217#discussion_r1456485535 ## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java: ## @@ -422,6 +423,16 @@ public static Function getDeserializeValue(final StateSerdes deserializer.deserialize(serdes.topic(), byteArray); } +public static boolean isAdapter(final StateStore stateStore) { Review Comment: Adding this helper (which is kinda "hack") for now to not break IQv2 code path. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16158) Cleanup usage of `TimestampedBytesStore` interface
Matthias J. Sax created KAFKA-16158: --- Summary: Cleanup usage of `TimestampedBytesStore` interface Key: KAFKA-16158 URL: https://issues.apache.org/jira/browse/KAFKA-16158 Project: Kafka Issue Type: Improvement Components: streams Reporter: Matthias J. Sax We added `TimestampedBytesStore` interface many release ago. It's purpose is to indicate if a byte-store's binary value contains a "plain value" or a "" format. Stores with "" format should implement the interface, however not all stores which this format do. We tried to fix one occurrence via https://issues.apache.org/jira/browse/KAFKA-15629 by adding `TimestampedBytesStore` to `KeyValueToTimestampedKeyValueByteStoreAdapter`, whoever this change broke the restore code path (cf https://issues.apache.org/jira/browse/KAFKA-16141) and thus we reverted the change. During the investigation, we also notices that `InMemoryTimestampedKeyValueStoreMarker` implements `TimestampedBytesStore` but does not do a byte-array translation (it's unclear why no byte array translation happens) – and it's also unclear if in-memory store is testes properly. We should try to clean this all up, adding `TimestampedBytesStore` to `KeyValueToTimestampedKeyValueByteStoreAdapter` and figure out how avoid breaking the restore code path. In addition, we should verify if `InMemoryTimestampedKeyValueStoreMarker` is correct or not, and if the restore code path (and maybe also IQv2 code path) is tested properly and correct. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807890#comment-17807890 ] Matthias J. Sax edited comment on KAFKA-16141 at 1/17/24 9:15 PM: -- After discussion with Almog (and testing it) implementing `WrappedStateStore` won't do the trick. Given that this is a regression, it seems best (most pragmatic) to just revert the change on `KeyValueToTimestampedKeyValueByteStoreAdapter` and don't let it implement `TimestampedBytesStore` for now (even if this still seems to be the right thing to do – however, it might require some larger changes to make it work – will filed a follow up ticket for this: https://issues.apache.org/jira/browse/KAFKA-16158). was (Author: mjsax): After discussion with Almog (and testing it) implementing `WrappedStateStore` won't do the trick. Given that this is a regression, it seems best (most pragmatic) to just revert the change on `KeyValueToTimestampedKeyValueByteStoreAdapter` and don't let it implement `TimestampedBytesStore` for now (even if this still seems to be the right thing to do – however, it might require some larger changes to make it work – will file a follow up ticket for this). > StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails > consistently in 3.7 > > > Key: KAFKA-16141 > URL: https://issues.apache.org/jira/browse/KAFKA-16141 > Project: Kafka > Issue Type: Test >Affects Versions: 3.7.0 >Reporter: Stanislav Kozlovski >Assignee: Matthias J. Sax >Priority: Blocker > > {code:java} > kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} > TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from > ubuntu@worker26") > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py", > line 79, in test_standby_tasks_rebalance > self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py", > line 96, in wait_for_verification > err_msg="Did expect to read '%s' from %s" % (message, > processor.node.account)) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py", > line 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]' from ubuntu@worker26 > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16141: Fix StreamsStandbyTask system test [kafka]
mjsax commented on PR #15217: URL: https://github.com/apache/kafka/pull/15217#issuecomment-1896752852 Triggered a system test run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/6035/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]
anurag-harness opened a new pull request, #15218: URL: https://github.com/apache/kafka/pull/15218 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]
jeffkbkim commented on PR #15212: URL: https://github.com/apache/kafka/pull/15212#issuecomment-1896786496 > when a member has a partition pending revocation re-assigned to him before the revocation is completed, the partition epoch is lost to confirm, the `previousMemberEpoch=11` should be `14` in the given example right? > a partition got assigned to two members this seems a bit scary. should we have some check that ensures the invariant? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16141: Issue Type: Bug (was: Test) > StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails > consistently in 3.7 > > > Key: KAFKA-16141 > URL: https://issues.apache.org/jira/browse/KAFKA-16141 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Stanislav Kozlovski >Assignee: Matthias J. Sax >Priority: Blocker > > {code:java} > kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} > TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from > ubuntu@worker26") > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py", > line 79, in test_standby_tasks_rebalance > self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py", > line 96, in wait_for_verification > err_msg="Did expect to read '%s' from %s" % (message, > processor.node.account)) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py", > line 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]' from ubuntu@worker26 > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]
mjsax merged PR #15207: URL: https://github.com/apache/kafka/pull/15207 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]
mjsax commented on PR #15207: URL: https://github.com/apache/kafka/pull/15207#issuecomment-1896826978 Merged to `trunk` and cherry-picked to `3.7` branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16139) StreamsUpgradeTest fails consistently in 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-16139. - Fix Version/s: 3.7.0 3.6.1 Resolution: Fixed > StreamsUpgradeTest fails consistently in 3.7.0 > -- > > Key: KAFKA-16139 > URL: https://issues.apache.org/jira/browse/KAFKA-16139 > Project: Kafka > Issue Type: Test >Affects Versions: 3.7.0 >Reporter: Stanislav Kozlovski >Assignee: Bruno Cadonna >Priority: Major > Fix For: 3.7.0, 3.6.1 > > > h1. > kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest#test_rolling_upgrade_with_2_bouncesArguments:\{ > “from_version”: “3.5.1”, “to_version”: “3.7.0-SNAPSHOT”} > > {{TimeoutError('Could not detect Kafka Streams version 3.7.0-SNAPSHOT on > ubuntu@worker2')}} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]
nizhikov commented on code in PR #14856: URL: https://github.com/apache/kafka/pull/14856#discussion_r1456518422 ## tools/src/main/java/org/apache/kafka/tools/consumergroup/PartitionAssignmentState.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumergroup; + +import org.apache.kafka.common.Node; + +import java.util.Optional; + +class PartitionAssignmentState { +public final String group; +public final Optional coordinator; +public final Optional topic; +public final Optional partition; +public final Optional offset; +public final Optional lag; +public final Optional consumerId; +public final Optional host; +public final Optional clientId; +public final Optional logEndOffset; + +public PartitionAssignmentState(String group, Optional coordinator, Optional topic, +Optional partition, Optional offset, Optional lag, +Optional consumerId, Optional host, Optional clientId, +Optional logEndOffset) { +this.group = group; +this.coordinator = coordinator; +this.topic = topic; +this.partition = partition; +this.offset = offset; +this.lag = lag; +this.consumerId = consumerId; +this.host = host; +this.clientId = clientId; +this.logEndOffset = logEndOffset; +} +} Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]
nizhikov commented on code in PR #14856: URL: https://github.com/apache/kafka/pull/14856#discussion_r1456521165 ## tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java: ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumergroup; + +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class ConsumerGroupCommandOptions extends CommandDefaultOptions { +public static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroupCommandOptions.class); + +public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to."; +public static final String GROUP_DOC = "The consumer group we wish to act on."; +public static final String TOPIC_DOC = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. " + +"In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " + +"Reset-offsets also supports multiple topic inputs."; +public static final String ALL_TOPICS_DOC = "Consider all topics assigned to a group in the `reset-offsets` process."; +public static final String LIST_DOC = "List all consumer groups."; +public static final String DESCRIBE_DOC = "Describe consumer group and list offset lag (number of messages not yet processed) related to given group."; +public static final String ALL_GROUPS_DOC = "Apply to all consumer groups."; +public static final String NL = System.lineSeparator(); +public static final String DELETE_DOC = "Pass in groups to delete topic partition offsets and ownership information " + +"over the entire consumer group. For instance --group g1 --group g2"; +public static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " + +"to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, " + +"or is going through some changes)."; +public static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client and Consumer."; +public static final String RESET_OFFSETS_DOC = "Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive" + NL + +"Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to update the offsets. " + +"Additionally, the --export option is used to export the results to a CSV format." + NL + +"You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, " + +"--to-latest, --shift-by, --from-file, --to-current, --to-offset." + NL + +"To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from-file'."; +public static final String DRY_RUN_DOC = "Only show results without executing changes on Consumer Groups. Supported operations: reset-offsets."; +public static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets."; +public static final String EXPORT_DOC = "Export operation execution to a CSV file. Supported operations: reset-offsets."; +public static final String RESET_TO_OFFSET_DOC = "Reset offsets to a specific offset."; +public static final String RESET_FROM_FILE_DOC = "Reset offsets to values defined in CSV file."; +public static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: '-MM-DDTHH:mm:SS.sss'"; +public static final String RESET_BY_DURATION_DOC = "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'"; +pub
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
mjsax commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1456522509 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ## @@ -715,15 +716,22 @@ private Optional> createPushRequest(ClientTelemetrySubscription local } CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes()); -ByteBuffer buffer = ClientTelemetryUtils.compress(payload, compressionType); +byte[] compressedPayload; +try { +compressedPayload = ClientTelemetryUtils.compress(payload, compressionType); +} catch (IOException e) { +log.info("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType); Review Comment: Fine with me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]
nizhikov commented on code in PR #14856: URL: https://github.com/apache/kafka/pull/14856#discussion_r1456523688 ## tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java: ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumergroup; + +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class ConsumerGroupCommandOptions extends CommandDefaultOptions { +public static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroupCommandOptions.class); + +public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to."; +public static final String GROUP_DOC = "The consumer group we wish to act on."; +public static final String TOPIC_DOC = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. " + +"In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " + +"Reset-offsets also supports multiple topic inputs."; +public static final String ALL_TOPICS_DOC = "Consider all topics assigned to a group in the `reset-offsets` process."; +public static final String LIST_DOC = "List all consumer groups."; +public static final String DESCRIBE_DOC = "Describe consumer group and list offset lag (number of messages not yet processed) related to given group."; +public static final String ALL_GROUPS_DOC = "Apply to all consumer groups."; +public static final String NL = System.getProperty("line.separator"); +public static final String DELETE_DOC = "Pass in groups to delete topic partition offsets and ownership information " + +"over the entire consumer group. For instance --group g1 --group g2"; +public static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " + +"to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, " + +"or is going through some changes)."; +public static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client and Consumer."; +public static final String RESET_OFFSETS_DOC = "Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive" + NL + +"Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to update the offsets. " + +"Additionally, the --export option is used to export the results to a CSV format." + NL + +"You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, " + +"--to-latest, --shift-by, --from-file, --to-current, --to-offset." + NL + +"To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from-file'."; +public static final String DRY_RUN_DOC = "Only show results without executing changes on Consumer Groups. Supported operations: reset-offsets."; +public static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets."; +public static final String EXPORT_DOC = "Export operation execution to a CSV file. Supported operations: reset-offsets."; +public static final String RESET_TO_OFFSET_DOC = "Reset offsets to a specific offset."; +public static final String RESET_FROM_FILE_DOC = "Reset offsets to values defined in CSV file."; +public static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: '-MM-DDTHH:mm:SS.sss'"; +public static final String RESET_BY_DURATION_DOC = "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnM
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
mjsax merged PR #15148: URL: https://github.com/apache/kafka/pull/15148 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]
jeffkbkim commented on code in PR #15196: URL: https://github.com/apache/kafka/pull/15196#discussion_r1456526190 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -1098,30 +1054,36 @@ private static boolean isGroupIdNotEmpty(String groupId) { return groupId != null && !groupId.isEmpty(); } -/** - * Handles the exception in the scheduleWriteOperation. - * @return The Errors instance associated with the given exception. - */ -private static Errors normalizeException(Throwable exception) { -exception = Errors.maybeUnwrapException(exception); - -if (exception instanceof UnknownTopicOrPartitionException || -exception instanceof NotEnoughReplicasException || -exception instanceof TimeoutException) { -return Errors.COORDINATOR_NOT_AVAILABLE; -} - -if (exception instanceof NotLeaderOrFollowerException || -exception instanceof KafkaStorageException) { -return Errors.NOT_COORDINATOR; -} - -if (exception instanceof RecordTooLargeException || -exception instanceof RecordBatchTooLargeException || -exception instanceof InvalidFetchSizeException) { -return Errors.UNKNOWN_SERVER_ERROR; +private RSP handleOperationException( +String requestName, +REQ request, +Throwable exception, +BiFunction responseBuilder +) { +ApiError apiError = ApiError.fromThrowable(exception); + +switch (apiError.error()) { +case UNKNOWN_SERVER_ERROR: +log.error("{} request {} hit an unexpected exception: {}.", +requestName, request, exception.getMessage(), exception); +return responseBuilder.apply(Errors.UNKNOWN_SERVER_ERROR, null); + +case UNKNOWN_TOPIC_OR_PARTITION: +case NOT_ENOUGH_REPLICAS: +case REQUEST_TIMED_OUT: +return responseBuilder.apply(Errors.COORDINATOR_NOT_AVAILABLE, null); + +case NOT_LEADER_OR_FOLLOWER: +case KAFKA_STORAGE_ERROR: +return responseBuilder.apply(Errors.NOT_COORDINATOR, null); + +case MESSAGE_TOO_LARGE: +case RECORD_LIST_TOO_LARGE: +case INVALID_FETCH_SIZE: +return responseBuilder.apply(Errors.UNKNOWN_SERVER_ERROR, null); + +default: +return responseBuilder.apply(apiError.error(), apiError.message()); Review Comment: it looks like the error message is only set by `consumerGroupHeartbeat`. what's an example of an error message? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]
nizhikov commented on PR #14856: URL: https://github.com/apache/kafka/pull/14856#issuecomment-1896962207 @jolshan > Did we want to delete the old files in this PR or a follow up? For the previous command we remove old files only when command was merged 1. Reassign case classes and options moved to java - 1fd58e30cf38587a66c1f7188c7667b555624485 3. Command moved to java and old classes removed - 76b1b50b644149e77ee1ec42d882e2cb80742bdf -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]
nizhikov commented on code in PR #14856: URL: https://github.com/apache/kafka/pull/14856#discussion_r1456530381 ## tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java: ## @@ -0,0 +1,263 @@ +/* Review Comment: > Is the tools/consumergroup a new folder? yes. > I wonder if there is a name that is more consistent with the other folders. Couldn't come up with the better naming :) Do you have one in mind? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]
jolshan commented on code in PR #14856: URL: https://github.com/apache/kafka/pull/14856#discussion_r1456536862 ## tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java: ## @@ -0,0 +1,263 @@ +/* Review Comment: I wasn't sure if we wanted to use the same convention of the other java consumer group files. But maybe that is confusing. They follow a pattern of coordinator/group. Not sure if we really have reusability amongst coordinator tools though. I think when we have had two word folders we usually put a dash between them (ie consumer-group) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16131: Only update directoryIds if the metadata version supports DirectoryAssignment [kafka]
cmccabe merged PR #15197: URL: https://github.com/apache/kafka/pull/15197 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16159) Prune excessive logging from Telemetry Reporter
Philip Nee created KAFKA-16159: -- Summary: Prune excessive logging from Telemetry Reporter Key: KAFKA-16159 URL: https://issues.apache.org/jira/browse/KAFKA-16159 Project: Kafka Issue Type: Task Components: consumer, log Reporter: Philip Nee Assignee: Apoorv Mittal While running system tests locally, I've noticed excessive logging of the Telemtry Reporter. This I believe was introduced in KIP-714. {code:java} [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, returning the value 224678 ms; the client will wait before submitting the next GetTelemetrySubscriptions network API request (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code} This is logged several times per ms - Also, given the amount of log being emitted, can we also check the CPU profile to see if there's a process running a tight loop? -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]
nizhikov commented on code in PR #14856: URL: https://github.com/apache/kafka/pull/14856#discussion_r1456546534 ## tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java: ## @@ -0,0 +1,263 @@ +/* Review Comment: > I think when we have had two word folders we usually put a dash between them (ie consumer-group) Dash can't be used in java package name. > They follow a pattern of coordinator/group We can move classes to `org.apache.kafka.tools.consumer.group` package. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16159) Prune excessive logging from Telemetry Reporter
[ https://issues.apache.org/jira/browse/KAFKA-16159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16159: --- Labels: logging (was: ) > Prune excessive logging from Telemetry Reporter > --- > > Key: KAFKA-16159 > URL: https://issues.apache.org/jira/browse/KAFKA-16159 > Project: Kafka > Issue Type: Task > Components: consumer, log >Reporter: Philip Nee >Assignee: Apoorv Mittal >Priority: Major > Labels: logging > Fix For: 3.8.0 > > > While running system tests locally, I've noticed excessive logging of the > Telemtry Reporter. This I believe was introduced in KIP-714. > {code:java} > [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, > returning the value 224678 ms; the client will wait before submitting the > next GetTelemetrySubscriptions network API request > (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code} > This is logged several times per ms - Also, given the amount of log being > emitted, can we also check the CPU profile to see if there's a process > running a tight loop? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16159) Prune excessive logging from Telemetry Reporter
[ https://issues.apache.org/jira/browse/KAFKA-16159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16159: --- Labels: consumer logging (was: logging) > Prune excessive logging from Telemetry Reporter > --- > > Key: KAFKA-16159 > URL: https://issues.apache.org/jira/browse/KAFKA-16159 > Project: Kafka > Issue Type: Task > Components: consumer, log >Reporter: Philip Nee >Assignee: Apoorv Mittal >Priority: Major > Labels: consumer, logging > Fix For: 3.8.0 > > > While running system tests locally, I've noticed excessive logging of the > Telemtry Reporter. This I believe was introduced in KIP-714. > {code:java} > [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, > returning the value 224678 ms; the client will wait before submitting the > next GetTelemetrySubscriptions network API request > (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code} > This is logged several times per ms - Also, given the amount of log being > emitted, can we also check the CPU profile to see if there's a process > running a tight loop? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16159) Prune excessive logging from Telemetry Reporter
[ https://issues.apache.org/jira/browse/KAFKA-16159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16159: --- Fix Version/s: 3.8.0 > Prune excessive logging from Telemetry Reporter > --- > > Key: KAFKA-16159 > URL: https://issues.apache.org/jira/browse/KAFKA-16159 > Project: Kafka > Issue Type: Task > Components: consumer, log >Reporter: Philip Nee >Assignee: Apoorv Mittal >Priority: Major > Fix For: 3.8.0 > > > While running system tests locally, I've noticed excessive logging of the > Telemtry Reporter. This I believe was introduced in KIP-714. > {code:java} > [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, > returning the value 224678 ms; the client will wait before submitting the > next GetTelemetrySubscriptions network API request > (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code} > This is logged several times per ms - Also, given the amount of log being > emitted, can we also check the CPU profile to see if there's a process > running a tight loop? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16131) Repeated UnsupportedVersionException logged when running Kafka 3.7.0-RC2 KRaft cluster with metadata version 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-16131. -- Resolution: Fixed > Repeated UnsupportedVersionException logged when running Kafka 3.7.0-RC2 > KRaft cluster with metadata version 3.6 > > > Key: KAFKA-16131 > URL: https://issues.apache.org/jira/browse/KAFKA-16131 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz >Assignee: Proven Provenzano >Priority: Blocker > Fix For: 3.7.0 > > > When running Kafka 3.7.0-RC2 as a KRaft cluster with metadata version set to > 3.6-IV2 metadata version, it throws repeated errors like this in the > controller logs: > {quote}2024-01-13 16:58:01,197 INFO [QuorumController id=0] > assignReplicasToDirs: event failed with UnsupportedVersionException in 15 > microseconds. (org.apache.kafka.controller.QuorumController) > [quorum-controller-0-event-handler] > 2024-01-13 16:58:01,197 ERROR [ControllerApis nodeId=0] Unexpected error > handling request RequestHeader(apiKey=ASSIGN_REPLICAS_TO_DIRS, apiVersion=0, > clientId=1000, correlationId=14, headerVersion=2) – > AssignReplicasToDirsRequestData(brokerId=1000, brokerEpoch=5, > directories=[DirectoryData(id=w_uxN7pwQ6eXSMrOKceYIQ, > topics=[TopicData(topicId=bvAKLSwmR7iJoKv2yZgygQ, > partitions=[PartitionData(partitionIndex=2), > PartitionData(partitionIndex=1)]), TopicData(topicId=uNe7f5VrQgO0zST6yH1jDQ, > partitions=[PartitionData(partitionIndex=0)])])]) with context > RequestContext(header=RequestHeader(apiKey=ASSIGN_REPLICAS_TO_DIRS, > apiVersion=0, clientId=1000, correlationId=14, headerVersion=2), > connectionId='172.16.14.219:9090-172.16.14.217:53590-7', > clientAddress=/[172.16.14.217|http://172.16.14.217/], > principal=User:CN=my-cluster-kafka,O=io.strimzi, > listenerName=ListenerName(CONTROLPLANE-9090), securityProtocol=SSL, > clientInformation=ClientInformation(softwareName=apache-kafka-java, > softwareVersion=3.7.0), fromPrivilegedListener=false, > principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@71004ad2]) > (kafka.server.ControllerApis) [quorum-controller-0-event-handler] > java.util.concurrent.CompletionException: > org.apache.kafka.common.errors.UnsupportedVersionException: Directory > assignment is not supported yet. > at > java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) > at > java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347) > at > java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:636) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) > at > java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) > at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:880) > at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:871) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:148) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:137) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:840) > Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: > Directory assignment is not supported yet. > {quote} > > With the metadata version set to 3.6-IV2, it makes sense that the request is > not supported. But the request should in such case not be sent at all. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16159) Prune excessive logging from Telemetry Reporter
[ https://issues.apache.org/jira/browse/KAFKA-16159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16159: --- Description: While running system tests locally, I've noticed excessive logging of the Telemtry Reporter. This I believe was introduced in KIP-714. {code:java} [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, returning the value 224678 ms; the client will wait before submitting the next GetTelemetrySubscriptions network API request (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code} This is logged several times per ms - Also, given the amount of log being emitted, can we also check the CPU profile to see if there's a process running a tight loop? Update --- Looking from the beginning, is this caused by the following? {code:java} DEBUG The broker generated an error for the get telemetry network API request (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) 146 org.apache.kafka.common.errors.UnsupportedVersionException: The node does not support GET_TELEMETRY_SUBSCRIPTIONS {code} was: While running system tests locally, I've noticed excessive logging of the Telemtry Reporter. This I believe was introduced in KIP-714. {code:java} [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, returning the value 224678 ms; the client will wait before submitting the next GetTelemetrySubscriptions network API request (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code} This is logged several times per ms - Also, given the amount of log being emitted, can we also check the CPU profile to see if there's a process running a tight loop? > Prune excessive logging from Telemetry Reporter > --- > > Key: KAFKA-16159 > URL: https://issues.apache.org/jira/browse/KAFKA-16159 > Project: Kafka > Issue Type: Task > Components: consumer, log >Reporter: Philip Nee >Assignee: Apoorv Mittal >Priority: Major > Labels: consumer, logging > Fix For: 3.8.0 > > > While running system tests locally, I've noticed excessive logging of the > Telemtry Reporter. This I believe was introduced in KIP-714. > {code:java} > [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, > returning the value 224678 ms; the client will wait before submitting the > next GetTelemetrySubscriptions network API request > (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code} > This is logged several times per ms - Also, given the amount of log being > emitted, can we also check the CPU profile to see if there's a process > running a tight loop? > > Update > --- > Looking from the beginning, is this caused by the following? > {code:java} > DEBUG The broker generated an error for the get telemetry network API request > (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) > 146 org.apache.kafka.common.errors.UnsupportedVersionException: The node > does not support GET_TELEMETRY_SUBSCRIPTIONS {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)