Re: [PR] KAFKA-16992: InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka [kafka]
jolshan merged PR #15971: URL: https://github.com/apache/kafka/pull/15971 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16791) Add thread detection to ClusterTestExtensions
[ https://issues.apache.org/jira/browse/KAFKA-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847482#comment-17847482 ]

bboyleonp commented on KAFKA-16791:
---
Hi [~chia7712], I am interested in this one. May I take it? Thanks.

> Add thread detection to ClusterTestExtensions
>
> Key: KAFKA-16791
> URL: https://issues.apache.org/jira/browse/KAFKA-16791
> Project: Kafka
> Issue Type: Improvement
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Minor
>
> `ClusterTestExtensions` should implement `BeforeAllCallback` and
> `AfterAllCallback` using `TestUtils.verifyNoUnexpectedThreads`.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
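The following is a minimal sketch of what such a check does. It uses neither JUnit nor Kafka's `TestUtils.verifyNoUnexpectedThreads`. It takes a snapshot of the live threads before the test class runs, then reports any thread that is still alive afterwards and was not in the snapshot. `ThreadLeakDetector` and its methods are hypothetical names. In the actual extension, the two calls would presumably sit in `beforeAll` and `afterAll`.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper (not Kafka's TestUtils): remembers which threads were
// alive before a test class ran and reports threads started since then.
final class ThreadLeakDetector {
    private Set<Thread> baseline = new HashSet<>();

    // In a JUnit 5 extension this would be called from BeforeAllCallback#beforeAll.
    void snapshot() {
        baseline = new HashSet<>(Thread.getAllStackTraces().keySet());
    }

    // In a JUnit 5 extension this would be called from AfterAllCallback#afterAll,
    // failing the test class if the returned set is not empty.
    Set<String> leakedThreadNames() {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.isAlive() && !baseline.contains(t))
                .map(Thread::getName)
                .collect(Collectors.toSet());
    }
}
```

Comparing `Thread` objects rather than names avoids missing a leak when a leaked thread reuses the name of a pre-existing one. A real check would likely also need an allow-list for JVM or framework threads that start lazily.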
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605660608

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ##
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
     *
     * @param clientMetadataMap the map of process id to client metadata used to build an immutable
     *                          {@code ApplicationState}
-     * @param statefulTasks     the set of {@code TaskId} that correspond to all the stateful
-     *                          tasks that need to be reassigned.
     * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
     *         assignments.
     */
-    private ApplicationState buildApplicationState(final Map clientMetadataMap,
-                                                   final Set statefulTasks) {
-        final Set statelessTasks = new HashSet<>();
-        for (final Map.Entry clientEntry : clientMetadataMap.entrySet()) {
-            final ClientState clientState = clientEntry.getValue().state;
-            statelessTasks.addAll(clientState.statelessActiveTasks());
+    private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
+                                                   final Map clientMetadataMap,
+                                                   final Map topicGroups,
+                                                   final Cluster cluster) {
+        final Map> sourceTopicsByGroup = new HashMap<>();
+        final Map> changelogTopicsByGroup = new HashMap<>();
+        for (final Map.Entry entry : topicGroups.entrySet()) {
+            final Set sourceTopics = entry.getValue().sourceTopics;
+            final Set changelogTopics = entry.getValue().stateChangelogTopics()
+                .stream().map(t -> t.name).collect(Collectors.toSet());
+            sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+            changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
         }
+        final Map> sourcePartitionsForTask =
+            partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+        final Map> changelogPartitionsForTask =
+            partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+        final Set logicalTaskIds = new HashSet<>();
+        final Set sourceTopicPartitions = new HashSet<>();
+        sourcePartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
+            sourceTopicPartitions.addAll(partitions);
+        });
+        final Set changelogTopicPartitions = new HashSet<>();
+        changelogPartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);

Review Comment:
Sorry for the wall of text. It might not seem like a huge deal, but for an app with only source-changelog partitions, this saves the assignor from making any DescribeTopics request, since there are no non-source changelogs. And yes, apps with only source changelogs do exist. They're pretty common for certain kinds of table-based processing, especially apps that make heavy use of IQ (Interactive Queries). Saving a remote fetch is actually a pretty big deal. Doing one in the middle of an assignment makes the rebalance vulnerable to timing out, especially when brokers are under heavy load or the app is already experiencing rebalancing issues.
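The saving comes from skipping the remote lookup entirely when there is nothing to look up. The minimal sketch below makes two assumptions. First, a hypothetical `remoteLookup` function stands in for the DescribeTopics round trip; it is not the real `RackUtils` signature. Second, partitions are plain strings. When the set of non-source changelog partitions is empty, no remote call is made.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical wrapper: only pays for the remote lookup when there are
// non-source changelog partitions whose racks are still unknown.
final class ChangelogRackResolver {
    private final Function<Set<String>, Map<String, Set<String>>> remoteLookup;
    private int remoteCalls = 0;

    ChangelogRackResolver(final Function<Set<String>, Map<String, Set<String>>> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    Map<String, Set<String>> racksFor(final Set<String> nonSourceChangelogPartitions) {
        if (nonSourceChangelogPartitions.isEmpty()) {
            // Apps with only source-changelog partitions end up here: no round trip.
            return Collections.emptyMap();
        }
        remoteCalls++;
        return new HashMap<>(remoteLookup.apply(nonSourceChangelogPartitions));
    }

    int remoteCalls() {
        return remoteCalls;
    }
}
```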
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605659907

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ##
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
     *
     * @param clientMetadataMap the map of process id to client metadata used to build an immutable
     *                          {@code ApplicationState}
-     * @param statefulTasks     the set of {@code TaskId} that correspond to all the stateful
-     *                          tasks that need to be reassigned.
     * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
     *         assignments.
     */
-    private ApplicationState buildApplicationState(final Map clientMetadataMap,
-                                                   final Set statefulTasks) {
-        final Set statelessTasks = new HashSet<>();
-        for (final Map.Entry clientEntry : clientMetadataMap.entrySet()) {
-            final ClientState clientState = clientEntry.getValue().state;
-            statelessTasks.addAll(clientState.statelessActiveTasks());
+    private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
+                                                   final Map clientMetadataMap,
+                                                   final Map topicGroups,
+                                                   final Cluster cluster) {
+        final Map> sourceTopicsByGroup = new HashMap<>();
+        final Map> changelogTopicsByGroup = new HashMap<>();
+        for (final Map.Entry entry : topicGroups.entrySet()) {
+            final Set sourceTopics = entry.getValue().sourceTopics;
+            final Set changelogTopics = entry.getValue().stateChangelogTopics()
+                .stream().map(t -> t.name).collect(Collectors.toSet());
+            sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+            changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
         }
+        final Map> sourcePartitionsForTask =
+            partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+        final Map> changelogPartitionsForTask =
+            partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+        final Set logicalTaskIds = new HashSet<>();
+        final Set sourceTopicPartitions = new HashSet<>();
+        sourcePartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
+            sourceTopicPartitions.addAll(partitions);
+        });
+        final Set changelogTopicPartitions = new HashSet<>();
+        changelogPartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);

Review Comment:
To be more precise, I'm imagining something like this:

```
final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
final Set<TopicPartition> nonSourceChangelogTopicPartitions = new HashSet<>();
for (final var entry : sourcePartitionsForTask.entrySet()) {
    final TaskId taskId = entry.getKey();
    final Set<TopicPartition> taskSourcePartitions = entry.getValue();
    // a task without state stores may have no changelog entry
    final Set<TopicPartition> taskChangelogPartitions =
        changelogPartitionsForTask.getOrDefault(taskId, Collections.emptySet());
    // changelog partitions that are not also source partitions of this task
    final Set<TopicPartition> taskNonSourceChangelogPartitions = new HashSet<>(taskChangelogPartitions);
    taskNonSourceChangelogPartitions.removeAll(taskSourcePartitions);

    logicalTaskIds.add(taskId);
    sourceTopicPartitions.addAll(taskSourcePartitions);
    changelogTopicPartitions.addAll(taskChangelogPartitions);
    nonSourceChangelogTopicPartitions.addAll(taskNonSourceChangelogPartitions);
}
```

Then we pass `nonSourceChangelogTopicPartitions` into `#getRacksForTopicPartition` instead of the `changelogTopicPartitions` set.
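The per-task split can be exercised in isolation. This minimal sketch uses simplified, hypothetical types. Task ids and partitions are plain strings rather than `TaskId` and `TopicPartition`. The two input maps play the role of `sourcePartitionsForTask` and `changelogPartitionsForTask`. A partition that is both a source and a changelog of the same task (a source-changelog) ends up in `changelog` but not in `nonSourceChangelog`.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical container for the sets derived from the partition grouper output.
final class PartitionSets {
    final Set<String> logicalTaskIds = new HashSet<>();
    final Set<String> source = new HashSet<>();
    final Set<String> changelog = new HashSet<>();
    final Set<String> nonSourceChangelog = new HashSet<>();

    static PartitionSets compute(final Map<String, Set<String>> sourcePartitionsForTask,
                                 final Map<String, Set<String>> changelogPartitionsForTask) {
        final PartitionSets sets = new PartitionSets();
        for (final Map.Entry<String, Set<String>> entry : sourcePartitionsForTask.entrySet()) {
            final String taskId = entry.getKey();
            final Set<String> taskSource = entry.getValue();
            // Tasks without a changelog entry contribute no changelog partitions.
            final Set<String> taskChangelog =
                changelogPartitionsForTask.getOrDefault(taskId, Collections.emptySet());
            // Changelog partitions that are not also source partitions of this task.
            final Set<String> taskNonSourceChangelog = new HashSet<>(taskChangelog);
            taskNonSourceChangelog.removeAll(taskSource);

            sets.logicalTaskIds.add(taskId);
            sets.source.addAll(taskSource);
            sets.changelog.addAll(taskChangelog);
            sets.nonSourceChangelog.addAll(taskNonSourceChangelog);
        }
        return sets;
    }
}
```

Only `nonSourceChangelog` would need a remote rack lookup. The source-changelog partition `table-0` in a setup like the test below is already covered by the source set.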
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605659120

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ##
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
     *
     * @param clientMetadataMap the map of process id to client metadata used to build an immutable
     *                          {@code ApplicationState}
-     * @param statefulTasks     the set of {@code TaskId} that correspond to all the stateful
-     *                          tasks that need to be reassigned.
     * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
     *         assignments.
     */
-    private ApplicationState buildApplicationState(final Map clientMetadataMap,
-                                                   final Set statefulTasks) {
-        final Set statelessTasks = new HashSet<>();
-        for (final Map.Entry clientEntry : clientMetadataMap.entrySet()) {
-            final ClientState clientState = clientEntry.getValue().state;
-            statelessTasks.addAll(clientState.statelessActiveTasks());
+    private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
+                                                   final Map clientMetadataMap,
+                                                   final Map topicGroups,
+                                                   final Cluster cluster) {
+        final Map> sourceTopicsByGroup = new HashMap<>();
+        final Map> changelogTopicsByGroup = new HashMap<>();
+        for (final Map.Entry entry : topicGroups.entrySet()) {
+            final Set sourceTopics = entry.getValue().sourceTopics;
+            final Set changelogTopics = entry.getValue().stateChangelogTopics()
+                .stream().map(t -> t.name).collect(Collectors.toSet());
+            sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+            changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
         }
+        final Map> sourcePartitionsForTask =
+            partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+        final Map> changelogPartitionsForTask =
+            partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+        final Set logicalTaskIds = new HashSet<>();
+        final Set sourceTopicPartitions = new HashSet<>();
+        sourcePartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
+            sourceTopicPartitions.addAll(partitions);
+        });
+        final Set changelogTopicPartitions = new HashSet<>();
+        changelogPartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);

Review Comment:
Note that we'll also want to deduplicate the source-changelog partitions for the rack id computation. They should be included in the source topic partitions and removed from the changelog topic partitions passed into the `#getRacksForTopicPartition` call. Of course, we still need `changelogTopicPartitions` as well, so we'll want a third set, `nonSourceChangelogTopicPartitions`, used only for the rack id computation.
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605656530

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ##
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
     *
     * @param clientMetadataMap the map of process id to client metadata used to build an immutable
     *                          {@code ApplicationState}
-     * @param statefulTasks     the set of {@code TaskId} that correspond to all the stateful
-     *                          tasks that need to be reassigned.
     * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
     *         assignments.
     */
-    private ApplicationState buildApplicationState(final Map clientMetadataMap,
-                                                   final Set statefulTasks) {
-        final Set statelessTasks = new HashSet<>();
-        for (final Map.Entry clientEntry : clientMetadataMap.entrySet()) {
-            final ClientState clientState = clientEntry.getValue().state;
-            statelessTasks.addAll(clientState.statelessActiveTasks());
+    private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
+                                                   final Map clientMetadataMap,
+                                                   final Map topicGroups,
+                                                   final Cluster cluster) {
+        final Map> sourceTopicsByGroup = new HashMap<>();
+        final Map> changelogTopicsByGroup = new HashMap<>();
+        for (final Map.Entry entry : topicGroups.entrySet()) {
+            final Set sourceTopics = entry.getValue().sourceTopics;
+            final Set changelogTopics = entry.getValue().stateChangelogTopics()
+                .stream().map(t -> t.name).collect(Collectors.toSet());
+            sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+            changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
         }
+        final Map> sourcePartitionsForTask =
+            partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+        final Map> changelogPartitionsForTask =
+            partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+        final Set logicalTaskIds = new HashSet<>();
+        final Set sourceTopicPartitions = new HashSet<>();
+        sourcePartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
+            sourceTopicPartitions.addAll(partitions);
+        });
+        final Set changelogTopicPartitions = new HashSet<>();
+        changelogPartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
+            changelogTopicPartitions.addAll(partitions);
+        });
+
+        final Map> racksForSourcePartitions = RackUtils.getRacksForTopicPartition(
+            cluster, internalTopicManager, sourceTopicPartitions, false);
+        final Map> racksForChangelogPartitions = RackUtils.getRacksForTopicPartition(

Review Comment:
The rack info is nontrivial to compute and always makes a remote call, which can take a long time or even time out or otherwise fail. Not every assignor/app will actually use it. So I'm thinking we should initialize it lazily, only if and when the user actually requests the rack info.

I'm totally happy to push that into a follow-up PR to keep the scope of this one well-defined, so don't worry about it for now. We'd still need everything you implemented here. We would just be moving it around and/or substituting function pointers for the data structures we currently pass around directly, so it shouldn't have any impact on how this PR goes. Just wanted to make a note so I don't forget.
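The lazy-initialization idea can be sketched as a memoizing `Supplier`. The expensive lookup runs only when something first asks for the rack info, and its result is reused afterwards. `LazyRackInfo` is a hypothetical name. The wrapped supplier merely stands in for the remote `RackUtils.getRacksForTopicPartition` call. This is one possible shape for the follow-up, not a design the PR settled on.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical memoizing wrapper: the delegate (e.g. a remote rack lookup) is
// not invoked until the first get(), and a successful result is cached.
final class LazyRackInfo implements Supplier<Map<String, Set<String>>> {
    private final Supplier<Map<String, Set<String>>> delegate;
    private Map<String, Set<String>> cached; // null until first successfully computed

    LazyRackInfo(final Supplier<Map<String, Set<String>>> delegate) {
        this.delegate = Objects.requireNonNull(delegate);
    }

    @Override
    public synchronized Map<String, Set<String>> get() {
        if (cached == null) {
            cached = delegate.get();
        }
        return cached;
    }
}
```

If the delegate throws, nothing is cached and the next `get()` retries, which seems reasonable for a lookup that can time out. `synchronized` is cheap insurance in case the supplier is ever shared across threads.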
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605639072

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ##
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
     *
     * @param clientMetadataMap the map of process id to client metadata used to build an immutable
     *                          {@code ApplicationState}
-     * @param statefulTasks     the set of {@code TaskId} that correspond to all the stateful
-     *                          tasks that need to be reassigned.
     * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
     *         assignments.
     */
-    private ApplicationState buildApplicationState(final Map clientMetadataMap,
-                                                   final Set statefulTasks) {
-        final Set statelessTasks = new HashSet<>();
-        for (final Map.Entry clientEntry : clientMetadataMap.entrySet()) {
-            final ClientState clientState = clientEntry.getValue().state;
-            statelessTasks.addAll(clientState.statelessActiveTasks());
+    private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
+                                                   final Map clientMetadataMap,
+                                                   final Map topicGroups,
+                                                   final Cluster cluster) {
+        final Map> sourceTopicsByGroup = new HashMap<>();
+        final Map> changelogTopicsByGroup = new HashMap<>();
+        for (final Map.Entry entry : topicGroups.entrySet()) {
+            final Set sourceTopics = entry.getValue().sourceTopics;
+            final Set changelogTopics = entry.getValue().stateChangelogTopics()
+                .stream().map(t -> t.name).collect(Collectors.toSet());
+            sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+            changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
         }
+        final Map> sourcePartitionsForTask =
+            partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+        final Map> changelogPartitionsForTask =
+            partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+        final Set logicalTaskIds = new HashSet<>();
+        final Set sourceTopicPartitions = new HashSet<>();
+        sourcePartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
+            sourceTopicPartitions.addAll(partitions);
+        });
+        final Set changelogTopicPartitions = new HashSet<>();
+        changelogPartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);

Review Comment:
I suppose this doesn't hurt anything since `logicalTaskIds` is a Set. But the task ids returned by the partition grouper should be the same for the source and changelog topics, so you can remove this line. Alternatively, you can create the `logicalTaskIds` set up front by copying the key set of one of the `*PartitionsForTask` maps. That's just an implementation detail, so it's up to you.

However, I would consider adding a check to make sure these two maps contain the same set of tasks. It doesn't need to scan the entire thing. Maybe just a simple size check:

```
if (sourcePartitionsForTask.size() != changelogPartitionsForTask.size()) {
    final String errorMsg = String.format(
        "Partition grouper returned %d tasks for source topics but %d tasks for changelog topics",
        sourcePartitionsForTask.size(), changelogPartitionsForTask.size());
    log.error(errorMsg);
    throw new TaskAssignmentException(errorMsg);
}
```

## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskInfo.java: ##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * A simple container class corresponding to a given {@link
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on PR #15640: URL: https://github.com/apache/kafka/pull/15640#issuecomment-2118579935 @lianetm @cadonna, I believe I have addressed all the actionable feedback. Are there additional concerns about this PR that prevent it from being merged? Thanks.
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2118578191

> Hey @cadonna, the tricky bit is that, for some events, the request managers do expire requests too, so in this flow you described:
>
> > The event is processed in the ApplicationEventHandler and a request is added to the commit request manager. **Then the commit request manager is polled**, the requests are added to the network client and the network client is polled
>
> When the manager is polled, if the event had timeout 0, it will be expired/cancelled before making it to the network thread. Currently we have 2 managers that do this (that I can remember): [TopicMetadataManager](https://github.com/apache/kafka/blob/f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java#L86) and [CommitRequestManager](https://github.com/apache/kafka/blob/f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L1168). So for those events, even with this PR, if they have timeout 0, they won't have a chance to complete.
>
> My point is not to bring more changes into this PR, only to have the whole situation in mind so we can address it properly (with multiple PRs). This other [PR](https://github.com/apache/kafka/pull/15844) attempts to address the situation I described, but only in the `CommitRequestManager`, for instance. We still have to align on the approach there, and also handle it in the `TopicMetadataManager`, I would say. I would expect that a combination of this PR and those others would get us to a better point. Right now, even with this PR, a consumer that is continuously polled with timeout 0 cannot make basic progress, because `FetchCommittedOffsets` is always expired by the manager, for instance. I can easily repro it with the following integration test + poll(ZERO). I was surprised we have not covered this, but TestUtils always polls with a non-zero timeout.
>
> ```
> // Ensure TestUtils polls with ZERO. This fails for the new consumer only.
> @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
> @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
> def testPollEventuallyReturnsRecordsWithZeroTimeout(quorum: String, groupProtocol: String): Unit = {
>   val numMessages = 100
>   val producer = createProducer()
>   sendRecords(producer, numMessages, tp)
>
>   val consumer = createConsumer()
>   consumer.subscribe(Set(topic).asJava)
>   val records = awaitNonEmptyRecords(consumer, tp)
>   assertEquals(numMessages, records.count())
> }
> ```
>
> Makes sense?

Yes, the network layer changes are captured in KAFKA-16200 and build on top of this PR.
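The failure mode can be reduced to a toy model. Suppose a request manager, when polled, expires every pending request whose deadline is not after the current time. Then a request created with a zero timeout is dropped before the network layer ever sees it, and a consumer polled with a zero timeout never makes progress on it. `ExpiringRequestManager` is hypothetical and only illustrates that check. It is not the code in the managers linked above, and whether they compare with `<=` or `<` is not shown here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical request manager that expires pending requests when polled.
final class ExpiringRequestManager {
    private static final class PendingRequest {
        final String name;
        final long deadlineMs;

        PendingRequest(final String name, final long deadlineMs) {
            this.name = name;
            this.deadlineMs = deadlineMs;
        }
    }

    private final List<PendingRequest> pending = new ArrayList<>();
    final List<String> expired = new ArrayList<>();

    void add(final String name, final long nowMs, final long timeoutMs) {
        pending.add(new PendingRequest(name, nowMs + timeoutMs));
    }

    // Returns the names of the requests handed to the network layer on this poll.
    List<String> poll(final long nowMs) {
        final List<String> toSend = new ArrayList<>();
        final Iterator<PendingRequest> it = pending.iterator();
        while (it.hasNext()) {
            final PendingRequest request = it.next();
            it.remove();
            if (request.deadlineMs <= nowMs) {
                // A zero timeout makes the deadline equal to the creation time,
                // so a poll at that same instant expires the request.
                expired.add(request.name);
            } else {
                toSend.add(request.name);
            }
        }
        return toSend;
    }
}
```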
Re: [PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]
gaurav-narula commented on code in PR #15987:
URL: https://github.com/apache/kafka/pull/15987#discussion_r1605528394

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java: ##
@@ -61,8 +61,11 @@ public DelayEventAccumulator(Time time, long takeDelayMs) {
         @Override
         public CoordinatorEvent take() {
-            time.sleep(takeDelayMs);
-            return super.take();
+            CoordinatorEvent event = super.take();

Review Comment:
Nvm, I see your point: `accumulator.take()` may be invoked again, and the block on the delegate may happen *after* the timer has already been incremented.
Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]
jeffkbkim commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1605535757

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -59,7 +59,7 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
     /**
      * The assignment specification which includes member metadata.
      */
-    private final AssignmentSpec assignmentSpec;
+    private final GroupSpecImpl groupSpec;

Review Comment:
There are several places where we use `GroupSpecImpl` as the declared type instead of `GroupSpec`. Can we change all of them?
Re: [PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]
gaurav-narula commented on code in PR #15987:
URL: https://github.com/apache/kafka/pull/15987#discussion_r1605530429

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java: ##
@@ -61,8 +61,11 @@ public DelayEventAccumulator(Time time, long takeDelayMs) {
         @Override
         public CoordinatorEvent take() {
-            time.sleep(takeDelayMs);
-            return super.take();
+            CoordinatorEvent event = super.take();

Review Comment:
IIUC, it's the ordering that matters, so we can avoid the null check and increment time only after `super.take()` returns.

```java
CoordinatorEvent event = super.take(); // blocks until there are new events
time.sleep(takeDelayMs); // only increment timer after we get unblocked
return event;
```
Re: [PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]
gaurav-narula commented on code in PR #15987:
URL: https://github.com/apache/kafka/pull/15987#discussion_r1605530429

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java: ##
@@ -61,8 +61,11 @@ public DelayEventAccumulator(Time time, long takeDelayMs) {
         @Override
         public CoordinatorEvent take() {
-            time.sleep(takeDelayMs);
-            return super.take();
+            CoordinatorEvent event = super.take();

Review Comment:
IIUC, it's the ordering that matters, so we can avoid the null check and increment time only after `super.take()` returns.

```java
CoordinatorEvent event = super.take();
time.sleep(takeDelayMs);
return event;
```
Re: [PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]
gaurav-narula commented on code in PR #15987:
URL: https://github.com/apache/kafka/pull/15987#discussion_r1605528394

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java: ##
@@ -61,8 +61,11 @@ public DelayEventAccumulator(Time time, long takeDelayMs) {
         @Override
         public CoordinatorEvent take() {
-            time.sleep(takeDelayMs);
-            return super.take();
+            CoordinatorEvent event = super.take();

Review Comment:
Nvm, I see your point: `accumulator.take()` may be invoked again, and the block on the delegate may happen after the timer is incremented.
Re: [PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]
gaurav-narula commented on code in PR #15987:
URL: https://github.com/apache/kafka/pull/15987#discussion_r1605525333

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java: ##
@@ -61,8 +61,11 @@ public DelayEventAccumulator(Time time, long takeDelayMs) {
         @Override
         public CoordinatorEvent take() {
-            time.sleep(takeDelayMs);
-            return super.take();
+            CoordinatorEvent event = super.take();

Review Comment:
> we may be at the point when we process the 8 events (diff == 800ms) or when the thread does another iteration over handleEvents(), which does another time.sleep() (diff == 900ms). This was causing the flakiness.

Sorry, I'm still confused. Why would another iteration over `handleEvents()` invoke the `doAnswer`? In other words, shouldn't `accumulator.take()` block since there are no more events? I'd expect `EventAccumulator::take` to block on `condition.await()` after 8 events have been processed.
Re: [PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]
jeffkbkim commented on code in PR #15987:
URL: https://github.com/apache/kafka/pull/15987#discussion_r1605519141

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java: ##
@@ -61,8 +61,11 @@ public DelayEventAccumulator(Time time, long takeDelayMs) {
         @Override
         public CoordinatorEvent take() {
-            time.sleep(takeDelayMs);
-            return super.take();
+            CoordinatorEvent event = super.take();

Review Comment:
That's correct, and it was my initial confusion as well. The event processor only shuts down at the end of the test, so the processor thread is still running when we perform the assertions. When we call

```
long diff = time.milliseconds() - startMs;
```

there are two possibilities. The thread may have just finished processing the 8 events (diff == 800ms). Or it may have started another iteration of `handleEvents()`, which calls `time.sleep()` once more before blocking in `take()` (diff == 900ms). This was causing the flakiness.
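The race can be modelled deterministically. In this minimal sketch, a hypothetical accumulator is backed by a plain queue, and an empty queue stands in for a blocking `take()` that never returns. With sleep-then-take, the ninth call has already advanced the mock clock to 900 ms before it would block. With take-then-sleep, the clock stays at 800 ms. `TakeOrderingModel` and its methods are illustrative names, not the test's actual code.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of DelayEventAccumulator#take() driven by a mock clock.
final class TakeOrderingModel {
    long clockMs = 0;
    private final Queue<String> events = new ArrayDeque<>();
    private final long takeDelayMs;

    TakeOrderingModel(final int numEvents, final long takeDelayMs) {
        for (int i = 0; i < numEvents; i++) {
            events.add("event-" + i);
        }
        this.takeDelayMs = takeDelayMs;
    }

    // Original ordering: the clock advances even when take() then blocks.
    String sleepThenTake() {
        clockMs += takeDelayMs;
        return events.poll(); // null models "blocked forever"
    }

    // Suggested ordering: a blocked take() never reaches the sleep.
    String takeThenSleep() {
        final String event = events.poll();
        if (event == null) {
            return null; // models a take() that never returns
        }
        clockMs += takeDelayMs;
        return event;
    }
}
```

The null check in `takeThenSleep` exists only to model blocking. In the real accumulator, a blocked `take()` never returns, which is why gaurav-narula's suggested version has no null check.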
[PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
dongnuo123 opened a new pull request, #15988:
URL: https://github.com/apache/kafka/pull/15988

This patch implements the heartbeat API for members that use the classic protocol in a ConsumerGroup.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-16793) Heartbeat API for upgrading ConsumerGroup
Dongnuo Lyu created KAFKA-16793:
---
Summary: Heartbeat API for upgrading ConsumerGroup
Key: KAFKA-16793
URL: https://issues.apache.org/jira/browse/KAFKA-16793
Project: Kafka
Issue Type: Sub-task
Reporter: Dongnuo Lyu
Assignee: Dongnuo Lyu
Re: [PR] MINOR: Remove deprecated constructors from Connect's Kafka*BackingStore classes [kafka]
chia7712 merged PR #15865: URL: https://github.com/apache/kafka/pull/15865
Re: [PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]
gaurav-narula commented on code in PR #15987: URL: https://github.com/apache/kafka/pull/15987#discussion_r1605472276 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java: ## @@ -475,9 +478,7 @@ public void testRecordThreadIdleRatio() throws Exception { doAnswer(invocation -> { long threadIdleTime = idleTimeCaptured.getValue(); assertEquals(100, threadIdleTime); -synchronized (recordedIdleTimesMs) { -recordedIdleTimesMs.add(threadIdleTime); -} +recordedIdleTimesMs.add(threadIdleTime); Review Comment: Nit: it might be useful to add a comment that this is safe because `numThreads` in line 470 is `1`. ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java: ## @@ -61,8 +61,11 @@ public DelayEventAccumulator(Time time, long takeDelayMs) { @Override public CoordinatorEvent take() { -time.sleep(takeDelayMs); -return super.take(); +CoordinatorEvent event = super.take(); Review Comment: IIUC, the fix suggests the flakiness is because `super.take()` returns `null` spuriously. I don't quite follow why that would happen, though. The Javadoc for `EventAccumulator::take` says it returns `null` only when the accumulator is closed. We also assert that the value captured within doAnswer is `100`, that `recordedIdleTimesMs.size() == 8`, and that `mockRuntimeMetrics.recordThreadIdleTime()` is invoked 8 times, so where is the extra invocation coming from?
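The nit above relies on the test running a single processor thread. If that assumption ever changes, one option (a sketch, not what the PR does) is to make the list itself thread-safe with `Collections.synchronizedList`, so individual `add` calls need no explicit `synchronized` block. `IdleTimeRecorder` and `recordFromThreads` are hypothetical names; note that iterating over such a list still requires synchronizing on it manually.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: collects idle times from any number of threads.
class IdleTimeRecorder {
    // Each add()/size() on a synchronizedList is atomic, so concurrent
    // recorders cannot corrupt the list or lose updates.
    private final List<Long> recordedIdleTimesMs = Collections.synchronizedList(new ArrayList<>());

    void record(long idleTimeMs) {
        recordedIdleTimesMs.add(idleTimeMs);
    }

    int size() {
        return recordedIdleTimesMs.size();
    }

    // Starts `threads` threads, each recording `perThread` values, and waits for all of them.
    static IdleTimeRecorder recordFromThreads(int threads, int perThread) {
        IdleTimeRecorder recorder = new IdleTimeRecorder();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    recorder.record(100L);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for recorders", e);
            }
        }
        return recorder;
    }
}
```

With a plain `ArrayList` and no synchronization, the four-thread case could lose updates or throw; the synchronized wrapper keeps the count exact.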
Re: [PR] KAFKA-16223: Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest [kafka]
chia7712 merged PR #15933: URL: https://github.com/apache/kafka/pull/15933
[jira] [Resolved] (KAFKA-15723) KRaft support in ListOffsetsRequestTest
[ https://issues.apache.org/jira/browse/KAFKA-15723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-15723. Fix Version/s: 3.8.0 Resolution: Fixed > KRaft support in ListOffsetsRequestTest > --- > > Key: KAFKA-15723 > URL: https://issues.apache.org/jira/browse/KAFKA-15723 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Sameer Tejani >Assignee: Mickael Maison >Priority: Minor > Labels: kraft, kraft-test, newbie > Fix For: 3.8.0 > > > The following tests in ListOffsetsRequestTest in > core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala need to be > updated to support KRaft > 37 : def testListOffsetsErrorCodes(): Unit = { > 84 : def testListOffsetsMaxTimeStampOldestVersion(): Unit = { > 112 : def testCurrentEpochValidation(): Unit = { > 173 : def testResponseIncludesLeaderEpoch(): Unit = { > 210 : def testResponseDefaultOffsetAndLeaderEpochForAllVersions(): Unit = { > Scanned 261 lines. Found 0 KRaft tests out of 5 tests
Re: [PR] KAFKA-15723: KRaft support in ListOffsetsRequestTest [kafka]
chia7712 merged PR #15980: URL: https://github.com/apache/kafka/pull/15980
[jira] [Resolved] (KAFKA-16544) DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames should return null instead of throwing NPE
[ https://issues.apache.org/jira/browse/KAFKA-16544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16544. Fix Version/s: 3.8 Resolution: Fixed > DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames > should return null instead of throwing NPE > -- > > Key: KAFKA-16544 > URL: https://issues.apache.org/jira/browse/KAFKA-16544 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Kuan Po Tseng >Priority: Major > Fix For: 3.8 > > > {code:java} > * @return A future map from topic names to descriptions which can be > used to check > * the status of individual description if the describe topic > request used > * topic names, otherwise return null, this request succeeds only > if all the > * topic descriptions succeed > {code} > According to the docs, it should return null if we try to get a result > that does not match the request. For example, we call `allTopicNames` after passing a > `TopicIdCollection`. However, the current implementation throws an NPE > directly.
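The contract in the quoted Javadoc can be sketched as follows. This is not Kafka's `DescribeTopicsResult`: it is a simplified, hypothetical class that uses `CompletableFuture` and `String` keys and descriptions in place of `KafkaFuture`, `Uuid` and `TopicDescription`, only to show the null check that replaces the NPE.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified describe-topics result: exactly one of the two maps is
// non-null, depending on whether the request used topic names or topic IDs.
class DescribeResultSketch {
    private final Map<String, CompletableFuture<String>> nameFutures;
    private final Map<String, CompletableFuture<String>> topicIdFutures;

    private DescribeResultSketch(Map<String, CompletableFuture<String>> nameFutures,
                                 Map<String, CompletableFuture<String>> topicIdFutures) {
        this.nameFutures = nameFutures;
        this.topicIdFutures = topicIdFutures;
    }

    static DescribeResultSketch byNames(Map<String, CompletableFuture<String>> futures) {
        return new DescribeResultSketch(futures, null);
    }

    static DescribeResultSketch byIds(Map<String, CompletableFuture<String>> futures) {
        return new DescribeResultSketch(null, futures);
    }

    // Returns null, as documented, instead of dereferencing a map that was never populated.
    CompletableFuture<Map<String, String>> allTopicNames() {
        return nameFutures == null ? null : all(nameFutures);
    }

    CompletableFuture<Map<String, String>> allTopicIds() {
        return topicIdFutures == null ? null : all(topicIdFutures);
    }

    // Completes once every per-topic future completes; fails if any of them fails.
    private static CompletableFuture<Map<String, String>> all(Map<String, CompletableFuture<String>> futures) {
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    Map<String, String> results = new HashMap<>();
                    futures.forEach((key, future) -> results.put(key, future.join()));
                    return results;
                });
    }
}
```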
Re: [PR] KAFKA-16544 DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames should return null instead of throwing NPE [kafka]
chia7712 merged PR #15979: URL: https://github.com/apache/kafka/pull/15979
Re: [PR] KAFKA-7342: Migrate tests in o.a.k.streams to JUnit 5 (except KafkaStreamsTest) [kafka]
chia7712 merged PR #15942: URL: https://github.com/apache/kafka/pull/15942
[jira] [Updated] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)
[ https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16792: --- Summary: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) (was: Enable related unit tests that fail to fetch offsets only for new consumer with to poll(0)) > Enable consumer unit tests that fail to fetch offsets only for new consumer > with poll(0) > > > Key: KAFKA-16792 > URL: https://issues.apache.org/jira/browse/KAFKA-16792 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > > Enable the following unit tests for the new async consumer in > KafkaConsumerTest: > - testFetchStableOffsetThrowInPoll > - testCurrentLag > - testListOffsetShouldUpdateSubscriptions
[jira] [Assigned] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata
[ https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans reassigned KAFKA-16764: -- Assignee: appchemist > New consumer should throw InvalidTopicException on poll when invalid topic in > metadata > -- > > Key: KAFKA-16764 > URL: https://issues.apache.org/jira/browse/KAFKA-16764 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: appchemist >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > A call to consumer.poll should throw InvalidTopicException if an invalid > topic is discovered in metadata. This can be easily reproduced by calling > subscribe("invalid topic") and then poll, for example. The new consumer does > not throw the expected InvalidTopicException like the LegacyKafkaConsumer > does. > The legacy consumer achieves this by checking for metadata exceptions on > every iteration of the ConsumerNetworkClient (see > [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]). > This is probably why > [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956] > fails for the new consumer. Once this bug is fixed, we should be able to > enable that test for the new consumer.
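The per-iteration check described for the legacy consumer can be sketched with hypothetical classes: metadata handling records an error, and every `poll()` first rethrows any recorded error. None of these class or method names are Kafka APIs; `SketchInvalidTopicException` stands in for `org.apache.kafka.common.errors.InvalidTopicException`, and clearing the error once it is thrown is a choice made for this sketch.

```java
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for org.apache.kafka.common.errors.InvalidTopicException.
class SketchInvalidTopicException extends RuntimeException {
    SketchInvalidTopicException(String message) {
        super(message);
    }
}

// Hypothetical metadata holder that remembers an error seen in a metadata response.
class MetadataSketch {
    private RuntimeException pendingError;

    void onMetadataResponse(Set<String> invalidTopics) {
        if (!invalidTopics.isEmpty()) {
            pendingError = new SketchInvalidTopicException("Invalid topics: " + invalidTopics);
        }
    }

    // Throws and clears any recorded error, so each error surfaces once.
    void throwPendingError() {
        RuntimeException error = pendingError;
        pendingError = null;
        if (error != null) {
            throw error;
        }
    }
}

// Hypothetical consumer whose poll() checks for metadata errors on every call.
class ConsumerSketch {
    private final MetadataSketch metadata;

    ConsumerSketch(MetadataSketch metadata) {
        this.metadata = metadata;
    }

    List<String> poll() {
        metadata.throwPendingError();
        return List.of(); // no records are fetched in this sketch
    }
}
```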
[jira] [Commented] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata
[ https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847427#comment-17847427 ] Lianet Magrans commented on KAFKA-16764: Sure! Thanks for helping out! Follow-up on the PR. > New consumer should throw InvalidTopicException on poll when invalid topic in > metadata > -- > > Key: KAFKA-16764 > URL: https://issues.apache.org/jira/browse/KAFKA-16764 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > A call to consumer.poll should throw InvalidTopicException if an invalid > topic is discovered in metadata. This can be easily reproduced by calling > subscribe("invalid topic") and then poll, for example. The new consumer does > not throw the expected InvalidTopicException like the LegacyKafkaConsumer > does. > The legacy consumer achieves this by checking for metadata exceptions on > every iteration of the ConsumerNetworkClient (see > [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]). > This is probably why > [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956] > fails for the new consumer. Once this bug is fixed, we should be able to > enable that test for the new consumer.
[jira] [Created] (KAFKA-16792) Enable related unit tests that fail only for new consumer with to poll(0)
Lianet Magrans created KAFKA-16792: -- Summary: Enable related unit tests that fail only for new consumer with to poll(0) Key: KAFKA-16792 URL: https://issues.apache.org/jira/browse/KAFKA-16792 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Lianet Magrans Enable the following unit tests for the new async consumer in KafkaConsumerTest: - testFetchStableOffsetThrowInPoll - testCurrentLag - testListOffsetShouldUpdateSubscriptions
[jira] [Updated] (KAFKA-16792) Enable related unit tests that fail to fetch offsets only for new consumer with to poll(0)
[ https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16792: --- Summary: Enable related unit tests that fail to fetch offsets only for new consumer with to poll(0) (was: Enable related unit tests that fail only for new consumer with to poll(0)) > Enable related unit tests that fail to fetch offsets only for new consumer > with to poll(0) > -- > > Key: KAFKA-16792 > URL: https://issues.apache.org/jira/browse/KAFKA-16792 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > > Enable the following unit tests for the new async consumer in > KafkaConsumerTest: > - testFetchStableOffsetThrowInPoll > - testCurrentLag > - testListOffsetShouldUpdateSubscriptions
[jira] [Updated] (KAFKA-16792) Enable related unit tests that fail to fetch offsets only for new consumer with to poll(0)
[ https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16792: --- Description: Enable the following unit tests for the new async consumer in KafkaConsumerTest: - testFetchStableOffsetThrowInPoll - testCurrentLag - testListOffsetShouldUpdateSubscriptions was: Enable the following unit tests for the new async consumer in KafkaConsumerTest: - testFetchStableOffsetThrowInPoll - testCurrentLag - testListOffsetShouldUpdateSubscriptions > Enable related unit tests that fail to fetch offsets only for new consumer > with to poll(0) > -- > > Key: KAFKA-16792 > URL: https://issues.apache.org/jira/browse/KAFKA-16792 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > > Enable the following unit tests for the new async consumer in > KafkaConsumerTest: > - testFetchStableOffsetThrowInPoll > - testCurrentLag > - testListOffsetShouldUpdateSubscriptions
[jira] [Created] (KAFKA-16791) Add thread detection to ClusterTestExtensions
Chia-Ping Tsai created KAFKA-16791: -- Summary: Add thread detection to ClusterTestExtensions Key: KAFKA-16791 URL: https://issues.apache.org/jira/browse/KAFKA-16791 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai `ClusterTestExtensions` should implement `BeforeAllCallback` and `AfterAllCallback` using `TestUtils.verifyNoUnexpectedThreads`.
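The idea behind such a before-all/after-all check can be sketched without JUnit: snapshot the live threads and fail if any thread name matches a known leak prefix. `ThreadLeakChecker`, `assertNoLeakedThreads` and `detectsLeak` are hypothetical names; this is not Kafka's `TestUtils.verifyNoUnexpectedThreads`, whose exact rules may differ. In a JUnit 5 extension, `beforeAll(...)` and `afterAll(...)` would each call `assertNoLeakedThreads`.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

// Hypothetical helper showing the kind of check a BeforeAllCallback/AfterAllCallback pair could run.
class ThreadLeakChecker {
    private final Set<String> leakPrefixes;

    ThreadLeakChecker(Set<String> leakPrefixes) {
        this.leakPrefixes = leakPrefixes;
    }

    // Names of live threads whose names start with one of the configured prefixes.
    List<String> unexpectedThreads() {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(Thread::isAlive)
                .map(Thread::getName)
                .filter(name -> leakPrefixes.stream().anyMatch(name::startsWith))
                .sorted()
                .collect(Collectors.toList());
    }

    void assertNoLeakedThreads(String context) {
        List<String> leaked = unexpectedThreads();
        if (!leaked.isEmpty()) {
            throw new AssertionError(context + ": found unexpected threads " + leaked);
        }
    }

    // Demo helper: runs the check while a parked thread named `threadName` is alive.
    static boolean detectsLeak(ThreadLeakChecker checker, String threadName) {
        CountDownLatch release = new CountDownLatch(1);
        Thread parked = new Thread(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, threadName);
        parked.start();
        try {
            checker.assertNoLeakedThreads("while " + threadName + " is alive");
            return false;
        } catch (AssertionError expected) {
            return true;
        } finally {
            release.countDown();
            try {
                parked.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

Running the check before all tests as well as after them lets a class fail fast when an earlier class has already leaked threads, instead of blaming the wrong test.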
[PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]
jeffkbkim opened a new pull request, #15987: URL: https://github.com/apache/kafka/pull/15987 DelayEventAccumulator should return immediately if there are no events in the queue. Also removed some unused fields inside EventProcessorThread. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
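The 800 ms vs. 900 ms readings discussed in the review of this PR can be reproduced with a small deterministic model. Everything here is hypothetical: `FakeTime` plays the role of a mock clock, and `SimpleAccumulator.take()` returns `null` when empty, whereas the real `EventAccumulator.take()` blocks until an event arrives or the accumulator is closed. The old version charges the delay before taking, so an idle iteration still advances the clock; the fixed version charges it only when an event is returned.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical mock clock: time advances only when sleep() is called.
class FakeTime {
    private long nowMs = 0;

    long milliseconds() {
        return nowMs;
    }

    void sleep(long ms) {
        nowMs += ms;
    }
}

// Simplified accumulator: take() returns the next event, or null when none is queued.
class SimpleAccumulator {
    private final Deque<String> queue = new ArrayDeque<>();

    void add(String event) {
        queue.addLast(event);
    }

    String take() {
        return queue.pollFirst();
    }
}

// Old behaviour: the delay is charged before taking, even if no event is returned.
class SleepFirstAccumulator extends SimpleAccumulator {
    private final FakeTime time;
    private final long delayMs;

    SleepFirstAccumulator(FakeTime time, long delayMs) {
        this.time = time;
        this.delayMs = delayMs;
    }

    @Override
    String take() {
        time.sleep(delayMs);
        return super.take();
    }
}

// Fixed behaviour: the delay is charged only when an event is actually handed out.
class SleepAfterTakeAccumulator extends SimpleAccumulator {
    private final FakeTime time;
    private final long delayMs;

    SleepAfterTakeAccumulator(FakeTime time, long delayMs) {
        this.time = time;
        this.delayMs = delayMs;
    }

    @Override
    String take() {
        String event = super.take();
        if (event != null) {
            time.sleep(delayMs);
        }
        return event;
    }
}

class DelayDemo {
    // Queues `events` events, then calls take() one extra time (an idle iteration).
    static long elapsedAfterIdleTake(SimpleAccumulator accumulator, FakeTime time, int events) {
        for (int i = 0; i < events; i++) {
            accumulator.add("event-" + i);
        }
        for (int i = 0; i <= events; i++) {
            accumulator.take();
        }
        return time.milliseconds();
    }
}
```

With 8 events and a 100 ms delay, the sleep-first version reports 900 ms after the extra idle `take()`, while the sleep-after-take version stays at 800 ms regardless of when the assertion runs.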
Re: [PR] MINOR: use try-with resources in ClientMetricsManagerTest [kafka]
chia7712 merged PR #15982: URL: https://github.com/apache/kafka/pull/15982
Re: [PR] MINOR: Add debug enablement check when using log.debug [kafka]
chia7712 merged PR #15977: URL: https://github.com/apache/kafka/pull/15977
Re: [PR] KAFKA-16789: Fix thread leak detection for event handler threads [kafka]
chia7712 commented on code in PR #15984: URL: https://github.com/apache/kafka/pull/15984#discussion_r1605427011 ## core/src/test/scala/unit/kafka/utils/TestUtils.scala: ## @@ -1872,7 +1872,7 @@ object TestUtils extends Logging { AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(), AbstractCoordinator.HEARTBEAT_THREAD_PREFIX, QuorumTestHarness.ZkClientEventThreadSuffix, - QuorumController.CONTROLLER_THREAD_SUFFIX, Review Comment: It seems `CONTROLLER_THREAD_SUFFIX` can be removed from `QuorumController`
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1605420401 ## core/src/test/scala/unit/kafka/log/LogValidatorTest.scala: ## @@ -1538,7 +1563,80 @@ class LogValidatorTest { assertEquals(e.recordErrors.size, 3) } - private def testBatchWithoutRecordsNotAllowed(sourceCompression: CompressionType, targetCompression: CompressionType): Unit = { + @Test + def testDifferentLevelDoesNotCauseRecompression(): Unit = { +val records = List( + List.fill(256)("some").mkString("").getBytes, + List.fill(256)("data").mkString("").getBytes +) +// Records from the producer were created with gzip max level +val gzipMax: Compression = Compression.gzip().level(GzipCompression.MAX_LEVEL).build() +val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMax) + +// The topic is configured with gzip min level +val gzipMin: Compression = Compression.gzip().level(GzipCompression.MIN_LEVEL).build() +val recordsGzipMin = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMin) + +// ensure data compressed with gzip max and min is different +assertNotEquals(recordsGzipMax, recordsGzipMin) +val validator = new LogValidator(recordsGzipMax, + topicPartition, + time, + gzipMax.`type`(), + gzipMin, + false, + RecordBatch.CURRENT_MAGIC_VALUE, Review Comment: Yes, let's keep it consistent. Done
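Why a level difference alone need not force recompression can be illustrated with the JDK's `java.util.zip.Deflater` and `Inflater` (zlib-wrapped DEFLATE, the algorithm gzip also uses). This is not Kafka's `Compression` builder or `GzipCompression`; it only shows that the level changes how hard the encoder works, and possibly the bytes produced, while a single decoder reads either output without being told the level. `CompressionLevelSketch` is a hypothetical name and the payload is only loosely modeled on the test data in this diff.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

class CompressionLevelSketch {
    // Compresses `input` with DEFLATE at the given level (1 = fastest, 9 = smallest).
    static byte[] deflate(byte[] input, int level) {
        Deflater deflater = new Deflater(level);
        try {
            deflater.setInput(input);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[512];
            while (!deflater.finished()) {
                int n = deflater.deflate(buffer);
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } finally {
            deflater.end();
        }
    }

    // Decompresses without knowing which level was used: the level is not needed to decode.
    static byte[] inflate(byte[] compressed) {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(compressed);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[512];
            while (!inflater.finished()) {
                int n = inflater.inflate(buffer);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalStateException("truncated or unsupported input");
                }
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (DataFormatException e) {
            throw new IllegalStateException(e);
        } finally {
            inflater.end();
        }
    }

    // "some" repeated 256 times followed by "data" repeated 256 times.
    static byte[] samplePayload() {
        return ("some".repeat(256) + "data".repeat(256)).getBytes(StandardCharsets.UTF_8);
    }
}
```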
Re: [PR] KAFKA-16626: Lazily convert subscribed topic names to topic ids [kafka]
jeffkbkim commented on PR #15970: URL: https://github.com/apache/kafka/pull/15970#issuecomment-2118184548 RangeAssignor baseline ``` Benchmark (memberCount) (partitionsToMemberRatio) (topicCount) Mode CntScoreError Units TargetAssignmentBuilderBenchmark.build1 10 100 avgt5 235.398 ± 15.092 ms/op ``` RangeAssignor PR ``` Benchmark (memberCount) (partitionsToMemberRatio) (topicCount) Mode CntScoreError Units TargetAssignmentBuilderBenchmark.build1 10 100 avgt5 161.683 ± 22.739 ms/op ```
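As a rough reading of the two runs (plain arithmetic on the posted scores, not a new measurement), the PR lowers the average time per `build` operation by about 31%, and the two reported ± ranges do not overlap:

```latex
\frac{235.398 - 161.683}{235.398} = \frac{73.715}{235.398} \approx 0.313
\qquad
235.398 \pm 15.092 \in [220.306,\ 250.490],
\quad
161.683 \pm 22.739 \in [138.944,\ 184.422]
```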
Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]
dajac commented on code in PR #15974: URL: https://github.com/apache/kafka/pull/15974#discussion_r1605415364 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -147,6 +147,11 @@ public static class DeadlineAndEpoch { */ private final TimelineHashMap subscribedTopicNames; +/** + * Partition assignments per topic. + */ +private final TimelineHashMap> partitionAssignments; Review Comment: How about invertedTargetAssignments or something similar?
[jira] [Commented] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured
[ https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847392#comment-17847392 ] Muralidhar Basani commented on KAFKA-16790: --- Even though brokerMetadataPublisher is instantiated with all the publishers before rlm, it seems that MetadataLoader#initializeNewPublishers calls BrokerMetadataPublisher#onMetadataUpdate, and by then rlm is already instantiated? So if BrokerMetadataPublisher#onMetadataUpdate were invoked while new publishers such as DynamicConfigPublisher are being created during the brokerMetadataPublisher instance creation, then I think, yes, remoteLogManager would not be initialized. But it seems that MetadataLoader#initializeNewPublishers is what calls BrokerMetadataPublisher#onMetadataUpdate, and by then rlm is already initialized. It's possible that I am wrong or didn't understand this correctly. > Calls to RemoteLogManager are made before it is configured > -- > > Key: KAFKA-16790 > URL: https://issues.apache.org/jira/browse/KAFKA-16790 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.8.0 >Reporter: Christo Lolov >Priority: Major > > BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) > which in turn calls RemoteLogManager#onLeadershipChange (2), however, the > RemoteLogManager is configured after the BrokerMetadataPublisher starts > running (3, 4). This is incorrect; we either need to initialise the > RemoteLogManager before we start the BrokerMetadataPublisher or we need to > skip calls to onLeadershipChange if the RemoteLogManager is not initialised. 
> (1) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151] > (2) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737] > (3) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432] > (4) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515] > The way to reproduce the problem is by looking at the following changes > {code:java} > config/kraft/broker.properties | 10 ++ > .../main/java/kafka/log/remote/RemoteLogManager.java | 8 +++- > core/src/main/scala/kafka/server/ReplicaManager.scala | 6 +- > 3 files changed, 22 insertions(+), 2 deletions(-)diff --git > a/config/kraft/broker.properties b/config/kraft/broker.properties > index 2d15997f28..39d126cf87 100644 > --- a/config/kraft/broker.properties > +++ b/config/kraft/broker.properties > @@ -127,3 +127,13 @@ log.segment.bytes=1073741824 > # The interval at which log segments are checked to see if they can be > deleted according > # to the retention policies > log.retention.check.interval.ms=30 > + > +remote.log.storage.system.enable=true > +remote.log.metadata.manager.listener.name=PLAINTEXT > +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage > +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar > +remote.log.storage.manager.impl.prefix=rsm.config. > +remote.log.metadata.manager.impl.prefix=rlmm.config. 
> +rsm.config.dir=/tmp/kafka-remote-storage > +rlmm.config.remote.log.metadata.topic.replication.factor=1 > +log.retention.check.interval.ms=1000 > diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java > b/core/src/main/java/kafka/log/remote/RemoteLogManager.java > index 6555b7c0cd..e84a072abc 100644 > --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java > +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java > @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable { > // The endpoint for remote log metadata manager to connect to > private Optional endpoint = Optional.empty(); > private boolean closed = false; > + private boolean up = false; > > /** > * Creates RemoteLogManager instance with the given arguments. > @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable { > // in connecting to the brokers or remote storages. > configureRSM(); > configureRLMM(); > + up = true; > } > > public RemoteStorageManager storageManager() { > @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable { > public void onLeadershipChange(Set partitionsBecomeLeader, > Set partitionsBecomeFollower, > Map topicIds) { > - LOGGER.debug("Received leadership changes for leaders: {} and > followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); > + if (!up) {
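The quoted patch is cut off right after `if (!up) {`, so what it does in that branch is not shown here. The second option from the issue description (skip `onLeadershipChange` until the manager is configured) can be sketched with a hypothetical, simplified class; it is not `RemoteLogManager`, it uses `String` partitions instead of Kafka's partition types, and marking the flag `volatile` is a choice of this sketch because startup and metadata publishing may run on different threads.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in illustrating "skip onLeadershipChange until configured".
class GuardedRemoteLogManager {
    // volatile: startup() and metadata publishing may run on different threads.
    private volatile boolean configured = false;
    private final Set<String> leaders = new HashSet<>();

    // The quoted patch sets its flag right after configureRSM() and configureRLMM().
    void startup() {
        configured = true;
    }

    // Returns false when the change is skipped because startup() has not run yet.
    boolean onLeadershipChange(Set<String> becomeLeader, Set<String> becomeFollower) {
        if (!configured) {
            return false;
        }
        leaders.addAll(becomeLeader);
        leaders.removeAll(becomeFollower);
        return true;
    }

    boolean isLeaderFor(String partition) {
        return leaders.contains(partition);
    }
}
```

A skipped change is simply lost in this sketch, so a caller would need to replay it after startup; the description's first option (initialise the RemoteLogManager before the BrokerMetadataPublisher starts) avoids that problem entirely.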
[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847391#comment-17847391 ] Johnson Okorie commented on KAFKA-16692: Hi [~jolshan], I wanted to express my sincere thanks for your quick response and prompt fix for this issue. One of the reasons I love this project! Keep up the great work! > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.7.0, 3.6.1, 3.8 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major > > We have a kafka cluster running on version 3.5.2 that we are upgrading to > 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled > and hence creating transactions. As we replaced brokers with the new > binaries, we observed lots of clients in the cluster experiencing the > following error: > {code:java} > 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, > transactionalId=] Got error produce response with > correlation id 6402937 on topic-partition , retrying > (2147483512 attempts left). Error: NETWORK_EXCEPTION. 
Error Message: The > server disconnected before a response was received.{code} > On inspecting the broker, we saw the following errors on brokers still > running Kafka version 3.5.2: > > {code:java} > message: > Closing socket for because of error > exception_exception_class: > org.apache.kafka.common.errors.InvalidRequestException > exception_exception_message: > Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled > exception_stacktrace: > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled > {code} > On the new brokers running 3.6.1, we saw the following errors: > > {code:java} > [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for > node 1043 with a network exception.{code} > > I can also see this: > {code:java} > [AddPartitionsToTxnManager broker=1055]Cancelled in-flight > ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 > being disconnected (elapsed time since creation: 11ms, elapsed time since > send: 4ms, request timeout: 3ms){code} > We started investigating this issue and digging through the changes in 3.6, > we came across some changes introduced as part of KAFKA-14402 that we thought > might lead to this behaviour. > First, we could see that _transaction.partition.verification.enable_ is > enabled by default and enables a new code path that culminates in us sending > version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated > [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269]. 
> From a > [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] > on the mailing list, [~jolshan] pointed out that this scenario shouldn't be > possible, as the following code paths should prevent version 4 > ADD_PARTITIONS_TO_TXN requests from being sent to other brokers: > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] > > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] > However, these requests are still sent to other brokers in our environment. > On further inspection of the code, I am wondering if the following code path > could lead to this issue: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] > In this scenario, we don't have any _NodeApiVersions_ available for the > specified nodeId and are potentially skipping the _latestUsableVersion_ check. I > am wondering if it is possible that because _discoverBrokerVersions_ is set > to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it > skips fetching {_}NodeApiVersions{_}? I can see that we create the network > client here: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641] > The _NetworkUtils.buildNetworkClient_ method seems to create a network client > that has _discoverBrokerVersions_ set to {_}false{_}. > I was hoping I could get some assistance debugging
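The reporter's hypothesis can be modeled with a hypothetical, simplified version negotiation (not Kafka's `NodeApiVersions`): a sender normally picks the highest version inside both its own range and the peer's range, but if the peer's range is unknown and the check is skipped, it falls back to its own latest version. The ranges 0 to 4 and 0 to 3 used below are illustration numbers only, chosen to match the "version 4 which is not enabled" error.

```java
import java.util.OptionalInt;

// Hypothetical, simplified version negotiation between two brokers.
class VersionNegotiationSketch {
    // Highest version inside both [senderMin, senderMax] and [peerMin, peerMax], if any.
    static OptionalInt highestCommonVersion(int senderMin, int senderMax, int peerMin, int peerMax) {
        int low = Math.max(senderMin, peerMin);
        int high = Math.min(senderMax, peerMax);
        return high >= low ? OptionalInt.of(high) : OptionalInt.empty();
    }

    // peerRange == null models "no ApiVersions known for this node". The sketch then
    // falls back to the sender's own latest version, i.e. the check is skipped.
    static int versionToSend(int senderMin, int senderMax, int[] peerRange) {
        if (peerRange == null) {
            return senderMax;
        }
        return highestCommonVersion(senderMin, senderMax, peerRange[0], peerRange[1])
                .orElseThrow(() -> new IllegalStateException("no common version"));
    }
}
```

Under these assumed ranges, a negotiated request would use version 3, while the skipped-check path sends version 4, which an older peer with version 4 disabled would reject.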
Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]
rreddy-22 commented on code in PR #15974: URL: https://github.com/apache/kafka/pull/15974#discussion_r1605382569 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -147,6 +147,11 @@ public static class DeadlineAndEpoch { */ private final TimelineHashMap subscribedTopicNames; +/** + * Partition assignments per topic. + */ +private final TimelineHashMap> partitionAssignments; Review Comment: like targetPartitionAssignments?
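Whatever the field ends up being called, the reverse lookup named in the PR title (topic partition to member) can be sketched as a nested map. This is a hypothetical illustration, not the PR's code: plain `HashMap`s stand in for `TimelineHashMap` (which additionally supports snapshots), `String` topic IDs stand in for `Uuid`, and removing an entry only when it still points at the given member is a design choice of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical reverse index: topicId -> (partition -> memberId).
class InvertedTargetAssignmentSketch {
    private final Map<String, Map<Integer, String>> invertedTargetAssignment = new HashMap<>();

    // Records that `memberId` is the target owner of `partitions` of `topicId`.
    void assign(String memberId, String topicId, Set<Integer> partitions) {
        Map<Integer, String> byPartition =
                invertedTargetAssignment.computeIfAbsent(topicId, k -> new HashMap<>());
        for (int partition : partitions) {
            byPartition.put(partition, memberId);
        }
    }

    // Removes entries only if they still point at `memberId`, so a newer owner is kept.
    void unassign(String memberId, String topicId, Set<Integer> partitions) {
        Map<Integer, String> byPartition = invertedTargetAssignment.get(topicId);
        if (byPartition == null) {
            return;
        }
        for (int partition : partitions) {
            byPartition.remove(partition, memberId);
        }
        if (byPartition.isEmpty()) {
            invertedTargetAssignment.remove(topicId);
        }
    }

    // Reverse lookup; returns null when no member owns the partition.
    String ownerOf(String topicId, int partition) {
        Map<Integer, String> byPartition = invertedTargetAssignment.get(topicId);
        return byPartition == null ? null : byPartition.get(partition);
    }
}
```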
Re: [PR] MINOR: Fix rate metric spikes [kafka]
junrao commented on code in PR #15889: URL: https://github.com/apache/kafka/pull/15889#discussion_r1605350575 ## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ## @@ -110,40 +111,43 @@ public String toString() { protected void purgeObsoleteSamples(MetricConfig config, long now) { long expireAge = config.samples() * config.timeWindowMs(); for (Sample sample : samples) { -if (now - sample.lastWindowMs >= expireAge) +if (now - sample.lastEventMs >= expireAge) sample.reset(now); } } protected static class Sample { public double initialValue; public long eventCount; -public long lastWindowMs; +public long startTimeMs; +public long lastEventMs; public double value; public Sample(double initialValue, long now) { this.initialValue = initialValue; this.eventCount = 0; -this.lastWindowMs = now; +this.startTimeMs = now; +this.lastEventMs = now; this.value = initialValue; } public void reset(long now) { this.eventCount = 0; -this.lastWindowMs = now; +this.startTimeMs = now; +this.lastEventMs = now; this.value = initialValue; } public boolean isComplete(long timeMs, MetricConfig config) { -return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow(); +return timeMs - startTimeMs >= config.timeWindowMs() || eventCount >= config.eventWindow(); } @Override public String toString() { return "Sample(" + "value=" + value + ", eventCount=" + eventCount + -", lastWindowMs=" + lastWindowMs + +", startTimeMs=" + startTimeMs + Review Comment: Should we add lastEventMs? ## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ## @@ -110,40 +111,43 @@ public String toString() { protected void purgeObsoleteSamples(MetricConfig config, long now) { long expireAge = config.samples() * config.timeWindowMs(); for (Sample sample : samples) { -if (now - sample.lastWindowMs >= expireAge) +if (now - sample.lastEventMs >= expireAge) Review Comment: Could we add a comment that we don't purge overlapping samples? 
## clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java: ##
@@ -82,7 +82,7 @@ public long windowSize(MetricConfig config, long now) {
          * but this approach does not account for sleeps. SampledStat only creates samples whenever record is called,
          * if no record is called for a period of time that time is not accounted for in windowSize and produces incorrect results.
          */
-        long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs;
+        long totalElapsedTimeMs = now - stat.oldest(now).startTimeMs;

Review Comment:
Could we add some comments to note that the total elapsed time could be larger than the window size since we keep the overlapping samples?

## clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java: ##
@@ -64,4 +69,30 @@ public void testRateWithNoPriorAvailableSamples(int numSample, int sampleWindowS
         double expectedRatePerSec = sampleValue / windowSize;
         assertEquals(expectedRatePerSec, observedRate, EPS);
     }
+
+    // Record an event every 100 ms on average, moving some 1 ms back or forth for fine-grained
+    // window control. The expected rate, hence, is 10-11 events/sec depending on the moment of
+    // measurement. Start assertions from the second window.
+    @Test
+    public void testRateIsConsistentAfterTheFirstWindow() {
+        MetricConfig config = new MetricConfig().timeWindow(1, SECONDS).samples(2);
+        List<Integer> steps = Arrays.asList(0, 99, 100, 100, 100, 100, 100, 100, 100, 100, 100);
+
+        // start the first window and record events at 0,99,199,...,999 ms
+        for (int stepMs : steps) {
+            time.sleep(stepMs);
+            rate.record(config, 1, time.milliseconds());
+        }
+
+        // making a gap of 100 ms between windows
+        time.sleep(101);
+
+        // start the second window and record events at 0,99,199,...,999 ms
+        for (int stepMs : steps) {
+            time.sleep(stepMs);
+            rate.record(config, 1, time.milliseconds());
+            double observedRate = rate.measure(config, time.milliseconds());

Review Comment:
Should we do a second measurement and check that the result doesn't change?
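The diff splits the single `lastWindowMs` timestamp into two: `startTimeMs` decides when a sample's window is complete, and `lastEventMs` decides when a sample is old enough to purge, so a sample that still overlaps the retention span is kept. A minimal Java sketch of that bookkeeping, replaying the timings from `testRateIsConsistentAfterTheFirstWindow`. `SimpleRate` and its methods are hypothetical, not Kafka's `SampledStat`/`Rate` API; the sketch ignores the event-count window and uses a simplified elapsed-time rule (at least one full window).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the SampledStat/Rate change (not Kafka's API).
final class SimpleRate {
    static final class Sample {
        long startTimeMs; // start of this sample's window: decides completeness
        long lastEventMs; // most recent record in this sample: decides expiry
        double value;

        Sample(long now) {
            reset(now);
        }

        void reset(long now) {
            startTimeMs = now;
            lastEventMs = now;
            value = 0.0;
        }
    }

    private final long windowMs;
    private final int numSamples;
    private final List<Sample> samples = new ArrayList<>();
    private int current = 0;

    SimpleRate(long windowMs, int numSamples) {
        this.windowMs = windowMs;
        this.numSamples = numSamples;
    }

    void record(double v, long now) {
        if (samples.isEmpty()) {
            samples.add(new Sample(now));
        }
        Sample sample = samples.get(current);
        // Roll over once the window, measured from the sample's start, has elapsed.
        if (now - sample.startTimeMs >= windowMs) {
            current = (current + 1) % numSamples;
            if (current < samples.size()) {
                samples.get(current).reset(now);
            } else {
                samples.add(new Sample(now));
            }
            sample = samples.get(current);
        }
        sample.value += v;
        sample.lastEventMs = now;
    }

    double measurePerSecond(long now) {
        long expireAge = numSamples * windowMs;
        double total = 0.0;
        long oldestStartMs = now;
        for (Sample sample : samples) {
            // Expire by last event, so a sample that still overlaps the retention span survives.
            if (now - sample.lastEventMs >= expireAge) {
                sample.reset(now);
            }
            total += sample.value;
            oldestStartMs = Math.min(oldestStartMs, sample.startTimeMs);
        }
        // Overlapping samples are kept, so elapsed time can exceed numSamples * windowMs.
        long elapsedMs = Math.max(now - oldestStartMs, windowMs);
        return total / (elapsedMs / 1000.0);
    }
}
```

Replaying the test's timings (1 s windows, 2 samples, events roughly every 100 ms, a 101 ms gap), every measurement taken in the second window stays between 10 and 11 events/sec.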
Re: [PR] KAFKA-16654:Refactor kafka.test.annotation.Type and ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15916:
URL: https://github.com/apache/kafka/pull/15916#discussion_r1605371498

## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java: ##
@@ -62,10 +63,10 @@ public class ClusterTestExtensionsTest {
     }

     // Static methods can generate cluster configurations
-    static void generate1(ClusterGenerator clusterGenerator) {
+    static List generate1() {
         Map serverProperties = new HashMap<>();
         serverProperties.put("foo", "bar");
-        clusterGenerator.accept(ClusterConfig.defaultBuilder()
+        return Arrays.asList(ClusterConfig.defaultBuilder()

Review Comment:
`Collections.singletonList`

## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ##
@@ -122,31 +115,57 @@ public Stream provideTestTemplateInvocationContex
         return generatedContexts.stream();
     }

-    void processClusterTemplate(ExtensionContext context, ClusterTemplate annot,
-                                Consumer testInvocations) {
+
+
+    List processClusterTemplate(ExtensionContext context, ClusterTemplate annot) {
         // If specified, call cluster config generated method (must be static)
         List generatedClusterConfigs = new ArrayList<>();

Review Comment:
Please remove this unused variable
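With this refactor, a generator method no longer pushes configs into a `ClusterGenerator` consumer; it returns a `List` of them, which is why `Collections.singletonList` fits the single-config case. A minimal sketch of calling such a no-arg static generator by name through reflection, assuming that is roughly how the extension resolves the method named in `@ClusterTemplate`. `FakeClusterConfig`, `GeneratorSketch`, and `invokeGenerator` are hypothetical.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for kafka.test.ClusterConfig; only carries server properties.
final class FakeClusterConfig {
    final Map<String, String> serverProperties;

    FakeClusterConfig(Map<String, String> serverProperties) {
        this.serverProperties = serverProperties;
    }
}

final class GeneratorSketch {
    // Returns its configs instead of feeding a consumer.
    static List<FakeClusterConfig> generate1() {
        return Collections.singletonList(
            new FakeClusterConfig(Collections.singletonMap("foo", "bar")));
    }

    // Looks up a no-arg static method by name and returns its List result.
    @SuppressWarnings("unchecked")
    static List<FakeClusterConfig> invokeGenerator(Class<?> owner, String methodName) {
        try {
            Method method = owner.getDeclaredMethod(methodName);
            if (!Modifier.isStatic(method.getModifiers())) {
                throw new IllegalArgumentException(methodName + " must be static");
            }
            method.setAccessible(true);
            return (List<FakeClusterConfig>) method.invoke(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Failed to invoke generator " + methodName, e);
        }
    }
}
```

Returning a value also makes generators easier to unit-test on their own, since the caller can inspect the list directly instead of capturing what was pushed into a consumer.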
[jira] [Commented] (KAFKA-14517) Implement regex subscriptions
[ https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847386#comment-17847386 ] Phuc Hong Tran commented on KAFKA-14517: [~lianetm] I’m in the process of implementing this one already > Implement regex subscriptions > - > > Key: KAFKA-14517 > URL: https://issues.apache.org/jira/browse/KAFKA-14517 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848-preview >
[jira] [Commented] (KAFKA-16788) Resource leakage due to absence of close() call on connector start failure
[ https://issues.apache.org/jira/browse/KAFKA-16788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847385#comment-17847385 ] Ashok commented on KAFKA-16788: --- [~gharris1727] Thanks. yes, I am working on the fix. I will raise a PR next week. > Resource leakage due to absence of close() call on connector start failure > -- > > Key: KAFKA-16788 > URL: https://issues.apache.org/jira/browse/KAFKA-16788 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Ashok >Assignee: Ashok >Priority: Critical > > We have identified a potential issue in the WorkerConnector class of the > Connect framework. Specifically, the close() method is not being called on > the connector when the connector fails to start due to various reasons. This > omission prevents the connector from releasing any resources that were > created or started as part of the start() method. As a result, these > resources remain allocated even after the connector has failed to start, > leading to resource leakage. > To address this issue, we propose modifying the WorkerConnector class to > ensure that the close() method is called on the connector whenever the > connector fails to start. This change will allow the connector to properly > release its resources, preventing resource leakage. > *Steps to Reproduce:* > # Initiate a connector that creates or allocates resources (for instance, > threads) during the execution of the start() method. > # Generate a problem that, during the start() process, either triggers an > exception or invokes the raiseError(Exception e) method of the > WorkerConnectorContext. > # Notice that the close() method is not invoked on the connector, resulting > in resource leakage, as the stop() method is where the resources are > typically closed. > In our scenario, the issue was related to threads not being properly closed. > These threads were initiated as part of the start() method in the connector. 
> *Expected Result:* > When a connector fails to start, the close() method should be called to allow > the connector to release its resources. > *Actual Result:* > The close() method is not called when a connector fails to start, leading to > resource leakage.
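A minimal sketch of the behavior the ticket asks for: if `start()` throws, the runner still invokes the connector's cleanup so that resources allocated in `start()` (here, an executor thread) are released. The ticket's steps name `stop()` as the place where connectors typically release resources, so the sketch calls `stop()`. `SimpleConnector`, `FailingThreadedConnector`, and `ConnectorRunner` are hypothetical stand-ins for the Connect `Connector` API and `WorkerConnector`, and the sketch only covers the exception path, not `raiseError(Exception e)`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, minimal connector lifecycle (not the real Connect API).
interface SimpleConnector {
    void start();
    void stop();
}

// Allocates a thread pool in start() and then fails, as in the ticket's reproduction steps.
final class FailingThreadedConnector implements SimpleConnector {
    ExecutorService executor;

    @Override
    public void start() {
        executor = Executors.newSingleThreadExecutor();
        throw new IllegalStateException("simulated start failure");
    }

    @Override
    public void stop() {
        if (executor != null) {
            executor.shutdownNow();
        }
    }
}

final class ConnectorRunner {
    // Returns true on a clean start; on failure, releases resources before reporting false.
    static boolean startOrCleanUp(SimpleConnector connector) {
        try {
            connector.start();
            return true;
        } catch (RuntimeException startFailure) {
            try {
                connector.stop();
            } catch (RuntimeException stopFailure) {
                // Keep the start failure as the primary error; a real worker would log both.
            }
            return false;
        }
    }
}
```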
[jira] [Commented] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured
[ https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847384#comment-17847384 ] Muralidhar Basani commented on KAFKA-16790: --- [~christo_lolov] can I look into this issue ? > Calls to RemoteLogManager are made before it is configured > -- > > Key: KAFKA-16790 > URL: https://issues.apache.org/jira/browse/KAFKA-16790 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.8.0 >Reporter: Christo Lolov >Priority: Major > > BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) > which in turn calls RemoteLogManager#onLeadershipChange (2), however, the > RemoteLogManager is configured after the BrokerMetadataPublisher starts > running (3, 4). This is incorrect, we either need to initialise the > RemoteLogManager before we start the BrokerMetadataPublisher or we need to > skip calls to onLeadershipChange if the RemoteLogManager is not initialised. > (1) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151] > (2) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737] > (3) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432] > (4) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515] > The way to reproduce the problem is by looking at the following changes > {code:java} > config/kraft/broker.properties | 10 ++ > .../main/java/kafka/log/remote/RemoteLogManager.java | 8 +++- > core/src/main/scala/kafka/server/ReplicaManager.scala | 6 +- > 3 files changed, 22 insertions(+), 2 deletions(-)diff --git > a/config/kraft/broker.properties b/config/kraft/broker.properties > index 2d15997f28..39d126cf87 100644 > --- a/config/kraft/broker.properties > +++ b/config/kraft/broker.properties > @@ -127,3 +127,13 @@ log.segment.bytes=1073741824 > # The 
interval at which log segments are checked to see if they can be > deleted according > # to the retention policies > log.retention.check.interval.ms=30 > + > +remote.log.storage.system.enable=true > +remote.log.metadata.manager.listener.name=PLAINTEXT > +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage > +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar > +remote.log.storage.manager.impl.prefix=rsm.config. > +remote.log.metadata.manager.impl.prefix=rlmm.config. > +rsm.config.dir=/tmp/kafka-remote-storage > +rlmm.config.remote.log.metadata.topic.replication.factor=1 > +log.retention.check.interval.ms=1000 > diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java > b/core/src/main/java/kafka/log/remote/RemoteLogManager.java > index 6555b7c0cd..e84a072abc 100644 > --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java > +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java > @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable { > // The endpoint for remote log metadata manager to connect to > private Optional endpoint = Optional.empty(); > private boolean closed = false; > + private boolean up = false; > > /** > * Creates RemoteLogManager instance with the given arguments. > @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable { > // in connecting to the brokers or remote storages. 
> configureRSM(); > configureRLMM(); > + up = true; > } > > public RemoteStorageManager storageManager() { > @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable { > public void onLeadershipChange(Set partitionsBecomeLeader, > Set partitionsBecomeFollower, > Map topicIds) { > - LOGGER.debug("Received leadership changes for leaders: {} and > followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); > + if (!up) { > + LOGGER.error("NullPointerException"); > + return; > + } > + LOGGER.error("Received leadership changes for leaders: {} and > followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); > > Map leaderPartitionsWithLeaderEpoch = > filterPartitions(partitionsBecomeLeader) > .collect(Collectors.toMap( > diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala > b/core/src/main/scala/kafka/server/ReplicaManager.scala > index 35499430d6..bd3f41c3d6 100644 > --- a/core/src/main/scala/kafka/server/ReplicaManager.scala > +++
Re: [PR] MINOR: Refactor write timeout in CoordinatorRuntime [kafka]
chia7712 merged PR #15976: URL: https://github.com/apache/kafka/pull/15976
[jira] [Commented] (KAFKA-16788) Resource leakage due to absence of close() call on connector start failure
[ https://issues.apache.org/jira/browse/KAFKA-16788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847383#comment-17847383 ] Greg Harris commented on KAFKA-16788: - Nice find [~ashok89]! Are you interested in working on a fix for this?
Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]
chia7712 commented on code in PR #15863:
URL: https://github.com/apache/kafka/pull/15863#discussion_r1605365153

## core/src/main/scala/kafka/log/LogCleaner.scala: ##
@@ -182,6 +183,27 @@ class LogCleaner(initialConfig: CleanerConfig,
     cleanerManager.removeMetrics()

Review Comment:
> We can remove the cleanerManager.removeMetrics(), then we should also remove [this](https://github.com/apache/kafka/blob/fafa3c76dc93f3258b2cea49dfd1dc7a724a213c/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala#L96) in testRemoveMetricsOnClose().

My point was - we don't need to re-create the metrics after reconfiguration - i.e. in reconfiguration we SHOULD NOT remove the metrics. For example,

```scala
  private[this] def shutdownCleaners(): Unit = {
    info("Shutting down the log cleaner.")
    cleaners.foreach(_.shutdown())
    cleaners.clear()
  }

  /**
   * Stop the background cleaner threads
   */
  def shutdown(): Unit = {
    try shutdownCleaners()
    finally removeMetrics()
    info("Shutting down the log cleaner.")
  }
```

With the above changes, in `reconfigure` we call `shutdownCleaners` instead of `shutdown`. WDYT?
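The suggestion separates stopping the cleaner threads from removing the metrics, so `reconfigure` can restart threads without dropping and re-creating metrics. A Java model of that split (the real `LogCleaner` is Scala), using a hypothetical `CleanerLifecycle` class, no-op threads, and a plain set standing in for registered metrics; the metric names are illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical Java model of the suggested split; not the real LogCleaner.
final class CleanerLifecycle {
    // Stand-in for registered metrics (names are illustrative).
    final Set<String> registeredMetrics = new HashSet<>();
    final List<Thread> cleaners = new ArrayList<>();
    private int numThreads;

    CleanerLifecycle(int numThreads) {
        this.numThreads = numThreads;
        registeredMetrics.addAll(Arrays.asList("max-clean-time-secs", "DeadThreadCount"));
    }

    void startup() {
        for (int i = 0; i < numThreads; i++) {
            Thread cleaner = new Thread(() -> { }, "cleaner-" + i); // no-op body for the sketch
            cleaners.add(cleaner);
            cleaner.start();
        }
    }

    // Stops the cleaner threads only; metrics stay registered.
    private void shutdownCleaners() {
        for (Thread cleaner : cleaners) {
            try {
                cleaner.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        cleaners.clear();
    }

    // Full shutdown: stop the threads, then always remove the metrics.
    void shutdown() {
        try {
            shutdownCleaners();
        } finally {
            registeredMetrics.clear();
        }
    }

    // Reconfiguration restarts the threads but leaves the metrics alone.
    void reconfigure(int newNumThreads) {
        shutdownCleaners();
        numThreads = newNumThreads;
        startup();
    }
}
```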
Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]
chia7712 commented on code in PR #15946:
URL: https://github.com/apache/kafka/pull/15946#discussion_r1605361253

## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ##
@@ -240,28 +215,61 @@ public void startBroker(int brokerId) {
     @Override
     public void waitForReadyBrokers() throws InterruptedException {
         try {
-            clusterReference.get().waitForReadyBrokers();
+            clusterTestKit.waitForReadyBrokers();
         } catch (ExecutionException e) {
             throw new AssertionError("Failed while waiting for brokers to become ready", e);
         }
     }

-    private BrokerServer findBrokerOrThrow(int brokerId) {
-        return Optional.ofNullable(clusterReference.get().brokers().get(brokerId))
-            .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
-    }

     @Override
     public Map brokers() {
-        return clusterReference.get().brokers().entrySet()
+        return clusterTestKit.brokers().entrySet()
             .stream()
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     }

     @Override
     public Map controllers() {
-        return Collections.unmodifiableMap(clusterReference.get().controllers());
+        return Collections.unmodifiableMap(clusterTestKit.controllers());
+    }
+
+    public void format() throws Exception {

Review Comment:
We can put `safeBuildCluster` and `doBuild` into `format`, right?

```java
    public void format() throws Exception {
        if (formated.compareAndSet(false, true)) {
            TestKitNodes nodes = new TestKitNodes.Builder()
                .setBootstrapMetadataVersion(clusterConfig.metadataVersion())
                .setCombined(isCombined)
                .setNumBrokerNodes(clusterConfig.numBrokers())
                .setNumDisksPerBroker(clusterConfig.numDisksPerBroker())
                .setPerServerProperties(clusterConfig.perServerOverrideProperties())
                .setNumControllerNodes(clusterConfig.numControllers()).build();
            KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
            if (Boolean.parseBoolean(clusterConfig.serverProperties()
                    .getOrDefault("zookeeper.metadata.migration.enable", "false"))) {
                this.embeddedZookeeper = new EmbeddedZookeeper();
                builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", embeddedZookeeper.port()));
            }
            // Copy properties into the TestKit builder
            clusterConfig.serverProperties().forEach(builder::setConfigProp);
            // KAFKA-12512 need to pass security protocol and listener name here
            this.clusterTestKit = builder.build();
            this.clusterTestKit.format();
        }
    }
```
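The suggested `format()` wraps the whole build-and-format sequence in `formated.compareAndSet(false, true)`, so only the first call does the work and later calls are no-ops. A minimal sketch of that once-only guard, with a hypothetical `FormatOnce` class and a counter standing in for building and formatting `KafkaClusterTestKit`:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the once-only guard used by the suggested format().
final class FormatOnce {
    private final AtomicBoolean formatted = new AtomicBoolean(false);
    final AtomicInteger formatCalls = new AtomicInteger();

    void format() {
        // Only the first caller flips false -> true and runs the expensive setup.
        if (formatted.compareAndSet(false, true)) {
            formatCalls.incrementAndGet(); // stands in for builder.build() and clusterTestKit.format()
        }
    }

    void start() {
        format(); // safe even if format() was already called explicitly
    }
}
```

One trade-off of this pattern: the flag flips before the body runs, so if building the cluster throws, a later `format()` call will not retry.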
[jira] [Updated] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured
[ https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christo Lolov updated KAFKA-16790: -- Description: BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) which in turn calls RemoteLogManager#onLeadershipChange (2), however, the RemoteLogManager is configured after the BrokerMetadataPublisher starts running (3, 4). This is incorrect, we either need to initialise the RemoteLogManager before we start the BrokerMetadataPublisher or we need to skip calls to onLeadershipChange if the RemoteLogManager is not initialised. (1) [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151] (2) [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737] (3) [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432] (4) [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515] The way to reproduce the problem is by looking at the following changes ``` --- config/kraft/broker.properties | 10 ++ .../main/java/kafka/log/remote/RemoteLogManager.java | 8 +++- core/src/main/scala/kafka/server/ReplicaManager.scala | 6 +- 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/config/kraft/broker.properties b/config/kraft/broker.properties index 2d15997f28..39d126cf87 100644 --- a/config/kraft/broker.properties +++ b/config/kraft/broker.properties @@ -127,3 +127,13 @@ log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=30 + +remote.log.storage.system.enable=true +remote.log.metadata.manager.listener.name=PLAINTEXT +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage 
+remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar +remote.log.storage.manager.impl.prefix=rsm.config. +remote.log.metadata.manager.impl.prefix=rlmm.config. +rsm.config.dir=/tmp/kafka-remote-storage +rlmm.config.remote.log.metadata.topic.replication.factor=1 +log.retention.check.interval.ms=1000 diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 6555b7c0cd..e84a072abc 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable { // The endpoint for remote log metadata manager to connect to private Optional endpoint = Optional.empty(); private boolean closed = false; + private boolean up = false; /** * Creates RemoteLogManager instance with the given arguments. @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable { // in connecting to the brokers or remote storages. 
configureRSM(); configureRLMM(); + up = true; } public RemoteStorageManager storageManager() { @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable { public void onLeadershipChange(Set partitionsBecomeLeader, Set partitionsBecomeFollower, Map topicIds) { - LOGGER.debug("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); + if (!up) { + LOGGER.error("NullPointerException"); + return; + } + LOGGER.error("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); Map leaderPartitionsWithLeaderEpoch = filterPartitions(partitionsBecomeLeader) .collect(Collectors.toMap( diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 35499430d6..bd3f41c3d6 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2688,6 +2688,7 @@ class ReplicaManager(val config: KafkaConfig, */ def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = { // Before taking the lock, compute the local changes + stateChangeLogger.error("ROBIN") val localChanges = delta.localChanges(config.nodeId) val metadataVersion = newImage.features().metadataVersion() @@ -2734,7 +2735,10 @@ class ReplicaManager(val config: KafkaConfig, replicaFetcherManager.shutdownIdleFetcherThreads() replicaAlterLogDirsManager.shutdownIdleFetcherThreads() - remoteLogManager.foreach(rlm =>
[jira] [Updated] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured
[ https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christo Lolov updated KAFKA-16790: -- Description: BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) which in turn calls RemoteLogManager#onLeadershipChange (2), however, the RemoteLogManager is configured after the BrokerMetadataPublisher starts running (3, 4). This is incorrect, we either need to initialise the RemoteLogManager before we start the BrokerMetadataPublisher or we need to skip calls to onLeadershipChange if the RemoteLogManager is not initialised. (1) [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151] (2) [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737] (3) [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432] (4) [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515] The way to reproduce the problem is by looking at the following changes {code:java} config/kraft/broker.properties | 10 ++ .../main/java/kafka/log/remote/RemoteLogManager.java | 8 +++- core/src/main/scala/kafka/server/ReplicaManager.scala | 6 +- 3 files changed, 22 insertions(+), 2 deletions(-)diff --git a/config/kraft/broker.properties b/config/kraft/broker.properties index 2d15997f28..39d126cf87 100644 --- a/config/kraft/broker.properties +++ b/config/kraft/broker.properties @@ -127,3 +127,13 @@ log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=30 + +remote.log.storage.system.enable=true +remote.log.metadata.manager.listener.name=PLAINTEXT +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage 
+remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar +remote.log.storage.manager.impl.prefix=rsm.config. +remote.log.metadata.manager.impl.prefix=rlmm.config. +rsm.config.dir=/tmp/kafka-remote-storage +rlmm.config.remote.log.metadata.topic.replication.factor=1 +log.retention.check.interval.ms=1000 diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 6555b7c0cd..e84a072abc 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable { // The endpoint for remote log metadata manager to connect to private Optional endpoint = Optional.empty(); private boolean closed = false; + private boolean up = false; /** * Creates RemoteLogManager instance with the given arguments. @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable { // in connecting to the brokers or remote storages. 
configureRSM(); configureRLMM(); + up = true; } public RemoteStorageManager storageManager() { @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable { public void onLeadershipChange(Set partitionsBecomeLeader, Set partitionsBecomeFollower, Map topicIds) { - LOGGER.debug("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); + if (!up) { + LOGGER.error("NullPointerException"); + return; + } + LOGGER.error("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); Map leaderPartitionsWithLeaderEpoch = filterPartitions(partitionsBecomeLeader) .collect(Collectors.toMap( diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 35499430d6..bd3f41c3d6 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2688,6 +2688,7 @@ class ReplicaManager(val config: KafkaConfig, */ def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = { // Before taking the lock, compute the local changes + stateChangeLogger.error("ROBIN") val localChanges = delta.localChanges(config.nodeId) val metadataVersion = newImage.features().metadataVersion() @@ -2734,7 +2735,10 @@ class ReplicaManager(val config: KafkaConfig, replicaFetcherManager.shutdownIdleFetcherThreads() replicaAlterLogDirsManager.shutdownIdleFetcherThreads() - remoteLogManager.foreach(rlm =>
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
prestona commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2118027385 Hi @gharris1727, hopefully the latest commits address your review comments. Once again, really appreciate all your feedback and suggestions.
[jira] [Created] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured
Christo Lolov created KAFKA-16790: - Summary: Calls to RemoteLogManager are made before it is configured Key: KAFKA-16790 URL: https://issues.apache.org/jira/browse/KAFKA-16790 Project: Kafka Issue Type: Bug Components: kraft Affects Versions: 3.8.0 Reporter: Christo Lolov BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) which in turn calls RemoteLogManager#onLeadershipChange (2), however, the RemoteLogManager is configured after the BrokerMetadataPublisher starts running (3, 4). This is incorrect, we either need to initialise the RemoteLogManager before we start the BrokerMetadataPublisher or we need to skip calls to onLeadershipChange if the RemoteLogManager is not initialised. (1) [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151] (2) [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737] (3) [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432] (4) [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515] The way to reproduce the problem is by looking at the following branch -- This message was sent by Atlassian Jira (v8.20.10#820010)
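The ticket lists two fixes: configure `RemoteLogManager` before `BrokerMetadataPublisher` starts, or skip `onLeadershipChange` until it is configured. A sketch of the second option, assuming a volatile flag set at the end of configuration (similar in spirit to the `up` flag in the reproduction diff posted in the ticket's updated description). `GuardedRemoteLogManager` is hypothetical and models partitions as strings.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of "skip onLeadershipChange until configured"; not RemoteLogManager itself.
final class GuardedRemoteLogManager {
    // volatile: configure() and the metadata publisher may run on different threads.
    private volatile boolean configured = false;
    private final Set<String> leaderPartitions = new HashSet<>();
    private int skippedCalls = 0; // plain counter; the sketch assumes single-threaded callbacks

    void configure() {
        // The real class would configure its storage and metadata managers here
        // (configureRSM() and configureRLMM() in the repro diff).
        configured = true;
    }

    void onLeadershipChange(Set<String> becomeLeader, Set<String> becomeFollower) {
        if (!configured) {
            skippedCalls++; // a real implementation would log this
            return;
        }
        leaderPartitions.addAll(becomeLeader);
        leaderPartitions.removeAll(becomeFollower);
    }

    Set<String> leaders() {
        return Collections.unmodifiableSet(leaderPartitions);
    }

    int skippedCalls() {
        return skippedCalls;
    }
}
```

Skipping means leadership changes that arrive before configuration are dropped, so this option is only safe if they are replayed afterwards; the first option (ordering the startup) avoids that gap.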
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
junrao commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1605313096

## core/src/test/scala/unit/kafka/log/LogValidatorTest.scala: ##
@@ -1538,7 +1563,80 @@ class LogValidatorTest {
     assertEquals(e.recordErrors.size, 3)
   }

-  private def testBatchWithoutRecordsNotAllowed(sourceCompression: CompressionType, targetCompression: CompressionType): Unit = {
+  @Test
+  def testDifferentLevelDoesNotCauseRecompression(): Unit = {
+    val records = List(
+      List.fill(256)("some").mkString("").getBytes,
+      List.fill(256)("data").mkString("").getBytes
+    )
+    // Records from the producer were created with gzip max level
+    val gzipMax: Compression = Compression.gzip().level(GzipCompression.MAX_LEVEL).build()
+    val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMax)
+
+    // The topic is configured with gzip min level
+    val gzipMin: Compression = Compression.gzip().level(GzipCompression.MIN_LEVEL).build()
+    val recordsGzipMin = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMin)
+
+    // ensure data compressed with gzip max and min is different
+    assertNotEquals(recordsGzipMax, recordsGzipMin)
+    val validator = new LogValidator(recordsGzipMax,
+      topicPartition,
+      time,
+      gzipMax.`type`(),
+      gzipMin,
+      false,
+      RecordBatch.CURRENT_MAGIC_VALUE,

Review Comment:
Earlier, we use RecordBatch.MAGIC_VALUE_V2. Should we be consistent here? Ditto in the next test.
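The test pins down that a batch produced with gzip at the maximum level is not recompressed when the topic is configured for gzip at the minimum level: `LogValidator` is handed the source codec type plus the target `Compression`, and a level difference alone is not enough. A sketch of that decision rule, assuming only a codec mismatch triggers recompression and ignoring any other conditions `LogValidator` may check. `Codec`, `CompressionSpec`, and `RecompressionPolicy` are hypothetical stand-ins for Kafka's `CompressionType` and `Compression`; the levels come from `java.util.zip.Deflater` purely to make the example concrete.

```java
import java.util.Objects;
import java.util.zip.Deflater;

// Hypothetical stand-ins for Kafka's CompressionType and Compression.
enum Codec { NONE, GZIP, SNAPPY, LZ4, ZSTD }

final class CompressionSpec {
    final Codec codec;
    final int level;

    CompressionSpec(Codec codec, int level) {
        this.codec = Objects.requireNonNull(codec, "codec");
        this.level = level;
    }

    static CompressionSpec gzip(int level) {
        return new CompressionSpec(Codec.GZIP, level);
    }
}

final class RecompressionPolicy {
    // java.util.zip levels, used only to make the example concrete.
    static final CompressionSpec GZIP_MAX = CompressionSpec.gzip(Deflater.BEST_COMPRESSION);
    static final CompressionSpec GZIP_MIN = CompressionSpec.gzip(Deflater.BEST_SPEED);

    // Only a codec mismatch forces recompression; a different level alone keeps the producer's batch.
    static boolean needsRecompression(Codec sourceCodec, CompressionSpec target) {
        return sourceCodec != target.codec;
    }
}
```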
[PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]
jsancio opened a new pull request, #15986: URL: https://github.com/apache/kafka/pull/15986 DRAFT ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-16763) Upgrade to scala 2.12.19 and scala 2.13.14
[ https://issues.apache.org/jira/browse/KAFKA-16763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16763. Fix Version/s: 3.8.0 Resolution: Fixed > Upgrade to scala 2.12.19 and scala 2.13.14 > -- > > Key: KAFKA-16763 > URL: https://issues.apache.org/jira/browse/KAFKA-16763 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: 黃竣陽 >Priority: Minor > Fix For: 3.8.0 > > > scala 2.12.19 (https://github.com/scala/scala/releases/tag/v2.12.19) > > scala 2.13.14 (https://github.com/scala/scala/releases/tag/v2.13.14)
Re: [PR] KAFKA-16763: Upgrade to scala 2.12.19 and scala 2.13.14 [kafka]
chia7712 merged PR #15958: URL: https://github.com/apache/kafka/pull/15958
[jira] [Commented] (KAFKA-14517) Implement regex subscriptions
[ https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847369#comment-17847369 ] Lianet Magrans commented on KAFKA-14517: Hey [~phuctran], [~dajac], I would like to take this one on, if that's OK. Thanks! > Implement regex subscriptions > - > > Key: KAFKA-14517 > URL: https://issues.apache.org/jira/browse/KAFKA-14517 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848-preview
[jira] [Updated] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
[ https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-16774: --- Fix Version/s: 3.8.0 > fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled > - > > Key: KAFKA-16774 > URL: https://issues.apache.org/jira/browse/KAFKA-16774 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Chia-Ping Tsai >Assignee: Bruno Cadonna >Priority: Minor > Labels: flaky-test > Fix For: 3.8.0 > > > java.util.ConcurrentModificationException > at > java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) > at > org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686) > at > org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79) > at > org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
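In this trace, `HashMap$KeySpliterator.forEachRemaining` fails while `TaskManager.allTasks` collects a stream. That is HashMap's fail-fast check firing because the map was structurally modified during the traversal. The sketch below reproduces that mechanism deterministically on one thread. It is a generic illustration with hypothetical names, not the test's actual root cause or the fix adopted for this ticket. A writer on another thread can trip the same check, but cross-thread races are not guaranteed to be detected.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

final class KeySnapshotDemo {
    // Collects the map's keys with a stream while inserting a new key from inside
    // the traversal. Returns true if a ConcurrentModificationException was thrown.
    static boolean collectWhileMutating(Map<String, Integer> tasks) {
        try {
            tasks.keySet().stream()
                .peek(id -> tasks.putIfAbsent("new-task", 0)) // structural change mid-traversal
                .collect(Collectors.toSet());
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> plain = new HashMap<>();
        plain.put("0_0", 1);
        plain.put("0_1", 2);
        // Copy before mutating, so both maps start with the same two entries.
        Map<String, Integer> concurrent = new ConcurrentHashMap<>(plain);
        System.out.println("HashMap threw CME: " + collectWhileMutating(plain));
        System.out.println("ConcurrentHashMap threw CME: " + collectWhileMutating(concurrent));
    }
}
```

The usual remedies are a weakly consistent map such as `ConcurrentHashMap`, whose iterators never throw this exception, or a snapshot of the keys taken under a lock. Which one fits depends on who else writes to the map.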
[jira] [Assigned] (KAFKA-16758) Extend Consumer#close with option to leave the group or not
[ https://issues.apache.org/jira/browse/KAFKA-16758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans reassigned KAFKA-16758: -- Assignee: Lianet Magrans > Extend Consumer#close with option to leave the group or not > --- > > Key: KAFKA-16758 > URL: https://issues.apache.org/jira/browse/KAFKA-16758 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: A. Sophie Blee-Goldman >Assignee: Lianet Magrans >Priority: Major > Labels: needs-kip > > See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the > full context. > Essentially we would get rid of the "internal.leave.group.on.close" config > that is used as a backdoor by Kafka Streams right now to prevent closed > consumers from leaving the group, thus reducing unnecessary task movements > after a simple bounce. > This would be replaced by an actual public API that would allow the caller to > opt in to or out of the LeaveGroup when close is called. This would be similar > to the KafkaStreams#close(CloseOptions) API, and in fact would be how that > API will be implemented (since it only works for static groups at the moment > as noted in KAFKA-16514) > This has several benefits over the current situation: > # It allows plain consumer apps to opt out of leaving the group when closed, > which is currently not possible through any public API (only an internal > backdoor config) > # It enables the caller to dynamically select the appropriate action > depending on why the client is being closed – for example, you would not want > the consumer to leave the group during a simple restart, but would want it to > leave the group when shutting down the app or if scaling down the node.
This > is not possible today, even with the internal config, since configs are > immutable > # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so > that the user's choice to leave the group during close will be respected for > non-static members
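The second benefit, choosing the action from the reason for closing, can be sketched as a per-call decision rather than an immutable config. Everything here is hypothetical (`CloseReason`, `LeaveGroupPolicy`). It is not a Kafka API, and the ticket is marked needs-kip, so the real API shape is still open.

```java
// Hypothetical reasons a caller might close a consumer; not a Kafka API.
enum CloseReason { ROLLING_RESTART, APPLICATION_SHUTDOWN, SCALE_DOWN }

final class LeaveGroupPolicy {
    // The decision is made when close() is called rather than fixed at
    // construction time by a config, so the caller can base it on why it is
    // shutting down.
    static boolean leaveGroupOnClose(CloseReason reason) {
        switch (reason) {
            case ROLLING_RESTART:
                // Skip LeaveGroup to avoid unnecessary task movement after a
                // simple bounce (the motivation given in the ticket).
                return false;
            case APPLICATION_SHUTDOWN:
            case SCALE_DOWN:
                // Send LeaveGroup so the group can rebalance right away instead
                // of waiting for the member's session to time out.
                return true;
            default:
                throw new IllegalArgumentException("Unknown close reason: " + reason);
        }
    }

    private LeaveGroupPolicy() {}
}
```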
[jira] [Commented] (KAFKA-15197) Flaky test MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
[ https://issues.apache.org/jira/browse/KAFKA-15197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847366#comment-17847366 ] Chia-Ping Tsai commented on KAFKA-15197: It seems that this behavior is expected. The check assumes that the consumed offsets of "all" partitions get updated after the consumer commits its offsets. However, the committed offsets are not the "end" offsets [0]. When the monotonic offsets of `OffsetSyncStore` are composed of (upstream) end offsets only, we won't send any checkpoint, because offset translation won't generate an offset that is outside the bound [1]. For example: # tp has 500 records -> end offset is 500 # monotonic syncs are [499],[499],...[0],[0] # committed consumer offset is 449 # 499 (synced offset) is bigger than 449 (upstream group offset), so we don't send a checkpoint for it. In short, to stabilize the tests we can change the condition from "all" to "some"; for example, it works if 50% of the partitions get updated. Alternatively, we can tweak the translation algorithm to generate smoother monotonic syncs [2]. This is more complicated and can't solve the issue 100%, but it could reduce the likelihood that no group offsets at all are translated downstream. [0] [https://github.com/apache/kafka/blob/b8c96389b47df0dbd53fcba9404363dcdf43604d/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java#L716] [1] [https://github.com/apache/kafka/blob/b8c96389b47df0dbd53fcba9404363dcdf43604d/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L308] [2] [https://github.com/apache/kafka/blob/b8c96389b47df0dbd53fcba9404363dcdf43604d/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L212] [~gharris1727] WDYT?
It would be great if an MM2 expert could give me feedback :) > Flaky test > MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow() > -- > > Key: KAFKA-15197 > URL: https://issues.apache.org/jira/browse/KAFKA-15197 > Project: Kafka > Issue Type: Test > Components: mirrormaker >Reporter: Divij Vaidya >Priority: Major > Labels: flaky-test > Fix For: 3.8.0 > > > As of Jul 17th, this is the second most flaky test in our CI on trunk and > fails 46% of the time. > See: > [https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=Europe/Berlin] > > Note that MirrorConnectorsIntegrationExactlyOnceTest has multiple tests but > testOffsetTranslationBehindReplicationFlow is the one that is the reason for > most failures. see: > [https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=Europe/Berlin=org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest] > > > Reason for failure is: > |org.opentest4j.AssertionFailedError: Condition not met within timeout 2. > Offsets for consumer group consumer-group-lagging-behind not translated from > primary for topic primary.test-topic-1 ==> expected: but was: |
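The translation bound described in the comment can be sketched as a small function. This is a simplified, hypothetical model (`SyncPoint` and `SyncTranslator` are invented names), not `OffsetSyncStore`'s code. It assumes a sync may be used only if its upstream offset does not exceed the group's upstream offset. It also assumes an exact match maps to the sync's downstream offset, while a group that is past the sync maps to one beyond it. The downstream offsets in the usage are made up.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical value type: one upstream -> downstream offset sync.
final class SyncPoint {
    final long upstreamOffset;
    final long downstreamOffset;

    SyncPoint(long upstreamOffset, long downstreamOffset) {
        this.upstreamOffset = upstreamOffset;
        this.downstreamOffset = downstreamOffset;
    }
}

final class SyncTranslator {
    static OptionalLong translate(List<SyncPoint> syncs, long groupUpstreamOffset) {
        SyncPoint best = null;
        for (SyncPoint sync : syncs) {
            // Only syncs at or below the group's offset are safe to translate from;
            // among those, prefer the newest (highest upstream offset).
            if (sync.upstreamOffset <= groupUpstreamOffset
                    && (best == null || sync.upstreamOffset > best.upstreamOffset)) {
                best = sync;
            }
        }
        if (best == null) {
            // Every sync is ahead of the group (e.g. 499 vs 449): no checkpoint.
            return OptionalLong.empty();
        }
        long step = best.upstreamOffset == groupUpstreamOffset ? 0 : 1;
        return OptionalLong.of(best.downstreamOffset + step);
    }

    private SyncTranslator() {}
}
```

With only a sync at upstream offset 499, a group committed at 449 gets no translation at all. A test that waits for every partition to receive a checkpoint can therefore time out even though translation behaves as designed, which is why the comment suggests relaxing "all" to "some".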
Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]
dajac commented on code in PR #15974: URL: https://github.com/apache/kafka/pull/15974#discussion_r1605223809

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java: ##
@@ -65,4 +65,33 @@ protected static Set topicIdPartitions(
                 .mapToObj(i -> new TopicIdPartition(topic, i))
         ).collect(Collectors.toSet());
     }
+
+    /**
+     * Constructs a set of {@code TopicIdPartition} including all the partitions that are
+     * currently not assigned to any member.
+     *
+     * @param topicIds                 Collection of topic Ids.
+     * @param subscribedTopicDescriber Describer to fetch partition counts for topics.
+     * @param groupSpec                The group's assignment spec.
+     *
+     *
+     * @return Set of unassigned {@code TopicIdPartition} including newly added topic partitions.
+     */
+    protected static Set unassignedTopicIdPartitions(

Review Comment: I just noticed that we already have a loop in `OptimizedUniformAssignmentBuilder#buildAssignment` iterating over the subscribed topic ids. It would be great if we could merge this one into it.
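The reverse lookup this PR introduces, plus the suggestion to fold the unassigned-partition scan into the existing loop over subscribed topics, can be sketched as follows. This is a hypothetical sketch: plain topic names and integer partitions stand in for topic ids and `TopicIdPartition`, and the class and method names are invented for illustration.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: topics are plain names and partitions plain integers,
// standing in for topic ids and TopicIdPartition.
final class UnassignedPartitions {

    // Builds a reverse lookup topic -> (partition -> owning member) from the
    // current member -> topic -> partitions assignment.
    static Map<String, Map<Integer, String>> reverseLookup(
            Map<String, Map<String, Set<Integer>>> assignmentByMember) {
        Map<String, Map<Integer, String>> owners = new HashMap<>();
        assignmentByMember.forEach((member, topics) ->
            topics.forEach((topic, partitions) -> {
                Map<Integer, String> byPartition =
                    owners.computeIfAbsent(topic, t -> new HashMap<>());
                for (int partition : partitions) {
                    byPartition.put(partition, member);
                }
            }));
        return owners;
    }

    // One pass over the subscribed topics. Other per-topic work could share this
    // loop; partitions without an owner, including those of newly added topics,
    // are collected as unassigned.
    static Map<String, Set<Integer>> unassigned(
            Collection<String> subscribedTopics,
            Map<String, Integer> partitionCounts,
            Map<String, Map<Integer, String>> owners) {
        Map<String, Set<Integer>> result = new HashMap<>();
        for (String topic : subscribedTopics) {
            int count = partitionCounts.getOrDefault(topic, 0);
            Map<Integer, String> byPartition = owners.getOrDefault(topic, Collections.emptyMap());
            for (int partition = 0; partition < count; partition++) {
                if (!byPartition.containsKey(partition)) {
                    result.computeIfAbsent(topic, t -> new TreeSet<>()).add(partition);
                }
            }
        }
        return result;
    }

    private UnassignedPartitions() {}
}
```

The reverse map turns "is this partition owned?" into a constant-time lookup. The scan then costs one step per subscribed partition, and it can share the loop the builder already runs.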
[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847352#comment-17847352 ] Justine Olshan commented on KAFKA-16692: [~soarez] I'd like to try to get this fix into 3.7.1. The PR for trunk is approved; I just need the builds to cooperate. I hope to cherry-pick it in the next day or so. > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.7.0, 3.6.1, 3.8 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major > > We have a kafka cluster running on version 3.5.2 that we are upgrading to > 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled > and hence creating transactions. As we replaced brokers with the new > binaries, we observed lots of clients in the cluster experiencing the > following error: > {code:java} > 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, > transactionalId=] Got error produce response with > correlation id 6402937 on topic-partition , retrying > (2147483512 attempts left). Error: NETWORK_EXCEPTION.
Error Message: The > server disconnected before a response was received.{code} > On inspecting the broker, we saw the following errors on brokers still > running Kafka version 3.5.2: > > {code:java} > message: > Closing socket for because of error > exception_exception_class: > org.apache.kafka.common.errors.InvalidRequestException > exception_exception_message: > Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled > exception_stacktrace: > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled > {code} > On the new brokers running 3.6.1 we saw the following errors: > > {code:java} > [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for > node 1043 with a network exception.{code} > > I can also see this: > {code:java} > [AddPartitionsToTxnManager broker=1055]Cancelled in-flight > ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 > being disconnected (elapsed time since creation: 11ms, elapsed time since > send: 4ms, request timeout: 3ms){code} > We started investigating this issue and digging through the changes in 3.6, > we came across some changes introduced as part of KAFKA-14402 that we thought > might lead to this behaviour. > First we could see that _transaction.partition.verification.enable_ is > enabled by default and enables a new code path that culminates in us sending > version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated > [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a > [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] > on the mailing list, [~jolshan] pointed out that this scenario shouldn't be > possible as the following code paths should prevent version 4 > ADD_PARTITIONS_TO_TXN requests from being sent to other brokers: > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] > > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] > However, these requests are still sent to other brokers in our environment. > On further inspection of the code, I am wondering if the following code path > could lead to this issue: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] > In this scenario, we don't have any _NodeApiVersions_ available for the > specified nodeId and are potentially skipping the _latestUsableVersion_ check. I > am wondering if it is possible that because _discoverBrokerVersions_ is set > to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it > skips fetching {_}NodeApiVersions{_}? I can see that we create the network > client here: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641] > The _NetworkUtils.buildNetworkClient_ method seems to create a network client > that has _discoverBrokerVersions_ set to {_}false{_}. > I was hoping I could get some assistance debugging this
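The reporter's hypothesis can be modelled as a version-selection function. The sender picks the highest version both sides support, but only when it knows the peer's supported range. If no ApiVersions information exists for the node, it falls back to its own latest version. This is a simplified, hypothetical model (`VersionRange`, `VersionNegotiation`), not `NetworkClient` code. The ranges are assumptions: a 3.6 sender that knows versions 0 to 4, and a 3.5.2 peer assumed to advertise at most 3, since it rejects version 4.

```java
import java.util.Optional;

// Hypothetical inclusive range of supported request versions.
final class VersionRange {
    final int min;
    final int max;

    VersionRange(int min, int max) {
        this.min = min;
        this.max = max;
    }
}

final class VersionNegotiation {
    // Picks the request version to send. `peer` is empty when no ApiVersions
    // response is known for the destination node.
    static int chooseVersion(VersionRange client, Optional<VersionRange> peer) {
        if (!peer.isPresent()) {
            // Nothing is known about the peer, so no usable-version check happens
            // and the sender's own latest version goes out: the suspected path.
            return client.max;
        }
        VersionRange p = peer.get();
        int usable = Math.min(client.max, p.max);
        if (usable < Math.max(client.min, p.min)) {
            throw new IllegalStateException("no common version between client and peer");
        }
        return usable;
    }

    private VersionNegotiation() {}
}
```

Under these assumptions, a known peer range caps the request at version 3, while the unknown-peer path sends version 4. That matches the InvalidRequestException the 3.5.2 brokers logged.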
[jira] [Commented] (KAFKA-16333) Removed Deprecated methods KTable#join
[ https://issues.apache.org/jira/browse/KAFKA-16333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847348#comment-17847348 ] Muralidhar Basani commented on KAFKA-16333: --- Agree. thanks. > Removed Deprecated methods KTable#join > -- > > Key: KAFKA-16333 > URL: https://issues.apache.org/jira/browse/KAFKA-16333 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > > KTable#join() methods taking a `Named` parameter got deprecated in 3.1 > release via https://issues.apache.org/jira/browse/KAFKA-13813
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
lianetm commented on PR #15640: URL: https://github.com/apache/kafka/pull/15640#issuecomment-2117763853 Hey @cadonna, the tricky bit is that, for some events, the request managers do expire requests too, so in this flow you described:
> The event is processed in the ApplicationEventHandler and a request is added to the commit request manager. **Then the commit request manager is polled**, the requests are added to the network client and then the network client is polled

When the manager is polled, if the event had timeout 0, it will be expired/cancelled before making it to the network thread. Currently we have 2 managers that do this (that I can remember): [TopicMetadataManager](https://github.com/apache/kafka/blob/f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java#L86) and [CommitRequestManager](https://github.com/apache/kafka/blob/f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L1168). So for those events, even with this PR, if they have timeout 0, they won't have a chance to complete. My point is not to bring more changes into this PR, only to have the whole situation in mind so we can address it properly (with multiple PRs). This other [PR](https://github.com/apache/kafka/pull/15844) attempts to address the situation I described, but only in the `CommitRequestManager`, for instance. We still have to align on the approach there, and also handle it in the `TopicMetadataManager`, I would say. I would expect that a combination of this PR and those others would allow us to get to a better point (now, even with this PR, we cannot make basic progress with a consumer being continuously polled with timeout 0 because `FetchCommittedOffsets` is always expired by the manager, for instance).
I can easily repro it with the following integration test (that I was surprised we have not covered, because TestUtils always polls with a non-zero timeout)
```
// Ensure TestUtils polls with ZERO. This fails for the new consumer only.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testPollEventuallyReturnsRecordsWithZeroTimeout(quorum: String, groupProtocol: String): Unit = {
  val numMessages = 100
  val producer = createProducer()
  sendRecords(producer, numMessages, tp)

  val consumer = createConsumer()
  consumer.subscribe(Set(topic).asJava)
  val records = awaitNonEmptyRecords(consumer, tp)
  assertEquals(numMessages, records.count())
}
```
Makes sense?
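Here is why a zero timeout never reaches the network layer. Assume each request's deadline is fixed when the event is created (submission time plus the user's timeout), and the manager expires a request once now >= deadline on its next poll. A timeout-0 request is then already expired the first time the manager looks at it. The classes below are a hypothetical model, not the code of `CommitRequestManager` or `TopicMetadataRequestManager`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical pending request with an absolute deadline.
final class PendingRequest {
    final String name;
    final long deadlineMs;

    PendingRequest(String name, long deadlineMs) {
        this.name = name;
        this.deadlineMs = deadlineMs;
    }
}

// Hypothetical request manager: each poll drops expired requests and returns
// the names of the remaining ones as "sent to the network client".
final class ExpiringRequestManager {
    private final Deque<PendingRequest> queue = new ArrayDeque<>();
    private final List<String> expired = new ArrayList<>();

    void add(String name, long nowMs, long timeoutMs) {
        queue.add(new PendingRequest(name, nowMs + timeoutMs));
    }

    List<String> poll(long nowMs) {
        List<String> toSend = new ArrayList<>();
        while (!queue.isEmpty()) {
            PendingRequest r = queue.poll();
            if (nowMs >= r.deadlineMs) {
                expired.add(r.name); // a timeout of 0 always lands here
            } else {
                toSend.add(r.name);
            }
        }
        return toSend;
    }

    List<String> expired() {
        return expired;
    }
}
```

Polled at the same instant they were created, a request with a 100 ms timeout is sent, while its timeout-0 twin is expired without ever being handed to the network client.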
Re: [PR] [Draft] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]
appchemist commented on PR #15961: URL: https://github.com/apache/kafka/pull/15961#issuecomment-2117739675 @lianetm & @philipnee PTAL. `testSubscriptionOnInvalidTopic` is still disabled for the new consumer. I couldn't make the changes there because `MockMetadataUpdater` is used internally instead of `MetadataUpdater`, so I only added a unit test.
[jira] [Comment Edited] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847334#comment-17847334 ] Lucas Brutschy edited comment on KAFKA-12679 at 5/17/24 2:26 PM: - Hey [~stoeckmk]. It is true that the state updater was not enabled in 3.7 in the end, so there is no back-off when locking the state directory. We missed updating the fix version on this ticket; I have updated it now. Hey [~coltmcnealy-lh]. The change I implemented was to back off in case of a lock error, but essentially to retry. You should see something like `Encountered lock exception. Reattempting locking the state in the next iteration.` If you are completely stuck and the application makes zero progress, this does not seem like it would be solved by a back-off, and somehow that seems to describe a different problem, especially if it also happens with the state updater enabled. We may want to create a separate ticket for this, since the problem described in this ticket should resolve itself once the old thread releases the lock on the state directory. was (Author: JIRAUSER302322): Hey [~coltmcnealy-lh]. It is true that the state updater was not enabled in 3.7 in the end, so there is no back-off when locking the state directory. The change I implemented was to back off in case of a lock error, to not end up in a busy loop trying to acquire the lock. If you are completely stuck and the application makes zero progress, this does not seem like it would be solved by a back-off, and somehow that seems to describe a different problem, especially if it also happens with the state updater enabled. We may want to create a separate ticket for this, since the problem described in this ticket should resolve itself once the old thread releases the lock on the state directory. Either way, I'll update the fix version here.
> Rebalancing a restoring or running task may cause directory livelocking with > newly created task > --- > > Key: KAFKA-12679 > URL: https://issues.apache.org/jira/browse/KAFKA-12679 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.1 > Environment: Broker and client version 2.6.1 > Multi-node broker cluster > Multi-node, auto scaling streams app instances >Reporter: Peter Nahas >Assignee: Lucas Brutschy >Priority: Major > Fix For: 3.8.0 > > Attachments: Backoff-between-directory-lock-attempts.patch > > > If a task that uses a state store is in the restoring state or in a running > state and the task gets rebalanced to a separate thread on the same instance, > the newly created task will attempt to lock the state store directory while > the first thread is continuing to use it. This is totally normal and expected > behavior when the first thread is not yet aware of the rebalance. However, > that newly created task is effectively running a while loop with no backoff > waiting to lock the directory: > # TaskManager tells the task to restore in `tryToCompleteRestoration` > # The task attempts to lock the directory > # The lock attempt fails and throws a > `org.apache.kafka.streams.errors.LockException` > # TaskManager catches the exception, stops further processing on the task > and reports that not all tasks have restored > # The StreamThread `runLoop` continues to run. > I've seen some documentation indicate that there is supposed to be a backoff > when this condition occurs, but there does not appear to be any in the code. > The result is that if this goes on for long enough, the lock-loop may > dominate CPU usage in the process and starve out the old stream thread task > processing.
> > When in this state, the DEBUG level logging for TaskManager will produce a > steady stream of messages like the following: > {noformat} > 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager > : stream-thread [StreamThread-10] Could not initialize 0_34 due > to the following exception; will retry > org.apache.kafka.streams.errors.LockException: stream-thread > [StreamThread-10] standby-task [0_34] Failed to lock the state directory for > task 0_34 > {noformat} > > > I've attached a git formatted patch to resolve the issue. Simply detect the > scenario and sleep for the backoff time in the appropriate StreamThread.
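The attached patch sleeps for the backoff time, and the implemented change is described as backing off on a lock error but essentially retrying. Below is a minimal sketch of a non-blocking variant that records when a task's lock attempt failed and skips that task until its backoff has passed. All names are hypothetical; this is not Kafka Streams' `TaskManager` logic, and `tickMs` must be positive for the simulation to terminate.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-task backoff for state-directory lock attempts; the names
// are illustrative and do not correspond to Kafka Streams internals.
final class LockBackoff {
    private final long backoffMs;
    private final Map<String, Long> nextAttemptMs = new HashMap<>();

    LockBackoff(long backoffMs) {
        this.backoffMs = backoffMs;
    }

    // True if no failure is pending for this task or its backoff has elapsed.
    boolean shouldAttempt(String taskId, long nowMs) {
        return nowMs >= nextAttemptMs.getOrDefault(taskId, Long.MIN_VALUE);
    }

    void recordFailure(String taskId, long nowMs) {
        nextAttemptMs.put(taskId, nowMs + backoffMs);
    }

    void recordSuccess(String taskId) {
        nextAttemptMs.remove(taskId);
    }

    // Simulates a run loop ticking every tickMs while another thread holds the
    // lock until lockReleasedAtMs; returns how many lock attempts were made.
    static int simulateAttempts(long backoffMs, long tickMs, long lockReleasedAtMs) {
        LockBackoff backoff = new LockBackoff(backoffMs);
        int attempts = 0;
        for (long now = 0; ; now += tickMs) {
            if (!backoff.shouldAttempt("0_34", now)) {
                continue; // still backing off: the thread is free for other work
            }
            attempts++;
            if (now >= lockReleasedAtMs) {
                backoff.recordSuccess("0_34");
                return attempts;
            }
            backoff.recordFailure("0_34", now);
        }
    }
}
```

Take a 1 ms loop tick with the lock released after 1000 ms. A 100 ms backoff then makes 11 lock attempts, where no backoff makes 1001. Skipping the task instead of sleeping keeps the thread available for its other tasks.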
[jira] [Comment Edited] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847334#comment-17847334 ] Lucas Brutschy edited comment on KAFKA-12679 at 5/17/24 2:20 PM: - Hey [~coltmcnealy-lh]. It is true that the state updater was not enabled in 3.7 in the end, so there is no back-off when locking the state directory. The change I implemented was to back off in case of a lock error, to not end up in a busy loop trying to acquire the lock. If you are completely stuck and the application makes zero progress, this does not seem like it would be solved by a back-off, and somehow that seems to describe a different problem, especially if it also happens with the state updater enabled. We may want to create a separate ticket for this, since the problem described in this ticket should resolve itself once the old thread releases the lock on the state directory. Either way, I'll update the fix version here. was (Author: JIRAUSER302322): Hey [~coltmcnealy-lh]. It is true that the state updater was not enabled in 3.7 in the end, so there is no back-off when locking the state directory. The change I implemented was to back off in case of a lock error, to not end up in a busy loop trying to acquire the lock. If you are completely stuck and the application makes zero progress, this does not seem like it would be solved by a back-off, and somehow that seems to describe a different problem, especially if it also happens with the state updater enabled. We may want to create a separate ticket for this. Either way, I'll update the fix version here.
Re: [PR] KAFKA-12399: Deprecate KafkaLog4jAppender [kafka]
mimaison commented on PR #15985: URL: https://github.com/apache/kafka/pull/15985#issuecomment-2117717230 @viktorsomogyi IIRC you volunteered to do the migration work to log4j2 a while back. Are you still able/willing to do so for Kafka 4.0? @jlprat As there's a chance 3.8 will be the last 3.X release, if we want to migrate to log4j2 in Kafka 4.0, we need to deprecate our log4j appender in 3.8.0. The KIP ([KIP-719](https://cwiki.apache.org/confluence/display/KAFKA/KIP-719%3A+Deprecate+Log4J+Appender)) was voted on years ago, so we just need to merge this PR.
[jira] [Updated] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-12679: --- Fix Version/s: 3.8.0 (was: 3.7.0)
[jira] [Commented] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847334#comment-17847334 ] Lucas Brutschy commented on KAFKA-12679: Hey [~coltmcnealy-lh]. It is true that the state updater was not enabled in 3.7 in the end, so there is no back-off when locking the state directory. The change I implemented was to back off in case of a lock error, to avoid ending up in a busy loop trying to acquire the lock. If you are completely stuck and the application makes zero progress, it does not seem like a back-off would solve that; it sounds like a different problem, especially if it also happens with the state updater enabled. We may want to create a separate ticket for this. Either way, I'll update the fix version here.
[jira] [Resolved] (KAFKA-16762) SyncGroup API for upgrading ConsumerGroup
[ https://issues.apache.org/jira/browse/KAFKA-16762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-16762. - Fix Version/s: 3.8.0 Reviewer: David Jacot Resolution: Fixed > SyncGroup API for upgrading ConsumerGroup > - > > Key: KAFKA-16762 > URL: https://issues.apache.org/jira/browse/KAFKA-16762 > Project: Kafka > Issue Type: Sub-task >Reporter: Dongnuo Lyu >Assignee: Dongnuo Lyu >Priority: Major > Fix For: 3.8.0 > >
[PR] KAFKA-12399: Deprecate KafkaLog4jAppender [kafka]
mimaison opened a new pull request, #15985: URL: https://github.com/apache/kafka/pull/15985 As per KIP-719, KafkaLog4jAppender is now deprecated and will be removed in Kafka 4.0. Users should migrate to the log4j2 Kafka Appender. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dajac merged PR #15954: URL: https://github.com/apache/kafka/pull/15954
[jira] [Assigned] (KAFKA-16784) Migrate TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest to new test infra
[ https://issues.apache.org/jira/browse/KAFKA-16784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16784: -- Assignee: PoAn Yang (was: Chia-Ping Tsai) > Migrate TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest to new > test infra > - > > Key: KAFKA-16784 > URL: https://issues.apache.org/jira/browse/KAFKA-16784 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: PoAn Yang >Priority: Minor > Labels: storage_test > > as title
[PR] KAFKA-16789: Fix thread leak detection for event handler threads [kafka]
gaurav-narula opened a new pull request, #15984: URL: https://github.com/apache/kafka/pull/15984 Updates `TestUtils::verifyNoUnexpectedThreads` to check for all event queue threads instead of the incorrect QuorumController thread. Also updates `KafkaEventQueueTest` to use try-with-resources so that threads get closed, and to use a LoggerContext with the current test name as the logPrefix for easier debugging.
Re: [PR] KAFKA-15746: KRaft support in ControllerMutationQuotaTest [kafka]
mimaison commented on PR #15038: URL: https://github.com/apache/kafka/pull/15038#issuecomment-2117661783 @linzihao1999 Can you follow up on this PR?
Re: [PR] KAFKA-16675: Refactored and new rebalance callbacks integration tests [kafka]
lianetm commented on PR #15965: URL: https://github.com/apache/kafka/pull/15965#issuecomment-2117652874 Thanks for the helpful comment @lucasbru, addressed.
Re: [PR] KAFKA-16675: Refactored and new rebalance callbacks integration tests [kafka]
lianetm commented on code in PR #15965: URL: https://github.com/apache/kafka/pull/15965#discussion_r1605041571 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala: ## @@ -84,29 +84,87 @@ class PlaintextConsumerCallbackTest extends AbstractConsumerTest { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked(quorum: String, groupProtocol: String): Unit = { -val tp = new TopicPartition(topic, 0); +val tp = new TopicPartition(topic, 0) triggerOnPartitionsRevoked { (consumer, _) => val map = consumer.beginningOffsets(Collections.singletonList(tp)) assertTrue(map.containsKey(tp)) assertEquals(0, map.get(tp)) } } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(quorum: String, groupProtocol: String): Unit = { +val tp = new TopicPartition(topic, 0) +triggerOnPartitionsAssigned { (consumer, _) => assertDoesNotThrow(() => consumer.position(tp)) } + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testSeekPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(quorum: String, groupProtocol: String): Unit = { +val consumer = createConsumer() +val startingOffset = 100L +val totalRecords = 120L + +val producer = createProducer() +val startingTimestamp = 0 +sendRecords(producer, totalRecords.toInt, tp, startingTimestamp) + +consumer.subscribe(asList(topic), new ConsumerRebalanceListener { + override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = { +consumer.seek(tp, startingOffset) + } + + override def onPartitionsRevoked(partitions: 
util.Collection[TopicPartition]): Unit = { +// noop + } +}) +consumeAndVerifyRecords(consumer, numRecords = (totalRecords - startingOffset).toInt, + startingOffset = startingOffset.toInt, startingKeyAndValueIndex = startingOffset.toInt, + startingTimestamp = startingOffset) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testPauseOnPartitionsAssignedCallback(quorum: String, groupProtocol: String): Unit = { +val consumer = createConsumer() +val totalRecords = 100L +val partitionsAssigned = new AtomicBoolean(false) + +val producer = createProducer() +val startingTimestamp = 0 +sendRecords(producer, totalRecords.toInt, tp, startingTimestamp) + +consumer.subscribe(asList(topic), new ConsumerRebalanceListener { Review Comment: I couldn't initially, mainly because the close and poll in the helper worked against what I needed to test in these 2, but then I removed the close myself and forgot to try again :). So I've now integrated the helper here, with a minor twist to pass the consumer as a param. It also allowed me to merge the seek and pause tests into one, given that we do need to pause to properly check the seek behaviour, so I removed the extra test for pause. Good catch, thanks!
Re: [PR] KAFKA-16066: Upgrade apacheds to 2.0.0.AM27 With apache kerby [kafka]
mimaison commented on PR #15277: URL: https://github.com/apache/kafka/pull/15277#issuecomment-2117650240 @highluck Can you address the pending comments and rebase on trunk to resolve the conflicts? Thanks
Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
cadonna commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1605020116 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.state.internals.LeftOrRightValue; +import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide; + +import java.util.Optional; + +class KStreamKStreamLeftJoin extends KStreamKStreamJoin { Review Comment: I would still prefer `KStreamKStreamJoinLeftSide`. It is the left side of a stream-stream join. The processor is also named `KStreamKStreamJoinLeftProcessor`.
Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
cadonna commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1604937635 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -187,17 +157,25 @@ public void process(final Record record) { context().forward(record.withValue(joiner.apply(record.key(), record.value(), null))); } else { sharedTimeTracker.updatedMinTime(inputRecordTimestamp); -outerJoinStore.ifPresent(store -> store.put( -TimestampedKeyAndJoinSide.make(isLeftSide, record.key(), inputRecordTimestamp), -LeftOrRightValue.make(isLeftSide, record.value(; +putInOuterJoinStore(record); } } } } +protected abstract TimestampedKeyAndJoinSide makeThisKey(final K key, final long inputRecordTimestamp); + +protected abstract LeftOrRightValue makeThisValue(final VThis thisValue); + +protected abstract TimestampedKeyAndJoinSide makeOtherKey(final K key, final long timestamp); + +protected abstract VThis getThisValue(final LeftOrRightValue leftOrRightValue); + +protected abstract VOther getOtherValue(final LeftOrRightValue leftOrRightValue); + private void emitNonJoinedOuterRecords( -final KeyValueStore, LeftOrRightValue> store, -final Record record) { +final KeyValueStore, LeftOrRightValue> store, +final Record record) { Review Comment: nit: ```java private void emitNonJoinedOuterRecords(final KeyValueStore, LeftOrRightValue> store, final Record record) { ``` ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -275,20 +246,22 @@ private void emitNonJoinedOuterRecords( } } -@SuppressWarnings("unchecked") -private VOut getNullJoinedValue( -final K key, -final LeftOrRightValue leftOrRightValue) { -// depending on the JoinSide fill in the joiner key and joiner values -if (isLeftSide) { -return joiner.apply(key, -leftOrRightValue.getLeftValue(), -leftOrRightValue.getRightValue()); -} else { -return joiner.apply(key, -(V1) leftOrRightValue.getRightValue(), -(V2) leftOrRightValue.getLeftValue()); 
-} +private void forwardNonJoinedOuterRecords(final Record record, final KeyValue, ? extends LeftOrRightValue> nextKeyValue) { Review Comment: I think it would be simpler to define this method as: ```java private void forwardNonJoinedOuterRecords(final Record record, final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide, final LeftOrRightValue leftOrRightValue) { ``` It makes the code a bit simpler and shorter. Also here, why not `Record record`? That is exactly the type used at the call site. ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -187,17 +157,25 @@ public void process(final Record record) { context().forward(record.withValue(joiner.apply(record.key(), record.value(), null))); } else { sharedTimeTracker.updatedMinTime(inputRecordTimestamp); -outerJoinStore.ifPresent(store -> store.put( -TimestampedKeyAndJoinSide.make(isLeftSide, record.key(), inputRecordTimestamp), -LeftOrRightValue.make(isLeftSide, record.value(; +putInOuterJoinStore(record); } } } } +protected abstract TimestampedKeyAndJoinSide makeThisKey(final K key, final long inputRecordTimestamp); + +protected abstract LeftOrRightValue makeThisValue(final VThis thisValue); + +protected abstract TimestampedKeyAndJoinSide makeOtherKey(final K key, final long timestamp); + +protected abstract VThis getThisValue(final LeftOrRightValue leftOrRightValue); + +protected abstract VOther getOtherValue(final LeftOrRightValue leftOrRightValue); + private void emitNonJoinedOuterRecords( -final KeyValueStore, LeftOrRightValue> store, -final Record record) { +final KeyValueStore, LeftOrRightValue> store, +final Record record) { Review Comment: Shouldn't that be ```java final Record record ``` ? ##
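The refactoring under review replaces the `isLeftSide` flag, and the unchecked `(V1)`/`(V2)` casts it required, with abstract per-side hooks such as `makeThisValue`, `getThisValue` and `getOtherValue`. The sketch below illustrates that idea with simplified, hypothetical types (`LeftOrRight`, `JoinSide`, `LeftSide`, `RightSide`); it omits keys, timestamps and state stores and is not the actual Kafka Streams code.

```java
import java.util.function.BiFunction;

/** Simplified stand-in for a buffered outer-join value: exactly one side is set. */
final class LeftOrRight<L, R> {
    final L left;
    final R right;

    private LeftOrRight(L left, R right) {
        this.left = left;
        this.right = right;
    }

    static <L, R> LeftOrRight<L, R> ofLeft(L value) { return new LeftOrRight<>(value, null); }

    static <L, R> LeftOrRight<L, R> ofRight(R value) { return new LeftOrRight<>(null, value); }
}

/**
 * Hypothetical base processor, generic over "this" and "other" value types.
 * Side-specific logic lives in subclasses instead of an isLeftSide flag.
 */
abstract class JoinSide<VLeft, VRight, VThis, VOther, VOut> {
    // The joiner always receives (thisValue, otherValue) from this side's point of view.
    private final BiFunction<VThis, VOther, VOut> joiner;

    JoinSide(BiFunction<VThis, VOther, VOut> joiner) {
        this.joiner = joiner;
    }

    protected abstract LeftOrRight<VLeft, VRight> makeThisValue(VThis value);

    protected abstract VThis getThisValue(LeftOrRight<VLeft, VRight> stored);

    protected abstract VOther getOtherValue(LeftOrRight<VLeft, VRight> stored);

    /** Null-padded join result for a buffered value; no casts are needed. */
    VOut nullJoinedValue(LeftOrRight<VLeft, VRight> stored) {
        return joiner.apply(getThisValue(stored), getOtherValue(stored));
    }
}

final class LeftSide<VL, VR, VOut> extends JoinSide<VL, VR, VL, VR, VOut> {
    LeftSide(BiFunction<VL, VR, VOut> joiner) { super(joiner); }

    @Override protected LeftOrRight<VL, VR> makeThisValue(VL value) { return LeftOrRight.ofLeft(value); }

    @Override protected VL getThisValue(LeftOrRight<VL, VR> stored) { return stored.left; }

    @Override protected VR getOtherValue(LeftOrRight<VL, VR> stored) { return stored.right; }
}

final class RightSide<VL, VR, VOut> extends JoinSide<VL, VR, VR, VL, VOut> {
    RightSide(BiFunction<VR, VL, VOut> joiner) { super(joiner); }

    @Override protected LeftOrRight<VL, VR> makeThisValue(VR value) { return LeftOrRight.ofRight(value); }

    @Override protected VR getThisValue(LeftOrRight<VL, VR> stored) { return stored.right; }

    @Override protected VL getOtherValue(LeftOrRight<VL, VR> stored) { return stored.left; }
}
```

Because each subclass fixes `VThis` and `VOther` at compile time, the compiler checks what the flag-based version could only assert through casts.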
[jira] [Created] (KAFKA-16789) Thread leak detection checks for incorrect QuorumController thread name
Gaurav Narula created KAFKA-16789: - Summary: Thread leak detection checks for incorrect QuorumController thread name Key: KAFKA-16789 URL: https://issues.apache.org/jira/browse/KAFKA-16789 Project: Kafka Issue Type: Test Reporter: Gaurav Narula [PR-11417|https://github.com/apache/kafka/pull/11417] introduced thread leak detection for the QuorumController event queue thread. Later, [PR-13390|https://github.com/apache/kafka/pull/13390] changed the conventions for thread names used in KRaft. Unfortunately, the thread leak detection wasn't updated in the latter PR. We should update {{TestUtils::verifyNoUnexpectedThreads}} to instead check for event handler thread leaks by checking for the {{"event-handler"}} suffix.
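A suffix-based check along the lines proposed could look like the following. `ThreadLeakCheck` is a hypothetical helper, not the real `TestUtils::verifyNoUnexpectedThreads`; it relies only on `Thread.getAllStackTraces()` to enumerate the live threads in the JVM.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical leak check keyed on a thread-name suffix such as "event-handler". */
final class ThreadLeakCheck {

    /** Returns the sorted names of live threads whose name ends with the suffix. */
    static List<String> liveThreadsWithSuffix(String suffix) {
        Set<Thread> threads = Thread.getAllStackTraces().keySet();   // live threads only
        return threads.stream()
                .filter(Thread::isAlive)
                .map(Thread::getName)
                .filter(name -> name.endsWith(suffix))
                .sorted()
                .collect(Collectors.toList());
    }

    /** Fails if any matching thread is still running, e.g. after a test class finishes. */
    static void verifyNoLeakedThreads(String suffix) {
        List<String> leaked = liveThreadsWithSuffix(suffix);
        if (!leaked.isEmpty()) {
            throw new AssertionError("Found unexpected threads: " + leaked);
        }
    }
}
```

Matching on a suffix rather than a full name keeps the check valid when a prefix such as a node id is added to the thread name.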
[jira] [Assigned] (KAFKA-16789) Thread leak detection checks for incorrect QuorumController thread name
[ https://issues.apache.org/jira/browse/KAFKA-16789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gaurav Narula reassigned KAFKA-16789: - Assignee: Gaurav Narula
[jira] [Commented] (KAFKA-16448) Add Kafka Streams exception handler for exceptions occuring during processing (KIP-1033)
[ https://issues.apache.org/jira/browse/KAFKA-16448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847294#comment-17847294 ] Muralidhar Basani commented on KAFKA-16448: --- Thanks [~mjsax] [~lmunoz]. If there are any sub-tasks, please let me know. Thanks. > Add Kafka Streams exception handler for exceptions occuring during processing > (KIP-1033) > > > Key: KAFKA-16448 > URL: https://issues.apache.org/jira/browse/KAFKA-16448 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Damien Gasparina >Assignee: Loïc Greffier >Priority: Minor > Fix For: 3.8 > > > Jira to follow work on KIP: [KIP-1033: Add Kafka Streams exception handler > for exceptions occuring during > processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing]
[jira] [Commented] (KAFKA-16784) Migrate TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest to new test infra
[ https://issues.apache.org/jira/browse/KAFKA-16784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847272#comment-17847272 ] PoAn Yang commented on KAFKA-16784: --- Hi [~chia7712], I'm interested in this issue. May I take it? Thank you.
Re: [PR] KAFKA-15319: Upgrade rocksdb to fix CVE-2022-37434 [kafka]
lucasbru commented on PR #14216: URL: https://github.com/apache/kafka/pull/14216#issuecomment-2117519600 @mimaison -- Let me test this. Yes, I see that the version of requests inside ducktape was also reverted: https://github.com/confluentinc/ducktape/commit/29dcdbeb1a8048505ce1613eb9aad05bc6b044f0 I think there were version conflicts around urllib that broke the system tests. I'm not sure why Kafka depends on ducktape < 0.9.0. I can try to bump it.
[PR] KAFKA-16783: Migrate RemoteLogMetadataManagerTest to new test infra [kafka]
FrankYang0529 opened a new pull request, #15983: URL: https://github.com/apache/kafka/pull/15983 * Replace `TopicBasedRemoteLogMetadataManagerWrapperWithHarness` with `RemoteLogMetadataManagerTestUtils#builder` in `RemoteLogMetadataManagerTest`. * Use `ClusterTestExtensions` for `RemoteLogMetadataManagerTest`.
Re: [PR] KAFKA-15319: Upgrade rocksdb to fix CVE-2022-37434 [kafka]
mimaison commented on PR #14216: URL: https://github.com/apache/kafka/pull/14216#issuecomment-2117500715 Thanks @lucasbru! I wonder if we also need to bump ducktape to 0.11.4, as it seems previous versions depend on requests<2.31.0: https://ducktape.readthedocs.io/en/latest/changelog.html#section-1
Re: [PR] KAFKA-15319: Upgrade rocksdb to fix CVE-2022-37434 [kafka]
lucasbru commented on PR #14216: URL: https://github.com/apache/kafka/pull/14216#issuecomment-2117489283 @mimaison I remember I had problems running the system tests with that version, but I don't think I intended to merge the version change. Sorry about that; I'll test the newer version and bring the version bump back to trunk.
[PR] MINOR: use try-with resources in ClientMetricsManagerTest [kafka]
gaurav-narula opened a new pull request, #15982: URL: https://github.com/apache/kafka/pull/15982 Ensures we close resources even if an exception is thrown within a test. Also added an `@AfterAll` to ensure threads aren't leaked from the test class.
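The try-with-resources pattern this PR applies guarantees that `close()` runs even when the test body throws. In the sketch below, `BackgroundWorker` is a hypothetical stand-in for a resource that owns a thread; the JUnit `@AfterAll` hook is not shown, to keep the example dependency-free.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical resource that owns a background thread until it is closed. */
final class BackgroundWorker implements AutoCloseable {
    private final Thread thread;
    final AtomicBoolean closed = new AtomicBoolean(false);

    BackgroundWorker(String name) {
        thread = new Thread(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE);   // stands in for an event loop
            } catch (InterruptedException e) {
                // interrupted by close(): let the thread exit
            }
        }, name);
        thread.start();
    }

    boolean isRunning() {
        return thread.isAlive();
    }

    @Override
    public void close() throws InterruptedException {
        thread.interrupt();
        thread.join();                          // wait so no thread outlives the test
        closed.set(true);
    }
}

final class TryWithResourcesDemo {
    /** Simulates a failing test body; close() still runs before the exception propagates. */
    static void runTestBody(BackgroundWorker[] holder) throws Exception {
        try (BackgroundWorker worker = new BackgroundWorker("test-worker")) {
            holder[0] = worker;
            throw new IllegalStateException("simulated test failure");
        }
    }
}
```

Without the `try (...)` block, the thrown exception would skip any `close()` call placed at the end of the test body and leave the thread running.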
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on PR #15305: URL: https://github.com/apache/kafka/pull/15305#issuecomment-2117312634 Thanks @gharris1727 for another round of review. I have made the changes. Please review when you have some time.
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1604768491 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java: ## @@ -267,6 +268,20 @@ public String memberId() { return JoinGroupRequest.UNKNOWN_MEMBER_ID; } +@Override +protected void handlePollTimeoutExpiry() { +Stage currentStage = listener.onPollTimeoutExpiry(); +log.warn("worker poll timeout has expired. This means the time between subsequent calls to poll() " + +"in DistributedHerder tick() method was longer than the configured rebalance.timeout.ms. " + Review Comment: Got it, I changed the message accordingly. I tried to keep the log line similar to the one for the consumer's poll timeout, but I think it's fine to deviate in this case.
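The condition behind the new warning can be sketched as follows: remember when poll() last ran and, if more than `rebalance.timeout.ms` has elapsed, report which stage the worker was in. `PollTimeoutTracker`, its injected clock and the message text are hypothetical; the real detection logic lives in the coordinator classes under review and is not reproduced here.

```java
import java.util.Optional;
import java.util.function.LongSupplier;

/** Hypothetical tracker for the interval between consecutive poll() calls. */
final class PollTimeoutTracker {
    private final long rebalanceTimeoutMs;
    private final LongSupplier clockMs;   // injected so the check is deterministic in tests
    private long lastPollMs;

    PollTimeoutTracker(long rebalanceTimeoutMs, LongSupplier clockMs) {
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
        this.clockMs = clockMs;
        this.lastPollMs = clockMs.getAsLong();
    }

    /** Call at the start of every poll(). */
    void recordPoll() {
        lastPollMs = clockMs.getAsLong();
    }

    /**
     * Returns a warning if the time since the last poll() exceeds the rebalance timeout.
     * currentStage may be null when the worker cannot say what it was doing.
     */
    Optional<String> checkExpiry(String currentStage) {
        long elapsedMs = clockMs.getAsLong() - lastPollMs;
        if (elapsedMs <= rebalanceTimeoutMs) {
            return Optional.empty();
        }
        String stage = currentStage == null
                ? "no information about the current stage is available"
                : "the worker was " + currentStage;
        return Optional.of("Worker poll timeout has expired: " + elapsedMs
                + " ms passed between calls to poll(), longer than rebalance.timeout.ms ("
                + rebalanceTimeoutMs + " ms); " + stage);
    }
}
```

Allowing a null stage mirrors the review above, where the listener may not know which stage was executing when the timeout fired.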
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1604767686 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -850,6 +855,46 @@ public void testRequestTimeouts() throws Exception { ); } +@Test +public void testPollTimeoutExpiry() throws Exception { +// This is a fabricated test to ensure that a poll timeout expiry happens. The tick thread awaits on +// task#stop method which is blocked. The timeouts have been set accordingly +workerProps.put(REBALANCE_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(20))); +workerProps.put(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(40))); +connect = connectBuilder +.numBrokers(1) +.numWorkers(1) +.build(); + +connect.start(); + +connect.assertions().assertExactlyNumWorkersAreUp(1, "Worker not brought up in time"); + +Map connectorWithBlockingTaskStopConfig = new HashMap<>(); +connectorWithBlockingTaskStopConfig.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSourceConnector.class.getName()); +connectorWithBlockingTaskStopConfig.put(TASKS_MAX_CONFIG, "1"); + connectorWithBlockingTaskStopConfig.put(BlockingConnectorTest.Block.BLOCK_CONFIG, Objects.requireNonNull(TASK_STOP)); + +connect.configureConnector(CONNECTOR_NAME, connectorWithBlockingTaskStopConfig); + +connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( +CONNECTOR_NAME, 1, "connector and tasks did not start in time" +); + +try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) { +connect.restartTask(CONNECTOR_NAME, 0); +TestUtils.waitForCondition(() -> logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN")) && Review Comment: Yeah I wanted to add the test in `BlockingConnectorTest` itself but it would have meant a lot of changes in that class. 
That is because that test currently doesn't support setting worker-level properties or changing the number of workers, and changing worker-level properties was how I could get the poll timeout to expire. Moreover, the test I have added doesn't actually block for the entire stop method; because of the reset at the end of the test, it ends shortly after the graceful task shutdown timeout elapses. Let me know if that makes sense.
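The integration test waits until a WARN event appears among the captured log events. A minimal, hypothetical stand-in for that kind of polling helper is sketched below; it is not Kafka's `TestUtils.waitForCondition`, whose signature and defaults may differ.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper: re-check a condition until it holds or a deadline passes. */
final class WaitUtil {
    static void waitForCondition(BooleanSupplier condition, long maxWaitMs,
                                 long pollIntervalMs, String message) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            if (condition.getAsBoolean()) {
                return;                                   // condition met
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError("Condition not met within " + maxWaitMs + " ms: " + message);
            }
            Thread.sleep(pollIntervalMs);                 // avoid a busy loop between checks
        }
    }
}
```

Log events are appended by another thread, so a thread-safe collection such as `CopyOnWriteArrayList` is a reasonable choice for the captured events being polled.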
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1604759527 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java: ## @@ -36,4 +37,15 @@ public interface WorkerRebalanceListener { * or tasks might refer to all or some of the connectors and tasks running on the worker. */ void onRevoked(String leader, Collection connectors, Collection tasks); + + +/** + * Invoked when a worker experiences a poll timeout expiry. Invoking this method allows getting + * the stage which was currently being executed when the poll timeout happened. The default implementation + * returns null + * @return The current stage being executed. Could be null + */ +default Stage onPollTimeoutExpiry() { Review Comment: I added the default implementation to avoid changing the tests. It turns out it's OK to modify them, so I removed the default implementation.
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1604757990

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ##
@@ -2704,6 +2704,11 @@ public void onRevoked(String leader, Collection connectors, Collection
[jira] [Created] (KAFKA-16788) Resource leakage due to absence of close() call on connector start failure
Ashok created KAFKA-16788:
-----------------------------

Summary: Resource leakage due to absence of close() call on connector start failure
Key: KAFKA-16788
URL: https://issues.apache.org/jira/browse/KAFKA-16788
Project: Kafka
Issue Type: Bug
Components: connect
Reporter: Ashok
Assignee: Ashok

We have identified a potential issue in the WorkerConnector class of the Connect framework. Specifically, the close() method is not called on the connector when the connector fails to start for various reasons. This omission prevents the connector from releasing any resources that it created or started in its start() method. As a result, these resources remain allocated even after the connector has failed to start, leading to resource leakage.

To address this issue, we propose modifying the WorkerConnector class so that the close() method is called on the connector whenever the connector fails to start. This change would allow the connector to release its resources properly and prevent the leakage.

*Steps to Reproduce:*
# Start a connector that creates or allocates resources (for instance, threads) in its start() method.
# Cause a failure during start() that either throws an exception or invokes the raiseError(Exception e) method of the WorkerConnectorContext.
# Observe that the close() method is not invoked on the connector, resulting in resource leakage, since the stop() method is where such resources are typically released.

In our scenario, the leaked resources were threads that the connector started in its start() method and that were never shut down.

*Expected Result:*
When a connector fails to start, the close() method should be called so that the connector can release its resources.

*Actual Result:*
The close() method is not called when a connector fails to start, leading to resource leakage.
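The proposed behaviour can be sketched in Java under stated assumptions. `SketchConnector`, `SketchContext`, `SketchWorkerConnector`, and `ThreadLeakingConnector` are hypothetical types, not the actual Kafka Connect `WorkerConnector` or `Connector` API, and the cleanup hook is named `stop()` here because the report notes that is where resources are typically released. Both failure paths from the steps to reproduce (an exception thrown from `start()` and a call to `raiseError(Exception)`) route to the same cleanup, which runs at most once.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical minimal connector contract (not the real Kafka Connect API).
interface SketchConnector {
    void start(SketchContext context);
    void stop();
}

// Hypothetical stand-in for WorkerConnectorContext: lets a connector
// report a fatal error instead of throwing from start().
interface SketchContext {
    void raiseError(Exception e);
}

// Hypothetical wrapper: whichever way startup fails, stop() is invoked
// (at most once) so resources acquired in start() can be released.
final class SketchWorkerConnector implements SketchContext {
    private final SketchConnector connector;
    private volatile boolean failed = false;
    private boolean stopped = false;

    SketchWorkerConnector(SketchConnector connector) {
        this.connector = connector;
    }

    // Returns true only if start() neither threw nor reported an error.
    boolean start() {
        try {
            connector.start(this);
        } catch (RuntimeException e) {
            onFailure();
        }
        return !failed;
    }

    @Override
    public void raiseError(Exception e) {
        onFailure();
    }

    private synchronized void onFailure() {
        failed = true;
        if (stopped) {
            return; // both failure paths may fire; clean up only once
        }
        stopped = true;
        try {
            connector.stop();
        } catch (RuntimeException cleanupError) {
            // A real implementation would likely log this; here it is swallowed
            // so a cleanup error never masks the startup failure.
        }
    }
}

// Example connector that allocates a thread pool in start() and then fails.
final class ThreadLeakingConnector implements SketchConnector {
    ExecutorService pool;

    @Override
    public void start(SketchContext context) {
        pool = Executors.newSingleThreadExecutor();
        throw new IllegalStateException("simulated start failure");
    }

    @Override
    public void stop() {
        if (pool != null) {
            pool.shutdownNow(); // releases the resources created in start()
        }
    }
}

class StartFailureDemo {
    public static void main(String[] args) {
        ThreadLeakingConnector connector = new ThreadLeakingConnector();
        boolean started = new SketchWorkerConnector(connector).start();
        System.out.println("started=" + started + ", pool shut down=" + connector.pool.isShutdown());
    }
}
```

Running `StartFailureDemo` prints `started=false, pool shut down=true`. The `stopped` guard matters because a connector may call `raiseError` and then also throw, and calling its cleanup twice could itself fail.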