This is an automated email from the ASF dual-hosted git repository. zbendhiba pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-quarkus-examples.git
commit efb8993712168f640dbe90ac9db79c6c1b7e0297 Author: Zheng Feng <[email protected]> AuthorDate: Fri Sep 16 16:25:22 2022 +0800 Update jta-jpa to support transaction crash and recovery (#105) * Update jta-jpa to support transaction crash and recovery * add license * update example.json --- docs/modules/ROOT/attachments/examples.json | 2 +- jta-jpa/README.adoc | 105 +++++++++-- jta-jpa/pom.xml | 27 --- jta-jpa/src/main/java/org/acme/CamelRoutes.java | 15 +- .../src/main/java/org/acme/DummyXAResource.java | 204 +++++++++++++++++++++ .../java/org/acme/DummyXAResourceRecovery.java | 81 ++++++++ jta-jpa/src/main/resources/application.properties | 8 +- jta-jpa/src/test/java/org/acme/JtaTest.java | 4 +- 8 files changed, 383 insertions(+), 63 deletions(-) diff --git a/docs/modules/ROOT/attachments/examples.json b/docs/modules/ROOT/attachments/examples.json index 42964a2..d920897 100644 --- a/docs/modules/ROOT/attachments/examples.json +++ b/docs/modules/ROOT/attachments/examples.json @@ -31,7 +31,7 @@ }, { "title": "JTA and JPA", - "description": "Shows how to run a Camel Quarkus application that supports JTA transactions on two external transactional resources: a database (MySQL) and a message broker (Artemis).", + "description": "Shows how to run a Camel Quarkus application that supports JTA transactions on two external transactional resources: a database (MySQL) and a simulated XAResource that demonstrates commit, rollback, and crash recovery.", "link": "https://github.com/apache/camel-quarkus-examples/tree/main/jta-jpa" }, { diff --git a/jta-jpa/README.adoc b/jta-jpa/README.adoc index 1c2c250..ab5ee6b 100644 --- a/jta-jpa/README.adoc +++ b/jta-jpa/README.adoc @@ -1,5 +1,5 @@ = JTA and JPA: A Camel Quarkus example -:cq-example-description: An example that shows how to run a Camel Quarkus application that supports JTA transactions on two external transactional resources: a database (MySQL) and a message broker (Artemis). 
+:cq-example-description: An example that shows how to run a Camel Quarkus application that supports JTA transactions on two external transactional resources: a database (MySQL) and a simulated XAResource that demonstrates commit, rollback, and crash recovery. {cq-description} @@ -14,10 +14,6 @@ and other general information. NOTE: The Narayana `node.identifier` is very important when you scale up in the cloud environment. It must be unique for each node. You can set it by using `quarkus.transaction-manager.node-name` property which the default value is `quarkus`. -.Todo crash recovery issues -- https://github.com/quarkiverse/quarkus-pooled-jms/issues/9[Add XA Recovery helper in quarkus-pooled-jms #9] -- https://github.com/quarkusio/quarkus/issues/26160[Improve recovery manager in quarkus-narayana-jta extension #26160] - == Start in the Development mode [source,shell] @@ -55,14 +51,7 @@ docker exec -it db-mysql mysql -uroot -proot -e \ FLUSH PRIVILEGES;" ---- -Start Artemis: -[source, shell] ----- -docker run --name artemis \ - -e AMQ_USER=admin -e AMQ_PASSWORD=admin \ - -d -p 61616:61616 \ - quay.io/artemiscloud/activemq-artemis-broker ----- + ==== Prerequisites - Make sure `io.quarkus:quarkus-jdbc-mysql` has been added in `pom.xml` @@ -77,11 +66,7 @@ $prod.quarkus.datasource.password=admin %prod.quarkus.datasource.jdbc.url=mysql://localhost:3306/testdb %prod.quarkus.datasource.jdbc.transactions=xa -# Quarkus Artemis and Messaginghub Pooled JMS -%prod.quarkus.artemis.url=tcp://localhost:61616 -%prod.quarkus.artemis.username=admin -%prod.quarkus.artemis.password=admin -quarkus.pooled-jms.xa.enabled=true +%prod.quarkus.hibernate-orm.database.generation=none ---- ==== JVM mode @@ -135,7 +120,7 @@ curl $ADDRESS/api/messages You should get some results like [source] ---- -[{message=hello}, {message=hello-ok}] +[{message=hello}] ---- Test rollback by calling the service with "fail" content: @@ -161,6 +146,88 @@ Stacktrace ---- +Test crash recovery by calling the 
service with "crash" content: +[source,shell] +---- +curl -X POST $ADDRESS/api/message/crash +---- +The application should crash, and you will not see any response. +[source] +---- +curl: (52) Empty reply from server +---- +Now restart the application and wait about 10 seconds; you should then see the following messages showing that the application has recovered the transaction. +[source] +---- +2022-09-16 12:35:39,994 INFO [io.quarkus] (main) camel-quarkus-examples-jta-jpa 2.13.0-SNAPSHOT on JVM (powered by Quarkus 2.13.0.CR1) started in 1.755s. Listening on: http://0.0.0.0:8080 +2022-09-16 12:35:39,994 INFO [io.quarkus] (main) Profile prod activated. +2022-09-16 12:35:39,994 INFO [io.quarkus] (main) Installed features: [agroal, camel-attachments, camel-bean, camel-core, camel-direct, camel-jpa, camel-jta, camel-log, camel-microprofile-health, camel-platform-http, camel-rest, cdi, hibernate-orm, jdbc-h2, jdbc-mysql, narayana-jta, smallrye-context-propagation, smallrye-health, vertx] +2022-09-16 12:35:49,251 INFO [org.acm.DummyXAResourceRecovery] (Periodic Recovery) DummyXAResourceRecovery returning list of resources: [org.acme.DummyXAResource@35cdbf7a] +2022-09-16 12:35:49,270 INFO [org.acm.DummyXAResource] (Periodic Recovery) Committing DummyXAResource +---- +Check the audit_log table; you should see the message "crash" in it. 
+ +== Running with Artemis JMS +If you want to use artemis-jms with XA support, you need to add the following dependencies in `pom.xml` +[source, xml] +---- +<dependency> + <groupId>io.quarkiverse.artemis</groupId> + <artifactId>quarkus-artemis-jms</artifactId> + <version>1.2.0</version> +</dependency> +<dependency> + <groupId>io.quarkiverse.messaginghub</groupId> + <artifactId>quarkus-pooled-jms</artifactId> + <version>1.0.1</version> +</dependency> +---- + +And you need to add the following configuration in `application.properties` +[source, properties] +---- +# Quarkus Artemis and Messaginghub Pooled JMS +quarkus.artemis.url=tcp://localhost:61616 +quarkus.artemis.username=admin +quarkus.artemis.password=admin +quarkus.pooled-jms.xa.enabled=true +---- + +Start Artemis: +[source, shell] +---- +docker run --name artemis \ + -e AMQ_USER=admin -e AMQ_PASSWORD=admin \ + -d -p 61616:61616 \ + quay.io/artemiscloud/activemq-artemis-broker +---- + +Make some changes in `CamelRoutes` to use camel-quarkus-jms to send and receive messages from Artemis. +[source, java] +---- +from("direct:trans") + .transacted() + .setBody(simple("${headers.message}")) + .to("bean:auditLog?method=createAuditLog(${body})") + .to("jpa:org.acme.AuditLog") + .setBody(simple("${headers.message}")) + .to("jms:outbound?disableReplyTo=true") + .choice() + .when(body().startsWith("fail")) + .log("Forced exception") + .process(x -> { + throw new RuntimeException("fail"); + }) + .otherwise() + .log("Message added: ${body}") + .endChoice(); + +from("jms:outbound") + .log("Message out: ${body}") + .to("bean:auditLog?method=createAuditLog(${body}-ok)") + .to("jpa:org.acme.AuditLog"); +---- + == Feedback Please report bugs and propose improvements via https://github.com/apache/camel-quarkus/issues[GitHub issues of Camel Quarkus] project. 
diff --git a/jta-jpa/pom.xml b/jta-jpa/pom.xml index 82a8cdb..081bf3e 100644 --- a/jta-jpa/pom.xml +++ b/jta-jpa/pom.xml @@ -27,8 +27,6 @@ <properties> <quarkus.platform.version>2.13.0.CR1</quarkus.platform.version> <camel-quarkus.platform.version>2.13.0-SNAPSHOT</camel-quarkus.platform.version> - <quarkus-artemis.version>1.2.0</quarkus-artemis.version> - <quarkus-pooled-jms.version>1.0.1</quarkus-pooled-jms.version> <quarkus.platform.group-id>io.quarkus</quarkus.platform.group-id> <quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id> @@ -65,13 +63,6 @@ <type>pom</type> <scope>import</scope> </dependency> - <dependency> - <groupId>io.quarkiverse.artemis</groupId> - <artifactId>quarkus-artemis-bom</artifactId> - <version>${quarkus-artemis.version}</version> - <type>pom</type> - <scope>import</scope> - </dependency> </dependencies> </dependencyManagement> <dependencies> @@ -95,10 +86,6 @@ <groupId>org.apache.camel.quarkus</groupId> <artifactId>camel-quarkus-bean</artifactId> </dependency> - <dependency> - <groupId>org.apache.camel.quarkus</groupId> - <artifactId>camel-quarkus-jms</artifactId> - </dependency> <dependency> <groupId>org.apache.camel.quarkus</groupId> <artifactId>camel-quarkus-jpa</artifactId> @@ -119,15 +106,6 @@ <groupId>io.quarkus</groupId> <artifactId>quarkus-jdbc-mysql</artifactId> </dependency> - <dependency> - <groupId>io.quarkiverse.artemis</groupId> - <artifactId>quarkus-artemis-jms</artifactId> - </dependency> - <dependency> - <groupId>io.quarkiverse.messaginghub</groupId> - <artifactId>quarkus-pooled-jms</artifactId> - <version>${quarkus-pooled-jms.version}</version> - </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-junit5</artifactId> @@ -138,11 +116,6 @@ <artifactId>quarkus-test-h2</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>io.quarkiverse.artemis</groupId> - <artifactId>quarkus-test-artemis</artifactId> - <scope>test</scope> - </dependency> <dependency> 
<groupId>io.rest-assured</groupId> <artifactId>rest-assured</artifactId> diff --git a/jta-jpa/src/main/java/org/acme/CamelRoutes.java b/jta-jpa/src/main/java/org/acme/CamelRoutes.java index f4544dc..a8efb14 100644 --- a/jta-jpa/src/main/java/org/acme/CamelRoutes.java +++ b/jta-jpa/src/main/java/org/acme/CamelRoutes.java @@ -17,12 +17,17 @@ package org.acme; import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; +import javax.transaction.TransactionManager; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.model.rest.RestParamType; @ApplicationScoped public class CamelRoutes extends RouteBuilder { + @Inject + TransactionManager transactionManager; + @Override public void configure() { rest("/messages") @@ -40,10 +45,13 @@ public class CamelRoutes extends RouteBuilder { from("direct:trans") .transacted() .setBody(simple("${headers.message}")) + .process(x -> { + DummyXAResource xaResource = new DummyXAResource("crash".equals(x.getIn().getBody(String.class))); + transactionManager.getTransaction().enlistResource(xaResource); + }) .to("bean:auditLog?method=createAuditLog(${body})") .to("jpa:org.acme.AuditLog") .setBody(simple("${headers.message}")) - .to("jms:outbound?disableReplyTo=true") .choice() .when(body().startsWith("fail")) .log("Forced exception") @@ -53,10 +61,5 @@ public class CamelRoutes extends RouteBuilder { .otherwise() .log("Message added: ${body}") .endChoice(); - - from("jms:outbound") - .log("Message out: ${body}") - .to("bean:auditLog?method=createAuditLog(${body}-ok)") - .to("jpa:org.acme.AuditLog"); } } diff --git a/jta-jpa/src/main/java/org/acme/DummyXAResource.java b/jta-jpa/src/main/java/org/acme/DummyXAResource.java new file mode 100644 index 0000000..fca7860 --- /dev/null +++ b/jta-jpa/src/main/java/org/acme/DummyXAResource.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Arrays; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +import com.arjuna.ats.arjuna.common.Uid; +import org.jboss.logging.Logger; + +/** + * This class is used solely for simulating system crash. 
+ * + */ +public class DummyXAResource implements XAResource { + private Logger LOG = Logger.getLogger(DummyXAResource.class); + + public static final String LOG_DIR = "target/DummyXAResource/"; + + private final boolean shouldCrash; + + private Xid xid; + + private File file; + + public DummyXAResource(boolean shouldCrash) { + this.shouldCrash = shouldCrash; + } + + /** + * Constructor used by recovery manager to recreate XAResource + * + * @param file File where Xid of the XAResource is stored + */ + public DummyXAResource(File file) throws IOException { + this.shouldCrash = false; + this.file = file; + this.xid = getXidFromFile(file); + } + + public int prepare(final Xid xid) throws XAException { + LOG.info("Preparing " + DummyXAResource.class.getSimpleName()); + + this.file = writeXidToFile(xid, LOG_DIR); + + return XA_OK; + } + + public void commit(final Xid xid, final boolean arg1) throws XAException { + LOG.info("Committing " + DummyXAResource.class.getSimpleName()); + + if (shouldCrash) { + LOG.info("Crashing the system"); + Runtime.getRuntime().halt(1); + } + + removeFile(file); + this.file = null; + this.xid = null; + } + + public void rollback(final Xid xid) throws XAException { + LOG.info("Rolling back " + DummyXAResource.class.getSimpleName()); + + removeFile(file); + this.file = null; + this.xid = null; + } + + public boolean isSameRM(XAResource xaResource) throws XAException { + if (!(xaResource instanceof DummyXAResource)) { + return false; + } + + DummyXAResource other = (DummyXAResource) xaResource; + + return xid != null && other.xid != null && xid.getFormatId() == other.xid.getFormatId() + && Arrays.equals(xid.getGlobalTransactionId(), other.xid.getGlobalTransactionId()) + && Arrays.equals(xid.getBranchQualifier(), other.xid.getBranchQualifier()); + } + + public Xid[] recover(int flag) throws XAException { + return new Xid[] { xid }; + } + + public void start(Xid xid, int flags) throws XAException { + + } + + public void end(Xid xid, int flags) 
throws XAException { + + } + + public void forget(Xid xid) throws XAException { + + } + + public int getTransactionTimeout() throws XAException { + return 0; + } + + public boolean setTransactionTimeout(final int seconds) throws XAException { + return true; + } + + private Xid getXidFromFile(File file) throws IOException { + try (DataInputStream inputStream = new DataInputStream(new FileInputStream(file))) { + int formatId = inputStream.readInt(); + int globalTransactionIdLength = inputStream.readInt(); + byte[] globalTransactionId = new byte[globalTransactionIdLength]; + inputStream.read(globalTransactionId, 0, globalTransactionIdLength); + int branchQualifierLength = inputStream.readInt(); + byte[] branchQualifier = new byte[branchQualifierLength]; + inputStream.read(branchQualifier, 0, branchQualifierLength); + + return new XidImpl(formatId, globalTransactionId, branchQualifier); + } + } + + private File writeXidToFile(Xid xid, String directory) throws XAException { + File dir = new File(directory); + + if (!dir.exists() && !dir.mkdirs()) { + throw new XAException(XAException.XAER_RMERR); + } + + File file = new File(dir, new Uid().fileStringForm() + "_"); + + try (DataOutputStream outputStream = new DataOutputStream(new FileOutputStream(file))) { + outputStream.writeInt(xid.getFormatId()); + outputStream.writeInt(xid.getGlobalTransactionId().length); + outputStream.write(xid.getGlobalTransactionId(), 0, xid.getGlobalTransactionId().length); + outputStream.writeInt(xid.getBranchQualifier().length); + outputStream.write(xid.getBranchQualifier(), 0, xid.getBranchQualifier().length); + outputStream.flush(); + } catch (IOException e) { + throw new XAException(XAException.XAER_RMERR); + } + + return file; + } + + private void removeFile(File file) throws XAException { + if (file != null) { + if (!file.delete()) { + throw new XAException(XAException.XA_RETRY); + } + } + } + + private class XidImpl implements Xid { + + private final int formatId; + + private final 
byte[] globalTransactionId; + + private final byte[] branchQualifier; + + public XidImpl(int formatId, byte[] globalTransactionId, byte[] branchQualifier) { + this.formatId = formatId; + this.globalTransactionId = globalTransactionId; + this.branchQualifier = branchQualifier; + } + + @Override + public int getFormatId() { + return formatId; + } + + @Override + public byte[] getGlobalTransactionId() { + return globalTransactionId; + } + + @Override + public byte[] getBranchQualifier() { + return branchQualifier; + } + + } +} diff --git a/jta-jpa/src/main/java/org/acme/DummyXAResourceRecovery.java b/jta-jpa/src/main/java/org/acme/DummyXAResourceRecovery.java new file mode 100644 index 0000000..c6d3321 --- /dev/null +++ b/jta-jpa/src/main/java/org/acme/DummyXAResourceRecovery.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.acme; + +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import javax.annotation.PostConstruct; +import javax.inject.Inject; +import javax.transaction.xa.XAResource; + +import io.quarkus.runtime.Startup; +import org.jboss.logging.Logger; +import org.jboss.tm.XAResourceRecovery; +import org.jboss.tm.XAResourceRecoveryRegistry; + +/** + * This class is used solely for simulating system crash. + * + */ +@Startup +public class DummyXAResourceRecovery implements XAResourceRecovery { + private Logger LOG = Logger.getLogger(DummyXAResourceRecovery.class); + + @Inject + XAResourceRecoveryRegistry xaResourceRecoveryRegistry; + + @PostConstruct + void init() { + LOG.info("register DummyXAResourceRecovery"); + xaResourceRecoveryRegistry.addXAResourceRecovery(this); + } + + @Override + public XAResource[] getXAResources() throws RuntimeException { + List<DummyXAResource> resources = Collections.emptyList(); + try { + resources = getXAResourcesFromDirectory(DummyXAResource.LOG_DIR); + } catch (IOException e) { + } + + if (!resources.isEmpty()) { + LOG.info(DummyXAResourceRecovery.class.getSimpleName() + " returning list of resources: " + resources); + } + + return resources.toArray(new XAResource[] {}); + } + + private List<DummyXAResource> getXAResourcesFromDirectory(String directory) throws IOException { + List<DummyXAResource> resources = new ArrayList<>(); + + Files.newDirectoryStream(FileSystems.getDefault().getPath(directory), "*_").forEach(path -> { + try { + resources.add(new DummyXAResource(path.toFile())); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + return resources; + } + +} diff --git a/jta-jpa/src/main/resources/application.properties b/jta-jpa/src/main/resources/application.properties index caa9103..b17bd2b 100644 --- a/jta-jpa/src/main/resources/application.properties +++ 
b/jta-jpa/src/main/resources/application.properties @@ -34,13 +34,7 @@ quarkus.datasource.jdbc.transactions=xa # Hibernate ORM quarkus.hibernate-orm.database.generation=drop-and-create - -# Quarkus Artemis and Messaginghub Pooled JMS -%test.quarkus.artemis.devservices.enabled=false -#%prod.quarkus.artemis.url=tcp://localhost:61616 -#%prod.quarkus.artemis.username=admin -#%prod.quarkus.artemis.password=admin -quarkus.pooled-jms.xa.enabled=true +#%prod.quarkus.hibernate-orm.database.generation=none # Quarkus Narayana JTA quarkus.transaction-manager.object-store-directory=target/narayana diff --git a/jta-jpa/src/test/java/org/acme/JtaTest.java b/jta-jpa/src/test/java/org/acme/JtaTest.java index 9375ea9..7863866 100644 --- a/jta-jpa/src/test/java/org/acme/JtaTest.java +++ b/jta-jpa/src/test/java/org/acme/JtaTest.java @@ -18,7 +18,6 @@ package org.acme; import java.util.UUID; -import io.quarkus.artemis.test.ArtemisTestResource; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.h2.H2DatabaseTestResource; import io.quarkus.test.junit.QuarkusTest; @@ -29,7 +28,6 @@ import static org.hamcrest.Matchers.is; @QuarkusTest @QuarkusTestResource(H2DatabaseTestResource.class) -@QuarkusTestResource(ArtemisTestResource.class) public class JtaTest { @Test public void testXA() { @@ -43,7 +41,7 @@ public class JtaTest { .when().get("/api/messages") .then() .statusCode(200) - .body(is("[{message=" + body + "}, {message=" + body + "-ok}]")); + .body(is("[{message=" + body + "}]")); } @Test
