This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c32c167e040 KAFKA-18781: Extend RefreshRetriableException related
exceptions (#19136)
c32c167e040 is described below
commit c32c167e0403bb7820e4a072aacb860c2ff06e19
Author: Kaushik Raina <[email protected]>
AuthorDate: Fri Mar 14 21:41:31 2025 +0530
KAFKA-18781: Extend RefreshRetriableException related exceptions (#19136)
- Extended derived exceptions described in
[KIP-1050](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=309496816#KIP1050:ConsistenterrorhandlingforTransactions-RefreshRetriableException)
to include the new RefreshRetriableException in base hierarchy.
- Added unit tests to validate the hierarchy of the derived exceptions
in relevant scenarios.
Reviewers: Justine Olshan <[email protected]>
---
.../errors/CoordinatorNotAvailableException.java | 2 +-
.../common/errors/InvalidMetadataException.java | 2 +-
.../common/errors/NotCoordinatorException.java | 2 +-
.../common/errors/RefreshRetriableException.java | 13 +++++++++++
.../errors/TransactionExceptionHierarchyTest.java | 27 ++++++++++++++++++++++
5 files changed, 43 insertions(+), 3 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorNotAvailableException.java
b/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorNotAvailableException.java
index 827ce54e0e5..2bd7d911a7a 100644
---
a/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorNotAvailableException.java
+++
b/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorNotAvailableException.java
@@ -23,7 +23,7 @@ package org.apache.kafka.common.errors;
* In the context of the transactional coordinator, this error will be
returned if the underlying transactional log
* is under replicated or if an append to the log times out.
*/
-public class CoordinatorNotAvailableException extends RetriableException {
+public class CoordinatorNotAvailableException extends
RefreshRetriableException {
public static final CoordinatorNotAvailableException INSTANCE = new
CoordinatorNotAvailableException();
private static final long serialVersionUID = 1L;
diff --git
a/clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java
b/clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java
index 504e8f3cc78..f3485a8364a 100644
---
a/clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java
+++
b/clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java
@@ -19,7 +19,7 @@ package org.apache.kafka.common.errors;
/**
* An exception that may indicate the client's metadata is out of date
*/
-public abstract class InvalidMetadataException extends RetriableException {
+public abstract class InvalidMetadataException extends
RefreshRetriableException {
private static final long serialVersionUID = 1L;
diff --git
a/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorException.java
b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorException.java
index 00ca32cffd1..dc116d02c05 100644
---
a/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorException.java
+++
b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorException.java
@@ -23,7 +23,7 @@ package org.apache.kafka.common.errors;
* In the context of the transactional coordinator, it returns this error when
it receives a transactional
* request with a transactionalId the coordinator doesn't own.
*/
-public class NotCoordinatorException extends RetriableException {
+public class NotCoordinatorException extends RefreshRetriableException {
private static final long serialVersionUID = 1L;
diff --git
a/clients/src/main/java/org/apache/kafka/common/errors/RefreshRetriableException.java
b/clients/src/main/java/org/apache/kafka/common/errors/RefreshRetriableException.java
index 0538935eef1..480cccf7763 100644
---
a/clients/src/main/java/org/apache/kafka/common/errors/RefreshRetriableException.java
+++
b/clients/src/main/java/org/apache/kafka/common/errors/RefreshRetriableException.java
@@ -22,7 +22,20 @@ package org.apache.kafka.common.errors;
* The request can be modified or updated with fresh metadata before being
retried.
*/
public abstract class RefreshRetriableException extends RetriableException {
+ private static final long serialVersionUID = 1L;
+
+ public RefreshRetriableException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
public RefreshRetriableException(String message) {
super(message);
}
+
+ public RefreshRetriableException(Throwable cause) {
+ super(cause);
+ }
+
+ public RefreshRetriableException() {
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/errors/TransactionExceptionHierarchyTest.java
b/clients/src/test/java/org/apache/kafka/common/errors/TransactionExceptionHierarchyTest.java
index 64cd0167845..d642351e9fa 100644
---
a/clients/src/test/java/org/apache/kafka/common/errors/TransactionExceptionHierarchyTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/errors/TransactionExceptionHierarchyTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.errors;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -48,4 +49,30 @@ public class TransactionExceptionHierarchyTest {
assertFalse(RefreshRetriableException.class.isAssignableFrom(exceptionClass),
exceptionClass.getSimpleName() + " should NOT extend
RefreshRetriableException");
}
+
+ /**
+ * Verifies that RefreshRetriableException extends RetriableException.
+ */
+ @Test
+ void testRefreshRetriableException() {
+
assertTrue(RetriableException.class.isAssignableFrom(RefreshRetriableException.class),
+ "RefreshRetriableException should extend RetriableException");
+ }
+
+ /**
+ * Verifies that the given exception class extends
`RefreshRetriableException`
+ *
+ * @param exceptionClass the exception class to check
+ */
+ @ParameterizedTest
+ @ValueSource(classes = {
+ UnknownTopicOrPartitionException.class,
+ NotLeaderOrFollowerException.class,
+ NotCoordinatorException.class,
+ CoordinatorNotAvailableException.class
+ })
+ void testRefreshRetriableExceptionHierarchy(Class<? extends Exception>
exceptionClass) {
+
assertTrue(RefreshRetriableException.class.isAssignableFrom(exceptionClass),
+ exceptionClass.getSimpleName() + " should extend
RefreshRetriableException");
+ }
}