Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-12 Thread via GitHub


chia7712 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1676503836


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2302,6 +2317,49 @@ public void testRebalanceMetricsOnFailedRebalance() {
 assertEquals(-1d, getMetricValue(metrics, 
rebalanceMetricsManager.lastRebalanceSecondsAgo));
 }
 
+@Test
+public void testLeaveGroupWhenStateIsFatal() {
+MembershipManagerImpl membershipManager = 
createMemberInStableState(null);
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+membershipManager.transitionToFatal();
+assertEquals(MemberState.FATAL, membershipManager.state());
+
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();
+}
+
+@Test
+public void testLeaveGroupWhenStateIsUnsubscribe() {
+MembershipManagerImpl membershipManager = new MembershipManagerImpl(
+GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, 
Optional.empty(),
+subscriptionState, commitRequestManager, metadata, 
LOG_CONTEXT, Optional.empty(),
+backgroundEventHandler, time, rebalanceMetricsManager);
+assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();
+}
+
+@Test
+public void testLeaveGroupWhenStateIsFenced() {
+MembershipManagerImpl membershipManager = createMemberInStableState();
+ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
+mockFencedMemberStuckOnUserCallback(membershipManager, invoker);
+assertEquals(MemberState.FENCED, membershipManager.state());
+
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();
+}
+
+@Test
+public void testLeaveGroupWhenStateIsStale() {

Review Comment:
   > Let's keep the testLeaveGroupWhenStateIsFatal, we didn't have coverage for 
that before this PR
   
   It seems to me `testLeaveGroupWhenStateIsFatal` is used to verify 
"leaveGroup" after it is in fatal state. There are many existent cases which 
can offer the scenario. For example: `testFatalFailureWhenMemberAlreadyLeft`. 
Hence, we can add `leaveGroup` and `verify(subscriptionState)` to those test 
cases after the state is in fatal



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-12 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1676110674


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2302,6 +2317,49 @@ public void testRebalanceMetricsOnFailedRebalance() {
 assertEquals(-1d, getMetricValue(metrics, 
rebalanceMetricsManager.lastRebalanceSecondsAgo));
 }
 
+@Test
+public void testLeaveGroupWhenStateIsFatal() {
+MembershipManagerImpl membershipManager = 
createMemberInStableState(null);
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+membershipManager.transitionToFatal();
+assertEquals(MemberState.FATAL, membershipManager.state());
+
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();
+}
+
+@Test
+public void testLeaveGroupWhenStateIsUnsubscribe() {
+MembershipManagerImpl membershipManager = new MembershipManagerImpl(
+GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, 
Optional.empty(),
+subscriptionState, commitRequestManager, metadata, 
LOG_CONTEXT, Optional.empty(),
+backgroundEventHandler, time, rebalanceMetricsManager);
+assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();
+}
+
+@Test
+public void testLeaveGroupWhenStateIsFenced() {
+MembershipManagerImpl membershipManager = createMemberInStableState();
+ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
+mockFencedMemberStuckOnUserCallback(membershipManager, invoker);
+assertEquals(MemberState.FENCED, membershipManager.state());
+
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();
+}
+
+@Test
+public void testLeaveGroupWhenStateIsStale() {

Review Comment:
   one last: the `testLeaveGroupWhenStateIsUnsubscribe` could be removed too, 
we already have `testLeaveGroupWhenMemberAlreadyLeft` for that. (Let's keep the 
`testLeaveGroupWhenStateIsFatal`, we didn't have coverage for that before this 
PR, thanks!)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-12 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1676082409


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2302,6 +2317,49 @@ public void testRebalanceMetricsOnFailedRebalance() {
 assertEquals(-1d, getMetricValue(metrics, 
rebalanceMetricsManager.lastRebalanceSecondsAgo));
 }
 
+@Test
+public void testLeaveGroupWhenStateIsFatal() {
+MembershipManagerImpl membershipManager = 
createMemberInStableState(null);
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+membershipManager.transitionToFatal();
+assertEquals(MemberState.FATAL, membershipManager.state());
+
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();
+}
+
+@Test
+public void testLeaveGroupWhenStateIsUnsubscribe() {
+MembershipManagerImpl membershipManager = new MembershipManagerImpl(
+GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, 
Optional.empty(),
+subscriptionState, commitRequestManager, metadata, 
LOG_CONTEXT, Optional.empty(),
+backgroundEventHandler, time, rebalanceMetricsManager);
+assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();
+}
+
+@Test
+public void testLeaveGroupWhenStateIsFenced() {
+MembershipManagerImpl membershipManager = createMemberInStableState();
+ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
+mockFencedMemberStuckOnUserCallback(membershipManager, invoker);
+assertEquals(MemberState.FENCED, membershipManager.state());
+
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();
+}
+
+@Test
+public void testLeaveGroupWhenStateIsStale() {

Review Comment:
   Agree. Removed them. Thanks for review and finding this. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-12 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1676058719


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2302,6 +2317,49 @@ public void testRebalanceMetricsOnFailedRebalance() {
 assertEquals(-1d, getMetricValue(metrics, 
rebalanceMetricsManager.lastRebalanceSecondsAgo));
 }
 
+@Test
+public void testLeaveGroupWhenStateIsFatal() {
+MembershipManagerImpl membershipManager = 
createMemberInStableState(null);
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+membershipManager.transitionToFatal();
+assertEquals(MemberState.FATAL, membershipManager.state());
+
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();
+}
+
+@Test
+public void testLeaveGroupWhenStateIsUnsubscribe() {
+MembershipManagerImpl membershipManager = new MembershipManagerImpl(
+GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, 
Optional.empty(),
+subscriptionState, commitRequestManager, metadata, 
LOG_CONTEXT, Optional.empty(),
+backgroundEventHandler, time, rebalanceMetricsManager);
+assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();
+}
+
+@Test
+public void testLeaveGroupWhenStateIsFenced() {
+MembershipManagerImpl membershipManager = createMemberInStableState();
+ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
+mockFencedMemberStuckOnUserCallback(membershipManager, invoker);
+assertEquals(MemberState.FENCED, membershipManager.state());
+
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();
+}
+
+@Test
+public void testLeaveGroupWhenStateIsStale() {

Review Comment:
   sorry a bit late, but I noticed now that this test is a duplicate, we 
already have a `testLeaveGroupWhenMemberIsStale`, that you updated to verify 
the unsubscribe, so I would say we just remove this one (and 
`testLeaveGroupWhenStateIsFenced`, that seems a dup of 
`testLeaveGroupWhenMemberFenced`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-12 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1676058719


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2302,6 +2317,49 @@ public void testRebalanceMetricsOnFailedRebalance() {
 assertEquals(-1d, getMetricValue(metrics, 
rebalanceMetricsManager.lastRebalanceSecondsAgo));
 }
 
+@Test
+public void testLeaveGroupWhenStateIsFatal() {
+MembershipManagerImpl membershipManager = 
createMemberInStableState(null);
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+membershipManager.transitionToFatal();
+assertEquals(MemberState.FATAL, membershipManager.state());
+
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();
+}
+
+@Test
+public void testLeaveGroupWhenStateIsUnsubscribe() {
+MembershipManagerImpl membershipManager = new MembershipManagerImpl(
+GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, 
Optional.empty(),
+subscriptionState, commitRequestManager, metadata, 
LOG_CONTEXT, Optional.empty(),
+backgroundEventHandler, time, rebalanceMetricsManager);
+assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();
+}
+
+@Test
+public void testLeaveGroupWhenStateIsFenced() {
+MembershipManagerImpl membershipManager = createMemberInStableState();
+ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
+mockFencedMemberStuckOnUserCallback(membershipManager, invoker);
+assertEquals(MemberState.FENCED, membershipManager.state());
+
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();
+}
+
+@Test
+public void testLeaveGroupWhenStateIsStale() {

Review Comment:
   sorry a bit late, but I noticed now that this test is a duplicate, we 
already have a `testLeaveGroupWhenMemberIsStale`, that you updated to verify 
the unsubscribe, so I would say we just remove this one. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-11 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1674145282


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1501,11 +1501,11 @@ public void unsubscribe() {
 
 private void resetGroupMetadata() {
 groupMetadata.updateAndGet(
-oldGroupMetadataOptional -> oldGroupMetadataOptional
-.map(oldGroupMetadata -> initializeConsumerGroupMetadata(
-oldGroupMetadata.groupId(),
-oldGroupMetadata.groupInstanceId()
-))
+oldGroupMetadataOptional -> oldGroupMetadataOptional
+.map(oldGroupMetadata -> 
initializeConsumerGroupMetadata(
+oldGroupMetadata.groupId(),
+oldGroupMetadata.groupInstanceId()
+))

Review Comment:
   Yeah, we should revert this change. Thanks for the comment. I have rebased 
trunk branch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-11 Thread via GitHub


lianetm commented on PR #16449:
URL: https://github.com/apache/kafka/pull/16449#issuecomment-2223050194

   Hey @FrankYang0529 , thanks for the updates. Only one nit left. Also I 
notice the build did not complete, please merge trunk latest changes too and 
let's see what we get on the next run. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-11 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1674077031


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1501,11 +1501,11 @@ public void unsubscribe() {
 
 private void resetGroupMetadata() {
 groupMetadata.updateAndGet(
-oldGroupMetadataOptional -> oldGroupMetadataOptional
-.map(oldGroupMetadata -> initializeConsumerGroupMetadata(
-oldGroupMetadata.groupId(),
-oldGroupMetadata.groupInstanceId()
-))
+oldGroupMetadataOptional -> oldGroupMetadataOptional
+.map(oldGroupMetadata -> 
initializeConsumerGroupMetadata(
+oldGroupMetadata.groupId(),
+oldGroupMetadata.groupInstanceId()
+))

Review Comment:
   format is off here right? (I guess it's because you had it inside an `if` at 
some point, but was then removed). I expect we should end up with the 
`resetGroupMetadata` unchanged in this PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-09 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1670789098


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##
@@ -25,62 +25,98 @@
 import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
 import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
 import org.apache.kafka.common.utils.LogContext;
 
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class ApplicationEventProcessorTest {
 private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
 private ApplicationEventProcessor processor;
 private CommitRequestManager commitRequestManager;
 private HeartbeatRequestManager heartbeatRequestManager;
 private MembershipManager membershipManager;
+private SubscriptionState subscriptionState;
 
-@BeforeEach
-public void setup() {
+private void setupProcessor(boolean withGroupId) {
 LogContext logContext = new LogContext();
 OffsetsRequestManager offsetsRequestManager = 
mock(OffsetsRequestManager.class);
 TopicMetadataRequestManager topicMetadataRequestManager = 
mock(TopicMetadataRequestManager.class);
 FetchRequestManager fetchRequestManager = 
mock(FetchRequestManager.class);
-CoordinatorRequestManager coordinatorRequestManager = 
mock(CoordinatorRequestManager.class);
-commitRequestManager = mock(CommitRequestManager.class);
-heartbeatRequestManager = mock(HeartbeatRequestManager.class);
-membershipManager = mock(MembershipManager.class);
-RequestManagers requestManagers = new RequestManagers(
-logContext,
-offsetsRequestManager,
-topicMetadataRequestManager,
-fetchRequestManager,
-Optional.of(coordinatorRequestManager),
-Optional.of(commitRequestManager),
-Optional.of(heartbeatRequestManager),
-Optional.of(membershipManager)
-);
+RequestManagers requestManagers;
+if (withGroupId) {
+CoordinatorRequestManager coordinatorRequestManager = 
mock(CoordinatorRequestManager.class);
+commitRequestManager = mock(CommitRequestManager.class);
+membershipManager = mock(MembershipManager.class);
+heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+subscriptionState = mock(SubscriptionState.class);

Review Comment:
   Thanks for the suggestion. I update it and make it more simple.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-09 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1670787996


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1499,14 +1499,16 @@ public void unsubscribe() {
 }
 }
 
-private void resetGroupMetadata() {
-groupMetadata.updateAndGet(
-oldGroupMetadataOptional -> oldGroupMetadataOptional
-.map(oldGroupMetadata -> initializeConsumerGroupMetadata(
-oldGroupMetadata.groupId(),
-oldGroupMetadata.groupInstanceId()
-))
-);
+private void maybeResetGroupMetadata() {
+if (groupMetadata.get().isPresent()) {

Review Comment:
   Thanks for the suggestion. I didn't notice that. Yeah, we don't need to 
check whether `groupMetadata` is present here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-09 Thread via GitHub


lianetm commented on PR #16449:
URL: https://github.com/apache/kafka/pull/16449#issuecomment-2218046299

   Thanks for the updates @FrankYang0529 , left a suggestion for consideration. 
   
   Also could you please rebase to trigger the build? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-09 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1670733263


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##
@@ -25,62 +25,98 @@
 import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
 import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
 import org.apache.kafka.common.utils.LogContext;
 
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class ApplicationEventProcessorTest {
 private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
 private ApplicationEventProcessor processor;
 private CommitRequestManager commitRequestManager;
 private HeartbeatRequestManager heartbeatRequestManager;
 private MembershipManager membershipManager;
+private SubscriptionState subscriptionState;
 
-@BeforeEach
-public void setup() {
+private void setupProcessor(boolean withGroupId) {
 LogContext logContext = new LogContext();
 OffsetsRequestManager offsetsRequestManager = 
mock(OffsetsRequestManager.class);
 TopicMetadataRequestManager topicMetadataRequestManager = 
mock(TopicMetadataRequestManager.class);
 FetchRequestManager fetchRequestManager = 
mock(FetchRequestManager.class);
-CoordinatorRequestManager coordinatorRequestManager = 
mock(CoordinatorRequestManager.class);
-commitRequestManager = mock(CommitRequestManager.class);
-heartbeatRequestManager = mock(HeartbeatRequestManager.class);
-membershipManager = mock(MembershipManager.class);
-RequestManagers requestManagers = new RequestManagers(
-logContext,
-offsetsRequestManager,
-topicMetadataRequestManager,
-fetchRequestManager,
-Optional.of(coordinatorRequestManager),
-Optional.of(commitRequestManager),
-Optional.of(heartbeatRequestManager),
-Optional.of(membershipManager)
-);
+RequestManagers requestManagers;
+if (withGroupId) {
+CoordinatorRequestManager coordinatorRequestManager = 
mock(CoordinatorRequestManager.class);
+commitRequestManager = mock(CommitRequestManager.class);
+membershipManager = mock(MembershipManager.class);
+heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+subscriptionState = mock(SubscriptionState.class);

Review Comment:
   we don't need these vars, so what about we remove them all and pass the mock 
directly to the requestManagers constructor? 
   
   Actually, we could consider simplifying more and reduce the whole block from 
ln 57 to ln 85 to a much shorter single path for creating the requestManagers:
   ```
   RequestManagers requestManagers = new RequestManagers(
   logContext,
   offsetsRequestManager,
   topicMetadataRequestManager,
   fetchRequestManager,
   withGroupId ? Optional.of(mock(CoordinatorRequestManager.class)) 
: Optional.empty(),
   withGroupId ? Optional.of(mock(CommitRequestManager.class)) : 
Optional.empty(),
   withGroupId ? Optional.of(mock(HeartbeatRequestManager.class)) : 
Optional.empty(),
   withGroupId ? Optional.of(mock(MembershipManager.class)) : 
Optional.empty());
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-09 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1670733263


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##
@@ -25,62 +25,98 @@
 import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
 import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
 import org.apache.kafka.common.utils.LogContext;
 
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class ApplicationEventProcessorTest {
 private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
 private ApplicationEventProcessor processor;
 private CommitRequestManager commitRequestManager;
 private HeartbeatRequestManager heartbeatRequestManager;
 private MembershipManager membershipManager;
+private SubscriptionState subscriptionState;
 
-@BeforeEach
-public void setup() {
+private void setupProcessor(boolean withGroupId) {
 LogContext logContext = new LogContext();
 OffsetsRequestManager offsetsRequestManager = 
mock(OffsetsRequestManager.class);
 TopicMetadataRequestManager topicMetadataRequestManager = 
mock(TopicMetadataRequestManager.class);
 FetchRequestManager fetchRequestManager = 
mock(FetchRequestManager.class);
-CoordinatorRequestManager coordinatorRequestManager = 
mock(CoordinatorRequestManager.class);
-commitRequestManager = mock(CommitRequestManager.class);
-heartbeatRequestManager = mock(HeartbeatRequestManager.class);
-membershipManager = mock(MembershipManager.class);
-RequestManagers requestManagers = new RequestManagers(
-logContext,
-offsetsRequestManager,
-topicMetadataRequestManager,
-fetchRequestManager,
-Optional.of(coordinatorRequestManager),
-Optional.of(commitRequestManager),
-Optional.of(heartbeatRequestManager),
-Optional.of(membershipManager)
-);
+RequestManagers requestManagers;
+if (withGroupId) {
+CoordinatorRequestManager coordinatorRequestManager = 
mock(CoordinatorRequestManager.class);
+commitRequestManager = mock(CommitRequestManager.class);
+membershipManager = mock(MembershipManager.class);
+heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+subscriptionState = mock(SubscriptionState.class);

Review Comment:
   we don't need these vars, so what about we remove them all and pass the mock 
directly to the requestManagers constructor? 
   
   Actually, we could consider simplifying more and reduce the whole block from 
ln 57 to ln 85 to a much shorter single path for creating the requestManagers:
   ```suggestion
   RequestManagers requestManagers = new RequestManagers(
   logContext,
   offsetsRequestManager,
   topicMetadataRequestManager,
   fetchRequestManager,
   withGroupId ? Optional.of(mock(CoordinatorRequestManager.class)) 
: Optional.empty(),
   withGroupId ? Optional.of(mock(CommitRequestManager.class)) : 
Optional.empty(),
   withGroupId ? Optional.of(mock(HeartbeatRequestManager.class)) : 
Optional.empty(),
   withGroupId ? Optional.of(mock(MembershipManager.class)) : 
Optional.empty());
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-09 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1670689743


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1499,14 +1499,16 @@ public void unsubscribe() {
 }
 }
 
-private void resetGroupMetadata() {
-groupMetadata.updateAndGet(
-oldGroupMetadataOptional -> oldGroupMetadataOptional
-.map(oldGroupMetadata -> initializeConsumerGroupMetadata(
-oldGroupMetadata.groupId(),
-oldGroupMetadata.groupInstanceId()
-))
-);
+private void maybeResetGroupMetadata() {
+if (groupMetadata.get().isPresent()) {

Review Comment:
   We do want this semantics, but I would say we don't need the check because 
we already get it from the `.map` call on the Optional right below (it will 
only map if there is a value)



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1499,14 +1499,16 @@ public void unsubscribe() {
 }
 }
 
-private void resetGroupMetadata() {
-groupMetadata.updateAndGet(
-oldGroupMetadataOptional -> oldGroupMetadataOptional
-.map(oldGroupMetadata -> initializeConsumerGroupMetadata(
-oldGroupMetadata.groupId(),
-oldGroupMetadata.groupInstanceId()
-))
-);
+private void maybeResetGroupMetadata() {
+if (groupMetadata.get().isPresent()) {

Review Comment:
   We do want this semantic, but I would say we don't need the check because we 
already get it from the `.map` call on the Optional right below (it will only 
map if there is a value)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-06 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1667597909


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1476,21 +1478,19 @@ public void unsubscribe() {
 acquireAndEnsureOpen();
 try {
 fetchBuffer.retainAll(Collections.emptySet());
-if (groupMetadata.get().isPresent()) {
-Timer timer = time.timer(Long.MAX_VALUE);
-UnsubscribeEvent unsubscribeEvent = new 
UnsubscribeEvent(calculateDeadlineMs(timer));
-applicationEventHandler.add(unsubscribeEvent);
-log.info("Unsubscribing all topics or patterns and assigned 
partitions {}",
+Timer timer = time.timer(Long.MAX_VALUE);
+UnsubscribeEvent unsubscribeEvent = new 
UnsubscribeEvent(calculateDeadlineMs(timer));
+applicationEventHandler.add(unsubscribeEvent);
+log.info("Unsubscribing all topics or patterns and assigned 
partitions {}",
 subscriptions.assignedPartitions());
 
-try {
-processBackgroundEvents(unsubscribeEvent.future(), timer);
-log.info("Unsubscribed all topics or patterns and assigned 
partitions");
-} catch (TimeoutException e) {
-log.error("Failed while waiting for the unsubscribe event 
to complete");
-}
-resetGroupMetadata();
+try {
+processBackgroundEvents(unsubscribeEvent.future(), timer);
+log.info("Unsubscribed all topics or patterns and assigned 
partitions");
+} catch (TimeoutException e) {
+log.error("Failed while waiting for the unsubscribe event to 
complete");
 }
+if (groupMetadata.get().isPresent()) resetGroupMetadata();

Review Comment:
   Change `resetGroupMetadata` to `maybeResetGroupMetadata` and move 
`groupMetadata.get().isPresent()` to it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-06 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1667597763


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -213,14 +217,16 @@ private void process(final SubscriptionChangeEvent 
ignored) {
  *  the group is sent out.
  */
 private void process(final UnsubscribeEvent event) {
-if (!requestManagers.heartbeatRequestManager.isPresent()) {
-KafkaException error = new KafkaException("Group membership 
manager not present when processing an unsubscribe event");
-event.future().completeExceptionally(error);
-return;
+if (requestManagers.heartbeatRequestManager.isPresent()) {
+MembershipManager membershipManager = 
requestManagers.heartbeatRequestManager.get().membershipManager();
+CompletableFuture future = membershipManager.leaveGroup();
+future.whenComplete(complete(event.future()));
+} else {
+// If the group membership manager is not present, we can't send 
the leave group request,

Review Comment:
   Update the comment. Thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-06 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1667597826


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##
@@ -69,7 +70,8 @@ public void setup() {
 processor = new ApplicationEventProcessor(
 new LogContext(),
 requestManagers,
-metadata
+metadata,
+mock(SubscriptionState.class)

Review Comment:
   Add related test cases. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-05 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1667045593


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1476,21 +1478,19 @@ public void unsubscribe() {
 acquireAndEnsureOpen();
 try {
 fetchBuffer.retainAll(Collections.emptySet());
-if (groupMetadata.get().isPresent()) {
-Timer timer = time.timer(Long.MAX_VALUE);
-UnsubscribeEvent unsubscribeEvent = new 
UnsubscribeEvent(calculateDeadlineMs(timer));
-applicationEventHandler.add(unsubscribeEvent);
-log.info("Unsubscribing all topics or patterns and assigned 
partitions {}",
+Timer timer = time.timer(Long.MAX_VALUE);
+UnsubscribeEvent unsubscribeEvent = new 
UnsubscribeEvent(calculateDeadlineMs(timer));
+applicationEventHandler.add(unsubscribeEvent);
+log.info("Unsubscribing all topics or patterns and assigned 
partitions {}",
 subscriptions.assignedPartitions());
 
-try {
-processBackgroundEvents(unsubscribeEvent.future(), timer);
-log.info("Unsubscribed all topics or patterns and assigned 
partitions");
-} catch (TimeoutException e) {
-log.error("Failed while waiting for the unsubscribe event 
to complete");
-}
-resetGroupMetadata();
+try {
+processBackgroundEvents(unsubscribeEvent.future(), timer);
+log.info("Unsubscribed all topics or patterns and assigned 
partitions");
+} catch (TimeoutException e) {
+log.error("Failed while waiting for the unsubscribe event to 
complete");
 }
+if (groupMetadata.get().isPresent()) resetGroupMetadata();

Review Comment:
   nit: this `resetGroupMetadata` is only used here I believe. We could simply 
have a "maybeResetGroupMetadata();" call here, and move the check for 
groupMetadata.get().isPresent() into it? seems cleaner to me but totally up to 
you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-05 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1667043062


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##
@@ -69,7 +70,8 @@ public void setup() {
 processor = new ApplicationEventProcessor(
 new LogContext(),
 requestManagers,
-metadata
+metadata,
+mock(SubscriptionState.class)

Review Comment:
   Could we add a test for the UnsubscribeEvent processing to cover what we 
expect on the 2 paths:
   - if groupId => verify(membershipManager).leaveGroup();
   - else => verify(subscriptionState).unsubscribe();



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-05 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1667039196


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -213,14 +217,16 @@ private void process(final SubscriptionChangeEvent 
ignored) {
  *  the group is sent out.
  */
 private void process(final UnsubscribeEvent event) {
-if (!requestManagers.heartbeatRequestManager.isPresent()) {
-KafkaException error = new KafkaException("Group membership 
manager not present when processing an unsubscribe event");
-event.future().completeExceptionally(error);
-return;
+if (requestManagers.heartbeatRequestManager.isPresent()) {
+MembershipManager membershipManager = 
requestManagers.heartbeatRequestManager.get().membershipManager();
+CompletableFuture future = membershipManager.leaveGroup();
+future.whenComplete(complete(event.future()));
+} else {
+// If the group membership manager is not present, we can't send 
the leave group request,

Review Comment:
   nit for a clearer comment maybe: what about just stating that If the 
consumer is not using the group management capabilities, we still need to clear 
all assignments it may have? (seems simpler, to the root cause and action 
needed). The "can't send leave group" is true, but that's just one of the many 
actions we can't (and actually don't need to) take just because the consumer is 
not using group mgmt. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-05 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1667035370


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -213,14 +217,14 @@ private void process(final SubscriptionChangeEvent 
ignored) {
  *  the group is sent out.
  */
 private void process(final UnsubscribeEvent event) {
-if (!requestManagers.heartbeatRequestManager.isPresent()) {
-KafkaException error = new KafkaException("Group membership 
manager not present when processing an unsubscribe event");
-event.future().completeExceptionally(error);
-return;
+event.future().whenComplete((__, ___) -> subscriptions.unsubscribe());
+if (requestManagers.heartbeatRequestManager.isPresent()) {
+MembershipManager membershipManager = 
requestManagers.heartbeatRequestManager.get().membershipManager();
+CompletableFuture future = membershipManager.leaveGroup();
+future.whenComplete(complete(event.future()));
+} else {
+event.future().complete(null);

Review Comment:
   Agree, and we should clarify it here, and in the UnsubscribeEvent itself I 
would say. I checked the comments on both, and seems ok if we keep them like 
they are (all holds true), and just add a final sentence to describe that if no 
group ID has been defined by the consumer, the UnsubscribeEvent will still 
ensure that all topics and assigned partitions are cleared.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-05 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1667008009


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -687,7 +687,6 @@ public CompletableFuture leaveGroup() {
 "to clear its assignment and send a leave group 
heartbeat", memberId);
 }
 // Clear the subscription, no matter if the callback execution 
failed or succeeded.
-subscriptions.unsubscribe();

Review Comment:
   Okay. I revert the change in `MembershipManagerImpl.java#leaveGroup` and 
call `subscriptions.unsubscribe()` directly in 
`ApplicationEventProcessor#process` if `heartbeatRequestManager` is not present.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-05 Thread via GitHub


chia7712 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1666993636


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -687,7 +687,6 @@ public CompletableFuture leaveGroup() {
 "to clear its assignment and send a leave group 
heartbeat", memberId);
 }
 // Clear the subscription, no matter if the callback execution 
failed or succeeded.
-subscriptions.unsubscribe();

Review Comment:
   This change means `leaveGroup` does NOT cleanup any subscribed/assigned 
partitions from `SubscriptionState`. It seems to me that is a bit weird since 
that is `leaveGroup`'s  responsibility to ensure the cleanup of 
subscribed/assigned partitions.  Maybe another solution is to add 
`subscriptions.unsubscribe` to all paths of `leaveGroup`. for example:
   ```java
   if (isNotInGroup()) {
   if (state == MemberState.FENCED) {
   clearAssignment();
   transitionTo(MemberState.UNSUBSCRIBED);
   }
   subscriptions.unsubscribe();
   return CompletableFuture.completedFuture(null);
   }
   ```
   
   Also, in the `process(UnsubscribeEvent)` we can call 
`subscriptions.unsubscribe();` directly if `MembershipMgr` is nonexistent. WDYT?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -213,14 +217,14 @@ private void process(final SubscriptionChangeEvent 
ignored) {
  *  the group is sent out.
  */
 private void process(final UnsubscribeEvent event) {
-if (!requestManagers.heartbeatRequestManager.isPresent()) {
-KafkaException error = new KafkaException("Group membership 
manager not present when processing an unsubscribe event");
-event.future().completeExceptionally(error);
-return;
+event.future().whenComplete((__, ___) -> subscriptions.unsubscribe());
+if (requestManagers.heartbeatRequestManager.isPresent()) {
+MembershipManager membershipManager = 
requestManagers.heartbeatRequestManager.get().membershipManager();
+CompletableFuture future = membershipManager.leaveGroup();
+future.whenComplete(complete(event.future()));
+} else {
+event.future().complete(null);

Review Comment:
   This is another behavior change: `UnsubscribeEvent` can be sent even though 
`membershipManager` is not running 
   
   Hence, please add comments to explain it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-05 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1666979963


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1490,6 +1490,8 @@ public void unsubscribe() {
 log.error("Failed while waiting for the unsubscribe event 
to complete");
 }
 resetGroupMetadata();
+} else {

Review Comment:
   Hi @chia7712, sorry, I didn't notice that. I introduce `SubscriptionState` 
to `ApplicationEventProcessor`, call `subscriptions.unsubscribe()` for 
`UnsubscribeEvent`, and remove `subscriptions.unsubscribe()` from 
`MembershipManagerImpl#leaveGroup`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-05 Thread via GitHub


chia7712 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1666864664


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1490,6 +1490,8 @@ public void unsubscribe() {
 log.error("Failed while waiting for the unsubscribe event 
to complete");
 }
 resetGroupMetadata();
+} else {

Review Comment:
   @FrankYang0529 Please take a look at @lianetm comments:
   
   > we would have to end up calling the subcriptionState.unsubscribe from the 
ApplicationEventProcessor where we process(UnsubscribeEvent). We wouldn't be 
able to reuse the MembershipMgr logic (which is part of what I had in mind when 
commenting on the jira), because if there is no groupId we don't even have an 
instance of the MembershipMgr.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-05 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1666855070


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1490,6 +1490,8 @@ public void unsubscribe() {
 log.error("Failed while waiting for the unsubscribe event 
to complete");
 }
 resetGroupMetadata();
+} else {

Review Comment:
   Hi @chia7712 and @lianetm, it looks like we can't use `UnsubscribeEvent` 
when group id is empty, because `heartbeatRequestManager` will be empty. The 
test case `PlaintextConsumerTest#testConsumingWithNullGroupId` can't pass.
   
   
https://github.com/apache/kafka/blob/9a7eee60727dc73f09075e971ea35909d2245f19/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java#L215-L220
   
   
https://github.com/apache/kafka/blob/9a7eee60727dc73f09075e971ea35909d2245f19/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java#L161-L208



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-05 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1666386551


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1490,6 +1490,8 @@ public void unsubscribe() {
 log.error("Failed while waiting for the unsubscribe event 
to complete");
 }
 resetGroupMetadata();
+} else {

Review Comment:
   Hi @chia7712, thanks for the suggestion. I also updated related test case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-04 Thread via GitHub


chia7712 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1665964592


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1490,6 +1490,8 @@ public void unsubscribe() {
 log.error("Failed while waiting for the unsubscribe event 
to complete");
 }
 resetGroupMetadata();
+} else {

Review Comment:
   I like the idea too. 
   
   > we would have to end up calling the subcriptionState.unsubscribe from the 
ApplicationEventProcessor where we process(UnsubscribeEvent). We wouldn't be 
able to reuse the MembershipMgr logic (which is part of what I had in mind when 
commenting on the jira), because if there is no groupId we don't even have an 
instance of the MembershipMgr.
   
   nice point!
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-04 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1665958301


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1490,6 +1490,8 @@ public void unsubscribe() {
 log.error("Failed while waiting for the unsubscribe event 
to complete");
 }
 resetGroupMetadata();
+} else {

Review Comment:
   yeap, I really liked that idea when I shared it. We should only note that if 
we take that path, we would have to end up calling the 
`subcriptionState.unsubscribe` from the `ApplicationEventProcessor` where we 
process(UnsubscribeEvent). We wouldn't be able to reuse the `MembershipMgr` 
logic (which is part of what I had in mind when commenting on the jira), 
because if there is no groupId we don't even have an instance of the 
MembershipMgr. 
   
   Still, I like the idea because 1) simplifies the consumer.unsubscribe, 2) 
all subscriptionState.unsubscribe calls are in the background thread associated 
to the UnsubscribeEvent (directly if no groupId, indirectly if groupId present, 
delegating on the membershipMgr). Makes sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-04 Thread via GitHub


chia7712 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1665940798


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1490,6 +1490,8 @@ public void unsubscribe() {
 log.error("Failed while waiting for the unsubscribe event 
to complete");
 }
 resetGroupMetadata();
+} else {

Review Comment:
   inspired by @lianetm comments in jira. Maybe we can leverage 
`UnsubscribeEvent` to handle the `unsubscribe`. For example:
   
   ```java
   public void unsubscribe() {
   acquireAndEnsureOpen();
   try {
   fetchBuffer.retainAll(Collections.emptySet());
   Timer timer = time.timer(Long.MAX_VALUE);
   UnsubscribeEvent unsubscribeEvent = new 
UnsubscribeEvent(calculateDeadlineMs(timer));
   applicationEventHandler.add(unsubscribeEvent);
   log.info("Unsubscribing all topics or patterns and assigned 
partitions {}",
   subscriptions.assignedPartitions());
   try {
   processBackgroundEvents(unsubscribeEvent.future(), timer);
   log.info("Unsubscribed all topics or patterns and assigned 
partitions");
   } catch (TimeoutException e) {
   log.error("Failed while waiting for the unsubscribe event to 
complete");
   }
   if (groupMetadata.get().isPresent()) resetGroupMetadata();
   } catch (Exception e) {
   log.error("Unsubscribe failed", e);
   throw e;
   } finally {
   release();
   }
   }
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-04 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1665795593


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2121,6 +2136,8 @@ private void 
assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerIm
 assertTrue(membershipManager.currentAssignment().isNone());
 assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
 assertEquals(LEAVE_GROUP_MEMBER_EPOCH, 
membershipManager.memberEpoch());
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();

Review Comment:
Thank you for the detailed explanation. Sorry that I misunderstood your 
previous comment. I need to read KIP-848 again to catch up more detailed logic 
here. Could you also check that whether other places use correct 
`verify(subscriptionState).unsubscribe();` statement? Thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-04 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1665795593


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2121,6 +2136,8 @@ private void 
assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerIm
 assertTrue(membershipManager.currentAssignment().isNone());
 assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
 assertEquals(LEAVE_GROUP_MEMBER_EPOCH, 
membershipManager.memberEpoch());
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();

Review Comment:
Thank you for the detailed explanation. Sorry that I misunderstood your 
previous comment. I need to read KIP-848 again to catch up more detailed logic 
here. Could you also check that whether other places use correct 
`verify(subscriptionState).unsubscribe();` statement. Thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-04 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1665774645


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2121,6 +2136,8 @@ private void 
assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerIm
 assertTrue(membershipManager.currentAssignment().isNone());
 assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
 assertEquals(LEAVE_GROUP_MEMBER_EPOCH, 
membershipManager.memberEpoch());
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();

Review Comment:
   uhm I would say we don't want to force a leave group here, because it 
changes the semantics of all the tests around stale members. All stale members 
send a leave group request to the broker (therefore the test name), but not all 
stale members receive an API-triggered call to leaveGroup (diff stuff). So we 
have 2 scenarios for stale members here:
   
   1. member becomes stale + consumer poll -> it will rejoin the group reusing 
the same subscription it had. This is probably the common case, working fine 
before this PR (stale clears assignment only when it completes callbacks 
[here](https://github.com/apache/kafka/blob/cb63fdb77ed869f2c519ca002ca757425b6b3076/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java#L900))
   1. member goes STALE  + consumer leaves group before the stale member has 
the chance to rejoin ->  we want to clear the subscription, because of the 
"external" leave group call. This is the edge case this PR fixes and adds tests 
for, you already added `testLeaveGroupWhenStateIsStale`. 
   
   Actually, for the common case 1 of stale members that will rejoin (do not 
receive a forced call to leaveGroup), we should `verify(subscriptionState, 
never()).unsubscribe();`.We should add that line here instead. Makes sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-04 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1665591893


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2302,6 +2303,61 @@ public void testRebalanceMetricsOnFailedRebalance() {
 assertEquals(-1d, getMetricValue(metrics, 
rebalanceMetricsManager.lastRebalanceSecondsAgo));
 }
 
+@Test
+public void testLeaveGroupWhenStateIsFatal() {
+MembershipManagerImpl membershipManager = 
createMemberInStableState(null);
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+membershipManager.transitionToFatal();
+assertEquals(MemberState.FATAL, membershipManager.state());
+
+subscriptionState.assignFromUser(Collections.singleton(new 
TopicPartition("topic", 0)));
+assertEquals(1, subscriptionState.numAssignedPartitions());

Review Comment:
   Thanks for the suggestion. I add `verify(subscriptionState).unsubscribe();` 
to test cases which calling `leaveGroup`. The only different one is 
`testLeaveGroupWhenMemberAlreadyLeaving`. I use `verify(subscriptionState, 
never()).unsubscribe();` for second leave attempt. Could you take a look again? 
Thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-03 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1665069309


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2302,6 +2303,61 @@ public void testRebalanceMetricsOnFailedRebalance() {
 assertEquals(-1d, getMetricValue(metrics, 
rebalanceMetricsManager.lastRebalanceSecondsAgo));
 }
 
+@Test
+public void testLeaveGroupWhenStateIsFatal() {
+MembershipManagerImpl membershipManager = 
createMemberInStableState(null);
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+membershipManager.transitionToFatal();
+assertEquals(MemberState.FATAL, membershipManager.state());
+
+subscriptionState.assignFromUser(Collections.singleton(new 
TopicPartition("topic", 0)));
+assertEquals(1, subscriptionState.numAssignedPartitions());

Review Comment:
   Cool. Related although not introduced by this PR, this makes me notice that 
we don't have this full story covered in the tests (not testing the call to 
`subscriptionState.unsubscribed` when `isNotInGroup` is false).  
   
   Would you mind adding the same verification you added for `notInGroup` true, 
to the func that covers this for `notInGroup` false? Should be the same 
`verify(subscriptionState).unsubscribe();` but added to 
`assertStaleMemberLeavesGroupAndClearsAssignment` I would say. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-03 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1664301020


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2302,6 +2303,61 @@ public void testRebalanceMetricsOnFailedRebalance() {
 assertEquals(-1d, getMetricValue(metrics, 
rebalanceMetricsManager.lastRebalanceSecondsAgo));
 }
 
+@Test
+public void testLeaveGroupWhenStateIsFatal() {
+MembershipManagerImpl membershipManager = 
createMemberInStableState(null);
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+membershipManager.transitionToFatal();
+assertEquals(MemberState.FATAL, membershipManager.state());
+
+subscriptionState.assignFromUser(Collections.singleton(new 
TopicPartition("topic", 0)));
+assertEquals(1, subscriptionState.numAssignedPartitions());

Review Comment:
   Yeah, I agree we should not have test cases which are not real cases. I 
change `spy` back to `mock` and only test `subscriptionState#unsubscribe` is 
called when `isNotInGroup` is true.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-03 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1664301811


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -2077,6 +2077,19 @@ void testReaperInvokedInPoll() {
 verify(backgroundEventReaper).reap(time.milliseconds());
 }
 
+@Test
+public void testUnsubscribeWithoutGroupId() {
+consumer = newConsumerWithoutGroupId();
+completeFetchedCommittedOffsetApplicationEventExceptionally(new 
TimeoutException());
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));

Review Comment:
   Yeah, removed unused mock. Thanks for the review. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-02 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1663408286


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2302,6 +2303,61 @@ public void testRebalanceMetricsOnFailedRebalance() {
 assertEquals(-1d, getMetricValue(metrics, 
rebalanceMetricsManager.lastRebalanceSecondsAgo));
 }
 
+@Test
+public void testLeaveGroupWhenStateIsFatal() {
+MembershipManagerImpl membershipManager = 
createMemberInStableState(null);
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+membershipManager.transitionToFatal();
+assertEquals(MemberState.FATAL, membershipManager.state());
+
+subscriptionState.assignFromUser(Collections.singleton(new 
TopicPartition("topic", 0)));
+assertEquals(1, subscriptionState.numAssignedPartitions());

Review Comment:
   I'm not sure we would ever be able to achieve this path we're mocking here 
with a real consumer, so wondering if this is even a valid scenario to test? 
(this test, and the ones below for fenced and stale too).  
   
   A member in fatal/fenced/stale state means it was part of a group (automatic 
assignment after subscribe), and then we're mocking a manual assignment that 
takes effect and is cleared on leaveGroup. I expect that would never be 
possible because the consumer does not allow to mix the subscription types, so 
the call to assign would fail before actually updating the assignment and 
numAssignedPartitions (a consumer would have to unsubscribe or assign with 
empty to then be able to manually assign the topic0 partition) 
   
   Just for the record, I totally see the value in the 
`testLeaveGroupWhenStateIsUnsubscribe`, which is covering the gap we had and 
it's a valid combination since it's not mixing subscription types, but I 
struggle to see this call to assign happening when a member is 
fatal/fenced/stale?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-02 Thread via GitHub


lianetm commented on PR #16449:
URL: https://github.com/apache/kafka/pull/16449#issuecomment-2204975848

   Hey @FrankYang0529, thanks a lot for the fix! Left some comments. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-02 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1663411634


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -2077,6 +2077,19 @@ void testReaperInvokedInPoll() {
 verify(backgroundEventReaper).reap(time.milliseconds());
 }
 
+@Test
+public void testUnsubscribeWithoutGroupId() {
+consumer = newConsumerWithoutGroupId();
+completeFetchedCommittedOffsetApplicationEventExceptionally(new 
TimeoutException());
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));

Review Comment:
   do we need this? I wouldn't expect we have to mock anything related to 
committed offsets or fetching because we're not polling in the test (or using 
committed offsets in any way) right?  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-02 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1663408286


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2302,6 +2303,61 @@ public void testRebalanceMetricsOnFailedRebalance() {
 assertEquals(-1d, getMetricValue(metrics, 
rebalanceMetricsManager.lastRebalanceSecondsAgo));
 }
 
+@Test
+public void testLeaveGroupWhenStateIsFatal() {
+MembershipManagerImpl membershipManager = 
createMemberInStableState(null);
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+membershipManager.transitionToFatal();
+assertEquals(MemberState.FATAL, membershipManager.state());
+
+subscriptionState.assignFromUser(Collections.singleton(new 
TopicPartition("topic", 0)));
+assertEquals(1, subscriptionState.numAssignedPartitions());

Review Comment:
   I'm not sure we would ever be able to achieve this path we're mocking here 
with a real consumer, so wondering if this is even a valid scenario to test? 
(this test, and the ones below for fenced and stale too).  
   
   A member in fatal/fenced/stale state means it was part of a group (automatic 
assignment after subscribe), and then we're mocking a manual assignment and 
takes effect and is cleared on leaveGroup. I expect that would never be 
possible because the consumer does not allow to mix the subscription types, so 
the call to assign would fail before actually updating the assignment and 
numAssignedPartitions (a consumer would have to unsubscribe or assign with 
empty to then be able to manually assign the topic0 partition) 
   
   Just for the record, I totally see the value in the 
`testLeaveGroupWhenStateIsUnsubscribe`, which is covering the gap we had and 
it's a valid combination since it's not mixing subscription types, but I 
struggle to see this call to assign happening when a member is 
fatal/fenced/stale?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org