This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ac583ad2c02 KAFKA-19455: Retry persister request on metadata image
issues. (#20078)
ac583ad2c02 is described below
commit ac583ad2c02e47c33d5614248b28718ed09bf6e4
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed Jul 2 00:17:59 2025 +0530
KAFKA-19455: Retry persister request on metadata image issues. (#20078)
* If we get an `UNKNOWN_TOPIC_OR_PARTITION` response from the
`ShareCoordinator`, it could imply a transient issue where the metadata
image is not up to date.
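* In this case, it makes sense to retry the request to give the metadata
time to become available.
* This PR treats `UNKNOWN_TOPIC_OR_PARTITION` as a retriable error in the
`PersisterStateManager` request handlers (find coordinator, initialize,
write, read, read summary, and delete state), and makes
`ShareCoordinatorShard.onLoaded` store the newly loaded metadata image.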
Reviewers: Andrew Schofield <[email protected]>
---
.../apache/kafka/server/share/persister/PersisterStateManager.java | 6 ++++++
.../org/apache/kafka/coordinator/share/ShareCoordinatorShard.java | 1 +
2 files changed, 7 insertions(+)
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 98b08a3e50a..ccfae329c7b 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -454,6 +454,7 @@ public class PersisterStateManager {
case COORDINATOR_NOT_AVAILABLE: // retriable error codes
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
+ case UNKNOWN_TOPIC_OR_PARTITION:
log.debug("Received retriable error in find coordinator
for {} using key {}: {}", name(), partitionKey(), error.message());
if (!findCoordBackoff.canAttempt()) {
log.error("Exhausted max retries to find coordinator
for {} using key {} without success.", name(), partitionKey());
@@ -581,6 +582,7 @@ public class PersisterStateManager {
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
+ case UNKNOWN_TOPIC_OR_PARTITION:
log.debug("Received retriable error in
initialize state RPC for key {}: {}", partitionKey(), error.message());
if (!initializeStateBackoff.canAttempt()) {
log.error("Exhausted max retries for
initialize state RPC for key {} without success.", partitionKey());
@@ -739,6 +741,7 @@ public class PersisterStateManager {
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
+ case UNKNOWN_TOPIC_OR_PARTITION:
log.debug("Received retriable error in write
state RPC for key {}: {}", partitionKey(), error.message());
if (!writeStateBackoff.canAttempt()) {
log.error("Exhausted max retries for write
state RPC for key {} without success.", partitionKey());
@@ -881,6 +884,7 @@ public class PersisterStateManager {
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
+ case UNKNOWN_TOPIC_OR_PARTITION:
log.debug("Received retriable error in read
state RPC for key {}: {}", partitionKey(), error.message());
if (!readStateBackoff.canAttempt()) {
log.error("Exhausted max retries for read
state RPC for key {} without success.", partitionKey());
@@ -1023,6 +1027,7 @@ public class PersisterStateManager {
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
+ case UNKNOWN_TOPIC_OR_PARTITION:
log.debug("Received retriable error in read
state summary RPC for key {}: {}", partitionKey(), error.message());
if (!readStateSummaryBackoff.canAttempt()) {
log.error("Exhausted max retries for read
state summary RPC for key {} without success.", partitionKey());
@@ -1162,6 +1167,7 @@ public class PersisterStateManager {
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
+ case UNKNOWN_TOPIC_OR_PARTITION:
log.debug("Received retriable error in delete
state RPC for key {}: {}", partitionKey(), error.message());
if (!deleteStateBackoff.canAttempt()) {
log.error("Exhausted max retries for
delete state RPC for key {} without success.", partitionKey());
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index f5d9a3cfb30..86b2d506376 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -207,6 +207,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
@Override
public void onLoaded(MetadataImage newImage) {
+ this.metadataImage = newImage;
coordinatorMetrics.activateMetricsShard(metricsShard);
}