junrao commented on code in PR #16842:
URL: https://github.com/apache/kafka/pull/16842#discussion_r1815761464
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1743,7 +1750,7 @@ CompletableFuture<Void> writeShareGroupState(List<PersisterStateBatch> stateBatc
.setGroupId(this.groupId)
.setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
- topicIdPartition.partition(), stateEpoch, startOffset, 0, stateBatches))))
+ topicIdPartition.partition(), stateEpoch, startOffset, leaderEpoch, stateBatches))))
Review Comment:
If we get an error like UNKNOWN_TOPIC_OR_PARTITION or FENCED_STATE_EPOCH,
should we remove the sharePartition too?
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -576,21 +605,24 @@ void processShareFetch(ShareFetchData shareFetchData) {
addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, this),
delayedShareFetchWatchKeys);
} catch (Exception e) {
- // In case exception occurs then release the locks so queue can be further processed.
- log.error("Error processing fetch queue for share partitions", e);
- if (!shareFetchData.future().isDone()) {
- shareFetchData.future().completeExceptionally(e);
- }
+ // Complete the whole fetch request with an exception if there is an error processing.
+ // The exception currently can be thrown only if there is an error while initializing
+ // the share partition. But skip the processing for other share partitions in the request
+ // as this situation is not expected.
+ log.error("Error processing share fetch request", e);
+ maybeCompleteShareFetchWithException(shareFetchData.future(), shareFetchData.partitionMaxBytes().keySet(), e);
}
}
private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) {
return partitionCacheMap.computeIfAbsent(sharePartitionKey,
k -> {
long start = time.hiResClockMs();
+ int leaderEpoch = ShareFetchUtils.leaderEpoch(replicaManager, sharePartitionKey.topicIdPartition().topicPartition());
SharePartition partition = new SharePartition(
sharePartitionKey.groupId(),
sharePartitionKey.topicIdPartition(),
+ leaderEpoch,
Review Comment:
ShareFetchUtils.leaderEpoch can throw exceptions that map to
NOT_LEADER_OR_FOLLOWER and UNKNOWN_TOPIC_OR_PARTITION. Should we handle those at
the partition level?
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -617,22 +650,35 @@ private void maybeCompleteInitializationWithException(
return;
}
+ // Remove the partition from the cache as it's failed to initialize.
Review Comment:
At the beginning of this method, we check for LeaderNotAvailableException.
When do we get that exception? ReadShareGroupStateResponse doesn't seem to have
that error.
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -617,22 +650,35 @@ private void maybeCompleteInitializationWithException(
return;
}
+ // Remove the partition from the cache as it's failed to initialize.
+ partitionCacheMap.remove(sharePartitionKey);
+ // The partition initialization failed, so complete the request with the exception.
+ // The server should not be in this state, so log the error on broker and surface the same
+ // to the client. The broker should not be in this state, investigate the root cause of the error.
+ log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
+ maybeCompleteShareFetchWithException(future, Collections.singletonList(topicIdPartition), throwable);
+ }
+
+ private void handleFencedSharePartitionException(
+ SharePartitionKey sharePartitionKey,
+ Throwable throwable
+ ) {
if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException) {
log.info("The share partition with key {} is fenced: {}", sharePartitionKey, throwable.getMessage());
// The share partition is fenced hence remove the partition from map and let the client retry.
// But surface the error to the client so client might take some action i.e. re-fetch
// the metadata and retry the fetch on new leader.
partitionCacheMap.remove(sharePartitionKey);
- future.completeExceptionally(throwable);
- return;
}
+ }
- // The partition initialization failed, so complete the request with the exception.
- // The server should not be in this state, so log the error on broker and surface the same
- // to the client. As of now this state is in-recoverable for the broker, and we should
- // investigate the root cause of the error.
- log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
- future.completeExceptionally(throwable);
+ private void maybeCompleteShareFetchWithException(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+ Collection<TopicIdPartition> topicIdPartitions, Throwable throwable) {
+ if (!future.isDone()) {
+ Errors error = Errors.forException(throwable);
+ future.complete(topicIdPartitions.stream().collect(Collectors.toMap(
+ tp -> tp, tp -> new PartitionData().setErrorCode(error.code()).setErrorMessage(error.message()))));
Review Comment:
We lose the error message in throwable when converting it to Errors.
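A minimal sketch of one way to keep the throwable's message, using stand-in
types rather than the real Kafka classes. `ErrorMessageSketch`, its nested
`PartitionData`, and `completeWithError` are hypothetical names introduced only
for illustration; the error code and default message are passed in as plain
values instead of going through Errors.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ErrorMessageSketch {

    // Hypothetical stand-in for the generated PartitionData response class.
    public static final class PartitionData {
        private short errorCode;
        private String errorMessage;

        public PartitionData setErrorCode(short errorCode) { this.errorCode = errorCode; return this; }
        public PartitionData setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; return this; }
        public short errorCode() { return errorCode; }
        public String errorMessage() { return errorMessage; }
    }

    // Completes the future with one error entry per partition. The throwable's own
    // message is preferred; the generic default is used only when it has none.
    public static <K> void completeWithError(
            CompletableFuture<Map<K, PartitionData>> future,
            Collection<K> partitions,
            short errorCode,
            String defaultMessage,
            Throwable throwable) {
        if (future.isDone()) {
            return;
        }
        String message = throwable.getMessage() != null ? throwable.getMessage() : defaultMessage;
        future.complete(partitions.stream().collect(Collectors.toMap(
            tp -> tp,
            tp -> new PartitionData().setErrorCode(errorCode).setErrorMessage(message))));
    }

    public static void main(String[] args) {
        CompletableFuture<Map<String, PartitionData>> future = new CompletableFuture<>();
        // The code value 1 and both messages are illustrative only.
        completeWithError(future, List.of("topic-0"), (short) 1, "generic error text",
            new IllegalStateException("leader moved to another broker"));
        System.out.println(future.join().get("topic-0").errorMessage());
    }
}
```

In the PR itself the same idea would amount to passing throwable.getMessage()
to setErrorMessage while still taking the code from Errors.forException.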
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -608,6 +640,7 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio
private void maybeCompleteInitializationWithException(
SharePartitionKey sharePartitionKey,
CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+ TopicIdPartition topicIdPartition,
Review Comment:
Do we need this since it's part of SharePartitionKey?
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -617,22 +650,35 @@ private void maybeCompleteInitializationWithException(
return;
}
+ // Remove the partition from the cache as it's failed to initialize.
+ partitionCacheMap.remove(sharePartitionKey);
+ // The partition initialization failed, so complete the request with the exception.
+ // The server should not be in this state, so log the error on broker and surface the same
+ // to the client. The broker should not be in this state, investigate the root cause of the error.
+ log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
+ maybeCompleteShareFetchWithException(future, Collections.singletonList(topicIdPartition), throwable);
+ }
+
+ private void handleFencedSharePartitionException(
+ SharePartitionKey sharePartitionKey,
+ Throwable throwable
+ ) {
Review Comment:
Should we include UNKNOWN_TOPIC_OR_PARTITION below too?
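If the answer is yes, the check could look roughly like the sketch below. It is
self-contained and uses stand-in exception classes that only borrow the names of
the Kafka ones; `FencedHandlingSketch`, `shouldEvict`, and `evictIfFenced` are
hypothetical names, and the cache is a plain map rather than partitionCacheMap.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FencedHandlingSketch {

    // Hypothetical stand-ins for the Kafka exception classes of the same names.
    public static class NotLeaderOrFollowerException extends RuntimeException {
        public NotLeaderOrFollowerException(String message) { super(message); }
    }

    public static class FencedStateEpochException extends RuntimeException {
        public FencedStateEpochException(String message) { super(message); }
    }

    public static class UnknownTopicOrPartitionException extends RuntimeException {
        public UnknownTopicOrPartitionException(String message) { super(message); }
    }

    // True when the cached share partition should be dropped so that a later
    // fetch rebuilds it against fresh metadata.
    public static boolean shouldEvict(Throwable throwable) {
        return throwable instanceof NotLeaderOrFollowerException
            || throwable instanceof FencedStateEpochException
            || throwable instanceof UnknownTopicOrPartitionException;
    }

    // Removes the entry for the key when the throwable is one of the evicting
    // kinds; returns whether the eviction rule matched.
    public static <K, V> boolean evictIfFenced(Map<K, V> cache, K key, Throwable throwable) {
        if (shouldEvict(throwable)) {
            cache.remove(key);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, String> cache = new ConcurrentHashMap<>();
        cache.put("group-topic-0", "share partition state");
        evictIfFenced(cache, "group-topic-0", new UnknownTopicOrPartitionException("topic deleted"));
        System.out.println(cache.containsKey("group-topic-0"));
    }
}
```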
##########
server-common/src/main/java/org/apache/kafka/server/storage/log/FetchParams.java:
##########
@@ -113,6 +127,7 @@ public String toString() {
", maxBytes=" + maxBytes +
", isolation=" + isolation +
", clientMetadata=" + clientMetadata +
+ ", shareFetchRequest=" + shareFetchRequest +
Review Comment:
Should we include the new param in `hashCode` and `equals` too?
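For illustration, a reduced sketch of what including the new field in all three
methods might look like. `FetchParamsSketch` is a hypothetical two-field class,
not the real FetchParams, and it assumes shareFetchRequest is a boolean.

```java
import java.util.Objects;

public final class FetchParamsSketch {
    private final int maxBytes;
    // Assumed to be a boolean flag, mirroring the field added to toString in the diff.
    private final boolean shareFetchRequest;

    public FetchParamsSketch(int maxBytes, boolean shareFetchRequest) {
        this.maxBytes = maxBytes;
        this.shareFetchRequest = shareFetchRequest;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        FetchParamsSketch that = (FetchParamsSketch) o;
        // The new field takes part in equality alongside the existing ones.
        return maxBytes == that.maxBytes && shareFetchRequest == that.shareFetchRequest;
    }

    @Override
    public int hashCode() {
        // Hash the same fields that equals compares, so equal objects hash alike.
        return Objects.hash(maxBytes, shareFetchRequest);
    }

    @Override
    public String toString() {
        return "FetchParamsSketch(maxBytes=" + maxBytes +
            ", shareFetchRequest=" + shareFetchRequest + ")";
    }

    public static void main(String[] args) {
        FetchParamsSketch regular = new FetchParamsSketch(1024, false);
        FetchParamsSketch share = new FetchParamsSketch(1024, true);
        System.out.println(regular.equals(share));
    }
}
```

Without the field in equals, two params objects that differ only in
shareFetchRequest would compare equal, which matters anywhere they are used as
map keys or compared in tests.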
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -127,7 +127,7 @@ public void onComplete() {
shareFetchData.future().complete(result);
} catch (Exception e) {
log.error("Error processing delayed share fetch request", e);
- shareFetchData.future().completeExceptionally(e);
+ sharePartitionManager.handleFetchException(shareFetchData.groupId(), topicPartitionData.keySet(), shareFetchData.future(), e);
Review Comment:
Typically, the error code is returned in responseLogResult. So we need to
handle the error there too.