Re: [PR] KAFKA-16992: InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka [kafka]

2024-05-17 Thread via GitHub


jolshan merged PR #15971:
URL: https://github.com/apache/kafka/pull/15971


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16791) Add thread detection to ClusterTestExtensions

2024-05-17 Thread bboyleonp (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847482#comment-17847482
 ] 

bboyleonp commented on KAFKA-16791:
---

Hi [~chia7712], I am interested in this one. May I take it? Thanks.

> Add thread detection to ClusterTestExtensions
> -
>
> Key: KAFKA-16791
> URL: https://issues.apache.org/jira/browse/KAFKA-16791
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> `ClusterTestExtensions` should implement `BeforeAllCallback` and 
> `AfterAllCallback` by `TestUtils.verifyNoUnexpectedThreads`
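
A minimal sketch of the hook being described, using JUnit 5's BeforeAllCallback/AfterAllCallback; the extension class name here is illustrative, and the exact TestUtils.verifyNoUnexpectedThreads signature (a single context string) is assumed from the issue text:

```java
import kafka.utils.TestUtils;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

// Illustrative stand-in for the hook ClusterTestExtensions would gain.
public class ThreadLeakCheckExtension implements BeforeAllCallback, AfterAllCallback {

    @Override
    public void beforeAll(final ExtensionContext context) {
        // Fail fast if a previously run test class leaked threads into this one.
        TestUtils.verifyNoUnexpectedThreads("@BeforeAll " + context.getDisplayName());
    }

    @Override
    public void afterAll(final ExtensionContext context) {
        // Verify this test class cleaned up every thread it started.
        TestUtils.verifyNoUnexpectedThreads("@AfterAll " + context.getDisplayName());
    }
}
```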



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-17 Thread via GitHub


ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605660608


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, 
final GroupSubscription gr
  *
  * @param clientMetadataMap the map of process id to client metadata used 
to build an immutable
  *  {@code ApplicationState}
- * @param statefulTasks the set of {@code TaskId} that correspond to 
all the stateful
- *  tasks that need to be reassigned.
  * @return The {@code ApplicationState} needed by the TaskAssigner to 
compute new task
  * assignments.
  */
-private ApplicationState buildApplicationState(final Map clientMetadataMap,
-   final Set 
statefulTasks) {
-final Set statelessTasks = new HashSet<>();
-for (final Map.Entry clientEntry : 
clientMetadataMap.entrySet()) {
-final ClientState clientState = clientEntry.getValue().state;
-statelessTasks.addAll(clientState.statelessActiveTasks());
+private ApplicationState buildApplicationState(final TopologyMetadata 
topologyMetadata,
+   final Map clientMetadataMap,
+   final Map topicGroups,
+   final Cluster cluster) {
+final Map> sourceTopicsByGroup = new 
HashMap<>();
+final Map> changelogTopicsByGroup = new 
HashMap<>();
+for (final Map.Entry entry : 
topicGroups.entrySet()) {
+final Set sourceTopics = entry.getValue().sourceTopics;
+final Set changelogTopics = 
entry.getValue().stateChangelogTopics()
+.stream().map(t -> t.name).collect(Collectors.toSet());
+sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
 }
 
+final Map> sourcePartitionsForTask =
+partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+final Map> changelogPartitionsForTask =
+partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+final Set logicalTaskIds = new HashSet<>();
+final Set sourceTopicPartitions = new HashSet<>();
+sourcePartitionsForTask.forEach((taskId, partitions) -> {
+logicalTaskIds.add(taskId);
+sourceTopicPartitions.addAll(partitions);
+});
+final Set changelogTopicPartitions = new HashSet<>();
+changelogPartitionsForTask.forEach((taskId, partitions) -> {
+logicalTaskIds.add(taskId);

Review Comment:
   Sorry for the wall of text. It might not seem like a huge deal, but if it's an app with only source-changelog partitions, then doing this will save the assignor from having to make any DescribeTopics request, since there are no non-source changelogs.
   
   And yes, apps with only source changelogs do exist; they're pretty common for certain kinds of table-based processing (and especially apps that make heavy use of IQ). Saving a remote fetch is actually a pretty big deal: doing one in the middle of an assignment makes the rebalance vulnerable to timing out, especially when brokers are under heavy load or the app is experiencing rebalancing issues to begin with.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-17 Thread via GitHub


ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605659907


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, 
final GroupSubscription gr
  *
  * @param clientMetadataMap the map of process id to client metadata used 
to build an immutable
  *  {@code ApplicationState}
- * @param statefulTasks the set of {@code TaskId} that correspond to 
all the stateful
- *  tasks that need to be reassigned.
  * @return The {@code ApplicationState} needed by the TaskAssigner to 
compute new task
  * assignments.
  */
-private ApplicationState buildApplicationState(final Map clientMetadataMap,
-   final Set 
statefulTasks) {
-final Set statelessTasks = new HashSet<>();
-for (final Map.Entry clientEntry : 
clientMetadataMap.entrySet()) {
-final ClientState clientState = clientEntry.getValue().state;
-statelessTasks.addAll(clientState.statelessActiveTasks());
+private ApplicationState buildApplicationState(final TopologyMetadata 
topologyMetadata,
+   final Map clientMetadataMap,
+   final Map topicGroups,
+   final Cluster cluster) {
+final Map> sourceTopicsByGroup = new 
HashMap<>();
+final Map> changelogTopicsByGroup = new 
HashMap<>();
+for (final Map.Entry entry : 
topicGroups.entrySet()) {
+final Set sourceTopics = entry.getValue().sourceTopics;
+final Set changelogTopics = 
entry.getValue().stateChangelogTopics()
+.stream().map(t -> t.name).collect(Collectors.toSet());
+sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
 }
 
+final Map> sourcePartitionsForTask =
+partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+final Map> changelogPartitionsForTask =
+partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+final Set logicalTaskIds = new HashSet<>();
+final Set sourceTopicPartitions = new HashSet<>();
+sourcePartitionsForTask.forEach((taskId, partitions) -> {
+logicalTaskIds.add(taskId);
+sourceTopicPartitions.addAll(partitions);
+});
+final Set changelogTopicPartitions = new HashSet<>();
+changelogPartitionsForTask.forEach((taskId, partitions) -> {
+logicalTaskIds.add(taskId);

Review Comment:
   To be more precise, I'm imagining something like this:
   
   ```
   final Set<TaskId> logicalTaskIds = new HashSet<>();
   final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
   final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
   final Set<TopicPartition> nonSourceChangelogTopicPartitions = new HashSet<>();
   
   for (final var entry : sourcePartitionsForTask.entrySet()) {
       final TaskId taskId = entry.getKey();
       final Set<TopicPartition> taskSourcePartitions = entry.getValue();
       final Set<TopicPartition> taskChangelogPartitions = changelogPartitionsForTask.get(taskId);
       final Set<TopicPartition> taskNonSourceChangelogPartitions = new HashSet<>(taskChangelogPartitions);
       taskNonSourceChangelogPartitions.removeAll(taskSourcePartitions);
   
       logicalTaskIds.add(taskId);
       sourceTopicPartitions.addAll(taskSourcePartitions);
       changelogTopicPartitions.addAll(taskChangelogPartitions);
       nonSourceChangelogTopicPartitions.addAll(taskNonSourceChangelogPartitions);
   }
   ```
   
   Then we pass the `nonSourceChangelogTopicPartitions` into `#getRacksForTopicPartition` instead of the `changelogTopicPartitions` set.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-17 Thread via GitHub


ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605659120


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, 
final GroupSubscription gr
  *
  * @param clientMetadataMap the map of process id to client metadata used 
to build an immutable
  *  {@code ApplicationState}
- * @param statefulTasks the set of {@code TaskId} that correspond to 
all the stateful
- *  tasks that need to be reassigned.
  * @return The {@code ApplicationState} needed by the TaskAssigner to 
compute new task
  * assignments.
  */
-private ApplicationState buildApplicationState(final Map clientMetadataMap,
-   final Set 
statefulTasks) {
-final Set statelessTasks = new HashSet<>();
-for (final Map.Entry clientEntry : 
clientMetadataMap.entrySet()) {
-final ClientState clientState = clientEntry.getValue().state;
-statelessTasks.addAll(clientState.statelessActiveTasks());
+private ApplicationState buildApplicationState(final TopologyMetadata 
topologyMetadata,
+   final Map clientMetadataMap,
+   final Map topicGroups,
+   final Cluster cluster) {
+final Map> sourceTopicsByGroup = new 
HashMap<>();
+final Map> changelogTopicsByGroup = new 
HashMap<>();
+for (final Map.Entry entry : 
topicGroups.entrySet()) {
+final Set sourceTopics = entry.getValue().sourceTopics;
+final Set changelogTopics = 
entry.getValue().stateChangelogTopics()
+.stream().map(t -> t.name).collect(Collectors.toSet());
+sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
 }
 
+final Map> sourcePartitionsForTask =
+partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+final Map> changelogPartitionsForTask =
+partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+final Set logicalTaskIds = new HashSet<>();
+final Set sourceTopicPartitions = new HashSet<>();
+sourcePartitionsForTask.forEach((taskId, partitions) -> {
+logicalTaskIds.add(taskId);
+sourceTopicPartitions.addAll(partitions);
+});
+final Set changelogTopicPartitions = new HashSet<>();
+changelogPartitionsForTask.forEach((taskId, partitions) -> {
+logicalTaskIds.add(taskId);

Review Comment:
   Note that we'll also want to deduplicate the source-changelog partitions for 
the rack id computation. We should include them in the source topics/remove 
them from the changelog topics passed into the `#getRacksForTopicPartitions` 
call. Of course we still need the changelogTopicPartitions as well, so we'll 
want a third set of `nonSourceChangelogTopicPartitions` that's specifically for 
the rack id computation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-17 Thread via GitHub


ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605656530


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, 
final GroupSubscription gr
  *
  * @param clientMetadataMap the map of process id to client metadata used 
to build an immutable
  *  {@code ApplicationState}
- * @param statefulTasks the set of {@code TaskId} that correspond to 
all the stateful
- *  tasks that need to be reassigned.
  * @return The {@code ApplicationState} needed by the TaskAssigner to 
compute new task
  * assignments.
  */
-private ApplicationState buildApplicationState(final Map clientMetadataMap,
-   final Set 
statefulTasks) {
-final Set statelessTasks = new HashSet<>();
-for (final Map.Entry clientEntry : 
clientMetadataMap.entrySet()) {
-final ClientState clientState = clientEntry.getValue().state;
-statelessTasks.addAll(clientState.statelessActiveTasks());
+private ApplicationState buildApplicationState(final TopologyMetadata 
topologyMetadata,
+   final Map clientMetadataMap,
+   final Map topicGroups,
+   final Cluster cluster) {
+final Map> sourceTopicsByGroup = new 
HashMap<>();
+final Map> changelogTopicsByGroup = new 
HashMap<>();
+for (final Map.Entry entry : 
topicGroups.entrySet()) {
+final Set sourceTopics = entry.getValue().sourceTopics;
+final Set changelogTopics = 
entry.getValue().stateChangelogTopics()
+.stream().map(t -> t.name).collect(Collectors.toSet());
+sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
 }
 
+final Map> sourcePartitionsForTask =
+partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+final Map> changelogPartitionsForTask =
+partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+final Set logicalTaskIds = new HashSet<>();
+final Set sourceTopicPartitions = new HashSet<>();
+sourcePartitionsForTask.forEach((taskId, partitions) -> {
+logicalTaskIds.add(taskId);
+sourceTopicPartitions.addAll(partitions);
+});
+final Set changelogTopicPartitions = new HashSet<>();
+changelogPartitionsForTask.forEach((taskId, partitions) -> {
+logicalTaskIds.add(taskId);
+changelogTopicPartitions.addAll(partitions);
+});
+
+final Map> racksForSourcePartitions = 
RackUtils.getRacksForTopicPartition(
+cluster, internalTopicManager, sourceTopicPartitions, false);
+final Map> racksForChangelogPartitions = 
RackUtils.getRacksForTopicPartition(

Review Comment:
   Since the rack info is nontrivial to compute and always makes a remote call 
(which can take a long time and even time out or otherwise fail) and not every 
assignor/app will actually use it I'm thinking maybe we should try to 
initialize it lazily, only if/when the user actually requests the rack info
   
   I'm totally happy to push that into a followup PR to keep the scope 
well-defined for now, so don't worry about it for now. We'd still need 
everything you implemented here and would just be moving it around and/or 
subbing in function pointers instead of passing around data strucutres 
directly, so it shouldn't have any impact on how this PR goes. Just wanted to 
make a note so I don't forget
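   
   For the record, a minimal sketch of the lazy/memoized idea, assuming an illustrative LazyRackInfo holder and a fetchRacks supplier (neither is part of the actual KIP-924 API):
   
   ```java
   // Hedged sketch: defer the (potentially slow) remote rack lookup until an assignor
   // actually asks for it, and memoize the result so the lookup happens at most once.
   import java.util.Map;
   import java.util.Set;
   import java.util.function.Supplier;
   
   final class LazyRackInfo {
       private final Supplier<Map<String, Set<String>>> fetchRacks; // partition -> rack ids
       private volatile Map<String, Set<String>> cached;
   
       LazyRackInfo(final Supplier<Map<String, Set<String>>> fetchRacks) {
           this.fetchRacks = fetchRacks;
       }
   
       Map<String, Set<String>> racksForPartition() {
           Map<String, Set<String>> result = cached;
           if (result == null) {
               synchronized (this) {
                   if (cached == null) {
                       // Only here do we pay for the remote DescribeTopics round trip.
                       cached = fetchRacks.get();
                   }
                   result = cached;
               }
           }
           return result;
       }
   }
   ```
   
   The assignor would be handed the supplier (the "function pointer" above) rather than a pre-built map, so the remote fetch only happens if rack-aware assignment is actually requested.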



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-17 Thread via GitHub


ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605639072


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, 
final GroupSubscription gr
  *
  * @param clientMetadataMap the map of process id to client metadata used 
to build an immutable
  *  {@code ApplicationState}
- * @param statefulTasks the set of {@code TaskId} that correspond to 
all the stateful
- *  tasks that need to be reassigned.
  * @return The {@code ApplicationState} needed by the TaskAssigner to 
compute new task
  * assignments.
  */
-private ApplicationState buildApplicationState(final Map clientMetadataMap,
-   final Set 
statefulTasks) {
-final Set statelessTasks = new HashSet<>();
-for (final Map.Entry clientEntry : 
clientMetadataMap.entrySet()) {
-final ClientState clientState = clientEntry.getValue().state;
-statelessTasks.addAll(clientState.statelessActiveTasks());
+private ApplicationState buildApplicationState(final TopologyMetadata 
topologyMetadata,
+   final Map clientMetadataMap,
+   final Map topicGroups,
+   final Cluster cluster) {
+final Map> sourceTopicsByGroup = new 
HashMap<>();
+final Map> changelogTopicsByGroup = new 
HashMap<>();
+for (final Map.Entry entry : 
topicGroups.entrySet()) {
+final Set sourceTopics = entry.getValue().sourceTopics;
+final Set changelogTopics = 
entry.getValue().stateChangelogTopics()
+.stream().map(t -> t.name).collect(Collectors.toSet());
+sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
 }
 
+final Map> sourcePartitionsForTask =
+partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+final Map> changelogPartitionsForTask =
+partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+final Set logicalTaskIds = new HashSet<>();
+final Set sourceTopicPartitions = new HashSet<>();
+sourcePartitionsForTask.forEach((taskId, partitions) -> {
+logicalTaskIds.add(taskId);
+sourceTopicPartitions.addAll(partitions);
+});
+final Set changelogTopicPartitions = new HashSet<>();
+changelogPartitionsForTask.forEach((taskId, partitions) -> {
+logicalTaskIds.add(taskId);

Review Comment:
   I suppose this doesn't hurt anything since `logicalTaskIds` is a Set, but the taskIds returned by the partition grouper should be the same for the source and changelog topics. So you can remove this line.
   
   (Alternatively, you can create the `logicalTaskIds` set up front by copying the keyset of one of the partitionsForTask maps, but that's just an implementation detail, up to you.) However, I would probably consider adding a check to make sure these two maps return the same set of tasks. It doesn't need to scan the entire thing, maybe just a simple
   
   ```
   if (sourcePartitionsForTask.size() != changelogPartitionsForTask.size()) {
       log.error("Partition grouper returned {} tasks for source topics but {} tasks for changelog topics",
                 sourcePartitionsForTask.size(), changelogPartitionsForTask.size());
       throw new TaskAssignmentException(/* error msg */);
   }
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskInfo.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * A simple container class corresponding to a given {@link 

Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-17 Thread via GitHub


kirktrue commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2118579935

   @lianetm @cadonna—I believe I have addressed all the actionable feedback. 
Are there additional concerns about this PR that prevent it from being merged? 
Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-17 Thread via GitHub


kirktrue commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2118578191

   > Hey @cadonna, the tricky bit is that, for some events, the request 
managers do expire requests too, so in this flow you described:
   > 
   > > The event is processed in the ApplicationEventHandler and a request is added to the commit request manager. **Then the commit request manager is polled**, the requests are added to the network client and the network client is polled
   > 
   > When the manager is polled, if the event had timeout 0, it will be 
expired/cancelled before making it to the network thread. Currently we have 2 
managers that do this (that I can remember): 
[TopicMetadataManager](https://github.com/apache/kafka/blob/f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java#L86)
 and 
[CommitRequestManager](https://github.com/apache/kafka/blob/f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L1168).
 So for those events, even with this PR, if they have timeout 0, they won't 
have a chance to complete.
   > 
   > My point is not to bring more changes into this PR, only to have the whole 
situation in mind so we can address it properly (with multiple PRs). This other 
[PR](https://github.com/apache/kafka/pull/15844) attempts to address this 
situation I described, but only in the `CommitRequestManager` for instance. We 
still have to align on the approach there, and also handle it in the 
`TopicMetadataManager` I would say. I would expect that a combination of this 
PR and those others would allow us to get to a better point (now, even with 
this PR, we cannot make basic progress with a consumer being continuously 
polled with timeout 0 because `FetchCommittedOffsets` is always expired by the 
manager, for instance). I can easily repro it with the following integration 
test + poll(ZERO) (that I was surprised we have not covered, because TestUtils 
always polls with a non-zero timeout)
   > 
   > ```
   >   // Ensure TestUtils polls with ZERO. This fails for the new consumer 
only.
   >   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   >   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   >   def testPollEventuallyReturnsRecordsWithZeroTimeout(quorum: String, 
groupProtocol: String): Unit = {
   > val numMessages = 100
   > val producer = createProducer()
   > sendRecords(producer, numMessages, tp)
   > 
   > val consumer = createConsumer()
   > consumer.subscribe(Set(topic).asJava)
   > val records = awaitNonEmptyRecords(consumer, tp)
   > assertEquals(numMessages, records.count())
   >   }
   > ```
   > 
   > Makes sense?
   
   Yes, the network layer changes are captured in KAFKA-16200 and build on top 
of this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]

2024-05-17 Thread via GitHub


gaurav-narula commented on code in PR #15987:
URL: https://github.com/apache/kafka/pull/15987#discussion_r1605528394


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
##
@@ -61,8 +61,11 @@ public DelayEventAccumulator(Time time, long takeDelayMs) {
 
 @Override
 public CoordinatorEvent take() {
-time.sleep(takeDelayMs);
-return super.take();
+CoordinatorEvent event = super.take();

Review Comment:
   Nvm, I see your point - `accumulator.take()` may be invoked - the block on 
the delegate may happen *after* the timer has been incremented.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]

2024-05-17 Thread via GitHub


jeffkbkim commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1605535757


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -59,7 +59,7 @@ public class OptimizedUniformAssignmentBuilder extends 
AbstractUniformAssignment
 /**
  * The assignment specification which includes member metadata.
  */
-private final AssignmentSpec assignmentSpec;
+private final GroupSpecImpl groupSpec;

Review Comment:
   There are several places where we use GroupSpecImpl as the object type instead of GroupSpec. Can we change all of them?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]

2024-05-17 Thread via GitHub


gaurav-narula commented on code in PR #15987:
URL: https://github.com/apache/kafka/pull/15987#discussion_r1605530429


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
##
@@ -61,8 +61,11 @@ public DelayEventAccumulator(Time time, long takeDelayMs) {
 
 @Override
 public CoordinatorEvent take() {
-time.sleep(takeDelayMs);
-return super.take();
+CoordinatorEvent event = super.take();

Review Comment:
   IIUC, it's the ordering that matters so we can avoid the null check and 
increment time only after `super.take()` returns.
   
   ```java
   CoordinatorEvent event = super.take(); // blocks until there are new events
   time.sleep(takeDelayMs);               // only increment timer after we get unblocked
   return event;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]

2024-05-17 Thread via GitHub


gaurav-narula commented on code in PR #15987:
URL: https://github.com/apache/kafka/pull/15987#discussion_r1605530429


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
##
@@ -61,8 +61,11 @@ public DelayEventAccumulator(Time time, long takeDelayMs) {
 
 @Override
 public CoordinatorEvent take() {
-time.sleep(takeDelayMs);
-return super.take();
+CoordinatorEvent event = super.take();

Review Comment:
   IIUC, it's the ordering that matters so we can avoid the null check and 
increment time only after `super.take()` returns.
   
   ```java
   CoordinatorEvent event = super.take();
   time.sleep(takeDelayMs);
   return event;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]

2024-05-17 Thread via GitHub


gaurav-narula commented on code in PR #15987:
URL: https://github.com/apache/kafka/pull/15987#discussion_r1605528394


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
##
@@ -61,8 +61,11 @@ public DelayEventAccumulator(Time time, long takeDelayMs) {
 
 @Override
 public CoordinatorEvent take() {
-time.sleep(takeDelayMs);
-return super.take();
+CoordinatorEvent event = super.take();

Review Comment:
   Nvm, I see your point - `accumulator.take()` may be invoked - the block on the delegate may happen after the timer is incremented.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]

2024-05-17 Thread via GitHub


gaurav-narula commented on code in PR #15987:
URL: https://github.com/apache/kafka/pull/15987#discussion_r1605525333


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
##
@@ -61,8 +61,11 @@ public DelayEventAccumulator(Time time, long takeDelayMs) {
 
 @Override
 public CoordinatorEvent take() {
-time.sleep(takeDelayMs);
-return super.take();
+CoordinatorEvent event = super.take();

Review Comment:
   > we may be at the point when we process the 8 events (diff == 800ms) or 
when the thread does another iteration over handleEvents(), which does another 
time.sleep() (diff == 900ms). This was causing the flakiness.
   
   Sorry, I'm still confused. Why would another iteration over `handleEvents()` 
invoke the `doAnswer`? In other words, shouldn't `accumulator.take()` block 
since there are no more events? I'd imagine `EventAccumulator::take` to block 
on `condition.await()` after 8 events have been processed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]

2024-05-17 Thread via GitHub


jeffkbkim commented on code in PR #15987:
URL: https://github.com/apache/kafka/pull/15987#discussion_r1605519141


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
##
@@ -61,8 +61,11 @@ public DelayEventAccumulator(Time time, long takeDelayMs) {
 
 @Override
 public CoordinatorEvent take() {
-time.sleep(takeDelayMs);
-return super.take();
+CoordinatorEvent event = super.take();

Review Comment:
   That's correct, and was my initial confusion as well. The event processor only shuts down at the end of the test, so the processor thread is still running when we perform the assertions.
   
   When we call
   ```
   long diff = time.milliseconds() - startMs;
   ```
   we may be at the point where we have processed the 8 events (diff == 800ms), or the thread may have done another iteration over handleEvents(), which does another time.sleep() (diff == 900ms). This was causing the flakiness.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-17 Thread via GitHub


dongnuo123 opened a new pull request, #15988:
URL: https://github.com/apache/kafka/pull/15988

   This patch implements the heartbeat api to the members that use the classic 
protocol in a ConsumerGroup.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16793) Heartbeat API for upgrading ConsumerGroup

2024-05-17 Thread Dongnuo Lyu (Jira)
Dongnuo Lyu created KAFKA-16793:
---

 Summary: Heartbeat API for upgrading ConsumerGroup
 Key: KAFKA-16793
 URL: https://issues.apache.org/jira/browse/KAFKA-16793
 Project: Kafka
  Issue Type: Sub-task
Reporter: Dongnuo Lyu
Assignee: Dongnuo Lyu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Remove deprecated constructors from Connect's Kafka*BackingStore classes [kafka]

2024-05-17 Thread via GitHub


chia7712 merged PR #15865:
URL: https://github.com/apache/kafka/pull/15865


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]

2024-05-17 Thread via GitHub


gaurav-narula commented on code in PR #15987:
URL: https://github.com/apache/kafka/pull/15987#discussion_r1605472276


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
##
@@ -475,9 +478,7 @@ public void testRecordThreadIdleRatio() throws Exception {
 doAnswer(invocation -> {
 long threadIdleTime = idleTimeCaptured.getValue();
 assertEquals(100, threadIdleTime);
-synchronized (recordedIdleTimesMs) {
-recordedIdleTimesMs.add(threadIdleTime);
-}
+recordedIdleTimesMs.add(threadIdleTime);

Review Comment:
   Nit: might be useful to add a comment that this is safe because `numThreads` 
in line 470 is `1`.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
##
@@ -61,8 +61,11 @@ public DelayEventAccumulator(Time time, long takeDelayMs) {
 
 @Override
 public CoordinatorEvent take() {
-time.sleep(takeDelayMs);
-return super.take();
+CoordinatorEvent event = super.take();

Review Comment:
   IIUC, the fix suggests the flakiness is because `super.take()` returns `null` spuriously. I don't quite follow why that would happen though. Javadoc for `EventAccumulator::take` mentions it returns `null` only when the accumulator is closed. We also assert that the value captured within doAnswer is `100`, that `recordedIdleTimesMs.size() == 8`, and that `mockRuntimeMetrics.recordThreadIdleTime()` is invoked 8 times, so where is the extra invocation coming from?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16223: Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest [kafka]

2024-05-17 Thread via GitHub


chia7712 merged PR #15933:
URL: https://github.com/apache/kafka/pull/15933


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15723) KRaft support in ListOffsetsRequestTest

2024-05-17 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-15723.

Fix Version/s: 3.8.0
   Resolution: Fixed

> KRaft support in ListOffsetsRequestTest
> ---
>
> Key: KAFKA-15723
> URL: https://issues.apache.org/jira/browse/KAFKA-15723
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Assignee: Mickael Maison
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
> Fix For: 3.8.0
>
>
> The following tests in ListOffsetsRequestTest in 
> core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala need to be 
> updated to support KRaft
> 37 : def testListOffsetsErrorCodes(): Unit = {
> 84 : def testListOffsetsMaxTimeStampOldestVersion(): Unit = {
> 112 : def testCurrentEpochValidation(): Unit = {
> 173 : def testResponseIncludesLeaderEpoch(): Unit = {
> 210 : def testResponseDefaultOffsetAndLeaderEpochForAllVersions(): Unit = {
> Scanned 261 lines. Found 0 KRaft tests out of 5 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15723: KRaft support in ListOffsetsRequestTest [kafka]

2024-05-17 Thread via GitHub


chia7712 merged PR #15980:
URL: https://github.com/apache/kafka/pull/15980


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16544) DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames should return null instead of throwing NPE

2024-05-17 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16544.

Fix Version/s: 3.8
   Resolution: Fixed

> DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames 
> should return null instead of throwing NPE
> --
>
> Key: KAFKA-16544
> URL: https://issues.apache.org/jira/browse/KAFKA-16544
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Major
> Fix For: 3.8
>
>
> {code:java}
>  * @return A future map from topic names to descriptions which can be 
> used to check
>  * the status of individual description if the describe topic 
> request used
>  * topic names, otherwise return null, this request succeeds only 
> if all the
>  * topic descriptions succeed
> {code}
> According to the docs, it should return null if we try to get a result that 
> does not match the request. For example, we call `allTopicNames` when passing a 
> `TopicIdCollection`. However, the current implementation will throw an NPE 
> directly.
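
For illustration, a small standalone example of the contract (the broker address and topic id are placeholders, and it assumes the fixed behaviour where the mismatched accessor returns null instead of throwing):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.Uuid;

public class DescribeTopicsNullExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        try (Admin admin = Admin.create(props)) {
            Uuid topicId = Uuid.randomUuid(); // placeholder topic id
            DescribeTopicsResult result =
                admin.describeTopics(TopicCollection.ofTopicIds(Collections.singleton(topicId)));
            // The request was keyed by topic id, so the name-keyed view does not apply:
            // per the description above it should return null rather than throw an NPE.
            System.out.println(result.allTopicNames());
        }
    }
}
```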



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16544 DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames should return null instead of throwing NPE [kafka]

2024-05-17 Thread via GitHub


chia7712 merged PR #15979:
URL: https://github.com/apache/kafka/pull/15979


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7342: Migrate tests in o.a.k.streams to JUnit 5 (except KafkaStreamsTest) [kafka]

2024-05-17 Thread via GitHub


chia7712 merged PR #15942:
URL: https://github.com/apache/kafka/pull/15942


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-05-17 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16792:
---
Summary: Enable consumer unit tests that fail to fetch offsets only for new 
consumer with poll(0)  (was: Enable related unit tests that fail to fetch 
offsets only for new consumer with to poll(0))

> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata

2024-05-17 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16764:
--

Assignee: appchemist

> New consumer should throw InvalidTopicException on poll when invalid topic in 
> metadata
> --
>
> Key: KAFKA-16764
> URL: https://issues.apache.org/jira/browse/KAFKA-16764
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: appchemist
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> A call to consumer.poll should throw InvalidTopicException if an invalid 
> topic is discovered in metadata. This can be easily reproduced by calling 
> subscribe("invalid topic") and then poll, for example. The new consumer does 
> not throw the expected InvalidTopicException like the LegacyKafkaConsumer 
> does.
> The legacy consumer achieves this by checking for metadata exceptions on 
> every iteration of the ConsumerNetworkClient (see 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])
> This is probably what makes 
> [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956]
>  fail for the new consumer. Once this bug is fixed, we should be able to 
> enable that test for the new consumer.
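
For illustration, a minimal standalone reproduction along the lines described above (the bootstrap address and group id are placeholders; this is not the project's test, just a sketch of the expected behaviour):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class InvalidTopicRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "invalid-topic-repro");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("invalid topic")); // the space makes the name invalid
            try {
                // The legacy consumer throws once metadata reports the invalid topic;
                // after the fix the new async consumer should behave the same way.
                consumer.poll(Duration.ofSeconds(5));
            } catch (InvalidTopicException e) {
                System.out.println("Got the expected exception: " + e.getMessage());
            }
        }
    }
}
```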



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata

2024-05-17 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847427#comment-17847427
 ] 

Lianet Magrans commented on KAFKA-16764:


Sure! Thanks for helping out! Follow-up on the PR. 

> New consumer should throw InvalidTopicException on poll when invalid topic in 
> metadata
> --
>
> Key: KAFKA-16764
> URL: https://issues.apache.org/jira/browse/KAFKA-16764
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> A call to consumer.poll should throw InvalidTopicException if an invalid 
> topic is discovered in metadata. This can be easily reproduced by calling 
> subscribe("invalid topic") and then poll, for example. The new consumer does 
> not throw the expected InvalidTopicException like the LegacyKafkaConsumer 
> does.
> The legacy consumer achieves this by checking for metadata exceptions on 
> every iteration of the ConsumerNetworkClient (see 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])
> This is probably what makes 
> [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956]
>  fail for the new consumer. Once this bug is fixed, we should be able to 
> enable that test for the new consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16792) Enable related unit tests that fail only for new consumer with to poll(0)

2024-05-17 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16792:
--

 Summary: Enable related unit tests that fail only for new consumer 
with to poll(0)
 Key: KAFKA-16792
 URL: https://issues.apache.org/jira/browse/KAFKA-16792
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Lianet Magrans


Enable the following unit tests for the new async consumer in KafkaConsumerTest:
- testFetchStableOffsetThrowInPoll

- testCurrentLag
- testListOffsetShouldUpdateSubscriptions
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16792) Enable related unit tests that fail to fetch offsets only for new consumer with to poll(0)

2024-05-17 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16792:
---
Summary: Enable related unit tests that fail to fetch offsets only for new 
consumer with to poll(0)  (was: Enable related unit tests that fail only for 
new consumer with to poll(0))

> Enable related unit tests that fail to fetch offsets only for new consumer 
> with to poll(0)
> --
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
> - testFetchStableOffsetThrowInPoll
> - testCurrentLag
> - testListOffsetShouldUpdateSubscriptions
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16792) Enable related unit tests that fail to fetch offsets only for new consumer with to poll(0)

2024-05-17 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16792:
---
Description: 
Enable the following unit tests for the new async consumer in KafkaConsumerTest:
 - testFetchStableOffsetThrowInPoll
 - testCurrentLag
 - testListOffsetShouldUpdateSubscriptions
 

  was:
Enable the following unit tests for the new async consumer in KafkaConsumerTest:
- testFetchStableOffsetThrowInPoll

- testCurrentLag
- testListOffsetShouldUpdateSubscriptions
 


> Enable related unit tests that fail to fetch offsets only for new consumer 
> with to poll(0)
> --
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16791) Add thread detection to ClusterTestExtensions

2024-05-17 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16791:
--

 Summary: Add thread detection to ClusterTestExtensions
 Key: KAFKA-16791
 URL: https://issues.apache.org/jira/browse/KAFKA-16791
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


`ClusterTestExtensions` should implement `BeforeAllCallback` and 
`AfterAllCallback` by `TestUtils.verifyNoUnexpectedThreads`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]

2024-05-17 Thread via GitHub


jeffkbkim opened a new pull request, #15987:
URL: https://github.com/apache/kafka/pull/15987

   DelayEventAccumulator should return immediately if there are no events in 
the queue. Also removed some unused fields inside EventProcessorThread
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: use try-with resources in ClientMetricsManagerTest [kafka]

2024-05-17 Thread via GitHub


chia7712 merged PR #15982:
URL: https://github.com/apache/kafka/pull/15982


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add debug enablement check when using log.debug [kafka]

2024-05-17 Thread via GitHub


chia7712 merged PR #15977:
URL: https://github.com/apache/kafka/pull/15977


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16789: Fix thread leak detection for event handler threads [kafka]

2024-05-17 Thread via GitHub


chia7712 commented on code in PR #15984:
URL: https://github.com/apache/kafka/pull/15984#discussion_r1605427011


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -1872,7 +1872,7 @@ object TestUtils extends Logging {
   AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
   AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
   QuorumTestHarness.ZkClientEventThreadSuffix,
-  QuorumController.CONTROLLER_THREAD_SUFFIX,

Review Comment:
   It seems `CONTROLLER_THREAD_SUFFIX` can be removed from `QuorumController`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-17 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1605420401


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -1538,7 +1563,80 @@ class LogValidatorTest {
 assertEquals(e.recordErrors.size, 3)
   }
 
-  private def testBatchWithoutRecordsNotAllowed(sourceCompression: 
CompressionType, targetCompression: CompressionType): Unit = {
+  @Test
+  def testDifferentLevelDoesNotCauseRecompression(): Unit = {
+val records = List(
+  List.fill(256)("some").mkString("").getBytes,
+  List.fill(256)("data").mkString("").getBytes
+)
+// Records from the producer were created with gzip max level
+val gzipMax: Compression = 
Compression.gzip().level(GzipCompression.MAX_LEVEL).build()
+val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, 
RecordBatch.NO_TIMESTAMP, gzipMax)
+
+// The topic is configured with gzip min level
+val gzipMin: Compression = 
Compression.gzip().level(GzipCompression.MIN_LEVEL).build()
+val recordsGzipMin = createRecords(records, RecordBatch.MAGIC_VALUE_V2, 
RecordBatch.NO_TIMESTAMP, gzipMin)
+
+// ensure data compressed with gzip max and min is different
+assertNotEquals(recordsGzipMax, recordsGzipMin)
+val validator = new LogValidator(recordsGzipMax,
+  topicPartition,
+  time,
+  gzipMax.`type`(),
+  gzipMin,
+  false,
+  RecordBatch.CURRENT_MAGIC_VALUE,

Review Comment:
   Yes, let's keep it consistent. Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16626: Lazily convert subscribed topic names to topic ids [kafka]

2024-05-17 Thread via GitHub


jeffkbkim commented on PR #15970:
URL: https://github.com/apache/kafka/pull/15970#issuecomment-2118184548

   RangeAssignor baseline
   ```
   Benchmark (memberCount)  
(partitionsToMemberRatio)  (topicCount)  Mode  CntScoreError  Units
   TargetAssignmentBuilderBenchmark.build1  
   10   100  avgt5  235.398 ± 15.092  ms/op
   ```
   
   RangeAssignor PR
   ```
   Benchmark (memberCount)  
(partitionsToMemberRatio)  (topicCount)  Mode  CntScoreError  Units
   TargetAssignmentBuilderBenchmark.build1  
   10   100  avgt5  161.683 ± 22.739  ms/op
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]

2024-05-17 Thread via GitHub


dajac commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1605415364


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -147,6 +147,11 @@ public static class DeadlineAndEpoch {
  */
 private final TimelineHashMap subscribedTopicNames;
 
+/**
+ * Partition assignments per topic.
+ */
+private final TimelineHashMap> 
partitionAssignments;

Review Comment:
   How about invertedTargetAssignments or something similar?
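   
   For illustration, a minimal sketch of such an inverted lookup (plain HashMaps stand in for the coordinator's TimelineHashMap, and the class name just follows the suggestion above; none of this is the final API):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.common.Uuid;
   
   // Reverse lookup: topic id -> partition index -> member id, so the assignor can ask
   // "which member does the target assignment give this partition to" without scanning
   // every member's assignment.
   final class InvertedTargetAssignment {
       private final Map<Uuid, Map<Integer, String>> assignments = new HashMap<>();
   
       void put(final Uuid topicId, final int partition, final String memberId) {
           assignments.computeIfAbsent(topicId, id -> new HashMap<>()).put(partition, memberId);
       }
   
       void remove(final Uuid topicId, final int partition) {
           final Map<Integer, String> partitions = assignments.get(topicId);
           if (partitions != null) {
               partitions.remove(partition);
               if (partitions.isEmpty()) {
                   assignments.remove(topicId);
               }
           }
       }
   
       // Returns the member the target assignment maps this partition to, or null if unassigned.
       String memberOf(final Uuid topicId, final int partition) {
           final Map<Integer, String> partitions = assignments.get(topicId);
           return partitions == null ? null : partitions.get(partition);
       }
   }
   ```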



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured

2024-05-17 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847392#comment-17847392
 ] 

Muralidhar Basani commented on KAFKA-16790:
---

Even though brokerMetadataPublisher is instantiated with all publishers, and then rlm, it seems like MetadataLoader#initializeNewPublishers calls BrokerMetadataPublisher#onMetadataUpdate, and by then rlm is already instantiated?

So if BrokerMetadataPublisher#onMetadataUpdate is invoked while new publishers are being created (like DynamicConfigPublisher during brokerMetadataPublisher instance creation), then I think, yes, remoteLogManager is not initialized.

But it seems like MetadataLoader#initializeNewPublishers calls BrokerMetadataPublisher#onMetadataUpdate, and by then rlm is already initialized.

It's possible that I am wrong or didn't understand this correctly.

> Calls to RemoteLogManager are made before it is configured
> --
>
> Key: KAFKA-16790
> URL: https://issues.apache.org/jira/browse/KAFKA-16790
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.8.0
>Reporter: Christo Lolov
>Priority: Major
>
> BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) 
> which in turn calls RemoteLogManager#onLeadershipChange (2), however, the 
> RemoteLogManager is configured after the BrokerMetadataPublisher starts 
> running (3, 4). This is incorrect, we either need to initialise the 
> RemoteLogManager before we start the BrokerMetadataPublisher or we need to 
> skip calls to onLeadershipChange if the RemoteLogManager is not initialised.
> (1) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]
> (2) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]
> (3) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]
> (4) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]
> The way to reproduce the problem is by looking at the following changes
> {code:java}
>  config/kraft/broker.properties                         | 10 ++
>  .../main/java/kafka/log/remote/RemoteLogManager.java   |  8 +++-
>  core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +-
>  3 files changed, 22 insertions(+), 2 deletions(-)
> diff --git 
> a/config/kraft/broker.properties b/config/kraft/broker.properties
> index 2d15997f28..39d126cf87 100644
> --- a/config/kraft/broker.properties
> +++ b/config/kraft/broker.properties
> @@ -127,3 +127,13 @@ log.segment.bytes=1073741824
>  # The interval at which log segments are checked to see if they can be 
> deleted according
>  # to the retention policies
>  log.retention.check.interval.ms=30
> +
> +remote.log.storage.system.enable=true
> +remote.log.metadata.manager.listener.name=PLAINTEXT
> +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
> +remote.log.storage.manager.impl.prefix=rsm.config.
> +remote.log.metadata.manager.impl.prefix=rlmm.config.
> +rsm.config.dir=/tmp/kafka-remote-storage
> +rlmm.config.remote.log.metadata.topic.replication.factor=1
> +log.retention.check.interval.ms=1000
> diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
> b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> index 6555b7c0cd..e84a072abc 100644
> --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
>      // The endpoint for remote log metadata manager to connect to
>      private Optional endpoint = Optional.empty();
>      private boolean closed = false;
> +    private boolean up = false;
>  
>      /**
>       * Creates RemoteLogManager instance with the given arguments.
> @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
>          // in connecting to the brokers or remote storages.
>          configureRSM();
>          configureRLMM();
> +        up = true;
>      }
>  
>      public RemoteStorageManager storageManager() {
> @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
>      public void onLeadershipChange(Set partitionsBecomeLeader,
>                                     Set partitionsBecomeFollower,
>                                     Map topicIds) {
> -        LOGGER.debug("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
> +        if (!up) {

[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-17 Thread Johnson Okorie (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847391#comment-17847391
 ] 

Johnson Okorie commented on KAFKA-16692:


Hi [~jolshan], I wanted to express my sincere thanks for your quick response 
and prompt fix for this issue. One of the reasons I love this project! Keep up 
the great work!

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0, 3.6.1, 3.8
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in us sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId and potentially skipping the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}? I can see that we create the network 
> client here:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]
> The _NetworkUtils.buildNetworkClient_ method seems to create a network client 
> that has _discoverBrokerVersions_ set to {_}false{_}. 
> I was hoping I could get some assistance debugging 

Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]

2024-05-17 Thread via GitHub


rreddy-22 commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1605382569


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -147,6 +147,11 @@ public static class DeadlineAndEpoch {
  */
 private final TimelineHashMap subscribedTopicNames;
 
+/**
+ * Partition assignments per topic.
+ */
+private final TimelineHashMap> 
partitionAssignments;

Review Comment:
   like targetPartitionAssignments?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix rate metric spikes [kafka]

2024-05-17 Thread via GitHub


junrao commented on code in PR #15889:
URL: https://github.com/apache/kafka/pull/15889#discussion_r1605350575


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -110,40 +111,43 @@ public String toString() {
 protected void purgeObsoleteSamples(MetricConfig config, long now) {
 long expireAge = config.samples() * config.timeWindowMs();
 for (Sample sample : samples) {
-if (now - sample.lastWindowMs >= expireAge)
+if (now - sample.lastEventMs >= expireAge)
 sample.reset(now);
 }
 }
 
 protected static class Sample {
 public double initialValue;
 public long eventCount;
-public long lastWindowMs;
+public long startTimeMs;
+public long lastEventMs;
 public double value;
 
 public Sample(double initialValue, long now) {
 this.initialValue = initialValue;
 this.eventCount = 0;
-this.lastWindowMs = now;
+this.startTimeMs = now;
+this.lastEventMs = now;
 this.value = initialValue;
 }
 
 public void reset(long now) {
 this.eventCount = 0;
-this.lastWindowMs = now;
+this.startTimeMs = now;
+this.lastEventMs = now;
 this.value = initialValue;
 }
 
 public boolean isComplete(long timeMs, MetricConfig config) {
-return timeMs - lastWindowMs >= config.timeWindowMs() || 
eventCount >= config.eventWindow();
+return timeMs - startTimeMs >= config.timeWindowMs() || eventCount 
>= config.eventWindow();
 }
 
 @Override
 public String toString() {
 return "Sample(" +
 "value=" + value +
 ", eventCount=" + eventCount +
-", lastWindowMs=" + lastWindowMs +
+", startTimeMs=" + startTimeMs +

Review Comment:
   Should we add lastEventMs?



##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -110,40 +111,43 @@ public String toString() {
 protected void purgeObsoleteSamples(MetricConfig config, long now) {
 long expireAge = config.samples() * config.timeWindowMs();
 for (Sample sample : samples) {
-if (now - sample.lastWindowMs >= expireAge)
+if (now - sample.lastEventMs >= expireAge)

Review Comment:
   Could we add a comment that we don't purge overlapping samples?



##
clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java:
##
@@ -82,7 +82,7 @@ public long windowSize(MetricConfig config, long now) {
  * but this approach does not account for sleeps. SampledStat only 
creates samples whenever record is called,
  * if no record is called for a period of time that time is not 
accounted for in windowSize and produces incorrect results.
  */
-long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs;
+long totalElapsedTimeMs = now - stat.oldest(now).startTimeMs;

Review Comment:
   Could we add some comments to note that the total elapsed time could be 
larger than the window size since we keep the overlapping samples?
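
   For context, a simplified sketch (editor's illustration, not the Kafka implementation) of how a 
   sampled rate relates to the elapsed window: keeping an overlapping sample can only lengthen the 
   elapsed time in the denominator beyond the nominal window size, which lowers the measured rate 
   rather than spiking it.
   
   ```java
   // Illustrative stand-in names only; not the SampledStat/Rate code above.
   final class RateSketch {
       // totalValue:          sum of values recorded across the retained samples
       // oldestSampleStartMs: start time of the oldest retained (possibly overlapping) sample
       // minWindowMs:         lower bound so a nearly empty window cannot inflate the rate
       static double perSecondRate(double totalValue, long oldestSampleStartMs, long nowMs, long minWindowMs) {
           long elapsedMs = Math.max(nowMs - oldestSampleStartMs, minWindowMs);
           return totalValue / (elapsedMs / 1000.0);
       }
   }
   ```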



##
clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java:
##
@@ -64,4 +69,30 @@ public void testRateWithNoPriorAvailableSamples(int 
numSample, int sampleWindowS
 double expectedRatePerSec = sampleValue / windowSize;
 assertEquals(expectedRatePerSec, observedRate, EPS);
 }
+
+// Record an event every 100 ms on average, moving some 1 ms back or forth 
for fine-grained 
+// window control. The expected rate, hence, is 10-11 events/sec depending 
on the moment of 
+// measurement. Start assertions from the second window.
+@Test
+public void testRateIsConsistentAfterTheFirstWindow() {
+MetricConfig config = new MetricConfig().timeWindow(1, 
SECONDS).samples(2);
+List steps = Arrays.asList(0, 99, 100, 100, 100, 100, 100, 
100, 100, 100, 100);
+
+// start the first window and record events at 0,99,199,...,999 ms 
+for (int stepMs : steps) {
+time.sleep(stepMs);
+rate.record(config, 1, time.milliseconds());
+}
+
+// making a gap of 100 ms between windows
+time.sleep(101);
+
+// start the second window and record events at 0,99,199,...,999 ms
+for (int stepMs : steps) {
+time.sleep(stepMs);
+rate.record(config, 1, time.milliseconds());
+double observedRate = rate.measure(config, time.milliseconds());

Review Comment:
   Should we do a second measurement and check that the result doesn't change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: 

Re: [PR] KAFKA-16654:Refactor kafka.test.annotation.Type and ClusterTestExtensions [kafka]

2024-05-17 Thread via GitHub


chia7712 commented on code in PR #15916:
URL: https://github.com/apache/kafka/pull/15916#discussion_r1605371498


##
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##
@@ -62,10 +63,10 @@ public class ClusterTestExtensionsTest {
 }
 
 // Static methods can generate cluster configurations
-static void generate1(ClusterGenerator clusterGenerator) {
+static List generate1() {
 Map serverProperties = new HashMap<>();
 serverProperties.put("foo", "bar");
-clusterGenerator.accept(ClusterConfig.defaultBuilder()
+return Arrays.asList(ClusterConfig.defaultBuilder()

Review Comment:
   `Collections.singletonList`



##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -122,31 +115,57 @@ public Stream 
provideTestTemplateInvocationContex
 return generatedContexts.stream();
 }
 
-void processClusterTemplate(ExtensionContext context, ClusterTemplate 
annot,
-
Consumer testInvocations) {
+
+
+List 
processClusterTemplate(ExtensionContext context, ClusterTemplate annot) {
 // If specified, call cluster config generated method (must be static)
 List generatedClusterConfigs = new ArrayList<>();

Review Comment:
   Please remove this unused variable



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14517) Implement regex subscriptions

2024-05-17 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847386#comment-17847386
 ] 

Phuc Hong Tran commented on KAFKA-14517:


[~lianetm] I’m in the process of implementing this one already

> Implement regex subscriptions
> -
>
> Key: KAFKA-14517
> URL: https://issues.apache.org/jira/browse/KAFKA-14517
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848-preview
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16788) Resource leakage due to absence of close() call on connector start failure

2024-05-17 Thread Ashok (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847385#comment-17847385
 ] 

Ashok commented on KAFKA-16788:
---

[~gharris1727] Thanks, yes, I am working on the fix. I will raise a PR next 
week.

> Resource leakage due to absence of close() call on connector start failure
> --
>
> Key: KAFKA-16788
> URL: https://issues.apache.org/jira/browse/KAFKA-16788
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Ashok
>Assignee: Ashok
>Priority: Critical
>
> We have identified a potential issue in the WorkerConnector class of the 
> Connect framework. Specifically, the close() method is not being called on 
> the connector when the connector fails to start due to various reasons. This 
> omission prevents the connector from releasing any resources that were 
> created or started as part of the start() method. As a result, these 
> resources remain allocated even after the connector has failed to start, 
> leading to resource leakage.  
> To address this issue, we propose modifying the WorkerConnector class to 
> ensure that the close() method is called on the connector whenever the 
> connector fails to start. This change will allow the connector to properly 
> release its resources, preventing resource leakage.
> *Steps to Reproduce:*  
>  # Initiate a connector that creates or allocates resources (for instance, 
> threads) during the execution of the start() method.
>  # Generate a problem that, during the start() process, either triggers an 
> exception or invokes the raiseError(Exception e) method of the 
> WorkerConnectorContext.
>  # Notice that the close() method is not invoked on the connector, resulting 
> in resource leakage, as the stop() method is where the resources are 
> typically closed.
> In our scenario, the issue was related to threads not being properly closed. 
> These threads were initiated as part of the start() method in the connector.
> *Expected Result:*  
> When a connector fails to start, the close() method should be called to allow 
> the connector to release its resources.  
> *Actual Result:*  
> The close() method is not called when a connector fails to start, leading to 
> resource leakage. 
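
A minimal sketch of the proposed behaviour (editor's illustration; the names below are hypothetical 
stand-ins, not the actual WorkerConnector internals): if start() throws, the connector is closed so 
that resources acquired in start() are released.
{code:java}
import java.util.Map;

// Hypothetical sketch only; SimpleConnector stands in for the real connector type.
final class StartWithCleanup {
    interface SimpleConnector {
        void start(Map<String, String> props);
        void stop(); // releases resources acquired in start()
    }

    static void startOrClose(SimpleConnector connector, Map<String, String> props) {
        try {
            connector.start(props);
        } catch (RuntimeException e) {
            // Proposed behaviour: always release resources when start() fails.
            try {
                connector.stop();
            } catch (RuntimeException suppressed) {
                e.addSuppressed(suppressed);
            }
            throw e;
        }
    }
}
{code}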



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured

2024-05-17 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847384#comment-17847384
 ] 

Muralidhar Basani commented on KAFKA-16790:
---

[~christo_lolov] can I look into this issue?

> Calls to RemoteLogManager are made before it is configured
> --
>
> Key: KAFKA-16790
> URL: https://issues.apache.org/jira/browse/KAFKA-16790
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.8.0
>Reporter: Christo Lolov
>Priority: Major
>
> BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) 
> which in turn calls RemoteLogManager#onLeadershipChange (2), however, the 
> RemoteLogManager is configured after the BrokerMetadataPublisher starts 
> running (3, 4). This is incorrect, we either need to initialise the 
> RemoteLogManager before we start the BrokerMetadataPublisher or we need to 
> skip calls to onLeadershipChange if the RemoteLogManager is not initialised.
> (1) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]
> (2) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]
> (3) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]
> (4) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]
> The way to reproduce the problem is by looking at the following changes
> {code:java}
>  config/kraft/broker.properties                         | 10 ++
>  .../main/java/kafka/log/remote/RemoteLogManager.java   |  8 +++-
>  core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +-
>  3 files changed, 22 insertions(+), 2 deletions(-)
> diff --git 
> a/config/kraft/broker.properties b/config/kraft/broker.properties
> index 2d15997f28..39d126cf87 100644
> --- a/config/kraft/broker.properties
> +++ b/config/kraft/broker.properties
> @@ -127,3 +127,13 @@ log.segment.bytes=1073741824
>  # The interval at which log segments are checked to see if they can be 
> deleted according
>  # to the retention policies
>  log.retention.check.interval.ms=30
> +
> +remote.log.storage.system.enable=true
> +remote.log.metadata.manager.listener.name=PLAINTEXT
> +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
> +remote.log.storage.manager.impl.prefix=rsm.config.
> +remote.log.metadata.manager.impl.prefix=rlmm.config.
> +rsm.config.dir=/tmp/kafka-remote-storage
> +rlmm.config.remote.log.metadata.topic.replication.factor=1
> +log.retention.check.interval.ms=1000
> diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
> b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> index 6555b7c0cd..e84a072abc 100644
> --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
>      // The endpoint for remote log metadata manager to connect to
>      private Optional endpoint = Optional.empty();
>      private boolean closed = false;
> +    private boolean up = false;
>  
>      /**
>       * Creates RemoteLogManager instance with the given arguments.
> @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
>          // in connecting to the brokers or remote storages.
>          configureRSM();
>          configureRLMM();
> +        up = true;
>      }
>  
>      public RemoteStorageManager storageManager() {
> @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
>      public void onLeadershipChange(Set partitionsBecomeLeader,
>                                     Set partitionsBecomeFollower,
>                                     Map topicIds) {
> -        LOGGER.debug("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
> +        if (!up) {
> +            LOGGER.error("NullPointerException");
> +            return;
> +        }
> +        LOGGER.error("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
>  
>          Map leaderPartitionsWithLeaderEpoch = 
> filterPartitions(partitionsBecomeLeader)
>                  .collect(Collectors.toMap(
> diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
> b/core/src/main/scala/kafka/server/ReplicaManager.scala
> index 35499430d6..bd3f41c3d6 100644
> --- a/core/src/main/scala/kafka/server/ReplicaManager.scala
> +++ 

Re: [PR] MINOR: Refactor write timeout in CoordinatorRuntime [kafka]

2024-05-17 Thread via GitHub


chia7712 merged PR #15976:
URL: https://github.com/apache/kafka/pull/15976


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16788) Resource leakage due to absence of close() call on connector start failure

2024-05-17 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847383#comment-17847383
 ] 

Greg Harris commented on KAFKA-16788:
-

Nice find [~ashok89]! Are you interested in working on a fix for this?

> Resource leakage due to absence of close() call on connector start failure
> --
>
> Key: KAFKA-16788
> URL: https://issues.apache.org/jira/browse/KAFKA-16788
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Ashok
>Assignee: Ashok
>Priority: Critical
>
> We have identified a potential issue in the WorkerConnector class of the 
> Connect framework. Specifically, the close() method is not being called on 
> the connector when the connector fails to start due to various reasons. This 
> omission prevents the connector from releasing any resources that were 
> created or started as part of the start() method. As a result, these 
> resources remain allocated even after the connector has failed to start, 
> leading to resource leakage.  
> To address this issue, we propose modifying the WorkerConnector class to 
> ensure that the close() method is called on the connector whenever the 
> connector fails to start. This change will allow the connector to properly 
> release its resources, preventing resource leakage.
> *Steps to Reproduce:*  
>  # Initiate a connector that creates or allocates resources (for instance, 
> threads) during the execution of the start() method.
>  # Generate a problem that, during the start() process, either triggers an 
> exception or invokes the raiseError(Exception e) method of the 
> WorkerConnectorContext.
>  # Notice that the close() method is not invoked on the connector, resulting 
> in resource leakage, as the stop() method is where the resources are 
> typically closed.
> In our scenario, the issue was related to threads not being properly closed. 
> These threads were initiated as part of the start() method in the connector.
> *Expected Result:*  
> When a connector fails to start, the close() method should be called to allow 
> the connector to release its resources.  
> *Actual Result:*  
> The close() method is not called when a connector fails to start, leading to 
> resource leakage. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]

2024-05-17 Thread via GitHub


chia7712 commented on code in PR #15863:
URL: https://github.com/apache/kafka/pull/15863#discussion_r1605365153


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -182,6 +183,27 @@ class LogCleaner(initialConfig: CleanerConfig,
 cleanerManager.removeMetrics()

Review Comment:
   > We can remove the cleanerManager.removeMetrics(), but then we should also remove 
[this](https://github.com/apache/kafka/blob/fafa3c76dc93f3258b2cea49dfd1dc7a724a213c/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala#L96)
 in the testRemoveMetricsOnClose().
   
   My point was - we don't need to re-create the metrics after reconfiguration 
- i.e. in reconfiguration we SHOULD NOT remove the metrics. For example,
   ```scala
 private[this] def shutdownCleaners(): Unit = {
   info("Shutting down the log cleaner.")
   cleaners.foreach(_.shutdown())
   cleaners.clear()
 }
 
 /**
  * Stop the background cleaner threads
  */
 def shutdown(): Unit = {
   try shutdownCleaners()
   finally removeMetrics()
   info("Shutting down the log cleaner.")
 }
   ```
   
   With above changes, in `reconfigure` we call `shutdownCleaners` instead of 
`shutdown`. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]

2024-05-17 Thread via GitHub


chia7712 commented on code in PR #15946:
URL: https://github.com/apache/kafka/pull/15946#discussion_r1605361253


##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -240,28 +215,61 @@ public void startBroker(int brokerId) {
 @Override
 public void waitForReadyBrokers() throws InterruptedException {
 try {
-clusterReference.get().waitForReadyBrokers();
+clusterTestKit.waitForReadyBrokers();
 } catch (ExecutionException e) {
 throw new AssertionError("Failed while waiting for brokers to 
become ready", e);
 }
 }
 
-private BrokerServer findBrokerOrThrow(int brokerId) {
-return 
Optional.ofNullable(clusterReference.get().brokers().get(brokerId))
-.orElseThrow(() -> new IllegalArgumentException("Unknown 
brokerId " + brokerId));
-}
 
 @Override
 public Map brokers() {
-return clusterReference.get().brokers().entrySet()
+return clusterTestKit.brokers().entrySet()
 .stream()
 .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
 }
 
 @Override
 public Map controllers() {
-return 
Collections.unmodifiableMap(clusterReference.get().controllers());
+return Collections.unmodifiableMap(clusterTestKit.controllers());
+}
+
+public void format() throws Exception {

Review Comment:
   We can put `safeBuildCluster` and `doBuild` into `format`, right?
   ```java
   public void format() throws Exception {
   if (formated.compareAndSet(false,true)) {
   TestKitNodes nodes = new TestKitNodes.Builder()
   
.setBootstrapMetadataVersion(clusterConfig.metadataVersion())
   .setCombined(isCombined)
   .setNumBrokerNodes(clusterConfig.numBrokers())
   .setNumDisksPerBroker(clusterConfig.numDisksPerBroker())
   
.setPerServerProperties(clusterConfig.perServerOverrideProperties())
   
.setNumControllerNodes(clusterConfig.numControllers()).build();
   KafkaClusterTestKit.Builder builder = new 
KafkaClusterTestKit.Builder(nodes);
   if (Boolean.parseBoolean(clusterConfig.serverProperties()
   .getOrDefault("zookeeper.metadata.migration.enable", 
"false"))) {
   this.embeddedZookeeper = new EmbeddedZookeeper();
   builder.setConfigProp("zookeeper.connect", 
String.format("localhost:%d", embeddedZookeeper.port()));
   }
   // Copy properties into the TestKit builder
   
clusterConfig.serverProperties().forEach(builder::setConfigProp);
   // KAFKA-12512 need to pass security protocol and listener 
name here
   this.clusterTestKit = builder.build();
   this.clusterTestKit.format();
   }
   }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured

2024-05-17 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-16790:
--
Description: 
BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) 
which in turn calls RemoteLogManager#onLeadershipChange (2), however, the 
RemoteLogManager is configured after the BrokerMetadataPublisher starts running 
(3, 4). This is incorrect, we either need to initialise the RemoteLogManager 
before we start the BrokerMetadataPublisher or we need to skip calls to 
onLeadershipChange if the RemoteLogManager is not initialised.

(1) 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]

(2) 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]

(3) 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]

(4) 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]

The way to reproduce the problem is by looking at the following changes

```

---
 config/kraft/broker.properties                         | 10 ++
 .../main/java/kafka/log/remote/RemoteLogManager.java   |  8 +++-
 core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +-
 3 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/config/kraft/broker.properties b/config/kraft/broker.properties
index 2d15997f28..39d126cf87 100644
--- a/config/kraft/broker.properties
+++ b/config/kraft/broker.properties
@@ -127,3 +127,13 @@ log.segment.bytes=1073741824
 # The interval at which log segments are checked to see if they can be deleted 
according
 # to the retention policies
 log.retention.check.interval.ms=30
+
+remote.log.storage.system.enable=true
+remote.log.metadata.manager.listener.name=PLAINTEXT
+remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
+remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
+remote.log.storage.manager.impl.prefix=rsm.config.
+remote.log.metadata.manager.impl.prefix=rlmm.config.
+rsm.config.dir=/tmp/kafka-remote-storage
+rlmm.config.remote.log.metadata.topic.replication.factor=1
+log.retention.check.interval.ms=1000
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 6555b7c0cd..e84a072abc 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
     // The endpoint for remote log metadata manager to connect to
     private Optional endpoint = Optional.empty();
     private boolean closed = false;
+    private boolean up = false;
 
     /**
      * Creates RemoteLogManager instance with the given arguments.
@@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
         // in connecting to the brokers or remote storages.
         configureRSM();
         configureRLMM();
+        up = true;
     }
 
     public RemoteStorageManager storageManager() {
@@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
     public void onLeadershipChange(Set partitionsBecomeLeader,
                                    Set partitionsBecomeFollower,
                                    Map topicIds) {
-        LOGGER.debug("Received leadership changes for leaders: {} and 
followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
+        if (!up) {
+            LOGGER.error("NullPointerException");
+            return;
+        }
+        LOGGER.error("Received leadership changes for leaders: {} and 
followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
 
         Map leaderPartitionsWithLeaderEpoch = 
filterPartitions(partitionsBecomeLeader)
                 .collect(Collectors.toMap(
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 35499430d6..bd3f41c3d6 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2688,6 +2688,7 @@ class ReplicaManager(val config: KafkaConfig,
    */
   def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = {
     // Before taking the lock, compute the local changes
+    stateChangeLogger.error("ROBIN")
     val localChanges = delta.localChanges(config.nodeId)
     val metadataVersion = newImage.features().metadataVersion()
 
@@ -2734,7 +2735,10 @@ class ReplicaManager(val config: KafkaConfig,
         replicaFetcherManager.shutdownIdleFetcherThreads()
         replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
 
-        remoteLogManager.foreach(rlm => 

[jira] [Updated] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured

2024-05-17 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-16790:
--
Description: 
BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) 
which in turn calls RemoteLogManager#onLeadershipChange (2), however, the 
RemoteLogManager is configured after the BrokerMetadataPublisher starts running 
(3, 4). This is incorrect, we either need to initialise the RemoteLogManager 
before we start the BrokerMetadataPublisher or we need to skip calls to 
onLeadershipChange if the RemoteLogManager is not initialised.

(1) 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]

(2) 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]

(3) 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]

(4) 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]

The way to reproduce the problem is by looking at the following changes
{code:java}
 config/kraft/broker.properties                         | 10 ++
 .../main/java/kafka/log/remote/RemoteLogManager.java   |  8 +++-
 core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +-
 3 files changed, 22 insertions(+), 2 deletions(-)
diff --git 
a/config/kraft/broker.properties b/config/kraft/broker.properties
index 2d15997f28..39d126cf87 100644
--- a/config/kraft/broker.properties
+++ b/config/kraft/broker.properties
@@ -127,3 +127,13 @@ log.segment.bytes=1073741824
 # The interval at which log segments are checked to see if they can be deleted 
according
 # to the retention policies
 log.retention.check.interval.ms=30
+
+remote.log.storage.system.enable=true
+remote.log.metadata.manager.listener.name=PLAINTEXT
+remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
+remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
+remote.log.storage.manager.impl.prefix=rsm.config.
+remote.log.metadata.manager.impl.prefix=rlmm.config.
+rsm.config.dir=/tmp/kafka-remote-storage
+rlmm.config.remote.log.metadata.topic.replication.factor=1
+log.retention.check.interval.ms=1000
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 6555b7c0cd..e84a072abc 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
     // The endpoint for remote log metadata manager to connect to
     private Optional endpoint = Optional.empty();
     private boolean closed = false;
+    private boolean up = false;
 
     /**
      * Creates RemoteLogManager instance with the given arguments.
@@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
         // in connecting to the brokers or remote storages.
         configureRSM();
         configureRLMM();
+        up = true;
     }
 
     public RemoteStorageManager storageManager() {
@@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
     public void onLeadershipChange(Set partitionsBecomeLeader,
                                    Set partitionsBecomeFollower,
                                    Map topicIds) {
-        LOGGER.debug("Received leadership changes for leaders: {} and 
followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
+        if (!up) {
+            LOGGER.error("NullPointerException");
+            return;
+        }
+        LOGGER.error("Received leadership changes for leaders: {} and 
followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
 
         Map leaderPartitionsWithLeaderEpoch = 
filterPartitions(partitionsBecomeLeader)
                 .collect(Collectors.toMap(
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 35499430d6..bd3f41c3d6 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2688,6 +2688,7 @@ class ReplicaManager(val config: KafkaConfig,
    */
   def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = {
     // Before taking the lock, compute the local changes
+    stateChangeLogger.error("ROBIN")
     val localChanges = delta.localChanges(config.nodeId)
     val metadataVersion = newImage.features().metadataVersion()
 
@@ -2734,7 +2735,10 @@ class ReplicaManager(val config: KafkaConfig,
         replicaFetcherManager.shutdownIdleFetcherThreads()
         replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
 
-        remoteLogManager.foreach(rlm => 

Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-17 Thread via GitHub


prestona commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2118027385

   Hi @gharris1727, hopefully the latest commits address your review comments. 
Once again, really appreciate all your feedback and suggestions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured

2024-05-17 Thread Christo Lolov (Jira)
Christo Lolov created KAFKA-16790:
-

 Summary: Calls to RemoteLogManager are made before it is configured
 Key: KAFKA-16790
 URL: https://issues.apache.org/jira/browse/KAFKA-16790
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.8.0
Reporter: Christo Lolov


BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) 
which in turn calls RemoteLogManager#onLeadershipChange (2), however, the 
RemoteLogManager is configured after the BrokerMetadataPublisher starts running 
(3, 4). This is incorrect, we either need to initialise the RemoteLogManager 
before we start the BrokerMetadataPublisher or we need to skip calls to 
onLeadershipChange if the RemoteLogManager is not initialised.

(1) 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]

(2) 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]

(3) 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]

(4) 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]

The way to reproduce the problem is by looking at the following branch 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-17 Thread via GitHub


junrao commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1605313096


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -1538,7 +1563,80 @@ class LogValidatorTest {
 assertEquals(e.recordErrors.size, 3)
   }
 
-  private def testBatchWithoutRecordsNotAllowed(sourceCompression: 
CompressionType, targetCompression: CompressionType): Unit = {
+  @Test
+  def testDifferentLevelDoesNotCauseRecompression(): Unit = {
+val records = List(
+  List.fill(256)("some").mkString("").getBytes,
+  List.fill(256)("data").mkString("").getBytes
+)
+// Records from the producer were created with gzip max level
+val gzipMax: Compression = 
Compression.gzip().level(GzipCompression.MAX_LEVEL).build()
+val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, 
RecordBatch.NO_TIMESTAMP, gzipMax)
+
+// The topic is configured with gzip min level
+val gzipMin: Compression = 
Compression.gzip().level(GzipCompression.MIN_LEVEL).build()
+val recordsGzipMin = createRecords(records, RecordBatch.MAGIC_VALUE_V2, 
RecordBatch.NO_TIMESTAMP, gzipMin)
+
+// ensure data compressed with gzip max and min is different
+assertNotEquals(recordsGzipMax, recordsGzipMin)
+val validator = new LogValidator(recordsGzipMax,
+  topicPartition,
+  time,
+  gzipMax.`type`(),
+  gzipMin,
+  false,
+  RecordBatch.CURRENT_MAGIC_VALUE,

Review Comment:
   Earlier, we use RecordBatch.MAGIC_VALUE_V2. Should we be consistent here? 
Ditto in the next test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-17 Thread via GitHub


jsancio opened a new pull request, #15986:
URL: https://github.com/apache/kafka/pull/15986

   DRAFT
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16763) Upgrade to scala 2.12.19 and scala 2.13.14

2024-05-17 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16763.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Upgrade to scala 2.12.19 and scala 2.13.14
> --
>
> Key: KAFKA-16763
> URL: https://issues.apache.org/jira/browse/KAFKA-16763
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Minor
> Fix For: 3.8.0
>
>
> scala 2.12.19 (https://github.com/scala/scala/releases/tag/v2.12.19)
>  
> scala 2.13.14 (https://github.com/scala/scala/releases/tag/v2.13.14)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16763: Upgrade to scala 2.12.19 and scala 2.13.14 [kafka]

2024-05-17 Thread via GitHub


chia7712 merged PR #15958:
URL: https://github.com/apache/kafka/pull/15958


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14517) Implement regex subscriptions

2024-05-17 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847369#comment-17847369
 ] 

Lianet Magrans commented on KAFKA-14517:


hey [~phuctran], [~dajac], I would like to take on this one if that's ok? 
Thanks!

> Implement regex subscriptions
> -
>
> Key: KAFKA-14517
> URL: https://issues.apache.org/jira/browse/KAFKA-14517
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848-preview
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled

2024-05-17 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16774:
---
Fix Version/s: 3.8.0

> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -
>
> Key: KAFKA-16774
> URL: https://issues.apache.org/jira/browse/KAFKA-16774
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Assignee: Bruno Cadonna
>Priority: Minor
>  Labels: flaky-test
> Fix For: 3.8.0
>
>
> java.util.ConcurrentModificationException
>  at 
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>  at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
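
For reference, a minimal self-contained illustration (editor's sketch, not the Streams code itself) 
of the failure mode in the trace above: structurally modifying a HashMap while a stream over its key 
set is being consumed fails with ConcurrentModificationException.
{code:java}
import java.util.HashMap;
import java.util.Map;

public class CmeDemo {
    public static void main(String[] args) {
        Map<Integer, String> tasks = new HashMap<>();
        tasks.put(1, "task-1");
        tasks.put(2, "task-2");

        // The put() performed while the stream is still being consumed changes the
        // map's modCount, so the stream fails fast with ConcurrentModificationException.
        tasks.keySet().stream()
             .peek(id -> tasks.put(id + 10, "added-" + id))
             .forEach(id -> { });
    }
}
{code}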



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16758) Extend Consumer#close with option to leave the group or not

2024-05-17 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16758:
--

Assignee: Lianet Magrans

> Extend Consumer#close with option to leave the group or not
> ---
>
> Key: KAFKA-16758
> URL: https://issues.apache.org/jira/browse/KAFKA-16758
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: needs-kip
>
> See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the 
> full context.
> Essentially we would get rid of the "internal.leave.group.on.close" config 
> that is used as a backdoor by Kafka Streams right now to prevent closed 
> consumers from leaving the group, thus reducing unnecessary task movements 
> after a simple bounce. 
> This would be replaced by an actual public API that would allow the caller to 
> opt in or out to the LeaveGroup when close is called. This would be similar 
> to the KafkaStreams#close(CloseOptions) API, and in fact would be how that 
> API will be implemented (since it only works for static groups at the moment 
> as noted in KAFKA-16514 )
> This has several benefits over the current situation:
>  # It allows plain consumer apps to opt-out of leaving the group when closed, 
> which is currently not possible through any public API (only an internal 
> backdoor config)
>  # It enables the caller to dynamically select the appropriate action 
> depending on why the client is being closed – for example, you would not want 
> the consumer to leave the group during a simple restart, but would want it to 
> leave the group when shutting down the app or if scaling down the node. This 
> is not possible today, even with the internal config, since configs are 
> immutable
>  # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so 
> that the user's choice to leave the group during close will be respected for 
> non-static members



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15197) Flaky test MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()

2024-05-17 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847366#comment-17847366
 ] 

Chia-Ping Tsai commented on KAFKA-15197:


It seems that this behavior is expected.


The check assumes that the consumed offsets of "all" partitions get updated after 
the consumer commits the offsets. However, the committed offsets are not the 
"end" offsets [0]. When the monotonic offsets of `OffsetSyncStore` are composed of 
the end offset (upstream) only, we won't send any checkpoint because offset 
translation won't generate an offset that is outside the bound [1].
For example:
 # tp has 500 records -> end offset is 500
 # monotonic syncs are [499],[499],...[0],[0]
 # committed consumer offset is 449
 # 499 (synced offset) is bigger than 449 (upstream group offset), so we don't 
send a checkpoint for it.

In order to stabilize the tests, in short, we can change the condition from 
"all" to "some". For example, it works if 50% of partitions get updated.

Or we can tweak the translation algorithm to generate smoother monotonic 
syncs [2]. This is more complicated and it can't solve the issue 100%. However, 
it could reduce the possibility that no group offsets get updated downstream

[0] 
[https://github.com/apache/kafka/blob/b8c96389b47df0dbd53fcba9404363dcdf43604d/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java#L716]
[1] 
[https://github.com/apache/kafka/blob/b8c96389b47df0dbd53fcba9404363dcdf43604d/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L308]
[2] 
[https://github.com/apache/kafka/blob/b8c96389b47df0dbd53fcba9404363dcdf43604d/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L212]
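
To make the numbered example above concrete, a simplified sketch (editor's illustration, not the 
OffsetSyncStore implementation) of the bound check: a committed upstream offset can only be 
translated using a sync whose upstream offset is at or below it, so when every retained sync sits at 
the end offset (499) a committed offset of 449 cannot be translated and no checkpoint is emitted.
{code:java}
import java.util.List;
import java.util.OptionalLong;

final class TranslationBoundSketch {
    // syncs: {upstreamOffset, downstreamOffset} pairs retained by the store
    static OptionalLong translateDownstream(List<long[]> syncs, long committedUpstreamOffset) {
        return syncs.stream()
                .filter(sync -> sync[0] <= committedUpstreamOffset) // the bound check
                .mapToLong(sync -> sync[1])
                .max(); // downstream offset of the closest usable sync, if any
    }

    public static void main(String[] args) {
        // Every retained sync is at the end offset 499 while the group committed 449,
        // so nothing can be translated and no checkpoint would be emitted.
        System.out.println(translateDownstream(List.of(new long[]{499L, 499L}), 449L)); // OptionalLong.empty
    }
}
{code}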

 

[~gharris1727] WDYT? It would be great if an MM2 expert could give me feedback :)

> Flaky test 
> MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
> --
>
> Key: KAFKA-15197
> URL: https://issues.apache.org/jira/browse/KAFKA-15197
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker
>Reporter: Divij Vaidya
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.8.0
>
>
> As of Jul 17th, this is the second most flaky test in our CI on trunk and 
> fails 46% of times. 
> See: 
> [https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=Europe/Berlin]
>  
> Note that MirrorConnectorsIntegrationExactlyOnceTest has multiple tests but 
> testOffsetTranslationBehindReplicationFlow is the one that is the reason for 
> most failures. see: 
> [https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=Europe/Berlin=org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest]
>  
>  
> Reason for failure is: 
> |org.opentest4j.AssertionFailedError: Condition not met within timeout 2. 
> Offsets for consumer group consumer-group-lagging-behind not translated from 
> primary for topic primary.test-topic-1 ==> expected:  but was: |



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]

2024-05-17 Thread via GitHub


dajac commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1605223809


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java:
##
@@ -65,4 +65,33 @@ protected static Set topicIdPartitions(
 .mapToObj(i -> new TopicIdPartition(topic, i))
 ).collect(Collectors.toSet());
 }
+
+/**
+ * Constructs a set of {@code TopicIdPartition} including all the 
partitions that are
+ * currently not assigned to any member.
+ *
+ * @param topicIds  Collection of topic Ids.
+ * @param subscribedTopicDescriber  Describer to fetch partition 
counts for topics.
+ * @param groupSpecThe group's assignment spec.
+ *
+ *
+ * @return Set of unassigned {@code TopicIdPartition} including newly 
added topic partitions.
+ */
+protected static Set unassignedTopicIdPartitions(

Review Comment:
   I just noticed that we already have a loop in 
`OptimizedUniformAssignmentBuilder#buildAssignment` iterating over the 
subscribed topic ids. It would be great if we could merge this one into it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-17 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847352#comment-17847352
 ] 

Justine Olshan commented on KAFKA-16692:


[~soarez] I'd like to try to get this fix into 3.7.1. The PR for trunk is approved; I just need to get the builds to cooperate. I hope to cherry-pick in the next day or so.

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0, 3.6.1, 3.8
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly one semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in us sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId and potentially skipping the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}? I can see that we create the network 
> client here:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]
> The _NetworkUtils.buildNetworkClient_ method seems to create a network client 
> that has _discoverBrokerVersions_ set to {_}false{_}. 
> I was hoping I could get some assistance debugging this 

[jira] [Commented] (KAFKA-16333) Removed Deprecated methods KTable#join

2024-05-17 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847348#comment-17847348
 ] 

Muralidhar Basani commented on KAFKA-16333:
---

Agree. thanks.

> Removed Deprecated methods KTable#join
> --
>
> Key: KAFKA-16333
> URL: https://issues.apache.org/jira/browse/KAFKA-16333
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
>
> KTable#join() methods taking a `Named` parameter got deprecated in 3.1 
> release via https://issues.apache.org/jira/browse/KAFKA-13813 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-17 Thread via GitHub


lianetm commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2117763853

   Hey @cadonna, the tricky bit is that, for some events, the request managers 
do expire requests too, so in this flow you described:
   
   > The event is processed in the ApplicationEventHandler and a request is 
added to the commit request manager. **Then the commit request manager is 
polled**, the requests are added to the network client and the the network 
client is polled
   
   When the manager is polled, if the event had timeout 0, it will be 
expired/cancelled before making it to the network thread. Currently we have 2 
managers that do this (that I can remember): 
[TopicMetadataManager](https://github.com/apache/kafka/blob/f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java#L86)
 and 
[CommitRequestManager](https://github.com/apache/kafka/blob/f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L1168).
 So for those events, even with this PR, if they have timeout 0, they won't 
have a chance to complete.
   
   My point is not to bring more changes into this PR, only to have the whole 
situation in mind so we can address it properly (with multiple PRs). This other 
[PR](https://github.com/apache/kafka/pull/15844) attempts to address this 
situation I described, but only in the `CommitRequestManager` for instance. We 
still have to align on the approach there, and also handle it in the 
`TopicMetadataManager` I would say. I would expect that a combination of this 
PR and those others would allow us to get to a better point (now, even with 
this PR, we cannot make basic progress with a consumer being continuously 
polled with timeout 0 because `FetchCommittedOffsets` is always expired by the 
manager, for instance). I can easily repro it with the following integration 
test (that I was surprised we have not covered, because TestUtils always polls 
with a non-zero timeout)
   
   
   ```
 // Ensure TestUtils polls with ZERO. This fails for the new consumer only.
 @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
 @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
 def testPollEventuallyReturnsRecordsWithZeroTimeout(quorum: String, 
groupProtocol: String): Unit = {
   val numMessages = 100
   val producer = createProducer()
   sendRecords(producer, numMessages, tp)
   
   val consumer = createConsumer()
   consumer.subscribe(Set(topic).asJava)
   val records = awaitNonEmptyRecords(consumer, tp)
   assertEquals(numMessages, records.count())
 }
   ```
   
   
   Makes sense?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Draft] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-17 Thread via GitHub


appchemist commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2117739675

   @lianetm & @philipnee PTAL
   
   `testSubscriptionOnInvalidTopic` is still disabled for the new consumer.
   I couldn't make the changes because `MockMetadataUpdater` is being used internally instead of `MetadataUpdater`, so I only added a unit test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task

2024-05-17 Thread Lucas Brutschy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847334#comment-17847334
 ] 

Lucas Brutschy edited comment on KAFKA-12679 at 5/17/24 2:26 PM:
-

Hey [~stoeckmk]. It is true that the state updater was not enabled in 3.7 in 
the end, so there is no back-off when locking the state directory. We missed 
updating the fix version on this ticket. I updated it now. 

Hey [~coltmcnealy-lh]. The change I implemented was to back off in case of a 
lock error, but essentially retry. You should see something like `"Encountered 
lock exception. Reattempting locking the state in the next iteration."` If you 
are completely stuck and the application makes zero progress, this does not 
seem like it would be solved by a back-off, and somehow that seems to describe 
a different problem, especially if it also happens with state updater enabled. 
We may want to create a separate ticket for this, since the problem described 
in this ticket should resolve itself once the old thread releases the lock on 
the state directory.
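
For illustration only, here is a minimal, self-contained sketch of the back-off idea using plain JDK locks; the class, constant, and log messages are made up for the example, and this is not the actual Streams TaskManager code.

{code:java}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class LockBackoffSketch {
    // Assumed back-off interval for the example, not the actual Streams default.
    private static final long BACKOFF_MS = 100L;

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the per-task state directory lock.
        ReentrantLock stateDirLock = new ReentrantLock();

        // Simulate the old stream thread still holding the lock for a while.
        Thread oldThread = new Thread(() -> {
            stateDirLock.lock();
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                stateDirLock.unlock();
            }
        });
        oldThread.start();
        TimeUnit.MILLISECONDS.sleep(50); // let the old thread grab the lock first

        // The newly created task keeps retrying, but backs off between attempts
        // instead of spinning in a tight loop and starving the old thread.
        while (!stateDirLock.tryLock()) {
            System.out.println("Could not lock state directory, retrying after back-off");
            TimeUnit.MILLISECONDS.sleep(BACKOFF_MS);
        }
        try {
            System.out.println("Lock acquired, the task can proceed with restoration");
        } finally {
            stateDirLock.unlock();
        }
    }
}
{code}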


was (Author: JIRAUSER302322):
Hey [~coltmcnealy-lh]. It is true that the state updater was not enabled in 3.7 
in the end, so there is no back-off when locking the state directory. The 
change I implemented was to back-off in case of a lock error, to not end up in 
a busy loop trying to acquire the lock. If you are completely stuck and the 
application makes zero progress, this does not seem like it would be solved by 
a back-off, and somehow that seems to describe a different problem, especially 
if it also happens with state updater enabled. 

We may want to create a separate ticket for this, since the problem described 
in this ticket should resolve itself once the old thread releases the lock on 
the state directory.

Either way, I'll update the fix version here.

> Rebalancing a restoring or running task may cause directory livelocking with 
> newly created task
> ---
>
> Key: KAFKA-12679
> URL: https://issues.apache.org/jira/browse/KAFKA-12679
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1
> Environment: Broker and client version 2.6.1
> Multi-node broker cluster
> Multi-node, auto scaling streams app instances
>Reporter: Peter Nahas
>Assignee: Lucas Brutschy
>Priority: Major
> Fix For: 3.8.0
>
> Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running 
> state and the task gets rebalanced to a separate thread on the same instance, 
> the newly created task will attempt to lock the state store director while 
> the first thread is continuing to use it. This is totally normal and expected 
> behavior when the first thread is not yet aware of the rebalance. However, 
> that newly created task is effectively running a while loop with no backoff 
> waiting to lock the directory:
>  # TaskManager tells the task to restore in `tryToCompleteRestoration`
>  # The task attempts to lock the directory
>  # The lock attempt fails and throws a 
> `org.apache.kafka.streams.errors.LockException`
>  # TaskManager catches the exception, stops further processing on the task 
> and reports that not all tasks have restored
>  # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff 
> when this condition occurs, but there does not appear to be any in the code. 
> The result is that if this goes on for long enough, the lock-loop may 
> dominate CPU usage in the process and starve out the old stream thread task 
> processing.
>  
> When in this state, the DEBUG level logging for TaskManager will produce a 
> steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager   
>   : stream-thread [StreamThread-10] Could not initialize 0_34 due 
> to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread 
> [StreamThread-10] standby-task [0_34] Failed to lock the state directory for 
> task 0_34
> {noformat}
>  
>  
> I've attached a git formatted patch to resolve the issue. Simply detect the 
> scenario and sleep for the backoff time in the appropriate StreamThread.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task

2024-05-17 Thread Lucas Brutschy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847334#comment-17847334
 ] 

Lucas Brutschy edited comment on KAFKA-12679 at 5/17/24 2:20 PM:
-

Hey [~coltmcnealy-lh]. It is true that the state updater was not enabled in 3.7 
in the end, so there is no back-off when locking the state directory. The 
change I implemented was to back-off in case of a lock error, to not end up in 
a busy loop trying to acquire the lock. If you are completely stuck and the 
application makes zero progress, this does not seem like it would be solved by 
a back-off, and somehow that seems to describe a different problem, especially 
if it also happens with state updater enabled. 

We may want to create a separate ticket for this, since the problem described 
in this ticket should resolve itself once the old thread releases the lock on 
the state directory.

Either way, I'll update the fix version here.


was (Author: JIRAUSER302322):
Hey [~coltmcnealy-lh]. It is true that the state updater was not enabled in 3.7 
in the end, so there is no back-off when locking the state directory. The 
change I implemented was to back-off in case of a lock error, to not end up in 
a busy loop trying to acquire the lock. If you are completely stuck and the 
application makes zero progress, this does not seem like it would be solved by 
a back-off, and somehow that seems to describe a different problem, especially 
if it also happens with state updater enabled. 

We may want to create a separate ticket for this. Either way, I'll update the 
fix version here.

> Rebalancing a restoring or running task may cause directory livelocking with 
> newly created task
> ---
>
> Key: KAFKA-12679
> URL: https://issues.apache.org/jira/browse/KAFKA-12679
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1
> Environment: Broker and client version 2.6.1
> Multi-node broker cluster
> Multi-node, auto scaling streams app instances
>Reporter: Peter Nahas
>Assignee: Lucas Brutschy
>Priority: Major
> Fix For: 3.8.0
>
> Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running 
> state and the task gets rebalanced to a separate thread on the same instance, 
> the newly created task will attempt to lock the state store director while 
> the first thread is continuing to use it. This is totally normal and expected 
> behavior when the first thread is not yet aware of the rebalance. However, 
> that newly created task is effectively running a while loop with no backoff 
> waiting to lock the directory:
>  # TaskManager tells the task to restore in `tryToCompleteRestoration`
>  # The task attempts to lock the directory
>  # The lock attempt fails and throws a 
> `org.apache.kafka.streams.errors.LockException`
>  # TaskManager catches the exception, stops further processing on the task 
> and reports that not all tasks have restored
>  # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff 
> when this condition occurs, but there does not appear to be any in the code. 
> The result is that if this goes on for long enough, the lock-loop may 
> dominate CPU usage in the process and starve out the old stream thread task 
> processing.
>  
> When in this state, the DEBUG level logging for TaskManager will produce a 
> steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager   
>   : stream-thread [StreamThread-10] Could not initialize 0_34 due 
> to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread 
> [StreamThread-10] standby-task [0_34] Failed to lock the state directory for 
> task 0_34
> {noformat}
>  
>  
> I've attached a git formatted patch to resolve the issue. Simply detect the 
> scenario and sleep for the backoff time in the appropriate StreamThread.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-12399: Deprecate KafkaLog4jAppender [kafka]

2024-05-17 Thread via GitHub


mimaison commented on PR #15985:
URL: https://github.com/apache/kafka/pull/15985#issuecomment-2117717230

   @viktorsomogyi IIRC you volunteered to do the migration work to log4j2 a 
while back. Are you still able/willing to do so for Kafka 4.0?
   
   @jlprat As there's a chance 3.8 will be the last 3.X release, if we want to 
migrate to log4j2 in Kafka 4.0, we need to deprecate our log4j appender in 
3.8.0. The KIP 
([KIP-719](https://cwiki.apache.org/confluence/display/KAFKA/KIP-719%3A+Deprecate+Log4J+Appender))
 was voted years ago, so we just need to merge this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task

2024-05-17 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-12679:
---
Fix Version/s: 3.8.0
   (was: 3.7.0)

> Rebalancing a restoring or running task may cause directory livelocking with 
> newly created task
> ---
>
> Key: KAFKA-12679
> URL: https://issues.apache.org/jira/browse/KAFKA-12679
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1
> Environment: Broker and client version 2.6.1
> Multi-node broker cluster
> Multi-node, auto scaling streams app instances
>Reporter: Peter Nahas
>Assignee: Lucas Brutschy
>Priority: Major
> Fix For: 3.8.0
>
> Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running 
> state and the task gets rebalanced to a separate thread on the same instance, 
> the newly created task will attempt to lock the state store director while 
> the first thread is continuing to use it. This is totally normal and expected 
> behavior when the first thread is not yet aware of the rebalance. However, 
> that newly created task is effectively running a while loop with no backoff 
> waiting to lock the directory:
>  # TaskManager tells the task to restore in `tryToCompleteRestoration`
>  # The task attempts to lock the directory
>  # The lock attempt fails and throws a 
> `org.apache.kafka.streams.errors.LockException`
>  # TaskManager catches the exception, stops further processing on the task 
> and reports that not all tasks have restored
>  # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff 
> when this condition occurs, but there does not appear to be any in the code. 
> The result is that if this goes on for long enough, the lock-loop may 
> dominate CPU usage in the process and starve out the old stream thread task 
> processing.
>  
> When in this state, the DEBUG level logging for TaskManager will produce a 
> steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager   
>   : stream-thread [StreamThread-10] Could not initialize 0_34 due 
> to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread 
> [StreamThread-10] standby-task [0_34] Failed to lock the state directory for 
> task 0_34
> {noformat}
>  
>  
> I've attached a git formatted patch to resolve the issue. Simply detect the 
> scenario and sleep for the backoff time in the appropriate StreamThread.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task

2024-05-17 Thread Lucas Brutschy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847334#comment-17847334
 ] 

Lucas Brutschy commented on KAFKA-12679:


Hey [~coltmcnealy-lh]. It is true that the state updater was not enabled in 3.7 
in the end, so there is no back-off when locking the state directory. The 
change I implemented was to back off in case of a lock error, to not end up in 
a busy loop trying to acquire the lock. If you are completely stuck and the 
application makes zero progress, this does not seem like it would be solved by 
a back-off, and somehow that seems to describe a different problem, especially 
if it also happens with the state updater enabled. 

We may want to create a separate ticket for this. Either way, I'll update the 
fix version here.

> Rebalancing a restoring or running task may cause directory livelocking with 
> newly created task
> ---
>
> Key: KAFKA-12679
> URL: https://issues.apache.org/jira/browse/KAFKA-12679
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1
> Environment: Broker and client version 2.6.1
> Multi-node broker cluster
> Multi-node, auto scaling streams app instances
>Reporter: Peter Nahas
>Assignee: Lucas Brutschy
>Priority: Major
> Fix For: 3.7.0
>
> Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running 
> state and the task gets rebalanced to a separate thread on the same instance, 
> the newly created task will attempt to lock the state store director while 
> the first thread is continuing to use it. This is totally normal and expected 
> behavior when the first thread is not yet aware of the rebalance. However, 
> that newly created task is effectively running a while loop with no backoff 
> waiting to lock the directory:
>  # TaskManager tells the task to restore in `tryToCompleteRestoration`
>  # The task attempts to lock the directory
>  # The lock attempt fails and throws a 
> `org.apache.kafka.streams.errors.LockException`
>  # TaskManager catches the exception, stops further processing on the task 
> and reports that not all tasks have restored
>  # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff 
> when this condition occurs, but there does not appear to be any in the code. 
> The result is that if this goes on for long enough, the lock-loop may 
> dominate CPU usage in the process and starve out the old stream thread task 
> processing.
>  
> When in this state, the DEBUG level logging for TaskManager will produce a 
> steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager   
>   : stream-thread [StreamThread-10] Could not initialize 0_34 due 
> to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread 
> [StreamThread-10] standby-task [0_34] Failed to lock the state directory for 
> task 0_34
> {noformat}
>  
>  
> I've attached a git formatted patch to resolve the issue. Simply detect the 
> scenario and sleep for the backoff time in the appropriate StreamThread.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16762) SyncGroup API for upgrading ConsumerGroup

2024-05-17 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16762.
-
Fix Version/s: 3.8.0
 Reviewer: David Jacot
   Resolution: Fixed

> SyncGroup API for upgrading ConsumerGroup
> -
>
> Key: KAFKA-16762
> URL: https://issues.apache.org/jira/browse/KAFKA-16762
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-12399: Deprecate KafkaLog4jAppender [kafka]

2024-05-17 Thread via GitHub


mimaison opened a new pull request, #15985:
URL: https://github.com/apache/kafka/pull/15985

   As per KIP-719, KafkaLog4jAppender is now deprecated and will be removed in 
Kafka 4.0. Users should migrate to the log4j2 Kafka Appender
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-17 Thread via GitHub


dajac merged PR #15954:
URL: https://github.com/apache/kafka/pull/15954


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16784) Migrate TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest to new test infra

2024-05-17 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16784:
--

Assignee: PoAn Yang  (was: Chia-Ping Tsai)

> Migrate TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest to new 
> test infra
> -
>
> Key: KAFKA-16784
> URL: https://issues.apache.org/jira/browse/KAFKA-16784
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
>  Labels: storage_test
>
> as title



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16789: Fix thread leak detection for event handler threads [kafka]

2024-05-17 Thread via GitHub


gaurav-narula opened a new pull request, #15984:
URL: https://github.com/apache/kafka/pull/15984

   Updates `TestUtils::verifyNoUnexpectedThreads` to check for all event queue 
threads instead of the incorrect QuorumController thread.
   
   Also updated `KafkaEventQueueTest` to use try-with-resources to ensure 
threads get closed and to use LoggerContext with the current test name as the 
logPrefix for easier debugging.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15746: KRaft support in ControllerMutationQuotaTest [kafka]

2024-05-17 Thread via GitHub


mimaison commented on PR #15038:
URL: https://github.com/apache/kafka/pull/15038#issuecomment-2117661783

   @linzihao1999 Can you follow up this PR? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16675: Refactored and new rebalance callbacks integration tests [kafka]

2024-05-17 Thread via GitHub


lianetm commented on PR #15965:
URL: https://github.com/apache/kafka/pull/15965#issuecomment-2117652874

   Thanks for the helpful comment @lucasbru , addressed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16675: Refactored and new rebalance callbacks integration tests [kafka]

2024-05-17 Thread via GitHub


lianetm commented on code in PR #15965:
URL: https://github.com/apache/kafka/pull/15965#discussion_r1605041571


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala:
##
@@ -84,29 +84,87 @@ class PlaintextConsumerCallbackTest extends 
AbstractConsumerTest {
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked(quorum: 
String, groupProtocol: String): Unit = {
-val tp = new TopicPartition(topic, 0);
+val tp = new TopicPartition(topic, 0)
 triggerOnPartitionsRevoked { (consumer, _) =>
   val map = consumer.beginningOffsets(Collections.singletonList(tp))
   assertTrue(map.containsKey(tp))
   assertEquals(0, map.get(tp))
 }
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def 
testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(quorum: 
String, groupProtocol: String): Unit = {
+val tp = new TopicPartition(topic, 0)
+triggerOnPartitionsAssigned { (consumer, _) => assertDoesNotThrow(() => 
consumer.position(tp)) }
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def 
testSeekPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(quorum: 
String, groupProtocol: String): Unit = {
+val consumer = createConsumer()
+val startingOffset = 100L
+val totalRecords = 120L
+
+val producer = createProducer()
+val startingTimestamp = 0
+sendRecords(producer, totalRecords.toInt, tp, startingTimestamp)
+
+consumer.subscribe(asList(topic), new ConsumerRebalanceListener {
+  override def onPartitionsAssigned(partitions: 
util.Collection[TopicPartition]): Unit = {
+consumer.seek(tp, startingOffset)
+  }
+
+  override def onPartitionsRevoked(partitions: 
util.Collection[TopicPartition]): Unit = {
+// noop
+  }
+})
+consumeAndVerifyRecords(consumer, numRecords = (totalRecords - 
startingOffset).toInt,
+  startingOffset = startingOffset.toInt, startingKeyAndValueIndex = 
startingOffset.toInt,
+  startingTimestamp = startingOffset)
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testPauseOnPartitionsAssignedCallback(quorum: String, groupProtocol: 
String): Unit = {
+val consumer = createConsumer()
+val totalRecords = 100L
+val partitionsAssigned = new AtomicBoolean(false)
+
+val producer = createProducer()
+val startingTimestamp = 0
+sendRecords(producer, totalRecords.toInt, tp, startingTimestamp)
+
+consumer.subscribe(asList(topic), new ConsumerRebalanceListener {

Review Comment:
   I couldn't initially, mainly because of the close and poll in the helper, which played against what I needed to test in these two, but then I removed the close myself and forgot to try again :). So I did integrate the helper here now, with a minor twist to pass the consumer as a param. It also allowed me to merge the seek/pause tests into one, given that we do need to pause to properly check the seek behaviour, so I removed the extra test for pause. Good catch, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16066: Upgrade apacheds to 2.0.0.AM27 With apache kerby [kafka]

2024-05-17 Thread via GitHub


mimaison commented on PR #15277:
URL: https://github.com/apache/kafka/pull/15277#issuecomment-2117650240

   @highluck Can you address the pending comments and rebase on trunk to 
resolve the conflicts? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-17 Thread via GitHub


cadonna commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1605020116


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import 
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+
+import java.util.Optional;
+
+class KStreamKStreamLeftJoin extends KStreamKStreamJoin {

Review Comment:
   I would still prefer `KStreamKStreamJoinLeftSide`. It is the left side of a stream-stream join. The processors are also named `KStreamKStreamJoinLeftProcessor`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-17 Thread via GitHub


cadonna commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1604937635


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -187,17 +157,25 @@ public void process(final Record record) {
 
context().forward(record.withValue(joiner.apply(record.key(), record.value(), 
null)));
 } else {
 sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
-outerJoinStore.ifPresent(store -> store.put(
-TimestampedKeyAndJoinSide.make(isLeftSide, 
record.key(), inputRecordTimestamp),
-LeftOrRightValue.make(isLeftSide, 
record.value(;
+putInOuterJoinStore(record);
 }
 }
 }
 }
 
+protected abstract TimestampedKeyAndJoinSide makeThisKey(final K 
key, final long inputRecordTimestamp);
+
+protected abstract LeftOrRightValue makeThisValue(final 
VThis thisValue);
+
+protected abstract TimestampedKeyAndJoinSide makeOtherKey(final K 
key, final long timestamp);
+
+protected abstract VThis getThisValue(final LeftOrRightValue leftOrRightValue);
+
+protected abstract VOther getOtherValue(final LeftOrRightValue leftOrRightValue);
+
 private void emitNonJoinedOuterRecords(
-final KeyValueStore, 
LeftOrRightValue> store,
-final Record record) {
+final KeyValueStore, 
LeftOrRightValue> store,
+final Record record) {

Review Comment:
   nit:
   ```java
   private void emitNonJoinedOuterRecords(final 
KeyValueStore, LeftOrRightValue> 
store, 
  final Record record) {
   
   ```



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -275,20 +246,22 @@ private void emitNonJoinedOuterRecords(
 }
 }
 
-@SuppressWarnings("unchecked")
-private VOut getNullJoinedValue(
-final K key, 
-final LeftOrRightValue leftOrRightValue) {
-// depending on the JoinSide fill in the joiner key and joiner 
values
-if (isLeftSide) {
-return joiner.apply(key,
-leftOrRightValue.getLeftValue(),
-leftOrRightValue.getRightValue());
-} else {
-return joiner.apply(key,
-(V1) leftOrRightValue.getRightValue(),
-(V2) leftOrRightValue.getLeftValue());
-}
+private void forwardNonJoinedOuterRecords(final Record record, 
final KeyValue, ? extends 
LeftOrRightValue> nextKeyValue) {

Review Comment:
   I think it would be simpler to define this method as:
   ```java
   private void forwardNonJoinedOuterRecords(final Record record,
 final TimestampedKeyAndJoinSide 
timestampedKeyAndJoinSide,
 final LeftOrRightValue leftOrRightValue) {
   ```
   It makes the code a bit simpler and shorter.
   Also here, why not `Record record`. That is exactly the type used 
at the call site.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -187,17 +157,25 @@ public void process(final Record record) {
 
context().forward(record.withValue(joiner.apply(record.key(), record.value(), 
null)));
 } else {
 sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
-outerJoinStore.ifPresent(store -> store.put(
-TimestampedKeyAndJoinSide.make(isLeftSide, 
record.key(), inputRecordTimestamp),
-LeftOrRightValue.make(isLeftSide, 
record.value(;
+putInOuterJoinStore(record);
 }
 }
 }
 }
 
+protected abstract TimestampedKeyAndJoinSide makeThisKey(final K 
key, final long inputRecordTimestamp);
+
+protected abstract LeftOrRightValue makeThisValue(final 
VThis thisValue);
+
+protected abstract TimestampedKeyAndJoinSide makeOtherKey(final K 
key, final long timestamp);
+
+protected abstract VThis getThisValue(final LeftOrRightValue leftOrRightValue);
+
+protected abstract VOther getOtherValue(final LeftOrRightValue leftOrRightValue);
+
 private void emitNonJoinedOuterRecords(
-final KeyValueStore, 
LeftOrRightValue> store,
-final Record record) {
+final KeyValueStore, 
LeftOrRightValue> store,
+final Record record) {

Review Comment:
   Shouldn't that be
   ```java
   final Record record
   ```
   ?



##

[jira] [Created] (KAFKA-16789) Thread leak detection checks for incorrect QuorumController thread name

2024-05-17 Thread Gaurav Narula (Jira)
Gaurav Narula created KAFKA-16789:
-

 Summary: Thread leak detection checks for incorrect 
QuorumController thread name
 Key: KAFKA-16789
 URL: https://issues.apache.org/jira/browse/KAFKA-16789
 Project: Kafka
  Issue Type: Test
Reporter: Gaurav Narula


[PR-11417|https://github.com/apache/kafka/pull/11417] introduced thread leak 
detection for the QuorumController event queue thread. Later, 
[PR-13390|https://github.com/apache/kafka/pull/13390] changed conventions 
around thread names used in Kraft. Unfortunately, the thread-leak detection bit 
wasn't updated in the latter PR.

We should update {{TestUtils::verifyNoUnexpectedThreads}} to instead check for 
event handler thread leaks by checking for the {{"event-handler"}} suffix.
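
As an illustration of the proposed check, a sketch that fails when any live thread name carries the "event-handler" suffix could look like the following; the class name is hypothetical and this is not the actual TestUtils code.

{code:java}
import java.util.Set;
import java.util.stream.Collectors;

public class EventHandlerThreadLeakCheck {
    public static void main(String[] args) {
        // Collect the names of all live threads that look like KRaft event handler threads.
        Set<String> leaked = Thread.getAllStackTraces().keySet().stream()
                .map(Thread::getName)
                .filter(name -> name.endsWith("event-handler"))
                .collect(Collectors.toSet());
        if (!leaked.isEmpty()) {
            throw new AssertionError("Found unexpected event handler threads: " + leaked);
        }
        System.out.println("No event handler threads leaked");
    }
}
{code}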



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16789) Thread leak detection checks for incorrect QuorumController thread name

2024-05-17 Thread Gaurav Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaurav Narula reassigned KAFKA-16789:
-

Assignee: Gaurav Narula

> Thread leak detection checks for incorrect QuorumController thread name
> ---
>
> Key: KAFKA-16789
> URL: https://issues.apache.org/jira/browse/KAFKA-16789
> Project: Kafka
>  Issue Type: Test
>Reporter: Gaurav Narula
>Assignee: Gaurav Narula
>Priority: Major
>
> [PR-11417|https://github.com/apache/kafka/pull/11417] introduced thread leak 
> detection for the QuorumController event queue thread. Later, 
> [PR-13390|https://github.com/apache/kafka/pull/13390] changed conventions 
> around thread names used in Kraft. Unfortunately, the thread-leak detection 
> bit wasn't updated in the latter PR.
> We should update {{TestUtils::verifyNoUnexpectedThreads}} to instead check 
> for event handler thread leaks by checking for the {{"event-handler"}} suffix.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16448) Add Kafka Streams exception handler for exceptions occuring during processing (KIP-1033)

2024-05-17 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847294#comment-17847294
 ] 

Muralidhar Basani commented on KAFKA-16448:
---

Thanks [~mjsax] 

[~lmunoz] if there any sub tasks, pls let me know. thanks.

> Add Kafka Streams exception handler for exceptions occuring during processing 
> (KIP-1033)
> 
>
> Key: KAFKA-16448
> URL: https://issues.apache.org/jira/browse/KAFKA-16448
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Damien Gasparina
>Assignee: Loïc Greffier
>Priority: Minor
> Fix For: 3.8
>
>
> Jira to follow work on KIP:  [KIP-1033: Add Kafka Streams exception handler 
> for exceptions occuring during 
> processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16784) Migrate TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest to new test infra

2024-05-17 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847272#comment-17847272
 ] 

PoAn Yang commented on KAFKA-16784:
---

Hi [~chia7712], I'm interested in this issue. May I take it? Thank you.

> Migrate TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest to new 
> test infra
> -
>
> Key: KAFKA-16784
> URL: https://issues.apache.org/jira/browse/KAFKA-16784
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>  Labels: storage_test
>
> as title



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15319: Upgrade rocksdb to fix CVE-2022-37434 [kafka]

2024-05-17 Thread via GitHub


lucasbru commented on PR #14216:
URL: https://github.com/apache/kafka/pull/14216#issuecomment-2117519600

   @mimaison -- Let me test this. Yes, I see that the version of requests inside ducktape was also reverted: 
https://github.com/confluentinc/ducktape/commit/29dcdbeb1a8048505ce1613eb9aad05bc6b044f0
 I think there were version conflicts around urllib that broke the system tests. I'm not sure why Kafka depends on ducktape < 0.9.0; I can try to bump it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16783: Migrate RemoteLogMetadataManagerTest to new test infra [kafka]

2024-05-17 Thread via GitHub


FrankYang0529 opened a new pull request, #15983:
URL: https://github.com/apache/kafka/pull/15983

   * Replace `TopicBasedRemoteLogMetadataManagerWrapperWithHarness` with 
`RemoteLogMetadataManagerTestUtils#builder` in `RemoteLogMetadataManagerTest`.
   * Use `ClusterTestExtention` for `RemoteLogMetadataManagerTest`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15319: Upgrade rocksdb to fix CVE-2022-37434 [kafka]

2024-05-17 Thread via GitHub


mimaison commented on PR #14216:
URL: https://github.com/apache/kafka/pull/14216#issuecomment-2117500715

   Thanks @lucasbru !
   I wonder if we also need to bump ducktape to 0.11.4 as it's seems previous 
versions depend on requests<2.31.0: 
https://ducktape.readthedocs.io/en/latest/changelog.html#section-1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15319: Upgrade rocksdb to fix CVE-2022-37434 [kafka]

2024-05-17 Thread via GitHub


lucasbru commented on PR #14216:
URL: https://github.com/apache/kafka/pull/14216#issuecomment-2117489283

   @mimaison I remember I had problems running the system tests with that version, but I don't think I intended to merge the version change. Sorry for that; I'll test the newer version and bring the version bump back to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: use try-with resources in ClientMetricsManagerTest [kafka]

2024-05-17 Thread via GitHub


gaurav-narula opened a new pull request, #15982:
URL: https://github.com/apache/kafka/pull/15982

   Ensures we close resources even if an exception is thrown within a test. 
Also added an `@AfterAll` to ensure threads aren't leaked from the test class.
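
   As a quick illustration of the pattern (the resource class here is a made-up stand-in, not the actual ClientMetricsManagerTest code), try-with-resources closes the resource even when a statement inside the block throws:

   ```java
   public class TryWithResourcesSketch {

       // Stand-in for a resource such as a manager or client created inside a test.
       static class FakeResource implements AutoCloseable {
           @Override
           public void close() {
               System.out.println("resource closed");
           }
       }

       public static void main(String[] args) {
           try (FakeResource resource = new FakeResource()) {
               // close() still runs before the exception propagates.
               throw new IllegalStateException("simulated assertion failure");
           } catch (IllegalStateException e) {
               System.out.println("caught: " + e.getMessage());
           }
       }
   }
   ```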
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-17 Thread via GitHub


vamossagar12 commented on PR #15305:
URL: https://github.com/apache/kafka/pull/15305#issuecomment-2117312634

   Thanks @gharris1727 for another round of review. I have made the changes. 
Please review when you have some time. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-17 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1604768491


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##
@@ -267,6 +268,20 @@ public String memberId() {
 return JoinGroupRequest.UNKNOWN_MEMBER_ID;
 }
 
+@Override
+protected void handlePollTimeoutExpiry() {
+Stage currentStage = listener.onPollTimeoutExpiry();
+log.warn("worker poll timeout has expired. This means the time between 
subsequent calls to poll() " +
+"in DistributedHerder tick() method was longer than the configured 
rebalance.timeout.ms. " +

Review Comment:
   Got it, yeah I changed the message accordingly. I tried to keep the log line similar to the one for the consumer's poll timeout, but I think it's fine to deviate in this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-17 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1604767686


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -850,6 +855,46 @@ public void testRequestTimeouts() throws Exception {
 );
 }
 
+@Test
+public void testPollTimeoutExpiry() throws Exception {
+// This is a fabricated test to ensure that a poll timeout expiry 
happens. The tick thread awaits on
+// task#stop method which is blocked. The timeouts have been set 
accordingly
+workerProps.put(REBALANCE_TIMEOUT_MS_CONFIG, 
Long.toString(TimeUnit.SECONDS.toMillis(20)));
+workerProps.put(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, 
Long.toString(TimeUnit.SECONDS.toMillis(40)));
+connect = connectBuilder
+.numBrokers(1)
+.numWorkers(1)
+.build();
+
+connect.start();
+
+connect.assertions().assertExactlyNumWorkersAreUp(1, "Worker not 
brought up in time");
+
+Map connectorWithBlockingTaskStopConfig = new 
HashMap<>();
+connectorWithBlockingTaskStopConfig.put(CONNECTOR_CLASS_CONFIG, 
BlockingConnectorTest.BlockingSourceConnector.class.getName());
+connectorWithBlockingTaskStopConfig.put(TASKS_MAX_CONFIG, "1");
+
connectorWithBlockingTaskStopConfig.put(BlockingConnectorTest.Block.BLOCK_CONFIG,
 Objects.requireNonNull(TASK_STOP));
+
+connect.configureConnector(CONNECTOR_NAME, 
connectorWithBlockingTaskStopConfig);
+
+connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+CONNECTOR_NAME, 1, "connector and tasks did not start in time"
+);
+
+try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+connect.restartTask(CONNECTOR_NAME, 0);
+TestUtils.waitForCondition(() -> 
logCaptureAppender.getEvents().stream().anyMatch(e -> 
e.getLevel().equals("WARN")) &&

Review Comment:
   Yeah, I wanted to add the test in `BlockingConnectorTest` itself, but it would have meant a lot of changes in that class, because that test currently doesn't support setting worker-level properties or changing the number of workers. Being able to change the worker-level properties was how I could trigger the poll timeout expiry. 
   Moreover, the test I have added doesn't really block for the entire stop method; it ends shortly after the task shutdown graceful timeout period because of the reset at the end of the test. Let me know if that makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-17 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1604759527


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java:
##
@@ -36,4 +37,15 @@ public interface WorkerRebalanceListener {
  * or tasks might refer to all or some of the connectors and tasks running 
on the worker.
  */
 void onRevoked(String leader, Collection connectors, 
Collection tasks);
+
+
+/**
+ * Invoked when a worker experiences a poll timeout expiry. Invoking this 
method allows getting
+ * the stage which was currently being executed when the poll timeout 
happened. The default implementation
+ * returns null
+ * @return The current stage being executed. Could be null
+ */
+default Stage onPollTimeoutExpiry() {

Review Comment:
   I added the default bit to avoid changing the tests. Turns out it should be 
ok to modify them so I removed the default bit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-17 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1604757990


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -2704,6 +2704,11 @@ public void onRevoked(String leader, Collection 
connectors, Collection

[jira] [Created] (KAFKA-16788) Resource leakage due to absence of close() call on connector start failure

2024-05-17 Thread Ashok (Jira)
Ashok created KAFKA-16788:
-

 Summary: Resource leakage due to absence of close() call on 
connector start failure
 Key: KAFKA-16788
 URL: https://issues.apache.org/jira/browse/KAFKA-16788
 Project: Kafka
  Issue Type: Bug
  Components: connect
Reporter: Ashok
Assignee: Ashok


We have identified a potential issue in the WorkerConnector class of the 
Connect framework. Specifically, the close() method is not being called on the 
connector when the connector fails to start due to various reasons. This 
omission prevents the connector from releasing any resources that were created 
or started as part of the start() method. As a result, these resources remain 
allocated even after the connector has failed to start, leading to resource 
leakage.  

To address this issue, we propose modifying the WorkerConnector class to ensure 
that the close() method is called on the connector whenever the connector fails 
to start. This change will allow the connector to properly release its 
resources, preventing resource leakage.

*Steps to Reproduce:*  
 # Initiate a connector that creates or allocates resources (for instance, 
threads) during the execution of the start() method.
 # Generate a problem that, during the start() process, either triggers an 
exception or invokes the raiseError(Exception e) method of the 
WorkerConnectorContext.
 # Notice that the close() method is not invoked on the connector, resulting in 
resource leakage, as the stop() method is where the resources are typically 
closed.

In our scenario, the issue was related to threads not being properly closed. 
These threads were initiated as part of the start() method in the connector.

*Expected Result:*  

When a connector fails to start, the close() method should be called to allow 
the connector to release its resources.  

*Actual Result:*  

The close() method is not called when a connector fails to start, leading to 
resource leakage. 
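
A minimal sketch of the proposed behaviour, with illustrative names rather than the actual WorkerConnector code: if start() throws, close() is invoked so that resources allocated during start() are released, and any failure from close() is attached as a suppressed exception.

{code:java}
public class ConnectorStartSketch {

    // Minimal stand-in for a connector that allocates resources in start().
    interface Connector extends AutoCloseable {
        void start() throws Exception;
    }

    // Proposed pattern: if start() fails, close() runs so resources are released.
    static void startOrClose(Connector connector) throws Exception {
        try {
            connector.start();
        } catch (Exception startError) {
            try {
                connector.close();
            } catch (Exception closeError) {
                startError.addSuppressed(closeError);
            }
            throw startError;
        }
    }

    public static void main(String[] args) {
        Connector failing = new Connector() {
            @Override
            public void start() throws Exception {
                throw new RuntimeException("simulated start failure");
            }

            @Override
            public void close() {
                System.out.println("connector resources released");
            }
        };
        try {
            startOrClose(failing);
        } catch (Exception e) {
            System.out.println("start failed: " + e.getMessage());
        }
    }
}
{code}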



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

