This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 2eb0d66750 AMQ-9747 - Handle IOExceptionHandler thrown exceptions in
KahaDB (#1474)
2eb0d66750 is described below
commit 2eb0d6675008a5e7fd6cbec4a177ab13fde3b8f2
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Wed Jul 23 17:10:31 2025 -0400
AMQ-9747 - Handle IOExceptionHandler thrown exceptions in KahaDB (#1474)
Update the KahaDB scheduled tasks to catch any runtime exceptions thrown
by the configured IOExceptionHandler. This prevents the tasks from
being killed, and therefore never running again, when the
IOExceptionHandler does not shut down the broker.
---
.../activemq/store/kahadb/MessageDatabase.java | 35 +++++++++--
.../apache/activemq/store/kahadb/KahaDBTest.java | 69 +++++++++++++++++++++-
2 files changed, 97 insertions(+), 7 deletions(-)
diff --git
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index dbab306fc7..088d9f297d 100644
---
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -448,10 +448,10 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
}
} catch (IOException ioe) {
LOG.error("Checkpoint failed", ioe);
- brokerService.handleIOException(ioe);
+ handleIOException("CheckpointRunner", ioe);
} catch (Throwable e) {
LOG.error("Checkpoint failed", e);
- brokerService.handleIOException(IOExceptionSupport.create(e));
+ handleIOException("CheckpointRunner",
IOExceptionSupport.create(e));
}
}
}
@@ -2120,10 +2120,10 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
forwarded = true;
} catch (IOException ioe) {
LOG.error("Forwarding of acks failed", ioe);
- brokerService.handleIOException(ioe);
+ handleIOException("AckCompactionRunner", ioe);
} catch (Throwable e) {
LOG.error("Forwarding of acks failed", e);
-
brokerService.handleIOException(IOExceptionSupport.create(e));
+ handleIOException("AckCompactionRunner",
IOExceptionSupport.create(e));
}
} finally {
checkpointLock.readLock().unlock();
@@ -2136,10 +2136,10 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
}
} catch (IOException ioe) {
LOG.error("Checkpoint failed", ioe);
- brokerService.handleIOException(ioe);
+ handleIOException("AckCompactionRunner", ioe);
} catch (Throwable e) {
LOG.error("Checkpoint failed", e);
- brokerService.handleIOException(IOExceptionSupport.create(e));
+ handleIOException("AckCompactionRunner",
IOExceptionSupport.create(e));
}
}
}
@@ -4274,4 +4274,27 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
}
}
+
+ /*
+ * Execute the configured IOExceptionHandler when an IOException is thrown during
+ * task execution and handle any runtime exceptions that the handler itself might throw.
+ *
+ * By default, the DefaultIOExceptionHandler will stop the broker when handling an IOException,
+ * however, if DefaultIOExceptionHandler is configured with stopStartConnectors set to true
+ * it will throw a SuppressReplyException and not stop the broker. It's also possible another
+ * custom implementation of IOExceptionHandler could throw a runtime exception.
+ *
+ * This method will now handle and log those runtime exceptions so that the task will not
+ * die and will continue to execute future iterations if the broker is not shut down.
+ */
+ private void handleIOException(String taskName, IOException ioe) {
+ try {
+ brokerService.handleIOException(ioe);
+ } catch (RuntimeException e) {
+ LOG.warn("IOException handler threw exception in task {} with "
+ + "error: {}, continuing.", taskName,
+ e.getMessage());
+ LOG.debug(e.getMessage(), e);
+ }
+ }
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
index e1fc1d7781..d06544f673 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
@@ -27,11 +27,15 @@ import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.IOExceptionHandler;
+import org.apache.activemq.util.Wait;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LogEvent;
@@ -236,6 +240,69 @@ public class KahaDBTest extends TestCase {
assertFalse("Did not replay any records from the journal",
didSomeRecovery.get());
}
+
+ /**
+ * Test the checkpoint runner task continues to run if the configured
+ * IOExceptionHandler throws a runtime exception while processing
+ * the IOException it is handling and the broker is not shut down. This
+ * could happen if using the DefaultIOExceptionHandler and stopStartConnectors
+ * is set to true, or if a user provides their own IOExceptionHandler that
+ * throws an exception.
+ */
+ public void testCheckpointExceptionKeepRunning() throws Exception {
+ testCheckpointIOException(true);
+ }
+
+ /**
+ * Test the broker shuts down when DefaultIOExceptionHandler
+ * handles an IOException thrown by the checkpoint runner task. This is the
+ * default behavior of the broker if not configured with a custom
+ * IOExceptionHandler and stopStartConnectors is false.
+ */
+ public void testCheckpointExceptionShutdown() throws Exception {
+ testCheckpointIOException(false);
+ }
+
+ private void testCheckpointIOException(boolean startStopConnectors) throws
Exception {
+ final AtomicInteger iterations = new AtomicInteger();
+ // Create a store that always throws an IOException when checkpoint is called
+ final KahaDBStore kaha = new KahaDBStore() {
+ @Override
+ protected void checkpointCleanup(boolean cleanup) throws
IOException {
+ iterations.incrementAndGet();
+ throw new IOException("fail");
+ }
+ };
+ kaha.setDirectory(new File("target/activemq-data/kahadb"));
+ kaha.deleteAllMessages();
+ // Set the checkpoint interval to be very short so we can quickly
+ // check number of iterations
+ kaha.setCheckpointInterval(100);
+
+ BrokerService broker = createBroker(kaha);
+ DefaultIOExceptionHandler ioExceptionHandler = new
DefaultIOExceptionHandler();
+ ioExceptionHandler.setStopStartConnectors(startStopConnectors);
+ broker.setIoExceptionHandler(ioExceptionHandler);
+ broker.start();
+
+ try {
+ if (startStopConnectors) {
+ // If startStopConnectors is true, the task should continue with future iterations
+ // as the SuppressReplyException that will be thrown is now handled so just verify
+ // we see 10 iterations which should happen quickly
+ assertTrue(Wait.waitFor(() -> iterations.get() == 10, 2000,
100));
+ // broker should not be stopped
+ assertFalse(broker.isStopped());
+ } else {
+ // If startStopConnectors is false, an IOException should shut down the broker
+ // which is the normal behavior
+ assertTrue(Wait.waitFor(broker::isStopped, 2000, 100));
+ }
+ } finally {
+ broker.stop();
+ }
+ }
+
private void assertExistsAndDelete(File file) {
assertTrue(file.exists());
file.delete();
@@ -281,4 +348,4 @@ public class KahaDBTest extends TestCase {
return sb.toString();
}
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact