Re: [PR] KAFKA-15594: Add version 3.6 to Kafka Streams system tests [kafka]

2024-01-18 Thread via GitHub


mjsax commented on PR #15151:
URL: https://github.com/apache/kafka/pull/15151#issuecomment-1897988473

   Only `streams.streams_standby_replica_test` failed -- we know that it's
broken and already have a PR to fix it:
https://github.com/apache/kafka/pull/15217
   
   Not sure why Jenkins failed though -- retriggered.





Re: [PR] KAFKA-12359: Update Jetty to 11 [kafka]

2024-01-18 Thread via GitHub


tommyk-gears commented on PR #10176:
URL: https://github.com/apache/kafka/pull/10176#issuecomment-1897990630

   Any update on this? Jetty 9.4 will not reach EOL until the last Webtide
customer has migrated away from it (which may take more or less forever). It
did, however, reach "End of community support" back in June 2022. Even Jetty 11,
which this PR is migrating _to_, has already reached "End of community support".
   My gut feeling is that Jakarta EE 9+ has gained good traction over the last
few years (at least in the ecosystem at our company, kafka-connect is the only
thing still holding on to Java EE 8, afaik).
   
   References:
   Jetty 9.4 EOCS: https://github.com/jetty/jetty.project/issues/7958
   Jetty 11 EOCS: https://github.com/jetty/jetty.project/issues/10485





Re: [PR] KAFKA-16131: Only update directoryIds if the metadata version supports DirectoryAssignment [kafka]

2024-01-18 Thread via GitHub


mjsax commented on PR #15197:
URL: https://github.com/apache/kafka/pull/15197#issuecomment-1897998713

   @pprovenzano @cmccabe -- it seems this PR broke the `trunk` build.
   
   ```
   > Task :core:compileTestScala
   [Error] /Users/matthias/IdeaProjects/kafka/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:6178:23: value latest is not a member of object org.apache.kafka.server.common.MetadataVersion
   one error found
   ```
   
   Can you take a look and help fix it?





Re: [PR] KAFKA-16113: Add committed and commit sensor to record metrics [kafka]

2024-01-18 Thread via GitHub


philipnee commented on code in PR #15210:
URL: https://github.com/apache/kafka/pull/15210#discussion_r1457081393


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1547,6 +1553,29 @@ public String toString() {
 }
 }
 
+private class ConsumerCoordinatorMetrics {

Review Comment:
   This is to restore the original private class as it doesn't make a lot of 
sense to reuse this in the new consumer.






Re: [PR] KAFKA-15770: IQv2 must return immutable position [kafka]

2024-01-18 Thread via GitHub


lucasbru commented on code in PR #15219:
URL: https://github.com/apache/kafka/pull/15219#discussion_r1457110433


##
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##
@@ -146,7 +146,7 @@ public static <R> QueryResult<R> handleBasicQueries(
 "Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns"
 );
 }
-result.setPosition(position);
+result.setPosition(position.copy());

Review Comment:
   Nice find! But I'm not sure whether this is thread-safe now. It seems I'd
have to put a lock somewhere to make sure `position` isn't modified before I
reach this point.
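   
   (For illustration, a minimal sketch of the concern with a hypothetical map-backed position, not the actual `Position` class: a deep copy is only safe against concurrent writers if the underlying maps are concurrent, or if the copy and the writes share a lock.)
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   // Hypothetical stand-in for a topic -> partition -> offset position.
   class PositionSketch {
       private final Map<String, Map<Integer, Long>> offsets = new ConcurrentHashMap<>();
   
       void update(String topic, int partition, long offset) {
           offsets.computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
                  .merge(partition, offset, Math::max);
       }
   
       // Deep copy. Both map levels are ConcurrentHashMap, so iterating while
       // update() runs cannot throw ConcurrentModificationException, but without
       // external locking the copy may interleave with in-flight writes.
       PositionSketch copy() {
           PositionSketch copied = new PositionSketch();
           offsets.forEach((topic, parts) ->
               parts.forEach((partition, offset) -> copied.update(topic, partition, offset)));
           return copied;
       }
   }
   ```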






[PR] KAFKA-14505; [8/8] Update offset delete paths [kafka]

2024-01-18 Thread via GitHub


dajac opened a new pull request, #15221:
URL: https://github.com/apache/kafka/pull/15221

   This is the last patch to complete the implementation of transactional
offsets. This patch updates the following paths:
   * delete offsets - the patch ensures that a tombstone is written for pending
transactional offsets too.
   * delete all offsets - the patch ensures that all pending transactional
offsets are deleted too.
   * expire offsets - the patch ensures that an offset for a partition is not
expired if there is a pending transaction.
   * replay offset record - the patch ensures that all pending transactional
offsets are removed when a tombstone is received (see the sketch below).
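   
   A minimal sketch of that last rule, with hypothetical field names (the real group coordinator code is more involved): replaying a tombstone for a (group, partition) key must drop both the committed offset and every pending transactional offset for that key.
   
   ```java
   // Hypothetical structures, for illustration only (assumes java.util.* and
   // org.apache.kafka.common.TopicPartition are imported):
   // committedOffsets:            groupId -> partition -> offset
   // pendingTransactionalOffsets: producerId -> groupId -> partition -> offset
   private final Map<String, Map<TopicPartition, Long>> committedOffsets = new HashMap<>();
   private final Map<Long, Map<String, Map<TopicPartition, Long>>> pendingTransactionalOffsets = new HashMap<>();
   
   private void replayOffsetTombstone(String groupId, TopicPartition tp) {
       // Drop the committed offset, if any.
       Map<TopicPartition, Long> committed = committedOffsets.get(groupId);
       if (committed != null) committed.remove(tp);
   
       // Drop every pending transactional offset for the same key,
       // regardless of which producer wrote it.
       for (Map<String, Map<TopicPartition, Long>> byGroup : pendingTransactionalOffsets.values()) {
           Map<TopicPartition, Long> pending = byGroup.get(groupId);
           if (pending != null) pending.remove(tp);
       }
   }
   ```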
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-4759 Add support for IPv4 and IPv6 ranges in AclAuthorizer [kafka]

2024-01-18 Thread via GitHub


flyingcougar commented on PR #9937:
URL: https://github.com/apache/kafka/pull/9937#issuecomment-1898087626

   Seems that the failures are not connected to the PR - similar ones are on
trunk as well.





Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]

2024-01-18 Thread via GitHub


nizhikov commented on PR #14856:
URL: https://github.com/apache/kafka/pull/14856#issuecomment-1898096530

   @mimaison CI seems OK. I addressed all your comments. Please take a look.





Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-18 Thread via GitHub


dajac commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1457158606


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -452,19 +453,36 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundException {
 /**
  * Get the Group List.
  *
- * @param statesFilter The states of the groups we want to list.
- * If empty all groups are returned with their state.
- * @param committedOffset A specified committed offset corresponding to this shard
+ * @param statesFilter  The states of the groups we want to list.
+ *  If empty, all groups are returned with their state.
+ * @param typesFilter   The types of the groups we want to list.
+ *  If empty, all groups are returned with their type.
+ * @param committedOffset   A specified committed offset corresponding to this shard.
  *
  * @return A list containing the ListGroupsResponseData.ListedGroup
  */
+public List<ListGroupsResponseData.ListedGroup> listGroups(
+    Set<String> statesFilter,
+    Set<String> typesFilter,
+    long committedOffset
+) {
+    Predicate<Group> combinedFilter = group -> {
+        boolean stateCheck = statesFilter.isEmpty() || statesFilter.contains(group.stateAsString(committedOffset));
+
+        // The type check is case-insensitive.
+        boolean typeCheck = typesFilter.isEmpty() ||
+            typesFilter.stream()
+                .map(String::toLowerCase)

Review Comment:
   It would be better to do this outside of the predicate to avoid doing it for 
every group.
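   
   For illustration, a sketch of the suggested change (hedged: `group.type()` and the surrounding names are assumed from context): lowercase the filter once, before building the predicate, instead of once per group.
   
   ```java
   // Normalize the type filter a single time, outside the predicate.
   Set<String> lowerCaseTypesFilter = typesFilter.stream()
       .map(type -> type.toLowerCase(Locale.ROOT))
       .collect(Collectors.toSet());
   
   Predicate<Group> combinedFilter = group -> {
       boolean stateCheck = statesFilter.isEmpty()
           || statesFilter.contains(group.stateAsString(committedOffset));
       // Still case-insensitive, but the filter set is no longer
       // re-lowercased for every group it inspects.
       boolean typeCheck = lowerCaseTypesFilter.isEmpty()
           || lowerCaseTypesFilter.contains(group.type().toString().toLowerCase(Locale.ROOT));
       return stateCheck && typeCheck;
   };
   ```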



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -9616,47 +9618,76 @@ public void testListGroups() {
 .build()));
 context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 11));
 
+// Test list group response without a group state or group type filter.
 Map<String, ListGroupsResponseData.ListedGroup> actualAllGroupMap =
-context.sendListGroups(Collections.emptyList())
-.stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
+context.sendListGroups(Collections.emptyList(), Collections.emptyList()).stream()
+.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
+
 Map<String, ListGroupsResponseData.ListedGroup> expectAllGroupMap =
 Stream.of(
 new ListGroupsResponseData.ListedGroup()
 .setGroupId(classicGroup.groupId())
-.setProtocolType(classicGroupType)
-.setGroupState(EMPTY.toString()),
+.setProtocolType("classic")
+.setGroupState(EMPTY.toString())
+.setGroupType(Group.GroupType.CLASSIC.toString()),
 new ListGroupsResponseData.ListedGroup()
 .setGroupId(consumerGroupId)
 .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
 .setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString())
+.setGroupType(Group.GroupType.CONSUMER.toString())
 ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
 
 assertEquals(expectAllGroupMap, actualAllGroupMap);
 
 context.commit();
-actualAllGroupMap = context.sendListGroups(Collections.emptyList()).stream()
+
+// Test list group response to check assigning state in the consumer group.
+actualAllGroupMap = context.sendListGroups(Collections.singletonList("assigning"), Collections.emptyList()).stream()
 .collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
 expectAllGroupMap =
 Stream.of(
-new ListGroupsResponseData.ListedGroup()
-.setGroupId(classicGroup.groupId())
-.setProtocolType(classicGroupType)
-.setGroupState(EMPTY.toString()),
 new ListGroupsResponseData.ListedGroup()
 .setGroupId(consumerGroupId)
 .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
 .setGroupState(ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())
+.setGroupType(Group.GroupType.CONSUMER.toString())
 ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
 
 assertEquals(expectAllGroupMap, actualAllGroupMap);
 
-actualAllGroupMap = context.sendListGroups(Collections.singletonList("Empty")).stream()
+// Test list group response with group state filter and no group type filter.
+actualAllGroupMap = context.sendListGroups(Collections.singletonList("Empty"), Collect

[PR] MINOR: Fix compilation issue in ReplicaManagerTest [kafka]

2024-01-18 Thread via GitHub


mimaison opened a new pull request, #15222:
URL: https://github.com/apache/kafka/pull/15222

   Fixes the compilation issue on trunk:
   ```
   > Task :core:compileTestScala
   [Error] /Users/mickael/github/kafka/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:6178:23: value latest is not a member of object org.apache.kafka.server.common.MetadataVersion
   one error found
   
   > Task :core:compileTestScala FAILED
   ```
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] MINOR: Fix compilation issue in ReplicaManagerTest [kafka]

2024-01-18 Thread via GitHub


mimaison commented on PR #15222:
URL: https://github.com/apache/kafka/pull/15222#issuecomment-1898143029

   @showuon @dajac @divijvaidya Can you take a look?





[PR] KAFKA-15712: Added KRaft support to MultipleListenersWithSameSecurityProtocolBaseTest [kafka]

2024-01-18 Thread via GitHub


smjn opened a new pull request, #15223:
URL: https://github.com/apache/kafka/pull/15223

   ### What
   * Added support for KRaft mode in
MultipleListenersWithSameSecurityProtocolBaseTest.
   * Fixed a minor issue in `ReplicaManagerTest` where a non-existent method
was being called: `MetadataVersion.latest` -> `MetadataVersion.latestTesting`
(see the sketch after this list).
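   
   For context, a hedged illustration of that rename (`MetadataVersion` is the Java enum the Scala test calls into; on trunk, `latest()` was split into `latestTesting()` and `latestProduction()` so callers choose explicitly):
   
   ```java
   import org.apache.kafka.server.common.MetadataVersion;
   
   public class MetadataVersionExample {
       public static void main(String[] args) {
           // Was: MetadataVersion.latest() -- this no longer compiles on trunk.
           MetadataVersion mv = MetadataVersion.latestTesting();
           System.out.println(mv);
       }
   }
   ```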
   
   ### Why
   * MultipleListenersWithSameSecurityProtocolBaseTest currently only supports
zk mode.
   * Supporting KRaft mode required new advertised listener configs and an
additional mechanism to create topics, both of which this PR addresses.
   
   ### Testing
   * Ran affected unit tests
   ```
   ➜  ak git:(smjn-KAFKA-15712) ./gradlew :core:test --tests kafka.server.MultipleListenersWithDefaultJaasContextTest
   
   > Configure project :
   Starting build with version 3.8.0-SNAPSHOT (commit id 3c4da20e) using Gradle 8.5, Java 17 and Scala 2.13.12
   Build properties: maxParallelForks=10, maxScalacThreads=8, maxTestRetries=0
   
   > Task :core:test
   
   Gradle Test Run :core:test > Gradle Test Executor 9 > MultipleListenersWithDefaultJaasContextTest > testProduceConsume(String) > testProduceConsume(String).quorum=zk PASSED
   
   Gradle Test Run :core:test > Gradle Test Executor 9 > MultipleListenersWithDefaultJaasContextTest > testProduceConsume(String) > testProduceConsume(String).quorum=kraft PASSED
   ...
   
   BUILD SUCCESSFUL in 38s
   ```
   
   ```
   ➜  ak git:(smjn-KAFKA-15712) ./gradlew :core:test --tests kafka.server.MultipleListenersWithAdditionalJaasContextTest
   
   > Configure project :
   Starting build with version 3.8.0-SNAPSHOT (commit id 3c4da20e) using Gradle 8.5, Java 17 and Scala 2.13.12
   Build properties: maxParallelForks=10, maxScalacThreads=8, maxTestRetries=0
   
   > Task :core:test
   
   Gradle Test Run :core:test > Gradle Test Executor 10 > MultipleListenersWithAdditionalJaasContextTest > testProduceConsume(String) > testProduceConsume(String).quorum=zk PASSED
   
   Gradle Test Run :core:test > Gradle Test Executor 10 > MultipleListenersWithAdditionalJaasContextTest > testProduceConsume(String) > testProduceConsume(String).quorum=kraft PASSED
   ...
   
   BUILD SUCCESSFUL in 36s
   59 actionable tasks: 1 executed, 58 up-to-date
   ```
   * clean build
   ```
   ➜  ak git:(smjn-KAFKA-15712) ./gradlew clean jar    [15:17:10]
   
   > Configure project :
   Starting build with version 3.8.0-SNAPSHOT (commit id 3c4da20e) using Gradle 8.5, Java 17 and Scala 2.13.12
   Build properties: maxParallelForks=10, maxScalacThreads=8, maxTestRetries=0
   
   > Task :core:compileScala
   [Warn] /Users/smahajan/.gradle/workers/warning:[options] bootstrap class path not set in conjunction with -source 8
   /Users/smahajan/ws/ak/core/src/main/java/kafka/log/remote/RemoteLogManager.java:235:  [removal] AccessController in java.security has been deprecated and marked for removal
   [Warn] /Users/smahajan/ws/ak/core/src/main/java/kafka/log/remote/RemoteLogManager.java:257:  [removal] AccessController in java.security has been deprecated and marked for removal
   
   > Task :core:compileTestScala
   Unexpected javac output: warning: [options] bootstrap class path not set in conjunction with -source 8
   Note: /Users/smahajan/ws/ak/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java uses or overrides a deprecated API.
   Note: Recompile with -Xlint:deprecation for details.
   Note: Some input files use unchecked or unsafe operations.
   Note: Recompile with -Xlint:unchecked for details.
   1 warning.
   
   ...
   
   BUILD SUCCESSFUL in 1m 32s
   199 actionable tasks: 190 executed, 9 up-to-date
   ```





[jira] [Commented] (KAFKA-16157) Topic recreation with offline disk doesn't update leadership/shrink ISR correctly

2024-01-18 Thread Gaurav Narula (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17808114#comment-17808114
 ] 

Gaurav Narula commented on KAFKA-16157:
---

[~pprovenzano] I was also able to reproduce this in a cluster with 1 broker.

> Topic recreation with offline disk doesn't update leadership/shrink ISR 
> correctly
> -
>
> Key: KAFKA-16157
> URL: https://issues.apache.org/jira/browse/KAFKA-16157
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod, kraft
>Affects Versions: 3.7.0
>Reporter: Gaurav Narula
>Priority: Blocker
> Attachments: broker.log, broker.log.1, broker.log.10, broker.log.2, 
> broker.log.3, broker.log.4, broker.log.5, broker.log.6, broker.log.7, 
> broker.log.8, broker.log.9
>
>
> In a cluster with 4 brokers, `broker-1..broker-4` with 2 disks `d1` and `d2` 
> in each broker, we perform the following operations:
>  
>  # Create a topic `foo.test` with 10 partitions and RF 4. Let's assume the 
> topic was created with id `rAujIqcjRbu_-E4UxgQT8Q`.
>  # Start a producer in the background to produce to `foo.test`.
>  # Break disk `d1` in `broker-1`. We simulate this by marking the log dir 
> read-only.
>  # Delete topic `foo.test`
>  # Recreate topic `foo.test`. Let's assume the topic was created with id 
> `bgdrsv-1QjCLFEqLOzVCHg`.
>  # Wait for 5 minutes
>  # Describe the recreated topic `foo.test`.
>  
> We observe that `broker-1` is the leader and in-sync for a few partitions
>  
>  
> {code:java}
>  
> Topic: foo.test TopicId: bgdrsv-1QjCLFEqLOzVCHg PartitionCount: 10      
> ReplicationFactor: 4    Configs: 
> min.insync.replicas=1,unclean.leader.election.enable=false
>         Topic: foo.test Partition: 0    Leader: 101     Replicas: 
> 101,102,103,104       Isr: 101,102,103,104
>         Topic: foo.test Partition: 1    Leader: 102     Replicas: 
> 102,103,104,101       Isr: 102,103,104
>         Topic: foo.test Partition: 2    Leader: 103     Replicas: 
> 103,104,101,102       Isr: 103,104,102
>         Topic: foo.test Partition: 3    Leader: 104     Replicas: 
> 104,101,102,103       Isr: 104,102,103
>         Topic: foo.test Partition: 4    Leader: 104     Replicas: 
> 104,102,101,103       Isr: 104,102,103
>         Topic: foo.test Partition: 5    Leader: 102     Replicas: 
> 102,101,103,104       Isr: 102,103,104
>         Topic: foo.test Partition: 6    Leader: 101     Replicas: 
> 101,103,104,102       Isr: 101,103,104,102
>         Topic: foo.test Partition: 7    Leader: 103     Replicas: 
> 103,104,102,101       Isr: 103,104,102
>         Topic: foo.test Partition: 8    Leader: 101     Replicas: 
> 101,102,104,103       Isr: 101,102,104,103
>         Topic: foo.test Partition: 9    Leader: 102     Replicas: 
> 102,104,103,101       Isr: 102,104,103
> {code}
>  
>  
> In this example, it is the leader of partitions `0, 6 and 8`.
>  
> Consider `foo.test-8`. It is present in the following brokers/disks:
>  
>  
> {code:java}
> $ fd foo.test-8
> broker-1/d1/foo.test-8/
> broker-2/d2/foo.test-8/
> broker-3/d2/foo.test-8/
> broker-4/d1/foo.test-8/{code}
>  
>  
> `broker-1/d1` still refers to the topic id which is pending deletion because 
> the log dir is marked offline.
>  
>  
> {code:java}
> $ cat broker-1/d1/foo.test-8/partition.metadata
> version: 0
> topic_id: rAujIqcjRbu_-E4UxgQT8Q{code}
>  
>  
> However, other brokers have the correct topic-id
>  
>  
> {code:java}
> $ cat broker-2/d2/foo.test-8/partition.metadata
> version: 0
> topic_id: bgdrsv-1QjCLFEqLOzVCHg%{code}
>  
>  
> Now, let's consider `foo.test-0`. We observe that the replica isn't present 
> in `broker-1`:
> {code:java}
> $ fd foo.test-0
> broker-2/d1/foo.test-0/
> broker-3/d1/foo.test-0/
> broker-4/d2/foo.test-0/{code}
> In both cases, `broker-1` shouldn't be the leader or in-sync replica for the 
> partitions.
>  





Re: [PR] MINOR: Fix compilation issue in ReplicaManagerTest [kafka]

2024-01-18 Thread via GitHub


divijvaidya merged PR #15222:
URL: https://github.com/apache/kafka/pull/15222





Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-01-18 Thread via GitHub


dajac commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1457193686


##
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##
@@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
 private Set<ConsumerGroupState> states = Collections.emptySet();
 
+private Set<GroupType> groupTypes = Collections.emptySet();

Review Comment:
   nit: `types`?



##
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java:
##
@@ -21,95 +21,111 @@
 import java.util.Optional;

Review Comment:
   This class is part of our public API therefore we cannot change the existing 
(e.g. removing constructors). Could you please revert all the unnecessary 
changes here and only add the new field and its related changes?



##
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##
@@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
 private Set<ConsumerGroupState> states = Collections.emptySet();
 
+private Set<GroupType> groupTypes = Collections.emptySet();
+
 /**
- * If states is set, only groups in these states will be returned by listConsumerGroups()
+ * If states is set, only groups in these states will be returned by listConsumerGroups().
  * Otherwise, all groups are returned.
  * This operation is supported by brokers with version 2.6.0 or later.
  */
 public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
-this.states = (states == null) ? Collections.emptySet() : new HashSet<>(states);
+this.states = (states == null || states.isEmpty()) ? Collections.emptySet() : states;
+return this;
+}
+
+/**
+ * If groupTypes is set, only groups of these groupTypes will be returned by listConsumerGroups().
+ * Otherwise, all groups are returned.
+ *
+ */
+public ListConsumerGroupsOptions inTypes(Set<GroupType> groupTypes) {
+this.groupTypes = (groupTypes == null || groupTypes.isEmpty()) ? Collections.emptySet() : groupTypes;

Review Comment:
   Should we make a copy of the types?
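   
   For illustration, one way to do that, mirroring what `inStates` originally did with `new HashSet<>(states)` (a sketch against the PR's assumed types, not the final API):
   
   ```java
   public ListConsumerGroupsOptions inTypes(Set<GroupType> groupTypes) {
       // Defensive copy, so later mutation of the caller's set cannot
       // change this options object.
       this.groupTypes = (groupTypes == null || groupTypes.isEmpty())
           ? Collections.emptySet()
           : Collections.unmodifiableSet(new HashSet<>(groupTypes));
       return this;
   }
   ```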



##
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##
@@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
 private Set<ConsumerGroupState> states = Collections.emptySet();
 
+private Set<GroupType> groupTypes = Collections.emptySet();
+
 /**
- * If states is set, only groups in these states will be returned by listConsumerGroups()
+ * If states is set, only groups in these states will be returned by listConsumerGroups().
  * Otherwise, all groups are returned.
  * This operation is supported by brokers with version 2.6.0 or later.
  */
 public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
-this.states = (states == null) ? Collections.emptySet() : new HashSet<>(states);
+this.states = (states == null || states.isEmpty()) ? Collections.emptySet() : states;
+return this;
+}
+
+/**
+ * If groupTypes is set, only groups of these groupTypes will be returned by listConsumerGroups().
+ * Otherwise, all groups are returned.
+ *

Review Comment:
   nit: This empty line could be removed.



##
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##
@@ -64,28 +64,89 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 val service = getConsumerGroupService(cgcArgs)
 
 val expectedListing = Set(
-  new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)),
-  new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE)))
+  new ConsumerGroupListing(simpleGroup, true)

Review Comment:
   Do we need all the changes in this test? It may be better to keep it as it 
was.



##
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##
@@ -46,16 +46,16 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListWithUnrecognizedNewConsumerOption(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   Does it bring any value to test all the combinations in this case?



##
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##
@@ -64,28 +64,89 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 val service = getConsumerGroupService(cgcArgs)
 
 val expectedListing = Set(
-  new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)),
-  new ConsumerGroupListing(group, false, Optional.of(Cons

Re: [PR] MINOR: log new coordinator partition load schedule time [kafka]

2024-01-18 Thread via GitHub


dajac merged PR #15017:
URL: https://github.com/apache/kafka/pull/15017





Re: [PR] MINOR: Fix compilation issue in ReplicaManagerTest [kafka]

2024-01-18 Thread via GitHub


mimaison commented on PR #15222:
URL: https://github.com/apache/kafka/pull/15222#issuecomment-1898214647

   Thanks!





[jira] [Updated] (KAFKA-16157) Topic recreation with offline disk doesn't update leadership/shrink ISR correctly

2024-01-18 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-16157:
---
Fix Version/s: 3.7.0



[jira] [Commented] (KAFKA-16157) Topic recreation with offline disk doesn't update leadership/shrink ISR correctly

2024-01-18 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17808140#comment-17808140
 ] 

Mickael Maison commented on KAFKA-16157:


[~enether] I marked this as a blocker for 3.7. Feel free to reassign if you 
don't agree.



[jira] [Assigned] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop

2024-01-18 Thread Phuc Hong Tran (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phuc Hong Tran reassigned KAFKA-16160:
--

Assignee: Phuc Hong Tran

> AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
> --
>
> Key: KAFKA-16160
> URL: https://issues.apache.org/jira/browse/KAFKA-16160
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Critical
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Observed excessive logging when running AsyncKafkaConsumer:
> {code:java}
> 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, 
> groupId=concurrent_consumer] Node is not ready, handle the request in the 
> next event loop: node=worker4:9092 (id: 2147483644 rack: null), 
> request=UnsentRequest{requestBuil     
> der=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', 
> memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, 
> rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], 
> serverAssignor=null, topicP     
> artitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), 
> handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b,
>  node=Optional[worker4:9092 (id: 2147483644 rack: null)]     , 
> timer=org.apache.kafka.common.utils.Timer@55ed4733} 
> (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code}
> This seems to be triggered by a tight poll loop in the network thread. The
> right thing to do is to back off a bit for that given node and retry later.
> This should be a blocker for the 3.8 release.





Re: [PR] KAFKA-16113: Add committed and commit sensor to record metrics [kafka]

2024-01-18 Thread via GitHub


lucasbru commented on code in PR #15210:
URL: https://github.com/apache/kafka/pull/15210#discussion_r1457244991


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -140,6 +141,7 @@ public class AsyncKafkaConsumerTest {
 private final ApplicationEventHandler applicationEventHandler = 
mock(ApplicationEventHandler.class);
 private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
 private final LinkedBlockingQueue backgroundEventQueue = 
new LinkedBlockingQueue<>();
+private final Metrics metrics = new Metrics();

Review Comment:
   This is only accessed in one place, so I'm not sure why you added it, given
that you access the metrics using `consumer.metrics()` inside the test.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1547,6 +1553,29 @@ public String toString() {
 }
 }
 
+private class ConsumerCoordinatorMetrics {

Review Comment:
   Why not? Why did you create separate implementations?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -735,6 +743,23 @@ public void testOffsetCommitFailsWithStaleEpochAndRetriesWithNewEpoch() {
 assertEquals(memberId, reqData.memberId());
 }
 
+@Test
+public void testEnsureCommitSensorRecordsMetric() {
+    CommitRequestManager commitRequestManager = create(true, 100);
+    when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+    Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
+        new TopicPartition("topic", 1),
+        new OffsetAndMetadata(0));
+    commitRequestManager.addOffsetCommitRequest(offsets, Optional.empty(), true);
+    NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
+    assertEquals(1, res.unsentRequests.size());
+    res.unsentRequests.get(0).future().complete(mockOffsetCommitResponse("topic", 1, (short) 1, Errors.NONE));
+    assertNotNull(getMetric("commit-latency-avg"));

Review Comment:
   Could we please send two commits, mock the createTime and the receivedTime 
and then test the metric for concrete values? 
   
   Alternatively, we could test the metric individually, similar to 
`ConsumerCoordinatorTest.testMetrics`.
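   
   Something along these lines, as a rough sketch (a fragment, assuming the usual test imports; `MockTime`, `Metrics`, `Sensor`, `Avg` and `Max` are the standard client utilities, while the sensor wiring and metric names here are illustrative):
   
   ```java
   MockTime time = new MockTime();
   Metrics metrics = new Metrics(time);
   Sensor commitSensor = metrics.sensor("commit-latency");
   commitSensor.add(metrics.metricName("commit-latency-avg", "consumer-metrics"), new Avg());
   commitSensor.add(metrics.metricName("commit-latency-max", "consumer-metrics"), new Max());
   
   // First commit takes 100 ms, second takes 300 ms.
   long start = time.milliseconds();
   time.sleep(100);
   commitSensor.record(time.milliseconds() - start);
   start = time.milliseconds();
   time.sleep(300);
   commitSensor.record(time.milliseconds() - start);
   
   // Concrete expectations instead of assertNotNull.
   assertEquals(200.0, (double) metrics.metric(
       metrics.metricName("commit-latency-avg", "consumer-metrics")).metricValue(), 0.001);
   assertEquals(300.0, (double) metrics.metric(
       metrics.metricName("commit-latency-max", "consumer-metrics")).metricValue(), 0.001);
   ```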



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -371,6 +392,24 @@ public NetworkClientDelegate.PollResult drainPendingOffsetCommitRequests() {
 return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, requests);
 }
 
+private Sensor addCommitSensor(Metrics metrics, String metricGrpPrefix) {
+System.out.println("hello");

Review Comment:
   good day sir






Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]

2024-01-18 Thread via GitHub


mimaison commented on code in PR #14856:
URL: https://github.com/apache/kafka/pull/14856#discussion_r1457296352


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1492,6 +1501,20 @@ public static <T> Set<T> diff(final Supplier<Set<T>> constructor, final Set<T> l
 return result;
 }
 
+/**
+ * @param set Source set.
+ * @param toRemove Elements to remove.
+ * @return {@code set} copy without {@code toRemove} elements.
+ * @param <T> Element type.
+ */
+@SuppressWarnings("unchecked")
+public static <T> Set<T> minus(Set<T> set, T... toRemove) {

Review Comment:
   Let's not add methods in the client module that are only used in tools. This 
can be moved to ToolsUtils.
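   
   For illustration, a sketch of the same helper living in `ToolsUtils` (plain `java.util`, nothing client-specific):
   
   ```java
   // Requires java.util.{Arrays, HashSet, Set}.
   @SafeVarargs
   public static <T> Set<T> minus(Set<T> set, T... toRemove) {
       // Copy first so the input set is left untouched.
       Set<T> result = new HashSet<>(set);
       result.removeAll(Arrays.asList(toRemove));
       return result;
   }
   ```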



##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -594,6 +594,15 @@ public static <T> String join(T[] strs, String separator) {
 return join(Arrays.asList(strs), separator);
 }
 
+/**
+ * Create a string representation of a collection joined by ", ".
+ * @param collection The list of items
+ * @return The string representation.
+ */
+public static <T> String join(Collection<T> collection) {

Review Comment:
   Do we really need this method? Why can't we call the existing `join()` 
method and pass the separator? If you want to keep it, since it's only used in
tools, let's move it to the tools module.
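   
   For comparison, a call site could simply use the existing two-argument overload:
   
   ```java
   // Equivalent to the proposed join(collection).
   String joined = Utils.join(collection, ", ");
   ```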






[PR] KAFKA-15727: Added KRaft support in AlterUserScramCredentialsRequestNotAuthorizedTest [kafka]

2024-01-18 Thread via GitHub


adixitconfluent opened a new pull request, #15224:
URL: https://github.com/apache/kafka/pull/15224

   **About**
   This PR adds KRaft support to the following tests in 
`AlterUserScramCredentialsRequestNotAuthorizedTest` class -
   
   1. `testAlterNothingNotAuthorized`
   2. `testAlterSomethingNotAuthorized`
   
   reference - [KAFKA-15727](https://issues.apache.org/jira/browse/KAFKA-15727)
   
   **Testing**
   Adding screenshots confirming that the tests pass on the CLI for
`AlterUserScramCredentialsRequestNotAuthorizedTest`:
   https://github.com/apache/kafka/assets/144765188/4f343d1e-6629-455a-b098-cd2486b68ce2
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] KAFKA-15717: KRaft support in LeaderEpochIntegrationTest [kafka]

2024-01-18 Thread via GitHub


appchemist opened a new pull request, #15225:
URL: https://github.com/apache/kafka/pull/15225

   This PR adds KRaft support in LeaderEpochIntegrationTest
   
   JIRA : https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-15717
   
   Test Result
   ![Screen_Shot_2024-01-18_At_8 57 23_Pm_1](https://github.com/apache/kafka/assets/1546031/b4dcfe88-11b0-40bc-91d1-784ac1428486)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-16162) New created topics are unavailable after upgrading to 3.7

2024-01-18 Thread Luke Chen (Jira)
Luke Chen created KAFKA-16162:
-

 Summary: New created topics are unavailable after upgrading to 3.7
 Key: KAFKA-16162
 URL: https://issues.apache.org/jira/browse/KAFKA-16162
 Project: Kafka
  Issue Type: Bug
Reporter: Luke Chen


In 3.7, we introduced the KIP-858 JBOD feature, and the brokerRegistration
request includes the `LogDirs` field with a UUID for each log dir on each
broker. This info is stored in the controller and used to identify whether the
log dir is known and online while handling AssignReplicasToDirsRequest
[here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L2093].

When upgrading from an old version, the kafka cluster first runs the 3.7 binary
with the old metadata version, and is then upgraded to the newer version using
kafka-features.sh. That means that while brokers start up and send the
brokerRegistration request, they use the older metadata version without the
`LogDirs` fields included, which leaves the controller with no log dir info for
any broker. Later, after the upgrade, if a new topic is created, the flow goes
like this:

1. The controller assigns replicas and adds them to the metadata log
2. Brokers fetch the metadata and apply it
3. ReplicaManager#maybeUpdateTopicAssignment updates the topic assignment
4. After the broker sends ASSIGN_REPLICAS_TO_DIRS to the controller with the
replica assignment, the controller thinks the log dir of the current replica is
offline, so it triggers the offline handler, reassigns the replica, which is
again offline, and so on until there are no more replicas to assign, so the
leader is set to -1 (i.e. no leader)

So the result is that newly created topics are unavailable (with no leader)
because the controller thinks all log dirs are offline.

{code:java}
lukchen@lukchen-mac kafka % bin/kafka-topics.sh --describe --topic 
quickstart-events3 --bootstrap-server localhost:9092
  

Topic: quickstart-events3   TopicId: s8s6tEQyRvmjKI6ctNTgPg PartitionCount: 
3   ReplicationFactor: 3Configs: segment.bytes=1073741824
Topic: quickstart-events3   Partition: 0Leader: none
Replicas: 7,2,6 Isr: 6
Topic: quickstart-events3   Partition: 1Leader: none
Replicas: 2,6,7 Isr: 6
Topic: quickstart-events3   Partition: 2Leader: none
Replicas: 6,7,2 Isr: 6
{code}

The log snippet from the controller:


{code:java}
# handling 1st assignReplicaToDirs request

[2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned 
partition quickstart-events3:0 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
(org.apache.kafka.controller.ReplicationControlManager)
[2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned 
partition quickstart-events3:2 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
(org.apache.kafka.controller.ReplicationControlManager)
[2024-01-18 19:34:47,371] DEBUG [QuorumController id=1] Broker 6 assigned 
partition quickstart-events3:1 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
(org.apache.kafka.controller.ReplicationControlManager)
[2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] offline-dir-assignment: 
changing partition(s): quickstart-events3-0, quickstart-events3-2, 
quickstart-events3-1 (org.apache.kafka.controller.ReplicationControlManager)
[2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for 
quickstart-events3-0 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: 
[AA, AA, AA] -> 
[7K5JBERyyqFFxIXSXYluJA, AA, AA], 
partitionEpoch: 0 -> 1 (org.apache.kafka.controller.ReplicationControlManager)
[2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] Replayed partition 
change PartitionChangeRecord(partitionId=0, topicId=6ZIeidfiSTWRiOAmGEwn_g, 
isr=null, leader=-2, replicas=null, removingReplicas=null, addingReplicas=null, 
leaderRecoveryState=-1, directories=[7K5JBERyyqFFxIXSXYluJA, 
AA, AA], eligibleLeaderReplicas=null, 
lastKnownELR=null) for topic quickstart-events3 
(org.apache.kafka.controller.ReplicationControlManager)
[2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for 
quickstart-events3-2 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: 
[AA, AA, AA] -> 
[AA, 7K5JBERyyqFFxIXSXYluJA, AA], 
partitionEpoch: 0 -> 1 (org.apache.kafka.controller.ReplicationControlManager)
[2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] Replayed partition 
change PartitionChangeRecord(partitionId=2, topicId=6ZIeidfiSTWRiOAmGEwn_g, 
isr=null, leader=-2, replicas=null, removingReplicas=null, addingReplicas=null, 
leaderRecoveryState=-1, directories=[AA, 
7K5JBERyyqFFxIXSXYluJA, AA], eligibleLeaderReplicas=null, 
l

[jira] [Updated] (KAFKA-16162) New created topics are unavailable after upgrading to 3.7

2024-01-18 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16162:
--
Description: 
In 3.7, we introduced the KIP-858 JBOD feature, and the brokerRegistration
request includes the `LogDirs` field with a UUID for each log dir on each
broker. This info is stored in the controller and used to identify whether the
log dir is known and online while handling AssignReplicasToDirsRequest
[here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L2093].

When upgrading from an old version, the kafka cluster first runs the 3.7 binary
with the old metadata version, and is then upgraded to the newer version using
kafka-features.sh. That means that while brokers start up and send the
brokerRegistration request, they use the older metadata version without the
`LogDirs` fields included, which leaves the controller with no log dir info for
any broker. Later, after the upgrade, if a new topic is created, the flow goes
like this:

1. The controller assigns replicas and adds them to the metadata log
2. Brokers fetch the metadata and apply it
3. ReplicaManager#maybeUpdateTopicAssignment updates the topic assignment
4. After the broker sends ASSIGN_REPLICAS_TO_DIRS to the controller with the
replica assignment, the controller thinks the log dir of the current replica is
offline, so it triggers the offline handler, reassigns the leader to another
replica, which is also offline, and so on until there are no more replicas to
assign, so the leader is set to -1 (i.e. no leader)

So the result is that newly created topics are unavailable (with no leader)
because the controller thinks all log dirs are offline.

{code:java}
lukchen@lukchen-mac kafka % bin/kafka-topics.sh --describe --topic 
quickstart-events3 --bootstrap-server localhost:9092
  

Topic: quickstart-events3   TopicId: s8s6tEQyRvmjKI6ctNTgPg PartitionCount: 
3   ReplicationFactor: 3Configs: segment.bytes=1073741824
Topic: quickstart-events3   Partition: 0Leader: none
Replicas: 7,2,6 Isr: 6
Topic: quickstart-events3   Partition: 1Leader: none
Replicas: 2,6,7 Isr: 6
Topic: quickstart-events3   Partition: 2Leader: none
Replicas: 6,7,2 Isr: 6
{code}

The log snippet from the controller:


{code:java}
# handling 1st assignReplicaToDirs request

[2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned 
partition quickstart-events3:0 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
(org.apache.kafka.controller.ReplicationControlManager)
[2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned 
partition quickstart-events3:2 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
(org.apache.kafka.controller.ReplicationControlManager)
[2024-01-18 19:34:47,371] DEBUG [QuorumController id=1] Broker 6 assigned 
partition quickstart-events3:1 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
(org.apache.kafka.controller.ReplicationControlManager)
[2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] offline-dir-assignment: 
changing partition(s): quickstart-events3-0, quickstart-events3-2, 
quickstart-events3-1 (org.apache.kafka.controller.ReplicationControlManager)
[2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for 
quickstart-events3-0 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: 
[AA, AA, AA] -> 
[7K5JBERyyqFFxIXSXYluJA, AA, AA], 
partitionEpoch: 0 -> 1 (org.apache.kafka.controller.ReplicationControlManager)
[2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] Replayed partition 
change PartitionChangeRecord(partitionId=0, topicId=6ZIeidfiSTWRiOAmGEwn_g, 
isr=null, leader=-2, replicas=null, removingReplicas=null, addingReplicas=null, 
leaderRecoveryState=-1, directories=[7K5JBERyyqFFxIXSXYluJA, 
AA, AA], eligibleLeaderReplicas=null, 
lastKnownELR=null) for topic quickstart-events3 
(org.apache.kafka.controller.ReplicationControlManager)
[2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for 
quickstart-events3-2 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: 
[AA, AA, AA] -> 
[AA, 7K5JBERyyqFFxIXSXYluJA, AA], 
partitionEpoch: 0 -> 1 (org.apache.kafka.controller.ReplicationControlManager)
[2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] Replayed partition 
change PartitionChangeRecord(partitionId=2, topicId=6ZIeidfiSTWRiOAmGEwn_g, 
isr=null, leader=-2, replicas=null, removingReplicas=null, addingReplicas=null, 
leaderRecoveryState=-1, directories=[AA, 
7K5JBERyyqFFxIXSXYluJA, AA], eligibleLeaderReplicas=null, 
lastKnownELR=null) for topic quickstart-events3 
(org.apache.kafka.controller.ReplicationControlManager)
[2024-01


Re: [PR] KAFKA-4759 Add support for IPv4 and IPv6 ranges in AclAuthorizer [kafka]

2024-01-18 Thread via GitHub


rgo commented on PR #9937:
URL: https://github.com/apache/kafka/pull/9937#issuecomment-1898377886

   @flyingcougar yes, I also had (unrelated) issues with the test suite at the 
beginning.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15303: Avoid unnecessary re-serialization in FK-join [kafka]

2024-01-18 Thread via GitHub


alikelleci commented on PR #14157:
URL: https://github.com/apache/kafka/pull/14157#issuecomment-1898388652

   This issue really makes it impossible to use Kafka Streams to build our 
materialized views in an event-sourced system. Adding a minimal change like a 
primitive type in our view model will break the join. I am really hoping this PR 
will solve the issue and I look forward to updates.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




Re: [PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]

2024-01-18 Thread via GitHub


dajac commented on code in PR #15215:
URL: https://github.com/apache/kafka/pull/15215#discussion_r1457451148


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1134,9 +1134,22 @@ private CompletableFuture<Void> assignPartitions(
 // Make assignment effective on the client by updating the 
subscription state.
 updateSubscription(assignedPartitions, false);
 
+// Pause partitions to ensure that fetch does not start until the 
callback completes.
+assignedPartitions.forEach(tp -> 
subscriptions.pause(tp.topicPartition()));

Review Comment:
   Shouldn't we pause only the `addedPartitions`?
   
   Moreover, I wonder if using `pause` is the correct approach. Pausing the 
newly added partitions will ensure that the consumer does not fetch. However, 
for the newly added partitions, the consumer may also try to initialize their 
offsets. See `initWithCommittedOffsetsIfNeeded`. I think that we must prevent 
this too. Otherwise, there is a race between it and the assigned callback for 
initializing the offsets. Do you see what I mean?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]

2024-01-18 Thread via GitHub


dajac commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1457470738


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java:
##
@@ -92,30 +117,34 @@ public boolean equals(Object o) {
 
 OffsetAndMetadata that = (OffsetAndMetadata) o;
 
-if (offset != that.offset) return false;
+if (committedOffset != that.committedOffset) return false;
 if (commitTimestampMs != that.commitTimestampMs) return false;
-if (!leaderEpoch.equals(that.leaderEpoch)) return false;
-if (!metadata.equals(that.metadata)) return false;
-return expireTimestampMs.equals(that.expireTimestampMs);
+if (recordOffset != that.recordOffset) return false;
+if (!Objects.equals(leaderEpoch, that.leaderEpoch)) return false;

Review Comment:
   IntelliJ auto-generated this one. We should ask it. :). I suppose that it 
uses Objects.equals for the reason you mentioned.
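
   For what it's worth, null-safety is the usual motivation; a tiny standalone illustration (not project code):

import java.util.Objects;

public class NullSafeEquals {
    public static void main(String[] args) {
        Integer a = null, b = null;
        // a.equals(b) would throw a NullPointerException here
        System.out.println(Objects.equals(a, b)); // true: both null
        System.out.println(Objects.equals(a, 5)); // false, still no NPE
    }
}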



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]

2024-01-18 Thread via GitHub


dajac commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1457471555


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -918,7 +920,7 @@ public void replay(
 groupId,
 topic,
 partition,
-OffsetAndMetadata.fromRecord(value)
+OffsetAndMetadata.fromRecord(offset, value)

Review Comment:
   yeah, this offset is the offset of the record, it is not the committed 
offset in the record.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]

2024-01-18 Thread via GitHub


dajac commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1457474626


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -979,15 +981,30 @@ public void replayEndTransactionMarker(
 pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
 topicOffsets.forEach((topicName, partitionOffsets) -> {
 partitionOffsets.forEach((partitionId, offsetAndMetadata) 
-> {
-log.debug("Committed transaction offset commit for 
producer id {} in group {} " +
-"with topic {}, partition {}, and offset {}.",
-producerId, groupId, topicName, partitionId, 
offsetAndMetadata);
-offsets.put(
+OffsetAndMetadata existingOffsetAndMetadata = 
offsets.get(
 groupId,
 topicName,
-partitionId,
-offsetAndMetadata
+partitionId
 );
+
+// We always keep the most recent committed offset 
when we have a mix of transactional and regular
+// offset commits. Without preserving information of 
the commit record offset, compaction of the
+// __consumer_offsets topic itself may result in the 
wrong offset commit being materialized.
+if (existingOffsetAndMetadata == null || 
offsetAndMetadata.recordOffset > existingOffsetAndMetadata.recordOffset) {
+log.debug("Committed transactional offset commit 
{} for producer id {} in group {} " +
+"with topic {} and partition {}.",
+offsetAndMetadata, producerId, groupId, 
topicName, partitionId);
+offsets.put(

Review Comment:
   yeah, that's right. the overall idea is to ensure that the last committed 
offset in the log is the one materialized in memory. imagine the following 
sequence:
   1. committed transactional offset 100 as part of transaction X. it goes to 
the pending structure.
   2. committed regular offset 101. it goes to the main structure.
   3. transaction X is committed. putting 100 in the main structure is wrong 
here because the last one in the log is 101.
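
   A toy walkthrough of that sequence (simplified stand-in types, not the actual coordinator code; only the committed offset and the record's own position in the log matter here):

public class ReplayExample {
    // OffsetCommit is a stand-in for OffsetAndMetadata: committedOffset is the
    // consumer's committed position, recordOffset is where the commit record
    // itself sits in __consumer_offsets.
    static final class OffsetCommit {
        final long committedOffset;
        final long recordOffset;
        OffsetCommit(long committedOffset, long recordOffset) {
            this.committedOffset = committedOffset;
            this.recordOffset = recordOffset;
        }
    }

    // Keep whichever commit record appears later in the log.
    static OffsetCommit materialize(OffsetCommit existing, OffsetCommit pending) {
        return (existing == null || pending.recordOffset > existing.recordOffset)
                ? pending : existing;
    }

    public static void main(String[] args) {
        OffsetCommit txn = new OffsetCommit(100, 5);     // step 1: transactional commit, record offset 5
        OffsetCommit regular = new OffsetCommit(101, 6); // step 2: regular commit, record offset 6
        // step 3: the transaction commits; the pending commit must not clobber the later record
        System.out.println(materialize(regular, txn).committedOffset); // prints 101
    }
}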



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]

2024-01-18 Thread via GitHub


dajac commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1457475930


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -712,13 +712,14 @@ public void run() {
 try {
 // Apply the records to the state machine.
 if (result.replayRecords()) {
-result.records().forEach(record ->
+for (int i = 0; i < result.records().size(); 
i++) {
 context.coordinator.replay(
+prevLastWrittenOffset + i,

Review Comment:
   yes. the only way is to speculate about what the offset will be in the log 
based on the last written offset. as the coordinator is the single writer to 
the log, it works.
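
   In other words, the speculation is just arithmetic (a sketch with made-up values; `replay` stands for the call shown in the diff, and `prevLastWrittenOffset` is treated as the log end offset):

import java.util.Arrays;
import java.util.List;

public class OffsetSpeculation {
    public static void main(String[] args) {
        // Because the coordinator is the single writer, record i of a batch
        // appended at log end offset prevLastWrittenOffset must land exactly
        // at prevLastWrittenOffset + i.
        long prevLastWrittenOffset = 42L;
        List<String> records = Arrays.asList("r0", "r1");
        for (int i = 0; i < records.size(); i++) {
            long speculatedOffset = prevLastWrittenOffset + i;
            System.out.println("replay(" + speculatedOffset + ", " + records.get(i) + ")");
        }
    }
}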



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]

2024-01-18 Thread via GitHub


dajac commented on code in PR #15196:
URL: https://github.com/apache/kafka/pull/15196#discussion_r1457478495


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1098,30 +1054,36 @@ private static boolean isGroupIdNotEmpty(String 
groupId) {
 return groupId != null && !groupId.isEmpty();
 }
 
-/**
- * Handles the exception in the scheduleWriteOperation.
- * @return The Errors instance associated with the given exception.
- */
-private static Errors normalizeException(Throwable exception) {
-exception = Errors.maybeUnwrapException(exception);
-
-if (exception instanceof UnknownTopicOrPartitionException ||
-exception instanceof NotEnoughReplicasException ||
-exception instanceof TimeoutException) {
-return Errors.COORDINATOR_NOT_AVAILABLE;
-}
-
-if (exception instanceof NotLeaderOrFollowerException ||
-exception instanceof KafkaStorageException) {
-return Errors.NOT_COORDINATOR;
-}
-
-if (exception instanceof RecordTooLargeException ||
-exception instanceof RecordBatchTooLargeException ||
-exception instanceof InvalidFetchSizeException) {
-return Errors.UNKNOWN_SERVER_ERROR;
+private <REQ, RSP> RSP handleOperationException(
+String requestName,
+REQ request,
+Throwable exception,
+BiFunction<Errors, String, RSP> responseBuilder
+) {
+ApiError apiError = ApiError.fromThrowable(exception);
+
+switch (apiError.error()) {
+case UNKNOWN_SERVER_ERROR:
+log.error("{} request {} hit an unexpected exception: {}.",
+requestName, request, exception.getMessage(), exception);
+return responseBuilder.apply(Errors.UNKNOWN_SERVER_ERROR, 
null);
+
+case UNKNOWN_TOPIC_OR_PARTITION:
+case NOT_ENOUGH_REPLICAS:
+case REQUEST_TIMED_OUT:
+return responseBuilder.apply(Errors.COORDINATOR_NOT_AVAILABLE, 
null);
+
+case NOT_LEADER_OR_FOLLOWER:
+case KAFKA_STORAGE_ERROR:
+return responseBuilder.apply(Errors.NOT_COORDINATOR, null);
+
+case MESSAGE_TOO_LARGE:
+case RECORD_LIST_TOO_LARGE:
+case INVALID_FETCH_SIZE:
+return responseBuilder.apply(Errors.UNKNOWN_SERVER_ERROR, 
null);
+
+default:
+return responseBuilder.apply(apiError.error(), 
apiError.message());

Review Comment:
   if you look at GroupMetadataManager.consumerGroupHeartbeat, there are many 
cases where a non-default message is provided in order to give more information 
to the client. all those non-default messages will be set here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]

2024-01-18 Thread via GitHub


dajac commented on code in PR #15212:
URL: https://github.com/apache/kafka/pull/15212#discussion_r1457480228


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -861,19 +861,9 @@ private void maybeUpdatePartitionEpoch(
 ConsumerGroupMember oldMember,
 ConsumerGroupMember newMember
 ) {
-if (oldMember == null) {
-addPartitionEpochs(newMember.assignedPartitions(), 
newMember.memberEpoch());
-addPartitionEpochs(newMember.partitionsPendingRevocation(), 
newMember.memberEpoch());
-} else {
-if 
(!oldMember.assignedPartitions().equals(newMember.assignedPartitions())) {
-removePartitionEpochs(oldMember.assignedPartitions());
-addPartitionEpochs(newMember.assignedPartitions(), 
newMember.memberEpoch());
-}
-if 
(!oldMember.partitionsPendingRevocation().equals(newMember.partitionsPendingRevocation()))
 {
-removePartitionEpochs(oldMember.partitionsPendingRevocation());
-addPartitionEpochs(newMember.partitionsPendingRevocation(), 
newMember.memberEpoch());
-}
-}
+maybeRemovePartitionEpoch(oldMember);

Review Comment:
   Yeah, that's a great question and I don't really know, to be honest. I 
suppose that I tried to avoid doing the operations if the partitions did not 
change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]

2024-01-18 Thread via GitHub


dajac commented on PR #15212:
URL: https://github.com/apache/kafka/pull/15212#issuecomment-1898536868

   > to confirm, the previousMemberEpoch=11 should be 14 in the given example 
right?
   
   previousMemberEpoch=11 seems correct as the member transitioned from 11 to 
15.
   
   > this seems a bit scary. should we have some check that ensures the 
invariant?
   
   yeah, but i am not sure how. one option would be to strengthen the 
validation when `currentPartitionEpoch` is modified. let me see if i can do 
something.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]

2024-01-18 Thread via GitHub


dajac commented on PR #15212:
URL: https://github.com/apache/kafka/pull/15212#issuecomment-1898623457

   @jeffkbkim i have strengthened the validation around updating the partition 
epochs in my last commit. with this, the state change that we saw would have 
been rejected. i think that this is better than silently accepting it. let me 
know what you think.
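
   Roughly, the strengthened guard could look like this (an illustrative sketch with hypothetical method and message wording, not the actual ConsumerGroup code; `currentPartitionEpoch` is the existing epoch map):

// Reject assigning a partition epoch that is still held by another member,
// instead of silently overwriting it.
private void addPartitionEpoch(Uuid topicId, int partitionId, int memberEpoch) {
    Integer previousEpoch = currentPartitionEpoch
            .computeIfAbsent(topicId, id -> new HashMap<>())
            .put(partitionId, memberEpoch);
    if (previousEpoch != null) {
        throw new IllegalStateException("Partition " + partitionId + " of topic " + topicId
                + " is still assigned to a member with epoch " + previousEpoch);
    }
}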


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]

2024-01-18 Thread via GitHub


C0urante commented on PR #13905:
URL: https://github.com/apache/kafka/pull/13905#issuecomment-1898635563

   Superseded by https://github.com/apache/kafka/pull/14567


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]

2024-01-18 Thread via GitHub


C0urante closed pull request #13905: KAFKA-13988: Fix MM2 not consuming from 
latest when "auto.offset.reset=latest" is set
URL: https://github.com/apache/kafka/pull/13905


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13988:Fix mm2 auto.offset.reset=latest not working [kafka]

2024-01-18 Thread via GitHub


C0urante closed pull request #12358: KAFKA-13988:Fix mm2 
auto.offset.reset=latest not working
URL: https://github.com/apache/kafka/pull/12358


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13988:Fix mm2 auto.offset.reset=latest not working [kafka]

2024-01-18 Thread via GitHub


C0urante commented on PR #12358:
URL: https://github.com/apache/kafka/pull/12358#issuecomment-1898636336

   Superseded by https://github.com/apache/kafka/pull/14567


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-18 Thread via GitHub


C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1457583523


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java:
##
@@ -75,9 +77,9 @@ public class ErrorHandlingIntegrationTest {
 private static final String DLQ_TOPIC = "my-connector-errors";
 private static final String CONNECTOR_NAME = "error-conn";
 private static final String TASK_ID = "error-conn-0";
-private static final int NUM_RECORDS_PRODUCED = 20;
-private static final int EXPECTED_CORRECT_RECORDS = 19;
+private static final int NUM_RECORDS_PRODUCED = 1000;
 private static final int EXPECTED_INCORRECT_RECORDS = 1;
+private static final int EXPECTED_CORRECT_RECORDS = NUM_RECORDS_PRODUCED - 
EXPECTED_INCORRECT_RECORDS;

Review Comment:
   Great, thanks for leaving it better than you found it 🙌 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-18 Thread via GitHub


C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1457585290


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##
@@ -392,90 +397,29 @@ public void testSetConfigs() {
 }
 
 @Test
-public void testThreadSafety() throws Throwable {
-long runtimeMs = 5_000;
-int numThreads = 10;
-// Check that multiple threads using RetryWithToleranceOperator 
concurrently
-// can't corrupt the state of the ProcessingContext
-AtomicReference<Throwable> failed = new AtomicReference<>(null);
-RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(0,
-ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, 
errorHandlingMetrics, new ProcessingContext() {
-private final AtomicInteger count = new AtomicInteger();
-private final AtomicInteger attempt = new AtomicInteger();
-
-@Override
-public void error(Throwable error) {
-if (count.getAndIncrement() > 0) {
-failed.compareAndSet(null, new 
AssertionError("Concurrent call to error()"));
-}
-super.error(error);
-}
-
-@Override
-public Future<Void> report() {
-if (count.getAndSet(0) > 1) {
-failed.compareAndSet(null, new 
AssertionError("Concurrent call to error() in report()"));
-}
-
-return super.report();
-}
-
-@Override
-public void currentContext(Stage stage, Class<?> klass) {
-this.attempt.set(0);
-super.currentContext(stage, klass);
-}
-
-@Override
-public void attempt(int attempt) {
-if (!this.attempt.compareAndSet(attempt - 1, attempt)) 
{
-failed.compareAndSet(null, new AssertionError(
-"Concurrent call to attempt(): Attempts 
should increase monotonically " +
-"within the scope of a given 
currentContext()"));
-}
-super.attempt(attempt);
-}
-}, new CountDownLatch(1));
-
-ExecutorService pool = Executors.newFixedThreadPool(numThreads);
-List<Future<?>> futures = IntStream.range(0, 
numThreads).boxed()
-.map(id ->
-pool.submit(() -> {
-long t0 = System.currentTimeMillis();
-long i = 0;
-while (true) {
-if (++i % 1 == 0 && 
System.currentTimeMillis() > t0 + runtimeMs) {
-break;
-}
-if (failed.get() != null) {
-break;
-}
-try {
-if (id < numThreads / 2) {
-
retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
-SinkTask.class, 
consumerRecord, new Throwable()).get();
-} else {
-retryWithToleranceOperator.execute(() 
-> null, Stage.TRANSFORMATION,
-SinkTask.class);
-}
-} catch (Exception e) {
-failed.compareAndSet(null, e);
-}
-}
-}))
-.collect(Collectors.toList());
-pool.shutdown();
-pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS);
-futures.forEach(future -> {
-try {
-future.get();
-} catch (Exception e) {
-failed.compareAndSet(null, e);
-}

Review Comment:
   Phew, okay--the 50% figure is reasonable. Thanks for humoring me 👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move ClientQuotaManagerConfig outside of core [kafka]

2024-01-18 Thread via GitHub


mimaison merged PR #15159:
URL: https://github.com/apache/kafka/pull/15159


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-18 Thread via GitHub


dajac commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1457569421


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -717,17 +717,16 @@ class KafkaApis(val requestChannel: RequestChannel,
   val internalTopicsAllowed = request.header.clientId == 
AdminUtils.ADMIN_CLIENT_ID
 
   // call the replica manager to append messages to the replicas
-  replicaManager.appendRecords(
+  replicaManager.handleProduceAppend(
 timeout = produceRequest.timeout.toLong,
 requiredAcks = produceRequest.acks,
 internalTopicsAllowed = internalTopicsAllowed,
 origin = AppendOrigin.CLIENT,

Review Comment:
   nit: I wonder if we should remove the `origin` parameter from 
`handleProduceAppend` as it should always come from a client by definition.



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -3041,31 +3007,57 @@ class ReplicaManagerTest {
   origin = origin,
   entriesPerPartition = Map(partition -> records),
   responseCallback = appendCallback,
-  transactionalId = transactionalId,
 )
 
 result
   }
 
-  private def appendRecordsToMultipleTopics(replicaManager: ReplicaManager,
-entriesToAppend: 
Map[TopicPartition, MemoryRecords],
-transactionalId: String,
-origin: AppendOrigin = 
AppendOrigin.CLIENT,
-requiredAcks: Short = -1): 
CallbackResult[Map[TopicPartition, PartitionResponse]] = {
+  private def handleProduceAppendToMultipleTopics(replicaManager: 
ReplicaManager,
+  entriesToAppend: 
Map[TopicPartition, MemoryRecords],
+  transactionalId: String,
+  origin: AppendOrigin = 
AppendOrigin.CLIENT,
+  requiredAcks: Short = -1): 
CallbackResult[Map[TopicPartition, PartitionResponse]] = {
 val result = new CallbackResult[Map[TopicPartition, PartitionResponse]]()
 def appendCallback(responses: Map[TopicPartition, PartitionResponse]): 
Unit = {
   responses.foreach( response => 
assertTrue(responses.get(response._1).isDefined))
   result.fire(responses)
 }
+  replicaManager.handleProduceAppend(

Review Comment:
   nit: Indentation seems off here.



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1188,18 +1073,7 @@ class ReplicaManager(val config: KafkaConfig,
   requestLocal: RequestLocal,
   verificationErrors: Map[TopicPartition, Errors]
 ): Unit = {
-  // Map transaction coordinator errors to known errors for the response
-  val convertedErrors = verificationErrors.map { case (tp, error) =>
-error match {
-  case Errors.CONCURRENT_TRANSACTIONS |
-Errors.COORDINATOR_LOAD_IN_PROGRESS |
-Errors.COORDINATOR_NOT_AVAILABLE |
-Errors.NOT_COORDINATOR => tp -> Errors.NOT_ENOUGH_REPLICAS
-  case _ => tp -> error
-}
-
-  }

Review Comment:
   For my understanding, we remove this here and we adds it back in 
handleProduceAppend and we rely on the conversion in the group coordinator. Did 
I get it right? In the group coordinator, we don't handle 
`CONCURRENT_TRANSACTIONS`, I think. I need to double check.



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2560,41 +2562,6 @@ class ReplicaManagerTest {
 }
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", 
"CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", 
"COORDINATOR_NOT_AVAILABLE"))
-  def testMaybeVerificationErrorConversions(error: Errors): Unit = {

Review Comment:
   Don't we need to keep this one as we still have those conversions, just in a 
different place now?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -762,167 +763,124 @@ class ReplicaManager(val config: KafkaConfig,
 delayedProduceLock: Option[Lock] = None,
 recordValidationStatsCallback: Map[TopicPartition, 
RecordValidationStats] => Unit = _ => (),
 requestLocal: RequestLocal = RequestLocal.NoCaching,
-transactionalId: String = null,
-actionQueue: ActionQueue = this.defaultActionQueue): Unit 
= {
-if (isValidRequiredAcks(requiredAcks)) {
-
-  val verificationGuards: mutable.Map[TopicPartition, VerificationGuard] = 
mutable.Map[TopicPartition, VerificationGuard]()
-  val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition, 
errorsPerPartition) =
-if (transactionalId == null || 
!config.transactionPartitionVerificationEnable)
-  (entriesPerPartition, Map.empty

[PR] DO NOT MERGE: Isolate Connect OffsetsApiIntegrationTest [kafka]

2024-01-18 Thread via GitHub


C0urante opened a new pull request, #15226:
URL: https://github.com/apache/kafka/pull/15226

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Log a warning when connectors generate greater than tasks.max task configs [kafka]

2024-01-18 Thread via GitHub


C0urante commented on PR #14694:
URL: https://github.com/apache/kafka/pull/14694#issuecomment-1898692586

   @gharris1727 yes it is, will close.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Log a warning when connectors generate greater than tasks.max task configs [kafka]

2024-01-18 Thread via GitHub


C0urante closed pull request #14694: MINOR: Log a warning when connectors 
generate greater than tasks.max task configs
URL: https://github.com/apache/kafka/pull/14694


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] DRAFT: State updater with synchronous remove [kafka]

2024-01-18 Thread via GitHub


cadonna opened a new pull request, #15227:
URL: https://github.com/apache/kafka/pull/15227

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16159: MINOR - Removed debug log [kafka]

2024-01-18 Thread via GitHub


apoorvmittal10 opened a new pull request, #15228:
URL: https://github.com/apache/kafka/pull/15228

   Removed the debug log, as the next-time-to-update check runs in the poll 
loop and causes excessive logging.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]

2024-01-18 Thread via GitHub


lianetm commented on code in PR #15215:
URL: https://github.com/apache/kafka/pull/15215#discussion_r1457609049


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1134,9 +1134,22 @@ private CompletableFuture<Void> assignPartitions(
 // Make assignment effective on the client by updating the 
subscription state.
 updateSubscription(assignedPartitions, false);
 
+// Pause partitions to ensure that fetch does not start until the 
callback completes.
+assignedPartitions.forEach(tp -> 
subscriptions.pause(tp.topicPartition()));

Review Comment:
   Hey, I totally get it, pause is only preventing fetching but we also need to 
prevent initializing the offsets as they could change during the ongoing 
callback, I missed that. I'm still thinking about this, but I see 2 approaches 
for now:
   1. Introducing a new state for the newly added partitions, with changes in 
the logic to avoid fetch and initialization based on this state (not on pause). 
Partitions would be in this state while the callback executes, and then move 
into INITIALIZING as if they had just been added. This one is tricky and needs 
more thinking: it would change several core parts around the subscription 
state, which is used by many components, so it is hard to even start to think 
about the impact.
   2. Continue using the pause mechanism to prevent fetching (not re-inventing 
the wheel for that), and change `initWithCommittedOffsetsIfNeeded` for the new 
consumer only, simply to make sure that it initializes positions only for 
partitions that are INITIALIZING and not paused (conceptually, the new state we 
have in the new consumer while the callback runs); a rough sketch of this check 
follows below. That would probably achieve what we want, changing only the new 
consumer, which I like, since this whole situation exists only in it, due to 
its split application/background threads. Still thinking about the potential 
unwanted impact of skipping position initialization for paused partitions that 
are INITIALIZING...
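
   For option 2, the check could look roughly like this (only a sketch, assuming `SubscriptionState` exposes `initializingPartitions()` and `isPaused()`; not a definitive implementation):

// Only initialize committed offsets for partitions that still need it and are
// not paused, i.e. skip partitions whose onPartitionsAssigned callback is
// still running in the application thread.
Set<TopicPartition> initializable = subscriptions.initializingPartitions().stream()
        .filter(tp -> !subscriptions.isPaused(tp))
        .collect(Collectors.toSet());
// initWithCommittedOffsetsIfNeeded would then fetch committed offsets for
// `initializable` only, leaving paused partitions for after the callback.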



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16159: MINOR - Removed debug log [kafka]

2024-01-18 Thread via GitHub


ex172000 commented on code in PR #15228:
URL: https://github.com/apache/kafka/pull/15228#discussion_r1457646378


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -362,7 +362,6 @@ public long timeToNextUpdate(long requestTimeoutMs) {
 throw new IllegalStateException("Unknown telemetry state: 
" + localState);
 }
 
-log.debug("For telemetry state {}, returning the value {} ms; {}", 
localState, timeMs, msg);

Review Comment:
   Do we want to emit some metrics instead so users can still get a sense of 
the state?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15717: KRaft support in LeaderEpochIntegrationTest [kafka]

2024-01-18 Thread via GitHub


ex172000 commented on code in PR #15225:
URL: https://github.com/apache/kafka/pull/15225#discussion_r1457658098


##
core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala:
##
@@ -277,6 +283,22 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness 
with Logging {
 producer.close()
   }
 
+  private def createTopicWithAssignment(topic: String, 
partitionReplicaAssignment: collection.Map[Int, Seq[Int]]): Unit = {

Review Comment:
   Is it possible to place this method inside the `TestUtils`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] DO NOT MERGE: Isolate Connect tests [kafka]

2024-01-18 Thread via GitHub


C0urante opened a new pull request, #15229:
URL: https://github.com/apache/kafka/pull/15229

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]

2024-01-18 Thread via GitHub


C0urante merged PR #12290:
URL: https://github.com/apache/kafka/pull/12290


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16159: MINOR - Removed debug log [kafka]

2024-01-18 Thread via GitHub


philipnee commented on PR #15228:
URL: https://github.com/apache/kafka/pull/15228#issuecomment-1898789273

   Thanks @apoorvmittal10 - Could you elaborate on the purpose of this log 
line? How important is it for the user to know about the "next update time"? I 
wonder if we could just log it when the update is happening.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16131: Only update directoryIds if the metadata version supports DirectoryAssignment [kafka]

2024-01-18 Thread via GitHub


pprovenzano commented on PR #15197:
URL: https://github.com/apache/kafka/pull/15197#issuecomment-1898795980

   Looks like it is already fixed with:
   MINOR: Fix compilation issue in ReplicaManagerTest (apache#15222)
   
   The issue was caused by a collision of my change and the following:
   KAFKA-16078: Be more consistent about getting the latest MetadataVersion
   
   --Proven
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]

2024-01-18 Thread via GitHub


nizhikov commented on code in PR #14856:
URL: https://github.com/apache/kafka/pull/14856#discussion_r1457691847


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1492,6 +1501,20 @@ public static <T> Set<T> diff(final Supplier<Set<T>> 
constructor, final Set<T> l
 return result;
 }
 
+/**
+ * @param set Source set.
+ * @param toRemove Elements to remove.
+ * @return {@code set} copy without {@code toRemove} elements.
+ * @param <T> Element type.
+ */
+@SuppressWarnings("unchecked")
+public static <T> Set<T> minus(Set<T> set, T... toRemove) {

Review Comment:
   Moved



##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -594,6 +594,15 @@ public static <T> String join(T[] strs, String separator) {
 return join(Arrays.asList(strs), separator);
 }
 
+/**
+ * Create a string representation of a collection joined by ", ".
+ * @param collection The list of items
+ * @return The string representation.
+ */
+public static <T> String join(Collection<T> collection) {

Review Comment:
   Method removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] DO NOT MERGE: Isolate Connect OffsetsApiIntegrationTest [kafka]

2024-01-18 Thread via GitHub


C0urante closed pull request #15226: DO NOT MERGE: Isolate Connect 
OffsetsApiIntegrationTest
URL: https://github.com/apache/kafka/pull/15226


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] DO NOT MERGE: Isolate Connect OffsetsApiIntegrationTest [kafka]

2024-01-18 Thread via GitHub


C0urante commented on PR #15226:
URL: https://github.com/apache/kafka/pull/15226#issuecomment-1898810463

   Looks like isolating a single test suite removes the conditions that lead to 
flakiness. Closing in favor of https://github.com/apache/kafka/pull/15229


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]

2024-01-18 Thread via GitHub


C0urante closed pull request #15218: KAFKA-16107: Stop fetching while 
onPartitionsAssign completes 
URL: https://github.com/apache/kafka/pull/15218


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]

2024-01-18 Thread via GitHub


C0urante commented on PR #15218:
URL: https://github.com/apache/kafka/pull/15218#issuecomment-1898819047

   @anurag-harness This looks like an accidentally-opened PR and it copies the 
title from https://github.com/apache/kafka/pull/15215, which may lead to some 
confusion. I've closed it for now; please feel free to reopen if there are 
legitimate changes to the code base you'd like to propose.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]

2024-01-18 Thread via GitHub


nizhikov commented on code in PR #15158:
URL: https://github.com/apache/kafka/pull/15158#discussion_r1457704365


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {
+

Review Comment:
   Do we really need this empty line?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]

2024-01-18 Thread via GitHub


nizhikov commented on code in PR #15158:
URL: https://github.com/apache/kafka/pull/15158#discussion_r1457704867


##
checkstyle/import-control.xml:
##
@@ -261,6 +261,10 @@
 
   
 
+
+  

Review Comment:
   Do we really need this empty block?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]

2024-01-18 Thread via GitHub


nizhikov commented on code in PR #15158:
URL: https://github.com/apache/kafka/pull/15158#discussion_r1457709089


##
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+public class TransactionLogConfig {

Review Comment:
   Can ew introduce new module and config into separate PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16113: Add committed and commit sensor to record metrics [kafka]

2024-01-18 Thread via GitHub


philipnee commented on code in PR #15210:
URL: https://github.com/apache/kafka/pull/15210#discussion_r1457711259


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1547,6 +1553,29 @@ public String toString() {
 }
 }
 
+private class ConsumerCoordinatorMetrics {

Review Comment:
   I split the implementation into two classes because there's no need to pass 
a reference to this entire object to the request manager just for the 
commitSensor (see the addCommitSensor method). Instead, I think it would be a 
lot easier to pass the Metrics object to the managers and let them create their 
own sensors (essentially these metrics objects just hold a bunch of sensors 
referenced from Metrics).
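   
   For illustration, a minimal sketch (hypothetical class and metric names, not 
the PR's actual code) of a request manager building its own commit sensor from 
a shared `Metrics` instance:
   
```java
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

// The manager only needs the Metrics registry, not the coordinator object,
// to register and record its own sensor.
public class CommitMetricsSketch {
    private final Sensor commitSensor;

    public CommitMetricsSketch(Metrics metrics, String groupName) {
        commitSensor = metrics.sensor("commit-latency");
        commitSensor.add(metrics.metricName("commit-latency-avg", groupName,
            "The average time taken for a commit request"), new Avg());
        commitSensor.add(metrics.metricName("commit-latency-max", groupName,
            "The max time taken for a commit request"), new Max());
    }

    public void recordCommitLatency(long latencyMs) {
        commitSensor.record(latencyMs);
    }
}
```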



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15727: Added KRaft support in AlterUserScramCredentialsRequestNotAuthorizedTest [kafka]

2024-01-18 Thread via GitHub


adixitconfluent commented on PR #15224:
URL: https://github.com/apache/kafka/pull/15224#issuecomment-1898856832

   As shown in the CLI screenshot in the PR description, the tests that were 
changed are passing. However, the build is failing. Adding a screenshot showing 
no new test failures from the build. I can confirm that the two tests I 
changed, `testAlterNothingNotAuthorized` and `testAlterSomethingNotAuthorized`, 
are not among the failing tests in the build. Test failures - 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15224/1/tests


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16092) Queues for Kafka

2024-01-18 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield updated KAFKA-16092:
-
Labels: queues-for-kafka  (was: )

> Queues for Kafka
> 
>
> Key: KAFKA-16092
> URL: https://issues.apache.org/jira/browse/KAFKA-16092
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: queues-for-kafka
>
> This Jira tracks the development of KIP-932: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16131: Only update directoryIds if the metadata version supports DirectoryAssignment [kafka]

2024-01-18 Thread via GitHub


mjsax commented on PR #15197:
URL: https://github.com/apache/kafka/pull/15197#issuecomment-1898958611

   Thanks. The fix was pushed after I left my comment. Glad it's resolved.
   
   Yeah, I did suspect that two overlapping PRs got merged simultaneously.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16163) Constant resignation/reelection of controller when starting a single node in combined mode

2024-01-18 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-16163:
--

 Summary: Constant resignation/reelection of controller when 
starting a single node in combined mode
 Key: KAFKA-16163
 URL: https://issues.apache.org/jira/browse/KAFKA-16163
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Mickael Maison


When starting a single node in combined mode:
{noformat}
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c 
config/kraft/server.properties
$ bin/kafka-server-start.sh config/kraft/server.properties{noformat}
 

it's constantly spamming the logs with:
{noformat}
[2024-01-18 17:37:09,065] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread)
[2024-01-18 17:37:11,967] INFO [RaftManager id=1] Did not receive fetch request 
from the majority of the voters within 3000ms. Current fetched voters are []. 
(org.apache.kafka.raft.LeaderState)
[2024-01-18 17:37:11,967] INFO [RaftManager id=1] Completed transition to 
ResignedState(localId=1, epoch=138, voters=[1], electionTimeoutMs=1864, 
unackedVoters=[], preferredSuccessors=[]) from Leader(localId=1, epoch=138, 
epochStartOffset=829, highWatermark=Optional[LogOffsetMetadata(offset=835, 
metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=62788)])], 
voterStates={1=ReplicaState(nodeId=1, 
endOffset=Optional[LogOffsetMetadata(offset=835, 
metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=62788)])], 
lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)}) 
(org.apache.kafka.raft.QuorumState)
[2024-01-18 17:37:13,072] INFO [NodeToControllerChannelManager id=1 
name=heartbeat] Client requested disconnect from node 1 
(org.apache.kafka.clients.NetworkClient)
[2024-01-18 17:37:13,072] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread)
[2024-01-18 17:37:13,123] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread)
[2024-01-18 17:37:13,124] INFO [NodeToControllerChannelManager id=1 
name=heartbeat] Client requested disconnect from node 1 
(org.apache.kafka.clients.NetworkClient)
[2024-01-18 17:37:13,124] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread)
[2024-01-18 17:37:13,175] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread)
[2024-01-18 17:37:13,176] INFO [NodeToControllerChannelManager id=1 
name=heartbeat] Client requested disconnect from node 1 
(org.apache.kafka.clients.NetworkClient)
[2024-01-18 17:37:13,176] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread)
[2024-01-18 17:37:13,227] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread)
[2024-01-18 17:37:13,229] INFO [NodeToControllerChannelManager id=1 
name=heartbeat] Client requested disconnect from node 1 
(org.apache.kafka.clients.NetworkClient)
[2024-01-18 17:37:13,229] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread)
[2024-01-18 17:37:13,279] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread){noformat}
This did not happen in 3.6.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16164) Pre-Vote

2024-01-18 Thread Alyssa Huang (Jira)
Alyssa Huang created KAFKA-16164:


 Summary: Pre-Vote
 Key: KAFKA-16164
 URL: https://issues.apache.org/jira/browse/KAFKA-16164
 Project: Kafka
  Issue Type: Improvement
Reporter: Alyssa Huang


Implementing pre-vote as described in 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-996%3A+Pre-Vote



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16163) Constant resignation/reelection of controller when starting a single node in combined mode

2024-01-18 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17808336#comment-17808336
 ] 

Mickael Maison commented on KAFKA-16163:


It looks like this behavior was introduced in 
https://github.com/apache/kafka/commit/37416e1aebae33d01d5059ba906ec8e0e1107284

> Constant resignation/reelection of controller when starting a single node in 
> combined mode
> --
>
> Key: KAFKA-16163
> URL: https://issues.apache.org/jira/browse/KAFKA-16163
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Mickael Maison
>Priority: Major
>
> When starting a single node in combined mode:
> {noformat}
> $ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
> $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c 
> config/kraft/server.properties
> $ bin/kafka-server-start.sh config/kraft/server.properties{noformat}
>  
> it's constantly spamming the logs with:
> {noformat}
> [2024-01-18 17:37:09,065] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:11,967] INFO [RaftManager id=1] Did not receive fetch 
> request from the majority of the voters within 3000ms. Current fetched voters 
> are []. (org.apache.kafka.raft.LeaderState)
> [2024-01-18 17:37:11,967] INFO [RaftManager id=1] Completed transition to 
> ResignedState(localId=1, epoch=138, voters=[1], electionTimeoutMs=1864, 
> unackedVoters=[], preferredSuccessors=[]) from Leader(localId=1, epoch=138, 
> epochStartOffset=829, highWatermark=Optional[LogOffsetMetadata(offset=835, 
> metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=62788)])], 
> voterStates={1=ReplicaState(nodeId=1, 
> endOffset=Optional[LogOffsetMetadata(offset=835, 
> metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=62788)])], 
> lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, 
> hasAcknowledgedLeader=true)}) (org.apache.kafka.raft.QuorumState)
> [2024-01-18 17:37:13,072] INFO [NodeToControllerChannelManager id=1 
> name=heartbeat] Client requested disconnect from node 1 
> (org.apache.kafka.clients.NetworkClient)
> [2024-01-18 17:37:13,072] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,123] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,124] INFO [NodeToControllerChannelManager id=1 
> name=heartbeat] Client requested disconnect from node 1 
> (org.apache.kafka.clients.NetworkClient)
> [2024-01-18 17:37:13,124] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,175] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,176] INFO [NodeToControllerChannelManager id=1 
> name=heartbeat] Client requested disconnect from node 1 
> (org.apache.kafka.clients.NetworkClient)
> [2024-01-18 17:37:13,176] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,227] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,229] INFO [NodeToControllerChannelManager id=1 
> name=heartbeat] Client requested disconnect from node 1 
> (org.apache.kafka.clients.NetworkClient)
> [2024-01-18 17:37:13,229] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,279] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread){noformat}
> This did not happen in 3.6.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15594: Add version 3.6 to Kafka Streams system tests [kafka]

2024-01-18 Thread via GitHub


mjsax commented on PR #15151:
URL: https://github.com/apache/kafka/pull/15151#issuecomment-1898982326

   `trunk` was broken -- rebased. Also re-triggered system tests just to be 
sure: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/6038/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1457831330


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -918,7 +920,7 @@ public void replay(
 groupId,
 topic,
 partition,
-OffsetAndMetadata.fromRecord(value)
+OffsetAndMetadata.fromRecord(offset, value)

Review Comment:
   I wonder if that is easy to confuse, and if there is a way to make it easier 
to know what the parameters mean.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1457832081


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -712,13 +712,14 @@ public void run() {
 try {
 // Apply the records to the state machine.
 if (result.replayRecords()) {
-result.records().forEach(record ->
+for (int i = 0; i < result.records().size(); 
i++) {
 context.coordinator.replay(
+prevLastWrittenOffset + i,

Review Comment:
   could we maybe leave a comment about that?
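   
   For example, something along these lines (a sketch based on the hunk above; 
any arguments beyond the offset and the record are elided):
   
```java
// Each record is replayed at the offset it will occupy in the log: the last
// written offset before this batch plus the record's index within the batch.
for (int i = 0; i < result.records().size(); i++) {
    context.coordinator.replay(
        prevLastWrittenOffset + i, // offset of the i-th record in this batch
        result.records().get(i)    // remaining arguments elided
    );
}
```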



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]

2024-01-18 Thread via GitHub


OmniaGM commented on code in PR #15158:
URL: https://github.com/apache/kafka/pull/15158#discussion_r1457856799


##
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+public class TransactionLogConfig {

Review Comment:
   It will be a very small one, which I don't believe is worth it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-14616: Fix stray replica of recreated topics in KRaft mode [kafka]

2024-01-18 Thread via GitHub


cmccabe opened a new pull request, #15230:
URL: https://github.com/apache/kafka/pull/15230

   When a broker is down, and a topic is deleted, this will result in that 
broker seeing "stray replicas" the next time it starts up. These replicas 
contain data that used to be important, but which now needs to be deleted. 
Stray replica deletion is handled during the initial metadata publishing step 
on the broker.
   
   Previously, we deleted these stray replicas after starting up BOTH 
LogManager and ReplicaManager. However, this wasn't quite correct. The presence 
of the stray replicas confused ReplicaManager. Instead, we should delete the 
stray replicas BEFORE starting ReplicaManager.
   
   This bug triggered when a topic was deleted and re-created while a broker 
was down, and some of the replicas of the re-created topic landed on that 
broker. The impact was that the stray replicas were deleted, but the new 
replicas for the next iteration of the topic never got created. This, in turn, 
led to persistent under-replication until the next time the broker was 
restarted.
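   
   Conceptually, the reordering looks like the following sketch (illustrative 
interfaces, not the actual BrokerServer code):
   
```java
// Illustrative only: stray replicas are deleted after log recovery but before
// ReplicaManager starts, so ReplicaManager never observes them.
public class BrokerStartupSketch {
    interface LogManager { void startup(); void deleteStrayLogs(); }
    interface ReplicaManager { void startup(); }

    void startup(LogManager logManager, ReplicaManager replicaManager) {
        logManager.startup();         // recover all local logs, stray replicas included
        logManager.deleteStrayLogs(); // drop replicas of topics deleted while the broker was down
        replicaManager.startup();     // only sees valid replicas from here on
    }
}
```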


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16159: MINOR - Removed debug log [kafka]

2024-01-18 Thread via GitHub


apoorvmittal10 commented on code in PR #15228:
URL: https://github.com/apache/kafka/pull/15228#discussion_r1457875345


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -362,7 +362,6 @@ public long timeToNextUpdate(long requestTimeoutMs) {
 throw new IllegalStateException("Unknown telemetry state: 
" + localState);
 }
 
-log.debug("For telemetry state {}, returning the value {} ms; {}", 
localState, timeMs, msg);

Review Comment:
   I think emitting metrics would be helpful if we wanted to derive some 
meaningful information about the running reporter, but here the log was helpful 
for seeing whether the reporter was working correctly while testing/debugging. 
Rather than removing it, I moved it to trace so there is still a way to know 
the state of the reporter if we need to debug an application. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16165) Consumer invalid transition on expired poll interval

2024-01-18 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16165:
--

 Summary: Consumer invalid transition on expired poll interval
 Key: KAFKA-16165
 URL: https://issues.apache.org/jira/browse/KAFKA-16165
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans


Running system tests with the new async consumer revealed an invalid transition 
caused by the consumer not being polled within the max poll interval in some 
scenario (it may relate to consumer close, as the transition is leaving->stale).

Log trace:

[2024-01-17 19:45:07,379] WARN [Consumer 
clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
groupId=consumer-groups-test-2] consumer poll timeout has expired. This means 
the time between subsequent calls to poll() was longer than the configured 
max.poll.interval.ms, which typically implies that the poll loop is spending 
too much time processing messages. You can address this either by increasing 
max.poll.interval.ms or by reducing the maximum size of batches returned in 
poll() with max.poll.records. 
(org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
[2024-01-17 19:45:07,379] ERROR [Consumer 
clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
groupId=consumer-groups-test-2] Unexpected error caught in consumer network 
thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE
at 
org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
at 
org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
at 
org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137)
at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at 
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
at 
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)
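
For context, a toy sketch of the guarded transition pattern that raises this 
exception (the transition table here is assumed for illustration; the real one 
lives in MembershipManagerImpl):
{code:java}
import java.util.EnumSet;
import java.util.Set;

// STALE does not list LEAVING among its valid predecessor states, so the
// LEAVING -> STALE transition fails the guard.
public class TransitionSketch {
    enum MemberState {
        STABLE, LEAVING, STALE;

        private Set<MemberState> validPrevious;

        static {
            // Assumed transition table, for illustration only.
            STABLE.validPrevious = EnumSet.of(LEAVING, STALE);
            LEAVING.validPrevious = EnumSet.of(STABLE);
            STALE.validPrevious = EnumSet.of(STABLE);
        }

        void validateTransitionFrom(MemberState from) {
            if (!validPrevious.contains(from))
                throw new IllegalStateException(
                    String.format("Invalid state transition from %s to %s", from, this));
        }
    }

    public static void main(String[] args) {
        MemberState.STALE.validateTransitionFrom(MemberState.LEAVING); // throws
    }
}
{code}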



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16159: MINOR - Removed debug log [kafka]

2024-01-18 Thread via GitHub


apoorvmittal10 commented on PR #15228:
URL: https://github.com/apache/kafka/pull/15228#issuecomment-1899066505

   > Thanks @apoorvmittal10 - Could you elaborate on the purpose of this log 
line? how important is it for the user to know about the "next update time"? I 
wonder if we could just log it when update is happening.
   
   @philipnee The purpose of the log line was to check, while testing, whether 
the reporter is actually working and how much time remains until the next 
update (it was helpful during development). Rather than completely removing the 
line, I have moved it to trace so there is still some way to debug later.
   
   I have added another debug log line which will only be emitted when a 
telemetry request is created, in accordance with the push interval, so it will 
be minimal and will still give a developer an idea of whether telemetry is 
working and in which state (get subscription or push telemetry).
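   
   For illustration, the resulting split looks roughly like this (a sketch, not 
the PR's exact wording):
   
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch: the per-poll state line moves to trace, while a debug line fires
// only when a telemetry request is actually created.
public class TelemetryLogSketch {
    private static final Logger log = LoggerFactory.getLogger(TelemetryLogSketch.class);

    void onPoll(String state, long timeMs) {
        // Evaluated on every poll of the reporter: trace to avoid log spam.
        log.trace("For telemetry state {}, next update in {} ms", state, timeMs);
    }

    void onRequestCreated(String state) {
        // Happens once per push interval: debug is cheap and informative.
        log.debug("Creating telemetry request. Telemetry state: {}", state);
    }
}
```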


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-5863: Avoid NPE when RestClient calls expecting no-content receive content. [kafka]

2024-01-18 Thread via GitHub


gharris1727 commented on PR #13294:
URL: https://github.com/apache/kafka/pull/13294#issuecomment-1899079016

   Test failures appear unrelated, and the connect and mirror tests pass 
locally for me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15770: IQv2 must return immutable position [kafka]

2024-01-18 Thread via GitHub


vvcephei commented on code in PR #15219:
URL: https://github.com/apache/kafka/pull/15219#discussion_r1457894664


##
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##
@@ -146,7 +146,7 @@ public static  QueryResult handleBasicQueries(
 "Handled in " + store.getClass() + " in " + (System.nanoTime() 
- start) + "ns"
 );
 }
-result.setPosition(position);
+result.setPosition(position.copy());

Review Comment:
   Better yet, we could make the copy at the beginning of this method or even 
on the caller side.
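   
   A small sketch of why the copy matters, using the public `Position` API:
   
```java
import org.apache.kafka.streams.query.Position;

// Without a copy, the store's live Position keeps mutating underneath the
// QueryResult that was already handed back to the caller.
public class PositionCopyDemo {
    public static void main(String[] args) {
        Position live = Position.emptyPosition();
        live.withComponent("topic", 0, 1L); // store advances
        Position snapshot = live.copy();    // what the query result should hold
        live.withComponent("topic", 0, 5L); // store keeps advancing
        System.out.println(snapshot.getPartitionPositions("topic")); // {0=1}
    }
}
```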



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15811: Enhance request context with client socket port information (KIP-714) [kafka]

2024-01-18 Thread via GitHub


apoorvmittal10 commented on code in PR #15190:
URL: https://github.com/apache/kafka/pull/15190#discussion_r145796


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1140,9 +1141,9 @@ private[kafka] class Processor(
 expiredConnectionsKilledCount.record(null, 1, 0)
   } else {
 val connectionId = receive.source
-val context = new RequestContext(header, connectionId, 
channel.socketAddress,
-  channel.principal, listenerName, securityProtocol,
-  channel.channelMetadataRegistry.clientInformation, 
isPrivilegedListener, channel.principalSerde)
+val context = new RequestContext(header, connectionId, 
channel.socketAddress, Optional.of(channel.socketPort()),

Review Comment:
   I find that RequestContext is either created in SocketServer or while 
forwarding requests. We currently don't require client port information outside 
KIP-714, and the push telemetry request is not marked forwardable, so wiring 
the client port information elsewhere does not seem useful at this point; hence 
I marked it as Optional.
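   
   A minimal sketch of the pattern (toy types, not the real `RequestContext`):
   
```java
import java.util.Optional;

// The port is populated where the socket is at hand (SocketServer) and left
// empty on paths that have no socket, e.g. forwarded requests.
public class ClientPortSketch {
    static String describe(Optional<Integer> clientPort) {
        return clientPort.map(p -> "client port " + p).orElse("client port unknown");
    }

    public static void main(String[] args) {
        System.out.println(describe(Optional.of(54321)));  // direct connection
        System.out.println(describe(Optional.empty()));    // forwarded request
    }
}
```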



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-5863: Avoid NPE when RestClient calls expecting no-content receive content. [kafka]

2024-01-18 Thread via GitHub


gharris1727 merged PR #13294:
URL: https://github.com/apache/kafka/pull/13294


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-5863) Potential null dereference in DistributedHerder#reconfigureConnector()

2024-01-18 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-5863:
---
Fix Version/s: 3.8.0

> Potential null dereference in DistributedHerder#reconfigureConnector()
> --
>
> Key: KAFKA-5863
> URL: https://issues.apache.org/jira/browse/KAFKA-5863
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Ted Yu
>Assignee: Greg Harris
>Priority: Minor
> Fix For: 3.8.0
>
>
> Here is the call chain:
> {code}
> RestServer.httpRequest(reconfigUrl, "POST", 
> taskProps, null);
> {code}
> In httpRequest():
> {code}
> } else if (responseCode >= 200 && responseCode < 300) {
> InputStream is = connection.getInputStream();
> T result = JSON_SERDE.readValue(is, responseFormat);
> {code}
> For readValue():
> {code}
> public  T readValue(InputStream src, TypeReference valueTypeRef)
> throws IOException, JsonParseException, JsonMappingException
> {
> return (T) _readMapAndClose(_jsonFactory.createParser(src), 
> _typeFactory.constructType(valueTypeRef));
> {code}
> Then there would be NPE in constructType():
> {code}
> public JavaType constructType(TypeReference typeRef)
> {
> // 19-Oct-2015, tatu: Simpler variant like so should work
> return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15807) Add support for compression/decompression of metrics

2024-01-18 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal resolved KAFKA-15807.
---
Resolution: Done

> Add support for compression/decompression of metrics
> 
>
> Key: KAFKA-15807
> URL: https://issues.apache.org/jira/browse/KAFKA-15807
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-18 Thread via GitHub


mumrah commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1457908869


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -140,6 +142,73 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 }
   }
 
+  /**
+   * Return topic partition metadata for the given topic, listener and index 
range. Also, return the next partition
+   * index that is not included in the result.
+   *
+   * @param image   The metadata image
+   * @param topicName   The name of the topic.
+   * @param listenerNameThe listener name.
+   * @param startIndex  The smallest index of the partitions 
to be included in the result.
+   * @param upperIndex  The upper limit of the index of the 
partitions to be included in the result.
+   *Note that, the upper index can be 
larger than the largest partition index in
+   *this topic.
+   * @returnA collection of topic partition 
metadata and next partition index (-1 means
+   *no next partition).
+   */
+  private def getPartitionMetadataForDescribeTopicResponse(
+image: MetadataImage,
+topicName: String,
+listenerName: ListenerName,
+startIndex: Int,
+maxCount: Int
+  ): (Option[List[DescribeTopicPartitionsResponsePartition]], Int) = {
+Option(image.topics().getTopic(topicName)) match {
+  case None => (None, -1)
+  case Some(topic) => {
+val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]()
+val partitions = topic.partitions().keySet()
+val upperIndex = topic.partitions().size().min(startIndex + maxCount)
+val nextIndex = if (upperIndex < partitions.size()) upperIndex else -1
+for (partitionId <- startIndex until upperIndex) {
+  topic.partitions().get(partitionId) match {
+case partition : PartitionRegistration => {
+  val filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas,
+listenerName, false)
+  val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, 
listenerName, false)
+  val offlineReplicas = getOfflineReplicas(image, partition, 
listenerName)
+  val maybeLeader = getAliveEndpoint(image, partition.leader, 
listenerName)
+  maybeLeader match {
+case None =>
+  result.append(new DescribeTopicPartitionsResponsePartition()
+.setPartitionIndex(partitionId)
+.setLeaderId(MetadataResponse.NO_LEADER_ID)
+.setLeaderEpoch(partition.leaderEpoch)
+.setReplicaNodes(filteredReplicas)
+.setIsrNodes(filteredIsr)
+.setOfflineReplicas(offlineReplicas)
+.setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+.setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
+case Some(leader) =>
+  result.append(new DescribeTopicPartitionsResponsePartition()
+.setPartitionIndex(partitionId)
+.setLeaderId(leader.id())
+.setLeaderEpoch(partition.leaderEpoch)
+.setReplicaNodes(filteredReplicas)
+.setIsrNodes(filteredIsr)
+.setOfflineReplicas(offlineReplicas)
+.setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+.setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
+  }
+}
+case _ =>

Review Comment:
   Should we throw an ISE here rather than silently continue? Maybe we could 
just log an error



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]

2024-01-18 Thread via GitHub


divijvaidya commented on PR #15133:
URL: https://github.com/apache/kafka/pull/15133#issuecomment-1899108113

   > @divijvaidya , since Satish is busy, could you help review this PR? We'd 
like to get it into v3.7.0 for the completion of KIP-963. Thanks.
   
   Sorry I have been busy with work lately. Will look at this first thing 
tomorrow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15212:
URL: https://github.com/apache/kafka/pull/15212#discussion_r1457913561


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -861,19 +861,9 @@ private void maybeUpdatePartitionEpoch(
 ConsumerGroupMember oldMember,
 ConsumerGroupMember newMember
 ) {
-if (oldMember == null) {
-addPartitionEpochs(newMember.assignedPartitions(), 
newMember.memberEpoch());
-addPartitionEpochs(newMember.partitionsPendingRevocation(), 
newMember.memberEpoch());
-} else {
-if 
(!oldMember.assignedPartitions().equals(newMember.assignedPartitions())) {
-removePartitionEpochs(oldMember.assignedPartitions());
-addPartitionEpochs(newMember.assignedPartitions(), 
newMember.memberEpoch());
-}
-if 
(!oldMember.partitionsPendingRevocation().equals(newMember.partitionsPendingRevocation()))
 {
-removePartitionEpochs(oldMember.partitionsPendingRevocation());
-addPartitionEpochs(newMember.partitionsPendingRevocation(), 
newMember.memberEpoch());
-}
-}
+maybeRemovePartitionEpoch(oldMember);

Review Comment:
   That was the only thing I could think of as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16164: Pre-Vote RPCs [part 1] [kafka]

2024-01-18 Thread via GitHub


ahuang98 opened a new pull request, #15231:
URL: https://github.com/apache/kafka/pull/15231

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15212:
URL: https://github.com/apache/kafka/pull/15212#discussion_r1457917435


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -875,30 +875,43 @@ private void maybeRemovePartitionEpoch(
 ConsumerGroupMember oldMember
 ) {
 if (oldMember != null) {
-removePartitionEpochs(oldMember.assignedPartitions());
-removePartitionEpochs(oldMember.partitionsPendingRevocation());
+removePartitionEpochs(oldMember.assignedPartitions(), 
oldMember.memberEpoch());
+removePartitionEpochs(oldMember.partitionsPendingRevocation(), 
oldMember.memberEpoch());
 }
 }
 
 /**
  * Removes the partition epochs based on the provided assignment.
  *
  * @param assignmentThe assignment.
+ * @param expectedEpoch The expected epoch.
+ * @throws IllegalStateException if the epoch does not match the expected 
one.
+ * package-private for testing.
  */
-private void removePartitionEpochs(
-Map> assignment
+void removePartitionEpochs(
+Map> assignment,
+int expectedEpoch
 ) {
 assignment.forEach((topicId, assignedPartitions) -> {
 currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
 if (partitionsOrNull != null) {
-assignedPartitions.forEach(partitionsOrNull::remove);
+assignedPartitions.forEach(partitionId -> {
+Integer prevValue = 
partitionsOrNull.remove(partitionId);
+if (prevValue != expectedEpoch) {
+throw new IllegalStateException(
+String.format("Cannot remove the epoch %d from 
%s-%s because the partition is " +
+"still owned at a different epoch %d", 
expectedEpoch, topicId, partitionId, prevValue));
+}
+});
 if (partitionsOrNull.isEmpty()) {
 return null;
 } else {
 return partitionsOrNull;
 }
 } else {
-return null;
+throw new IllegalStateException(

Review Comment:
   What is the effect of throwing this error? Do we also block removing the 
rest of the partitions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-18 Thread via GitHub


artemlivshits commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1457923250


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -140,6 +142,73 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 }
   }
 
+  /**
+   * Return topic partition metadata for the given topic, listener and index 
range. Also, return the next partition
+   * index that is not included in the result.
+   *
+   * @param image   The metadata image
+   * @param topicName   The name of the topic.
+   * @param listenerNameThe listener name.
+   * @param startIndex  The smallest index of the partitions 
to be included in the result.
+   * @param upperIndex  The upper limit of the index of the 
partitions to be included in the result.
+   *Note that, the upper index can be 
larger than the largest partition index in
+   *this topic.
+   * @returnA collection of topic partition 
metadata and next partition index (-1 means
+   *no next partition).
+   */
+  private def getPartitionMetadataForDescribeTopicResponse(
+image: MetadataImage,
+topicName: String,
+listenerName: ListenerName,
+startIndex: Int,
+maxCount: Int
+  ): (Option[List[DescribeTopicPartitionsResponsePartition]], Int) = {
+Option(image.topics().getTopic(topicName)) match {
+  case None => (None, -1)
+  case Some(topic) => {
+val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]()
+val partitions = topic.partitions().keySet()
+val upperIndex = topic.partitions().size().min(startIndex + maxCount)
+val nextIndex = if (upperIndex < partitions.size()) upperIndex else -1
+for (partitionId <- startIndex until upperIndex) {
+  topic.partitions().get(partitionId) match {
+case partition : PartitionRegistration => {
+  val filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas,
+listenerName, false)
+  val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, 
listenerName, false)
+  val offlineReplicas = getOfflineReplicas(image, partition, 
listenerName)
+  val maybeLeader = getAliveEndpoint(image, partition.leader, 
listenerName)
+  maybeLeader match {
+case None =>
+  result.append(new DescribeTopicPartitionsResponsePartition()
+.setPartitionIndex(partitionId)
+.setLeaderId(MetadataResponse.NO_LEADER_ID)
+.setLeaderEpoch(partition.leaderEpoch)
+.setReplicaNodes(filteredReplicas)
+.setIsrNodes(filteredIsr)
+.setOfflineReplicas(offlineReplicas)
+.setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+.setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
+case Some(leader) =>
+  result.append(new DescribeTopicPartitionsResponsePartition()
+.setPartitionIndex(partitionId)
+.setLeaderId(leader.id())
+.setLeaderEpoch(partition.leaderEpoch)
+.setReplicaNodes(filteredReplicas)
+.setIsrNodes(filteredIsr)
+.setOfflineReplicas(offlineReplicas)
+.setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+.setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
+  }
+}
+case _ =>

Review Comment:
   Definitely should log an error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15770: IQv2 must return immutable position [kafka]

2024-01-18 Thread via GitHub


mjsax commented on code in PR #15219:
URL: https://github.com/apache/kafka/pull/15219#discussion_r1457939699


##
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##
@@ -146,7 +146,7 @@ public static  QueryResult handleBasicQueries(
 "Handled in " + store.getClass() + " in " + (System.nanoTime() 
- start) + "ns"
 );
 }
-result.setPosition(position);
+result.setPosition(position.copy());

Review Comment:
   Thanks. I was hoping it would be thread-safe already -- maybe not. Let me 
double-check the code and figure it out.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16159: MINOR - Correcting logs to debug telemetry reporter [kafka]

2024-01-18 Thread via GitHub


philipnee commented on code in PR #15228:
URL: https://github.com/apache/kafka/pull/15228#discussion_r1457941137


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -379,6 +379,7 @@ public Optional> createRequest() 
{
 lock.readLock().unlock();
 }
 
+log.debug("Creating telemetry request. Telemetry state: {}", 
localState);

Review Comment:
   It would be useful to know the type of request the client is sending: either 
a subscription request or a push request. I guess we can derive it from the 
current state, but the state is rather an internal thing, so a person looking 
at it might not know what it means. My suggestion is to be explicit: "we are 
sending a push request" or "we are sending a subscription request".  
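   
   Something like the following, for example (a sketch against the hunk above; 
the state name is taken from the reporter's internal states and may not match 
exactly):
   
```java
// Name the request type explicitly instead of leaking the internal state.
String requestType = localState == ClientTelemetryState.SUBSCRIPTION_NEEDED
    ? "GetTelemetrySubscriptions" : "PushTelemetry";
log.debug("Creating {} request", requestType);
```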



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16159: MINOR - Correcting logs to debug telemetry reporter [kafka]

2024-01-18 Thread via GitHub


philipnee commented on code in PR #15228:
URL: https://github.com/apache/kafka/pull/15228#discussion_r1457944304


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -362,7 +362,7 @@ public long timeToNextUpdate(long requestTimeoutMs) {
 throw new IllegalStateException("Unknown telemetry state: 
" + localState);
 }
 
-log.debug("For telemetry state {}, returning the value {} ms; {}", 
localState, timeMs, msg);
+log.trace("For telemetry state {}, returning the value {} ms; {}", 
localState, timeMs, msg);

Review Comment:
   Do you think it makes sense to rephrase the log? For the `msg`, can we 
explicitly say:
   
   ...client will wait for {}ms before submitting the next...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16159: MINOR - Correcting logs to debug telemetry reporter [kafka]

2024-01-18 Thread via GitHub


philipnee commented on code in PR #15228:
URL: https://github.com/apache/kafka/pull/15228#discussion_r1457945378


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -362,7 +362,7 @@ public long timeToNextUpdate(long requestTimeoutMs) {
 throw new IllegalStateException("Unknown telemetry state: 
" + localState);
 }
 
-log.debug("For telemetry state {}, returning the value {} ms; {}", 
localState, timeMs, msg);
+log.trace("For telemetry state {}, returning the value {} ms; {}", 
localState, timeMs, msg);

Review Comment:
   so that we can remove the "returning the value 224678 ms" part - it was a 
bit hard to understand what it actually means.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions

2024-01-18 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15843:
---
Description: 
Legacy coordinator triggers onPartitionsAssigned with empty assignment (which 
is not the case when triggering onPartitionsRevoked or Lost). This is the 
behaviour of the legacy coordinator, and the new consumer implementation 
maintains the same principle. We should review this to fully understand if it 
is really needed to call onPartitionsAssigned with empty assignment (or if it 
should behave consistently with the onRevoke/Lost).

Note that the consumer integration tests rely on this call to 
onPartitionsAssigned to #awaitRebalance (AbstractConsumerTest.scala)

  was:Legacy coordinator triggers onPartitionsAssigned with empty assignment 
(which is not the case when triggering onPartitionsRevoked or Lost). This is 
the behaviour of the legacy coordinator, and the new consumer implementation 
maintains the same principle. We should review this to fully understand if it 
is really needed to call onPartitionsAssigned with empty assignment (or if it 
should behave consistently with the onRevoke/Lost)


> Review consumer onPartitionsAssigned called with empty partitions
> -
>
> Key: KAFKA-15843
> URL: https://issues.apache.org/jira/browse/KAFKA-15843
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>
> Legacy coordinator triggers onPartitionsAssigned with empty assignment (which 
> is not the case when triggering onPartitionsRevoked or Lost). This is the 
> behaviour of the legacy coordinator, and the new consumer implementation 
> maintains the same principle. We should review this to fully understand if it 
> is really needed to call onPartitionsAssigned with empty assignment (or if it 
> should behave consistently with the onRevoke/Lost).
> Note that the consumer integration tests rely on this call to 
> onPartitionsAssigned to #awaitRebalance (AbstractConsumerTest.scala)
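
For reference, a minimal sketch of a listener that tolerates the 
empty-assignment callback:
{code:java}
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Since onPartitionsAssigned can fire with an empty collection, a listener
// that only cares about real assignments needs an explicit guard.
public class GuardedListener implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        if (partitions.isEmpty())
            return; // may be invoked with no partitions after a rebalance
        // ... restore state / seek for the newly assigned partitions ...
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // not invoked with an empty collection by the legacy coordinator
    }
}
{code}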



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

