[
https://issues.apache.org/jira/browse/KAFKA-15998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jonathan Haapala updated KAFKA-15998:
-------------------------------------
Attachment: image-2024-12-16-15-53-53-816.png
> EAGER rebalance onPartitionsAssigned() called with no previous
> onPartitionsLost() nor onPartitionsRevoked()
> -----------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-15998
> URL: https://issues.apache.org/jira/browse/KAFKA-15998
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 3.4.0
> Reporter: Jonathan Haapala
> Priority: Major
> Attachments: image-2024-12-16-15-36-47-059.png,
> image-2024-12-16-15-53-53-816.png
>
>
> I ran into a case where {{onPartitionsAssigned()}} was called without first
> calling {{onPartitionsRevoked()}} and there is no indication that
> {{onPartitionsLost()}} was called or had any reason to be called. We are
> using the *EAGER* rebalance protocol and the *StickyAssignor* on Kafka 3.4.0.
> Our services rely on the documented API contract for {{onPartitionsRevoked()}}:
> {quote}In eager rebalancing, it will always be called at the start of a
> rebalance and after the consumer stops fetching data.
> {quote}
> We internally keep track of partition states with a state machine, and rely
> on these APIs to assert what expected states we are in. So when a partition
> is Revoked and then re-Assigned, we know that we kept ownership. Moreover, if
> we are assigned partitions in EAGER rebalancing, we expect the entire
> assignment to be passed to {{onPartitionsAssigned()}}, because if
> {{onPartitionsRevoked()}} is always called at the start of a rebalance and
> EAGER protocol always revokes the entire assignment, then by the time we hit
> {{onPartitionsAssigned()}} there should be nothing assigned from the
> consumer's point of view, and therefore the entire assignment is newly added.
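
The expectation described above can be sketched as a small per-partition state tracker. This is a hypothetical illustration, not the reporter's code: the class name `PartitionStateTracker`, its methods, and the use of plain integers for partitions are all assumptions, and the two callbacks only mirror what a `ConsumerRebalanceListener` would forward.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical ownership tracker; all names are illustrative assumptions. */
public class PartitionStateTracker {
    enum State { ASSIGNED, REVOKED }

    private final Map<Integer, State> states = new HashMap<>();

    /** Mirrors onPartitionsRevoked(): marks the given partitions as revoked. */
    public void onRevoked(Collection<Integer> partitions) {
        for (int p : partitions) {
            states.put(p, State.REVOKED);
        }
    }

    /** Mirrors onPartitionsAssigned() under the documented EAGER contract. */
    public void onAssigned(Collection<Integer> partitions) {
        // If a revoke always precedes an assignment, nothing may still be ASSIGNED here.
        for (Map.Entry<Integer, State> e : states.entrySet()) {
            if (e.getValue() == State.ASSIGNED) {
                throw new IllegalStateException(
                    "partition " + e.getKey() + " assigned without a prior revoke");
            }
        }
        // Revoked partitions that do not come back have moved to another member.
        states.keySet().retainAll(partitions);
        // Revoked-then-reassigned partitions are kept; the rest are new.
        for (int p : partitions) {
            states.put(p, State.ASSIGNED);
        }
    }

    /** Partitions this tracker currently believes it owns, in sorted order. */
    public Set<Integer> owned() {
        Set<Integer> result = new TreeSet<>();
        for (Map.Entry<Integer, State> e : states.entrySet()) {
            if (e.getValue() == State.ASSIGNED) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        PartitionStateTracker tracker = new PartitionStateTracker();
        tracker.onAssigned(List.of(26, 44, 123));
        tracker.onRevoked(List.of(26, 44, 123));
        tracker.onAssigned(List.of(26, 44));
        System.out.println(tracker.owned()); // prints [26, 44]
    }
}
```

With a tracker like this, a second `onAssigned(...)` call that arrives without an intervening `onRevoked(...)` fails fast, which is the kind of assertion the sequence described below would trip.
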
> However, we recently ran into a situation where we received an assignment
> while the consumer's existing assignment was non-empty:
> ||Pod||Message||
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC}
> [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer
> clientId=consumer.metric-data-points.metric-aggregator,
> groupId=metric-aggregator] Notifying assignor about the {*}new
> Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78,
> topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117,
> topic-123, topic-130, topic-137, topic-141])|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC}
> [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer
> clientId=consumer.metric-data-points.metric-aggregator,
> groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: topic-26,
> topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101,
> topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137,
> topic-141|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:31,923\{UTC}
> [kafka-coordinator-heartbeat-thread \\| metric-aggregator] INFO
> o.a.k.c.c.i.ConsumerCoordinator - [Consumer
> clientId=consumer.metric-data-points.metric-aggregator,
> groupId=metric-aggregator] *Request joining group* due to: group is already
> rebalancing|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,132\{UTC}
> [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer
> clientId=consumer.metric-data-points.metric-aggregator,
> groupId=metric-aggregator] Successfully joined group with generation
> Generation\{generationId=12417,
> memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223',
> protocol='sticky'}|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,134\{UTC}
> [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer
> clientId=consumer.metric-data-points.metric-aggregator,
> groupId=metric-aggregator] Successfully synced group in generation
> Generation{generationId={*}12417{*},
> memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223',
> protocol='sticky'}|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC}
> [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer
> clientId=consumer.metric-data-points.metric-aggregator,
> groupId=metric-aggregator] Notifying assignor about the {*}new
> Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78,
> topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117])|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC}
> [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer
> clientId=consumer.metric-data-points.metric-aggregator,
> groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: |
> Here you can see we get assigned partitions:
> 26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117, 123, 130, 137, 141
> And promptly see them all as newly added when passed to
> {{onPartitionsAssigned()}}. Six seconds later the heartbeat thread notices
> another rebalance and requests to join. It quickly succeeds and then almost
> immediately successfully syncs. We then get a new assignment:
> 26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117
> This is a subset of the partitions we were assigned previously, missing 123,
> 130, 137, and 141. Because {{onPartitionsRevoked()}} was not called at the
> beginning of this rebalance, the consumer still has the old assignment as its
> current assignment rather than it being empty, and so it thinks there are no
> newly assigned partitions.
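
The arithmetic behind the empty "newly assigned" list can be sketched with plain set differences, using the partition numbers from the logs above. This is only an illustration of the reasoning in the report, assuming the consumer computes "newly added" as the new assignment minus what it still believes it owns; the class and method names are hypothetical and this is not the `ConsumerCoordinator` source.

```java
import java.util.Collection;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical helper; names are illustrative assumptions. */
public class AssignmentDiff {
    /** Returns the elements of a that are not in b, sorted. */
    static SortedSet<Integer> minus(Collection<Integer> a, Collection<Integer> b) {
        SortedSet<Integer> result = new TreeSet<>(a);
        result.removeAll(b);
        return result;
    }

    public static void main(String[] args) {
        // Assignment for member a43be1e2 in generation 12416 (from the logs).
        List<Integer> gen12416 = List.of(26, 44, 60, 71, 78, 82, 88, 101,
                                         105, 109, 113, 117, 123, 130, 137, 141);
        // Assignment for the same member in generation 12417 (from the logs).
        List<Integer> gen12417 = List.of(26, 44, 60, 71, 78, 82, 88, 101,
                                         105, 109, 113, 117);

        // Old assignment never revoked: nothing looks new.
        System.out.println(minus(gen12417, gen12416)); // prints []

        // Old assignment revoked first: the whole assignment looks new.
        System.out.println(minus(gen12417, List.of()).size()); // prints 12

        // Partitions that disappeared without any revoke or lost callback.
        System.out.println(minus(gen12416, gen12417)); // prints [123, 130, 137, 141]
    }
}
```
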
> Using this diagram from
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-ConsumerRebalanceProtocol:Stop-The-WorldEffect]
> as a visual guide, it seems like we sent the JoinGroup and succeeded in
> joining, but then we seemingly skipped to the SyncGroup and got our
> assignment.
> !https://cwiki.apache.org/confluence/download/attachments/103090108/Rebalance%20Today.jpg?version=1&modificationDate=1554837450000&api=v2!
> Here are the group coordinator assignment logs for the initial assignment and
> then the assignment without a revoke. You can see they are the sequential
> generations 12416 and 12417, so no generation was skipped.
> ||Pod||Message||
> |aggregator-ff95b6cf-nzkrx|2023-12-07 23:02:25,709\{UTC}
> [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer
> clientId=consumer.metric-data-points.metric-aggregator,
> groupId=metric-aggregator] Finished assignment for group at generation
> {*}12416{*}:
> {consumer.metric-data-points.metric-aggregator-8d718b78-9b7e-4803-afda-49b1c35e2654=Assignment(partitions=[topic-28,
> topic-30, topic-57, topic-77, topic-128, topic-129, topic-133, topic-134,
> topic-136, topic-27, topic-56, topic-119, topic-22, topic-65, topic-43,
> topic-64]),
> consumer.metric-data-points.metric-aggregator-ab7e7027-3ca9-4dac-bb09-71958b515ab6=Assignment(partitions=[topic-35,
> topic-45, topic-62, topic-72, topic-79, topic-83, topic-90, topic-102,
> topic-106, topic-110, topic-114, topic-118, topic-124, topic-131, topic-138,
> topic-142]),
> consumer.metric-data-points.metric-aggregator-da772740-a386-491a-aa6d-69b26550dbf9=Assignment(partitions=[topic-37,
> topic-49, topic-66, topic-73, topic-80, topic-85, topic-95, topic-103,
> topic-107, topic-111, topic-115, topic-120, topic-125, topic-132, topic-139,
> topic-143]),
> consumer.metric-data-points.metric-aggregator-779655ef-3297-44d1-8edf-2e4b6c9b4f55=Assignment(partitions=[topic-7,
> topic-11, topic-14, topic-16, topic-21, topic-29, topic-38, topic-42,
> topic-52, topic-63, topic-69, topic-87, topic-91, topic-93, topic-96,
> topic-98]),
> consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223=Assignment(partitions=[topic-26,
> topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101,
> topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137,
> topic-141]),
> consumer.metric-data-points.metric-aggregator-d06d548e-1e7f-48fb-98a0-2f6072fa3a5b=Assignment(partitions=[topic-9,
> topic-13, topic-18, topic-20, topic-25, topic-32, topic-33, topic-34,
> topic-36, topic-47, topic-48, topic-50, topic-51, topic-53, topic-59,
> topic-61]),
> consumer.metric-data-points.metric-aggregator-841a72e3-7ae4-4734-9793-7e947c4fa0a4=Assignment(partitions=[topic-19,
> topic-39, topic-54, topic-70, topic-74, topic-81, topic-86, topic-99,
> topic-104, topic-108, topic-112, topic-116, topic-122, topic-127, topic-135,
> topic-140]),
> consumer.metric-data-points.metric-aggregator-939df068-59ea-43d7-bb48-3c6baf97023a=Assignment(partitions=[topic-8,
> topic-12, topic-15, topic-17, topic-23, topic-31, topic-40, topic-46,
> topic-55, topic-67, topic-75, topic-89, topic-92, topic-94, topic-97,
> topic-100]),
> consumer.metric-data-points.metric-aggregator-933f7c51-27f5-436f-9b56-640eb55d64b2=Assignment(partitions=[topic-0,
> topic-1, topic-2, topic-3, topic-4, topic-5, topic-6, topic-10, topic-68,
> topic-41, topic-58, topic-121, topic-24, topic-84, topic-76, topic-126])}|
> |aggregator-ff95b6cf-nzkrx|2023-12-07 23:02:32,119\{UTC}
> [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer
> clientId=consumer.metric-data-points.metric-aggregator,
> groupId=metric-aggregator] Finished assignment for group at generation
> {*}12417{*}:
> {consumer.metric-data-points.metric-aggregator-8d718b78-9b7e-4803-afda-49b1c35e2654=Assignment(partitions=[topic-28,
> topic-30, topic-57, topic-77, topic-128, topic-129, topic-133, topic-134,
> topic-136, topic-27, topic-56, topic-119]),
> consumer.metric-data-points.metric-aggregator-ab7e7027-3ca9-4dac-bb09-71958b515ab6=Assignment(partitions=[topic-35,
> topic-45, topic-62, topic-72, topic-79, topic-83, topic-90, topic-102,
> topic-106, topic-110, topic-114, topic-118]),
> consumer.metric-data-points.metric-aggregator-76fd27b3-fb5b-49aa-af2a-b53150897cf9=Assignment(partitions=[topic-22,
> topic-51, topic-61, topic-76, topic-92, topic-96, topic-100, topic-124,
> topic-127, topic-132, topic-138, topic-141]),
> consumer.metric-data-points.metric-aggregator-9f6ab144-85a7-468c-9aa1-11f5445084b9=Assignment(partitions=[topic-43,
> topic-59, topic-65, topic-91, topic-94, topic-98, topic-123, topic-126,
> topic-131, topic-137, topic-140, topic-143]),
> consumer.metric-data-points.metric-aggregator-da772740-a386-491a-aa6d-69b26550dbf9=Assignment(partitions=[topic-37,
> topic-49, topic-66, topic-73, topic-80, topic-85, topic-95, topic-103,
> topic-107, topic-111, topic-115, topic-120]),
> consumer.metric-data-points.metric-aggregator-779655ef-3297-44d1-8edf-2e4b6c9b4f55=Assignment(partitions=[topic-7,
> topic-11, topic-14, topic-16, topic-21, topic-29, topic-38, topic-42,
> topic-52, topic-63, topic-69, topic-87]),
> consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223=Assignment(partitions=[topic-26,
> topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101,
> topic-105, topic-109, topic-113, topic-117]),
> consumer.metric-data-points.metric-aggregator-d06d548e-1e7f-48fb-98a0-2f6072fa3a5b=Assignment(partitions=[topic-9,
> topic-13, topic-18, topic-20, topic-25, topic-32, topic-33, topic-34,
> topic-36, topic-47, topic-48, topic-50]),
> consumer.metric-data-points.metric-aggregator-841a72e3-7ae4-4734-9793-7e947c4fa0a4=Assignment(partitions=[topic-19,
> topic-39, topic-54, topic-70, topic-74, topic-81, topic-86, topic-99,
> topic-104, topic-108, topic-112, topic-116]),
> consumer.metric-data-points.metric-aggregator-939df068-59ea-43d7-bb48-3c6baf97023a=Assignment(partitions=[topic-8,
> topic-12, topic-15, topic-17, topic-23, topic-31, topic-40, topic-46,
> topic-55, topic-67, topic-75, topic-89]),
> consumer.metric-data-points.metric-aggregator-91b1e207-0978-430e-9393-679ac44647b8=Assignment(partitions=[topic-24,
> topic-53, topic-64, topic-84, topic-93, topic-97, topic-122, topic-125,
> topic-130, topic-135, topic-139, topic-142]),
> consumer.metric-data-points.metric-aggregator-933f7c51-27f5-436f-9b56-640eb55d64b2=Assignment(partitions=[topic-0,
> topic-1, topic-2, topic-3, topic-4, topic-5, topic-6, topic-10, topic-68,
> topic-41, topic-58, topic-121])}|