This is an automated email from the ASF dual-hosted git repository.
Croway pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 2680601bd627 CAMEL-23494: Do not set preparingShutdown flag during suspend
2680601bd627 is described below
commit 2680601bd62749875592ecb0ffbc69ac00fc1b91
Author: Croway <[email protected]>
AuthorDate: Wed May 13 10:11:08 2026 +0200
CAMEL-23494: Do not set preparingShutdown flag during suspend
The preparingShutdown flag should only be set during a real shutdown,
not during a suspend (which is temporary and followed by resume).
Skip setting the flag when suspendOnly=true in prepareShutdown() on
RedeliveryErrorHandler and both TransactionErrorHandler implementations.
---
.../apache/camel/jta/TransactionErrorHandler.java | 3 +
.../TransactionErrorHandlerSuspendResumeTest.java | 122 +++++++++++++++++
...sactionalClientDataSourceSuspendResumeTest.java | 94 +++++++++++++
.../camel/spring/spi/TransactionErrorHandler.java | 3 +
.../errorhandler/RedeliveryErrorHandler.java | 3 +
.../RedeliveryErrorHandlerSuspendResumeTest.java | 145 +++++++++++++++++++++
6 files changed, 370 insertions(+)
diff --git
a/components/camel-jta/src/main/java/org/apache/camel/jta/TransactionErrorHandler.java
b/components/camel-jta/src/main/java/org/apache/camel/jta/TransactionErrorHandler.java
index f0c2dc19bc43..99acf2ad241d 100644
---
a/components/camel-jta/src/main/java/org/apache/camel/jta/TransactionErrorHandler.java
+++
b/components/camel-jta/src/main/java/org/apache/camel/jta/TransactionErrorHandler.java
@@ -362,6 +362,9 @@ public class TransactionErrorHandler extends
ErrorHandlerSupport
@Override
public void prepareShutdown(boolean suspendOnly, boolean forced) {
+ if (suspendOnly) {
+ return;
+ }
// prepare for shutdown, eg do not allow redelivery if configured
LOG.trace("Prepare shutdown on error handler {}", this);
preparingShutdown = true;
diff --git
a/components/camel-jta/src/test/java/org/apache/camel/jta/TransactionErrorHandlerSuspendResumeTest.java
b/components/camel-jta/src/test/java/org/apache/camel/jta/TransactionErrorHandlerSuspendResumeTest.java
new file mode 100644
index 000000000000..55830b04eeef
--- /dev/null
+++
b/components/camel-jta/src/test/java/org/apache/camel/jta/TransactionErrorHandlerSuspendResumeTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.jta;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Verifies that JTA transacted routes commit correctly after
context.suspend() + context.resume().
+ *
+ * Regression test for CAMEL-23494. Before that fix, DefaultShutdownStrategy set preparingShutdown = true on
+ * TransactionErrorHandler during suspend via getChildServices(service, includeErrorHandler=true). During resume, the
+ * error handler is excluded from the service list (includeErrorHandler=false) and its status stays STARTED, so neither
+ * doStart() nor doResume() is called to reset the flag, and subsequent transacted exchanges were silently rolled back.
+ * prepareShutdown() now returns early when suspendOnly=true, so the flag is only set during a real shutdown.
+ */
+public class TransactionErrorHandlerSuspendResumeTest {
+
+ private CamelContext camelContext;
+ private final AtomicInteger commitCount = new AtomicInteger();
+ private final AtomicBoolean lastExchangeRolledBack = new
AtomicBoolean(false);
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ camelContext = new DefaultCamelContext();
+
+ JtaTransactionPolicy testPolicy = new JtaTransactionPolicy() {
+ @Override
+ public void run(Runnable runnable) throws Throwable {
+ try {
+ runnable.run();
+ commitCount.incrementAndGet();
+ lastExchangeRolledBack.set(false);
+ } catch (Throwable t) {
+ lastExchangeRolledBack.set(true);
+ throw t;
+ }
+ }
+ };
+
+ camelContext.getRegistry().bind("PROPAGATION_REQUIRED", testPolicy);
+
+ camelContext.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:transacted").routeId("transactedRoute")
+ .transacted()
+ .setBody(constant("done"));
+ }
+ });
+
+ camelContext.start();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (camelContext != null) {
+ camelContext.stop();
+ }
+ }
+
+ @Test
+ public void testTransactionCommitsAfterContextSuspendResume() throws
Exception {
+ // first send — must commit
+ camelContext.createProducerTemplate().sendBody("direct:transacted",
"first");
+ assertEquals(1, commitCount.get(), "First transaction must commit");
+ assertFalse(lastExchangeRolledBack.get(), "First exchange must not be
rolled back");
+
+ // suspend and resume the entire context
+ camelContext.suspend();
+ camelContext.resume();
+
+ // second send — must also commit if preparingShutdown was properly
reset
+ camelContext.createProducerTemplate().sendBody("direct:transacted",
"second");
+ assertEquals(2, commitCount.get(),
+ "After context suspend/resume: second transaction must commit,
"
+ + "but preparingShutdown is still
true causing silent rollback");
+ assertFalse(lastExchangeRolledBack.get(),
+ "After context suspend/resume: exchange must not be rolled
back");
+ }
+
+ @Test
+ public void testTransactionCommitsAfterRouteSuspendResume() throws
Exception {
+ // first send — must commit
+ camelContext.createProducerTemplate().sendBody("direct:transacted",
"first");
+ assertEquals(1, commitCount.get(), "First transaction must commit");
+
+ // suspend and resume a single route
+ camelContext.getRouteController().suspendRoute("transactedRoute");
+ camelContext.getRouteController().resumeRoute("transactedRoute");
+
+ // second send — must also commit
+ camelContext.createProducerTemplate().sendBody("direct:transacted",
"second");
+ assertEquals(2, commitCount.get(),
+ "After route suspend/resume: second transaction must commit");
+ assertFalse(lastExchangeRolledBack.get(),
+ "After route suspend/resume: exchange must not be rolled
back");
+ }
+}
diff --git
a/components/camel-spring-parent/camel-spring-xml/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceSuspendResumeTest.java
b/components/camel-spring-parent/camel-spring-xml/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceSuspendResumeTest.java
new file mode 100644
index 000000000000..12d08e8e70f4
--- /dev/null
+++
b/components/camel-spring-parent/camel-spring-xml/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceSuspendResumeTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.interceptor;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spring.SpringRouteBuilder;
+import org.apache.camel.spring.spi.SpringTransactionPolicy;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Verifies that transacted routes commit correctly after context.suspend() +
context.resume().
+ *
+ * Regression test for CAMEL-23494. Before that fix, DefaultShutdownStrategy set preparingShutdown = true on
+ * TransactionErrorHandler during suspend via getChildServices(service, includeErrorHandler=true). During resume, the
+ * error handler is excluded from the service list (includeErrorHandler=false) and its status stays STARTED, so neither
+ * doStart() nor doResume() is called to reset the flag, and subsequent transacted exchanges were silently rolled back.
+ * prepareShutdown() now returns early when suspendOnly=true, so the flag is only set during a real shutdown.
+ */
+public class TransactionalClientDataSourceSuspendResumeTest extends
TransactionClientDataSourceSupport {
+
+ @Test
+ public void testTransactionCommitsAfterContextSuspendResume() throws
Exception {
+ // baseline: one book inserted via init.sql
+ int count = jdbc.queryForObject("select count(*) from books",
Integer.class);
+ assertEquals(1, count, "Initial number of books");
+
+ // first send — must commit
+ template.sendBody("direct:suspend-resume", "Tiger in Action");
+ count = jdbc.queryForObject("select count(*) from books",
Integer.class);
+ assertEquals(2, count, "After first send");
+
+ // suspend and resume the entire context
+ context.suspend();
+ context.resume();
+
+ // second send — must also commit if preparingShutdown was properly
reset
+ template.sendBody("direct:suspend-resume", "Elephant in Action");
+ count = jdbc.queryForObject("select count(*) from books",
Integer.class);
+ assertEquals(3, count,
+ "After context suspend/resume: transaction should commit but
preparingShutdown"
+ + " is still true, causing silent rollback");
+ }
+
+ @Test
+ public void testTransactionCommitsAfterRouteSuspendResume() throws
Exception {
+ // baseline: one book inserted via init.sql
+ int count = jdbc.queryForObject("select count(*) from books",
Integer.class);
+ assertEquals(1, count, "Initial number of books");
+
+ // first send — must commit
+ template.sendBody("direct:suspend-resume", "Tiger in Action");
+ count = jdbc.queryForObject("select count(*) from books",
Integer.class);
+ assertEquals(2, count, "After first send");
+
+ // suspend and resume a single route
+ context.getRouteController().suspendRoute("suspendResumeRoute");
+ context.getRouteController().resumeRoute("suspendResumeRoute");
+
+ // second send — must also commit
+ template.sendBody("direct:suspend-resume", "Elephant in Action");
+ count = jdbc.queryForObject("select count(*) from books",
Integer.class);
+ assertEquals(3, count, "After route suspend/resume: transaction must
commit");
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new SpringRouteBuilder() {
+ public void configure() throws Exception {
+ SpringTransactionPolicy required =
lookup("PROPAGATION_REQUIRED", SpringTransactionPolicy.class);
+ errorHandler(transactionErrorHandler(required));
+
+ from("direct:suspend-resume").routeId("suspendResumeRoute")
+ .policy(required)
+ .bean("bookService");
+ }
+ };
+ }
+}
diff --git
a/components/camel-spring-parent/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
b/components/camel-spring-parent/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
index 8afd21871dbe..87986c098365 100644
---
a/components/camel-spring-parent/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
+++
b/components/camel-spring-parent/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
@@ -340,6 +340,9 @@ public class TransactionErrorHandler extends
RedeliveryErrorHandler {
@Override
public void prepareShutdown(boolean suspendOnly, boolean forced) {
+ if (suspendOnly) {
+ return;
+ }
super.prepareShutdown(suspendOnly, forced);
if (forced) {
// mark all in-flight transacted exchanges for rollback so the
transaction
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 4d97e6d78be1..a4b1fba908b1 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -242,6 +242,9 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
@Override
public void prepareShutdown(boolean suspendOnly, boolean forced) {
+ if (suspendOnly) {
+ return;
+ }
// prepare for shutdown, eg do not allow redelivery if configured
LOG.trace("Prepare shutdown on error handler: {}", this);
preparingShutdown = true;
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerSuspendResumeTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerSuspendResumeTest.java
new file mode 100644
index 000000000000..87f66f193b02
--- /dev/null
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerSuspendResumeTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Verifies that redelivery continues to work after context.suspend() +
context.resume() when
+ * allowRedeliveryWhileStopping is set to false.
+ *
+ * Regression test for CAMEL-23494. Before that fix, DefaultShutdownStrategy set preparingShutdown = true on
+ * RedeliveryErrorHandler during suspend via getChildServices(service, includeErrorHandler=true). During resume, the
+ * error handler is excluded from the service list (includeErrorHandler=false) and its status stays STARTED, so neither
+ * doStart() nor doResume() is called to reset the flag, and exchanges needing redelivery were rejected as if a shutdown
+ * were in progress. prepareShutdown() now returns early when suspendOnly=true, so a suspend never sets the flag.
+ *
+ * The default RedeliveryPolicy has allowRedeliveryWhileStopping=true, which
bypasses the preparingShutdown check. This
+ * test uses allowRedeliveryWhileStopping(false) to expose the bug — a
configuration that is recommended in production
+ * to avoid slow shutdowns caused by long redelivery cycles.
+ */
+public class RedeliveryErrorHandlerSuspendResumeTest extends
ContextTestSupport {
+
+ private final AtomicInteger invocationCount = new AtomicInteger();
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ @Test
+ public void testRedeliveryWorksAfterContextSuspendResume() throws
Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ errorHandler(defaultErrorHandler()
+ .maximumRedeliveries(3)
+ .redeliveryDelay(0)
+ .allowRedeliveryWhileStopping(false));
+
+ from("direct:start").routeId("redeliveryRoute")
+ .process(new FailOnceThenSucceed())
+ .to("mock:result");
+ }
+ });
+ context.start();
+
+ // first send — processor fails once, then succeeds on redelivery
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+ template.sendBody("direct:start", "first");
+ assertMockEndpointsSatisfied();
+
+ resetMocks();
+ invocationCount.set(0);
+
+ // suspend and resume the entire context
+ context.suspend();
+ context.resume();
+
+ // second send — redelivery must still work after resume
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+ try {
+ template.sendBody("direct:start", "second");
+ } catch (CamelExecutionException e) {
+ fail("Redelivery should have retried the transient failure after
context suspend/resume, "
+ + "but preparingShutdown is stuck true so redelivery was
rejected: " + e.getMessage());
+ }
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testRedeliveryWorksAfterRouteSuspendResume() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ errorHandler(defaultErrorHandler()
+ .maximumRedeliveries(3)
+ .redeliveryDelay(0)
+ .allowRedeliveryWhileStopping(false));
+
+ from("direct:start").routeId("redeliveryRoute")
+ .process(new FailOnceThenSucceed())
+ .to("mock:result");
+ }
+ });
+ context.start();
+
+ // first send — processor fails once, then succeeds on redelivery
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+ template.sendBody("direct:start", "first");
+ assertMockEndpointsSatisfied();
+
+ resetMocks();
+ invocationCount.set(0);
+
+ // suspend and resume a single route
+ context.getRouteController().suspendRoute("redeliveryRoute");
+ context.getRouteController().resumeRoute("redeliveryRoute");
+
+ // second send — redelivery must still work after resume
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+ try {
+ template.sendBody("direct:start", "second");
+ } catch (CamelExecutionException e) {
+ fail("Redelivery should have retried the transient failure after
route suspend/resume, "
+ + "but preparingShutdown is stuck true so redelivery was
rejected: " + e.getMessage());
+ }
+ assertMockEndpointsSatisfied();
+ }
+
+ /**
+ * Processor that throws on the first invocation and succeeds on the
second. Redelivery should make the exchange
+ * succeed.
+ */
+ private class FailOnceThenSucceed implements Processor {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ if (invocationCount.incrementAndGet() == 1) {
+ throw new IllegalArgumentException("Transient failure — should
succeed on retry");
+ }
+ }
+ }
+}