Jonathan Haapala created KAFKA-15998:
----------------------------------------
Summary: EAGER rebalance onPartitionsAssigned() called with no previous
onPartitionsLost() or onPartitionsRevoked()
Key: KAFKA-15998
URL: https://issues.apache.org/jira/browse/KAFKA-15998
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 3.4.0
Reporter: Jonathan Haapala
I ran into a case where {{onPartitionsAssigned()}} was called without a
preceding call to {{onPartitionsRevoked()}}, and there is no indication that
{{onPartitionsLost()}} was called or had any reason to be called. We are using
the *EAGER* rebalance protocol and the *StickyAssignor* on Kafka 3.4.0.
Our services rely on this part of the {{onPartitionsRevoked()}} API contract:
{quote}In eager rebalancing, it will always be called at the start of a
rebalance and after the consumer stops fetching data.{quote}
We track partition states internally with a state machine and rely on these
callbacks to assert which state we expect to be in. So when a partition is
Revoked and then re-Assigned, we know that we kept ownership. Moreover, when we
are assigned partitions under EAGER rebalancing, we expect the entire assignment
to be passed to {{onPartitionsAssigned()}}: {{onPartitionsRevoked()}} is always
called at the start of a rebalance, and the EAGER protocol always revokes the
entire assignment, so by the time we reach {{onPartitionsAssigned()}} nothing
should be assigned from the consumer's point of view, and therefore the entire
assignment is newly added.
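The invariant above can be sketched as a small tracker. This is a hypothetical illustration of the reasoning, not the actual state machine described here and not a Kafka API: the class {{EagerPartitionTracker}} and its methods {{onRevoked}}, {{onAssigned}} and {{onLost}} are made-up names that mirror the three {{ConsumerRebalanceListener}} callbacks, and partitions are plain {{int}}s so the sketch runs without the Kafka client library.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical application-side states; Kafka itself has no such enum.
enum PartitionState { ASSIGNED, REVOKED }

// Hypothetical tracker mirroring the ConsumerRebalanceListener callbacks under EAGER.
// Partitions are plain ints so the sketch runs without kafka-clients.
class EagerPartitionTracker {
    private final Map<Integer, PartitionState> states = new HashMap<>();

    // Like onPartitionsRevoked(): under EAGER every owned partition is revoked.
    void onRevoked(Collection<Integer> partitions) {
        for (int p : partitions) {
            if (states.get(p) != PartitionState.ASSIGNED) {
                throw new IllegalStateException("partition " + p + " revoked but not assigned");
            }
            states.put(p, PartitionState.REVOKED);
        }
    }

    // Like onPartitionsLost(): ownership is gone, so forget the partitions.
    void onLost(Collection<Integer> partitions) {
        for (int p : partitions) {
            states.remove(p);
        }
    }

    // Like onPartitionsAssigned(): under EAGER nothing may still be ASSIGNED here,
    // so the argument is the whole new assignment. Returns how many partitions
    // were kept, i.e. Revoked and then re-Assigned.
    int onAssigned(Collection<Integer> partitions) {
        for (Map.Entry<Integer, PartitionState> e : states.entrySet()) {
            if (e.getValue() == PartitionState.ASSIGNED) {
                throw new IllegalStateException(
                        "partition " + e.getKey() + " still assigned: no revoke before this assignment");
            }
        }
        int kept = 0;
        for (int p : partitions) {
            if (states.get(p) == PartitionState.REVOKED) {
                kept++;
            }
        }
        // Revoked partitions that were not re-assigned are no longer ours.
        states.clear();
        for (int p : partitions) {
            states.put(p, PartitionState.ASSIGNED);
        }
        return kept;
    }
}
```

Under this sketch, feeding in the two assignments shown in the logs below with no {{onRevoked}} call in between makes the second {{onAssigned}} call throw, because partitions from the first assignment are still in the ASSIGNED state.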
However, we recently ran into a situation where we received an assignment while
the consumer's existing assignment was non-empty:
||Pod||Message||
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC}
[KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer
clientId=consumer.metric-data-points.metric-aggregator,
groupId=metric-aggregator] Notifying assignor about the {*}new
Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78,
topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117,
topic-123, topic-130, topic-137, topic-141])|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC}
[KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer
clientId=consumer.metric-data-points.metric-aggregator,
groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: topic-26,
topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101,
topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137,
topic-141|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:31,923\{UTC}
[kafka-coordinator-heartbeat-thread \| metric-aggregator] INFO
o.a.k.c.c.i.ConsumerCoordinator - [Consumer
clientId=consumer.metric-data-points.metric-aggregator,
groupId=metric-aggregator] *Request joining group* due to: group is already
rebalancing|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,132\{UTC}
[KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer
clientId=consumer.metric-data-points.metric-aggregator,
groupId=metric-aggregator] Successfully joined group with generation
Generation\{generationId=12417,
memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223',
protocol='sticky'}|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,134\{UTC}
[KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer
clientId=consumer.metric-data-points.metric-aggregator,
groupId=metric-aggregator] Successfully synced group in generation
Generation{generationId={*}12417{*},
memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223',
protocol='sticky'}|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC}
[KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer
clientId=consumer.metric-data-points.metric-aggregator,
groupId=metric-aggregator] Notifying assignor about the {*}new
Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78,
topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117])|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC}
[KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer
clientId=consumer.metric-data-points.metric-aggregator,
groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: |
Here you can see that we are assigned partitions:
26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117, 123, 130, 137, 141
All of them are reported as newly added when passed to
{{onPartitionsAssigned()}}. About six seconds later, the heartbeat thread
notices another rebalance and requests to rejoin the group. The join succeeds
quickly, and the sync completes almost immediately afterwards (2 ms later). We
then receive a new assignment:
26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117
This is a subset of the partitions we were assigned previously, missing 123,
130, 137, and 141. Because {{onPartitionsRevoked()}} was not called at the
beginning of this rebalance, the consumer still holds the old assignment as its
current assignment instead of an empty one, so it concludes that there are no
newly assigned partitions.
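The arithmetic behind the empty "newly assigned partitions" line can be worked through with the partition numbers from the logs. This is a simplified model, assuming (as the log message suggests) that the consumer reports as newly assigned only the partitions in the new assignment that it does not already own; the class {{NewlyAssigned}} and its methods are hypothetical names, and this is not the actual {{ConsumerCoordinator}} code.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified model (not ConsumerCoordinator code): "newly assigned" partitions are
// those in the new assignment that the consumer does not already own.
class NewlyAssigned {
    // Partition numbers for this consumer, copied from the logs.
    static final List<Integer> GEN_12416 = List.of(
            26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117, 123, 130, 137, 141);
    static final List<Integer> GEN_12417 = List.of(
            26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117);

    // Set difference: elements of 'from' that are not in 'owned', in original order.
    static Set<Integer> minus(List<Integer> from, Set<Integer> owned) {
        Set<Integer> result = new LinkedHashSet<>(from);
        result.removeAll(owned);
        return result;
    }

    // Expected EAGER path: the revoke at the start of the rebalance emptied the owned set.
    static Set<Integer> withRevoke() {
        return minus(GEN_12417, Set.of());
    }

    // Observed path: no revoke, so the generation 12416 assignment is still owned.
    static Set<Integer> withoutRevoke() {
        return minus(GEN_12417, new LinkedHashSet<>(GEN_12416));
    }

    // Partitions owned in 12416 but absent from 12417.
    static Set<Integer> dropped() {
        return minus(GEN_12416, new LinkedHashSet<>(GEN_12417));
    }
}
```

In this model, {{withRevoke()}} yields all 12 partitions, {{withoutRevoke()}} yields none (matching the empty log line), and {{dropped()}} yields 123, 130, 137 and 141.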
Using this diagram from
[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-ConsumerRebalanceProtocol:Stop-The-WorldEffect]
as a visual guide, it looks as though we sent the JoinGroup and joined
successfully, but then skipped straight to the SyncGroup and received our
assignment.
!https://cwiki.apache.org/confluence/download/attachments/103090108/Rebalance%20Today.jpg?version=1&modificationDate=1554837450000&api=v2!
Here are the assignment logs from the group leader (pod
aggregator-ff95b6cf-nzkrx) for the initial assignment and for the assignment
without a revoke. They are from consecutive generations, 12416 and 12417, so no
generation was missed in between.
||Pod||Message||
|aggregator-ff95b6cf-nzkrx|2023-12-07 23:02:25,709\{UTC}
[KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer
clientId=consumer.metric-data-points.metric-aggregator,
groupId=metric-aggregator] Finished assignment for group at generation
{*}12416{*}:
\{consumer.metric-data-points.metric-aggregator-8d718b78-9b7e-4803-afda-49b1c35e2654=Assignment(partitions=[topic-28,
topic-30, topic-57, topic-77, topic-128, topic-129, topic-133, topic-134,
topic-136, topic-27, topic-56, topic-119, topic-22, topic-65, topic-43,
topic-64]),
consumer.metric-data-points.metric-aggregator-ab7e7027-3ca9-4dac-bb09-71958b515ab6=Assignment(partitions=[topic-35,
topic-45, topic-62, topic-72, topic-79, topic-83, topic-90, topic-102,
topic-106, topic-110, topic-114, topic-118, topic-124, topic-131, topic-138,
topic-142]),
consumer.metric-data-points.metric-aggregator-da772740-a386-491a-aa6d-69b26550dbf9=Assignment(partitions=[topic-37,
topic-49, topic-66, topic-73, topic-80, topic-85, topic-95, topic-103,
topic-107, topic-111, topic-115, topic-120, topic-125, topic-132, topic-139,
topic-143]),
consumer.metric-data-points.metric-aggregator-779655ef-3297-44d1-8edf-2e4b6c9b4f55=Assignment(partitions=[topic-7,
topic-11, topic-14, topic-16, topic-21, topic-29, topic-38, topic-42,
topic-52, topic-63, topic-69, topic-87, topic-91, topic-93, topic-96,
topic-98]),
consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223=Assignment(partitions=[topic-26,
topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101,
topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137,
topic-141]),
consumer.metric-data-points.metric-aggregator-d06d548e-1e7f-48fb-98a0-2f6072fa3a5b=Assignment(partitions=[topic-9,
topic-13, topic-18, topic-20, topic-25, topic-32, topic-33, topic-34,
topic-36, topic-47, topic-48, topic-50, topic-51, topic-53, topic-59,
topic-61]),
consumer.metric-data-points.metric-aggregator-841a72e3-7ae4-4734-9793-7e947c4fa0a4=Assignment(partitions=[topic-19,
topic-39, topic-54, topic-70, topic-74, topic-81, topic-86, topic-99,
topic-104, topic-108, topic-112, topic-116, topic-122, topic-127, topic-135,
topic-140]),
consumer.metric-data-points.metric-aggregator-939df068-59ea-43d7-bb48-3c6baf97023a=Assignment(partitions=[topic-8,
topic-12, topic-15, topic-17, topic-23, topic-31, topic-40, topic-46,
topic-55, topic-67, topic-75, topic-89, topic-92, topic-94, topic-97,
topic-100]),
consumer.metric-data-points.metric-aggregator-933f7c51-27f5-436f-9b56-640eb55d64b2=Assignment(partitions=[topic-0,
topic-1, topic-2, topic-3, topic-4, topic-5, topic-6, topic-10, topic-68,
topic-41, topic-58, topic-121, topic-24, topic-84, topic-76, topic-126])}|
|aggregator-ff95b6cf-nzkrx|2023-12-07 23:02:32,119\{UTC}
[KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer
clientId=consumer.metric-data-points.metric-aggregator,
groupId=metric-aggregator] Finished assignment for group at generation
{*}12417{*}:
\{consumer.metric-data-points.metric-aggregator-8d718b78-9b7e-4803-afda-49b1c35e2654=Assignment(partitions=[topic-28,
topic-30, topic-57, topic-77, topic-128, topic-129, topic-133, topic-134,
topic-136, topic-27, topic-56, topic-119]),
consumer.metric-data-points.metric-aggregator-ab7e7027-3ca9-4dac-bb09-71958b515ab6=Assignment(partitions=[topic-35,
topic-45, topic-62, topic-72, topic-79, topic-83, topic-90, topic-102,
topic-106, topic-110, topic-114, topic-118]),
consumer.metric-data-points.metric-aggregator-76fd27b3-fb5b-49aa-af2a-b53150897cf9=Assignment(partitions=[topic-22,
topic-51, topic-61, topic-76, topic-92, topic-96, topic-100, topic-124,
topic-127, topic-132, topic-138, topic-141]),
consumer.metric-data-points.metric-aggregator-9f6ab144-85a7-468c-9aa1-11f5445084b9=Assignment(partitions=[topic-43,
topic-59, topic-65, topic-91, topic-94, topic-98, topic-123, topic-126,
topic-131, topic-137, topic-140, topic-143]),
consumer.metric-data-points.metric-aggregator-da772740-a386-491a-aa6d-69b26550dbf9=Assignment(partitions=[topic-37,
topic-49, topic-66, topic-73, topic-80, topic-85, topic-95, topic-103,
topic-107, topic-111, topic-115, topic-120]),
consumer.metric-data-points.metric-aggregator-779655ef-3297-44d1-8edf-2e4b6c9b4f55=Assignment(partitions=[topic-7,
topic-11, topic-14, topic-16, topic-21, topic-29, topic-38, topic-42,
topic-52, topic-63, topic-69, topic-87]),
consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223=Assignment(partitions=[topic-26,
topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101,
topic-105, topic-109, topic-113, topic-117]),
consumer.metric-data-points.metric-aggregator-d06d548e-1e7f-48fb-98a0-2f6072fa3a5b=Assignment(partitions=[topic-9,
topic-13, topic-18, topic-20, topic-25, topic-32, topic-33, topic-34,
topic-36, topic-47, topic-48, topic-50]),
consumer.metric-data-points.metric-aggregator-841a72e3-7ae4-4734-9793-7e947c4fa0a4=Assignment(partitions=[topic-19,
topic-39, topic-54, topic-70, topic-74, topic-81, topic-86, topic-99,
topic-104, topic-108, topic-112, topic-116]),
consumer.metric-data-points.metric-aggregator-939df068-59ea-43d7-bb48-3c6baf97023a=Assignment(partitions=[topic-8,
topic-12, topic-15, topic-17, topic-23, topic-31, topic-40, topic-46,
topic-55, topic-67, topic-75, topic-89]),
consumer.metric-data-points.metric-aggregator-91b1e207-0978-430e-9393-679ac44647b8=Assignment(partitions=[topic-24,
topic-53, topic-64, topic-84, topic-93, topic-97, topic-122, topic-125,
topic-130, topic-135, topic-139, topic-142]),
consumer.metric-data-points.metric-aggregator-933f7c51-27f5-436f-9b56-640eb55d64b2=Assignment(partitions=[topic-0,
topic-1, topic-2, topic-3, topic-4, topic-5, topic-6, topic-10, topic-68,
topic-41, topic-58, topic-121])}|
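The two generations can also be compared member by member. Generation 12416 lists 9 members with 16 partitions each, while generation 12417 lists 12 members with 12 partitions each; the member IDs starting with {{76fd27b3}}, {{9f6ab144}} and {{91b1e207}} appear only in 12417. The sketch below, with member IDs shortened to their first UUID segment and partition lists copied from the two log lines above, finds which of those new members received each partition that member {{a43be1e2}} lost. The class {{GenerationDiff}} is a hypothetical helper, not part of Kafka.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: compares member a43be1e2 across generations 12416 and 12417.
// Member IDs are shortened to their first UUID segment; partitions copied from the logs.
class GenerationDiff {
    static final List<Integer> A43BE1E2_GEN_12416 = List.of(
            26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117, 123, 130, 137, 141);
    static final List<Integer> A43BE1E2_GEN_12417 = List.of(
            26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117);

    // Members that appear in generation 12417 but not in 12416.
    static final Map<String, List<Integer>> NEW_MEMBERS_GEN_12417 = Map.of(
            "76fd27b3", List.of(22, 51, 61, 76, 92, 96, 100, 124, 127, 132, 138, 141),
            "9f6ab144", List.of(43, 59, 65, 91, 94, 98, 123, 126, 131, 137, 140, 143),
            "91b1e207", List.of(24, 53, 64, 84, 93, 97, 122, 125, 130, 135, 139, 142));

    // For each partition a43be1e2 lost, find which new member owns it in 12417.
    static Map<Integer, String> movedPartitions() {
        Map<Integer, String> moved = new TreeMap<>();
        for (int p : A43BE1E2_GEN_12416) {
            if (A43BE1E2_GEN_12417.contains(p)) {
                continue; // kept by a43be1e2
            }
            for (Map.Entry<String, List<Integer>> e : NEW_MEMBERS_GEN_12417.entrySet()) {
                if (e.getValue().contains(p)) {
                    moved.put(p, e.getKey());
                }
            }
        }
        return moved;
    }
}
```

It maps 123 and 137 to {{9f6ab144}}, 130 to {{91b1e207}} and 141 to {{76fd27b3}}, so all four partitions went to members that first appear in generation 12417.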
--
This message was sent by Atlassian Jira
(v8.20.10#820010)