lucasbru opened a new pull request, #15271:
URL: https://github.com/apache/kafka/pull/15271

   The current reconciliation code in `AsyncKafkaConsumer`'s `MembershipManager` 
may lose part of the server-provided assignment when metadata is delayed. The 
reason is incorrect handling of partially resolved topic names, as in this 
example:
   
    * We get assigned `T1-1` and `T2-1`
    * We reconcile `T1-1`; `T2-1` remains in `assignmentUnresolved` since the 
topic name for the topic ID of `T2` is not known yet
    * We get new cluster metadata, which includes `T2`, so `T2-1` is moved to 
`assignmentReadyToReconcile`
    * We call reconcile -- `T2-1` is now treated as the full assignment, so 
`T1-1` is revoked
    * We end up with assignment `T2-1`, which is inconsistent with the 
broker-side target assignment.
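
   The sequence above can be reproduced with a small model. This is only a 
sketch under assumptions, not the real `MembershipManagerImpl` code: the class 
`TwoCollectionSketch`, its methods, the string-based topic IDs, and the 
one-partition-per-topic map are all hypothetical, and it assumes that the ready 
set is cleared after each reconciliation.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical, simplified model of the two-collection state; not the real code. */
class TwoCollectionSketch {
    // Topic ID -> partition, still waiting for a topic name.
    final Map<String, Integer> assignmentUnresolved = new HashMap<>();
    // Resolved "topic-partition" entries waiting for the next reconciliation.
    final Set<String> assignmentReadyToReconcile = new TreeSet<>();
    // Local metadata cache: topic ID -> topic name.
    final Map<String, String> topicNamesById = new HashMap<>();
    // Partitions currently owned by this member.
    Set<String> owned = new TreeSet<>();

    void onAssignment(Map<String, Integer> target) {
        assignmentUnresolved.putAll(target);
        resolve();
    }

    void onMetadata(String topicId, String topicName) {
        topicNamesById.put(topicId, topicName);
        resolve();
    }

    // Moves every entry whose topic name is now known into the ready set.
    private void resolve() {
        Iterator<Map.Entry<String, Integer>> it = assignmentUnresolved.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> e = it.next();
            String name = topicNamesById.get(e.getKey());
            if (name != null) {
                assignmentReadyToReconcile.add(name + "-" + e.getValue());
                it.remove();
            }
        }
    }

    // The ready set is treated as the full assignment, so anything owned
    // but absent from it is implicitly revoked (assumed behaviour).
    void reconcile() {
        owned = new TreeSet<>(assignmentReadyToReconcile);
        assignmentReadyToReconcile.clear();
    }
}
```

   In this model, if the name of `T1` is cached, both partitions are assigned, 
`reconcile()` runs, the name of `T2` arrives, and `reconcile()` runs again, 
then `owned` ends up containing only `T2-1`.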
   
   Generally, this seems to be a problem around semantics of the internal 
collections `assignmentUnresolved` and `assignmentReadyToReconcile`. Absence of 
a topic in `assignmentReadyToReconcile` may either mean revocation of the topic 
partition(s), or unavailability of a topic name for the topic, depending on the 
context.
   
   This change reimplements that part of the internal state of 
`MembershipManagerImpl` with simpler and more correct invariants by using a 
single collection `currentTargetAssignment` which is based on topic IDs and 
always corresponds to the latest assignment received from the broker. During 
every attempted reconciliation, all topic IDs will be resolved from the local 
cache, which should not introduce a lot of overhead. `assignmentUnresolved` and 
`assignmentReadyToReconcile` are removed. 
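
   As a rough illustration of the single-collection idea (a sketch under 
assumptions, not the code in this PR): the class `TargetAssignmentSketch`, its 
method names, and the use of plain strings for topic IDs are hypothetical. Only 
the name `currentTargetAssignment` is taken from the description above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical, simplified model; not the actual MembershipManagerImpl code. */
class TargetAssignmentSketch {
    // Latest assignment received from the broker, keyed by topic ID
    // (a String here for brevity).
    private Map<String, Set<Integer>> currentTargetAssignment = new HashMap<>();
    // Local metadata cache: topic ID -> topic name.
    private final Map<String, String> topicNamesById = new HashMap<>();
    // Partitions currently owned, as "topic-partition" strings.
    private Set<String> owned = new TreeSet<>();

    // A new assignment always replaces the previous target as a whole.
    void onAssignment(Map<String, Set<Integer>> target) {
        currentTargetAssignment = new HashMap<>(target);
    }

    void onMetadata(String topicId, String topicName) {
        topicNamesById.put(topicId, topicName);
    }

    /**
     * Resolves the whole target from the cache on every attempt.
     * Returns true once every topic ID in the target could be resolved.
     */
    boolean reconcile() {
        Set<String> resolved = new TreeSet<>();
        boolean complete = true;
        for (Map.Entry<String, Set<Integer>> e : currentTargetAssignment.entrySet()) {
            String name = topicNamesById.get(e.getKey());
            if (name == null) {
                complete = false; // name not known yet, retry on a later attempt
                continue;
            }
            for (int partition : e.getValue()) {
                resolved.add(name + "-" + partition);
            }
        }
        // Revocations are derived from the full target, never from a partial delta.
        owned = resolved;
        return complete;
    }

    Set<String> owned() {
        return owned;
    }
}
```

   Because each attempt starts from the full target, a later attempt that 
resolves `T2` yields both `T1-1` and `T2-1` in this model, rather than treating 
`T2-1` as the whole assignment.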
   
   This change is in line with the goal of using topic IDs instead of topic 
names in the consumer internal state, and fixes the bug of losing partitions 
when delayed metadata arrives.
   
   A unit test covering the above scenario is added.
   
   Note that this change does not fully fix the reconciliation problems, because 
if a new assignment or new metadata arrives during an ongoing reconciliation, 
it will never be applied. This will be solved in a separate change 
(KAFKA-15832).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


