This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bf15205647e KAFKA-19350 Don't propagate the error caused by 
CreateTopicPolicy to FatalFaultHandler (#19857)
bf15205647e is described below

commit bf15205647e8701ceb58d03e805b8549267a2a5d
Author: S.Y. Wang <[email protected]>
AuthorDate: Wed Jun 18 01:51:50 2025 +0900

    KAFKA-19350 Don't propagate the error caused by CreateTopicPolicy to 
FatalFaultHandler (#19857)
    
    `CreateTopicPolicy#validate` may throw unexpected exception  other than
    `PolicyViolationException`. We should handle this case as well.
    
    Reviewers: Jhen-Yung Hsu <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../controller/ReplicationControlManager.java      |  7 +++++
 .../controller/ReplicationControlManagerTest.java  | 30 ++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 22c8de053a0..9ad4bba3424 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -871,6 +871,13 @@ public class ReplicationControlManager {
                 createTopicPolicy.get().validate(supplier.get());
             } catch (PolicyViolationException e) {
                 return new ApiError(Errors.POLICY_VIOLATION, e.getMessage());
+            } catch (Throwable e) {
+                // return the corresponding API error, but emit the stack 
trace first if it is an unknown server error
+                ApiError apiError = ApiError.fromThrowable(e);
+                if (apiError.error() == Errors.UNKNOWN_SERVER_ERROR) {
+                    log.error("Unknown server error validating Create Topic", 
e);
+                }
+                return apiError;
             }
         }
         return ApiError.NONE;
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 3a33111a318..ff18775e79c 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.PolicyViolationException;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -143,6 +144,7 @@ import static 
org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRES
 import static org.apache.kafka.common.protocol.Errors.POLICY_VIOLATION;
 import static 
org.apache.kafka.common.protocol.Errors.PREFERRED_LEADER_NOT_AVAILABLE;
 import static 
org.apache.kafka.common.protocol.Errors.THROTTLING_QUOTA_EXCEEDED;
+import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR;
 import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
 import static 
org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
 import static 
org.apache.kafka.controller.ControllerRequestContextUtil.QUOTA_EXCEEDED_IN_TEST_MSG;
@@ -920,6 +922,34 @@ public class ReplicationControlManagerTest {
         ctx.createTestTopic("quux", new int[][] {new int[] {1, 2, 0}}, 
POLICY_VIOLATION.code());
     }
 
+    @Test
+    public void testCreateTopicsWithPolicyUnexpectedException() {
+        CreateTopicPolicy policy = new CreateTopicPolicy() {
+            @Override
+            public void validate(RequestMetadata requestMetadata) throws 
PolicyViolationException {
+                if (requestMetadata.topic().equals("known_error")) {
+                    throw new InvalidTopicException("Known client-server 
errors");
+                }
+
+                throw new RuntimeException("Unknown client-server errors");
+            }
+
+            @Override
+            public void close() throws Exception { /* Nothing to do */ }
+
+            @Override
+            public void configure(Map<String, ?> configs) { /* Nothing to do 
*/ }
+        };
+
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().
+                setCreateTopicPolicy(policy).
+                build();
+        ctx.registerBrokers(0, 1, 2);
+        ctx.unfenceBrokers(0, 1, 2);
+        ctx.createTestTopic("known_error", 2, (short) 2, 
INVALID_TOPIC_EXCEPTION.code());
+        ctx.createTestTopic("blah_error", 2, (short) 2, 
UNKNOWN_SERVER_ERROR.code());
+    }
+
     @Test
     public void testCreateTopicWithCollisionChars() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();

Reply via email to