[ https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mayank Shekhar Narula updated KAFKA-15824: ------------------------------------------ Description: As can be [maybeValidatePositionForCurrentLeader|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L459] doesn't check if partition is subscribed. It can be done by checking TopicPartitionState cached is null or not, as done by [maybeCompleteValidation|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]. So it throws IllegalStateException for a partition that is yet not subscribed. Lack of this check makes writing thread-safe code w.r.t SubscriptionState class awkward. This can be seen from the example code below. For example, at line 1 partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could be removed from subscribed partitions(in a separate thread). So this forces the user of this class to handle IllegalStateException which is awkward. {code:java} // Following is example code for the user of SubscriptionState::maybeValidatePositionForCurrentLeader Set<TopicPartition> allCurrentlySubscribedTopics = subscriptionState.assignedPartitions(); // line 1 if(allCurrentlySubscribedTopics.contains(tp)) { ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp); try() { subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, leaderAndEpoch); // line 2 } catch (IllegalStateException e) { // recover from it. // line 3 } }{code} was: As can be [maybeValidatePositionForCurrentLeader|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L459] doesn't check if partition is subscribed by checking TopicPartitionState cached is null or not, as done by [maybeCompleteValidation|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]. So it throws IllegalStateException for a partition that is yet not subscribed. Lack of this check makes writing thread-safe code w.r.t SubscriptionState class awkward. This can be seen from the example code below. For example, at line 1 partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could be removed from subscribed partitions(in a separate thread). So this forces the user of this class to handle IllegalStateException which is awkward. {code:java} // Following is example code for the user of SubscriptionState::maybeValidatePositionForCurrentLeader Set<TopicPartition> allCurrentlySubscribedTopics = subscriptionState.assignedPartitions(); // line 1 if(allCurrentlySubscribedTopics.contains(tp)) { ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp); try() { subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, leaderAndEpoch); // line 2 } catch (IllegalStateException e) { // recover from it. // line 3 } }{code} > SubscriptionState's maybeValidatePositionForCurrentLeader should handle > partition which isn't subscribed yet > ------------------------------------------------------------------------------------------------------------ > > Key: KAFKA-15824 > URL: https://issues.apache.org/jira/browse/KAFKA-15824 > Project: Kafka > Issue Type: Bug > Components: clients > Reporter: Mayank Shekhar Narula > Assignee: Mayank Shekhar Narula > Priority: Major > Fix For: 3.7.0 > > > As can be > [maybeValidatePositionForCurrentLeader|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L459] > doesn't check if partition is subscribed. It can be done by checking > TopicPartitionState cached is null or not, as done by > [maybeCompleteValidation|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]. > So it throws IllegalStateException for a partition that is yet not > subscribed. > Lack of this check makes writing thread-safe code w.r.t SubscriptionState > class awkward. This can be seen from the example code below. For example, at > line 1 partitionA would be in allCurrentlySubscribedTopics, but at line 2 it > could be removed from subscribed partitions(in a separate thread). So this > forces the user of this class to handle IllegalStateException which is > awkward. > {code:java} > // Following is example code for the user of > SubscriptionState::maybeValidatePositionForCurrentLeader > Set<TopicPartition> allCurrentlySubscribedTopics = > subscriptionState.assignedPartitions(); // line 1 > if(allCurrentlySubscribedTopics.contains(tp)) { > ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = > metadata.currentLeader(tp); > try() { > subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, > leaderAndEpoch); // line 2 > } catch (IllegalStateException e) { > // recover from it. // line 3 > } > }{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)