Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
chia7712 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1676503836

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -2302,6 +2317,49 @@ public void testRebalanceMetricsOnFailedRebalance() {
         assertEquals(-1d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo));
     }
+    @Test
+    public void testLeaveGroupWhenStateIsFatal() {
+        MembershipManagerImpl membershipManager = createMemberInStableState(null);
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        membershipManager.transitionToFatal();
+        assertEquals(MemberState.FATAL, membershipManager.state());
+
+        membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
+    }
+
+    @Test
+    public void testLeaveGroupWhenStateIsUnsubscribe() {
+        MembershipManagerImpl membershipManager = new MembershipManagerImpl(
+            GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
+            subscriptionState, commitRequestManager, metadata, LOG_CONTEXT, Optional.empty(),
+            backgroundEventHandler, time, rebalanceMetricsManager);
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+
+        membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
+    }
+
+    @Test
+    public void testLeaveGroupWhenStateIsFenced() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
+        mockFencedMemberStuckOnUserCallback(membershipManager, invoker);
+        assertEquals(MemberState.FENCED, membershipManager.state());
+
+        membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
+    }
+
+    @Test
+    public void testLeaveGroupWhenStateIsStale() {

Review Comment:
> Let's keep the testLeaveGroupWhenStateIsFatal, we didn't have coverage for that before this PR

It seems to me `testLeaveGroupWhenStateIsFatal` is used to verify `leaveGroup` after the member is in the fatal state. Many existing test cases already set up that scenario, for example `testFatalFailureWhenMemberAlreadyLeft`. Hence, we can add `leaveGroup` and `verify(subscriptionState)` to those test cases once the state is fatal.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1676110674

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -2302,6 +2317,49 @@ public void testRebalanceMetricsOnFailedRebalance() {
         assertEquals(-1d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo));
     }
+    @Test
+    public void testLeaveGroupWhenStateIsFatal() {
+        MembershipManagerImpl membershipManager = createMemberInStableState(null);
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        membershipManager.transitionToFatal();
+        assertEquals(MemberState.FATAL, membershipManager.state());
+
+        membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
+    }
+
+    @Test
+    public void testLeaveGroupWhenStateIsUnsubscribe() {
+        MembershipManagerImpl membershipManager = new MembershipManagerImpl(
+            GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
+            subscriptionState, commitRequestManager, metadata, LOG_CONTEXT, Optional.empty(),
+            backgroundEventHandler, time, rebalanceMetricsManager);
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+
+        membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
+    }
+
+    @Test
+    public void testLeaveGroupWhenStateIsFenced() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
+        mockFencedMemberStuckOnUserCallback(membershipManager, invoker);
+        assertEquals(MemberState.FENCED, membershipManager.state());
+
+        membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
+    }
+
+    @Test
+    public void testLeaveGroupWhenStateIsStale() {

Review Comment:
one last: the `testLeaveGroupWhenStateIsUnsubscribe` could be removed too, we already have `testLeaveGroupWhenMemberAlreadyLeft` for that. (Let's keep the `testLeaveGroupWhenStateIsFatal`, we didn't have coverage for that before this PR, thanks!)
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1676082409

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -2302,6 +2317,49 @@ public void testRebalanceMetricsOnFailedRebalance() {
         assertEquals(-1d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo));
     }
+    @Test
+    public void testLeaveGroupWhenStateIsFatal() {
+        MembershipManagerImpl membershipManager = createMemberInStableState(null);
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        membershipManager.transitionToFatal();
+        assertEquals(MemberState.FATAL, membershipManager.state());
+
+        membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
+    }
+
+    @Test
+    public void testLeaveGroupWhenStateIsUnsubscribe() {
+        MembershipManagerImpl membershipManager = new MembershipManagerImpl(
+            GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
+            subscriptionState, commitRequestManager, metadata, LOG_CONTEXT, Optional.empty(),
+            backgroundEventHandler, time, rebalanceMetricsManager);
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+
+        membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
+    }
+
+    @Test
+    public void testLeaveGroupWhenStateIsFenced() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
+        mockFencedMemberStuckOnUserCallback(membershipManager, invoker);
+        assertEquals(MemberState.FENCED, membershipManager.state());
+
+        membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
+    }
+
+    @Test
+    public void testLeaveGroupWhenStateIsStale() {

Review Comment:
Agreed. Removed them. Thanks for the review and for finding this.
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1676058719

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -2302,6 +2317,49 @@ public void testRebalanceMetricsOnFailedRebalance() {
         assertEquals(-1d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo));
     }
+    @Test
+    public void testLeaveGroupWhenStateIsFatal() {
+        MembershipManagerImpl membershipManager = createMemberInStableState(null);
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        membershipManager.transitionToFatal();
+        assertEquals(MemberState.FATAL, membershipManager.state());
+
+        membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
+    }
+
+    @Test
+    public void testLeaveGroupWhenStateIsUnsubscribe() {
+        MembershipManagerImpl membershipManager = new MembershipManagerImpl(
+            GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
+            subscriptionState, commitRequestManager, metadata, LOG_CONTEXT, Optional.empty(),
+            backgroundEventHandler, time, rebalanceMetricsManager);
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+
+        membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
+    }
+
+    @Test
+    public void testLeaveGroupWhenStateIsFenced() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
+        mockFencedMemberStuckOnUserCallback(membershipManager, invoker);
+        assertEquals(MemberState.FENCED, membershipManager.state());
+
+        membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
+    }
+
+    @Test
+    public void testLeaveGroupWhenStateIsStale() {

Review Comment:
Sorry, a bit late, but I just noticed that this test is a duplicate: we already have `testLeaveGroupWhenMemberIsStale`, which you updated to verify the unsubscribe, so I would say we just remove this one (and `testLeaveGroupWhenStateIsFenced`, which seems to be a dup of `testLeaveGroupWhenMemberFenced`).
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1674145282

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1501,11 +1501,11 @@ public void unsubscribe() {
 private void resetGroupMetadata() {
 groupMetadata.updateAndGet(
-oldGroupMetadataOptional -> oldGroupMetadataOptional
-.map(oldGroupMetadata -> initializeConsumerGroupMetadata(
-oldGroupMetadata.groupId(),
-oldGroupMetadata.groupInstanceId()
-))
+oldGroupMetadataOptional -> oldGroupMetadataOptional
+.map(oldGroupMetadata -> initializeConsumerGroupMetadata(
+oldGroupMetadata.groupId(),
+oldGroupMetadata.groupInstanceId()
+))

Review Comment:
Yeah, we should revert this change. Thanks for the comment. I have rebased onto the trunk branch.
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on PR #16449:
URL: https://github.com/apache/kafka/pull/16449#issuecomment-2223050194

Hey @FrankYang0529, thanks for the updates. Only one nit left. Also, I noticed the build did not complete, so please merge the latest trunk changes too and let's see what we get on the next run. Thanks!
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1674077031

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1501,11 +1501,11 @@ public void unsubscribe() {
 private void resetGroupMetadata() {
 groupMetadata.updateAndGet(
-oldGroupMetadataOptional -> oldGroupMetadataOptional
-.map(oldGroupMetadata -> initializeConsumerGroupMetadata(
-oldGroupMetadata.groupId(),
-oldGroupMetadata.groupInstanceId()
-))
+oldGroupMetadataOptional -> oldGroupMetadataOptional
+.map(oldGroupMetadata -> initializeConsumerGroupMetadata(
+oldGroupMetadata.groupId(),
+oldGroupMetadata.groupInstanceId()
+))

Review Comment:
The formatting is off here, right? (I guess it's because you had it inside an `if` at some point, which was then removed.) I expect `resetGroupMetadata` to end up unchanged in this PR.
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1670789098

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java: ##
@@ -25,62 +25,98 @@
 import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
 import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
 import org.apache.kafka.common.utils.LogContext;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 public class ApplicationEventProcessorTest {
     private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
     private ApplicationEventProcessor processor;
     private CommitRequestManager commitRequestManager;
     private HeartbeatRequestManager heartbeatRequestManager;
     private MembershipManager membershipManager;
+    private SubscriptionState subscriptionState;
-    @BeforeEach
-    public void setup() {
+    private void setupProcessor(boolean withGroupId) {
         LogContext logContext = new LogContext();
         OffsetsRequestManager offsetsRequestManager = mock(OffsetsRequestManager.class);
         TopicMetadataRequestManager topicMetadataRequestManager = mock(TopicMetadataRequestManager.class);
         FetchRequestManager fetchRequestManager = mock(FetchRequestManager.class);
-        CoordinatorRequestManager coordinatorRequestManager = mock(CoordinatorRequestManager.class);
-        commitRequestManager = mock(CommitRequestManager.class);
-        heartbeatRequestManager = mock(HeartbeatRequestManager.class);
-        membershipManager = mock(MembershipManager.class);
-        RequestManagers requestManagers = new RequestManagers(
-            logContext,
-            offsetsRequestManager,
-            topicMetadataRequestManager,
-            fetchRequestManager,
-            Optional.of(coordinatorRequestManager),
-            Optional.of(commitRequestManager),
-            Optional.of(heartbeatRequestManager),
-            Optional.of(membershipManager)
-        );
+        RequestManagers requestManagers;
+        if (withGroupId) {
+            CoordinatorRequestManager coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+            commitRequestManager = mock(CommitRequestManager.class);
+            membershipManager = mock(MembershipManager.class);
+            heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+            subscriptionState = mock(SubscriptionState.class);

Review Comment:
Thanks for the suggestion. I updated it to make it simpler.
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1670787996

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1499,14 +1499,16 @@ public void unsubscribe() {
         }
     }
-    private void resetGroupMetadata() {
-        groupMetadata.updateAndGet(
-            oldGroupMetadataOptional -> oldGroupMetadataOptional
-                .map(oldGroupMetadata -> initializeConsumerGroupMetadata(
-                    oldGroupMetadata.groupId(),
-                    oldGroupMetadata.groupInstanceId()
-                ))
-        );
+    private void maybeResetGroupMetadata() {
+        if (groupMetadata.get().isPresent()) {

Review Comment:
Thanks for the suggestion. I didn't notice that. Yeah, we don't need to check whether `groupMetadata` is present here.
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on PR #16449:
URL: https://github.com/apache/kafka/pull/16449#issuecomment-2218046299

Thanks for the updates @FrankYang0529, I left a suggestion for consideration. Also, could you please rebase to trigger the build? Thanks!
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1670733263

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java: ##
@@ -25,62 +25,98 @@
 import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
 import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
 import org.apache.kafka.common.utils.LogContext;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 public class ApplicationEventProcessorTest {
     private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
     private ApplicationEventProcessor processor;
     private CommitRequestManager commitRequestManager;
     private HeartbeatRequestManager heartbeatRequestManager;
     private MembershipManager membershipManager;
+    private SubscriptionState subscriptionState;
-    @BeforeEach
-    public void setup() {
+    private void setupProcessor(boolean withGroupId) {
         LogContext logContext = new LogContext();
         OffsetsRequestManager offsetsRequestManager = mock(OffsetsRequestManager.class);
         TopicMetadataRequestManager topicMetadataRequestManager = mock(TopicMetadataRequestManager.class);
         FetchRequestManager fetchRequestManager = mock(FetchRequestManager.class);
-        CoordinatorRequestManager coordinatorRequestManager = mock(CoordinatorRequestManager.class);
-        commitRequestManager = mock(CommitRequestManager.class);
-        heartbeatRequestManager = mock(HeartbeatRequestManager.class);
-        membershipManager = mock(MembershipManager.class);
-        RequestManagers requestManagers = new RequestManagers(
-            logContext,
-            offsetsRequestManager,
-            topicMetadataRequestManager,
-            fetchRequestManager,
-            Optional.of(coordinatorRequestManager),
-            Optional.of(commitRequestManager),
-            Optional.of(heartbeatRequestManager),
-            Optional.of(membershipManager)
-        );
+        RequestManagers requestManagers;
+        if (withGroupId) {
+            CoordinatorRequestManager coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+            commitRequestManager = mock(CommitRequestManager.class);
+            membershipManager = mock(MembershipManager.class);
+            heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+            subscriptionState = mock(SubscriptionState.class);

Review Comment:
We don't need these vars, so how about we remove them all and pass the mocks directly to the `RequestManagers` constructor? Actually, we could consider simplifying further and reducing the whole block from ln 57 to ln 85 to a much shorter single path for creating the `requestManagers`:

```
RequestManagers requestManagers = new RequestManagers(
    logContext,
    offsetsRequestManager,
    topicMetadataRequestManager,
    fetchRequestManager,
    withGroupId ? Optional.of(mock(CoordinatorRequestManager.class)) : Optional.empty(),
    withGroupId ? Optional.of(mock(CommitRequestManager.class)) : Optional.empty(),
    withGroupId ? Optional.of(mock(HeartbeatRequestManager.class)) : Optional.empty(),
    withGroupId ? Optional.of(mock(MembershipManager.class)) : Optional.empty());
```
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1670689743

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1499,14 +1499,16 @@ public void unsubscribe() {
         }
     }
-    private void resetGroupMetadata() {
-        groupMetadata.updateAndGet(
-            oldGroupMetadataOptional -> oldGroupMetadataOptional
-                .map(oldGroupMetadata -> initializeConsumerGroupMetadata(
-                    oldGroupMetadata.groupId(),
-                    oldGroupMetadata.groupInstanceId()
-                ))
-        );
+    private void maybeResetGroupMetadata() {
+        if (groupMetadata.get().isPresent()) {

Review Comment:
We do want these semantics, but I would say we don't need the check because we already get it from the `.map` call on the Optional right below (it will only map if there is a value).
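The point in the review comment above, that `Optional.map` already skips empty values and so makes an explicit `isPresent()` guard redundant, can be illustrated with a small standalone sketch. This is not the Kafka code: `GroupInfo`, `reinitialize`, and `reset` are hypothetical stand-ins introduced only for illustration, and only `java.util.Optional` and `AtomicReference.updateAndGet` are real JDK APIs.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

public class OptionalMapResetSketch {

    // Hypothetical stand-in for the consumer's group metadata; not a Kafka class.
    static final class GroupInfo {
        final String groupId;
        final int generationId;

        GroupInfo(String groupId, int generationId) {
            this.groupId = groupId;
            this.generationId = generationId;
        }
    }

    // Hypothetical re-initialization: keep the group id, clear the generation.
    static GroupInfo reinitialize(GroupInfo old) {
        return new GroupInfo(old.groupId, -1);
    }

    // No isPresent() guard: Optional.map applies the function only when a value
    // is present and returns Optional.empty() otherwise.
    static Optional<GroupInfo> reset(AtomicReference<Optional<GroupInfo>> ref) {
        return ref.updateAndGet(current -> current.map(OptionalMapResetSketch::reinitialize));
    }

    public static void main(String[] args) {
        AtomicReference<Optional<GroupInfo>> withGroup =
                new AtomicReference<>(Optional.of(new GroupInfo("my-group", 7)));
        AtomicReference<Optional<GroupInfo>> withoutGroup =
                new AtomicReference<>(Optional.empty());

        // Present value is re-initialized: prints "Optional[my-group / -1]"
        System.out.println(reset(withGroup).map(g -> g.groupId + " / " + g.generationId));
        // Empty value stays empty: prints "false"
        System.out.println(reset(withoutGroup).isPresent());
    }
}
```

With this shape, the caller can invoke the reset unconditionally; the empty case is a no-op, which is the behaviour the guard was meant to provide.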
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1667597909

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1476,21 +1478,19 @@ public void unsubscribe() {
         acquireAndEnsureOpen();
         try {
             fetchBuffer.retainAll(Collections.emptySet());
-            if (groupMetadata.get().isPresent()) {
-                Timer timer = time.timer(Long.MAX_VALUE);
-                UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer));
-                applicationEventHandler.add(unsubscribeEvent);
-                log.info("Unsubscribing all topics or patterns and assigned partitions {}",
+            Timer timer = time.timer(Long.MAX_VALUE);
+            UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer));
+            applicationEventHandler.add(unsubscribeEvent);
+            log.info("Unsubscribing all topics or patterns and assigned partitions {}",
                     subscriptions.assignedPartitions());
-                try {
-                    processBackgroundEvents(unsubscribeEvent.future(), timer);
-                    log.info("Unsubscribed all topics or patterns and assigned partitions");
-                } catch (TimeoutException e) {
-                    log.error("Failed while waiting for the unsubscribe event to complete");
-                }
-                resetGroupMetadata();
+            try {
+                processBackgroundEvents(unsubscribeEvent.future(), timer);
+                log.info("Unsubscribed all topics or patterns and assigned partitions");
+            } catch (TimeoutException e) {
+                log.error("Failed while waiting for the unsubscribe event to complete");
             }
+            if (groupMetadata.get().isPresent()) resetGroupMetadata();

Review Comment:
Changed `resetGroupMetadata` to `maybeResetGroupMetadata` and moved the `groupMetadata.get().isPresent()` check into it.
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
FrankYang0529 commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1667597763 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -213,14 +217,16 @@ private void process(final SubscriptionChangeEvent ignored) { * the group is sent out. */ private void process(final UnsubscribeEvent event) { -if (!requestManagers.heartbeatRequestManager.isPresent()) { -KafkaException error = new KafkaException("Group membership manager not present when processing an unsubscribe event"); -event.future().completeExceptionally(error); -return; +if (requestManagers.heartbeatRequestManager.isPresent()) { +MembershipManager membershipManager = requestManagers.heartbeatRequestManager.get().membershipManager(); +CompletableFuture future = membershipManager.leaveGroup(); +future.whenComplete(complete(event.future())); +} else { +// If the group membership manager is not present, we can't send the leave group request, Review Comment: Update the comment. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
FrankYang0529 commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1667597826 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java: ## @@ -69,7 +70,8 @@ public void setup() { processor = new ApplicationEventProcessor( new LogContext(), requestManagers, -metadata +metadata, +mock(SubscriptionState.class) Review Comment: Add related test cases. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1667045593 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1476,21 +1478,19 @@ public void unsubscribe() { acquireAndEnsureOpen(); try { fetchBuffer.retainAll(Collections.emptySet()); -if (groupMetadata.get().isPresent()) { -Timer timer = time.timer(Long.MAX_VALUE); -UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer)); -applicationEventHandler.add(unsubscribeEvent); -log.info("Unsubscribing all topics or patterns and assigned partitions {}", +Timer timer = time.timer(Long.MAX_VALUE); +UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer)); +applicationEventHandler.add(unsubscribeEvent); +log.info("Unsubscribing all topics or patterns and assigned partitions {}", subscriptions.assignedPartitions()); -try { -processBackgroundEvents(unsubscribeEvent.future(), timer); -log.info("Unsubscribed all topics or patterns and assigned partitions"); -} catch (TimeoutException e) { -log.error("Failed while waiting for the unsubscribe event to complete"); -} -resetGroupMetadata(); +try { +processBackgroundEvents(unsubscribeEvent.future(), timer); +log.info("Unsubscribed all topics or patterns and assigned partitions"); +} catch (TimeoutException e) { +log.error("Failed while waiting for the unsubscribe event to complete"); } +if (groupMetadata.get().isPresent()) resetGroupMetadata(); Review Comment: nit: this `resetGroupMetadata` is only used here I believe. We could simply have a "maybeResetGroupMetadata();" call here, and move the check for groupMetadata.get().isPresent() into it? seems cleaner to me but totally up to you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1667043062 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java: ## @@ -69,7 +70,8 @@ public void setup() { processor = new ApplicationEventProcessor( new LogContext(), requestManagers, -metadata +metadata, +mock(SubscriptionState.class) Review Comment: Could we add a test for the UnsubscribeEvent processing to cover what we expect on the 2 paths: - if groupId => verify(membershipManager).leaveGroup(); - else => verify(subscriptionState).unsubscribe(); -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
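The two paths requested above can be illustrated with a small self-contained model. This is only a sketch under assumptions: `MembershipManagerSketch`, `SubscriptionsSketch`, `RecordingMembershipManager` and `UnsubscribeProcessorSketch` are hypothetical stand-ins for the Kafka internals, and hand-written counters replace the Mockito `verify(...)` calls the real test would use.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for MembershipManager.
interface MembershipManagerSketch {
    CompletableFuture<Void> leaveGroup();
}

// Hypothetical stand-in for SubscriptionState; counts calls instead of using a mock.
final class SubscriptionsSketch {
    int unsubscribeCalls = 0;

    void unsubscribe() {
        unsubscribeCalls++;
    }
}

// Records leaveGroup() calls. The real leaveGroup() also clears the subscription itself,
// which this recorder deliberately does not model.
final class RecordingMembershipManager implements MembershipManagerSketch {
    int leaveGroupCalls = 0;

    @Override
    public CompletableFuture<Void> leaveGroup() {
        leaveGroupCalls++;
        return CompletableFuture.completedFuture(null);
    }
}

// Simplified model of process(UnsubscribeEvent): the event is reduced to its future.
final class UnsubscribeProcessorSketch {
    private final Optional<MembershipManagerSketch> membershipManager;
    private final SubscriptionsSketch subscriptions;

    UnsubscribeProcessorSketch(Optional<MembershipManagerSketch> membershipManager,
                               SubscriptionsSketch subscriptions) {
        this.membershipManager = membershipManager;
        this.subscriptions = subscriptions;
    }

    void process(CompletableFuture<Void> eventFuture) {
        if (membershipManager.isPresent()) {
            // Group ID configured: delegate to the membership manager.
            membershipManager.get().leaveGroup().whenComplete((ignored, error) -> {
                if (error != null) {
                    eventFuture.completeExceptionally(error);
                } else {
                    eventFuture.complete(null);
                }
            });
        } else {
            // No group management: still clear all topics and assigned partitions.
            subscriptions.unsubscribe();
            eventFuture.complete(null);
        }
    }
}
```

In the real test these counters would presumably become `verify(membershipManager).leaveGroup()` for the group path and `verify(subscriptionState).unsubscribe()` for the no-group path.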
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1667039196 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -213,14 +217,16 @@ private void process(final SubscriptionChangeEvent ignored) { * the group is sent out. */ private void process(final UnsubscribeEvent event) { -if (!requestManagers.heartbeatRequestManager.isPresent()) { -KafkaException error = new KafkaException("Group membership manager not present when processing an unsubscribe event"); -event.future().completeExceptionally(error); -return; +if (requestManagers.heartbeatRequestManager.isPresent()) { +MembershipManager membershipManager = requestManagers.heartbeatRequestManager.get().membershipManager(); +CompletableFuture future = membershipManager.leaveGroup(); +future.whenComplete(complete(event.future())); +} else { +// If the group membership manager is not present, we can't send the leave group request, Review Comment: nit for a clearer comment maybe: what about just stating that If the consumer is not using the group management capabilities, we still need to clear all assignments it may have? (seems simpler, to the root cause and action needed). The "can't send leave group" is true, but that's just one of the many actions we can't (and actually don't need to) take just because the consumer is not using group mgmt. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1667035370 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -213,14 +217,14 @@ private void process(final SubscriptionChangeEvent ignored) { * the group is sent out. */ private void process(final UnsubscribeEvent event) { -if (!requestManagers.heartbeatRequestManager.isPresent()) { -KafkaException error = new KafkaException("Group membership manager not present when processing an unsubscribe event"); -event.future().completeExceptionally(error); -return; +event.future().whenComplete((__, ___) -> subscriptions.unsubscribe()); +if (requestManagers.heartbeatRequestManager.isPresent()) { +MembershipManager membershipManager = requestManagers.heartbeatRequestManager.get().membershipManager(); +CompletableFuture future = membershipManager.leaveGroup(); +future.whenComplete(complete(event.future())); +} else { +event.future().complete(null); Review Comment: Agree, and we should clarify it here, and in the UnsubscribeEvent itself I would say. I checked the comments on both, and seems ok if we keep them like they are (all holds true), and just add a final sentence to describe that if no group ID has been defined by the consumer, the UnsubscribeEvent will still ensure that all topics and assigned partitions are cleared. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1667008009
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -687,7 +687,6 @@ public CompletableFuture leaveGroup() {
 "to clear its assignment and send a leave group heartbeat", memberId);
 }
 // Clear the subscription, no matter if the callback execution failed or succeeded.
-subscriptions.unsubscribe();
Review Comment:
Okay. I reverted the change in `MembershipManagerImpl#leaveGroup` and now call `subscriptions.unsubscribe()` directly in `ApplicationEventProcessor#process` when `heartbeatRequestManager` is not present.
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
chia7712 commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1666993636 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -687,7 +687,6 @@ public CompletableFuture leaveGroup() { "to clear its assignment and send a leave group heartbeat", memberId); } // Clear the subscription, no matter if the callback execution failed or succeeded. -subscriptions.unsubscribe(); Review Comment: This change means `leaveGroup` does NOT cleanup any subscribed/assigned partitions from `SubscriptionState`. It seems to me that is a bit weird since that is `leaveGroup`'s responsibility to ensure the cleanup of subscribed/assigned partitions. Maybe another solution is to add `subscriptions.unsubscribe` to all paths of `leaveGroup`. for example: ```java if (isNotInGroup()) { if (state == MemberState.FENCED) { clearAssignment(); transitionTo(MemberState.UNSUBSCRIBED); } subscriptions.unsubscribe(); return CompletableFuture.completedFuture(null); } ``` Also, in the `process(UnsubscribeEvent)` we can call `subscriptions.unsubscribe();` directly if `MembershipMgr` is nonexistent. WDYT? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -213,14 +217,14 @@ private void process(final SubscriptionChangeEvent ignored) { * the group is sent out. 
*/ private void process(final UnsubscribeEvent event) { -if (!requestManagers.heartbeatRequestManager.isPresent()) { -KafkaException error = new KafkaException("Group membership manager not present when processing an unsubscribe event"); -event.future().completeExceptionally(error); -return; +event.future().whenComplete((__, ___) -> subscriptions.unsubscribe()); +if (requestManagers.heartbeatRequestManager.isPresent()) { +MembershipManager membershipManager = requestManagers.heartbeatRequestManager.get().membershipManager(); +CompletableFuture future = membershipManager.leaveGroup(); +future.whenComplete(complete(event.future())); +} else { +event.future().complete(null); Review Comment: This is another behavior change: `UnsubscribeEvent` can be sent even though `membershipManager` is not running Hence, please add comments to explain it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
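The suggestion to call `subscriptions.unsubscribe()` on every path of `leaveGroup` can be pictured with the toy state machine below. It is a sketch, not `MembershipManagerImpl`: `MemberStateSketch` and `LeaveGroupSketch` are hypothetical names, the set of states treated as "not in group" is an assumption based on the tests discussed in this thread, and callbacks are assumed to complete immediately.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, reduced set of member states.
enum MemberStateSketch { STABLE, FENCED, FATAL, STALE, UNSUBSCRIBED, LEAVING }

final class LeaveGroupSketch {
    MemberStateSketch state;
    int unsubscribeCalls = 0;       // stands in for subscriptions.unsubscribe()
    boolean assignmentCleared = false;

    LeaveGroupSketch(MemberStateSketch initial) {
        this.state = initial;
    }

    // Assumption: these states mean the member is not an active member of the group.
    private boolean isNotInGroup() {
        return state == MemberStateSketch.UNSUBSCRIBED
            || state == MemberStateSketch.FENCED
            || state == MemberStateSketch.FATAL
            || state == MemberStateSketch.STALE;
    }

    CompletableFuture<Void> leaveGroup() {
        if (isNotInGroup()) {
            if (state == MemberStateSketch.FENCED) {
                assignmentCleared = true;
                state = MemberStateSketch.UNSUBSCRIBED;
            }
            // Early-return path: the subscription is still cleared.
            unsubscribeCalls++;
            return CompletableFuture.completedFuture(null);
        }
        if (state == MemberStateSketch.LEAVING) {
            // A leave is already in progress; the first call takes care of the cleanup.
            return CompletableFuture.completedFuture(null);
        }
        state = MemberStateSketch.LEAVING;
        // Regular path: callbacks are modelled as completing immediately, then the
        // assignment and the subscription are cleared regardless of the callback result.
        assignmentCleared = true;
        unsubscribeCalls++;
        return CompletableFuture.completedFuture(null);
    }
}
```

Under these assumptions, a first `leaveGroup()` call always clears the subscription exactly once, whichever state the member is in, while a second call made during `LEAVING` does not clear it again.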
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1666979963
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1490,6 +1490,8 @@ public void unsubscribe() {
 log.error("Failed while waiting for the unsubscribe event to complete");
 }
 resetGroupMetadata();
+} else {
Review Comment:
Hi @chia7712, sorry, I didn't notice that. I introduced `SubscriptionState` into `ApplicationEventProcessor`, call `subscriptions.unsubscribe()` when processing `UnsubscribeEvent`, and removed `subscriptions.unsubscribe()` from `MembershipManagerImpl#leaveGroup`.
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
chia7712 commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1666864664 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1490,6 +1490,8 @@ public void unsubscribe() { log.error("Failed while waiting for the unsubscribe event to complete"); } resetGroupMetadata(); +} else { Review Comment: @FrankYang0529 Please take a look at @lianetm comments: > we would have to end up calling the subcriptionState.unsubscribe from the ApplicationEventProcessor where we process(UnsubscribeEvent). We wouldn't be able to reuse the MembershipMgr logic (which is part of what I had in mind when commenting on the jira), because if there is no groupId we don't even have an instance of the MembershipMgr. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
FrankYang0529 commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1666855070 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1490,6 +1490,8 @@ public void unsubscribe() { log.error("Failed while waiting for the unsubscribe event to complete"); } resetGroupMetadata(); +} else { Review Comment: Hi @chia7712 and @lianetm, it looks like we can't use `UnsubscribeEvent` when group id is empty, because `heartbeatRequestManager` will be empty. The test case `PlaintextConsumerTest#testConsumingWithNullGroupId` can't pass. https://github.com/apache/kafka/blob/9a7eee60727dc73f09075e971ea35909d2245f19/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java#L215-L220 https://github.com/apache/kafka/blob/9a7eee60727dc73f09075e971ea35909d2245f19/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java#L161-L208 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
FrankYang0529 commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1666386551 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1490,6 +1490,8 @@ public void unsubscribe() { log.error("Failed while waiting for the unsubscribe event to complete"); } resetGroupMetadata(); +} else { Review Comment: Hi @chia7712, thanks for the suggestion. I also updated related test case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
chia7712 commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1665964592 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1490,6 +1490,8 @@ public void unsubscribe() { log.error("Failed while waiting for the unsubscribe event to complete"); } resetGroupMetadata(); +} else { Review Comment: I like the idea too. > we would have to end up calling the subcriptionState.unsubscribe from the ApplicationEventProcessor where we process(UnsubscribeEvent). We wouldn't be able to reuse the MembershipMgr logic (which is part of what I had in mind when commenting on the jira), because if there is no groupId we don't even have an instance of the MembershipMgr. nice point! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1665958301
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1490,6 +1490,8 @@ public void unsubscribe() {
 log.error("Failed while waiting for the unsubscribe event to complete");
 }
 resetGroupMetadata();
+} else {
Review Comment:
Yep, I really liked that idea when I shared it. We should only note that if we take that path, we would end up calling `subscriptionState.unsubscribe` from the `ApplicationEventProcessor`, where we `process(UnsubscribeEvent)`. We wouldn't be able to reuse the `MembershipMgr` logic (which is part of what I had in mind when commenting on the jira), because if there is no groupId we don't even have an instance of the `MembershipMgr`.
Still, I like the idea because 1) it simplifies `consumer.unsubscribe`, and 2) all `subscriptionState.unsubscribe` calls happen in the background thread in association with the `UnsubscribeEvent` (directly if there is no groupId; indirectly, by delegating to the `membershipMgr`, if a groupId is present). Makes sense?
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
chia7712 commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1665940798 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1490,6 +1490,8 @@ public void unsubscribe() { log.error("Failed while waiting for the unsubscribe event to complete"); } resetGroupMetadata(); +} else { Review Comment: inspired by @lianetm comments in jira. Maybe we can leverage `UnsubscribeEvent` to handle the `unsubscribe`. For example: ```java public void unsubscribe() { acquireAndEnsureOpen(); try { fetchBuffer.retainAll(Collections.emptySet()); Timer timer = time.timer(Long.MAX_VALUE); UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer)); applicationEventHandler.add(unsubscribeEvent); log.info("Unsubscribing all topics or patterns and assigned partitions {}", subscriptions.assignedPartitions()); try { processBackgroundEvents(unsubscribeEvent.future(), timer); log.info("Unsubscribed all topics or patterns and assigned partitions"); } catch (TimeoutException e) { log.error("Failed while waiting for the unsubscribe event to complete"); } if (groupMetadata.get().isPresent()) resetGroupMetadata(); } catch (Exception e) { log.error("Unsubscribe failed", e); throw e; } finally { release(); } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
FrankYang0529 commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1665795593 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -2121,6 +2136,8 @@ private void assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerIm assertTrue(membershipManager.currentAssignment().isNone()); assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty()); assertEquals(LEAVE_GROUP_MEMBER_EPOCH, membershipManager.memberEpoch()); +membershipManager.leaveGroup(); +verify(subscriptionState).unsubscribe(); Review Comment: Thank you for the detailed explanation. Sorry that I misunderstood your previous comment. I need to read KIP-848 again to catch up more detailed logic here. Could you also check that whether other places use correct `verify(subscriptionState).unsubscribe();` statement? Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1665774645
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -2121,6 +2136,8 @@ private void assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerIm
 assertTrue(membershipManager.currentAssignment().isNone());
 assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
 assertEquals(LEAVE_GROUP_MEMBER_EPOCH, membershipManager.memberEpoch());
+membershipManager.leaveGroup();
+verify(subscriptionState).unsubscribe();
Review Comment:
Hmm, I would say we don't want to force a leave group here, because it changes the semantics of all the tests around stale members. All stale members send a leave group request to the broker (hence the test name), but not all stale members receive an API-triggered call to `leaveGroup` (these are different things). So we have 2 scenarios for stale members here:
1. The member becomes stale + consumer poll -> it will rejoin the group, reusing the same subscription it had. This is probably the common case, and it worked fine before this PR (a stale member clears its assignment only when it completes callbacks, [here](https://github.com/apache/kafka/blob/cb63fdb77ed869f2c519ca002ca757425b6b3076/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java#L900)).
2. The member goes STALE + the consumer leaves the group before the stale member has the chance to rejoin -> we want to clear the subscription, because of the "external" leave group call. This is the edge case this PR fixes and adds tests for; you already added `testLeaveGroupWhenStateIsStale`.
Actually, for the common case 1 of stale members that will rejoin (and do not receive a forced call to `leaveGroup`), we should `verify(subscriptionState, never()).unsubscribe();`. We should add that line here instead. Makes sense?
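The two stale-member scenarios described above can be contrasted with a toy model. This is a sketch under assumptions, not the real membership manager: `StaleMemberSketch` and its method names are hypothetical, and a plain counter stands in for `verify(subscriptionState).unsubscribe()` and `verify(subscriptionState, never()).unsubscribe()`.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of a member that can go stale and later rejoin or leave.
final class StaleMemberSketch {
    private final Set<String> subscription = new HashSet<>();
    private boolean stale = false;
    int unsubscribeCalls = 0; // stands in for calls to subscriptionState.unsubscribe()

    StaleMemberSketch(Set<String> topics) {
        subscription.addAll(topics);
    }

    // The member becomes stale: it leaves the group on the broker side,
    // but the subscription is kept so that it can rejoin later.
    void onPollTimerExpired() {
        stale = true;
    }

    // Scenario 1: the application polls again, so the member rejoins
    // with the same subscription it had.
    void onConsumerPoll() {
        stale = false;
    }

    // Scenario 2: an explicit, API-triggered leave clears the subscription.
    void leaveGroup() {
        subscription.clear();
        unsubscribeCalls++;
        stale = false;
    }

    boolean isStale() {
        return stale;
    }

    Set<String> subscription() {
        return Collections.unmodifiableSet(subscription);
    }
}
```

In this model only the explicit `leaveGroup()` call touches the subscription, which is the distinction the `never()` verification is meant to protect in the stale-member tests.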
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
FrankYang0529 commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1665591893 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -2302,6 +2303,61 @@ public void testRebalanceMetricsOnFailedRebalance() { assertEquals(-1d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo)); } +@Test +public void testLeaveGroupWhenStateIsFatal() { +MembershipManagerImpl membershipManager = createMemberInStableState(null); +when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); +membershipManager.transitionToFatal(); +assertEquals(MemberState.FATAL, membershipManager.state()); + +subscriptionState.assignFromUser(Collections.singleton(new TopicPartition("topic", 0))); +assertEquals(1, subscriptionState.numAssignedPartitions()); Review Comment: Thanks for the suggestion. I add `verify(subscriptionState).unsubscribe();` to test cases which calling `leaveGroup`. The only different one is `testLeaveGroupWhenMemberAlreadyLeaving`. I use `verify(subscriptionState, never()).unsubscribe();` for second leave attempt. Could you take a look again? Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1665069309
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -2302,6 +2303,61 @@ public void testRebalanceMetricsOnFailedRebalance() {
 assertEquals(-1d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo));
 }
+@Test
+public void testLeaveGroupWhenStateIsFatal() {
+MembershipManagerImpl membershipManager = createMemberInStableState(null);
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+membershipManager.transitionToFatal();
+assertEquals(MemberState.FATAL, membershipManager.state());
+
+subscriptionState.assignFromUser(Collections.singleton(new TopicPartition("topic", 0)));
+assertEquals(1, subscriptionState.numAssignedPartitions());
Review Comment:
Cool. Related, although not introduced by this PR: this makes me notice that we don't have this full story covered in the tests (we are not testing the call to `subscriptionState.unsubscribe` when `isNotInGroup` is false). Would you mind adding the same verification you added for `notInGroup` true to the func that covers this for `notInGroup` false? It should be the same `verify(subscriptionState).unsubscribe();`, but added to `assertStaleMemberLeavesGroupAndClearsAssignment`, I would say.
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
FrankYang0529 commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1664301020 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -2302,6 +2303,61 @@ public void testRebalanceMetricsOnFailedRebalance() { assertEquals(-1d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo)); } +@Test +public void testLeaveGroupWhenStateIsFatal() { +MembershipManagerImpl membershipManager = createMemberInStableState(null); +when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); +membershipManager.transitionToFatal(); +assertEquals(MemberState.FATAL, membershipManager.state()); + +subscriptionState.assignFromUser(Collections.singleton(new TopicPartition("topic", 0))); +assertEquals(1, subscriptionState.numAssignedPartitions()); Review Comment: Yeah, I agree we should not have test cases which are not real cases. I change `spy` back to `mock` and only test `subscriptionState#unsubscribe` is called when `isNotInGroup` is true. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
FrankYang0529 commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1664301811 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -2077,6 +2077,19 @@ void testReaperInvokedInPoll() { verify(backgroundEventReaper).reap(time.milliseconds()); } +@Test +public void testUnsubscribeWithoutGroupId() { +consumer = newConsumerWithoutGroupId(); +completeFetchedCommittedOffsetApplicationEventExceptionally(new TimeoutException()); + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); Review Comment: Yeah, removed unused mock. Thanks for the review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1663408286

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -2302,6 +2303,61 @@ public void testRebalanceMetricsOnFailedRebalance() {
         assertEquals(-1d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo));
     }
 
+    @Test
+    public void testLeaveGroupWhenStateIsFatal() {
+        MembershipManagerImpl membershipManager = createMemberInStableState(null);
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        membershipManager.transitionToFatal();
+        assertEquals(MemberState.FATAL, membershipManager.state());
+
+        subscriptionState.assignFromUser(Collections.singleton(new TopicPartition("topic", 0)));
+        assertEquals(1, subscriptionState.numAssignedPartitions());

Review Comment:
I'm not sure we would ever be able to reach the path we're mocking here with a real consumer, so I'm wondering if this is even a valid scenario to test (this test, and the ones below for fenced and stale too).

A member in fatal/fenced/stale state means it was part of a group (automatic assignment after subscribe), and then we're mocking a manual assignment that takes effect and is cleared on leaveGroup. I expect that would never be possible, because the consumer does not allow mixing subscription types, so the call to assign would fail before actually updating the assignment and numAssignedPartitions (a consumer would have to unsubscribe, or assign an empty set, to then be able to manually assign the topic0 partition).

Just for the record, I totally see the value in `testLeaveGroupWhenStateIsUnsubscribe`, which covers the gap we had and is a valid combination since it's not mixing subscription types, but I struggle to see this call to assign happening when a member is fatal/fenced/stale.
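To illustrate the point about mixing subscription types, here is a minimal, self-contained Java sketch. `SubscriptionModel` and `SubscriptionTypeDemo` are hypothetical names made up for this illustration, not Kafka classes. The sketch only assumes what the comment describes: a manual assignment is rejected with an `IllegalStateException` while a topic subscription is active, and the assignment is left untouched until the consumer unsubscribes.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for the consumer's subscription bookkeeping.
// It is NOT Kafka's SubscriptionState; it only models the mutual-exclusion rule.
final class SubscriptionModel {
    enum Type { NONE, AUTO_TOPICS, USER_ASSIGNED }

    private Type type = Type.NONE;
    private final Set<String> assigned = new HashSet<>();

    // Group-managed subscription, as after consumer.subscribe(topics).
    void subscribe(Set<String> topics) {
        setType(Type.AUTO_TOPICS);
    }

    // Manual assignment, as after consumer.assign(partitions).
    void assignFromUser(Set<String> partitions) {
        setType(Type.USER_ASSIGNED); // throws before the assignment is touched
        assigned.clear();
        assigned.addAll(partitions);
    }

    // Clears everything so either subscription type can be chosen again.
    void unsubscribe() {
        type = Type.NONE;
        assigned.clear();
    }

    int numAssignedPartitions() {
        return assigned.size();
    }

    private void setType(Type requested) {
        if (type != Type.NONE && type != requested) {
            throw new IllegalStateException("subscription types are mutually exclusive");
        }
        type = requested;
    }
}

final class SubscriptionTypeDemo {
    // Returns true if a manual assign is rejected while a topic subscription
    // is active and the assignment stays empty.
    static boolean assignRejectedWhileSubscribed() {
        SubscriptionModel state = new SubscriptionModel();
        state.subscribe(Collections.singleton("topic"));
        try {
            state.assignFromUser(Collections.singleton("topic-0"));
            return false;
        } catch (IllegalStateException expected) {
            return state.numAssignedPartitions() == 0;
        }
    }

    // Returns the partition count after unsubscribe() followed by a manual assign.
    static int assignAfterUnsubscribe() {
        SubscriptionModel state = new SubscriptionModel();
        state.subscribe(Collections.singleton("topic"));
        state.unsubscribe();
        state.assignFromUser(Collections.singleton("topic-0"));
        return state.numAssignedPartitions();
    }

    public static void main(String[] args) {
        System.out.println(assignRejectedWhileSubscribed());
        System.out.println(assignAfterUnsubscribe());
    }
}
```

Under this model, the `assignFromUser` call in the quoted test could only succeed on a mock or spy that bypasses the guard, which is why the scenario may not be reachable with a real consumer.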
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on PR #16449:
URL: https://github.com/apache/kafka/pull/16449#issuecomment-2204975848

Hey @FrankYang0529, thanks a lot for the fix! Left some comments.
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1663411634

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -2077,6 +2077,19 @@ void testReaperInvokedInPoll() {
         verify(backgroundEventReaper).reap(time.milliseconds());
     }
 
+    @Test
+    public void testUnsubscribeWithoutGroupId() {
+        consumer = newConsumerWithoutGroupId();
+        completeFetchedCommittedOffsetApplicationEventExceptionally(new TimeoutException());
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));

Review Comment:
Do we need this? I wouldn't expect that we have to mock anything related to committed offsets or fetching, because we're not polling in the test (or using committed offsets in any way), right?