mumrah commented on code in PR #16842:
URL: https://github.com/apache/kafka/pull/16842#discussion_r1819761533
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -128,4 +131,13 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic
            Optional.empty(), true).timestampAndOffsetOpt();
        return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
    }
+
+    static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
Review Comment:
Returning an OptionalInt would be a bit nicer than throwing there (maybe).
If we actually want a helper method that throws, we should incorporate that
into the name (e.g., `leaderEpochOrThrow`)
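
For illustration only, a rough sketch of the two shapes being contrasted. The `ReplicaManager`/`Partition` accessors used here are assumptions about the surrounding code, not taken from this diff:

```java
import java.util.OptionalInt;

import kafka.cluster.Partition;
import kafka.server.ReplicaManager;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;

// Sketch only. How the epoch is looked up is an assumption; a fully non-throwing variant
// would need a lookup that does not itself throw when the partition is not hosted here.
static OptionalInt leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
    Partition partition = replicaManager.getPartitionOrException(tp);
    return partition.isLeader()
        ? OptionalInt.of(partition.getLeaderEpoch())
        : OptionalInt.empty();
}

// If the throwing behaviour is what callers actually want, put it in the name.
static int leaderEpochOrThrow(ReplicaManager replicaManager, TopicPartition tp) {
    return leaderEpoch(replicaManager, tp).orElseThrow(() ->
        new NotLeaderOrFollowerException("No leader epoch available for " + tp));
}
```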
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -181,6 +181,11 @@ public static RecordState forId(byte id) {
     */
    private final TopicIdPartition topicIdPartition;
+    /**
+     * The leader epoch is used to track the partition epoch.
+     */
+    private final int leaderEpoch;
Review Comment:
Making this final implies the SharePartition is now scoped to the lifetime
of a partition's leader epoch. Since SPs are managed by the node that is the
leader for that partition, I guess this is already the case (and not really a
problem). We normally expect the leader to move when the leader epoch
increases, but I'm not sure that's always the case.
Hypothetically, if the leader epoch increased but the leader did not move,
would it be possible to reuse the SharePartition state? Or would we need to
reload its state from the persister anyway?
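
As a purely hypothetical sketch of the alternative (not anything in this patch): keeping the epoch mutable and advancing it in place when the leader stays put would look roughly like the snippet below, and whether the cached state could survive that bump or must be reloaded from the persister is exactly the open question.

```java
// Hypothetical alternative, not the PR's code: a mutable epoch that is advanced in place
// when this node remains leader across an epoch bump.
private int leaderEpoch;

private synchronized void maybeAdvanceLeaderEpoch(int currentLeaderEpoch) {
    if (currentLeaderEpoch > leaderEpoch) {
        leaderEpoch = currentLeaderEpoch;
        // Open question: can the in-memory acquisition state be kept here, or does the
        // persisted state (e.g. a state epoch tied to the leader epoch) force a reload
        // from the persister anyway?
    }
}
```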
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -606,22 +641,51 @@ private void maybeCompleteInitializationWithException(
            return;
        }
+        // Remove the partition from the cache as it's failed to initialize.
+        partitionCacheMap.remove(sharePartitionKey);
+        // The partition initialization failed, so complete the request with the exception.
+        // The server should not be in this state, so log the error on broker and surface the same
+        // to the client. The broker should not be in this state, investigate the root cause of the error.
+        log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
+        maybeCompleteShareFetchExceptionally(future, Collections.singletonList(topicIdPartition), throwable);
+    }
+
+    private void handleSharePartitionException(
+        SharePartitionKey sharePartitionKey,
+        Throwable throwable
+    ) {
        if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException) {
            log.info("The share partition with key {} is fenced: {}", sharePartitionKey, throwable.getMessage());
            // The share partition is fenced hence remove the partition from map and let the client retry.
            // But surface the error to the client so client might take some action i.e. re-fetch
            // the metadata and retry the fetch on new leader.
            partitionCacheMap.remove(sharePartitionKey);
Review Comment:
Do we have a state machine written down somewhere for SharePartition?