This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 555744da70 KAFKA-14124: improve quorum controller fault handling (#12447)
555744da70 is described below

commit 555744da7040f1ad91decc3cf2b813285af60aa2
Author: Colin Patrick McCabe <cmcc...@apache.org>
AuthorDate: Thu Aug 4 22:49:45 2022 -0700

    KAFKA-14124: improve quorum controller fault handling (#12447)
    
    Before trying to commit a batch of records to the __cluster_metadata log, the active controller
    should try to apply them to its current in-memory state. If this application process fails, the
    active controller process should exit, allowing another node to take leadership. This will prevent
    most bad metadata records from ending up in the log and help to surface errors during testing.
    
    Similarly, if the active controller attempts to renounce leadership, and the renunciation process
    itself fails, the process should exit. This will help avoid bugs where the active controller
    continues in an undefined state.
    
    In contrast, standby controllers that experience metadata application errors should continue on, in
    order to avoid a scenario where a bad record brings down the whole controller cluster.  The
    intended effect of these changes is to make it harder to commit a bad record to the metadata log,
    but to continue to ride out the bad record as well as possible if such a record does get committed.
    
    This PR introduces the FaultHandler interface to implement these concepts. In JUnit tests, we use a
    FaultHandler implementation which does not exit the process. This allows us to avoid terminating
    the Gradle test runner, which would be very disruptive. It also allows us to ensure that the test
    surfaces these exceptions, which we previously were not doing (the mock fault handler stores the
    exception).
    
    In addition to the above, this PR fixes a bug where RaftClient#resign was not being called from the
    renounce() function. This bug could have resulted in the raft layer not being informed of an active
    controller resigning.
    
    Reviewers: David Arthur <mum...@gmail.com>
---
 build.gradle                                       |   2 +
 checkstyle/import-control-core.xml                 |   1 +
 checkstyle/import-control.xml                      |   4 +
 checkstyle/suppressions.xml                        |   2 +
 .../main/scala/kafka/server/ControllerServer.scala |  10 +-
 .../main/scala/kafka/server/KafkaRaftServer.scala  |   6 +-
 .../java/kafka/testkit/KafkaClusterTestKit.java    |  36 +-
 .../kafka/server/QuorumTestHarness.scala           |   6 +
 .../apache/kafka/controller/QuorumController.java  | 382 ++++++++++++---------
 .../metadata/fault/MetadataFaultException.java     |  32 ++
 .../kafka/metadata/fault/MetadataFaultHandler.java |  36 ++
 .../kafka/controller/QuorumControllerTest.java     |  25 ++
 .../kafka/controller/QuorumControllerTestEnv.java  |  15 +
 .../apache/kafka/server/fault/FaultHandler.java    |  58 ++++
 .../server/fault/ProcessExitingFaultHandler.java   |  37 ++
 .../kafka/server/fault/MockFaultHandler.java       |  65 ++++
 .../server/fault/MockFaultHandlerException.java    |  38 ++
 17 files changed, 586 insertions(+), 169 deletions(-)

diff --git a/build.gradle b/build.gradle
index 54068f2977..e1f773075a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -897,6 +897,7 @@ project(':core') {
     testImplementation project(':clients').sourceSets.test.output
     testImplementation project(':metadata').sourceSets.test.output
     testImplementation project(':raft').sourceSets.test.output
+    testImplementation project(':server-common').sourceSets.test.output
     testImplementation libs.bcpkix
     testImplementation libs.mockitoCore
     testImplementation(libs.apacheda) {
@@ -1179,6 +1180,7 @@ project(':metadata') {
     testImplementation libs.slf4jlog4j
     testImplementation project(':clients').sourceSets.test.output
     testImplementation project(':raft').sourceSets.test.output
+    testImplementation project(':server-common').sourceSets.test.output
     generator project(':generator')
   }
 
diff --git a/checkstyle/import-control-core.xml 
b/checkstyle/import-control-core.xml
index 28b325b093..4042cba402 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -54,6 +54,7 @@
     <allow pkg="org.apache.kafka.metadata" />
     <allow pkg="org.apache.kafka.metalog" />
     <allow pkg="org.apache.kafka.server.common" />
+    <allow pkg="org.apache.kafka.server.fault" />
   </subpackage>
 
   <subpackage name="tools">
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 211d23ff60..4b07a26cba 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -232,6 +232,7 @@
     <allow pkg="org.apache.kafka.raft" />
     <allow pkg="org.apache.kafka.server.authorizer" />
     <allow pkg="org.apache.kafka.server.common" />
+    <allow pkg="org.apache.kafka.server.fault" />
     <allow pkg="org.apache.kafka.server.metrics" />
     <allow pkg="org.apache.kafka.server.policy"/>
     <allow pkg="org.apache.kafka.snapshot" />
@@ -276,6 +277,9 @@
       <allow pkg="org.apache.kafka.controller" />
       <allow pkg="org.apache.kafka.metadata" />
     </subpackage>
+    <subpackage name="fault">
+      <allow pkg="org.apache.kafka.server.fault" />
+    </subpackage>
   </subpackage>
 
   <subpackage name="metalog">
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f6ca0d02fe..bec3da1637 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -39,6 +39,8 @@
     <suppress 
checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
               
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
     <suppress checks="NPathComplexity" 
files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
+    <suppress checks="MethodLength"
+              files="(KafkaClusterTestKit).java"/>
 
     <!-- Clients -->
     <suppress id="dontUseSystemExit"
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index cff88d2b6b..212c092e1a 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -21,7 +21,6 @@ import java.util
 import java.util.OptionalLong
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.{CompletableFuture, TimeUnit}
-
 import kafka.cluster.Broker.ServerInfo
 import kafka.metrics.{KafkaMetricsGroup, LinuxIoMetricsCollector}
 import kafka.network.{DataPlaneAcceptor, SocketServer}
@@ -46,6 +45,7 @@ import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
+import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
 
@@ -65,7 +65,9 @@ class ControllerServer(
   val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, 
AddressSpec]],
   val configSchema: KafkaConfigSchema,
   val raftApiVersions: ApiVersions,
-  val bootstrapMetadata: BootstrapMetadata
+  val bootstrapMetadata: BootstrapMetadata,
+  val metadataFaultHandler: FaultHandler,
+  val fatalFaultHandler: FaultHandler,
 ) extends Logging with KafkaMetricsGroup {
   import kafka.server.Server._
 
@@ -204,7 +206,9 @@ class ControllerServer(
           setAlterConfigPolicy(alterConfigPolicy.asJava).
           setConfigurationValidator(new ControllerConfigurationValidator()).
           setStaticConfig(config.originals).
-          setBootstrapMetadata(bootstrapMetadata)
+          setBootstrapMetadata(bootstrapMetadata).
+          setMetadataFaultHandler(metadataFaultHandler).
+          setFatalFaultHandler(fatalFaultHandler)
       }
       authorizer match {
         case Some(a: ClusterMetadataAuthorizer) => 
controllerBuilder.setAuthorizer(a)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala 
b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 07a3118372..e7cf8f8f1f 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -29,9 +29,11 @@ import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.utils.{AppInfoParser, Time}
 import org.apache.kafka.common.{KafkaException, Uuid}
 import org.apache.kafka.controller.BootstrapMetadata
+import org.apache.kafka.metadata.fault.MetadataFaultHandler
 import org.apache.kafka.metadata.{KafkaConfigSchema, MetadataRecordSerde}
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.fault.ProcessExitingFaultHandler
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 
 import java.nio.file.Paths
@@ -106,7 +108,9 @@ class KafkaRaftServer(
       controllerQuorumVotersFuture,
       KafkaRaftServer.configSchema,
       raftManager.apiVersions,
-      bootstrapMetadata
+      bootstrapMetadata,
+      new MetadataFaultHandler(),
+      new ProcessExitingFaultHandler(),
     ))
   } else {
     None
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java 
b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index c961d71bbe..42120324f5 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -40,6 +40,7 @@ import org.apache.kafka.metadata.MetadataRecordSerde;
 import org.apache.kafka.raft.RaftConfig;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.MockFaultHandler;
 import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,6 +116,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
     public static class Builder {
         private TestKitNodes nodes;
         private Map<String, String> configProps = new HashMap<>();
+        private MockFaultHandler metadataFaultHandler = new 
MockFaultHandler("metadataFaultHandler");
+        private MockFaultHandler fatalFaultHandler = new 
MockFaultHandler("fatalFaultHandler");
 
         public Builder(TestKitNodes nodes) {
             this.nodes = nodes;
@@ -190,7 +193,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
                         connectFutureManager.future,
                         KafkaRaftServer.configSchema(),
                         raftManager.apiVersions(),
-                        bootstrapMetadata
+                        bootstrapMetadata,
+                        metadataFaultHandler,
+                        fatalFaultHandler
                     );
                     controllers.put(node.id(), controller);
                     
controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> {
@@ -273,7 +278,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 throw e;
             }
             return new KafkaClusterTestKit(executorService, nodes, controllers,
-                brokers, raftManagers, connectFutureManager, baseDirectory);
+                brokers, raftManagers, connectFutureManager, baseDirectory,
+                metadataFaultHandler, fatalFaultHandler);
         }
 
         private String listeners(int node) {
@@ -314,14 +320,20 @@ public class KafkaClusterTestKit implements AutoCloseable 
{
     private final Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> 
raftManagers;
     private final ControllerQuorumVotersFutureManager 
controllerQuorumVotersFutureManager;
     private final File baseDirectory;
-
-    private KafkaClusterTestKit(ExecutorService executorService,
-                                TestKitNodes nodes,
-                                Map<Integer, ControllerServer> controllers,
-                                Map<Integer, BrokerServer> brokers,
-                                Map<Integer, 
KafkaRaftManager<ApiMessageAndVersion>> raftManagers,
-                                ControllerQuorumVotersFutureManager 
controllerQuorumVotersFutureManager,
-                                File baseDirectory) {
+    private final MockFaultHandler metadataFaultHandler;
+    private final MockFaultHandler fatalFaultHandler;
+
+    private KafkaClusterTestKit(
+        ExecutorService executorService,
+        TestKitNodes nodes,
+        Map<Integer, ControllerServer> controllers,
+        Map<Integer, BrokerServer> brokers,
+        Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers,
+        ControllerQuorumVotersFutureManager 
controllerQuorumVotersFutureManager,
+        File baseDirectory,
+        MockFaultHandler metadataFaultHandler,
+        MockFaultHandler fatalFaultHandler
+    ) {
         this.executorService = executorService;
         this.nodes = nodes;
         this.controllers = controllers;
@@ -329,6 +341,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
         this.raftManagers = raftManagers;
         this.controllerQuorumVotersFutureManager = 
controllerQuorumVotersFutureManager;
         this.baseDirectory = baseDirectory;
+        this.metadataFaultHandler = metadataFaultHandler;
+        this.fatalFaultHandler = fatalFaultHandler;
     }
 
     public void format() throws Exception {
@@ -520,6 +534,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
             executorService.shutdownNow();
             executorService.awaitTermination(5, TimeUnit.MINUTES);
         }
+        metadataFaultHandler.maybeRethrowFirstException();
+        fatalFaultHandler.maybeRethrowFirstException();
     }
 
     private void waitForAllFutures(List<Entry<String, Future<?>>> 
futureEntries)
diff --git 
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala 
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index a2393cdccb..9894df9c5f 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.controller.BootstrapMetadata
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.fault.MockFaultHandler
 import org.apache.zookeeper.client.ZKClientConfig
 import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
 import org.junit.jupiter.api.Assertions._
@@ -188,6 +189,8 @@ abstract class QuorumTestHarness extends Logging {
     }
   }
 
+  val faultHandler = new MockFaultHandler("quorumTestHarnessFaultHandler")
+
   // Note: according to the junit documentation: "JUnit Jupiter does not 
guarantee the execution
   // order of multiple @BeforeEach methods that are declared within a single 
test class or test
   // interface." Therefore, if you have things you would like to do before 
each test case runs, it
@@ -308,6 +311,8 @@ abstract class QuorumTestHarness extends Logging {
         configSchema = KafkaRaftServer.configSchema,
         raftApiVersions = raftManager.apiVersions,
         bootstrapMetadata = BootstrapMetadata.create(metadataVersion, 
bootstrapRecords.asJava),
+        metadataFaultHandler = faultHandler,
+        fatalFaultHandler = faultHandler,
       )
       controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e) 
=> {
         if (e != null) {
@@ -374,6 +379,7 @@ abstract class QuorumTestHarness extends Logging {
     }
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
     Configuration.setConfiguration(null)
+    faultHandler.maybeRethrowFirstException()
   }
 
   // Trigger session expiry by reusing the session id in another client
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 0290e0040c..a4cc1d92cb 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -91,6 +91,7 @@ import org.apache.kafka.server.authorizer.AclCreateResult;
 import org.apache.kafka.server.authorizer.AclDeleteResult;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.FaultHandler;
 import org.apache.kafka.server.policy.AlterConfigPolicy;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
 import org.apache.kafka.snapshot.SnapshotReader;
@@ -149,6 +150,8 @@ public final class QuorumController implements Controller {
     static public class Builder {
         private final int nodeId;
         private final String clusterId;
+        private FaultHandler fatalFaultHandler = null;
+        private FaultHandler metadataFaultHandler = null;
         private Time time = Time.SYSTEM;
         private String threadNamePrefix = null;
         private LogContext logContext = null;
@@ -175,6 +178,16 @@ public final class QuorumController implements Controller {
             this.clusterId = clusterId;
         }
 
+        public Builder setFatalFaultHandler(FaultHandler fatalFaultHandler) {
+            this.fatalFaultHandler = fatalFaultHandler;
+            return this;
+        }
+
+        public Builder setMetadataFaultHandler(FaultHandler 
metadataFaultHandler) {
+            this.metadataFaultHandler = metadataFaultHandler;
+            return this;
+        }
+
         public int nodeId() {
             return nodeId;
         }
@@ -287,6 +300,10 @@ public final class QuorumController implements Controller {
                 throw new IllegalStateException("You must specify an initial 
metadata.version using the kafka-storage tool.");
             } else if (quorumFeatures == null) {
                 throw new IllegalStateException("You must specify the quorum 
features");
+            } else if (fatalFaultHandler == null) {
+                throw new IllegalStateException("You must specify a fatal 
fault handler.");
+            } else if (metadataFaultHandler == null) {
+                throw new IllegalStateException("You must specify a metadata 
fault handler.");
             }
 
             if (threadNamePrefix == null) {
@@ -304,6 +321,8 @@ public final class QuorumController implements Controller {
             try {
                 queue = new KafkaEventQueue(time, logContext, threadNamePrefix 
+ "QuorumController");
                 return new QuorumController(
+                    fatalFaultHandler,
+                    metadataFaultHandler,
                     logContext,
                     nodeId,
                     clusterId,
@@ -426,12 +445,18 @@ public final class QuorumController implements Controller 
{
                 exception.getClass().getSimpleName(), deltaUs);
             return exception;
         }
-        log.warn("{}: failed with unknown server exception {} at epoch {} in 
{} us.  " +
-                "Renouncing leadership and reverting to the last committed 
offset {}.",
-            name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
-            lastCommittedOffset, exception);
-        raftClient.resign(curClaimEpoch);
-        renounce();
+        if (isActiveController()) {
+            log.warn("{}: failed with unknown server exception {} at epoch {} 
in {} us.  " +
+                    "Renouncing leadership and reverting to the last committed 
offset {}.",
+                    name, exception.getClass().getSimpleName(), curClaimEpoch, 
deltaUs,
+                    lastCommittedOffset, exception);
+            renounce();
+        } else {
+            log.warn("{}: failed with unknown server exception {} in {} us.  " 
+
+                    "The controller is already in standby mode.",
+                    name, exception.getClass().getSimpleName(), deltaUs,
+                    exception);
+        }
         return new UnknownServerException(exception);
     }
 
@@ -702,7 +727,7 @@ public final class QuorumController implements Controller {
             long now = time.nanoseconds();
             controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - 
eventCreatedTimeNs));
             int controllerEpoch = curClaimEpoch;
-            if (controllerEpoch == -1) {
+            if (!isActiveController()) {
                 throw newNotControllerException();
             }
             startProcessingTimeNs = OptionalLong.of(now);
@@ -728,9 +753,26 @@ public final class QuorumController implements Controller {
                         "reaches offset {}", this, resultAndOffset.offset());
                 }
             } else {
-                // If the operation returned a batch of records, those records 
need to be
-                // written before we can return our result to the user.  Here, 
we hand off
-                // the batch of records to the raft client.  They will be 
written out
+                // Start by trying to apply the record to our in-memory state. 
This should always
+                // succeed; if it does not, that's a fatal error. It is 
important to do this before
+                // scheduling the record for Raft replication.
+                int i = 1;
+                for (ApiMessageAndVersion message : result.records()) {
+                    try {
+                        replay(message.message(), Optional.empty());
+                    } catch (Throwable e) {
+                        String failureMessage = String.format("Unable to apply 
%s record, which was " +
+                            "%d of %d record(s) in the batch following last 
writeOffset %d.",
+                            message.message().getClass().getSimpleName(), i, 
result.records().size(),
+                            writeOffset);
+                        fatalFaultHandler.handleFault(failureMessage, e);
+                    }
+                    i++;
+                }
+
+                // If the operation returned a batch of records, and those 
records could be applied,
+                // they need to be written before we can return our result to 
the user.  Here, we
+                // hand off the batch of records to the raft client.  They 
will be written out
                 // asynchronously.
                 final long offset;
                 if (result.isAtomic()) {
@@ -741,9 +783,6 @@ public final class QuorumController implements Controller {
                 op.processBatchEndOffset(offset);
                 updateWriteOffset(offset);
                 resultAndOffset = ControllerResultAndOffset.of(offset, result);
-                for (ApiMessageAndVersion message : result.records()) {
-                    replay(message.message(), Optional.empty(), offset);
-                }
                 snapshotRegistry.getOrCreateSnapshot(offset);
 
                 log.debug("Read-write operation {} will be completed when the 
log " +
@@ -789,9 +828,9 @@ public final class QuorumController implements Controller {
         return event.future();
     }
 
-    private <T> CompletableFuture<T> appendWriteEvent(String name,
-                                                      OptionalLong deadlineNs,
-                                                      
ControllerWriteOperation<T> op) {
+    <T> CompletableFuture<T> appendWriteEvent(String name,
+                                              OptionalLong deadlineNs,
+                                              ControllerWriteOperation<T> op) {
         ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op);
         if (deadlineNs.isPresent()) {
             queue.appendWithDeadline(deadlineNs.getAsLong(), event);
@@ -841,11 +880,20 @@ public final class QuorumController implements Controller 
{
                                         "offset {} and epoch {}.", offset, 
epoch);
                                 }
                             }
-                            for (ApiMessageAndVersion messageAndVersion : 
messages) {
-                                replay(messageAndVersion.message(), 
Optional.empty(), offset);
+                            int i = 1;
+                            for (ApiMessageAndVersion message : messages) {
+                                try {
+                                    replay(message.message(), 
Optional.empty());
+                                } catch (Throwable e) {
+                                    String failureMessage = 
String.format("Unable to apply %s record on standby " +
+                                            "controller, which was %d of %d 
record(s) in the batch with baseOffset %d.",
+                                            
message.message().getClass().getSimpleName(), i, messages.size(),
+                                            batch.baseOffset());
+                                    
metadataFaultHandler.handleFault(failureMessage, e);
+                                }
+                                i++;
                             }
                         }
-
                         updateLastCommittedState(offset, epoch, 
batch.appendTimestamp());
                         processedRecordsSize += batch.sizeInBytes();
                     }
@@ -862,13 +910,9 @@ public final class QuorumController implements Controller {
             appendRaftEvent(String.format("handleSnapshot[snapshotId=%s]", 
reader.snapshotId()), () -> {
                 try {
                     if (isActiveController()) {
-                        throw new IllegalStateException(
-                            String.format(
-                                "Asked to load snapshot (%s) when it is the 
active controller (%d)",
-                                reader.snapshotId(),
-                                curClaimEpoch
-                            )
-                        );
+                        fatalFaultHandler.handleFault(String.format("Asked to 
load snapshot " +
+                            "(%s) when it is the active controller (%d)", 
reader.snapshotId(),
+                            curClaimEpoch));
                     }
                     log.info("Starting to replay snapshot ({}), from last 
commit offset ({}) and epoch ({})",
                         reader.snapshotId(), lastCommittedOffset, 
lastCommittedEpoch);
@@ -882,26 +926,28 @@ public final class QuorumController implements Controller 
{
 
                         if (log.isDebugEnabled()) {
                             if (log.isTraceEnabled()) {
-                                log.trace(
-                                    "Replaying snapshot ({}) batch with last 
offset of {}: {}",
-                                    reader.snapshotId(),
-                                    offset,
-                                    messages
-                                        .stream()
-                                        .map(ApiMessageAndVersion::toString)
-                                        .collect(Collectors.joining(", "))
-                                );
+                                log.trace("Replaying snapshot ({}) batch with 
last offset of {}: {}",
+                                    reader.snapshotId(), offset, 
messages.stream().map(ApiMessageAndVersion::toString).
+                                        collect(Collectors.joining(", ")));
                             } else {
-                                log.debug(
-                                    "Replaying snapshot ({}) batch with last 
offset of {}",
-                                    reader.snapshotId(),
-                                    offset
-                                );
+                                log.debug("Replaying snapshot ({}) batch with 
last offset of {}",
+                                    reader.snapshotId(), offset);
                             }
                         }
 
-                        for (ApiMessageAndVersion messageAndVersion : 
messages) {
-                            replay(messageAndVersion.message(), 
Optional.of(reader.snapshotId()), offset);
+                        int i = 1;
+                        for (ApiMessageAndVersion message : messages) {
+                            try {
+                                replay(message.message(), 
Optional.of(reader.snapshotId()));
+                            } catch (Throwable e) {
+                                String failureMessage = String.format("Unable 
to apply %s record " +
+                                        "from snapshot %s on standby 
controller, which was %d of " +
+                                        "%d record(s) in the batch with 
baseOffset %d.",
+                                        
message.message().getClass().getSimpleName(), reader.snapshotId(),
+                                        i, messages.size(), 
batch.baseOffset());
+                                
metadataFaultHandler.handleFault(failureMessage, e);
+                            }
+                            i++;
                         }
                     }
                     updateLastCommittedState(
@@ -968,10 +1014,14 @@ public final class QuorumController implements 
Controller {
                             if (exception != null) {
                                 log.error("Failed to bootstrap metadata.", 
exception);
                                 appendRaftEvent("bootstrapMetadata[" + 
curClaimEpoch + "]", () -> {
-                                    log.warn("Renouncing the leadership at 
oldEpoch {} since we could not bootstrap " +
-                                             "metadata. Reverting to last 
committed offset {}.",
-                                        curClaimEpoch, lastCommittedOffset);
-                                    renounce();
+                                    if (isActiveController()) {
+                                        log.warn("Renouncing the leadership at 
oldEpoch {} since we could not bootstrap " +
+                                                        "metadata. Reverting 
to last committed offset {}.",
+                                                curClaimEpoch, 
lastCommittedOffset);
+                                        renounce();
+                                    } else {
+                                        log.warn("Unable to bootstrap metadata 
on standby controller.");
+                                    }
                                 });
                             }
                         });
@@ -998,10 +1048,12 @@ public final class QuorumController implements 
Controller {
                 });
             } else if (isActiveController()) {
                 appendRaftEvent("handleRenounce[" + curClaimEpoch + "]", () -> 
{
-                    log.warn("Renouncing the leadership at oldEpoch {} due to 
a metadata " +
-                             "log event. Reverting to last committed offset 
{}.", curClaimEpoch,
-                        lastCommittedOffset);
-                    renounce();
+                    if (isActiveController()) {
+                        log.warn("Renouncing the leadership at oldEpoch {} due 
to a metadata " +
+                                "log event. Reverting to last committed offset 
{}.", curClaimEpoch,
+                                lastCommittedOffset);
+                        renounce();
+                    }
                 });
             }
         }
@@ -1078,26 +1130,34 @@ public final class QuorumController implements 
Controller {
     }
 
     private void renounce() {
-        curClaimEpoch = -1;
-        controllerMetrics.setActive(false);
-        purgatory.failAll(newNotControllerException());
-
-        if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
-            newBytesSinceLastSnapshot = 0;
-            snapshotRegistry.revertToSnapshot(lastCommittedOffset);
-            authorizer.ifPresent(a -> 
a.loadSnapshot(aclControlManager.idToAcl()));
-        } else {
-            resetState();
-            raftClient.unregister(metaLogListener);
-            metaLogListener = new QuorumMetaLogListener();
-            raftClient.register(metaLogListener);
+        try {
+            if (curClaimEpoch == -1) {
+                throw new RuntimeException("Cannot renounce leadership because 
we are not the " +
+                        "current leader.");
+            }
+            raftClient.resign(curClaimEpoch);
+            curClaimEpoch = -1;
+            controllerMetrics.setActive(false);
+            purgatory.failAll(newNotControllerException());
+
+            if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
+                newBytesSinceLastSnapshot = 0;
+                snapshotRegistry.revertToSnapshot(lastCommittedOffset);
+                authorizer.ifPresent(a -> 
a.loadSnapshot(aclControlManager.idToAcl()));
+            } else {
+                resetState();
+                raftClient.unregister(metaLogListener);
+                metaLogListener = new QuorumMetaLogListener();
+                raftClient.register(metaLogListener);
+            }
+            updateWriteOffset(-1);
+            clusterControl.deactivate();
+            cancelMaybeFenceReplicas();
+            cancelMaybeBalancePartitionLeaders();
+            cancelNextWriteNoOpRecord();
+        } catch (Throwable e) {
+            fatalFaultHandler.handleFault("exception while renouncing 
leadership", e);
         }
-
-        updateWriteOffset(-1);
-        clusterControl.deactivate();
-        cancelMaybeFenceReplicas();
-        cancelMaybeBalancePartitionLeaders();
-        cancelNextWriteNoOpRecord();
     }
 
     private <T> void scheduleDeferredWriteEvent(String name, long deadlineNs,
@@ -1246,70 +1306,60 @@ public final class QuorumController implements 
Controller {
     }
 
     @SuppressWarnings("unchecked")
-    private void replay(ApiMessage message, Optional<OffsetAndEpoch> 
snapshotId, long offset) {
-        try {
-            MetadataRecordType type = 
MetadataRecordType.fromId(message.apiKey());
-            switch (type) {
-                case REGISTER_BROKER_RECORD:
-                    clusterControl.replay((RegisterBrokerRecord) message);
-                    break;
-                case UNREGISTER_BROKER_RECORD:
-                    clusterControl.replay((UnregisterBrokerRecord) message);
-                    break;
-                case TOPIC_RECORD:
-                    replicationControl.replay((TopicRecord) message);
-                    break;
-                case PARTITION_RECORD:
-                    replicationControl.replay((PartitionRecord) message);
-                    break;
-                case CONFIG_RECORD:
-                    configurationControl.replay((ConfigRecord) message);
-                    break;
-                case PARTITION_CHANGE_RECORD:
-                    replicationControl.replay((PartitionChangeRecord) message);
-                    break;
-                case FENCE_BROKER_RECORD:
-                    clusterControl.replay((FenceBrokerRecord) message);
-                    break;
-                case UNFENCE_BROKER_RECORD:
-                    clusterControl.replay((UnfenceBrokerRecord) message);
-                    break;
-                case REMOVE_TOPIC_RECORD:
-                    replicationControl.replay((RemoveTopicRecord) message);
-                    break;
-                case FEATURE_LEVEL_RECORD:
-                    featureControl.replay((FeatureLevelRecord) message);
-                    handleFeatureControlChange();
-                    break;
-                case CLIENT_QUOTA_RECORD:
-                    clientQuotaControlManager.replay((ClientQuotaRecord) 
message);
-                    break;
-                case PRODUCER_IDS_RECORD:
-                    producerIdControlManager.replay((ProducerIdsRecord) 
message);
-                    break;
-                case BROKER_REGISTRATION_CHANGE_RECORD:
-                    clusterControl.replay((BrokerRegistrationChangeRecord) 
message);
-                    break;
-                case ACCESS_CONTROL_ENTRY_RECORD:
-                    aclControlManager.replay((AccessControlEntryRecord) 
message, snapshotId);
-                    break;
-                case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
-                    aclControlManager.replay((RemoveAccessControlEntryRecord) 
message, snapshotId);
-                    break;
-                case NO_OP_RECORD:
-                    // NoOpRecord is an empty record and doesn't need to be 
replayed
-                    break;
-                default:
-                    throw new RuntimeException("Unhandled record type " + 
type);
-            }
-        } catch (Exception e) {
-            if (snapshotId.isPresent()) {
-                log.error("Error replaying record {} from snapshot {} at last 
offset {}.",
-                    message.toString(), snapshotId.get(), offset, e);
-            } else {
-                log.error("Error replaying record {} at last offset {}.",
-                    message.toString(), offset, e);
-            }
+    private void replay(ApiMessage message, Optional<OffsetAndEpoch> 
snapshotId) {
+        MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+        switch (type) {
+            case REGISTER_BROKER_RECORD:
+                clusterControl.replay((RegisterBrokerRecord) message);
+                break;
+            case UNREGISTER_BROKER_RECORD:
+                clusterControl.replay((UnregisterBrokerRecord) message);
+                break;
+            case TOPIC_RECORD:
+                replicationControl.replay((TopicRecord) message);
+                break;
+            case PARTITION_RECORD:
+                replicationControl.replay((PartitionRecord) message);
+                break;
+            case CONFIG_RECORD:
+                configurationControl.replay((ConfigRecord) message);
+                break;
+            case PARTITION_CHANGE_RECORD:
+                replicationControl.replay((PartitionChangeRecord) message);
+                break;
+            case FENCE_BROKER_RECORD:
+                clusterControl.replay((FenceBrokerRecord) message);
+                break;
+            case UNFENCE_BROKER_RECORD:
+                clusterControl.replay((UnfenceBrokerRecord) message);
+                break;
+            case REMOVE_TOPIC_RECORD:
+                replicationControl.replay((RemoveTopicRecord) message);
+                break;
+            case FEATURE_LEVEL_RECORD:
+                featureControl.replay((FeatureLevelRecord) message);
+                handleFeatureControlChange();
+                break;
+            case CLIENT_QUOTA_RECORD:
+                clientQuotaControlManager.replay((ClientQuotaRecord) message);
+                break;
+            case PRODUCER_IDS_RECORD:
+                producerIdControlManager.replay((ProducerIdsRecord) message);
+                break;
+            case BROKER_REGISTRATION_CHANGE_RECORD:
+                clusterControl.replay((BrokerRegistrationChangeRecord) 
message);
+                break;
+            case ACCESS_CONTROL_ENTRY_RECORD:
+                aclControlManager.replay((AccessControlEntryRecord) message, 
snapshotId);
+                break;
+            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+                aclControlManager.replay((RemoveAccessControlEntryRecord) 
message, snapshotId);
+                break;
+            case NO_OP_RECORD:
+                // NoOpRecord is an empty record and doesn't need to be 
replayed
+                break;
+            default:
+                throw new RuntimeException("Unhandled record type " + type);
         }
     }
 
@@ -1344,8 +1394,24 @@ public final class QuorumController implements 
Controller {
         updateLastCommittedState(-1, -1, -1);
     }
 
+    /**
+     * Handles faults that should normally be fatal to the process.
+     */
+    private final FaultHandler fatalFaultHandler;
+
+    /**
+     * Handles faults in metadata handling that are normally not fatal.
+     */
+    private final FaultHandler metadataFaultHandler;
+
+    /**
+     * The slf4j log context, used to create new loggers.
+     */
     private final LogContext logContext;
 
+    /**
+     * The slf4j logger.
+     */
     private final Logger log;
 
     /**
@@ -1530,28 +1596,34 @@ public final class QuorumController implements 
Controller {
 
     private final BootstrapMetadata bootstrapMetadata;
 
-    private QuorumController(LogContext logContext,
-                             int nodeId,
-                             String clusterId,
-                             KafkaEventQueue queue,
-                             Time time,
-                             KafkaConfigSchema configSchema,
-                             RaftClient<ApiMessageAndVersion> raftClient,
-                             QuorumFeatures quorumFeatures,
-                             short defaultReplicationFactor,
-                             int defaultNumPartitions,
-                             ReplicaPlacer replicaPlacer,
-                             long snapshotMaxNewRecordBytes,
-                             OptionalLong leaderImbalanceCheckIntervalNs,
-                             OptionalLong maxIdleIntervalNs,
-                             long sessionTimeoutNs,
-                             ControllerMetrics controllerMetrics,
-                             Optional<CreateTopicPolicy> createTopicPolicy,
-                             Optional<AlterConfigPolicy> alterConfigPolicy,
-                             ConfigurationValidator configurationValidator,
-                             Optional<ClusterMetadataAuthorizer> authorizer,
-                             Map<String, Object> staticConfig,
-                             BootstrapMetadata bootstrapMetadata) {
+    private QuorumController(
+        FaultHandler fatalFaultHandler,
+        FaultHandler metadataFaultHandler,
+        LogContext logContext,
+        int nodeId,
+        String clusterId,
+        KafkaEventQueue queue,
+        Time time,
+        KafkaConfigSchema configSchema,
+        RaftClient<ApiMessageAndVersion> raftClient,
+        QuorumFeatures quorumFeatures,
+        short defaultReplicationFactor,
+        int defaultNumPartitions,
+        ReplicaPlacer replicaPlacer,
+        long snapshotMaxNewRecordBytes,
+        OptionalLong leaderImbalanceCheckIntervalNs,
+        OptionalLong maxIdleIntervalNs,
+        long sessionTimeoutNs,
+        ControllerMetrics controllerMetrics,
+        Optional<CreateTopicPolicy> createTopicPolicy,
+        Optional<AlterConfigPolicy> alterConfigPolicy,
+        ConfigurationValidator configurationValidator,
+        Optional<ClusterMetadataAuthorizer> authorizer,
+        Map<String, Object> staticConfig,
+        BootstrapMetadata bootstrapMetadata
+    ) {
+        this.fatalFaultHandler = fatalFaultHandler;
+        this.metadataFaultHandler = metadataFaultHandler;
         this.logContext = logContext;
         this.log = logContext.logger(QuorumController.class);
         this.nodeId = nodeId;
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java
new file mode 100644
index 0000000000..c57ce46fb3
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.fault;
+
+
+/**
+ * A fault that we encountered while we replayed cluster metadata.
+ */
+public class MetadataFaultException extends RuntimeException {
+    public MetadataFaultException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public MetadataFaultException(String message) {
+        super(message);
+    }
+}
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
new file mode 100644
index 0000000000..e9f71b80e6
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.fault;
+
+import org.apache.kafka.server.fault.FaultHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles faults in Kafka metadata management.
+ */
+public class MetadataFaultHandler implements FaultHandler {
+    private static final Logger log = 
LoggerFactory.getLogger(MetadataFaultHandler.class);
+
+    @Override
+    public void handleFault(String failureMessage, Throwable cause) {
+        FaultHandler.logFailureMessage(log, failureMessage, cause);
+        throw new MetadataFaultException("Encountered metadata fault: " + 
failureMessage, cause);
+    }
+}
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index a62b1f682f..5e395cebcb 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.RequestHeaderData;
+import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.config.ConfigResource;
@@ -1181,6 +1182,30 @@ public class QuorumControllerTest {
         }
     }
 
+    @Test
+    public void testFatalMetadataReplayErrorOnActive() throws Throwable {
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, 
Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv(logEnv, b -> {
+            })) {
+                QuorumController active = controlEnv.activeController();
+                CompletableFuture<Void> future = 
active.appendWriteEvent("errorEvent",
+                        OptionalLong.empty(), () -> {
+                            return 
ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion(
+                                    new ConfigRecord().
+                                            setName(null).
+                                            setResourceName(null).
+                                            setResourceType((byte) 255).
+                                            setValue(null), (short) 0)), null);
+                        });
+                assertThrows(ExecutionException.class, () -> future.get());
+                assertEquals(NullPointerException.class,
+                        
controlEnv.fatalFaultHandler().firstException().getCause().getClass());
+                controlEnv.fatalFaultHandler().setIgnore(true);
+                controlEnv.metadataFaultHandler().setIgnore(true);
+            }
+        }
+    }
+
     private static void 
assertInitialLoadFuturesNotComplete(List<StandardAuthorizer> authorizers) {
         for (int i = 0; i < authorizers.size(); i++) {
             assertFalse(authorizers.get(i).initialLoadFuture().isDone(),
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 4cc45a9774..40dd21c88d 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -34,6 +34,7 @@ import org.apache.kafka.controller.QuorumController.Builder;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.MockFaultHandler;
 import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +45,8 @@ public class QuorumControllerTestEnv implements AutoCloseable 
{
 
     private final List<QuorumController> controllers;
     private final LocalLogManagerTestEnv logEnv;
+    private final MockFaultHandler fatalFaultHandler = new 
MockFaultHandler("fatalFaultHandler");
+    private final MockFaultHandler metadataFaultHandler = new 
MockFaultHandler("metadataFaultHandler");
 
     public QuorumControllerTestEnv(
         LocalLogManagerTestEnv logEnv,
@@ -84,6 +87,8 @@ public class QuorumControllerTestEnv implements AutoCloseable 
{
                 sessionTimeoutMillis.ifPresent(timeout -> {
                     builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, 
TimeUnit.MILLISECONDS));
                 });
+                builder.setFatalFaultHandler(fatalFaultHandler);
+                builder.setMetadataFaultHandler(metadataFaultHandler);
                 builderConsumer.accept(builder);
                 this.controllers.add(builder.build());
             }
@@ -117,6 +122,14 @@ public class QuorumControllerTestEnv implements 
AutoCloseable {
         return controllers;
     }
 
+    public MockFaultHandler fatalFaultHandler() {
+        return fatalFaultHandler;
+    }
+
+    public MockFaultHandler metadataFaultHandler() {
+        return metadataFaultHandler;
+    }
+
     @Override
     public void close() throws InterruptedException {
         for (QuorumController controller : controllers) {
@@ -125,5 +138,7 @@ public class QuorumControllerTestEnv implements 
AutoCloseable {
         for (QuorumController controller : controllers) {
             controller.close();
         }
+        fatalFaultHandler.maybeRethrowFirstException();
+        metadataFaultHandler.maybeRethrowFirstException();
     }
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java 
b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
new file mode 100644
index 0000000000..4c03eacc32
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.slf4j.Logger;
+
+
+/**
+ * Handle a server fault.
+ */
+public interface FaultHandler {
+    /**
+     * Handle a fault.
+     *
+     * @param failureMessage        The failure message to log.
+     */
+    default void handleFault(String failureMessage) {
+        handleFault(failureMessage, null);
+    }
+
+    /**
+     * Handle a fault.
+     *
+     * @param failureMessage        The failure message to log.
+     * @param cause                 The exception that caused the problem, or 
null.
+     */
+    void handleFault(String failureMessage, Throwable cause);
+
+    /**
+     * Log a failure message about a fault.
+     *
+     * @param log               The slf4j logger.
+     * @param failureMessage    The failure message.
+     * @param cause             The exception which caused the failure, or 
null.
+     */
+    static void logFailureMessage(Logger log, String failureMessage, Throwable 
cause) {
+        if (cause == null) {
+            log.error("Encountered fatal fault: {}", failureMessage);
+        } else {
+            log.error("Encountered fatal fault: {}", failureMessage, cause);
+        }
+    }
+}
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
 
b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
new file mode 100644
index 0000000000..e3b9f25a3b
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kafka.common.utils.Exit;
+
+
+/**
+ * This is a fault handler which exits the Java process.
+ */
+public class ProcessExitingFaultHandler implements FaultHandler {
+    private static final Logger log = 
LoggerFactory.getLogger(ProcessExitingFaultHandler.class);
+
+    @Override
+    public void handleFault(String failureMessage, Throwable cause) {
+        FaultHandler.logFailureMessage(log, failureMessage, cause);
+        Exit.exit(1);
+    }
+}
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
 
b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
new file mode 100644
index 0000000000..39b3ed0784
--- /dev/null
+++ 
b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is a fault handler suitable for use in JUnit tests. It will store the 
result of the first
+ * call to handleFault that was made.
+ */
+public class MockFaultHandler implements FaultHandler {
+    private static final Logger log = 
LoggerFactory.getLogger(MockFaultHandler.class);
+
+    private final String name;
+    private MockFaultHandlerException firstException = null;
+    private boolean ignore = false;
+
+    public MockFaultHandler(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public synchronized void handleFault(String failureMessage, Throwable 
cause) {
+        FaultHandler.logFailureMessage(log, failureMessage, cause);
+        MockFaultHandlerException e = (cause == null) ?
+                new MockFaultHandlerException(name + ": " + failureMessage) :
+                new MockFaultHandlerException(name + ": " + failureMessage +
+                        ": " + cause.getMessage(), cause);
+        if (firstException == null) {
+            firstException = e;
+        }
+        throw e;
+    }
+
+    public synchronized void maybeRethrowFirstException() {
+        if (firstException != null && !ignore) {
+            throw firstException;
+        }
+    }
+
+    public synchronized MockFaultHandlerException firstException() {
+        return firstException;
+    }
+
+    public synchronized void setIgnore(boolean ignore) {
+        this.ignore = ignore;
+    }
+}
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java
 
b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java
new file mode 100644
index 0000000000..ef9b11bdeb
--- /dev/null
+++ 
b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+
+/**
+ * An exception thrown by MockFaultHandler.
+ */
+public class MockFaultHandlerException extends RuntimeException {
+    public MockFaultHandlerException(String failureMessage, Throwable cause) {
+        super(failureMessage, cause);
+        // If a cause exception was provided, set our stack trace to its 
stack trace. This is
+        // useful in junit tests where a limited number of stack frames are 
printed, and usually
+        // the stack frames of cause exceptions get trimmed.
+        if (cause != null) {
+            setStackTrace(cause.getStackTrace());
+        }
+    }
+
+    public MockFaultHandlerException(String failureMessage) {
+        this(failureMessage, null);
+    }
+}

Reply via email to