apoorvmittal10 commented on code in PR #16842:
URL: https://github.com/apache/kafka/pull/16842#discussion_r1811298975


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1497,17 +1535,29 @@ private long findLastOffsetAcknowledged() {
     }
 
     // Visible for testing
-    boolean isWriteShareGroupStateSuccessful(List<PersisterStateBatch> stateBatches) {
+    boolean isWriteShareGroupStateSuccessful(List<PersisterStateBatch> stateBatches, boolean isRetry) {
         WriteShareGroupStateResult response;
         try {
             response = persister.writeState(new WriteShareGroupStateParameters.Builder()
                 .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateBatchData>()
                     .setGroupId(this.groupId)
                     .setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
                         Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
-                            topicIdPartition.partition(), stateEpoch, startOffset, 0, stateBatches))))
+                            topicIdPartition.partition(), stateEpoch, startOffset, leaderEpoch, stateBatches))))
                     ).build()).build()).get();
         } catch (InterruptedException | ExecutionException e) {
+            if (e.getCause() != null && e.getCause() instanceof FencedLeaderEpochException) {
+                if (isRetry) {
+                    log.error("Fenced leader exception occurred for the share partition: {}-{}. Re-try failed,"
+                        + " current leader epoch: {}", groupId, topicIdPartition, leaderEpoch, e);
+                    return false;
+                }
+                log.info("Fenced leader exception occurred for the share partition: {}-{}. Re-fetch partition"
+                    + " leader epoch, current leader epoch: {}", groupId, topicIdPartition, leaderEpoch);
+                leaderEpoch = getLeaderEpoch(topicIdPartition.topicPartition());
+                // Retry the write state operation.
+                return isWriteShareGroupStateSuccessful(stateBatches, true);

Review Comment:
   I have addressed the concern.



