apoorvmittal10 commented on code in PR #16842:
URL: https://github.com/apache/kafka/pull/16842#discussion_r1811298975
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1497,17 +1535,29 @@ private long findLastOffsetAcknowledged() {
}
// Visible for testing
- boolean isWriteShareGroupStateSuccessful(List<PersisterStateBatch>
stateBatches) {
+ boolean isWriteShareGroupStateSuccessful(List<PersisterStateBatch>
stateBatches, boolean isRetry) {
WriteShareGroupStateResult response;
try {
response = persister.writeState(new
WriteShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
.setGroupId(this.groupId)
.setTopicsData(Collections.singletonList(new
TopicData<>(topicIdPartition.topicId(),
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
- topicIdPartition.partition(), stateEpoch,
startOffset, 0, stateBatches))))
+ topicIdPartition.partition(), stateEpoch,
startOffset, leaderEpoch, stateBatches))))
).build()).build()).get();
} catch (InterruptedException | ExecutionException e) {
+ if (e.getCause() != null && e.getCause() instanceof
FencedLeaderEpochException) {
+ if (isRetry) {
+ log.error("Fenced leader exception occurred for the share
partition: {}-{}. Re-try failed,"
+ + " current leader epoch: {}", groupId,
topicIdPartition, leaderEpoch, e);
+ return false;
+ }
+ log.info("Fenced leader exception occurred for the share
partition: {}-{}. Re-fetch partition"
+ + " leader epoch, current leader epoch: {}", groupId,
topicIdPartition, leaderEpoch);
+ leaderEpoch =
getLeaderEpoch(topicIdPartition.topicPartition());
+ // Retry the write state operation.
+ return isWriteShareGroupStateSuccessful(stateBatches, true);
Review Comment:
I have addressed the concern.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]