Re: [PR] KAFKA-15541: Add num-open-iterators metric [kafka]
mjsax commented on code in PR #15975: URL: https://github.com/apache/kafka/pull/15975#discussion_r1607621599 ## streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java: ## @@ -144,6 +145,10 @@ private StateStoreMetrics() {} private static final String SUPPRESSION_BUFFER_SIZE_MAX_DESCRIPTION = MAX_DESCRIPTION_PREFIX + SUPPRESSION_BUFFER_SIZE_DESCRIPTION; +private static final String NUM_OPEN_ITERATORS = "num-open-iterators"; +private static final String NUM_OPEN_ITERATORS_DESCRIPTION = +"The current number of Iterators on the store that have been created, but not yet closed"; Review Comment: ```suggestion "The current number of iterators on the store that have been created, but not yet closed"; ``` ## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java: ## @@ -162,6 +165,8 @@ private void registerMetrics() { flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics); deleteSensor = StateStoreMetrics.deleteSensor(taskId.toString(), metricsScope, name(), streamsMetrics); e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics); +StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, Review Comment: Would it be better to add a `Sensor` that allows us to track the different metrics in one go? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
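The metric under discussion reports how many store iterators have been created but not yet closed. A minimal self-contained sketch of such a counter, which a gauge could read (class and method names here are illustrative, not the actual Kafka Streams code):

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch of a per-store open-iterator counter. A
// "num-open-iterators" gauge would report numOpenIterators().
class OpenIteratorTracker {
    private final LongAdder openIterators = new LongAdder();

    // Called when a store iterator is created.
    public void onOpen() { openIterators.increment(); }

    // Called from the iterator's close().
    public void onClose() { openIterators.decrement(); }

    // The current number of iterators created but not yet closed.
    public long numOpenIterators() { return openIterators.sum(); }
}
```

`LongAdder` keeps the hot open/close path cheap under concurrent access, which matters since every store read would touch it.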
Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]
rreddy-22 commented on code in PR #15974: URL: https://github.com/apache/kafka/pull/15974#discussion_r1607621132 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -516,16 +522,78 @@ public Assignment targetAssignment(String memberId) { return targetAssignment.getOrDefault(memberId, Assignment.EMPTY); } +/** + * @return An immutable map containing all the topic partitions + * with their current member assignments. + */ +public Map<Uuid, Map<Integer, String>> partitionAssignments() { +return Collections.unmodifiableMap(partitionAssignments); +} + /** * Updates target assignment of a member. * * @param memberId The member id. * @param newTargetAssignment The new target assignment. */ public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) { +updatePartitionAssignments( +memberId, +targetAssignment.getOrDefault(memberId, new Assignment(Collections.emptyMap())), +newTargetAssignment +); targetAssignment.put(memberId, newTargetAssignment); } +/** + * Updates partition assignments of the topics. + * + * @param memberId The member Id. + * @param oldTargetAssignment The old target assignment. + * @param newTargetAssignment The new target assignment. + * + * Package private for testing. + */ +void updatePartitionAssignments( +String memberId, +Assignment oldTargetAssignment, +Assignment newTargetAssignment +) { +// Combine keys from both old and new assignments.
+Set<Uuid> allTopicIds = new HashSet<>(); +allTopicIds.addAll(oldTargetAssignment.partitions().keySet()); +allTopicIds.addAll(newTargetAssignment.partitions().keySet()); + +for (Uuid topicId : allTopicIds) { +Set<Integer> oldPartitions = oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet()); +Set<Integer> newPartitions = newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet()); + +TimelineHashMap<Integer, String> topicPartitionAssignment = partitionAssignments.computeIfAbsent( +topicId, k -> new TimelineHashMap<>(snapshotRegistry, Math.max(oldPartitions.size(), newPartitions.size())) +); + +// Remove partitions that aren't present in the new assignment. +for (Integer partition : oldPartitions) { +if (!newPartitions.contains(partition) && memberId.equals(topicPartitionAssignment.get(partition))) { Review Comment: Imagine there are two members A and B. We remove a partition (P0) from A and assign it to B. If B heartbeats first and we see that its new target assignment now has P0, we will update the map to P0,B. This way, when A heartbeats, we don't remove the entry from the map unless it is still assigned to A. In the case where we just use a byte array with each index as a partition number, it was possible that we would update the assignment as P0,true when B heartbeats and then unset it to false when A heartbeats even though it is currently assigned to B. We need to make sure we only remove assignments if the current member still has ownership. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
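The ownership check described in the review comment can be sketched as a small self-contained model: an entry is removed only when the departing member is still the recorded owner. Class and method names below are hypothetical, not the actual `ConsumerGroup` code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative model of the reverse-lookup map for one topic:
// partition number -> owning member id.
class PartitionOwnership {
    private final Map<Integer, String> owners = new HashMap<>();

    // Apply a member's assignment change (old -> new partition set).
    public void updateMember(String memberId, Set<Integer> oldPartitions, Set<Integer> newPartitions) {
        // Remove partitions dropped from this member's assignment, but only
        // if this member is still the recorded owner: the partition may
        // already belong to another member that heartbeated first.
        for (Integer p : oldPartitions) {
            if (!newPartitions.contains(p) && memberId.equals(owners.get(p))) {
                owners.remove(p);
            }
        }
        // Record ownership for every partition in the new assignment.
        for (Integer p : newPartitions) {
            owners.put(p, memberId);
        }
    }

    public String ownerOf(int partition) { return owners.get(partition); }
}
```

Replaying the scenario from the comment: P0 moves from A to B, B heartbeats first (map now says P0 is B's), then A heartbeats dropping P0; the ownership check prevents A's update from erasing B's entry.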
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2121704898 Test failures are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-14934: KafkaClusterTestKit makes FaultHandler accessible [kafka]
ahmedryasser opened a new pull request, #16012: URL: https://github.com/apache/kafka/pull/16012 We wrote functions that provide a way to access the MockFaultHandler instances for both fatal and non-fatal faults in the KafkaClusterTestKit class. We created a KafkaClusterTestKit instance with one broker node and one controller node, then called the two functions we created and asserted that they don't return null, which means they have been exposed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16619: add log for de-novo KRaft cluster [kafka]
jayim243 opened a new pull request, #16011: URL: https://github.com/apache/kafka/pull/16011 Add a log for a de-novo KRaft cluster when zookeeper.metadata.migration.enable=false. Modified test cases for this change. This contribution is my original work and I license the work to the project under the project's open source license. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]
github-actions[bot] commented on PR #15067: URL: https://github.com/apache/kafka/pull/15067#issuecomment-2121659770 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error
[ https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-16760. --- Resolution: Not A Problem > alterReplicaLogDirs failed even if responded with none error > > > Key: KAFKA-16760 > URL: https://issues.apache.org/jira/browse/KAFKA-16760 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Luke Chen >Priority: Major > > When firing alterLogDirRequest, it gets error NONE result. But actually, the > alterLogDir never happened with these errors: > {code:java} > [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 has an older epoch (0) than the current leader. Will await the new > LeaderAndIsr state before resuming fetching. > (kafka.server.ReplicaAlterLogDirsThread:66) > [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70) > {code} > Note: It's under KRaft mode. So the log with LeaderAndIsr is wrong. > This can be reproduced in this > [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] and running > this test: > {code:java} > ./gradlew cleanTest storage:test --tests > org.apache.kafka.tiered.storage.integration.AlterLogDirTest > {code} > The complete logs can be found here: > https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error
[ https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848032#comment-17848032 ] Luke Chen commented on KAFKA-16760: --- OK, so it looks like this is the expected behavior. I'll close this ticket then. Thank you. > alterReplicaLogDirs failed even if responded with none error > > > Key: KAFKA-16760 > URL: https://issues.apache.org/jira/browse/KAFKA-16760 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Luke Chen >Priority: Major > > When firing alterLogDirRequest, it gets error NONE result. But actually, the > alterLogDir never happened with these errors: > {code:java} > [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 has an older epoch (0) than the current leader. Will await the new > LeaderAndIsr state before resuming fetching. > (kafka.server.ReplicaAlterLogDirsThread:66) > [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70) > {code} > Note: It's under KRaft mode. So the log with LeaderAndIsr is wrong. > This can be reproduced in this > [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] and running > this test: > {code:java} > ./gradlew cleanTest storage:test --tests > org.apache.kafka.tiered.storage.integration.AlterLogDirTest > {code} > The complete logs can be found here: > https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes
[ https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848031#comment-17848031 ] Luke Chen commented on KAFKA-16414: --- I agree with [~jeqo]. I think the goal we'd like to reach is consistency between retention.ms and retention.bytes. So, if we think producing many small segments is a big drawback of including the active segment, should we exclude the active segment for both the ms and bytes configs? Do you see any side effect if we choose this option? > Inconsistent active segment expiration behavior between retention.ms and > retention.bytes > > > Key: KAFKA-16414 > URL: https://issues.apache.org/jira/browse/KAFKA-16414 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.6.1 >Reporter: Kuan Po Tseng >Assignee: Kuan Po Tseng >Priority: Major > > This is a follow up issue on KAFKA-16385. > Currently, there's a difference between how retention.ms and retention.bytes > handle active segment expiration: > - retention.ms always expire active segment when max segment timestamp > matches the condition. > - retention.bytes only expire active segment when retention.bytes is > configured to zero. > The behavior should be either rotate active segments for both retention > configurations or none at all. > For more details, see > https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16711: make sure update highestOffsetInRemoteStorage after log dir change [kafka]
showuon commented on PR #15947: URL: https://github.com/apache/kafka/pull/15947#issuecomment-2121598733 @satishd , call for review. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
showuon commented on PR #15951: URL: https://github.com/apache/kafka/pull/15951#issuecomment-2121598551 @chia7712 , I've updated the PR. Please take a look again when available. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16666) Migrate `TransactionLogMessageFormatter`, GroupMetadataMessageFormatter` and `OffsetsMessageFormatter`to tools module
[ https://issues.apache.org/jira/browse/KAFKA-16666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848027#comment-17848027 ] 黃竣陽 commented on KAFKA-16666: - I'm interested in this issue. Please assign it to me, thank you. > Migrate `TransactionLogMessageFormatter`, GroupMetadataMessageFormatter` and > `OffsetsMessageFormatter`to tools module > - > > Key: KAFKA-16666 > URL: https://issues.apache.org/jira/browse/KAFKA-16666 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Labels: need-kip > > `GroupMetadataMessageFormatter`[0], `OffsetsMessageFormatter`[1], and > `TransactionLogMessageFormatter`[2] are used by ConsoleConsumer to parse data > of internal topics. Following the migration plan, we should move them to > tools-api module. Also, we need to keep the compatibility of command line. > That is to say, `ConsoleConsumer` can accept the previous package name and > then use the (java) implementation to parse and make same output. > [0] > https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1269 > [1] > https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1248 > [2] > https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala#L145 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16801) Streams upgrade :test target doesn't find any junit tests
[ https://issues.apache.org/jira/browse/KAFKA-16801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848022#comment-17848022 ] Matthias J. Sax commented on KAFKA-16801: - These packages contain code for system tests. We put the code under `src/test/java/...`; there is no `src/main/java/...` and the code is not unit test code either. What would be the right way to address this? > Streams upgrade :test target doesn't find any junit tests > - > > Key: KAFKA-16801 > URL: https://issues.apache.org/jira/browse/KAFKA-16801 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Greg Harris >Priority: Major > Labels: newbie > > No test executed. This behavior has been deprecated. > This will fail with an error in Gradle 9.0. > There are test sources present but no test was executed. Please check your > test configuration. > [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#test_task_fail_on_no_test_executed] > > 23 usages > > Task::streams:upgrade-system-tests-0100:test > Task::streams:upgrade-system-tests-0101:test > Task::streams:upgrade-system-tests-0102:test > Task::streams:upgrade-system-tests-0110:test > Task::streams:upgrade-system-tests-10:test > Task::streams:upgrade-system-tests-11:test > Task::streams:upgrade-system-tests-20:test > Task::streams:upgrade-system-tests-21:test > Task::streams:upgrade-system-tests-22:test > Task::streams:upgrade-system-tests-23:test > Task::streams:upgrade-system-tests-24:test > Task::streams:upgrade-system-tests-25:test > Task::streams:upgrade-system-tests-26:test > Task::streams:upgrade-system-tests-27:test > Task::streams:upgrade-system-tests-28:test > Task::streams:upgrade-system-tests-30:test > Task::streams:upgrade-system-tests-31:test > Task::streams:upgrade-system-tests-32:test > Task::streams:upgrade-system-tests-33:test > Task::streams:upgrade-system-tests-34:test > Task::streams:upgrade-system-tests-35:test > Task::streams:upgrade-system-tests-36:test >
Task::streams:upgrade-system-tests-37:test -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16794) Can't open videos in streams documentation
[ https://issues.apache.org/jira/browse/KAFKA-16794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16794: Component/s: streams > Can't open videos in streams documentation > -- > > Key: KAFKA-16794 > URL: https://issues.apache.org/jira/browse/KAFKA-16794 > Project: Kafka > Issue Type: Bug > Components: docs, streams >Reporter: Kuan Po Tseng >Priority: Minor > Attachments: IMG_4445.png, image.png > > > Can't open videos in page [https://kafka.apache.org/documentation/streams/] > Open console in chrome browser and it shows error message: > {{Refused to frame 'https://www.youtube.com/' because it violates the > following Content Security Policy directive: "frame-src 'self'".}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15143) MockFixedKeyProcessorContext is missing from test-utils
[ https://issues.apache.org/jira/browse/KAFKA-15143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848021#comment-17848021 ] Matthias J. Sax commented on KAFKA-15143: - As pointed out on KAFKA-15242 cf comments, we also need to do something about `FixedKeyRecord`, because it does not have (and should not have) a public constructor. I'll point this out on the DISCUSS thread of the KIP, too. > MockFixedKeyProcessorContext is missing from test-utils > --- > > Key: KAFKA-15143 > URL: https://issues.apache.org/jira/browse/KAFKA-15143 > Project: Kafka > Issue Type: Bug > Components: streams-test-utils >Affects Versions: 3.5.0 >Reporter: Tomasz Kaszuba >Assignee: Shashwat Pandey >Priority: Major > Labels: needs-kip > > I am trying to test a ContextualFixedKeyProcessor but it is not possible to > call the init method from within a unit test since the MockProcessorContext > doesn't implement > {code:java} > FixedKeyProcessorContext {code} > but only > {code:java} > ProcessorContext > {code} > Shouldn't there also be a *MockFixedKeyProcessorContext* in the test utils? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15242) FixedKeyProcessor testing is unusable
[ https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848020#comment-17848020 ] Matthias J. Sax commented on KAFKA-15242: - I don't think that `TestRecord` has anything to do with it, because `TestRecord` is not used in combination with `MockProcessorContext`, but only in combination with the `TopologyTestDriver` (and corresponding `TestInputTopic` and `TestOutputTopic`). I agree though, that we need some more helper class, because `FixedKeyRecord` objects cannot be instantiated directly (no public constructor). Thanks for the call out – the KIP needs to be extended accordingly – we would have missed this... This ticket did not have this dependency in its description either though. I think we can still close it as duplicate, and add anything missing to the other ticket? > FixedKeyProcessor testing is unusable > - > > Key: KAFKA-15242 > URL: https://issues.apache.org/jira/browse/KAFKA-15242 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Zlstibor Veljkovic >Assignee: Alexander Aghili >Priority: Major > > Using mock processor context to get the forwarded message doesn't work. > Also there is not a well documented way for testing FixedKeyProcessors. > Please see the repo at [https://github.com/zveljkovic/kafka-repro] > but most important piece is test file with runtime and compile time errors: > [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
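The constructor problem called out above can be illustrated with a self-contained sketch: like `FixedKeyRecord`, the class below has no public constructor, so tests cannot build input records directly and need a factory that a test-utils module would have to expose. All names here are hypothetical, not the actual Kafka Streams API:

```java
// Hypothetical stand-in for FixedKeyRecord, illustrating why tests need
// a helper: the constructor is package-private, mirroring the real class.
class FixedKeyRecordLike<K, V> {
    private final K key;
    private final V value;

    // Package-private: only the framework (or a helper in the same
    // package) can create instances; `new` fails from test code elsewhere.
    FixedKeyRecordLike(K key, V value) {
        this.key = key;
        this.value = value;
    }

    public K key() { return key; }
    public V value() { return value; }

    // The kind of factory a test-utils module would need to provide so
    // that FixedKeyProcessor tests can construct inputs.
    public static <K, V> FixedKeyRecordLike<K, V> forTest(K key, V value) {
        return new FixedKeyRecordLike<>(key, value);
    }
}
```

This is the gap the KIP extension would fill: a mock `FixedKeyProcessorContext` alone is not enough without a sanctioned way to create `FixedKeyRecord` instances in tests.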
[PR] Prototype: Inherit the maxParallelForks from Jenkins CPU count [kafka]
gharris1727 opened a new pull request, #16009: URL: https://github.com/apache/kafka/pull/16009 The maxParallelForks is usually inherited from the CPU count on developer machines, and hardcoded to 2 in Jenkins. This is an experiment to try increasing the parallelism in Jenkins. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16197) Connect Worker poll timeout prints Consumer poll timeout specific warnings.
[ https://issues.apache.org/jira/browse/KAFKA-16197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-16197. - Fix Version/s: 3.8.0 Resolution: Fixed > Connect Worker poll timeout prints Consumer poll timeout specific warnings. > --- > > Key: KAFKA-16197 > URL: https://issues.apache.org/jira/browse/KAFKA-16197 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Fix For: 3.8.0 > > > When a Connect worker's poll timeout expires in Connect, the log lines that > we see are: > {noformat} > consumer poll timeout has expired. This means the time between subsequent > calls to poll() was longer than the configured max.poll.interval.ms, which > typically implies that the poll loop is spending too much time processing > messages. You can address this either by increasing max.poll.interval.ms or > by reducing the maximum size of batches returned in poll() with > max.poll.records. > {noformat} > and the reason for leaving the group is > {noformat} > Member XX sending LeaveGroup request to coordinator XX due to consumer poll > timeout has expired. > {noformat} > which is specific to Consumers and not to Connect workers. The log line above > in specially misleading because the config `max.poll.interval.ms` is not > configurable for a Connect worker and could make someone believe that the > logs are being written for Sink Connectors and not for Connect worker. > Ideally, we should print something specific to Connect. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
gharris1727 merged PR #15305: URL: https://github.com/apache/kafka/pull/15305 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
gharris1727 commented on PR #15305: URL: https://github.com/apache/kafka/pull/15305#issuecomment-2121455690 Test failures appear unrelated, and the tests pass for me locally. LGTM, and thanks @vamossagar12 for the fix! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15494) Remove deprecated calls in build.gradle for preparing future upgrades
[ https://issues.apache.org/jira/browse/KAFKA-15494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848008#comment-17848008 ] Greg Harris commented on KAFKA-15494: - [~bmscomp] Do you have any progress here? I subdivided this task into multiple parts in KAFKA-16800 so that multiple contributors may make some progress. Perhaps you can review those changes, or take one of the tasks for yourself? > Remove deprecated calls in build.gradle for preparing future upgrades > - > > Key: KAFKA-15494 > Project: Kafka > Issue Type: Improvement >Reporter: Said BOUDJELDA >Assignee: Said BOUDJELDA >Priority: Major > > On the purpose of preparing the future upgrades of the {*}Gradle wrapper{*}, we > need to get rid of the deprecated calls of certain methods such as task > registrations; this will ease the future upgrade to *Gradle 9*, where plenty > of methods are marked for removal for that big release, which will also > bring support for *JDK 21* > > Running a *Gradle* build with *--warning-mode all* reveals many warnings and > deprecations slated for removal, and we have to be prepared now for the > future changes > > > This is an example of a deprecation warning > > {code:java} > Build file '/Users/bmscomp/codes/kafka/build.gradle': line 3116 > The org.gradle.api.plugins.Convention type has been deprecated. This is > scheduled to be removed in Gradle 9.0. Consult the upgrading guide for > further information: > https://docs.gradle.org/8.2.1/userguide/upgrading_version_8.html#deprecated_access_to_conventions > at > build_bpyr04xfzz0tpxxyqu97xn8xy$_run_closure58.doCall(/Users/bmscomp/codes/kafka/build.gradle:3116) > (Run with --stacktrace to get the full stack trace of this > deprecation warning.) > at > build_bpyr04xfzz0tpxxyqu97xn8xy.run(/Users/bmscomp/codes/kafka/build.gradle:3115) > (Run with --stacktrace to get the full stack trace of this > deprecation warning.)
{code} > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16801) Streams upgrade :test target doesn't find any junit tests
[ https://issues.apache.org/jira/browse/KAFKA-16801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-16801: Labels: newbie (was: ) > Streams upgrade :test target doesn't find any junit tests > - > > Key: KAFKA-16801 > URL: https://issues.apache.org/jira/browse/KAFKA-16801 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Greg Harris >Priority: Major > Labels: newbie > > No test executed. This behavior has been deprecated. > This will fail with an error in Gradle 9.0. > There are test sources present but no test was executed. Please check your > test configuration. > [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#test_task_fail_on_no_test_executed] > > 23 usages > > Task::streams:upgrade-system-tests-0100:test > Task::streams:upgrade-system-tests-0101:test > Task::streams:upgrade-system-tests-0102:test > Task::streams:upgrade-system-tests-0110:test > Task::streams:upgrade-system-tests-10:test > Task::streams:upgrade-system-tests-11:test > Task::streams:upgrade-system-tests-20:test > Task::streams:upgrade-system-tests-21:test > Task::streams:upgrade-system-tests-22:test > Task::streams:upgrade-system-tests-23:test > Task::streams:upgrade-system-tests-24:test > Task::streams:upgrade-system-tests-25:test > Task::streams:upgrade-system-tests-26:test > Task::streams:upgrade-system-tests-27:test > Task::streams:upgrade-system-tests-28:test > Task::streams:upgrade-system-tests-30:test > Task::streams:upgrade-system-tests-31:test > Task::streams:upgrade-system-tests-32:test > Task::streams:upgrade-system-tests-33:test > Task::streams:upgrade-system-tests-34:test > Task::streams:upgrade-system-tests-35:test > Task::streams:upgrade-system-tests-36:test > Task::streams:upgrade-system-tests-37:test -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16805) Stop using a ClosureBackedAction to configure Spotbugs reports
[ https://issues.apache.org/jira/browse/KAFKA-16805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-16805: Labels: newbie (was: ) > Stop using a ClosureBackedAction to configure Spotbugs reports > -- > > Key: KAFKA-16805 > URL: https://issues.apache.org/jira/browse/KAFKA-16805 > Project: Kafka > Issue Type: Sub-task >Reporter: Greg Harris >Priority: Major > Labels: newbie > > The org.gradle.util.ClosureBackedAction type has been deprecated. > This is scheduled to be removed in Gradle 9.0. > [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_7.html#org_gradle_util_reports_deprecations] > > 1 usage > [https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L745-L749] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16803) Upgrade to a version of ShadowJavaPlugin which doesn't use ConfigureUtil
[ https://issues.apache.org/jira/browse/KAFKA-16803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-16803: Labels: newbie (was: ) > Upgrade to a version of ShadowJavaPlugin which doesn't use ConfigureUtil > > > Key: KAFKA-16803 > URL: https://issues.apache.org/jira/browse/KAFKA-16803 > Project: Kafka > Issue Type: Sub-task >Reporter: Greg Harris >Priority: Major > Labels: newbie > > The org.gradle.util.ConfigureUtil type has been deprecated. > This is scheduled to be removed in Gradle 9.0. > [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#org_gradle_util_reports_deprecations] > 2 usages > Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin > Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16806) Explicitly declare JUnit dependencies for all test modules
Greg Harris created KAFKA-16806: --- Summary: Explicitly declare JUnit dependencies for all test modules Key: KAFKA-16806 URL: https://issues.apache.org/jira/browse/KAFKA-16806 Project: Kafka Issue Type: Sub-task Reporter: Greg Harris The automatic loading of test framework implementation dependencies has been deprecated. This is scheduled to be removed in Gradle 9.0. Declare the desired test framework directly on the test suite or explicitly declare the test framework implementation dependencies on the test's runtime classpath. [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#test_framework_implementation_dependencies] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16805) Stop using a ClosureBackedAction to configure Spotbugs reports
Greg Harris created KAFKA-16805: --- Summary: Stop using a ClosureBackedAction to configure Spotbugs reports Key: KAFKA-16805 URL: https://issues.apache.org/jira/browse/KAFKA-16805 Project: Kafka Issue Type: Sub-task Reporter: Greg Harris The org.gradle.util.ClosureBackedAction type has been deprecated. This is scheduled to be removed in Gradle 9.0. [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_7.html#org_gradle_util_reports_deprecations] 1 usage [https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L745-L749] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16802) Move build.gradle java version information inside of a java block
[ https://issues.apache.org/jira/browse/KAFKA-16802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-16802: Description: The org.gradle.api.plugins.JavaPluginConvention type has been deprecated. This is scheduled to be removed in Gradle 9.0. [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation] [https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L292-L295] was: The org.gradle.api.plugins.JavaPluginConvention type has been deprecated. This is scheduled to be removed in Gradle 9.0. [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation] [https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L292-L295] The org.gradle.api.plugins.BasePluginConvention type has been deprecated. This is scheduled to be removed in Gradle 9.0. [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#base_convention_deprecation] 1 usage Script:build.gradle > Move build.gradle java version information inside of a java block > - > > Key: KAFKA-16802 > URL: https://issues.apache.org/jira/browse/KAFKA-16802 > Project: Kafka > Issue Type: Sub-task >Reporter: Greg Harris >Priority: Major > Labels: newbie > > The org.gradle.api.plugins.JavaPluginConvention type has been deprecated. > This is scheduled to be removed in Gradle 9.0. > [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation] > > [https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L292-L295] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16804) Replace gradle archivesBaseName with archivesName
[ https://issues.apache.org/jira/browse/KAFKA-16804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-16804: Description: The BasePluginExtension.archivesBaseName property has been deprecated. This is scheduled to be removed in Gradle 9.0. Please use the archivesName property instead. [Documentation|https://docs.gradle.org/8.7/dsl/org.gradle.api.plugins.BasePluginExtension.html#org.gradle.api.plugins.BasePluginExtension:archivesName] 1 usage Script:build.gradle The org.gradle.api.plugins.BasePluginConvention type has been deprecated. This is scheduled to be removed in Gradle 9.0. [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#base_convention_deprecation] 1 usage Script:build.gradle was: The BasePluginExtension.archivesBaseName property has been deprecated. This is scheduled to be removed in Gradle 9.0. Please use the archivesName property instead. [Documentation|https://docs.gradle.org/8.7/dsl/org.gradle.api.plugins.BasePluginExtension.html#org.gradle.api.plugins.BasePluginExtension:archivesName] 1 usage Script:build.gradle > Replace gradle archivesBaseName with archivesName > - > > Key: KAFKA-16804 > URL: https://issues.apache.org/jira/browse/KAFKA-16804 > Project: Kafka > Issue Type: Sub-task >Reporter: Greg Harris >Priority: Major > Labels: newbie > > The BasePluginExtension.archivesBaseName property has been deprecated. > This is scheduled to be removed in Gradle 9.0. > Please use the archivesName property instead. > [Documentation|https://docs.gradle.org/8.7/dsl/org.gradle.api.plugins.BasePluginExtension.html#org.gradle.api.plugins.BasePluginExtension:archivesName] > 1 usage > Script:build.gradle > > The org.gradle.api.plugins.BasePluginConvention type has been deprecated. > This is scheduled to be removed in Gradle 9.0. 
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#base_convention_deprecation] > 1 usage > Script:build.gradle -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16802) Move build.gradle java version information inside of a java block
[ https://issues.apache.org/jira/browse/KAFKA-16802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-16802: Description: The org.gradle.api.plugins.JavaPluginConvention type has been deprecated. This is scheduled to be removed in Gradle 9.0. [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation] [https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L292-L295] The org.gradle.api.plugins.BasePluginConvention type has been deprecated. This is scheduled to be removed in Gradle 9.0. [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#base_convention_deprecation] 1 usage Script:build.gradle was: The org.gradle.api.plugins.JavaPluginConvention type has been deprecated. This is scheduled to be removed in Gradle 9.0. [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation] [https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L292-L295] > Move build.gradle java version information inside of a java block > - > > Key: KAFKA-16802 > URL: https://issues.apache.org/jira/browse/KAFKA-16802 > Project: Kafka > Issue Type: Sub-task >Reporter: Greg Harris >Priority: Major > Labels: newbie > > The org.gradle.api.plugins.JavaPluginConvention type has been deprecated. > This is scheduled to be removed in Gradle 9.0. > [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation] > > [https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L292-L295] > > The org.gradle.api.plugins.BasePluginConvention type has been deprecated. > This is scheduled to be removed in Gradle 9.0. 
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#base_convention_deprecation] > 1 usage > Script:build.gradle -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
ableegoldman commented on code in PR #15972: URL: https://github.com/apache/kafka/pull/15972#discussion_r1607404895 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ## @@ -1974,6 +1974,15 @@ public Set nonSourceChangelogTopics() { return topicConfigs; } +/** + * + * @return the set of changelog topics, which includes both source changelog topics and non + * source changelog topics. + */ +public Set changelogTopics() { +return Collections.unmodifiableSet(new HashSet<>(stateChangelogTopics.keySet())); Review Comment: I think you can skip the new HashSet step, that's pretty much redundant with the unmodifiableSet and since we don't plan on modifying the returned set, it's better to just wrap the keySet directly to save a bunch of unnecessary copying -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
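The reviewer's point — that `Collections.unmodifiableSet` over the live `keySet()` avoids an O(n) copy, while the extra `new HashSet<>` only buys a frozen snapshot — can be sketched as follows (a standalone illustration, not the actual `InternalTopologyBuilder` code):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class UnmodifiableViewSketch {

    // Defensive copy: O(n) extra work, contents frozen at call time.
    static Set<String> snapshot(Map<String, Integer> m) {
        return Collections.unmodifiableSet(new HashSet<>(m.keySet()));
    }

    // Read-only view of the live keySet: no copy, stays in sync with the map.
    static Set<String> liveView(Map<String, Integer> m) {
        return Collections.unmodifiableSet(m.keySet());
    }

    public static void main(String[] args) {
        Map<String, Integer> changelogs = new HashMap<>();
        changelogs.put("store1-changelog", 1);
        Set<String> copy = snapshot(changelogs);
        Set<String> view = liveView(changelogs);
        changelogs.put("store2-changelog", 2);
        System.out.println(copy.size()); // 1: snapshot taken before the second put
        System.out.println(view.size()); // 2: live view sees the new key
    }
}
```

Both variants throw `UnsupportedOperationException` on caller mutation; the only behavioral difference is that the view tracks later changes to the underlying map, which is harmless when the returned set is consumed immediately.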
[jira] [Created] (KAFKA-16804) Replace gradle archivesBaseName with archivesName
Greg Harris created KAFKA-16804: --- Summary: Replace gradle archivesBaseName with archivesName Key: KAFKA-16804 URL: https://issues.apache.org/jira/browse/KAFKA-16804 Project: Kafka Issue Type: Sub-task Reporter: Greg Harris The BasePluginExtension.archivesBaseName property has been deprecated. This is scheduled to be removed in Gradle 9.0. Please use the archivesName property instead. [Documentation|https://docs.gradle.org/8.7/dsl/org.gradle.api.plugins.BasePluginExtension.html#org.gradle.api.plugins.BasePluginExtension:archivesName] 1 usage Script:build.gradle -- This message was sent by Atlassian Jira (v8.20.10#820010)
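The `archivesBaseName` migration described above is a small mechanical change in build.gradle; a hedged sketch (the artifact name shown mirrors the common Kafka pattern but is illustrative, not copied from the actual build file):

```groovy
// Before (deprecated BasePluginExtension.archivesBaseName, removed in Gradle 9):
archivesBaseName = "kafka_${versions.baseScala}"

// After: set archivesName on the base extension instead
base {
    archivesName = "kafka_${versions.baseScala}"
}
```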
[jira] [Created] (KAFKA-16803) Upgrade to a version of ShadowJavaPlugin which doesn't use ConfigureUtil
Greg Harris created KAFKA-16803: --- Summary: Upgrade to a version of ShadowJavaPlugin which doesn't use ConfigureUtil Key: KAFKA-16803 URL: https://issues.apache.org/jira/browse/KAFKA-16803 Project: Kafka Issue Type: Sub-task Reporter: Greg Harris The org.gradle.util.ConfigureUtil type has been deprecated. This is scheduled to be removed in Gradle 9.0. [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#org_gradle_util_reports_deprecations] 2 usages Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16802) Move build.gradle java version information inside of a java block
Greg Harris created KAFKA-16802: --- Summary: Move build.gradle java version information inside of a java block Key: KAFKA-16802 URL: https://issues.apache.org/jira/browse/KAFKA-16802 Project: Kafka Issue Type: Sub-task Reporter: Greg Harris The org.gradle.api.plugins.JavaPluginConvention type has been deprecated. This is scheduled to be removed in Gradle 9.0. [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation] [https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L292-L295] -- This message was sent by Atlassian Jira (v8.20.10#820010)
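Moving the version settings "inside of a java block," as the issue title suggests, would look roughly like this (version values are illustrative; Kafka's build.gradle computes them from project properties):

```groovy
// Before (deprecated: properties resolved via JavaPluginConvention
// when set at the top level of the script):
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8

// After: the same settings inside the java extension block
java {
    sourceCompatibility = JavaVersion.VERSION_1_8
    targetCompatibility = JavaVersion.VERSION_1_8
}
```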
Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]
jolshan commented on PR #15968: URL: https://github.com/apache/kafka/pull/15968#issuecomment-2121393282 I would recommend taking a look at where we are passing the topic ID through and the checks we do. If we think it is useful to ensure we are writing to the right topic, we should do it, but if it is just adding complexity, we may want to consider changing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16799) NetworkClientDelegate is not backing off if the node is not found
[ https://issues.apache.org/jira/browse/KAFKA-16799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16799: -- Labels: consumer-threading-refactor (was: ) > NetworkClientDelegate is not backing off if the node is not found > - > > Key: KAFKA-16799 > URL: https://issues.apache.org/jira/browse/KAFKA-16799 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > > When performing stress testing, I found that AsycnKafkaConsumer's network > client delegate isn't backing off if the node is not ready, causing a large > number of: > {code:java} > 358 [2024-05-20 22:59:02,591] DEBUG [Consumer > clientId=consumer.7136899e-0c20-4ccb-8ba3-497e9e683594-0, > groupId=consumer-groups-test-5] Node is not ready, handle the request in the > next event loop: node=b4-pkc-devcmkz697.us-west-2.aws.devel.cpd > ev.cloud:9092 (id: 2147483643 rack: null), > request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='consumer-groups-test-5', > memberId='', memberEpoch=0, instanceId=null, rackId=null, > rebalanceTimeoutMs=10, subscri > bedTopicNames=[_kengine-565-test-topic8081], serverAssignor=null, > topicPartitions=[]), > handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@139a8761, > node=Optional[b4-pkc-devcmkz697.us-west-2.aws .devel.cpdev.cloud:9092 > (id: 2147483643 rack: null)], > timer=org.apache.kafka.common.utils.Timer@649fffad} > (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate:169) {code} > show up in the log. > What should have happened is: 1. node is not ready 2. exponential back off 3. > retry -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16799) NetworkClientDelegate is not backing off if the node is not found
[ https://issues.apache.org/jira/browse/KAFKA-16799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16799: -- Component/s: clients > NetworkClientDelegate is not backing off if the node is not found > - > > Key: KAFKA-16799 > URL: https://issues.apache.org/jira/browse/KAFKA-16799 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > > When performing stress testing, I found that AsycnKafkaConsumer's network > client delegate isn't backing off if the node is not ready, causing a large > number of: > {code:java} > 358 [2024-05-20 22:59:02,591] DEBUG [Consumer > clientId=consumer.7136899e-0c20-4ccb-8ba3-497e9e683594-0, > groupId=consumer-groups-test-5] Node is not ready, handle the request in the > next event loop: node=b4-pkc-devcmkz697.us-west-2.aws.devel.cpd > ev.cloud:9092 (id: 2147483643 rack: null), > request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='consumer-groups-test-5', > memberId='', memberEpoch=0, instanceId=null, rackId=null, > rebalanceTimeoutMs=10, subscri > bedTopicNames=[_kengine-565-test-topic8081], serverAssignor=null, > topicPartitions=[]), > handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@139a8761, > node=Optional[b4-pkc-devcmkz697.us-west-2.aws .devel.cpdev.cloud:9092 > (id: 2147483643 rack: null)], > timer=org.apache.kafka.common.utils.Timer@649fffad} > (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate:169) {code} > show up in the log. > What should have happened is: 1. node is not ready 2. exponential back off 3. > retry -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16801) Streams upgrade :test target doesn't find any junit tests
Greg Harris created KAFKA-16801: --- Summary: Streams upgrade :test target doesn't find any junit tests Key: KAFKA-16801 URL: https://issues.apache.org/jira/browse/KAFKA-16801 Project: Kafka Issue Type: Sub-task Components: streams Reporter: Greg Harris No test executed. This behavior has been deprecated. This will fail with an error in Gradle 9.0. There are test sources present but no test was executed. Please check your test configuration. [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#test_task_fail_on_no_test_executed] 23 usages Task::streams:upgrade-system-tests-0100:test Task::streams:upgrade-system-tests-0101:test Task::streams:upgrade-system-tests-0102:test Task::streams:upgrade-system-tests-0110:test Task::streams:upgrade-system-tests-10:test Task::streams:upgrade-system-tests-11:test Task::streams:upgrade-system-tests-20:test Task::streams:upgrade-system-tests-21:test Task::streams:upgrade-system-tests-22:test Task::streams:upgrade-system-tests-23:test Task::streams:upgrade-system-tests-24:test Task::streams:upgrade-system-tests-25:test Task::streams:upgrade-system-tests-26:test Task::streams:upgrade-system-tests-27:test Task::streams:upgrade-system-tests-28:test Task::streams:upgrade-system-tests-30:test Task::streams:upgrade-system-tests-31:test Task::streams:upgrade-system-tests-32:test Task::streams:upgrade-system-tests-33:test Task::streams:upgrade-system-tests-34:test Task::streams:upgrade-system-tests-35:test Task::streams:upgrade-system-tests-36:test Task::streams:upgrade-system-tests-37:test -- This message was sent by Atlassian Jira (v8.20.10#820010)
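One common way to address the fail-on-no-test deprecation, assuming the upgrade-system-tests modules intentionally contain no JUnit tests (they are exercised by the system-test harness), is to skip their `:test` tasks outright. A hedged build.gradle sketch — module matching and placement are illustrative, not taken from Kafka's actual build:

```groovy
// Illustrative fragment: disable the :test task for modules that
// deliberately have no JUnit tests, so Gradle 9's
// fail-on-no-test-executed check never fires for them.
subprojects {
    if (name.startsWith('upgrade-system-tests-')) {
        tasks.matching { it.name == 'test' }.configureEach {
            onlyIf { false }  // driven by system tests, not JUnit
        }
    }
}
```

The alternative fix is to remove the test-source configuration from these modules entirely so Gradle stops expecting tests there.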
[PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]
cmccabe opened a new pull request, #16008: URL: https://github.com/apache/kafka/pull/16008 Fix the code in the RaftControllerNodeProvider to query RaftManager to find Node information, rather than consulting a static map. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16800) Resolve Gradle 9.0 deprecations
Greg Harris created KAFKA-16800: --- Summary: Resolve Gradle 9.0 deprecations Key: KAFKA-16800 URL: https://issues.apache.org/jira/browse/KAFKA-16800 Project: Kafka Issue Type: Task Reporter: Greg Harris Gradle prints the following warning in our build: {noformat} Deprecated Gradle features were used in this build, making it incompatible with Gradle 9.0.{noformat} We should try to resolve these build warnings to prepare for the future release of Gradle 9. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16799) NetworkClientDelegate is not backing off if the node is not found
Philip Nee created KAFKA-16799: -- Summary: NetworkClientDelegate is not backing off if the node is not found Key: KAFKA-16799 URL: https://issues.apache.org/jira/browse/KAFKA-16799 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee When performing stress testing, I found that AsycnKafkaConsumer's network client delegate isn't backing off if the node is not ready, causing a large number of: {code:java} 358 [2024-05-20 22:59:02,591] DEBUG [Consumer clientId=consumer.7136899e-0c20-4ccb-8ba3-497e9e683594-0, groupId=consumer-groups-test-5] Node is not ready, handle the request in the next event loop: node=b4-pkc-devcmkz697.us-west-2.aws.devel.cpd ev.cloud:9092 (id: 2147483643 rack: null), request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='consumer-groups-test-5', memberId='', memberEpoch=0, instanceId=null, rackId=null, rebalanceTimeoutMs=10, subscri bedTopicNames=[_kengine-565-test-topic8081], serverAssignor=null, topicPartitions=[]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@139a8761, node=Optional[b4-pkc-devcmkz697.us-west-2.aws .devel.cpdev.cloud:9092 (id: 2147483643 rack: null)], timer=org.apache.kafka.common.utils.Timer@649fffad} (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate:169) {code} show up in the log. What should have happened is: 1. node is not ready 2. exponential back off 3. retry -- This message was sent by Atlassian Jira (v8.20.10#820010)
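The expected behavior (node not ready → exponential backoff → retry) follows the usual capped-exponential pattern. A minimal deterministic sketch — not Kafka's actual `ExponentialBackoff` utility, which also applies jitter, and with base/cap values chosen for illustration:

```java
public class BackoffSketch {
    static final long BASE_MS = 100L;     // assumed initial backoff
    static final long MAX_MS = 10_000L;   // assumed cap

    // Capped exponential backoff: BASE_MS * 2^attempts, bounded by MAX_MS.
    static long backoffMs(int attempts) {
        double raw = BASE_MS * Math.pow(2, attempts);
        return (long) Math.min(raw, MAX_MS);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 8; attempt++) {
            System.out.println("attempt " + attempt + " -> wait " + backoffMs(attempt) + " ms");
        }
    }
}
```

Without such a wait between attempts, an unready node is re-polled on every event-loop pass, producing the flood of "Node is not ready" DEBUG lines quoted in the issue.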
Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]
jolshan commented on code in PR #15968: URL: https://github.com/apache/kafka/pull/15968#discussion_r1607383820 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1361,10 +1366,10 @@ class ReplicaManager(val config: KafkaConfig, */ private def appendToLocalLog(internalTopicsAllowed: Boolean, origin: AppendOrigin, - entriesPerPartition: Map[TopicPartition, MemoryRecords], + entriesPerPartition: Map[TopicIdPartition, MemoryRecords], Review Comment: is there a reason to pass this data structure here if we are not using the ID to check the append at the log level? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]
jolshan commented on code in PR #15968: URL: https://github.com/apache/kafka/pull/15968#discussion_r1607379861 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -604,40 +604,53 @@ class KafkaApis(val requestChannel: RequestChannel, } } -val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() -val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() -val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]() -val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]() +val unauthorizedTopicResponses = mutable.Map[TopicIdPartition, PartitionResponse]() +val nonExistingTopicResponses = mutable.Map[TopicIdPartition, PartitionResponse]() +val invalidRequestResponses = mutable.Map[TopicIdPartition, PartitionResponse]() +val authorizedRequestInfo = mutable.Map[TopicIdPartition, MemoryRecords]() +val topicIdToPartitionData = new mutable.ArrayBuffer[(TopicIdPartition, ProduceRequestData.PartitionProduceData)] + +produceRequest.data.topicData.forEach { topic => + topic.partitionData.forEach { partition => +val topicIdIsMissing = topic.topicId == null || topic.topicId == Uuid.ZERO_UUID Review Comment: should we be deciding which fields to grab from based on the request version? And do we ever expect a null topic id? I wouldn't think so. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]
jolshan commented on code in PR #15968: URL: https://github.com/apache/kafka/pull/15968#discussion_r1607379861 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -604,40 +604,53 @@ class KafkaApis(val requestChannel: RequestChannel, } } -val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() -val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() -val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]() -val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]() +val unauthorizedTopicResponses = mutable.Map[TopicIdPartition, PartitionResponse]() +val nonExistingTopicResponses = mutable.Map[TopicIdPartition, PartitionResponse]() +val invalidRequestResponses = mutable.Map[TopicIdPartition, PartitionResponse]() +val authorizedRequestInfo = mutable.Map[TopicIdPartition, MemoryRecords]() +val topicIdToPartitionData = new mutable.ArrayBuffer[(TopicIdPartition, ProduceRequestData.PartitionProduceData)] + +produceRequest.data.topicData.forEach { topic => + topic.partitionData.forEach { partition => +val topicIdIsMissing = topic.topicId == null || topic.topicId == Uuid.ZERO_UUID Review Comment: should we be deciding which fields to grab from based on the request version? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
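The reviewer's suggestion — gate on the request version rather than on whether the id field happens to be null or zero — can be illustrated with a hypothetical resolver. The cutoff version, method name, and use of `java.util.UUID` (Kafka has its own `Uuid` type) are all assumptions for the sketch:

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

public class TopicResolver {
    // Hypothetical cutoff: first request version that carries topic ids.
    static final short FIRST_VERSION_WITH_TOPIC_ID = 12;

    // Pick the lookup field from the request version, not from whether
    // the id field looks "missing" (null / zero) in the payload.
    static Optional<String> resolveTopicName(short apiVersion, UUID topicId, String topicName,
                                             Map<UUID, String> namesById) {
        if (apiVersion >= FIRST_VERSION_WITH_TOPIC_ID) {
            return Optional.ofNullable(namesById.get(topicId));
        }
        return Optional.ofNullable(topicName).filter(n -> !n.isEmpty());
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        System.out.println(resolveTopicName((short) 12, id, null, Map.of(id, "orders")));
    }
}
```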
[PR] Bump requests from 2.24.0 to 2.32.0 in /tests [kafka]
dependabot[bot] opened a new pull request, #16007: URL: https://github.com/apache/kafka/pull/16007 Bumps [requests](https://github.com/psf/requests) from 2.24.0 to 2.32.0. Release notes, sourced from the requests releases page (https://github.com/psf/requests/releases): v2.32.0 (2024-05-20), PYCON US 2024 EDITION. Security: Fixed an issue where setting verify=False on the first request from a Session would cause subsequent requests to the same origin to also ignore cert verification, regardless of the value of verify (https://github.com/psf/requests/security/advisories/GHSA-9wx4-h78v-vm56). Improvements: verify=True now reuses a global SSLContext, which should improve request time variance between first and subsequent requests; it should also minimize certificate load time on Windows systems when using a Python version built with OpenSSL 3.x (#6667). Requests now supports optional use of character detection (chardet or charset_normalizer) when repackaged or vendored; this enables pip and other projects to minimize their vendoring surface area, and the Response.text() and apparent_encoding APIs will default to utf-8 if neither library is present (#6702). Bugfixes: Fixed a bug in length detection where emoji length was incorrectly calculated in the request content-length (#6589). Fixed a deserialization bug in JSONDecodeError (#6629). Fixed a bug where an extra leading / (path separator) could lead urllib3 to unnecessarily reparse the request URI (#6644). Deprecations: Requests has officially added support for CPython 3.12 (#6503) and for PyPy 3.9 and 3.10 (#6641); it has officially dropped support for CPython 3.7 (#6642) and for PyPy 3.7 and 3.8 (#6641). Documentation: Various typo fixes and doc improvements. Packaging: Requests has started adopting some modern packaging practices; the source files for the project (formerly requests) are now located in src/requests in the Requests sdist (#6506). Starting in Requests 2.33.0, Requests will migrate to a PEP 517 build system using hatchling; this should not impact the average user, but extremely old versions of packaging utilities may have issues with the new packaging format. New Contributors: @matthewarmand made their first contribution in psf/requests#6258; @cpzt made their first contribution in psf/requests#6456 ... (truncated) Changelog, sourced from HISTORY.md (https://github.com/psf/requests/blob/main/HISTORY.md), repeats the same 2.32.0 Security, Improvements, and Bugfix entries listed above; the message is truncated here: "Fixed bug where an extra leading / (path separator) could lead urllib3 to
[jira] [Assigned] (KAFKA-16515) Fix the ZK Metadata cache use of voter static configuration
[ https://issues.apache.org/jira/browse/KAFKA-16515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe reassigned KAFKA-16515: Assignee: Colin McCabe (was: José Armando García Sancio) > Fix the ZK Metadata cache use of voter static configuration > --- > > Key: KAFKA-16515 > URL: https://issues.apache.org/jira/browse/KAFKA-16515 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: José Armando García Sancio >Assignee: Colin McCabe >Priority: Major > Fix For: 3.8.0 > > > Looks like because of ZK migration to KRaft the ZK Metadata cache was changed > to read the voter static configuration. This needs to change to use the voter > nodes reported by the raft manager or the kraft client. > The injection code is in KafkaServer where it constructs > MetadataCache.zkMetadata. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16515: Fix the ZK Metadata cache confusion between brokers and controllers [kafka]
cmccabe opened a new pull request, #16006: URL: https://github.com/apache/kafka/pull/16006 ZkMetadataCache could theoretically return KRaft controller information from a call to ZkMetadataCache.getAliveBrokerNode, which doesn't make sense. KRaft controllers are not part of the set of brokers. In practice, this wasn't a concern since all the use-cases for ZkMetadataCache.getAliveBrokerNode center around finding coordinators, which will never be KRaft controllers anyway. Still, cleaning up this code reduces confusion and is helpful for removing places where static controller configurations are used, as part of KIP-853. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]
jolshan commented on code in PR #15968: URL: https://github.com/apache/kafka/pull/15968#discussion_r1607367455 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -891,11 +897,15 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo // which is supporting the new magic version to one which doesn't, then we will need to convert. if (!records.hasMatchingMagic(minUsedMagic)) records = batch.records().downConvert(minUsedMagic, 0, time).records(); -ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic()); -if (tpData == null) { -tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic()); -tpd.add(tpData); +Optional<ProduceRequestData.TopicProduceData> topicProduceData = canUseTopicId ? +Optional.ofNullable(tpd.find(tp.topic(), topicIds.get(tp.topic()))) : +tpd.stream().filter(data -> data.name().equals(tp.topic())).findFirst(); + +ProduceRequestData.TopicProduceData tpData = topicProduceData.orElse(new ProduceRequestData.TopicProduceData().setName(tp.topic())); +if (canUseTopicId) { +tpData.setTopicId(topicIds.get(tp.topic())); } +tpd.add(tpData); Review Comment: Hmm -- we don't need to add if we already added this right? I also wonder if we can cache by topic name so we don't have to findFirst -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
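The caching idea in the review comment above can be sketched independently of the Kafka client classes: keep a HashMap keyed by topic name so each batch does one O(1) lookup instead of a linear `stream().filter(...).findFirst()` scan. The `TopicEntry` class below is a simplified, hypothetical stand-in, not the real `ProduceRequestData.TopicProduceData` API.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical stand-in for a per-topic entry in a produce
// request (NOT the real ProduceRequestData.TopicProduceData).
class TopicEntry {
    final String name;
    int batches = 0;
    TopicEntry(String name) { this.name = name; }
}

public class TopicDataCache {
    // Keyed by topic name: one hash lookup per batch instead of scanning
    // every existing topic entry with stream().filter(...).findFirst().
    private final Map<String, TopicEntry> byName = new HashMap<>();

    public TopicEntry entryFor(String topic) {
        // Creates the entry on first use, reuses it afterwards, so the
        // caller never adds the same topic entry to the request twice.
        return byName.computeIfAbsent(topic, TopicEntry::new);
    }

    public int topicCount() {
        return byName.size();
    }

    public static void main(String[] args) {
        TopicDataCache cache = new TopicDataCache();
        cache.entryFor("orders").batches++;
        cache.entryFor("orders").batches++;   // second batch reuses the cached entry
        cache.entryFor("payments").batches++;
        System.out.println(cache.topicCount());               // prints 2
        System.out.println(cache.entryFor("orders").batches); // prints 2
    }
}
```

This also answers the "don't add twice" half of the comment: because `computeIfAbsent` is the only path that creates an entry, the entry is registered exactly once per topic.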
Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]
jolshan commented on code in PR #15968: URL: https://github.com/apache/kafka/pull/15968#discussion_r1607363954 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -610,7 +611,9 @@ private void handleProduceResponse(ClientResponse response, Map partitionsWithUpdatedLeaderInfo = new HashMap<>(); produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> { -TopicPartition tp = new TopicPartition(r.name(), p.index()); +// Version 12 drop topic name and add support to topic id. However, metadata can be used to map topic id to topic name. +String topicName = (r.name() == null || r.name().isEmpty()) ? metadata.topicNames().get(r.topicId()) : r.name(); Review Comment: What do we do if metadata has refreshed and is no longer in the metadata? For fetch it is a bit different since we have the session logic, and can handle missing topics. I would recommend writing through a few cases where the server and client have/don't have the topic ID to reason about the upgrade case/downgrade case/deletions/reassignments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
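The missing-topic concern raised above can be made concrete with a small sketch: prefer the name carried in the response, fall back to the local id-to-name map, and surface an explicit "unknown" result when the metadata no longer contains the id (e.g. a refresh raced with a topic deletion). The types here are simplified stand-ins for the real client classes (the real topic id is a `Uuid`, not a `String`).

```java
import java.util.Map;
import java.util.Optional;

public class TopicNameResolver {
    /**
     * Resolve a topic name for a produce-response entry. Versions >= 12 may
     * carry only the topic id, so an empty/null name falls back to the local
     * id -> name cache. An empty Optional means the caller must treat the
     * topic as unknown (metadata refreshed, topic deleted, reassigned, ...).
     */
    static Optional<String> resolveTopicName(String responseName,
                                             String topicId,
                                             Map<String, String> topicNamesById) {
        if (responseName != null && !responseName.isEmpty()) {
            return Optional.of(responseName);
        }
        return Optional.ofNullable(topicNamesById.get(topicId));
    }

    public static void main(String[] args) {
        Map<String, String> ids = Map.of("id-1", "orders");
        System.out.println(resolveTopicName("orders", "id-1", ids)); // Optional[orders]
        System.out.println(resolveTopicName("", "id-1", ids));       // Optional[orders]
        System.out.println(resolveTopicName(null, "id-gone", ids));  // Optional.empty
    }
}
```

The third case is exactly the reviewer's question: the sketch forces the caller to decide what "not in the metadata" means rather than silently producing a null topic name.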
[jira] [Resolved] (KAFKA-16645) CVEs in 3.7.0 docker image
[ https://issues.apache.org/jira/browse/KAFKA-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez resolved KAFKA-16645. - Resolution: Resolved > CVEs in 3.7.0 docker image > -- > > Key: KAFKA-16645 > URL: https://issues.apache.org/jira/browse/KAFKA-16645 > Project: Kafka > Issue Type: Task > Affects Versions: 3.7.0 > Reporter: Mickael Maison > Assignee: Igor Soarez > Priority: Blocker > Fix For: 3.8.0, 3.7.1 > > > Our [Docker Image CVE Scanner|https://github.com/apache/kafka/actions/runs/874393] GitHub action reports 2 high CVEs in our base image: > apache/kafka:3.7.0 (alpine 3.19.1) > Total: 2 (HIGH: 2, CRITICAL: 0)
> | Library  | Vulnerability  | Severity | Status | Installed Version | Fixed Version | Title |
> | libexpat | CVE-2023-52425 | HIGH     | fixed  | 2.5.0-r2          | 2.6.0-r0      | expat: parsing large tokens can trigger a denial of service (https://avd.aquasec.com/nvd/cve-2023-52425) |
> | libexpat | CVE-2024-28757 | HIGH     | fixed  | 2.5.0-r2          | 2.6.2-r0      | expat: XML Entity Expansion (https://avd.aquasec.com/nvd/cve-2024-28757) |
> Looking at the [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka#KIP975:DockerImageforApacheKafka-WhatifweobserveabugoracriticalCVEinthereleasedApacheKafkaDockerImage?] that introduced the docker images, it seems we should release a bugfix when high CVEs are detected. It would be good to investigate and assess whether Kafka is impacted or not. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-16645) CVEs in 3.7.0 docker image
[ https://issues.apache.org/jira/browse/KAFKA-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reopened KAFKA-16645: - Need to re-open to change the resolution, release_notes.py doesn't like the one I picked > CVEs in 3.7.0 docker image > Key: KAFKA-16645 > URL: https://issues.apache.org/jira/browse/KAFKA-16645 > Project: Kafka > Issue Type: Task > Affects Versions: 3.7.0 > Reporter: Mickael Maison > Assignee: Igor Soarez > Priority: Blocker > Fix For: 3.8.0, 3.7.1 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]
jolshan commented on PR #15968: URL: https://github.com/apache/kafka/pull/15968#issuecomment-2121321915 > Topic name and Topic Id will become optional following the footstep of FetchRequest/FetchResponse My understanding is that all requests going forward will use ID and not name similar to fetch request. I believe that is what is in the PR, but the comment suggests otherwise. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add demo template for transactional client [kafka]
jolshan commented on code in PR #15913: URL: https://github.com/apache/kafka/pull/15913#discussion_r1607348108 ## examples/src/main/java/kafka/examples/TransactionalClientDemo.java: ## @@ -0,0 +1,153 @@ +package kafka.examples; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + + +import static java.time.Duration.ofSeconds; +import static java.util.Collections.singleton; +import static org.apache.kafka.clients.consumer.ConsumerConfig.*; Review Comment: nit: whenever we want to merge this, we can't use wildcard imports. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add demo template for transactional client [kafka]
jolshan commented on code in PR #15913: URL: https://github.com/apache/kafka/pull/15913#discussion_r1607347641 ## examples/src/main/java/kafka/examples/TransactionalClientDemo.java: ## @@ -0,0 +1,153 @@ +package kafka.examples; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + + +import static java.time.Duration.ofSeconds; +import static java.util.Collections.singleton; +import static org.apache.kafka.clients.consumer.ConsumerConfig.*; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.*; + +/** + * This class demonstrates a transactional Kafka client application that consumes messages from an input topic, + * processes them to generate word count statistics, and produces the results to an output topic. + * It utilizes Kafka's transactional capabilities to ensure exactly-once processing semantics. + * + * The application continuously polls for records from the input topic, processes them, and commits the offsets + * in a transactional manner. In case of exceptions or errors, it handles them appropriately, either aborting the + * transaction and resetting to the last committed positions, or restarting the application. 
+ * + */ +public class TransactionalClientDemo { + +private static final String CONSUMER_GROUP_ID = "my-group-id"; +private static final String OUTPUT_TOPIC = "output"; +private static final String INPUT_TOPIC = "input"; +private static KafkaConsumer<String, String> consumer; +private static KafkaProducer<String, String> producer; + +public static void main(String[] args) { +initializeApplication(); + +boolean isRunning = true; +// Continuously poll for records +while(isRunning) { +try { +try { +// Poll records from Kafka for a timeout of 60 seconds +ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60)); + +// Process records to generate word count map +Map<String, Integer> wordCountMap = new HashMap<>(); + +for (ConsumerRecord<String, String> record : records) { +String[] words = record.value().split(" "); +for (String word : words) { +wordCountMap.merge(word, 1, Integer::sum); +} +} + +// Begin transaction +producer.beginTransaction(); + +// Produce word count results to output topic +wordCountMap.forEach((key, value) -> +producer.send(new ProducerRecord<>(OUTPUT_TOPIC, key, value.toString()))); + +// Determine offsets to commit +Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(); +for (TopicPartition partition : records.partitions()) { +List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition); +long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset(); +offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1)); +} + +// Send offsets to transaction for atomic commit +producer.sendOffsetsToTransaction(offsetsToCommit, CONSUMER_GROUP_ID); + +// Commit transaction +producer.commitTransaction(); +} catch (AbortableTransactionException e) { +// Abortable Exception: Handle Kafka exception by aborting transaction. AbortTransaction path never throws abortable exception. Review Comment: More of an implementation discussion, but are we saying that producer.abortTransaction() should never throw such an exception? Or that we don't ever try to catch such an exception from abortTransaction? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add demo template for transactional client [kafka]
jolshan commented on code in PR #15913: URL: https://github.com/apache/kafka/pull/15913#discussion_r1607340714 ## examples/src/main/java/kafka/examples/TransactionalClientDemo.java: ## @@ -0,0 +1,153 @@ +package kafka.examples; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + + +import static java.time.Duration.ofSeconds; +import static java.util.Collections.singleton; +import static org.apache.kafka.clients.consumer.ConsumerConfig.*; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.*; + +/** + * This class demonstrates a transactional Kafka client application that consumes messages from an input topic, + * processes them to generate word count statistics, and produces the results to an output topic. + * It utilizes Kafka's transactional capabilities to ensure exactly-once processing semantics. + * + * The application continuously polls for records from the input topic, processes them, and commits the offsets + * in a transactional manner. In case of exceptions or errors, it handles them appropriately, either aborting the + * transaction and resetting to the last committed positions, or restarting the application. 
+ * + */ +public class TransactionalClientDemo { + +private static final String CONSUMER_GROUP_ID = "my-group-id"; +private static final String OUTPUT_TOPIC = "output"; +private static final String INPUT_TOPIC = "input"; +private static KafkaConsumer<String, String> consumer; +private static KafkaProducer<String, String> producer; + +public static void main(String[] args) { +initializeApplication(); + +boolean isRunning = true; +// Continuously poll for records +while(isRunning) { Review Comment: nit: `while (isRunning)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16779) Kafka retains logs past specified retention
[ https://issues.apache.org/jira/browse/KAFKA-16779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847997#comment-17847997 ] Nicholas Feinberg commented on KAFKA-16779: --- When we explicitly set topics' retention to 4d (345600000 ms), our brokers immediately expired the surprisingly old logs. I've confirmed that the same setting is present in the brokers' `server.properties` file - that is, they have `log.retention.hours=96`. I've also checked and confirmed that topics do not have an explicitly set retention that would override this. > Kafka retains logs past specified retention > --- > > Key: KAFKA-16779 > URL: https://issues.apache.org/jira/browse/KAFKA-16779 > Project: Kafka > Issue Type: Bug > Affects Versions: 3.7.0 > Reporter: Nicholas Feinberg > Priority: Major > Labels: expiration, retention > Attachments: OOM.txt, kafka-20240512.log.gz, kafka-20240514.log.gz, > kafka-ooms.png, server.log.2024-05-12.gz, server.log.2024-05-14.gz, > state-change.log.2024-05-12.gz, state-change.log.2024-05-14.gz > > > In a Kafka cluster with all topics set to four days of retention or longer > (345600000 ms), most brokers seem to be retaining six days of data. > This is true even for topics which have high throughput (500MB/s, 50k msgs/s) > and thus are regularly rolling new log segments. We observe this unexpectedly > high retention both via disk usage statistics and by requesting the oldest > available messages from Kafka. > Some of these brokers crashed with an 'mmap failed' error (attached). When > those brokers started up again, they returned to the expected four days of > retention. > Manually restarting brokers also seems to cause them to return to four days > of retention. Demoting and promoting brokers only has this effect on a small > part of the data hosted on a broker. > These hosts had ~170GiB of free memory available. We saw no signs of pressure > on either system or JVM heap memory before or after they reported this error. 
> Committed memory seems to be around 10%, so this doesn't seem to be an > overcommit issue. > This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). > Prior to the upgrade, it was running on Kafka 2.4. > We last reduced retention for ops on May 7th, after which we restored > retention to our default of four days. This was the second time we've > temporarily reduced and restored retention since the upgrade. This problem > did not manifest the previous time we did so, nor did it manifest on our > other Kafka 3.7 clusters. > We are running on AWS > [d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We > have 23 brokers, each with 24 disks. We're running in a JBOD configuration > (i.e. unraided). > Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, > we're still using Zookeeper. > Sample broker logs are attached. The 05-12 and 05-14 logs are from separate > hosts. Please let me know if I can provide any further information. -- This message was sent by Atlassian Jira (v8.20.10#820010)
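As a sanity check on the retention numbers in the report above (an editorial aside, not part of the original thread): four days in milliseconds, the value behind `log.retention.hours=96` or an explicit `retention.ms` override, and the six days actually observed, work out as follows.

```java
import java.time.Duration;

public class RetentionMath {
    public static void main(String[] args) {
        // Four days of retention, as configured (log.retention.hours=96).
        long fourDaysMs = Duration.ofDays(4).toMillis();
        System.out.println(fourDaysMs);                       // 345600000
        System.out.println(Duration.ofHours(96).toMillis());  // 345600000 (same value)

        // Six days, the retention the brokers were actually observed to keep.
        long sixDaysMs = Duration.ofDays(6).toMillis();
        System.out.println(sixDaysMs);                        // 518400000
    }
}
```

In other words, a topic-level `retention.ms=345600000` matches the broker-level 96-hour default, so an explicit override should not change behavior; the fact that setting it did expire old segments is what makes the report surprising.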
Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]
cmccabe commented on code in PR #15945: URL: https://github.com/apache/kafka/pull/15945#discussion_r1607334284 ## metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java: ## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.publisher; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.loader.LoaderManifest; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.List; + +/** + * Tracks the registration of a specific broker, and executes a callback if it should be refreshed. + * + * This tracker handles cases where we might want to re-register the broker. The only such case + * right now is during the transition from non-JBOD mode, to JBOD mode. In other words, the + * transition from a MetadataVersion less than 3.7-IV2, to one greater than or equal to 3.7-IV2. 
+ * In this case, the broker registration will start out containing no directories, and we need to + * resend the BrokerRegistrationRequest to fix that. + * + * As much as possible, the goal here is to keep things simple. We just compare the desired state + * with the actual state, and try to make changes only if necessary. + */ +public class BrokerRegistrationTracker implements MetadataPublisher { +private final Logger log; +private final int id; +private final Runnable refreshRegistrationCallback; + +/** + * Create the tracker. + * + * @param idThe ID of this broker. + * @param targetDirectories The directories managed by this broker. + * @param refreshRegistrationCallback Callback to run if we need to refresh the registration. + */ +public BrokerRegistrationTracker( +int id, +List targetDirectories, +Runnable refreshRegistrationCallback +) { +this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] "). +logger(BrokerRegistrationTracker.class); +this.id = id; +this.refreshRegistrationCallback = refreshRegistrationCallback; +} + +@Override +public String name() { +return "BrokerRegistrationTracker(id=" + id + ")"; +} + +@Override +public void onMetadataUpdate( +MetadataDelta delta, +MetadataImage newImage, +LoaderManifest manifest +) { +boolean checkBrokerRegistration = false; +if (delta.featuresDelta() != null) { +if (delta.metadataVersionChanged().isPresent()) { Review Comment: If a metadata version change happens, we may be in a situation where we need to re-register. That's what this clause reflects. I agree it would be good to have a test that hits this specifically. Looks like your test below will do so -- I have added it. ## metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java: ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.publisher; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1607136651 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -420,12 +420,11 @@ public CompletableFuture heartbeat( ); } -// Using a read operation is okay here as we ignore the last committed offset in the snapshot registry. -// This means we will read whatever is in the latest snapshot, which is how the old coordinator behaves. -return runtime.scheduleReadOperation( +return runtime.scheduleWriteOperation( "classic-group-heartbeat", topicPartitionFor(request.groupId()), -(coordinator, __) -> coordinator.classicGroupHeartbeat(context, request) +Duration.ofMillis(config.offsetCommitTimeoutMs), Review Comment: not necessarily a comment for this PR but i wonder if we should change the name of this config since it's being used for all writes. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -4209,31 +4241,67 @@ private void removePendingSyncMember( * @param contextThe request context. * @param requestThe actual Heartbeat request. * - * @return The Heartbeat response. + * @return The coordinator result that contains the heartbeat response. */ -public HeartbeatResponseData classicGroupHeartbeat( +public CoordinatorResult classicGroupHeartbeat( Review Comment: maybe i'm missing something but i don't see where we actually initialize CoordinatorResult with records to write to the log ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -4274,6 +4342,77 @@ private void validateClassicGroupHeartbeat( } } +/** + * Handle a classic group HeartbeatRequest to a consumer group. 
A response with + * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the + * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in + * UNRELEASED_PARTITIONS and all its partitions pending assignment are free. + * + * @param group The ConsumerGroup. + * @param contextThe request context. + * @param requestThe actual Heartbeat request. + * + * @return The coordinator result that contains the heartbeat response. + */ +private CoordinatorResult classicGroupHeartbeatToConsumerGroup( +ConsumerGroup group, +RequestContext context, +HeartbeatRequestData request +) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException { +String groupId = request.groupId(); +String memberId = request.memberId(); +String instanceId = request.groupInstanceId(); +ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId); + +throwIfMemberDoesNotUseClassicProtocol(member); +throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId()); + +scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get()); + +Errors error = Errors.NONE; +if (member.memberEpoch() < group.groupEpoch() || +member.state() == MemberState.UNREVOKED_PARTITIONS || +(member.state() == MemberState.UNRELEASED_PARTITIONS && !group.hasUnreleasedPartitions(member))) { +error = Errors.REBALANCE_IN_PROGRESS; +scheduleConsumerGroupJoinTimeout(groupId, memberId, member.rebalanceTimeoutMs()); Review Comment: we are saying that we cancel the join timeout when we first convert to consumer group, then when we have a group epoch bump we tell the classic group member we're rebalancing and they should send a join request. is my understanding correct? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured
[ https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847989#comment-17847989 ] Muralidhar Basani commented on KAFKA-16790: --- [~christo_lolov] have a pr open. > Calls to RemoteLogManager are made before it is configured > -- > > Key: KAFKA-16790 > URL: https://issues.apache.org/jira/browse/KAFKA-16790 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.8.0 >Reporter: Christo Lolov >Assignee: Muralidhar Basani >Priority: Major > > BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) > which in turn calls RemoteLogManager#onLeadershipChange (2), however, the > RemoteLogManager is configured after the BrokerMetadataPublisher starts > running (3, 4). This is incorrect, we either need to initialise the > RemoteLogManager before we start the BrokerMetadataPublisher or we need to > skip calls to onLeadershipChange if the RemoteLogManager is not initialised. > (1) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151] > (2) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737] > (3) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432] > (4) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515] > The way to reproduce the problem is by looking at the following changes > {code:java} > config/kraft/broker.properties | 10 ++ > .../main/java/kafka/log/remote/RemoteLogManager.java | 8 +++- > core/src/main/scala/kafka/server/ReplicaManager.scala | 6 +- > 3 files changed, 22 insertions(+), 2 deletions(-)diff --git > a/config/kraft/broker.properties b/config/kraft/broker.properties > index 2d15997f28..39d126cf87 100644 > --- a/config/kraft/broker.properties > +++ b/config/kraft/broker.properties > @@ -127,3 +127,13 @@ log.segment.bytes=1073741824 
> # The interval at which log segments are checked to see if they can be > deleted according > # to the retention policies > log.retention.check.interval.ms=30 > + > +remote.log.storage.system.enable=true > +remote.log.metadata.manager.listener.name=PLAINTEXT > +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage > +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar > +remote.log.storage.manager.impl.prefix=rsm.config. > +remote.log.metadata.manager.impl.prefix=rlmm.config. > +rsm.config.dir=/tmp/kafka-remote-storage > +rlmm.config.remote.log.metadata.topic.replication.factor=1 > +log.retention.check.interval.ms=1000 > diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java > b/core/src/main/java/kafka/log/remote/RemoteLogManager.java > index 6555b7c0cd..e84a072abc 100644 > --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java > +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java > @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable { > // The endpoint for remote log metadata manager to connect to > private Optional endpoint = Optional.empty(); > private boolean closed = false; > + private boolean up = false; > > /** > * Creates RemoteLogManager instance with the given arguments. > @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable { > // in connecting to the brokers or remote storages. 
> configureRSM(); > configureRLMM(); > + up = true; > } > > public RemoteStorageManager storageManager() { > @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable { > public void onLeadershipChange(Set partitionsBecomeLeader, > Set partitionsBecomeFollower, > Map topicIds) { > - LOGGER.debug("Received leadership changes for leaders: {} and > followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); > + if (!up) { > + LOGGER.error("NullPointerException"); > + return; > + } > + LOGGER.error("Received leadership changes for leaders: {} and > followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); > > Map leaderPartitionsWithLeaderEpoch = > filterPartitions(partitionsBecomeLeader) > .collect(Collectors.toMap( > diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala > b/core/src/main/scala/kafka/server/ReplicaManager.scala > index 35499430d6..bd3f41c3d6 100644 > --- a/core/src/main/scala/kafka/server/ReplicaManager.scala > +++
Re: [PR] Update RemoteLogManager configuration in broker server - KAFKA-16790 [kafka]
muralibasani commented on PR #16005: URL: https://github.com/apache/kafka/pull/16005#issuecomment-2121170470 @clolov would you like to take a look? I have taken the first approach of configuring rlm before initializing the publishers, instead of skipping calls when it isn't configured. There can be multiple places where rlm configuration is required, so I thought this was better. I haven't added any explicit tests here, but do you suggest any?
[PR] Update RemoteLogManager configuration in broker server - KAFKA-16790 [kafka]
muralibasani opened a new pull request, #16005: URL: https://github.com/apache/kafka/pull/16005 Details of the issue are described in https://issues.apache.org/jira/browse/KAFKA-16790 In BrokerServer.scala, brokerMetadataPublishers are configured, but when metadata updates arrive, remoteLogManager is not yet configured. Ex: remoteLogManager.foreach(rlm => rlm.onLeadershipChange(partitionsBecomeLeader.asJava, partitionsBecomeFollower.asJava, topicIds)) in ReplicaManager is invoked after the publishers are instantiated, and at that point rlm must already have its relevant managers configured. This change makes sure rlm is configured before the brokerMetadataPublishers are initialized. *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [X] Verify design and implementation - [X] Verify test coverage and CI build status - [X] Verify documentation (including upgrade notes)
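For context, the JIRA above offers two possible fixes: configure the RemoteLogManager before the publishers start (the approach this PR takes), or skip leadership-change callbacks until configuration has happened. A minimal sketch of the guard variant follows; all class and method names here are ours for illustration, not Kafka's actual API:

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: a leadership-change callback that is a no-op until
// configure() has run, mirroring the "skip if not initialised" option.
class RemoteLogManagerSketch {
    private final AtomicBoolean configured = new AtomicBoolean(false);

    void configure() {
        // configureRSM(); configureRLMM(); ... (elided)
        configured.set(true);
    }

    // Returns true if the change was processed, false if dropped as too early.
    boolean onLeadershipChange(Set<String> partitionsBecomeLeader, Set<String> partitionsBecomeFollower) {
        if (!configured.get()) {
            return false; // underlying storage/metadata managers not wired up yet
        }
        // hand the partitions to the remote storage machinery here
        return true;
    }
}
```

The PR's ordering approach avoids silently dropping events, which is why re-ordering initialization is arguably the safer of the two fixes.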
Re: [PR] KAFKA-16692: InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka [kafka]
soarez commented on PR #15971: URL: https://github.com/apache/kafka/pull/15971#issuecomment-2121128256 > Not sure if we want to amend the commit name to fix it. No, that could be very disruptive, not worth the trouble. It wasn't hard to find the JIRA despite the typo. > I've picked to 3.6 and 3.7. Nice. Thanks for closing the JIRA too.
Re: [PR] MINOR: ensure KafkaServerTestHarness::tearDown is always invoked [kafka]
chia7712 merged PR #15996: URL: https://github.com/apache/kafka/pull/15996
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
chia7712 commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1607152637 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ## @@ -112,12 +115,15 @@ public void start(Map props) { @Override public void commit() { -// Offset syncs which were not emitted immediately due to their offset spacing should be sent periodically -// This ensures that low-volume topics aren't left with persistent lag at the end of the topic -promoteDelayedOffsetSyncs(); -// Publish any offset syncs that we've queued up, but have not yet been able to publish -// (likely because we previously reached our limit for number of outstanding syncs) -firePendingOffsetSyncs(); +// Handle delayed and pending offset syncs only when emit.offset-syncs.enabled set to true +if (emitOffsetSyncEnabled) { +// Offset syncs which were not emitted immediately due to their offset spacing should be sent periodically +// This ensures that low-volume topics aren't left with persistent lag at the end of the topic +promoteDelayedOffsetSyncs(); +// Publish any offset syncs that we've queued up, but have not yet been able to publish +// (likely because we previously reached our limit for number of outstanding syncs) +firePendingOffsetSyncs(); Review Comment: We don't need to create `offsetProducer` if `emitOffsetSyncEnabled` is false
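The review point above — don't create `offsetProducer` at all when syncs are disabled — can be sketched as follows. `MirrorSourceTaskSketch` and its members are illustrative stand-ins, not the real MM2 classes:

```java
import java.util.Map;
import java.util.Optional;

// Sketch: only build the offset-sync producer when syncs can actually be
// emitted, so a task with the feature disabled holds no idle producer.
class MirrorSourceTaskSketch {
    private final boolean emitOffsetSyncsEnabled;
    private final Optional<String> offsetProducer; // stand-in for a KafkaProducer

    MirrorSourceTaskSketch(Map<String, String> props) {
        this.emitOffsetSyncsEnabled =
            Boolean.parseBoolean(props.getOrDefault("emit.offset-syncs.enabled", "true"));
        this.offsetProducer = emitOffsetSyncsEnabled
            ? Optional.of("offset-syncs-producer") // would be a real producer
            : Optional.empty();
    }

    boolean hasOffsetProducer() {
        return offsetProducer.isPresent();
    }

    void commit() {
        if (!emitOffsetSyncsEnabled) {
            return; // no delayed or pending syncs can exist
        }
        // promoteDelayedOffsetSyncs(); firePendingOffsetSyncs();
    }
}
```

Gating construction (not just the `commit()` calls) also means the disabled path holds no producer resources to close on shutdown.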
Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]
gharris1727 commented on code in PR #16001: URL: https://github.com/apache/kafka/pull/16001#discussion_r1607071132 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -1039,21 +1039,42 @@ public static List> reverseTransform(String connName, return result; } -public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List> taskProps) { +public boolean taskConfigsChanged( +ClusterConfigState configState, +String connName, +List> taskProps, +int connectorConfigHash +) { int currentNumTasks = configState.taskCount(connName); boolean result = false; if (taskProps.size() != currentNumTasks) { log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size()); result = true; } else { -for (int index = 0; index < currentNumTasks; index++) { +for (int index = 0; index < currentNumTasks && !result; index++) { Review Comment: this has the effect of hiding the later debug logs for other tasks, is that intentional? I don't know if anyone is relying on that information, but this is taking away debug information that might be useful. 
## checkstyle/import-control.xml: ## @@ -574,6 +574,7 @@ + Review Comment: nit: duplicate ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -1039,21 +1039,42 @@ public static List> reverseTransform(String connName, return result; } -public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List> taskProps) { +public boolean taskConfigsChanged( +ClusterConfigState configState, +String connName, +List> taskProps, +int connectorConfigHash +) { int currentNumTasks = configState.taskCount(connName); boolean result = false; if (taskProps.size() != currentNumTasks) { log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size()); result = true; } else { -for (int index = 0; index < currentNumTasks; index++) { +for (int index = 0; index < currentNumTasks && !result; index++) { ConnectorTaskId taskId = new ConnectorTaskId(connName, index); if (!taskProps.get(index).equals(configState.taskConfig(taskId))) { log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index); result = true; } } +// Do a final check to see if runtime-controlled properties that affect tasks but may +// not be included in the connector-generated configs for them (such as converter overrides) +// have changed +if (!result) { Review Comment: None of this looks expensive to compute, WDYT about moving this outside the `else` branch and always comparing the hash? ## connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java: ## @@ -243,4 +249,29 @@ public static Map patchConfig( }); return result; } + +/** + * Generate a deterministic hash of the supplied config. For configurations + * with identical key-value pairs, this hash will always be the same. 
+ * @param config the config to hash; may be null + * @return a hash of the config + */ +public static int configHash(Map config) { +if (config == null) +return 0; + +Map toHash = new TreeMap<>(config); + +byte[] serialized; +try { +serialized = OBJECT_MAPPER.writeValueAsBytes(toHash); Review Comment: AbstractMap (superclass of TreeMap) has a hashCode implementation which depends on the keys and values in the map. Did you consider using that method, and reject it? It looks like it could have fewer memory allocations and copying. ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java: ## @@ -187,6 +193,17 @@ public Map rawTaskConfig(ConnectorTaskId task) { return taskConfigs.get(task); } +/** + * Get the hash of the connector config that was used to generate the + * latest set of task configs for the connector + * @param connectorName name of the connector + * @return the config hash, or null if the connector does not exist or + * no config hash for its latest set of tasks has been stored + */ +public Integer taskConfigHash(String connectorName) { Review Comment: I think this should be called connectorConfigHash, since it's not specific to any task. ##
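Regarding the `configHash` review comment above: the `java.util.Map#hashCode` contract already defines the hash as the sum of the entries' hash codes, so it is deterministic and independent of both insertion order and map implementation — no JSON serialization round-trip is needed. (Either way, a 32-bit hash can collide, so a matching hash is a strong hint, not proof, that the config is unchanged.) A sketch under those assumptions, with our own class name:

```java
import java.util.Map;

// Deterministic config hash relying on the Map#hashCode contract:
// the hash is the sum of entry hash codes, so equal key-value pairs
// always produce the same value, whatever the map implementation.
final class ConfigHashSketch {
    static int configHash(Map<String, String> config) {
        return config == null ? 0 : config.hashCode();
    }
}
```

This is the lighter-weight alternative the reviewer suggests: no ObjectMapper, no TreeMap copy, no byte-array allocation.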
Re: [PR] KAFKA-16692: InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka [kafka]
jolshan commented on PR #15971: URL: https://github.com/apache/kafka/pull/15971#issuecomment-2120961394 Yes. I just noticed this too 🤦‍♀️ Not sure if we want to amend the commit name to fix it. I've picked to 3.6 and 3.7.
[jira] [Resolved] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-16692. Fix Version/s: 3.6.3 Resolution: Fixed > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.7.0, 3.6.1, 3.8 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major > Fix For: 3.7.1, 3.6.3, 3.8 > > > We have a kafka cluster running on version 3.5.2 that we are upgrading to > 3.6.1. This cluster has a lot of clients with exactly one semantics enabled > and hence creating transactions. As we replaced brokers with the new > binaries, we observed lots of clients in the cluster experiencing the > following error: > {code:java} > 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, > transactionalId=] Got error produce response with > correlation id 6402937 on topic-partition , retrying > (2147483512 attempts left). Error: NETWORK_EXCEPTION. 
Error Message: The > server disconnected before a response was received.{code} > On inspecting the broker, we saw the following errors on brokers still > running Kafka version 3.5.2: > > {code:java} > message: > Closing socket for because of error > exception_exception_class: > org.apache.kafka.common.errors.InvalidRequestException > exception_exception_message: > Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled > exception_stacktrace: > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled > {code} > On the new brokers running 3.6.1 we saw the following errors: > > {code:java} > [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for > node 1043 with a network exception.{code} > > I can also see this : > {code:java} > [AddPartitionsToTxnManager broker=1055]Cancelled in-flight > ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 > being disconnected (elapsed time since creation: 11ms, elapsed time since > send: 4ms, request timeout: 3ms){code} > We started investigating this issue and digging through the changes in 3.6, > we came across some changes introduced as part of KAFKA-14402 that we thought > might lead to this behaviour. > First we could see that _transaction.partition.verification.enable_ is > enabled by default and enables a new code path that culminates in we sending > version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated > [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269]. 
> From a > [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] > on the mailing list, [~jolshan] pointed out that this scenario shouldn't be > possible as the following code paths should prevent version 4 > ADD_PARTITIONS_TO_TXN requests being sent to other brokers: > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] > > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] > However, these requests are still sent to other brokers in our environment. > On further inspection of the code, I am wondering if the following code path > could lead to this issue: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] > In this scenario, we don't have any _NodeApiVersions_ available for the > specified nodeId and potentially skipping the _latestUsableVersion_ check. I > am wondering if it is possible that because _discoverBrokerVersions_ is set > to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it > skips fetching {_}NodeApiVersions{_}? I can see that we create the network > client here: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641] > The _NetworkUtils.buildNetworkClient_ method seems to create a network client > that has _discoverBrokerVersions_ set to {_}false{_}. > I was hoping I could get some assistance debugging this issue. Happy to > provide any additional information needed. > > > -- This message was sent by Atlassian
Re: [PR] MINOR: remove extra import from transactions tests [kafka]
jolshan merged PR #16000: URL: https://github.com/apache/kafka/pull/16000
Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]
jeffkbkim commented on code in PR #15974: URL: https://github.com/apache/kafka/pull/15974#discussion_r1607031651 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java: ## @@ -33,13 +35,21 @@ public class AssignmentSpec { */ private final SubscriptionType subscriptionType; -public AssignmentSpec( +/** + * Reverse lookup map representing partitions per topic and + * their member assignment. + */ +Map> partitionAssignments; Review Comment: should this also be private final, and should the user be able to modify the contents of the map? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -516,16 +522,78 @@ public Assignment targetAssignment(String memberId) { return targetAssignment.getOrDefault(memberId, Assignment.EMPTY); } +/** + * @return An immutable map containing all the topic partitions + * with their current member assignments. + */ +public Map> partitionAssignments() { +return Collections.unmodifiableMap(partitionAssignments); +} + /** * Updates target assignment of a member. * * @param memberId The member id. * @param newTargetAssignment The new target assignment. */ public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) { +updatePartitionAssignments( +memberId, +targetAssignment.getOrDefault(memberId, new Assignment(Collections.emptyMap())), +newTargetAssignment +); targetAssignment.put(memberId, newTargetAssignment); } +/** + * Updates partition assignments of the topics. + * + * @param memberId The member Id. + * @param oldTargetAssignment The old target assignment. + * @param newTargetAssignment The new target assignment. + * + * Package private for testing. + */ +void updatePartitionAssignments( +String memberId, +Assignment oldTargetAssignment, +Assignment newTargetAssignment +) { +// Combine keys from both old and new assignments. 
+Set allTopicIds = new HashSet<>(); +allTopicIds.addAll(oldTargetAssignment.partitions().keySet()); +allTopicIds.addAll(newTargetAssignment.partitions().keySet()); + +for (Uuid topicId : allTopicIds) { +Set oldPartitions = oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet()); +Set newPartitions = newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet()); + +TimelineHashMap topicPartitionAssignment = partitionAssignments.computeIfAbsent( +topicId, k -> new TimelineHashMap<>(snapshotRegistry, Math.max(oldPartitions.size(), newPartitions.size())) +); + +// Remove partitions that aren't present in the new assignment. +for (Integer partition : oldPartitions) { +if (!newPartitions.contains(partition) && memberId.equals(topicPartitionAssignment.get(partition))) { Review Comment: on `memberId.equals(topicPartitionAssignment.get(partition))`: will this ever be false? it seems we would always assign a partition to a member only when it's been removed by the previous member, right?
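The diff under review maintains a reverse-lookup map from (topic, partition) to member. Stripped of the TimelineHashMap/snapshot machinery, and with our own names (String topic ids instead of Uuid), the update logic amounts to:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of keeping topic -> (partition -> memberId) in sync with a member's
// old and new target assignments (topic -> partitions). Names are ours.
class PartitionAssignmentsSketch {
    final Map<String, Map<Integer, String>> partitionAssignments = new HashMap<>();

    void update(String memberId,
                Map<String, Set<Integer>> oldAssignment,
                Map<String, Set<Integer>> newAssignment) {
        // Combine topics from both old and new assignments.
        Set<String> allTopics = new HashSet<>(oldAssignment.keySet());
        allTopics.addAll(newAssignment.keySet());

        for (String topic : allTopics) {
            Set<Integer> oldParts = oldAssignment.getOrDefault(topic, Collections.emptySet());
            Set<Integer> newParts = newAssignment.getOrDefault(topic, Collections.emptySet());
            Map<Integer, String> byPartition =
                partitionAssignments.computeIfAbsent(topic, t -> new HashMap<>());

            // Drop partitions the member no longer owns (only if it still owns them).
            for (Integer p : oldParts) {
                if (!newParts.contains(p) && memberId.equals(byPartition.get(p))) {
                    byPartition.remove(p);
                }
            }
            // Record the partitions it now owns.
            for (Integer p : newParts) {
                byPartition.put(p, memberId);
            }
        }
    }
}
```

The `memberId.equals(...)` check the reviewer asks about guards the case where another member's update has already claimed the partition in the reverse map, so a stale removal doesn't clobber the new owner.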
[jira] [Commented] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847953#comment-17847953 ] Kirk True commented on KAFKA-16272: --- [~sagarrao]—both of the linked PRs are merged. Can this be marked as Resolved? Thanks! > Update connect_distributed_test.py to support KIP-848’s group protocol config > - > > Key: KAFKA-16272 > URL: https://issues.apache.org/jira/browse/KAFKA-16272 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Sagar Rao >Priority: Major > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in {{connect_distributed_test.py}} > to support the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KIP 989 b [kafka]
nicktelford closed pull request #16003: KIP 989 b URL: https://github.com/apache/kafka/pull/16003
[PR] KAFKA-15045: (KIP-924 pt. 7) Simplify requirements for rack aware graphs [kafka]
apourchet opened a new pull request, #16004: URL: https://github.com/apache/kafka/pull/16004 Rack aware graphs don't actually need any topology information about the system, but rather require a simple ordered (not sorted) grouping of tasks. This PR changes the internal constructors and some interface signatures of RackAwareGraphConstructor and its implementations to allow reuse by future components that may not have access to the actual subtopology information. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[PR] KIP 989 b [kafka]
nicktelford opened a new pull request, #16003: URL: https://github.com/apache/kafka/pull/16003 - **KAFKA-15541: Add num-open-iterators metric** - **Add missing try-finally block** - **KAFKA-15541: Add iterator-duration metrics**
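The num-open-iterators idea in the first commit boils down to one invariant: increment a shared counter when an iterator is created, decrement it in close(), and expose the current value as a gauge. A generic sketch, not the actual Streams implementation (which hooks into StateStoreMetrics):

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: wrap an iterator so a shared counter reflects how many wrapped
// iterators have been created but not yet closed ("num-open-iterators").
// A real implementation would also guard against double-close.
class MeteredIterator<T> implements Iterator<T>, AutoCloseable {
    private final Iterator<T> inner;
    private final AtomicInteger openIterators;

    MeteredIterator(Iterator<T> inner, AtomicInteger openIterators) {
        this.inner = inner;
        this.openIterators = openIterators;
        openIterators.incrementAndGet(); // opened
    }

    @Override public boolean hasNext() { return inner.hasNext(); }
    @Override public T next() { return inner.next(); }

    @Override
    public void close() {
        openIterators.decrementAndGet(); // closed
    }
}
```

A persistently non-zero gauge then points at iterators that were created but never closed — the leak this metric is meant to surface.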
Re: [PR] MINOR: Add a constructor to accept cause in ConfigException [kafka]
gharris1727 commented on PR #15994: URL: https://github.com/apache/kafka/pull/15994#issuecomment-2120906740 Hi @gaurav-narula, good to hear that this isn't necessary then. You are free to add these debug logs to your own local copy to aid in debugging, and if you find particular log statements that are very helpful or outright necessary to debug a problem, you can raise a PR to upstream them. If you're having difficulty debugging something in a unit/integration test, then a user could have the same problem and benefit from those log messages.
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
gharris1727 commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1605384701 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java: ## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.connect.mirror.MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE; + +/** + * Reads once the Kafka log for checkpoints and populates a map of + * checkpoints per consumer group. + * + * The Kafka log is closed after the initial load and only the in memory map is + * used after start. + */ +class CheckpointsStore implements AutoCloseable { Review Comment: optional nit: This class can be public, along with the methods that are intended to be used by MirrorCheckpointTask, because this isn't a publicly documented package (like clients, or connect-api, etc.) Outside of those publicly documented packages, the general practice is public for external callers, even if the current callers are in the same package. We only use package-local for things that would be protected/private, but need to be accessed in tests (and so come with the visibility comment.) ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java: ## @@ -271,4 +284,102 @@ private Map assertCheckpointForTopic( assertEquals(truth, checkpoints.containsKey(remoteTp), "should" +
"" : " not") + " emit offset sync"); return checkpoints; } + +@Test +public void testCheckpointsTaskRestartUsesExistingCheckpoints() { Review Comment: I think using "real checkpoints" generated by the first MirrorCheckpointTask to test the second MirrorCheckpointTask is not necessary, and you can use simulated checkpoints instead. Reassigning variables and copy-pasting sections in tests is typo-prone and I think we can avoid it here. ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ## @@ -155,6 +154,58 @@ public void testPastOffsetTranslation() { } } +// this test has been written knowing the exact offset syncs stored +@Test +public void testPastOffsetTranslationWithoutInitializationReadToEnd() { +final int maxOffsetLag = 10; + +FakeOffsetSyncStore store = new FakeOffsetSyncStore() { +@Override +void backingStoreStart() { +for (int offset = 0; offset <= 1000; offset += maxOffsetLag) { +sync(tp, offset, offset); +assertSparseSyncInvariant(this, tp); +} +} +}; + +store.start(false); + +// After starting but before seeing new offsets +assertTranslationsNearby(store, 400, 480, 0); +assertTranslationsNearby(store, 500, 720, 480); +assertTranslationsNearby(store, 1000, 1000, 990); + +for (int offset = 1000; offset <= 1; offset += maxOffsetLag) { +store.sync(tp, offset, offset); +assertSparseSyncInvariant(store, tp); +} + +// After seeing new offsets, 1000 was kicked out of the store, so +// 1000 can only be translated to 1; the only previously stored offset is 0 +assertTranslationsNearby(store, 1000, 3840, 0); + +// We can translate offsets between the latest startup offset and the latest
[jira] [Updated] (KAFKA-16707) Kafka Kraft : adding Principal Type in StandardACL for matching with KafkaPrincipal of connected client in order to defined ACL with a notion of group
[ https://issues.apache.org/jira/browse/KAFKA-16707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Franck LEDAY updated KAFKA-16707: - Description: Default StandardAuthorizer in Kraft mode is defining a KafkaPrincipal as type=User and a name, and possibly a special wildcard. The difficulty with this solution is that we can't define ACLs by group of KafkaPrincipal. There is a way for the moment to do so by defining a RULE to rewrite the KafkaPrincipal name field, BUT, to introduce the notion of group this way, you have to set rules which will make you lose the unique part of the KafkaPrincipal name of the connected client. The concept here, in the StandardAuthorizer of Kafka Kraft, is to add the management of KafkaPrincipal type: * Regex * StartsWith * EndsWith * Contains * (User is still available and keeps working as before to avoid any regression/issue with current configurations) This would be done in the StandardAcl class of metadata/authorizer, and the findResult method of StandardAuthorizerData will delegate the match to the StandardAcl class (for performance reasons, see below explanation). By this way, you can still use RULEs to rewrite the KafkaPrincipal name of the connected client (say you want to transform a DN of an SSL certificate: cn=myCN,ou=myOU,c=FR becomes myCN@myOU), and then you can define a new ACL with a principal like: 'Regex:^.*@my[oO]U$' that will match all connected clients with a certificate bound to ou=myOU. Note in this particular case, the same can be done with 'EndsWith:@myOU', and the type 'Contains' can work, but I imagine more the usage of this type for matching in a multigroup definition in a KafkaPrincipal. Note about the performance reason: for the moment, I have it implemented in a fork of StandardAuthorizer/StandardAuthorizerData/StandardAcl defined by the property authorizer.class.name in a KRaft cluster with SSL authentication required, and tested fine.
But, by this way, every time that an ACL is checked against a KafkaPrincipal, I do a strcmp of the KafkaPrincipal type of the ACL to determine the matching method to be done. By implementing it in the StandardAcl class, and then delegating the matching from StandardAuthorizerData to the StandardAcl class, this allows analysing and storing the type of the KafkaPrincipal matching method as an enum, and the KafkaPrincipal name separately, in order to avoid redoing the job each time a match has to be checked. Here is my status of the implementation: * I have this solution ('performance reason') implemented in a fork (then branch) of the 3.7.0 github repo, * I added a few unit tests, and a gradlew metadata:test is working fine on all tests except one (which is failing also on branch 3.7.0 without my changes), * I added a few lines about it in security.html . I'm opening the issue to discuss it with you, because I would like to create a PR on Github for the next version.
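The parse-once-then-dispatch scheme described in this issue (store the principal match type as an enum when the ACL is created, so authorization checks avoid repeated string comparisons of the type) could be sketched roughly as follows. This is an illustrative sketch only; the class and method names are not the actual Kafka `StandardAcl` code.

```java
import java.util.regex.Pattern;

// Illustrative sketch only: not the real StandardAcl implementation.
class PrincipalMatcherSketch {

    // Parsed once when the ACL is stored, so each authorization check
    // dispatches on an enum instead of re-comparing the type string.
    enum MatchType { USER, REGEX, STARTS_WITH, ENDS_WITH, CONTAINS }

    private final MatchType type;
    private final String name;
    private final Pattern pattern; // compiled once, only for REGEX ACLs

    PrincipalMatcherSketch(String aclPrincipal) {
        int colon = aclPrincipal.indexOf(':');
        String typeStr = aclPrincipal.substring(0, colon);
        this.name = aclPrincipal.substring(colon + 1);
        switch (typeStr) {
            case "Regex":      type = MatchType.REGEX; break;
            case "StartsWith": type = MatchType.STARTS_WITH; break;
            case "EndsWith":   type = MatchType.ENDS_WITH; break;
            case "Contains":   type = MatchType.CONTAINS; break;
            default:           type = MatchType.USER; break; // existing behavior
        }
        this.pattern = (type == MatchType.REGEX) ? Pattern.compile(name) : null;
    }

    boolean matches(String principalName) {
        switch (type) {
            case REGEX:       return pattern.matcher(principalName).matches();
            case STARTS_WITH: return principalName.startsWith(name);
            case ENDS_WITH:   return principalName.endsWith(name);
            case CONTAINS:    return principalName.contains(name);
            default:          return principalName.equals(name);
        }
    }
}
```

With the example from the issue, an ACL principal of `Regex:^.*@my[oO]U$` would match a rewritten client principal name like `myCN@myOU`.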
[PR] KAFKA-15045: (KIP-924 pt. 6) Post process new assignment structure [kafka]
apourchet opened a new pull request, #16002: URL: https://github.com/apache/kafka/pull/16002 This PR creates the required methods to post-process the result of TaskAssignor.assign into the required ClientMetadata map. This allows most of the internal logic to remain intact after the user's assignment code runs. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add a constructor to accept cause in ConfigException [kafka]
gaurav-narula commented on PR #15994: URL: https://github.com/apache/kafka/pull/15994#issuecomment-2120832178 Thanks for the feedback @gharris1727. I dug a bit and it turns out the flaky test `DynamicBrokerReconfigurationTest::testTrustStoreAlter` that motivated this PR won't benefit from either cause or suppressed `Throwable`. This is because the test invokes `incrementalAlterConfigs` as a client and ConfigExceptions in the controller are converted to `ApiError` at https://github.com/apache/kafka/blob/81e609802187ac2bcbd0ac169fa10e8c02c237f5/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java#L299 and we lose the cause/suppressed throwable at that point (they aren't sent over the wire) :( I think the easiest change to debug the flakiness is to increase the logging level at https://github.com/apache/kafka/blob/81e609802187ac2bcbd0ac169fa10e8c02c237f5/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java#L187 to `error` instead of `debug`. Alternatively, we may want to have that error log before https://github.com/apache/kafka/blob/81e609802187ac2bcbd0ac169fa10e8c02c237f5/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java#L299 so that we ensure all reconfiguration errors are logged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
[ https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16777: -- Fix Version/s: 3.8.0 > New consumer should throw NoOffsetForPartitionException on continuous poll > zero if no reset strategy > > > Key: KAFKA-16777 > URL: https://issues.apache.org/jira/browse/KAFKA-16777 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > If the consumer does not define an offset reset strategy, a call to poll > should fail with NoOffsetForPartitionException. That works as expected on the > new consumer when polling with a timeout > 0 (existing integration test > [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), > but fails when polling continuously with ZERO timeout. > This can be easily reproduced with a new integration test like this (passes > for the legacy consumer but fails for the new consumer). 
We should add it as > part of the fix, for better coverage: > {code:java} > @ParameterizedTest(name = > TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) > @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) > def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, > groupProtocol: String): Unit = { > this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, > "none") > val consumer = createConsumer(configOverrides = this.consumerConfig) > consumer.assign(List(tp).asJava) > // continuous poll should eventually fail because there is no offset > reset strategy set (fail only when resetting positions after coordinator is > known) > TestUtils.tryUntilNoAssertionError() { > assertThrows(classOf[NoOffsetForPartitionException], () => > consumer.poll(Duration.ZERO)) > } > } > {code} > Also this is covered in the unit test > [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], > that is currently enabled only for the LegacyConsumer. After fixing this > issue we should be able to enable it for the new consumer too. > The issue seems to be around calling poll with ZERO timeout: even when > called continuously, the consumer is not able to > initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it > to > [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], > where the exception is thrown. > > There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, > but filing this one to provide more context and point out the test failures > and suggested new tests. All fail even with the current patch in KAFKA-16637 > so it needs investigation.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16758) Extend Consumer#close with option to leave the group or not
[ https://issues.apache.org/jira/browse/KAFKA-16758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16758: -- Fix Version/s: 4.0.0 > Extend Consumer#close with option to leave the group or not > --- > > Key: KAFKA-16758 > URL: https://issues.apache.org/jira/browse/KAFKA-16758 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: A. Sophie Blee-Goldman >Assignee: Lianet Magrans >Priority: Major > Labels: needs-kip > Fix For: 4.0.0 > > > See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the > full context. > Essentially we would get rid of the "internal.leave.group.on.close" config > that is used as a backdoor by Kafka Streams right now to prevent closed > consumers from leaving the group, thus reducing unnecessary task movements > after a simple bounce. > This would be replaced by an actual public API that would allow the caller to > opt in or out to the LeaveGroup when close is called. This would be similar > to the KafkaStreams#close(CloseOptions) API, and in fact would be how that > API will be implemented (since it only works for static groups at the moment > as noted in KAFKA-16514 ) > This has several benefits over the current situation: > # It allows plain consumer apps to opt-out of leaving the group when closed, > which is currently not possible through any public API (only an internal > backdoor config) > # It enables the caller to dynamically select the appropriate action > depending on why the client is being closed – for example, you would not want > the consumer to leave the group during a simple restart, but would want it to > leave the group when shutting down the app or if scaling down the node. 
This > is not possible today, even with the internal config, since configs are > immutable > # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so > that the user's choice to leave the group during close will be respected for > non-static members -- This message was sent by Atlassian Jira (v8.20.10#820010)
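The public close-option described in this issue could look roughly like the sketch below. All names here are hypothetical (the issue is still marked needs-kip, so no API names are final); the sketch only illustrates the shape of a per-close, caller-controlled decision replacing the immutable `internal.leave.group.on.close` config.

```java
// Hypothetical sketch of a close-options object; names are illustrative only
// and do not correspond to any finalized Kafka API.
class ConsumerCloseOptions {

    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP }

    private final GroupMembershipOperation operation;

    private ConsumerCloseOptions(GroupMembershipOperation operation) {
        this.operation = operation;
    }

    // Explicitly leave the group on close (e.g. app shutdown or scale-down).
    static ConsumerCloseOptions leaveGroup() {
        return new ConsumerCloseOptions(GroupMembershipOperation.LEAVE_GROUP);
    }

    // Stay in the group on close (e.g. a simple bounce/restart), avoiding an
    // immediate rebalance and unnecessary partition movements.
    static ConsumerCloseOptions remainInGroup() {
        return new ConsumerCloseOptions(GroupMembershipOperation.REMAIN_IN_GROUP);
    }

    GroupMembershipOperation operation() {
        return operation;
    }
}
```

A caller restarting a node would pass `remainInGroup()` to a hypothetical `consumer.close(options)` overload, while a scale-down path would pass `leaveGroup()` — the decision becomes dynamic per close call rather than fixed by an immutable config.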
Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]
ivanyu commented on code in PR #13277: URL: https://github.com/apache/kafka/pull/13277#discussion_r1607019116 ## clients/src/main/java/org/apache/kafka/clients/NetworkClient.java: ## Review Comment: I added a number of constructors here instead of altering the existing ones, because they're public and, I assume, part of the client library contract. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847936#comment-17847936 ] Igor Soarez commented on KAFKA-16692: - [~jolshan] done > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.7.0, 3.6.1, 3.8 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major > Fix For: 3.7.1, 3.8 > > > We have a kafka cluster running on version 3.5.2 that we are upgrading to > 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled > and hence creating transactions. As we replaced brokers with the new > binaries, we observed lots of clients in the cluster experiencing the > following error: > {code:java} > 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, > transactionalId=] Got error produce response with > correlation id 6402937 on topic-partition , retrying > (2147483512 attempts left). Error: NETWORK_EXCEPTION.
Error Message: The > server disconnected before a response was received.{code} > On inspecting the broker, we saw the following errors on brokers still > running Kafka version 3.5.2: > > {code:java} > message: > Closing socket for because of error > exception_exception_class: > org.apache.kafka.common.errors.InvalidRequestException > exception_exception_message: > Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled > exception_stacktrace: > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled > {code} > On the new brokers running 3.6.1 we saw the following errors: > > {code:java} > [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for > node 1043 with a network exception.{code} > > I can also see this : > {code:java} > [AddPartitionsToTxnManager broker=1055]Cancelled in-flight > ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 > being disconnected (elapsed time since creation: 11ms, elapsed time since > send: 4ms, request timeout: 3ms){code} > We started investigating this issue and digging through the changes in 3.6, > we came across some changes introduced as part of KAFKA-14402 that we thought > might lead to this behaviour. > First we could see that _transaction.partition.verification.enable_ is > enabled by default and enables a new code path that culminates in us sending > version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated > [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a > [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] > on the mailing list, [~jolshan] pointed out that this scenario shouldn't be > possible as the following code paths should prevent version 4 > ADD_PARTITIONS_TO_TXN requests being sent to other brokers: > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] > > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] > However, these requests are still sent to other brokers in our environment. > On further inspection of the code, I am wondering if the following code path > could lead to this issue: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] > In this scenario, we don't have any _NodeApiVersions_ available for the > specified nodeId, which potentially skips the _latestUsableVersion_ check. I > am wondering if it is possible that because _discoverBrokerVersions_ is set > to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it > skips fetching {_}NodeApiVersions{_}? I can see that we create the network > client here: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641] > The _NetworkUtils.buildNetworkClient_ method seems to create a network client > that has _discoverBrokerVersions_ set to {_}false{_}. > I was hoping I could get some assistance debugging this issue. Happy to > provide any additional information needed. > > > -- This message was sent by Atlassian Jira
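The suspected code path above can be illustrated with a simplified stand-in (this is not the real `NetworkClient`/`NodeApiVersions` logic, which is considerably more involved): when no version information has been recorded for a node, the usable-version check is effectively skipped and the latest request version is used.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the version negotiation described above.
class VersionCheckSketch {

    // nodeId -> max request version the node advertised via ApiVersions.
    private final Map<String, Short> nodeMaxVersions = new HashMap<>();

    void recordApiVersions(String nodeId, short maxVersion) {
        nodeMaxVersions.put(nodeId, maxVersion);
    }

    short chooseRequestVersion(String nodeId, short latestVersion) {
        Short nodeMax = nodeMaxVersions.get(nodeId);
        if (nodeMax == null) {
            // With discoverBrokerVersions == false nothing is ever recorded,
            // so the latestUsableVersion-style check never constrains the
            // request, and the newest version is sent even to older brokers.
            return latestVersion;
        }
        return (short) Math.min(latestVersion, nodeMax);
    }
}
```

Under this model, a 3.6.1 broker whose client never learned that node 1043 only supports ADD_PARTITIONS_TO_TXN up to version 3 would send version 4 anyway — matching the symptom in the report.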
[jira] [Updated] (KAFKA-16741) Add ShareGroupHeartbeat API support in GroupCoordinator
[ https://issues.apache.org/jira/browse/KAFKA-16741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Schofield updated KAFKA-16741: - Summary: Add ShareGroupHeartbeat API support in GroupCoordinator (was: Add SharGroupHeartbeat API support in GroupCoordinator) > Add ShareGroupHeartbeat API support in GroupCoordinator > --- > > Key: KAFKA-16741 > URL: https://issues.apache.org/jira/browse/KAFKA-16741 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]
jeffkbkim commented on code in PR #15987: URL: https://github.com/apache/kafka/pull/15987#discussion_r1607007949 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java: ## @@ -61,8 +61,11 @@ public DelayEventAccumulator(Time time, long takeDelayMs) { @Override public CoordinatorEvent take() { -time.sleep(takeDelayMs); -return super.take(); +CoordinatorEvent event = super.take(); Review Comment: @gaurav-narula thanks for the feedback. that makes sense, i have incorporated your suggestion -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]
C0urante commented on PR #16001: URL: https://github.com/apache/kafka/pull/16001#issuecomment-2120798453 @gharris1727 If you have a moment, could you take a look? I filed this in anticipation of [KIP-891](https://cwiki.apache.org/confluence/display/KAFKA/KIP-891%3A+Running+multiple+versions+of+Connector+plugins) since it seems that the KIP might make this bug slightly more common, and wanted to prevent users from running into it if possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]
C0urante opened a new pull request, #16001: URL: https://github.com/apache/kafka/pull/16001 [Jira](https://issues.apache.org/jira/browse/KAFKA-9228) Currently, if a connector generates the same set of task configs as the set that already exists in the config topic, then this new set of task configs is not written to the config topic, and the connector's tasks are not (immediately*) restarted. This is intentional behavior to prevent infinite rebalance loops when using eager rebalancing, and most of the time it comes with no drawbacks. However, if the runtime-controlled properties for a connector (such as its key/value/header converters, or Kafka client overrides) are modified, then this behavior can cause the updates to not be applied (immediately*) to the connector's tasks. In order to address this, we forcibly rewrite task configs to the config topic when we detect a change in the connector config, even if they are identical to the existing task configs. Changes to the connector config are tracked by taking a hash of the connector config and including it in the config topic when writing task configs. If no hash has been written to the config topic yet, then we do not compare hashes. This is done in order to prevent upgrades to newer workers from causing all tasks on the cluster to be immediately restarted. As a final note, this bug should not be very prevalent in the Kafka Connect ecosystem, since most connectors will unintentionally propagate changes in runtime-controlled properties to their tasks. This is because the classic idiom for connectors is to track the properties provided in [Connector::start](https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/connector/Connector.html#start(java.util.Map)) and use either an identical clone or a slightly-modified copy of those properties in the return value of [Connector::taskConfigs](https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/connector/Connector.html#taskConfigs(int)). 
\* - Tasks can be restarted later on as a result of workers joining/leaving the cluster, users manually triggering restarts via the REST API, or other causes. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
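The change-detection idea in the PR description — hash the connector config, store the hash alongside task configs, and force a rewrite when the hash changes — can be sketched as below. The hashing details (SHA-256 over sorted key/value pairs) and the helper names are illustrative assumptions, not necessarily what the PR implements.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch of hashing a connector config to detect changes.
class ConnectorConfigHashSketch {

    static String hash(Map<String, String> connectorConfig) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            // Sort keys so logically-equal configs always hash identically.
            for (Map.Entry<String, String> e : new TreeMap<>(connectorConfig).entrySet()) {
                digest.update(e.getKey().getBytes(StandardCharsets.UTF_8));
                digest.update((byte) 0); // separator avoids key/value ambiguity
                digest.update(e.getValue().getBytes(StandardCharsets.UTF_8));
                digest.update((byte) 0);
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest())
                hex.append(String.format("%02x", b));
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }

    // Rewrite task configs when they changed, or when the stored connector
    // config hash differs; a missing stored hash (older worker) keeps the old
    // behavior so upgrades don't restart every task.
    static boolean shouldWriteTaskConfigs(String storedHash,
                                          Map<String, String> currentConfig,
                                          boolean taskConfigsChanged) {
        if (taskConfigsChanged) return true;
        if (storedHash == null) return false;
        return !storedHash.equals(hash(currentConfig));
    }
}
```

This way a runtime-only change (say, swapping the value converter) alters the connector config hash even though the generated task configs are byte-identical, so the tasks are still restarted with the new settings.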
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2120792114 Hi @gharris1727 if you have the time, can you please have a look again ? thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message
[ https://issues.apache.org/jira/browse/KAFKA-16766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16766: -- Component/s: clients > New consumer offsetsForTimes timeout exception does not have the proper > message > --- > > Key: KAFKA-16766 > URL: https://issues.apache.org/jira/browse/KAFKA-16766 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer > will throw a org.apache.kafka.common.errors.TimeoutException as expected, but > with the following as message: "java.util.concurrent.TimeoutException". > We should provide a clearer message, and I would even say we keep the same > message that the LegacyConsumer shows in this case, ex: "Failed to get > offsets by times in 6ms". > To fix this we should consider catching the timeout exception in the consumer > when offsetsForTimes result times out > ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]), > and propagate it with the message specific to offsetsForTimes. > Same situation exists for beginningOffsets and endOffsets. All 3 funcs show > the same timeout message in the LegacyConsumer (defined > [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]), > but do not have a clear message in the Async, so we should fix them all 3. > With the fix, we should write tests for each func, like the ones defined for > the Legacy Consumer > ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]). 
> Note that we would need different tests, added to AsyncKafkaConsumerTest, > given that the async consumer issues a FindCoordinator request in this case, > which the LegacyConsumer does not, so the current tests do not account for > that when matching requests/responses. -- This message was sent by Atlassian Jira (v8.20.10#820010)
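The fix suggested in this issue amounts to catching the bare `java.util.concurrent.TimeoutException` and rethrowing an exception whose message names the operation and the timeout, as the legacy consumer does. A minimal self-contained sketch (Kafka's `org.apache.kafka.common.errors.TimeoutException` is stubbed out here; the helper name is hypothetical):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class OffsetsTimeoutSketch {

    // Stand-in for org.apache.kafka.common.errors.TimeoutException.
    static class KafkaTimeoutException extends RuntimeException {
        KafkaTimeoutException(String message) { super(message); }
    }

    // Wait for the offsets result, translating the bare
    // java.util.concurrent.TimeoutException into a message that names the
    // operation and the timeout, matching the legacy consumer's wording.
    static <T> T getOffsetsResult(CompletableFuture<T> future, long timeoutMs, String operation) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new KafkaTimeoutException(
                "Failed to " + operation + " in " + timeoutMs + "ms");
        } catch (InterruptedException | ExecutionException e) {
            // Sketch only: a real implementation would handle these properly.
            throw new RuntimeException(e);
        }
    }
}
```

The same wrapper would be applied for `offsetsForTimes`, `beginningOffsets`, and `endOffsets`, with only the `operation` text differing.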
[jira] [Reopened] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reopened KAFKA-16692: - Re-opening as 3.6 backport is still missing > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.7.0, 3.6.1, 3.8 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major > Fix For: 3.7.1, 3.8 > > > We have a kafka cluster running on version 3.5.2 that we are upgrading to > 3.6.1. This cluster has a lot of clients with exactly one semantics enabled > and hence creating transactions. As we replaced brokers with the new > binaries, we observed lots of clients in the cluster experiencing the > following error: > {code:java} > 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, > transactionalId=] Got error produce response with > correlation id 6402937 on topic-partition , retrying > (2147483512 attempts left). Error: NETWORK_EXCEPTION. 
Error Message: The > server disconnected before a response was received.{code} > On inspecting the broker, we saw the following errors on brokers still > running Kafka version 3.5.2: > > {code:java} > message: > Closing socket for because of error > exception_exception_class: > org.apache.kafka.common.errors.InvalidRequestException > exception_exception_message: > Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled > exception_stacktrace: > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled > {code} > On the new brokers running 3.6.1 we saw the following errors: > > {code:java} > [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for > node 1043 with a network exception.{code} > > I can also see this : > {code:java} > [AddPartitionsToTxnManager broker=1055]Cancelled in-flight > ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 > being disconnected (elapsed time since creation: 11ms, elapsed time since > send: 4ms, request timeout: 3ms){code} > We started investigating this issue and digging through the changes in 3.6, > we came across some changes introduced as part of KAFKA-14402 that we thought > might lead to this behaviour. > First we could see that _transaction.partition.verification.enable_ is > enabled by default and enables a new code path that culminates in we sending > version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated > [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269]. 
> From a > [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] > on the mailing list, [~jolshan] pointed out that this scenario shouldn't be > possible as the following code paths should prevent version 4 > ADD_PARTITIONS_TO_TXN requests being sent to other brokers: > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] > > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] > However, these requests are still sent to other brokers in our environment. > On further inspection of the code, I am wondering if the following code path > could lead to this issue: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] > In this scenario, we don't have any _NodeApiVersions_ available for the > specified nodeId and potentially skipping the _latestUsableVersion_ check. I > am wondering if it is possible that because _discoverBrokerVersions_ is set > to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it > skips fetching {_}NodeApiVersions{_}? I can see that we create the network > client here: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641] > The _NetworkUtils.buildNetworkClient_ method seems to create a network client > that has _discoverBrokerVersions_ set to {_}false{_}. > I was hoping I could get some assistance debugging this issue. Happy to > provide any additional information needed. > > > -- This message was sent by Atlassian Jira
[jira] [Updated] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata
[ https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16764: -- Component/s: clients > New consumer should throw InvalidTopicException on poll when invalid topic in > metadata > -- > > Key: KAFKA-16764 > URL: https://issues.apache.org/jira/browse/KAFKA-16764 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: appchemist >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > A call to consumer.poll should throw InvalidTopicException if an invalid > topic is discovered in metadata. This can be easily reproduced by calling > subscribe("invalid topic") and then poll, for example. The new consumer does > not throw the expected InvalidTopicException like the LegacyKafkaConsumer > does. > The legacy consumer achieves this by checking for metadata exceptions on > every iteration of the ConsumerNetworkClient (see > [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]) > This is probably what makes > [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956] > fail for the new consumer. Once this bug is fixed, we should be able to > enable that test for the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847921#comment-17847921 ] Justine Olshan commented on KAFKA-16692: [~soarez] do you mind taking a quick look at this small change to run the new tests in 3.7? https://github.com/apache/kafka/pull/16000 > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.7.0, 3.6.1, 3.8 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major > Fix For: 3.7.1, 3.8 > > > We have a kafka cluster running on version 3.5.2 that we are upgrading to > 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled > and hence creating transactions. As we replaced brokers with the new > binaries, we observed lots of clients in the cluster experiencing the > following error: > {code:java} > 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, > transactionalId=] Got error produce response with > correlation id 6402937 on topic-partition , retrying > (2147483512 attempts left). Error: NETWORK_EXCEPTION. 
Error Message: The > server disconnected before a response was received.{code} > On inspecting the broker, we saw the following errors on brokers still > running Kafka version 3.5.2: > > {code:java} > message: > Closing socket for because of error > exception_exception_class: > org.apache.kafka.common.errors.InvalidRequestException > exception_exception_message: > Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled > exception_stacktrace: > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled > {code} > On the new brokers running 3.6.1 we saw the following errors: > > {code:java} > [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for > node 1043 with a network exception.{code} > > I can also see this: > {code:java} > [AddPartitionsToTxnManager broker=1055]Cancelled in-flight > ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 > being disconnected (elapsed time since creation: 11ms, elapsed time since > send: 4ms, request timeout: 3ms){code} > We started investigating this issue and digging through the changes in 3.6, > we came across some changes introduced as part of KAFKA-14402 that we thought > might lead to this behaviour. > First we could see that _transaction.partition.verification.enable_ is > enabled by default and enables a new code path that culminates in us sending > version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated > [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269]. 
> From a > [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] > on the mailing list, [~jolshan] pointed out that this scenario shouldn't be > possible as the following code paths should prevent version 4 > ADD_PARTITIONS_TO_TXN requests being sent to other brokers: > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] > > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] > However, these requests are still sent to other brokers in our environment. > On further inspection of the code, I am wondering if the following code path > could lead to this issue: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] > In this scenario, we don't have any _NodeApiVersions_ available for the > specified nodeId and potentially skipping the _latestUsableVersion_ check. I > am wondering if it is possible that because _discoverBrokerVersions_ is set > to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it > skips fetching {_}NodeApiVersions{_}? I can see that we create the network > client here: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641] > The _NetworkUtils.buildNetworkClient_ method seems to create a network client > that has _discoverBrokerVersions_ set to {_}false{_}. > I was hoping I could get some assistance debugging
[PR] MINOR: remove extra import from transactions tests [kafka]
jolshan opened a new pull request, #16000: URL: https://github.com/apache/kafka/pull/16000 I missed that these two did not run for the cherry-pick. Re-running now. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2120776989 @chia7712, Please review, Thanks for your comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]
m1a2st commented on PR #15840: URL: https://github.com/apache/kafka/pull/15840#issuecomment-2120776240 @chia7712, Please review, Thanks for your comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-9228) Reconfigured converters and clients may not be propagated to connector tasks
[ https://issues.apache.org/jira/browse/KAFKA-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-9228: Assignee: Chris Egerton (was: Greg Harris) > Reconfigured converters and clients may not be propagated to connector tasks > > > Key: KAFKA-9228 > URL: https://issues.apache.org/jira/browse/KAFKA-9228 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > If an existing connector is reconfigured but the only changes are to its > converters and/or Kafka clients (enabled as of > [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]), > the changes will not propagate to its tasks unless the connector also > generates task configs that differ from the existing task configs. Even after > this point, if the connector tasks are reconfigured, they will still not pick > up on the new converter and/or Kafka client configs. > This is because the {{DistributedHerder}} only writes new task configurations > to the connect config topic [if the connector-provided task configs differ > from the task configs already in the config > topic|https://github.com/apache/kafka/blob/e499c960e4f9cfc462f1a05a110d79ffa1c5b322/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1285-L1332], > and neither of those contain converter or Kafka client configs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
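The root cause described above can be sketched with a minimal model (hypothetical names, not the actual DistributedHerder code): the herder only persists task configs when the connector-generated configs differ from those already in the config topic, and converter/client overrides live in the connector config rather than the generated task configs, so a converter-only change produces an equal comparison and nothing is written.

```java
import java.util.List;
import java.util.Map;

public class TaskConfigDiff {

    // Mirrors the herder's guard: write new task configs to the config
    // topic only if they differ from what is already stored there.
    static boolean shouldWriteTaskConfigs(List<Map<String, String>> stored,
                                          List<Map<String, String>> generated) {
        return !stored.equals(generated);
    }

    public static void main(String[] args) {
        List<Map<String, String>> stored =
                List.of(Map.of("task.class", "MyTask", "topics", "a"));
        // The operator changed e.g. "value.converter" on the *connector*
        // config, but that key never appears in the task configs the
        // connector generates, so the diff is empty:
        List<Map<String, String>> generated =
                List.of(Map.of("task.class", "MyTask", "topics", "a"));
        System.out.println(shouldWriteTaskConfigs(stored, generated)); // false
    }
}
```

This is why tasks keep running with stale converters and clients until some unrelated change happens to alter the generated task configs.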
[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847920#comment-17847920 ] Justine Olshan commented on KAFKA-16692: Hey, I still need to do 3.6. I will update the ticket when I do so. > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.7.0, 3.6.1, 3.8 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major > Fix For: 3.7.1, 3.8
[jira] [Commented] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error
[ https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847911#comment-17847911 ] Igor Soarez commented on KAFKA-16760: - After another look, {{alterReplicaLogDirs}} is already being tested in KRaft in {{testAlterReplicaLogDirs}} and {{testAlterLogDirsAfterDeleteRecords}} and {{kafka.api.PlaintextAdminIntegrationTest}} and this condition isn't hit: Partition has an older epoch (0) than the current leader. Will await the new LeaderAndIsr state before resuming fetching. > alterReplicaLogDirs failed even if responded with none error > > > Key: KAFKA-16760 > URL: https://issues.apache.org/jira/browse/KAFKA-16760 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Luke Chen >Priority: Major > > When firing alterLogDirRequest, it gets error NONE result. But actually, the > alterLogDir never happened with these errors: > {code:java} > [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 has an older epoch (0) than the current leader. Will await the new > LeaderAndIsr state before resuming fetching. > (kafka.server.ReplicaAlterLogDirsThread:66) > [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70) > {code} > Note: It's under KRaft mode. So the log with LeaderAndIsr is wrong. > This can be reproduced in this > [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] and running > this test: > {code:java} > ./gradlew cleanTest storage:test --tests > org.apache.kafka.tiered.storage.integration.AlterLogDirTest > {code} > The complete logs can be found here: > https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error
[ https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847911#comment-17847911 ] Igor Soarez edited comment on KAFKA-16760 at 5/20/24 3:51 PM: -- After another look, {{alterReplicaLogDirs}} is already being tested in KRaft in {{testAlterReplicaLogDirs}} and {{testAlterLogDirsAfterDeleteRecords}} and {{kafka.api.PlaintextAdminIntegrationTest}} and this condition isn't hit: {code:java} Partition has an older epoch (0) than the current leader. Will await the new LeaderAndIsr state before resuming fetching.{code} was (Author: soarez): After another look, {{alterReplicaLogDirs}} is already being tested in KRaft in {{testAlterReplicaLogDirs}} and {{testAlterLogDirsAfterDeleteRecords}} and {{kafka.api.PlaintextAdminIntegrationTest}} and this condition isn't hit: Partition has an older epoch (0) than the current leader. Will await the new LeaderAndIsr state before resuming fetching. > alterReplicaLogDirs failed even if responded with none error > > > Key: KAFKA-16760 > URL: https://issues.apache.org/jira/browse/KAFKA-16760 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Luke Chen >Priority: Major > > When firing alterLogDirRequest, it gets error NONE result. But actually, the > alterLogDir never happened with these errors: > {code:java} > [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 has an older epoch (0) than the current leader. Will await the new > LeaderAndIsr state before resuming fetching. > (kafka.server.ReplicaAlterLogDirsThread:66) > [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70) > {code} > Note: It's under KRaft mode. So the log with LeaderAndIsr is wrong. 
> This can be reproduced in this > [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] and running > this test: > {code:java} > ./gradlew cleanTest storage:test --tests > org.apache.kafka.tiered.storage.integration.AlterLogDirTest > {code} > The complete logs can be found here: > https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error
[ https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846705#comment-17846705 ] Igor Soarez edited comment on KAFKA-16760 at 5/20/24 3:48 PM: -- [~showuon] I spent a few hours looking into this but couldn't yet figure out why the test is failing. Regarding the logs you first mentioned: Will await the new LeaderAndIsr state before resuming fetching. I'm not sure this is normal. -I've also not found an integration test covering alterLogDir for KRaft, so we need to add that!- was (Author: soarez): [~showuon] I spent a few hours looking into this but couldn't yet figure out why the test is failing. Regarding the logs you first mentioned: Will await the new LeaderAndIsr state before resuming fetching. I'm not sure this is normal. I've also not found an integration test covering alterLogDir for KRaft, so we need to add that! > alterReplicaLogDirs failed even if responded with none error > > > Key: KAFKA-16760 > URL: https://issues.apache.org/jira/browse/KAFKA-16760 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Luke Chen >Priority: Major > > When firing alterLogDirRequest, it gets error NONE result. But actually, the > alterLogDir never happened with these errors: > {code:java} > [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 has an older epoch (0) than the current leader. Will await the new > LeaderAndIsr state before resuming fetching. > (kafka.server.ReplicaAlterLogDirsThread:66) > [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70) > {code} > Note: It's under KRaft mode. So the log with LeaderAndIsr is wrong. 
> This can be reproduced in this > [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] and running > this test: > {code:java} > ./gradlew cleanTest storage:test --tests > org.apache.kafka.tiered.storage.integration.AlterLogDirTest > {code} > The complete logs can be found here: > https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923 -- This message was sent by Atlassian Jira (v8.20.10#820010)