Andrew Schofield created KAFKA-19204:
----------------------------------------
Summary: Timeouts in coordinator runtime operations can break
share group initialization and deletion
Key: KAFKA-19204
URL: https://issues.apache.org/jira/browse/KAFKA-19204
Project: Kafka
Issue Type: Sub-task
Reporter: Andrew Schofield
Assignee: Sushant Mahajan
There is a class of problems in the GroupCoordinator to do with compound share
group operations. When handling requests such as ShareGroupHeartbeat, the group
coordinator service calls the `CoordinatorRuntime.scheduleWriteOperation`
method, supplying an operation to call and a timeout. The operation returns a
result and a set of records to be written to the __consumer_offsets topic. The
coordinator runtime method returns a CompletableFuture which is completed when
both the operation and the write to the topic have finished.
The problem comes when the records are appended to the topic (meaning that the
operation itself succeeded) but the write, including replication of the
records, does not complete within the timeout. This seems to occur if a
follower replica broker is restarted at an inopportune moment. In this case,
the future is not completed successfully, so any chained logic which is waiting
for successful completion using a method such as CompletableFuture.thenCompose
will not execute.
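The effect on chained logic can be reproduced with plain CompletableFuture,
outside Kafka. The following is a minimal sketch, not the coordinator code:
the class TimeoutChainDemo and the method followUpRan are hypothetical names,
and the timeout is simulated by completing the future exceptionally by hand.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

class TimeoutChainDemo {
    /**
     * Simulates a write future (hypothetical stand-in for the one returned by
     * the coordinator runtime) and reports whether the chained follow-up ran.
     */
    static boolean followUpRan(boolean timedOut) {
        AtomicBoolean followUpCalled = new AtomicBoolean(false);
        CompletableFuture<String> writeFuture = new CompletableFuture<>();

        // Follow-up step chained on success, analogous to a persister call.
        CompletableFuture<String> chained = writeFuture.thenCompose(result -> {
            followUpCalled.set(true);
            return CompletableFuture.completedFuture(result);
        });

        if (timedOut) {
            // Assumption for this sketch: the timeout surfaces as an
            // exceptional completion of the future.
            writeFuture.completeExceptionally(
                new TimeoutException("write did not complete in time"));
        } else {
            writeFuture.complete("heartbeat-response");
        }

        // thenCompose propagates the failure and skips the function entirely.
        return followUpCalled.get() && !chained.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println("success path ran follow-up: " + followUpRan(false));
        System.out.println("timeout path ran follow-up: " + followUpRan(true));
    }
}
```

On the timeout path the function passed to thenCompose is never invoked, even
if the records later become fully written.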
There are four methods in GroupCoordinatorService which are problematic in this
regard:
* shareGroupHeartbeat
* uninitializeShareGroupState
* performShareGroupStateMetadataInitialize
* deleteShareGroups
Let's look at the first of these, shareGroupHeartbeat.
The write operation calls GroupCoordinatorRuntime.shareGroupHeartbeat. That
does the processing for the heartbeat and returns the records which will form
the persistent updates to remember the heartbeat's effects. This might include
a ShareGroupStatePartitionMetadataRecord if it is necessary to ask the
ShareCoordinator to initialize a new partition.
So far, so good. But the group coordinator expects to call
GroupCoordinatorService.persisterInitialize when the operation completes
successfully. If the operation itself succeeds and the records are appended to
the __consumer_offsets topic, but the replication of the records then does not
complete within the timeout, the call to persisterInitialize will not occur.
Subsequently, the writing of the records may complete, which drives the replay
logic, but that does not initiate the persisterInitialize either. So the
initializing partitions remain in limbo.
It seems that in this case, the replay of the record should be the signal that
the persisterInitialize method should be called.
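One way to picture this suggestion is below. It is only a sketch under stated
assumptions, not a proposed patch: MetadataRecord, CoordinatorState and the
callback are hypothetical stand-ins for the real record type, the coordinator
state and persisterInitialize, and the timed-out write is simulated by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

class ReplayTriggerSketch {
    /** Hypothetical stand-in for a ShareGroupStatePartitionMetadataRecord. */
    static final class MetadataRecord {
        final String groupId;
        final List<String> initializingPartitions;

        MetadataRecord(String groupId, List<String> initializingPartitions) {
            this.groupId = groupId;
            this.initializingPartitions = initializingPartitions;
        }
    }

    /** Hypothetical coordinator state whose replay method triggers the follow-up. */
    static final class CoordinatorState {
        private final Consumer<MetadataRecord> initializeCallback;

        CoordinatorState(Consumer<MetadataRecord> initializeCallback) {
            this.initializeCallback = initializeCallback;
        }

        /** Replay is the signal: it does not depend on the caller's future. */
        void replay(MetadataRecord record) {
            if (!record.initializingPartitions.isEmpty()) {
                initializeCallback.accept(record);
            }
        }
    }

    /** Simulates a timed-out write followed by a late replay of the record. */
    static List<String> simulateTimeoutThenReplay() {
        List<String> initialized = new ArrayList<>();
        CoordinatorState state =
            new CoordinatorState(r -> initialized.addAll(r.initializingPartitions));

        // The caller's future fails, so nothing chained on it runs.
        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
        writeFuture.thenRun(() -> initialized.add("from-future"));
        writeFuture.completeExceptionally(new TimeoutException("write timed out"));

        // Later the write completes and the record is replayed.
        state.replay(new MetadataRecord("share-group-1", List.of("topicA-0", "topicA-1")));
        return initialized;
    }

    public static void main(String[] args) {
        System.out.println(simulateTimeoutThenReplay());
    }
}
```

In this sketch the partitions are handed to the callback even though the
future failed, which is the behaviour the paragraph above asks for.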
The uninitializeShareGroupState and performShareGroupStateMetadataInitialize
methods have a similar problem, but I think it is less tricky because the
action to be performed after completion is more amenable to mild restructuring.
The final case, deleteShareGroups, looks like it needs more work, but that
work is potentially just a matter of deferring the persister's deletion until
the replay of the ShareGroupStatePartitionMetadataRecord containing the
deletingTopics field.
Thanks to [~cwadhwa] for digging into this as a result of his multi-broker
system tests.