[
https://issues.apache.org/jira/browse/KAFKA-16565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kirk True updated KAFKA-16565:
--
Reviewer: Lucas Brutschy
Description:
In {{verifiable_consumer.py}}, the
{{IncrementalAssignmentConsumerEventHandler}} contains this logic:
{code:python}
def handle_partitions_revoked(self, event):
    self.revoked_count += 1
    self.state = ConsumerState.Rebalancing
    self.position = {}
    for topic_partition in event["partitions"]:
        topic = topic_partition["topic"]
        partition = topic_partition["partition"]
        self.assignment.remove(TopicPartition(topic, partition))
{code}
If the {{self.assignment.remove()}} call is passed a {{TopicPartition}} that
isn't in the list, Python raises a generic {{ValueError}} that does not say
which partition was missing. For now, we should first check that the
{{TopicPartition}} is in the list with an {{assert}} that provides better
information.
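A minimal sketch of what the {{assert}}-based check could look like. This is not the actual patch: the {{TopicPartition}} and {{ConsumerState}} definitions and the {{RevocationHandlerSketch}} class are hypothetical stand-ins so the snippet runs on its own, and the message wording is only an assumption.

```python
from collections import namedtuple
from enum import Enum

# Hypothetical stand-ins for the types used by verifiable_consumer.py,
# defined here only so the sketch is self-contained.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


class ConsumerState(Enum):
    Started = 1
    Rebalancing = 2


class RevocationHandlerSketch:
    """Illustrative handler; not the real IncrementalAssignmentConsumerEventHandler."""

    def __init__(self, assignment):
        self.revoked_count = 0
        self.state = ConsumerState.Started
        self.position = {}
        self.assignment = list(assignment)

    def handle_partitions_revoked(self, event):
        self.revoked_count += 1
        self.state = ConsumerState.Rebalancing
        self.position = {}
        for topic_partition in event["partitions"]:
            tp = TopicPartition(topic_partition["topic"],
                                topic_partition["partition"])
            # Fail with a descriptive message instead of the generic
            # ValueError that list.remove() raises for a missing element.
            assert tp in self.assignment, (
                f"Cannot revoke {tp}: it is not in the current "
                f"assignment {self.assignment}"
            )
            self.assignment.remove(tp)
```

Note that {{assert}} statements are skipped when Python runs with {{-O}}, in which case the original {{ValueError}} from {{list.remove()}} would surface again.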
was:
In {{verifiable_consumer.py}}, the Incremental
{code:java}
def handle_partitions_revoked(self, event):
    self.revoked_count += 1
    self.state = ConsumerState.Rebalancing
    self.position = {}
    for topic_partition in event["partitions"]:
        topic = topic_partition["topic"]
        partition = topic_partition["partition"]
        self.assignment.remove(TopicPartition(topic, partition))
{code}
If the {{self.assignment.remove()}} call is passed a {{TopicPartition}} that
isn't in the list, an error is thrown. For now, we should first check that the
{{TopicPartition}} is in the list, and if not, log a warning or something.
> IncrementalAssignmentConsumerEventHandler throws error when attempting to
> remove a partition that isn't assigned
>
>
> Key: KAFKA-16565
> URL: https://issues.apache.org/jira/browse/KAFKA-16565
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer, system tests
> Affects Versions: 3.8.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Critical
> Labels: consumer-threading-refactor, kip-848-client-support, system-tests
> Fix For: 3.8.0
>