junrao commented on code in PR #16842:
URL: https://github.com/apache/kafka/pull/16842#discussion_r1809558420
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -66,6 +66,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
Review Comment:
extra new line
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -603,22 +612,43 @@ private void maybeCompleteInitializationWithException(
return;
}
+ // Remove the partition from the cache as it's failed to initialize.
+ partitionCacheMap.computeIfPresent(sharePartitionKey, (k, v) -> null);
+ // The partition initialization failed, so complete the request with the exception.
+ // The server should not be in this state, so log the error on broker and surface the same
+ // to the client. The broker should not be in this state, investigate the root cause of the error.
+ log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
+ future.completeExceptionally(throwable);
+ }
+
+ private void handleSharePartitionException(
+ SharePartitionKey sharePartitionKey,
+ Throwable throwable
+ ) {
if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException) {
log.info("The share partition with key {} is fenced: {}", sharePartitionKey, throwable.getMessage());
// The share partition is fenced hence remove the partition from map and let the client retry.
// But surface the error to the client so client might take some action i.e. re-fetch
// the metadata and retry the fetch on new leader.
- partitionCacheMap.remove(sharePartitionKey);
- future.completeExceptionally(throwable);
- return;
+ partitionCacheMap.computeIfPresent(sharePartitionKey, (k, v) -> null);
}
+ }
- // The partition initialization failed, so complete the request with the exception.
- // The server should not be in this state, so log the error on broker and surface the same
- // to the client. As of now this state is in-recoverable for the broker, and we should
- // investigate the root cause of the error.
- log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
- future.completeExceptionally(throwable);
+ // TODO: Should the return be -1 or throw an exception?
+ private int getLeaderEpoch(TopicPartition tp) {
+ Either<Errors, Partition> partitionOrError = replicaManager.getPartitionOrError(tp);
+ if (partitionOrError.isLeft()) {
+ log.error("Failed to get partition leader for topic partition: {}-{} due to error: {}",
+ tp.topic(), tp.partition(), partitionOrError.left().get());
+ return -1;
Review Comment:
We need to return a NOT_LEADER_OR_FOLLOWER error to the client if the broker
is not the leader.
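One possible shape of that fix, sketched here only to illustrate the point (not code from this PR): `Errors.exception()` maps the error back to its `ApiException`, so throwing it surfaces `NotLeaderOrFollowerException` to the client, which can then refresh metadata and retry against the new leader.

```java
// Illustrative sketch, assuming Errors.exception() and Partition.getLeaderEpoch()
// are used as below; not the final implementation in this PR.
private int getLeaderEpoch(TopicPartition tp) {
    Either<Errors, Partition> partitionOrError = replicaManager.getPartitionOrError(tp);
    if (partitionOrError.isLeft()) {
        log.error("Failed to get partition leader for topic partition: {}-{} due to error: {}",
            tp.topic(), tp.partition(), partitionOrError.left().get());
        // Throw the matching ApiException (e.g. NotLeaderOrFollowerException for
        // NOT_LEADER_OR_FOLLOWER) instead of returning -1, so the error reaches the client.
        throw partitionOrError.left().get().exception();
    }
    return partitionOrError.right().get().getLeaderEpoch();
}
```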
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -594,7 +602,8 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio
private void maybeCompleteInitializationWithException(
SharePartitionKey sharePartitionKey,
CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
- Throwable throwable) {
+ Throwable throwable
+ ) {
Review Comment:
Why do we need to make this change? The current format seems to match other
existing code.
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -603,22 +612,43 @@ private void maybeCompleteInitializationWithException(
return;
}
+ // Remove the partition from the cache as it's failed to initialize.
+ partitionCacheMap.computeIfPresent(sharePartitionKey, (k, v) -> null);
+ // The partition initialization failed, so complete the request with the exception.
+ // The server should not be in this state, so log the error on broker and surface the same
+ // to the client. The broker should not be in this state, investigate the root cause of the error.
+ log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
+ future.completeExceptionally(throwable);
+ }
+
+ private void handleSharePartitionException(
+ SharePartitionKey sharePartitionKey,
+ Throwable throwable
+ ) {
if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException) {
log.info("The share partition with key {} is fenced: {}", sharePartitionKey, throwable.getMessage());
// The share partition is fenced hence remove the partition from map and let the client retry.
// But surface the error to the client so client might take some action i.e. re-fetch
// the metadata and retry the fetch on new leader.
- partitionCacheMap.remove(sharePartitionKey);
- future.completeExceptionally(throwable);
- return;
+ partitionCacheMap.computeIfPresent(sharePartitionKey, (k, v) -> null);
}
+ }
- // The partition initialization failed, so complete the request with the exception.
- // The server should not be in this state, so log the error on broker and surface the same
- // to the client. As of now this state is in-recoverable for the broker, and we should
- // investigate the root cause of the error.
- log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
- future.completeExceptionally(throwable);
+ // TODO: Should the return be -1 or throw an exception?
+ private int getLeaderEpoch(TopicPartition tp) {
Review Comment:
We don't use the `get` prefix for accessor names, so this can just be `leaderEpoch`.
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -603,22 +612,43 @@ private void maybeCompleteInitializationWithException(
return;
}
+ // Remove the partition from the cache as it's failed to initialize.
+ partitionCacheMap.computeIfPresent(sharePartitionKey, (k, v) -> null);
+ // The partition initialization failed, so complete the request with the exception.
+ // The server should not be in this state, so log the error on broker and surface the same
+ // to the client. The broker should not be in this state, investigate the root cause of the error.
+ log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
+ future.completeExceptionally(throwable);
+ }
+
+ private void handleSharePartitionException(
+ SharePartitionKey sharePartitionKey,
+ Throwable throwable
+ ) {
if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException) {
log.info("The share partition with key {} is fenced: {}", sharePartitionKey, throwable.getMessage());
// The share partition is fenced hence remove the partition from map and let the client retry.
// But surface the error to the client so client might take some action i.e. re-fetch
// the metadata and retry the fetch on new leader.
- partitionCacheMap.remove(sharePartitionKey);
- future.completeExceptionally(throwable);
- return;
+ partitionCacheMap.computeIfPresent(sharePartitionKey, (k, v) -> null);
Review Comment:
Is there a particular reason to use `computeIfPresent` instead of `remove`?
The latter is more intuitive.
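For what it's worth, the two are effectively equivalent here: `computeIfPresent` with a remapping function that returns `null` removes the entry if it exists, which for this unconditional eviction is exactly what `remove` expresses more directly:

```java
// Evict the failed/fenced entry from the cache; behaviour matches
// computeIfPresent(key, (k, v) -> null) for this use.
partitionCacheMap.remove(sharePartitionKey);
```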
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -260,11 +262,13 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part
this.shareGroupMetrics.shareAcknowledgement();
Map<TopicIdPartition, CompletableFuture<Errors>> futures = new HashMap<>();
acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> {
- SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition));
+ SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition);
+ SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey);
if (sharePartition != null) {
CompletableFuture<Errors> future = new CompletableFuture<>();
sharePartition.acknowledge(memberId, acknowledgePartitionBatches).whenComplete((result, throwable) -> {
if (throwable != null) {
+ handleSharePartitionException(sharePartitionKey, throwable);
Review Comment:
To be consistent, we want to add the same logic for shareFetch too.
To do this, we need to extend FetchParams such that `fetchOnlyLeader()` is
true for share fetch and handle `NotLeaderOrFollowerException` accordingly.
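A rough sketch of the second half of that suggestion, purely illustrative (the callback shape and the `fetchException` name are assumptions, not code in this PR): once `fetchOnlyLeader()` holds for share fetches, a `NotLeaderOrFollowerException` raised by the fetch path can be funneled through the same handler so the cached `SharePartition` is evicted before the error is returned.

```java
// Hypothetical error handling in the share fetch completion callback.
if (fetchException instanceof NotLeaderOrFollowerException) {
    // Same treatment as the acknowledge path: drop the cached SharePartition and
    // let the client re-fetch metadata and retry the fetch on the new leader.
    handleSharePartitionException(sharePartitionKey, fetchException);
}
future.completeExceptionally(fetchException);
```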
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]