[ 
https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayank Shekhar Narula updated KAFKA-15824:
------------------------------------------
    Description: 
As can be seen, 
[maybeValidatePositionForCurrentLeader|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L459]
 doesn't check whether the partition is subscribed. The check can be done by 
testing whether the cached TopicPartitionState is null, as done by 
[maybeCompleteValidation|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477].
 As a result, it throws IllegalStateException for a partition that is not yet 
subscribed.

The lack of this check makes it awkward to write thread-safe code against the 
SubscriptionState class, as the example code below shows. At line 1 the 
partition (tp) would be in allCurrentlySubscribedTopics, but by line 2 it could 
have been removed from the subscribed partitions (in a separate thread). This 
forces the user of the class to catch and handle IllegalStateException.
{code:java}
// Example code for a user of
// SubscriptionState::maybeValidatePositionForCurrentLeader

Set<TopicPartition> allCurrentlySubscribedTopics =
    subscriptionState.assignedPartitions(); // line 1
if (allCurrentlySubscribedTopics.contains(tp)) {
  ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
      metadata.currentLeader(tp);
  try {
    subscriptionState.maybeValidatePositionForCurrentLeader(
        apiVersions, tp, leaderAndEpoch); // line 2
  } catch (IllegalStateException e) {
    // recover from it. // line 3
  }
}{code}
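
The difference between the two behaviours can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not the actual SubscriptionState code: the class SubscriptionSketch, its PartitionState type, the String partition keys, and the methods validateStrict and validateLenient are all hypothetical names introduced for illustration. It shows how a null check on the cached per-partition state lets the caller treat "not assigned" as "nothing to validate" instead of catching an exception.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model; NOT Kafka's SubscriptionState.
class SubscriptionSketch {
    // Hypothetical stand-in for the cached per-partition state.
    static final class PartitionState {
        int leaderEpoch;
        PartitionState(int leaderEpoch) { this.leaderEpoch = leaderEpoch; }
    }

    private final Map<String, PartitionState> assignment = new HashMap<>();

    synchronized void assign(String partition, int leaderEpoch) {
        assignment.put(partition, new PartitionState(leaderEpoch));
    }

    synchronized void unassign(String partition) {
        assignment.remove(partition);
    }

    // Models the behaviour described in this issue: throws if unassigned.
    synchronized boolean validateStrict(String partition, int currentLeaderEpoch) {
        PartitionState state = assignment.get(partition);
        if (state == null) {
            throw new IllegalStateException("No current assignment for partition " + partition);
        }
        return needsValidation(state, currentLeaderEpoch);
    }

    // Models the suggested behaviour: a null state means nothing to validate.
    synchronized boolean validateLenient(String partition, int currentLeaderEpoch) {
        PartitionState state = assignment.get(partition);
        if (state == null) {
            return false;
        }
        return needsValidation(state, currentLeaderEpoch);
    }

    // Validation is needed only when the leader epoch has advanced.
    private static boolean needsValidation(PartitionState state, int currentLeaderEpoch) {
        if (currentLeaderEpoch > state.leaderEpoch) {
            state.leaderEpoch = currentLeaderEpoch;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        SubscriptionSketch subscriptions = new SubscriptionSketch();
        subscriptions.assign("topic-0", 1);
        System.out.println(subscriptions.validateLenient("topic-0", 2));
        // Simulates another thread removing the partition between the check and the call.
        subscriptions.unassign("topic-0");
        System.out.println(subscriptions.validateLenient("topic-0", 3));
    }
}
```

With the lenient variant, the caller in the example above would no longer need the try/catch around line 2, because a partition removed concurrently is simply reported as not needing validation.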
 

  was:
As can be 
[maybeValidatePositionForCurrentLeader|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L459]
 doesn't check if partition is subscribed by checking TopicPartitionState 
cached is null or not, as done by 
[maybeCompleteValidation|https://github.com/msn-tldr/kafka/blob/2e2f32c05008cdd7009e5f76fdd92f98996aab84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477].
 So it throws IllegalStateException for a partition that is yet not subscribed.

Lack of this check makes writing thread-safe code w.r.t SubscriptionState class 
awkward. This can be seen from the example code below. For example, at line 1 
partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could be 
removed from subscribed partitions(in a separate thread). So this forces the 
user of this class to handle IllegalStateException which is awkward.
{code:java}
// Following is example code for the user of 
SubscriptionState::maybeValidatePositionForCurrentLeader

Set<TopicPartition> allCurrentlySubscribedTopics = 
subscriptionState.assignedPartitions(); // line 1
if(allCurrentlySubscribedTopics.contains(tp)) {
     ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(tp);
  try() {
    subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, 
leaderAndEpoch); // line 2
  } catch (IllegalStateException e) {
   // recover from it. // line 3
  }
}{code}
 


> SubscriptionState's maybeValidatePositionForCurrentLeader should handle 
> partition which isn't subscribed yet
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15824
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15824
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>            Reporter: Mayank Shekhar Narula
>            Assignee: Mayank Shekhar Narula
>            Priority: Major
>             Fix For: 3.7.0
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
