mumrah commented on code in PR #16842:
URL: https://github.com/apache/kafka/pull/16842#discussion_r1819761533


##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -128,4 +131,13 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic
                 Optional.empty(), true).timestampAndOffsetOpt();
         return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
     }
+
+    static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {

Review Comment:
   Returning an OptionalInt would be a bit nicer than throwing there (maybe). 
If we actually want a helper method that throws, we should incorporate that 
into the name (e.g., `leaderEpochOrThrow`)
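   
   Something like the following is what I had in mind -- just a sketch, and I'm assuming the epoch can be read via `ReplicaManager#getPartitionOrException(..).getLeaderEpoch()`; adjust to whatever accessor this PR actually ends up using:

```java
import java.util.OptionalInt;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;

import kafka.server.ReplicaManager;

static OptionalInt leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
    try {
        // If this broker is not the leader/follower for the partition, or the log dir is
        // offline, report "no epoch" rather than propagating the exception to the caller.
        return OptionalInt.of(replicaManager.getPartitionOrException(tp).getLeaderEpoch());
    } catch (NotLeaderOrFollowerException | KafkaStorageException e) {
        return OptionalInt.empty();
    }
}
```

   Callers that really do want the failure can still do `leaderEpoch(replicaManager, tp).orElseThrow(...)`, which keeps the throwing behaviour explicit at the call site.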



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -181,6 +181,11 @@ public static RecordState forId(byte id) {
      */
     private final TopicIdPartition topicIdPartition;
 
+    /**
+     * The leader epoch is used to track the partition epoch.
+     */
+    private final int leaderEpoch;

Review Comment:
   Making this final implies the SharePartition is now scoped to the lifetime 
of a partition's leader epoch. Since SPs are managed by the node which is the 
leader for that partition, I guess this is already the case (and not really a 
problem). We normally expect the leader to move when the leader epoch 
increases, but I'm not sure if that's always the case.
   
   Hypothetically, if a leader epoch increased but the leader did not move, would it be possible to reuse the SharePartition state? Or would we need to re-load its state from the persister anyway?
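   
   To make the question concrete, the kind of reuse I'm wondering about would look roughly like the sketch below -- everything in it is hypothetical (the `leaderEpoch()` accessor, the cache signature, and returning null to mean "go through initialization again" are made-up illustrations, not code from this PR):

```java
import java.util.Map;

// Hypothetical sketch only: reuse the cached SharePartition while the leader epoch is
// unchanged, otherwise evict it so the normal initialization path re-loads state from
// the persister.
static SharePartition maybeReuse(
    Map<SharePartitionKey, SharePartition> partitionCacheMap,
    SharePartitionKey key,
    int currentLeaderEpoch
) {
    SharePartition cached = partitionCacheMap.get(key);
    if (cached != null && cached.leaderEpoch() == currentLeaderEpoch) {
        // Same epoch: the in-memory acquisition state should still be valid.
        return cached;
    }
    // Epoch bumped, even if the leader stayed on this broker: evict and re-initialize.
    partitionCacheMap.remove(key);
    return null;
}
```

   If re-loading from the persister is required whenever the epoch changes, then the final field is exactly right and the sketch above collapses to "always evict on a new epoch".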



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -606,22 +641,51 @@ private void maybeCompleteInitializationWithException(
             return;
         }
 
+        // Remove the partition from the cache as it's failed to initialize.
+        partitionCacheMap.remove(sharePartitionKey);
+        // The partition initialization failed, so complete the request with the exception.
+        // The server should not be in this state, so log the error on broker and surface the same
+        // to the client. The broker should not be in this state, investigate the root cause of the error.
+        log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
+        maybeCompleteShareFetchExceptionally(future, Collections.singletonList(topicIdPartition), throwable);
+    }
+
+    private void handleSharePartitionException(
+        SharePartitionKey sharePartitionKey,
+        Throwable throwable
+    ) {
         if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException) {
             log.info("The share partition with key {} is fenced: {}", sharePartitionKey, throwable.getMessage());
             // The share partition is fenced hence remove the partition from map and let the client retry.
             // But surface the error to the client so client might take some action i.e. re-fetch
             // the metadata and retry the fetch on new leader.
             partitionCacheMap.remove(sharePartitionKey);

Review Comment:
   Do we have a state machine written down somewhere for SharePartition? 


