[ 
https://issues.apache.org/jira/browse/KAFKA-15998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Haapala updated KAFKA-15998:
-------------------------------------
    Attachment: image-2024-12-16-15-36-47-059.png

> EAGER rebalance onPartitionsAssigned() called with no previous 
> onPartitionsLost() nor onPartitionsRevoked()
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15998
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15998
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 3.4.0
>            Reporter: Jonathan Haapala
>            Priority: Major
>         Attachments: image-2024-12-16-15-36-47-059.png
>
>
> I ran into a case where {{onPartitionsAssigned()}} was called without first 
> calling {{onPartitionsRevoked()}} and there is no indication that 
> {{onPartitionsLost()}} was called or had any reason to be called. We are 
> using the *EAGER* rebalance protocol and the *StickyAssignor* on Kafka 3.4.0.
> Our services rely on this part of the {{onPartitionsRevoked()}} API contract:
> {quote}In eager rebalancing, it will always be called at the start of a 
> rebalance and after the consumer stops fetching data.
> {quote}
> We track partition states internally with a state machine and rely on these 
> callbacks to assert which state we expect to be in. When a partition is 
> revoked and then re-assigned, we know that we kept ownership. Moreover, when 
> we are assigned partitions under EAGER rebalancing, we expect the entire 
> assignment to be passed to {{onPartitionsAssigned()}}: if 
> {{onPartitionsRevoked()}} is always called at the start of a rebalance, and the 
> EAGER protocol always revokes the entire assignment, then by the time 
> {{onPartitionsAssigned()}} is called the consumer owns nothing, so the entire 
> assignment is newly added.
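The invariant described above can be sketched as a small ownership tracker. This is a minimal sketch, not the reporter's actual state machine: the class name `EagerOwnershipTracker` and its `violations` counter are hypothetical, and partitions are modeled as plain strings such as `"topic-26"` so that it compiles without kafka-clients. Real code would implement `org.apache.kafka.clients.consumer.ConsumerRebalanceListener` and receive `Collection<TopicPartition>`. It assumes, as stated above, that under EAGER every rebalance revokes the full assignment before `onPartitionsAssigned()` is called.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a ConsumerRebalanceListener: same callback names,
// but partitions are plain strings so the sketch compiles without kafka-clients.
class EagerOwnershipTracker {
    private final Set<String> owned = new HashSet<>();
    private int violations = 0;

    // EAGER: expected at the start of every rebalance, with the full assignment.
    void onPartitionsRevoked(Collection<String> partitions) {
        owned.removeAll(partitions);
    }

    // Ownership was lost without a clean revoke; the state is still cleared.
    void onPartitionsLost(Collection<String> partitions) {
        owned.removeAll(partitions);
    }

    // Under the EAGER contract nothing should be owned when an assignment arrives.
    // Returns false (and counts a violation) when that invariant does not hold.
    boolean onPartitionsAssigned(Collection<String> partitions) {
        boolean invariantHeld = owned.isEmpty();
        if (!invariantHeld) {
            violations++;
        }
        owned.addAll(partitions);
        return invariantHeld;
    }

    Set<String> owned() {
        return Set.copyOf(owned);
    }

    int violations() {
        return violations;
    }

    public static void main(String[] args) {
        EagerOwnershipTracker tracker = new EagerOwnershipTracker();

        // Normal EAGER cycle: assign, revoke everything, assign again.
        tracker.onPartitionsAssigned(List.of("topic-26", "topic-123"));
        tracker.onPartitionsRevoked(List.of("topic-26", "topic-123"));
        System.out.println(tracker.onPartitionsAssigned(List.of("topic-26"))); // true

        // Sequence from the logs: a second assignment with no revoke in between,
        // where only the (empty) delta reaches onPartitionsAssigned.
        System.out.println(tracker.onPartitionsAssigned(List.of())); // false
        System.out.println(tracker.violations()); // 1
    }
}
```

In the logged sequence the second `onPartitionsAssigned()` call receives an empty collection, so a tracker like this can flag that the invariant broke but cannot tell which partitions were dropped.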
> However, we recently ran into a situation where we received an assignment 
> while the consumer's existing assignment was non-empty:
> ||Pod||Message||
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC} 
> [KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] Notifying assignor about the {*}new 
> Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, 
> topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117, 
> topic-123, topic-130, topic-137, topic-141])|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC} 
> [KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: topic-26, 
> topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, 
> topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137, 
> topic-141|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:31,923\{UTC} 
> [kafka-coordinator-heartbeat-thread \\| metric-aggregator] INFO  
> o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] *Request joining group* due to: group is already 
> rebalancing|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,132\{UTC} 
> [KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] Successfully joined group with generation 
> Generation\{generationId=12417, 
> memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223',
>  protocol='sticky'}|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,134\{UTC} 
> [KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] Successfully synced group in generation 
> Generation{generationId={*}12417{*}, 
> memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223',
>  protocol='sticky'}|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC} 
> [KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] Notifying assignor about the {*}new 
> Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, 
> topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117])|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC} 
> [KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: |
> Here you can see we are assigned partitions:
>   26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117, 123, 130, 137, 141
> and all of them are reported as newly added when passed to 
> {{onPartitionsAssigned()}}. About 6 seconds later (23:02:25,715 to 
> 23:02:31,923) the heartbeat thread notices another rebalance and requests to 
> join. The join succeeds about 200 ms later, and the sync completes 2 ms after 
> that. We then get a new assignment:
>   26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117
> This is a subset of the partitions we were assigned previously, missing 123, 
> 130, 137, and 141. Because {{onPartitionsRevoked()}} was not called at the 
> beginning of this rebalance, the consumer still holds the old assignment as its 
> current assignment instead of an empty one, so it computes no newly assigned 
> partitions: the final "Adding newly assigned partitions" log line above is 
> empty, and nothing in the logs shows the four dropped partitions being passed 
> to any callback.
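The reasoning above comes down to a set difference. The sketch below assumes, as the "Adding newly assigned partitions" log line suggests, that the consumer passes `onPartitionsAssigned()` the new assignment minus the partitions it still considers owned; it is not the actual `ConsumerCoordinator` code. The class `AddedPartitionsDemo` and its methods are hypothetical, and partitions are reduced to their `topic-N` numbers, copied from the logs for member a43be1e2.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration; partition numbers are taken from the logs above.
class AddedPartitionsDemo {
    static final Set<Integer> GEN_12416 = Set.of(
            26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117, 123, 130, 137, 141);
    static final Set<Integer> GEN_12417 = Set.of(
            26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117);

    // What would be passed to onPartitionsAssigned: new assignment minus still-owned.
    static Set<Integer> added(Set<Integer> owned, Set<Integer> assigned) {
        Set<Integer> result = new TreeSet<>(assigned);
        result.removeAll(owned);
        return result;
    }

    // Partitions that leave the assignment without being reported as added.
    static Set<Integer> dropped(Set<Integer> owned, Set<Integer> assigned) {
        Set<Integer> result = new TreeSet<>(owned);
        result.removeAll(assigned);
        return result;
    }

    public static void main(String[] args) {
        // Expected EAGER path: everything was revoked first, so nothing is owned.
        System.out.println(added(Set.of(), GEN_12417).size()); // 12
        // Observed path: no revoke, so the generation-12416 assignment is still owned.
        System.out.println(added(GEN_12416, GEN_12417));   // []
        System.out.println(dropped(GEN_12416, GEN_12417)); // [123, 130, 137, 141]
    }
}
```

With an empty owned set all 12 partitions would be reported as added; with the generation-12416 assignment still owned, the added set is empty and only the dropped set reveals 123, 130, 137 and 141.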
> Using this diagram from 
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-ConsumerRebalanceProtocol:Stop-The-WorldEffect]
>  as a visual guide, it looks as if we sent the JoinGroup request and joined 
> successfully, then went straight to the SyncGroup and received our 
> assignment, with no revocation beforehand.
> !https://cwiki.apache.org/confluence/download/attachments/103090108/Rebalance%20Today.jpg?version=1&modificationDate=1554837450000&api=v2!
> Here are the assignment logs from the group leader (pod 
> aggregator-ff95b6cf-nzkrx) for the initial assignment and for the assignment 
> that arrived without a revoke. They are consecutive generations, 12416 and 
> 12417, so no generation was missed in between.
> ||Pod||Message||
> |aggregator-ff95b6cf-nzkrx|2023-12-07 23:02:25,709\{UTC} 
> [KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] Finished assignment for group at generation 
> {*}12416{*}: 
> {consumer.metric-data-points.metric-aggregator-8d718b78-9b7e-4803-afda-49b1c35e2654=Assignment(partitions=[topic-28,
>  topic-30, topic-57, topic-77, topic-128, topic-129, topic-133, topic-134, 
> topic-136, topic-27, topic-56, topic-119, topic-22, topic-65, topic-43, 
> topic-64]), 
> consumer.metric-data-points.metric-aggregator-ab7e7027-3ca9-4dac-bb09-71958b515ab6=Assignment(partitions=[topic-35,
>  topic-45, topic-62, topic-72, topic-79, topic-83, topic-90, topic-102, 
> topic-106, topic-110, topic-114, topic-118, topic-124, topic-131, topic-138, 
> topic-142]), 
> consumer.metric-data-points.metric-aggregator-da772740-a386-491a-aa6d-69b26550dbf9=Assignment(partitions=[topic-37,
>  topic-49, topic-66, topic-73, topic-80, topic-85, topic-95, topic-103, 
> topic-107, topic-111, topic-115, topic-120, topic-125, topic-132, topic-139, 
> topic-143]), 
> consumer.metric-data-points.metric-aggregator-779655ef-3297-44d1-8edf-2e4b6c9b4f55=Assignment(partitions=[topic-7,
>  topic-11, topic-14, topic-16, topic-21, topic-29, topic-38, topic-42, 
> topic-52, topic-63, topic-69, topic-87, topic-91, topic-93, topic-96, 
> topic-98]), 
> consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223=Assignment(partitions=[topic-26,
>  topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, 
> topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137, 
> topic-141]), 
> consumer.metric-data-points.metric-aggregator-d06d548e-1e7f-48fb-98a0-2f6072fa3a5b=Assignment(partitions=[topic-9,
>  topic-13, topic-18, topic-20, topic-25, topic-32, topic-33, topic-34, 
> topic-36, topic-47, topic-48, topic-50, topic-51, topic-53, topic-59, 
> topic-61]), 
> consumer.metric-data-points.metric-aggregator-841a72e3-7ae4-4734-9793-7e947c4fa0a4=Assignment(partitions=[topic-19,
>  topic-39, topic-54, topic-70, topic-74, topic-81, topic-86, topic-99, 
> topic-104, topic-108, topic-112, topic-116, topic-122, topic-127, topic-135, 
> topic-140]), 
> consumer.metric-data-points.metric-aggregator-939df068-59ea-43d7-bb48-3c6baf97023a=Assignment(partitions=[topic-8,
>  topic-12, topic-15, topic-17, topic-23, topic-31, topic-40, topic-46, 
> topic-55, topic-67, topic-75, topic-89, topic-92, topic-94, topic-97, 
> topic-100]), 
> consumer.metric-data-points.metric-aggregator-933f7c51-27f5-436f-9b56-640eb55d64b2=Assignment(partitions=[topic-0,
>  topic-1, topic-2, topic-3, topic-4, topic-5, topic-6, topic-10, topic-68, 
> topic-41, topic-58, topic-121, topic-24, topic-84, topic-76, topic-126])}|
> |aggregator-ff95b6cf-nzkrx|2023-12-07 23:02:32,119\{UTC} 
> [KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] Finished assignment for group at generation 
> {*}12417{*}: 
> {consumer.metric-data-points.metric-aggregator-8d718b78-9b7e-4803-afda-49b1c35e2654=Assignment(partitions=[topic-28,
>  topic-30, topic-57, topic-77, topic-128, topic-129, topic-133, topic-134, 
> topic-136, topic-27, topic-56, topic-119]), 
> consumer.metric-data-points.metric-aggregator-ab7e7027-3ca9-4dac-bb09-71958b515ab6=Assignment(partitions=[topic-35,
>  topic-45, topic-62, topic-72, topic-79, topic-83, topic-90, topic-102, 
> topic-106, topic-110, topic-114, topic-118]), 
> consumer.metric-data-points.metric-aggregator-76fd27b3-fb5b-49aa-af2a-b53150897cf9=Assignment(partitions=[topic-22,
>  topic-51, topic-61, topic-76, topic-92, topic-96, topic-100, topic-124, 
> topic-127, topic-132, topic-138, topic-141]), 
> consumer.metric-data-points.metric-aggregator-9f6ab144-85a7-468c-9aa1-11f5445084b9=Assignment(partitions=[topic-43,
>  topic-59, topic-65, topic-91, topic-94, topic-98, topic-123, topic-126, 
> topic-131, topic-137, topic-140, topic-143]), 
> consumer.metric-data-points.metric-aggregator-da772740-a386-491a-aa6d-69b26550dbf9=Assignment(partitions=[topic-37,
>  topic-49, topic-66, topic-73, topic-80, topic-85, topic-95, topic-103, 
> topic-107, topic-111, topic-115, topic-120]), 
> consumer.metric-data-points.metric-aggregator-779655ef-3297-44d1-8edf-2e4b6c9b4f55=Assignment(partitions=[topic-7,
>  topic-11, topic-14, topic-16, topic-21, topic-29, topic-38, topic-42, 
> topic-52, topic-63, topic-69, topic-87]), 
> consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223=Assignment(partitions=[topic-26,
>  topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, 
> topic-105, topic-109, topic-113, topic-117]), 
> consumer.metric-data-points.metric-aggregator-d06d548e-1e7f-48fb-98a0-2f6072fa3a5b=Assignment(partitions=[topic-9,
>  topic-13, topic-18, topic-20, topic-25, topic-32, topic-33, topic-34, 
> topic-36, topic-47, topic-48, topic-50]), 
> consumer.metric-data-points.metric-aggregator-841a72e3-7ae4-4734-9793-7e947c4fa0a4=Assignment(partitions=[topic-19,
>  topic-39, topic-54, topic-70, topic-74, topic-81, topic-86, topic-99, 
> topic-104, topic-108, topic-112, topic-116]), 
> consumer.metric-data-points.metric-aggregator-939df068-59ea-43d7-bb48-3c6baf97023a=Assignment(partitions=[topic-8,
>  topic-12, topic-15, topic-17, topic-23, topic-31, topic-40, topic-46, 
> topic-55, topic-67, topic-75, topic-89]), 
> consumer.metric-data-points.metric-aggregator-91b1e207-0978-430e-9393-679ac44647b8=Assignment(partitions=[topic-24,
>  topic-53, topic-64, topic-84, topic-93, topic-97, topic-122, topic-125, 
> topic-130, topic-135, topic-139, topic-142]), 
> consumer.metric-data-points.metric-aggregator-933f7c51-27f5-436f-9b56-640eb55d64b2=Assignment(partitions=[topic-0,
>  topic-1, topic-2, topic-3, topic-4, topic-5, topic-6, topic-10, topic-68, 
> topic-41, topic-58, topic-121])}|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
