smjn opened a new pull request, #17580: URL: https://github.com/apache/kafka/pull/17580
It is possible that, due to a network partition or delays on the wire, a read request with a higher `leaderEpoch` reaches the share coordinator before a write request with a lower one for the same share partition. In this case, we have chosen to honor the request with the highest `leaderEpoch` and reject any subsequent request with a lower value, irrespective of the RPC type. This means that we also need to persist the `leaderEpoch` seen in read requests.

Currently, the `ShareCoordinatorShard.readState` method stores the `leaderEpoch` from the request directly in the `leaderEpochMap` timeline hashmap if it is the highest `leaderEpoch` seen so far for that share partition. This is incorrect. The coordinator runtime guarantees the consistency of the timeline data structures only when they are updated via the `replay` method, which the runtime calls whenever it persists records into the topic it manages. This method is implemented in `ShareCoordinatorShard.replay`.

To remedy this, this PR adds code to the `ShareCoordinatorService.readState` method that issues a `runtime.scheduleWriteOperation` call if the incoming read state request holds a valid `leaderEpoch` value (not -1). Two constraints shape this design. First, `ShareCoordinatorService` cannot determine whether the `leaderEpoch` in the read request is the highest so far for the share partition: the timeline data structures are housed in `ShareCoordinatorShard`, and looking them up requires an offset, which the service does not have. Second, we cannot move the record-writing code into the shard's `readState` method, because the runtime, which lives in `ShareCoordinatorService`, must execute that callback.
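A minimal toy model of why updating the map directly in the read path is unsafe. It assumes, as a simplification, that a shard's state is rebuilt by replaying the records in its topic; an epoch that was only written into memory never reaches the topic and is lost. `ReplayOnlySketch`, `persistEpoch`, `readStateMutatingDirectly`, and `reloadFrom` are hypothetical names, not Kafka APIs, and a plain `HashMap` stands in for the real timeline data structures.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model: shard state must be derivable from the records in the log.
final class ReplayOnlySketch {
    final List<Map.Entry<String, Integer>> log = new ArrayList<>(); // simulated coordinator topic
    final Map<String, Integer> leaderEpochMap = new HashMap<>();    // simulated timeline map

    // Correct path: append a record first, then apply it to state via replay().
    void persistEpoch(String partition, int epoch) {
        Map.Entry<String, Integer> record = Map.entry(partition, epoch);
        log.add(record);
        replay(record);
    }

    // Buggy path (what the PR removes): mutate state directly while serving a read.
    void readStateMutatingDirectly(String partition, int epoch) {
        leaderEpochMap.merge(partition, epoch, Math::max);
    }

    // The only sanctioned way to change state: apply a persisted record.
    void replay(Map.Entry<String, Integer> record) {
        leaderEpochMap.merge(record.getKey(), record.getValue(), Math::max);
    }

    // Rebuild a fresh shard purely from the log, as a newly loaded coordinator would.
    static ReplayOnlySketch reloadFrom(List<Map.Entry<String, Integer>> log) {
        ReplayOnlySketch fresh = new ReplayOnlySketch();
        for (Map.Entry<String, Integer> record : log) {
            fresh.log.add(record);
            fresh.replay(record);
        }
        return fresh;
    }
}
```

In this model, calling `persistEpoch("p0", 3)` and then `readStateMutatingDirectly("p0", 7)` leaves epoch 7 in memory, but a shard rebuilt with `reloadFrom` sees only epoch 3, so a stale request with epoch 5 would no longer be rejected.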
Hence, the new code issues a `scheduleWriteOperation` call if `leaderEpoch` != -1. The supplied callback, `ShareCoordinatorShard.writeLeaderEpoch` (also added in this PR), generates a record if the `leaderEpoch` is the highest seen so far, no record if it equals the last seen value, and an error if the epoch is stale. A separate method is required because we want to look only at `leaderEpoch` and ignore the other data fields. It also lets us apply the optimization of not generating records when the `leaderEpoch` is -1 or equal to the highest seen so far. Subsequently, `ShareCoordinatorService.readState` makes a sequential call to `runtime.scheduleReadOperation`, as before.

**TLDR**: `ShareCoordinatorService.readState` issues a phantom write call (one not explicitly issued by a caller) if the read request contains a valid `leaderEpoch`. Based on the response to the write, a standard read call is then scheduled. This way, the `leaderEpoch` is persisted correctly in both the topic and the timeline data structures.
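The phantom-write-then-read flow can be sketched as a single-threaded toy model. This is not the actual runtime: `PhantomWriteSketch`, `EpochWriteResult`, `scheduleWrite`, and `scheduleRead` are hypothetical stand-ins for `ShareCoordinatorShard.writeLeaderEpoch`, `runtime.scheduleWriteOperation`, and `runtime.scheduleReadOperation`, and an `IllegalStateException` is assumed in place of whatever error the real callback returns for a stale epoch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Possible outcomes of the leader-epoch callback (hypothetical names).
enum EpochWriteResult { RECORD, NO_RECORD, FENCED }

final class PhantomWriteSketch {
    static final int NO_EPOCH = -1;

    final Map<String, Integer> leaderEpochMap = new HashMap<>(); // stand-in for the timeline map
    final List<String> log = new ArrayList<>();                  // stand-in for the coordinator topic

    // Mirrors the described writeLeaderEpoch rules: only the epoch is inspected.
    EpochWriteResult writeLeaderEpoch(String partition, int requestEpoch) {
        Integer highest = leaderEpochMap.get(partition);
        if (requestEpoch == NO_EPOCH) return EpochWriteResult.NO_RECORD;          // nothing to persist
        if (highest == null || requestEpoch > highest) return EpochWriteResult.RECORD;
        if (requestEpoch == highest) return EpochWriteResult.NO_RECORD;           // skip redundant record
        return EpochWriteResult.FENCED;                                            // stale epoch
    }

    // State changes only here, after the record is "persisted".
    private void replay(String partition, int epoch) {
        leaderEpochMap.merge(partition, epoch, Math::max);
    }

    // Toy write operation: run the callback, append any record, then replay it.
    CompletableFuture<Void> scheduleWrite(String partition, int requestEpoch) {
        EpochWriteResult result = writeLeaderEpoch(partition, requestEpoch);
        if (result == EpochWriteResult.FENCED) {
            return CompletableFuture.failedFuture(
                new IllegalStateException("stale leaderEpoch " + requestEpoch));
        }
        if (result == EpochWriteResult.RECORD) {
            log.add(partition + "@" + requestEpoch); // simulated durable append
            replay(partition, requestEpoch);
        }
        return CompletableFuture.completedFuture(null);
    }

    // Toy read operation: a pure lookup with no side effects.
    CompletableFuture<Integer> scheduleRead(String partition) {
        return CompletableFuture.completedFuture(leaderEpochMap.get(partition));
    }

    // Service-level readState: phantom write first (only for a valid epoch), then the usual read.
    CompletableFuture<Integer> readState(String partition, int requestEpoch) {
        CompletableFuture<Void> write = requestEpoch == NO_EPOCH
            ? CompletableFuture.completedFuture(null)
            : scheduleWrite(partition, requestEpoch);
        return write.thenCompose(ignored -> scheduleRead(partition));
    }
}
```

Chaining with `thenCompose` means the read is scheduled only after the phantom write completes successfully; if the write is fenced, the returned future fails and no read is scheduled. A repeated read with the same epoch, or one with `leaderEpoch` -1, adds no record to the log.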
