This is an automated email from the ASF dual-hosted git repository.
martinweiler pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git
The following commit(s) were added to refs/heads/main by this push:
new 1a9a0e4ca1 [incubator-kie-issues#2270] Ensure Kafka event emitter is
participating in the existing transaction (#4239)
1a9a0e4ca1 is described below
commit 1a9a0e4ca1385dad12ec0a32a0a86b4f3c097458
Author: Martin Weiler <[email protected]>
AuthorDate: Wed May 6 13:08:50 2026 -0600
[incubator-kie-issues#2270] Ensure Kafka event emitter is participating in
the existing transaction (#4239)
* [incubator-kie-issues#2270] Ensure Kafka event emitter is participating
in the existing transaction
* Add fallbackExecution in case of missing transaction
* Add check for transaction status, eg. in case of transaction timeout
* Add Quarkus integration test to verify rollback behavior
* Add Spring Boot integration test to verify rollback behavior
* Handle STATUS_NO_TRANSACTION
---
.../events/TxEventEmitterQuarkusTemplate.java | 20 +-
.../events/TxEventEmitterSpringTemplate.java | 40 ++--
.../integration-tests-quarkus-transactions/pom.xml | 174 +++++++++++++++++
.../src/main/resources/application.properties | 39 ++++
.../src/main/resources/tx_rollback_test.bpmn | 145 ++++++++++++++
.../quarkus/TransactionRollbackMessageIT.java | 206 ++++++++++++++++++++
.../src/test/resources/application.properties | 22 +++
quarkus/integration-tests/pom.xml | 1 +
.../pom.xml | 194 +++++++++++++++++++
.../springboot/KogitoSpringbootApplication.java | 30 +++
.../src/main/resources/application.properties | 38 ++++
.../src/main/resources/tx_rollback_test.bpmn | 145 ++++++++++++++
.../springboot/TransactionRollbackMessageTest.java | 212 +++++++++++++++++++++
springboot/integration-tests/pom.xml | 1 +
14 files changed, 1252 insertions(+), 15 deletions(-)
diff --git
a/kogito-codegen-modules/kogito-codegen-events/src/main/resources/class-templates/events/TxEventEmitterQuarkusTemplate.java
b/kogito-codegen-modules/kogito-codegen-events/src/main/resources/class-templates/events/TxEventEmitterQuarkusTemplate.java
index b414d323c6..52ced75cd0 100644
---
a/kogito-codegen-modules/kogito-codegen-events/src/main/resources/class-templates/events/TxEventEmitterQuarkusTemplate.java
+++
b/kogito-codegen-modules/kogito-codegen-events/src/main/resources/class-templates/events/TxEventEmitterQuarkusTemplate.java
@@ -25,7 +25,8 @@ import jakarta.enterprise.event.Observes;
import jakarta.enterprise.event.TransactionPhase;
import jakarta.inject.Inject;
import jakarta.inject.Named;
-import jakarta.transaction.Transactional;
+import jakarta.transaction.Status;
+import jakarta.transaction.TransactionManager;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -64,6 +65,9 @@ public class $ClassName$ extends
AbstractQuarkusCloudEventEmitter<$Type$> {
@Inject
MessageDecoratorProvider messageDecorator;
+ @Inject
+ TransactionManager transactionManager;
+
class EmitEventType {
DataEvent<?> data;
@@ -73,18 +77,28 @@ public class $ClassName$ extends
AbstractQuarkusCloudEventEmitter<$Type$> {
}
public void observe(@Observes(during = TransactionPhase.AFTER_SUCCESS)
EmitEventType emitEventType) {
- logger.debug("publishing event {}", emitEventType.data);
try {
+ // Verify transaction was actually committed successfully
+ int status = transactionManager.getStatus();
+ if (status != Status.STATUS_COMMITTED && status !=
Status.STATUS_NO_TRANSACTION) {
+ logger.debug("Skipping event publication - transaction status
is {} (not committed)", status);
+ return;
+ }
+
+ logger.debug("publishing event {}", emitEventType.data);
Message<$Type$> message =
messageDecorator.decorate(getMessage(emitEventType.data));
emitter.send(message);
} catch (IOException e) {
throw new UncheckedIOException(e);
+ } catch (Exception e) {
+ logger.error("Error checking transaction status or publishing
event", e);
+ throw new RuntimeException(e);
}
}
@Override
- @Transactional
public void emit(DataEvent<?> dataEvent) {
+ logger.debug("emit event {}", dataEvent);
event.fire(new EmitEventType(dataEvent));
}
diff --git
a/kogito-codegen-modules/kogito-codegen-events/src/main/resources/class-templates/events/TxEventEmitterSpringTemplate.java
b/kogito-codegen-modules/kogito-codegen-events/src/main/resources/class-templates/events/TxEventEmitterSpringTemplate.java
index f94c607e9f..1ac2ef60e4 100644
---
a/kogito-codegen-modules/kogito-codegen-events/src/main/resources/class-templates/events/TxEventEmitterSpringTemplate.java
+++
b/kogito-codegen-modules/kogito-codegen-events/src/main/resources/class-templates/events/TxEventEmitterSpringTemplate.java
@@ -20,42 +20,58 @@ package org.kie.kogito.addon.cloudevents.spring;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.io.UncheckedIOException;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ExecutionException;
-import org.kie.kogito.config.ConfigBean;
import org.kie.kogito.event.CloudEventMarshaller;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventEmitter;
import org.kie.kogito.event.EventMarshaller;
-import org.kie.kogito.event.EventUnmarshaller;
-import org.kie.kogito.event.KogitoEventStreams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.core.env.Environment;
+import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.event.TransactionPhase;
+import org.springframework.transaction.event.TransactionalEventListener;
import org.kie.kogito.addon.cloudevents.spring.KogitoMessaging;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
- * Spring implementation delegating to kafka template
+ * Spring transactional implementation using @TransactionalEventListener
+ * to ensure Kafka messages are only sent after database transaction commits
*/
@Component("Emitter-$ChannelName$")
-@Transactional
public class $ClassName$ implements EventEmitter {
+ private static final Logger logger =
LoggerFactory.getLogger($ClassName$.class);
+
@Autowired
org.springframework.kafka.core.KafkaTemplate<String, $Type$> emitter;
@Autowired
ObjectMapper mapper;
+ @Autowired
+ ApplicationEventPublisher eventPublisher;
+
+ static class EmitEventType {
+ final DataEvent<?> data;
+
+ public EmitEventType(DataEvent<?> data) {
+ this.data = data;
+ }
+ }
+
+ @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT,
fallbackExecution = true)
+ public void observe(EmitEventType emitEventType) {
+ logger.debug("publishing event {}", emitEventType.data);
+ emitter.send("$Topic$", toTopicType(emitEventType.data));
+ }
+
@Override
public void emit(DataEvent<?> event) {
- emitter.send("$Topic$", toTopicType(event));
+ logger.debug("emit event {}", event);
+ eventPublisher.publishEvent(new EmitEventType(event));
}
private $Type$ toTopicTypeCloud(DataEvent<?> event) {
diff --git
a/quarkus/integration-tests/integration-tests-quarkus-transactions/pom.xml
b/quarkus/integration-tests/integration-tests-quarkus-transactions/pom.xml
new file mode 100644
index 0000000000..aab54aa620
--- /dev/null
+++ b/quarkus/integration-tests/integration-tests-quarkus-transactions/pom.xml
@@ -0,0 +1,174 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>kogito-quarkus-integration-tests</artifactId>
+ <version>999-SNAPSHOT</version>
+ </parent>
+ <artifactId>integration-tests-quarkus-transactions</artifactId>
+ <name>Kogito :: Integration Tests :: Quarkus :: Transactions</name>
+
+ <properties>
+ <quarkus.test.list.include>false</quarkus.test.list.include>
+
<java.module.name>org.kie.kogito.test.quarkus.transactions</java.module.name>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>kogito-quarkus-bom</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.jbpm</groupId>
+ <artifactId>jbpm-quarkus</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.kie</groupId>
+ <artifactId>kie-addons-quarkus-messaging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-messaging-kafka</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.kie</groupId>
+ <artifactId>kie-addons-quarkus-persistence-jdbc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-jdbc-postgresql</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-resteasy</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-resteasy-jackson</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-junit5</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.rest-assured</groupId>
+ <artifactId>rest-assured</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>kogito-quarkus-test-utils</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>kogito-test-utils</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-maven-plugin</artifactId>
+ <configuration>
+ <noDeps>true</noDeps>
+ <skip>${skipTests}</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>build</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables combine.children="append">
+
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
+
<container.image.kafka>${container.image.kafka}</container.image.kafka>
+ <kogito.version>${project.version}</kogito.version>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <profiles>
+ <profile>
+ <id>native</id>
+ <activation>
+ <property>
+ <name>native</name>
+ </property>
+ </activation>
+ <properties>
+ <quarkus.native.enabled>true</quarkus.native.enabled>
+ </properties>
+ </profile>
+ </profiles>
+</project>
+
diff --git
a/quarkus/integration-tests/integration-tests-quarkus-transactions/src/main/resources/application.properties
b/quarkus/integration-tests/integration-tests-quarkus-transactions/src/main/resources/application.properties
new file mode 100644
index 0000000000..132018c817
--- /dev/null
+++
b/quarkus/integration-tests/integration-tests-quarkus-transactions/src/main/resources/application.properties
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+quarkus.log.level=INFO
+#quarkus.log.category."org.kie.kogito".level=DEBUG
+
+# Maximum Java heap to be used during the native image generation
+quarkus.native.native-image-xmx=8g
+
+mp.messaging.incoming.tx_test_continue.connector=smallrye-kafka
+mp.messaging.incoming.tx_test_continue.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
+mp.messaging.outgoing.tx_test_end.connector=smallrye-kafka
+mp.messaging.outgoing.tx_test_end.value.serializer=org.apache.kafka.common.serialization.StringSerializer
+
+kogito.transactionEnabled=true
+kogito.faultToleranceEnabled=true
+
+# PostgreSQL persistence configuration for transaction rollback test
+kogito.persistence.type=postgresql
+quarkus.datasource.devservices.image-name=mirror.gcr.io/postgres:15.9-alpine3.20
+kie.flyway.enabled=true
+
diff --git
a/quarkus/integration-tests/integration-tests-quarkus-transactions/src/main/resources/tx_rollback_test.bpmn
b/quarkus/integration-tests/integration-tests-quarkus-transactions/src/main/resources/tx_rollback_test.bpmn
new file mode 100644
index 0000000000..92c9b9196d
--- /dev/null
+++
b/quarkus/integration-tests/integration-tests-quarkus-transactions/src/main/resources/tx_rollback_test.bpmn
@@ -0,0 +1,145 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<bpmn2:definitions xmlns:bpmn2="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
xmlns:bpsim="http://www.bpsim.org/schemas/1.0"
xmlns:dc="http://www.omg.org/spec/DD/20100524/DC"
xmlns:di="http://www.omg.org/spec/DD/20100524/DI"
xmlns:drools="http://www.jboss.org/drools" id="_TxRollbackTest" exporter="jBPM
Process Modeler" exporterVersion="2.0"
targetNamespace="http://www.omg.org/bpmn20">
+ <bpmn2:itemDefinition id="_sleepTimeItem" structureRef="Integer"/>
+ <bpmn2:itemDefinition id="_messageItem" structureRef="String"/>
+ <bpmn2:itemDefinition id="__MESSAGE_CATCH_eventOutputXItem"
structureRef="String"/>
+ <bpmn2:itemDefinition id="tx_test_continueType" structureRef="String"/>
+ <bpmn2:itemDefinition id="__END_MESSAGE_eventInputXItem"
structureRef="String"/>
+ <bpmn2:itemDefinition id="tx_test_endType" structureRef="String"/>
+ <bpmn2:message id="_msg_continue" itemRef="tx_test_continueType"
name="tx_test_continue"/>
+ <bpmn2:message id="_msg_end" itemRef="tx_test_endType" name="tx_test_end"/>
+ <bpmn2:process id="tx_rollback_test"
drools:packageName="org.kie.kogito.integrationtests" drools:version="1.0"
drools:adHoc="false" name="tx_rollback_test" isExecutable="true"
processType="Public">
+ <bpmn2:property id="sleepTime" itemSubjectRef="_sleepTimeItem"
name="sleepTime"/>
+ <bpmn2:property id="message" itemSubjectRef="_messageItem" name="message"/>
+ <bpmn2:sequenceFlow id="_SEQ_1" sourceRef="_START_EVENT"
targetRef="_MESSAGE_CATCH"/>
+ <bpmn2:sequenceFlow id="_SEQ_2" sourceRef="_MESSAGE_CATCH"
targetRef="_SLEEP_TASK"/>
+ <bpmn2:sequenceFlow id="_SEQ_3" sourceRef="_SLEEP_TASK"
targetRef="_END_MESSAGE"/>
+ <bpmn2:startEvent id="_START_EVENT" name="Start">
+ <bpmn2:outgoing>_SEQ_1</bpmn2:outgoing>
+ </bpmn2:startEvent>
+ <bpmn2:intermediateCatchEvent id="_MESSAGE_CATCH" name="Wait for Continue">
+ <bpmn2:incoming>_SEQ_1</bpmn2:incoming>
+ <bpmn2:outgoing>_SEQ_2</bpmn2:outgoing>
+ <bpmn2:dataOutput id="_MESSAGE_CATCH_eventOutputX" drools:dtype="String"
itemSubjectRef="__MESSAGE_CATCH_eventOutputXItem" name="event"/>
+ <bpmn2:dataOutputAssociation>
+ <bpmn2:sourceRef>_MESSAGE_CATCH_eventOutputX</bpmn2:sourceRef>
+ <bpmn2:targetRef>message</bpmn2:targetRef>
+ </bpmn2:dataOutputAssociation>
+ <bpmn2:outputSet>
+
<bpmn2:dataOutputRefs>_MESSAGE_CATCH_eventOutputX</bpmn2:dataOutputRefs>
+ </bpmn2:outputSet>
+ <bpmn2:messageEventDefinition drools:msgref="tx_test_continue"
messageRef="_msg_continue"/>
+ </bpmn2:intermediateCatchEvent>
+ <bpmn2:scriptTask id="_SLEEP_TASK" name="Sleep Task"
scriptFormat="http://www.java.com/java">
+ <bpmn2:extensionElements>
+ <drools:metaData name="elementname">
+ <drools:metaValue><![CDATA[Sleep Task]]></drools:metaValue>
+ </drools:metaData>
+ </bpmn2:extensionElements>
+ <bpmn2:incoming>_SEQ_2</bpmn2:incoming>
+ <bpmn2:outgoing>_SEQ_3</bpmn2:outgoing>
+ <bpmn2:script><![CDATA[if (sleepTime != null && sleepTime > 0) {
+ System.out.println("Sleeping for " + sleepTime + " milliseconds");
+ Thread.sleep(sleepTime);
+ System.out.println("Sleep completed");
+}]]></bpmn2:script>
+ </bpmn2:scriptTask>
+ <bpmn2:endEvent id="_END_MESSAGE" name="End">
+ <bpmn2:incoming>_SEQ_3</bpmn2:incoming>
+ <bpmn2:dataInput id="_END_MESSAGE_eventInputX" drools:dtype="String"
itemSubjectRef="__END_MESSAGE_eventInputXItem" name="event"/>
+ <bpmn2:dataInputAssociation>
+ <bpmn2:sourceRef>message</bpmn2:sourceRef>
+ <bpmn2:targetRef>_END_MESSAGE_eventInputX</bpmn2:targetRef>
+ </bpmn2:dataInputAssociation>
+ <bpmn2:inputSet>
+ <bpmn2:dataInputRefs>_END_MESSAGE_eventInputX</bpmn2:dataInputRefs>
+ </bpmn2:inputSet>
+ <bpmn2:messageEventDefinition drools:msgref="tx_test_end"
messageRef="_msg_end"/>
+ </bpmn2:endEvent>
+ </bpmn2:process>
+ <bpmndi:BPMNDiagram>
+ <bpmndi:BPMNPlane bpmnElement="tx_rollback_test">
+ <bpmndi:BPMNShape id="shape__START_EVENT" bpmnElement="_START_EVENT">
+ <dc:Bounds height="56" width="56" x="100" y="100"/>
+ </bpmndi:BPMNShape>
+ <bpmndi:BPMNShape id="shape__MESSAGE_CATCH" bpmnElement="_MESSAGE_CATCH">
+ <dc:Bounds height="56" width="56" x="230" y="100"/>
+ </bpmndi:BPMNShape>
+ <bpmndi:BPMNShape id="shape__SLEEP_TASK" bpmnElement="_SLEEP_TASK">
+ <dc:Bounds height="102" width="154" x="360" y="77"/>
+ </bpmndi:BPMNShape>
+ <bpmndi:BPMNShape id="shape__END_MESSAGE" bpmnElement="_END_MESSAGE">
+ <dc:Bounds height="56" width="56" x="590" y="100"/>
+ </bpmndi:BPMNShape>
+ <bpmndi:BPMNEdge id="edge__START_EVENT_to__MESSAGE_CATCH"
bpmnElement="_SEQ_1">
+ <di:waypoint x="156" y="128"/>
+ <di:waypoint x="230" y="128"/>
+ </bpmndi:BPMNEdge>
+ <bpmndi:BPMNEdge id="edge__MESSAGE_CATCH_to__SLEEP_TASK"
bpmnElement="_SEQ_2">
+ <di:waypoint x="286" y="128"/>
+ <di:waypoint x="360" y="128"/>
+ </bpmndi:BPMNEdge>
+ <bpmndi:BPMNEdge id="edge__SLEEP_TASK_to__END_MESSAGE"
bpmnElement="_SEQ_3">
+ <di:waypoint x="514" y="128"/>
+ <di:waypoint x="590" y="128"/>
+ </bpmndi:BPMNEdge>
+ </bpmndi:BPMNPlane>
+ </bpmndi:BPMNDiagram>
+ <bpmn2:relationship type="BPSimData">
+ <bpmn2:extensionElements>
+ <bpsim:BPSimData>
+ <bpsim:Scenario id="default" name="Simulation scenario">
+ <bpsim:ScenarioParameters/>
+ <bpsim:ElementParameters elementRef="_START_EVENT">
+ <bpsim:TimeParameters>
+ <bpsim:ProcessingTime>
+ <bpsim:NormalDistribution mean="0" standardDeviation="0"/>
+ </bpsim:ProcessingTime>
+ </bpsim:TimeParameters>
+ </bpsim:ElementParameters>
+ <bpsim:ElementParameters elementRef="_SLEEP_TASK">
+ <bpsim:TimeParameters>
+ <bpsim:ProcessingTime>
+ <bpsim:NormalDistribution mean="0" standardDeviation="0"/>
+ </bpsim:ProcessingTime>
+ </bpsim:TimeParameters>
+ <bpsim:ResourceParameters>
+ <bpsim:Availability>
+ <bpsim:FloatingParameter value="0"/>
+ </bpsim:Availability>
+ <bpsim:Quantity>
+ <bpsim:FloatingParameter value="0"/>
+ </bpsim:Quantity>
+ </bpsim:ResourceParameters>
+ <bpsim:CostParameters>
+ <bpsim:UnitCost>
+ <bpsim:FloatingParameter value="0"/>
+ </bpsim:UnitCost>
+ </bpsim:CostParameters>
+ </bpsim:ElementParameters>
+ </bpsim:Scenario>
+ </bpsim:BPSimData>
+ </bpmn2:extensionElements>
+ <bpmn2:source>_TxRollbackTest</bpmn2:source>
+ <bpmn2:target>_TxRollbackTest</bpmn2:target>
+ </bpmn2:relationship>
+</bpmn2:definitions>
+
diff --git
a/quarkus/integration-tests/integration-tests-quarkus-transactions/src/test/java/org/kie/kogito/integrationtests/quarkus/TransactionRollbackMessageIT.java
b/quarkus/integration-tests/integration-tests-quarkus-transactions/src/test/java/org/kie/kogito/integrationtests/quarkus/TransactionRollbackMessageIT.java
new file mode 100644
index 0000000000..03faadceae
--- /dev/null
+++
b/quarkus/integration-tests/integration-tests-quarkus-transactions/src/test/java/org/kie/kogito/integrationtests/quarkus/TransactionRollbackMessageIT.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.integrationtests.quarkus;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.kie.kogito.test.quarkus.QuarkusTestProperty;
+import org.kie.kogito.test.quarkus.kafka.KafkaTestClient;
+import org.kie.kogito.testcontainers.KogitoPostgreSqlContainer;
+import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource;
+import org.kie.kogito.testcontainers.quarkus.PostgreSqlQuarkusTestResource;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import io.restassured.RestAssured;
+import io.restassured.http.ContentType;
+
+import static io.restassured.RestAssured.given;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Integration test to verify that when a transaction rolls back due to
persistence failure,
+ * no Kafka message is sent. This validates the transactional behavior of the
event emitter.
+ *
+ */
+@QuarkusIntegrationTest
+@QuarkusTestResource(KafkaQuarkusTestResource.class)
+@QuarkusTestResource(TransactionRollbackMessageIT.ExposedPostgreSqlResource.class)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class TransactionRollbackMessageIT {
+
+ private static final String TX_TEST_END_TOPIC = "tx_test_end";
+ private static final String TX_TEST_CONTINUE_TOPIC = "tx_test_continue";
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ public KafkaTestClient kafkaClient;
+ private AtomicInteger messageCount;
+ private CountDownLatch messageLatch;
+
+ @QuarkusTestProperty(name = KafkaQuarkusTestResource.KOGITO_KAFKA_PROPERTY)
+ private String kafkaBootstrapServers;
+
+ // Static reference to PostgreSQL container for stopping it during test
+ private static KogitoPostgreSqlContainer postgresContainer;
+
+ static {
+ RestAssured.enableLoggingOfRequestAndResponseIfValidationFails();
+ }
+
+ @BeforeEach
+ public void setup() {
+ kafkaClient = new KafkaTestClient(kafkaBootstrapServers);
+ messageCount = new AtomicInteger(0);
+ messageLatch = new CountDownLatch(1);
+
+ // Ensure PostgreSQL is running before each test
+ if (postgresContainer != null && !postgresContainer.isRunning()) {
+ postgresContainer.start();
+ }
+ }
+
+ @Test
+ @Order(1)
+ void testMessageSentWhenTransactionSucceeds() throws Exception {
+ kafkaClient.consume(TX_TEST_END_TOPIC, s -> {
+ messageCount.incrementAndGet();
+ messageLatch.countDown();
+ });
+
+ String pId = given().body(String.format("{ \"sleepTime\": 1000 }"))
+ .contentType(ContentType.JSON)
+ .when()
+ .post("/tx_rollback_test")
+ .then()
+ .statusCode(201)
+ .extract().body().path("id");
+
+ await().atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> given()
+ .contentType(ContentType.JSON)
+ .when()
+ .get("/tx_rollback_test/" + pId)
+ .then()
+ .statusCode(200));
+
+ String continueMessage = createCloudEventsMessage(pId, "continue");
+ kafkaClient.produce(continueMessage, TX_TEST_CONTINUE_TOPIC);
+
+ // Wait for the process to complete and message to be sent
+ boolean messageReceived = messageLatch.await(5, TimeUnit.SECONDS);
+
+ // In a successful transaction, the message should be sent
+ assertThat(messageReceived).as("Message should be sent when
transaction succeeds").isTrue();
+ assertThat(messageCount.get()).as("Message count should be at least
1").isGreaterThanOrEqualTo(1);
+ }
+
+ @Test
+ // @Order(2)
+ void testNoMessageSentWhenTransactionRollsBack() throws Exception {
+ kafkaClient.consume(TX_TEST_END_TOPIC, s -> {
+ System.out.println("Received end message: " + s);
+ messageCount.incrementAndGet();
+ messageLatch.countDown();
+ });
+
+ String pId = given().body(String.format("{ \"sleepTime\": 5000 }"))
+ .contentType(ContentType.JSON)
+ .when()
+ .post("/tx_rollback_test")
+ .then()
+ .statusCode(201)
+ .extract().body().path("id");
+
+ await().atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> given()
+ .contentType(ContentType.JSON)
+ .when()
+ .get("/tx_rollback_test/" + pId)
+ .then()
+ .statusCode(200));
+
+ String continueMessage = createCloudEventsMessage(pId, "continue");
+ kafkaClient.produce(continueMessage, TX_TEST_CONTINUE_TOPIC);
+
+ // Wait for the message to be consumed and sleep to start
+ Thread.sleep(1000);
+
+ // Stop PostgreSQL during the sleep to simulate database failure
+ if (postgresContainer != null) {
+ postgresContainer.stop();
+ } else {
+ System.out.println("WARNING: PostgreSQL container reference is
null, cannot simulate failure");
+ }
+
+ // Verify that NO message was sent to the tx_test_end topic
+ // because the transaction should have rolled back
+ boolean messageReceived = messageLatch.await(5, TimeUnit.SECONDS);
+
+ // In a successful rollback scenario, no message should be received
+ assertThat(messageReceived).as("No message should be sent when
transaction rolls back").isFalse();
+ assertThat(messageCount.get()).as("Message count should be
0").isEqualTo(0);
+ }
+
+ private String createCloudEventsMessage(String processInstanceId, String
messageContent) throws Exception {
+ Map<String, Object> cloudEvent = new HashMap<>();
+ cloudEvent.put("specversion", "1.0");
+ cloudEvent.put("id", UUID.randomUUID().toString());
+ cloudEvent.put("source", "test");
+ cloudEvent.put("type", "tx_test_continue");
+ cloudEvent.put("data", messageContent);
+ cloudEvent.put("kogitoprocrefid", processInstanceId);
+
+ return MAPPER.writeValueAsString(cloudEvent);
+ }
+
+ /**
+ * Custom PostgreSQL test resource that exposes the container instance
+ * so tests can stop it to simulate database failure.
+ */
+ public static class ExposedPostgreSqlResource extends
PostgreSqlQuarkusTestResource {
+
+ @Override
+ public Map<String, String> start() {
+ Map<String, String> properties = super.start();
+ // Store reference to the container for test access
+ postgresContainer = getTestResource();
+ return properties;
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+ postgresContainer = null;
+ }
+ }
+}
diff --git
a/quarkus/integration-tests/integration-tests-quarkus-transactions/src/test/resources/application.properties
b/quarkus/integration-tests/integration-tests-quarkus-transactions/src/test/resources/application.properties
new file mode 100644
index 0000000000..0d0c0da0e1
--- /dev/null
+++
b/quarkus/integration-tests/integration-tests-quarkus-transactions/src/test/resources/application.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Quarkus
+quarkus.http.test-port=0
+quarkus.kafka.devservices.enabled=false
diff --git a/quarkus/integration-tests/pom.xml
b/quarkus/integration-tests/pom.xml
index 7e6f72b621..8b50f72814 100644
--- a/quarkus/integration-tests/pom.xml
+++ b/quarkus/integration-tests/pom.xml
@@ -47,6 +47,7 @@
<module>integration-tests-quarkus-processes</module>
<module>integration-tests-quarkus-processes-reactive</module>
<module>integration-tests-quarkus-processes-persistence</module>
+ <module>integration-tests-quarkus-transactions</module>
<module>integration-tests-quarkus-usertasks</module>
<module>integration-tests-quarkus-usertask-listeners-rollback</module>
<module>integration-tests-quarkus-wshumantasks</module>
diff --git
a/springboot/integration-tests/integration-tests-springboot-transactions-it/pom.xml
b/springboot/integration-tests/integration-tests-springboot-transactions-it/pom.xml
new file mode 100644
index 0000000000..0218b881cf
--- /dev/null
+++
b/springboot/integration-tests/integration-tests-springboot-transactions-it/pom.xml
@@ -0,0 +1,194 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>kogito-spring-boot-integration-tests</artifactId>
+ <version>999-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>integration-tests-springboot-transactions-it</artifactId>
+
+ <properties>
+
<java.module.name>integration.tests.springboot.transactions.it</java.module.name>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>kogito-spring-boot-bom</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.jbpm</groupId>
+ <artifactId>jbpm-with-drools-spring-boot-starter</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.kie</groupId>
+ <artifactId>kie-addons-springboot-messaging</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.kie</groupId>
+ <artifactId>kie-addons-springboot-events-process-kafka</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.kie</groupId>
+ <artifactId>kie-addons-springboot-persistence-jdbc</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-jdbc</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.junit.vintage</groupId>
+ <artifactId>junit-vintage-engine</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.rest-assured</groupId>
+ <artifactId>rest-assured</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>kogito-spring-boot-test-utils</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <finalName>${project.artifactId}</finalName>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>${version.compiler.plugin}</version>
+ <configuration>
+ <release>${maven.compiler.release}</release>
+ </configuration>
+ <executions>
+ <execution>
+ <id>pre-kogito-generate-model</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>kogito-maven-plugin</artifactId>
+ <version>${project.version}</version> <!-- Needed, otherwise it would
use the latest release found on Maven central -->
+ <executions>
+ <execution>
+ <phase>compile</phase>
+ <goals>
+ <goal>generateModel</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <version>${version.org.springframework.boot}</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>repackage</goal>
+ </goals>
+ <configuration>
+ <classifier>executable</classifier>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <classesDirectory>${project.build.outputDirectory}</classesDirectory>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${version.surefire.plugin}</version>
+ <configuration>
+ <systemPropertyVariables combine.children="append">
+
<container.image.kafka>${container.image.kafka}</container.image.kafka>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
+
diff --git
a/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/java/org/kie/kogito/integrationtests/springboot/KogitoSpringbootApplication.java
b/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/java/org/kie/kogito/integrationtests/springboot/KogitoSpringbootApplication.java
new file mode 100644
index 0000000000..4a460a4976
--- /dev/null
+++
b/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/java/org/kie/kogito/integrationtests/springboot/KogitoSpringbootApplication.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.integrationtests.springboot;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication(scanBasePackages = { "org.kie.kogito.**" })
+public class KogitoSpringbootApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(KogitoSpringbootApplication.class, args);
+ }
+}
diff --git
a/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/resources/application.properties
b/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/resources/application.properties
new file mode 100644
index 0000000000..d322322578
--- /dev/null
+++
b/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/resources/application.properties
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+server.address=0.0.0.0
+spring.kafka.consumer.group-id=kogito-group
+spring.kafka.consumer.auto-offset-reset=earliest
+
+# Transaction rollback test topics
+kogito.addon.cloudevents.kafka.kogito_incoming_stream.tx_test_continue=tx_test_continue
+kogito.addon.cloudevents.kafka.kogito_outgoing_stream.tx_test_end=tx_test_end
+
+# PostgreSQL persistence configuration for transaction rollback test
+kogito.persistence.type=postgresql
+kie.flyway.enabled=true
+spring.flyway.enabled=false
+
+kogito.transactionEnabled=true
+kogito.faultToleranceEnabled=true
+
+# Connection URI will be set by test container
+kogito.persistence.postgresql.connection.uri=postgresql://kogito-user:kogito-pass@localhost:5432/kogito
+
diff --git
a/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/resources/tx_rollback_test.bpmn
b/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/resources/tx_rollback_test.bpmn
new file mode 100644
index 0000000000..d91589fe80
--- /dev/null
+++
b/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/resources/tx_rollback_test.bpmn
@@ -0,0 +1,145 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<bpmn2:definitions xmlns:bpmn2="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
xmlns:bpsim="http://www.bpsim.org/schemas/1.0"
xmlns:dc="http://www.omg.org/spec/DD/20100524/DC"
xmlns:di="http://www.omg.org/spec/DD/20100524/DI"
xmlns:drools="http://www.jboss.org/drools" id="_TxRollbackTestSpring"
exporter="jBPM Process Modeler" exporterVersion="2.0"
targetNamespace="http://www.omg.org/bpmn20">
+ <bpmn2:itemDefinition id="_sleepTimeItem" structureRef="Integer"/>
+ <bpmn2:itemDefinition id="_messageItem" structureRef="String"/>
+ <bpmn2:itemDefinition id="__MESSAGE_CATCH_eventOutputXItem"
structureRef="String"/>
+ <bpmn2:itemDefinition id="tx_test_continueType" structureRef="String"/>
+ <bpmn2:itemDefinition id="__END_MESSAGE_eventInputXItem"
structureRef="String"/>
+ <bpmn2:itemDefinition id="tx_test_endType" structureRef="String"/>
+ <bpmn2:message id="_msg_continue" itemRef="tx_test_continueType"
name="tx_test_continue"/>
+ <bpmn2:message id="_msg_end" itemRef="tx_test_endType" name="tx_test_end"/>
+ <bpmn2:process id="tx_rollback_test"
drools:packageName="org.kie.kogito.integrationtests.springboot"
drools:version="1.0" drools:adHoc="false" name="tx_rollback_test"
isExecutable="true" processType="Public">
+ <bpmn2:property id="sleepTime" itemSubjectRef="_sleepTimeItem"
name="sleepTime"/>
+ <bpmn2:property id="message" itemSubjectRef="_messageItem" name="message"/>
+ <bpmn2:sequenceFlow id="_SEQ_1" sourceRef="_START_EVENT"
targetRef="_MESSAGE_CATCH"/>
+ <bpmn2:sequenceFlow id="_SEQ_2" sourceRef="_MESSAGE_CATCH"
targetRef="_SLEEP_TASK"/>
+ <bpmn2:sequenceFlow id="_SEQ_3" sourceRef="_SLEEP_TASK"
targetRef="_END_MESSAGE"/>
+ <bpmn2:startEvent id="_START_EVENT" name="Start">
+ <bpmn2:outgoing>_SEQ_1</bpmn2:outgoing>
+ </bpmn2:startEvent>
+ <bpmn2:intermediateCatchEvent id="_MESSAGE_CATCH" name="Wait for Continue">
+ <bpmn2:incoming>_SEQ_1</bpmn2:incoming>
+ <bpmn2:outgoing>_SEQ_2</bpmn2:outgoing>
+ <bpmn2:dataOutput id="_MESSAGE_CATCH_eventOutputX" drools:dtype="String"
itemSubjectRef="__MESSAGE_CATCH_eventOutputXItem" name="event"/>
+ <bpmn2:dataOutputAssociation>
+ <bpmn2:sourceRef>_MESSAGE_CATCH_eventOutputX</bpmn2:sourceRef>
+ <bpmn2:targetRef>message</bpmn2:targetRef>
+ </bpmn2:dataOutputAssociation>
+ <bpmn2:outputSet>
+
<bpmn2:dataOutputRefs>_MESSAGE_CATCH_eventOutputX</bpmn2:dataOutputRefs>
+ </bpmn2:outputSet>
+ <bpmn2:messageEventDefinition drools:msgref="tx_test_continue"
messageRef="_msg_continue"/>
+ </bpmn2:intermediateCatchEvent>
+ <bpmn2:scriptTask id="_SLEEP_TASK" name="Sleep Task"
scriptFormat="http://www.java.com/java">
+ <bpmn2:extensionElements>
+ <drools:metaData name="elementname">
+ <drools:metaValue><![CDATA[Sleep Task]]></drools:metaValue>
+ </drools:metaData>
+ </bpmn2:extensionElements>
+ <bpmn2:incoming>_SEQ_2</bpmn2:incoming>
+ <bpmn2:outgoing>_SEQ_3</bpmn2:outgoing>
+ <bpmn2:script><![CDATA[if (sleepTime != null && sleepTime > 0) {
+ System.out.println("Sleeping for " + sleepTime + " milliseconds");
+ Thread.sleep(sleepTime);
+ System.out.println("Sleep completed");
+}]]></bpmn2:script>
+ </bpmn2:scriptTask>
+ <bpmn2:endEvent id="_END_MESSAGE" name="End">
+ <bpmn2:incoming>_SEQ_3</bpmn2:incoming>
+ <bpmn2:dataInput id="_END_MESSAGE_eventInputX" drools:dtype="String"
itemSubjectRef="__END_MESSAGE_eventInputXItem" name="event"/>
+ <bpmn2:dataInputAssociation>
+ <bpmn2:sourceRef>message</bpmn2:sourceRef>
+ <bpmn2:targetRef>_END_MESSAGE_eventInputX</bpmn2:targetRef>
+ </bpmn2:dataInputAssociation>
+ <bpmn2:inputSet>
+ <bpmn2:dataInputRefs>_END_MESSAGE_eventInputX</bpmn2:dataInputRefs>
+ </bpmn2:inputSet>
+ <bpmn2:messageEventDefinition drools:msgref="tx_test_end"
messageRef="_msg_end"/>
+ </bpmn2:endEvent>
+ </bpmn2:process>
+ <bpmndi:BPMNDiagram>
+ <bpmndi:BPMNPlane bpmnElement="tx_rollback_test">
+ <bpmndi:BPMNShape id="shape__START_EVENT" bpmnElement="_START_EVENT">
+ <dc:Bounds height="56" width="56" x="100" y="100"/>
+ </bpmndi:BPMNShape>
+ <bpmndi:BPMNShape id="shape__MESSAGE_CATCH" bpmnElement="_MESSAGE_CATCH">
+ <dc:Bounds height="56" width="56" x="230" y="100"/>
+ </bpmndi:BPMNShape>
+ <bpmndi:BPMNShape id="shape__SLEEP_TASK" bpmnElement="_SLEEP_TASK">
+ <dc:Bounds height="102" width="154" x="360" y="77"/>
+ </bpmndi:BPMNShape>
+ <bpmndi:BPMNShape id="shape__END_MESSAGE" bpmnElement="_END_MESSAGE">
+ <dc:Bounds height="56" width="56" x="590" y="100"/>
+ </bpmndi:BPMNShape>
+ <bpmndi:BPMNEdge id="edge__START_EVENT_to__MESSAGE_CATCH"
bpmnElement="_SEQ_1">
+ <di:waypoint x="156" y="128"/>
+ <di:waypoint x="230" y="128"/>
+ </bpmndi:BPMNEdge>
+ <bpmndi:BPMNEdge id="edge__MESSAGE_CATCH_to__SLEEP_TASK"
bpmnElement="_SEQ_2">
+ <di:waypoint x="286" y="128"/>
+ <di:waypoint x="360" y="128"/>
+ </bpmndi:BPMNEdge>
+ <bpmndi:BPMNEdge id="edge__SLEEP_TASK_to__END_MESSAGE"
bpmnElement="_SEQ_3">
+ <di:waypoint x="514" y="128"/>
+ <di:waypoint x="590" y="128"/>
+ </bpmndi:BPMNEdge>
+ </bpmndi:BPMNPlane>
+ </bpmndi:BPMNDiagram>
+ <bpmn2:relationship type="BPSimData">
+ <bpmn2:extensionElements>
+ <bpsim:BPSimData>
+ <bpsim:Scenario id="default" name="Simulation scenario">
+ <bpsim:ScenarioParameters/>
+ <bpsim:ElementParameters elementRef="_START_EVENT">
+ <bpsim:TimeParameters>
+ <bpsim:ProcessingTime>
+ <bpsim:NormalDistribution mean="0" standardDeviation="0"/>
+ </bpsim:ProcessingTime>
+ </bpsim:TimeParameters>
+ </bpsim:ElementParameters>
+ <bpsim:ElementParameters elementRef="_SLEEP_TASK">
+ <bpsim:TimeParameters>
+ <bpsim:ProcessingTime>
+ <bpsim:NormalDistribution mean="0" standardDeviation="0"/>
+ </bpsim:ProcessingTime>
+ </bpsim:TimeParameters>
+ <bpsim:ResourceParameters>
+ <bpsim:Availability>
+ <bpsim:FloatingParameter value="0"/>
+ </bpsim:Availability>
+ <bpsim:Quantity>
+ <bpsim:FloatingParameter value="0"/>
+ </bpsim:Quantity>
+ </bpsim:ResourceParameters>
+ <bpsim:CostParameters>
+ <bpsim:UnitCost>
+ <bpsim:FloatingParameter value="0"/>
+ </bpsim:UnitCost>
+ </bpsim:CostParameters>
+ </bpsim:ElementParameters>
+ </bpsim:Scenario>
+ </bpsim:BPSimData>
+ </bpmn2:extensionElements>
+ <bpmn2:source>_TxRollbackTestSpring</bpmn2:source>
+ <bpmn2:target>_TxRollbackTestSpring</bpmn2:target>
+ </bpmn2:relationship>
+</bpmn2:definitions>
+
diff --git
a/springboot/integration-tests/integration-tests-springboot-transactions-it/src/test/java/org/kie/kogito/integrationtests/springboot/TransactionRollbackMessageTest.java
b/springboot/integration-tests/integration-tests-springboot-transactions-it/src/test/java/org/kie/kogito/integrationtests/springboot/TransactionRollbackMessageTest.java
new file mode 100644
index 0000000000..22e560b61a
--- /dev/null
+++
b/springboot/integration-tests/integration-tests-springboot-transactions-it/src/test/java/org/kie/kogito/integrationtests/springboot/TransactionRollbackMessageTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.integrationtests.springboot;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.kie.kogito.test.springboot.kafka.KafkaTestClient;
+import org.kie.kogito.testcontainers.KogitoPostgreSqlContainer;
+import org.kie.kogito.testcontainers.springboot.KafkaSpringBootTestResource;
+import
org.kie.kogito.testcontainers.springboot.PostgreSqlSpringBootTestResource;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.web.server.LocalServerPort;
+import org.springframework.context.ApplicationContextInitializer;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.event.ContextClosedEvent;
+import org.springframework.test.context.ContextConfiguration;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.restassured.RestAssured;
+import io.restassured.http.ContentType;
+
+import static io.restassured.RestAssured.given;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Integration test to verify that when a transaction rolls back due to
persistence failure,
+ * no Kafka message is sent. This validates the transactional behavior of the
event emitter.
+ */
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
classes = KogitoSpringbootApplication.class)
+@ContextConfiguration(initializers = { KafkaSpringBootTestResource.class,
TransactionRollbackMessageTest.ExposedPostgreSqlResource.class })
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class TransactionRollbackMessageTest {
+
+ private static final String TX_TEST_END_TOPIC = "tx_test_end";
+ private static final String TX_TEST_CONTINUE_TOPIC = "tx_test_continue";
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ @LocalServerPort
+ int randomServerPort;
+
+ @Autowired
+ private KafkaTestClient kafkaClient;
+
+ private AtomicInteger messageCount;
+ private CountDownLatch messageLatch;
+
+ // Static reference to PostgreSQL container for stopping it during test
+ private static KogitoPostgreSqlContainer postgresContainer;
+
+ static {
+ RestAssured.enableLoggingOfRequestAndResponseIfValidationFails();
+ }
+
+ @BeforeEach
+ public void setup() {
+ RestAssured.port = randomServerPort;
+ messageCount = new AtomicInteger(0);
+ messageLatch = new CountDownLatch(1);
+
+ // Ensure PostgreSQL is running before each test
+ if (postgresContainer != null && !postgresContainer.isRunning()) {
+ postgresContainer.start();
+ }
+ }
+
+ @Test
+ @Order(1)
+ void testMessageSentWhenTransactionSucceeds() throws Exception {
+ kafkaClient.consume(TX_TEST_END_TOPIC, s -> {
+ messageCount.incrementAndGet();
+ messageLatch.countDown();
+ });
+
+ String pId = given().body(String.format("{ \"sleepTime\": 1000 }"))
+ .contentType(ContentType.JSON)
+ .when()
+ .post("/tx_rollback_test")
+ .then()
+ .statusCode(201)
+ .extract().body().path("id");
+
+ await().atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> given()
+ .contentType(ContentType.JSON)
+ .when()
+ .get("/tx_rollback_test/" + pId)
+ .then()
+ .statusCode(200));
+
+ String continueMessage = createCloudEventsMessage(pId, "continue");
+ kafkaClient.produce(continueMessage, TX_TEST_CONTINUE_TOPIC);
+
+ // Wait for the process to complete and message to be sent
+ boolean messageReceived = messageLatch.await(5, TimeUnit.SECONDS);
+
+ // In a successful transaction, the message should be sent
+ assertThat(messageReceived).as("Message should be sent when
transaction succeeds").isTrue();
+ assertThat(messageCount.get()).as("Message count should be at least
1").isGreaterThanOrEqualTo(1);
+ }
+
+ @Test
+ @Order(2)
+ void testNoMessageSentWhenTransactionRollsBack() throws Exception {
+ kafkaClient.consume(TX_TEST_END_TOPIC, s -> {
+ System.out.println("Received end message: " + s);
+ messageCount.incrementAndGet();
+ messageLatch.countDown();
+ });
+
+ String pId = given().body(String.format("{ \"sleepTime\": 5000 }"))
+ .contentType(ContentType.JSON)
+ .when()
+ .post("/tx_rollback_test")
+ .then()
+ .statusCode(201)
+ .extract().body().path("id");
+
+ await().atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> given()
+ .contentType(ContentType.JSON)
+ .when()
+ .get("/tx_rollback_test/" + pId)
+ .then()
+ .statusCode(200));
+
+ String continueMessage = createCloudEventsMessage(pId, "continue");
+ kafkaClient.produce(continueMessage, TX_TEST_CONTINUE_TOPIC);
+
+ // Wait for the message to be consumed and sleep to start
+ Thread.sleep(1000);
+
+ // Stop PostgreSQL during the sleep to simulate database failure
+ if (postgresContainer != null) {
+ postgresContainer.stop();
+ } else {
+ System.out.println("WARNING: PostgreSQL container reference is
null, cannot simulate failure");
+ }
+
+ // Verify that NO message was sent to the tx_test_end topic
+ // because the transaction should have rolled back
+ boolean messageReceived = messageLatch.await(5, TimeUnit.SECONDS);
+
+ // In a successful rollback scenario, no message should be received
+ assertThat(messageReceived).as("No message should be sent when
transaction rolls back").isFalse();
+ assertThat(messageCount.get()).as("Message count should be
0").isEqualTo(0);
+ }
+
+ private String createCloudEventsMessage(String processInstanceId, String
messageContent) throws Exception {
+ Map<String, Object> cloudEvent = new HashMap<>();
+ cloudEvent.put("specversion", "1.0");
+ cloudEvent.put("id", UUID.randomUUID().toString());
+ cloudEvent.put("source", "test");
+ cloudEvent.put("type", "tx_test_continue");
+ cloudEvent.put("data", messageContent);
+ cloudEvent.put("kogitoprocrefid", processInstanceId);
+
+ return MAPPER.writeValueAsString(cloudEvent);
+ }
+
+ /**
+ * Custom PostgreSQL test resource that exposes the container instance
+ * so tests can stop it to simulate database failure.
+ */
+ public static class ExposedPostgreSqlResource extends
PostgreSqlSpringBootTestResource
+ implements
ApplicationContextInitializer<ConfigurableApplicationContext> {
+
+ @Override
+ public void initialize(ConfigurableApplicationContext
applicationContext) {
+ // Call parent to start the container and configure properties
+ super.initialize(applicationContext);
+ // Store reference to the container for test access
+ postgresContainer = getTestResource();
+ }
+
+ @Override
+ public void onApplicationEvent(ContextClosedEvent event) {
+ // Call parent to stop the container
+ super.onApplicationEvent(event);
+ postgresContainer = null;
+ }
+ }
+}
diff --git a/springboot/integration-tests/pom.xml
b/springboot/integration-tests/pom.xml
index d39f56167f..2ba2b9046d 100644
--- a/springboot/integration-tests/pom.xml
+++ b/springboot/integration-tests/pom.xml
@@ -42,6 +42,7 @@
<module>integration-tests-springboot-norest-it</module>
<module>integration-tests-springboot-processes-it</module>
<module>integration-tests-springboot-processes-persistence-it</module>
+ <module>integration-tests-springboot-transactions-it</module>
<module>integration-tests-springboot-usertasks-it</module>
<module>integration-tests-springboot-usertask-listeners-rollback-it</module>
<module>integration-tests-springboot-wshumantasks-it</module>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]