This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bf15205647e KAFKA-19350 Don't propagate the error caused by
CreateTopicPolicy to FatalFaultHandler (#19857)
bf15205647e is described below
commit bf15205647e8701ceb58d03e805b8549267a2a5d
Author: S.Y. Wang <[email protected]>
AuthorDate: Wed Jun 18 01:51:50 2025 +0900
KAFKA-19350 Don't propagate the error caused by CreateTopicPolicy to
FatalFaultHandler (#19857)
`CreateTopicPolicy#validate` may throw unexpected exception other than
`PolicyViolationException`. We should handle this case as well.
Reviewers: Jhen-Yung Hsu <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../controller/ReplicationControlManager.java | 7 +++++
.../controller/ReplicationControlManagerTest.java | 30 ++++++++++++++++++++++
2 files changed, 37 insertions(+)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 22c8de053a0..9ad4bba3424 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -871,6 +871,13 @@ public class ReplicationControlManager {
createTopicPolicy.get().validate(supplier.get());
} catch (PolicyViolationException e) {
return new ApiError(Errors.POLICY_VIOLATION, e.getMessage());
+ } catch (Throwable e) {
+ // return the corresponding API error, but emit the stack
trace first if it is an unknown server error
+ ApiError apiError = ApiError.fromThrowable(e);
+ if (apiError.error() == Errors.UNKNOWN_SERVER_ERROR) {
+ log.error("Unknown server error validating Create Topic",
e);
+ }
+ return apiError;
}
}
return ApiError.NONE;
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 3a33111a318..ff18775e79c 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -143,6 +144,7 @@ import static
org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRES
import static org.apache.kafka.common.protocol.Errors.POLICY_VIOLATION;
import static
org.apache.kafka.common.protocol.Errors.PREFERRED_LEADER_NOT_AVAILABLE;
import static
org.apache.kafka.common.protocol.Errors.THROTTLING_QUOTA_EXCEEDED;
+import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
import static
org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
import static
org.apache.kafka.controller.ControllerRequestContextUtil.QUOTA_EXCEEDED_IN_TEST_MSG;
@@ -920,6 +922,34 @@ public class ReplicationControlManagerTest {
ctx.createTestTopic("quux", new int[][] {new int[] {1, 2, 0}},
POLICY_VIOLATION.code());
}
+ @Test
+ public void testCreateTopicsWithPolicyUnexpectedException() {
+ CreateTopicPolicy policy = new CreateTopicPolicy() {
+ @Override
+ public void validate(RequestMetadata requestMetadata) throws
PolicyViolationException {
+ if (requestMetadata.topic().equals("known_error")) {
+ throw new InvalidTopicException("Known client-server
errors");
+ }
+
+ throw new RuntimeException("Unknown client-server errors");
+ }
+
+ @Override
+ public void close() throws Exception { /* Nothing to do */ }
+
+ @Override
+ public void configure(Map<String, ?> configs) { /* Nothing to do
*/ }
+ };
+
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext.Builder().
+ setCreateTopicPolicy(policy).
+ build();
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+ ctx.createTestTopic("known_error", 2, (short) 2,
INVALID_TOPIC_EXCEPTION.code());
+ ctx.createTestTopic("blah_error", 2, (short) 2,
UNKNOWN_SERVER_ERROR.code());
+ }
+
@Test
public void testCreateTopicWithCollisionChars() {
ReplicationControlTestContext ctx = new
ReplicationControlTestContext.Builder().build();