This is an automated email from the ASF dual-hosted git repository.
zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
The following commit(s) were added to refs/heads/SCB-1321 by this push:
new a620a1f SCB-1321 Improve test cases for AlphaIntegrationFsmTest
a620a1f is described below
commit a620a1f7756c2ff6d4442186e70052b8b21fa04d
Author: Lei Zhang <[email protected]>
AuthorDate: Tue Jul 2 21:47:07 2019 +0800
SCB-1321 Improve test cases for AlphaIntegrationFsmTest
---
.../alpha/server/fsm/GrpcSagaEventService.java | 3 +
.../src/main/resources/application.yaml | 16 +-
.../alpha/server/fsm/AlphaIntegrationFsmTest.java | 218 ++++++++++++++++++++-
.../alpha/server/fsm/OmegaEventSagaSimulator.java | 122 ++++++++++++
.../apache/servicecomb/pack/common/EventType.java | 3 +-
5 files changed, 358 insertions(+), 4 deletions(-)
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index 2daac50..a54ef83 100644
---
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -91,6 +91,9 @@ public class GrpcSagaEventService extends
TxEventServiceImplBase {
} else if (message.getType().equals(EventType.SagaAbortedEvent.name())) {
event =
org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent.builder()
.globalTxId(message.getGlobalTxId()).build();
+ } else if (message.getType().equals(EventType.SagaTimeoutEvent.name())) {
+ event =
org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent.builder()
+ .globalTxId(message.getGlobalTxId()).build();
} else if (message.getType().equals(EventType.TxStartedEvent.name())) {
event =
org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent.builder()
.serviceName(message.getServiceName())
diff --git a/alpha/alpha-server/src/main/resources/application.yaml
b/alpha/alpha-server/src/main/resources/application.yaml
index 01b03f4..a9946ed 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -98,4 +98,18 @@ akkaConfig:
akka.persistence.journal.plugin: akka.persistence.journal.inmem
akka.persistence.journal.leveldb.dir: target/example/journal
akka.persistence.snapshot-store.plugin: akka.persistence.snapshot-store.local
- akka.persistence.snapshot-store.local.dir: target/example/snapshots
\ No newline at end of file
+ akka.persistence.snapshot-store.local.dir: target/example/snapshots
+
+---
+spring:
+ profiles: akka-persistence-redis
+akkaConfig:
+ akka.persistence.journal.plugin: akka-persistence-redis.journal
+ akka.persistence.snapshot-store.plugin: akka-persistence-redis.snapshot
+ akka-persistence-redis:
+ redis:
+ mode: simple
+ host: localhost
+ port: 6379
+ database: 0
+ #password:
\ No newline at end of file
diff --git
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index f98474d..d4d3e99 100644
---
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -50,7 +50,7 @@ import org.springframework.test.context.junit4.SpringRunner;
"alpha.event.pollingInterval=1",
"spring.main.allow-bean-definition-overriding=true",
"alpha.model.actor.enabled=true",
- "spring.profiles.active=akka-persistence-mem"
+ "spring.profiles.active=akka-persistence-redis"
})
public class AlphaIntegrationFsmTest {
private static final OmegaEventSender omegaEventSender =
OmegaEventSender.builder().build();
@@ -107,6 +107,50 @@ public class AlphaIntegrationFsmTest {
}
@Test
+ public void firstTxAbortedEventTest() {
+ omegaEventSender.onConnected();
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+
omegaEventSender.getOmegaEventSagaSimulator().firstTxAbortedEvents(globalTxId,
localTxId_1).stream().forEach( event -> {
+ omegaEventSender.getBlockingStub().onTxEvent(event);
+ });
+ await().atMost(1, SECONDS).until(() -> {
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ return sagaData !=null &&
sagaData.getLastState()==SagaActorState.COMPENSATED;
+ });
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ Assert.assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
+ Assert.assertEquals(sagaData.getTxEntityMap().size(),1);
+ Assert.assertTrue(sagaData.getBeginTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.FAILED);
+ }
+
+ @Test
+ public void middleTxAbortedEventTest() {
+ omegaEventSender.onConnected();
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+
omegaEventSender.getOmegaEventSagaSimulator().middleTxAbortedEvents(globalTxId,
localTxId_1, localTxId_2).stream().forEach( event -> {
+ omegaEventSender.getBlockingStub().onTxEvent(event);
+ });
+ await().atMost(1, SECONDS).until(() -> {
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ return sagaData !=null &&
sagaData.getLastState()==SagaActorState.COMPENSATED;
+ });
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ Assert.assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
+ Assert.assertEquals(sagaData.getTxEntityMap().size(),2);
+ Assert.assertTrue(sagaData.getBeginTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMPENSATED);
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.FAILED);
+ }
+
+ @Test
public void lastTxAbortedEventTest() {
omegaEventSender.onConnected();
final String globalTxId = UUID.randomUUID().toString();
@@ -116,7 +160,6 @@ public class AlphaIntegrationFsmTest {
omegaEventSender.getOmegaEventSagaSimulator().lastTxAbortedEvents(globalTxId,
localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
omegaEventSender.getBlockingStub().onTxEvent(event);
});
- await().atMost(1, SECONDS).until(() ->
omegaEventSender.getOmegaCallbacks() != null);
await().atMost(1, SECONDS).until(() -> {
SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
return sagaData !=null &&
sagaData.getLastState()==SagaActorState.COMPENSATED;
@@ -131,4 +174,175 @@ public class AlphaIntegrationFsmTest {
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED);
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.FAILED);
}
+
+ @Test
+ public void receivedRemainingEventAfterFirstTxAbortedEventTest() {
+ omegaEventSender.onConnected();
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+
omegaEventSender.getOmegaEventSagaSimulator().receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId,
localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ omegaEventSender.getBlockingStub().onTxEvent(event);
+ });
+ await().atMost(1, SECONDS).until(() -> {
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ return sagaData !=null &&
sagaData.getLastState()==SagaActorState.COMPENSATED;
+ });
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ Assert.assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
+ Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+ Assert.assertTrue(sagaData.getBeginTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.FAILED);
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED);
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMPENSATED);
+ }
+
+ @Test
+ public void sagaAbortedEventAfterAllTxEndedTest() {
+ omegaEventSender.onConnected();
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+
omegaEventSender.getOmegaEventSagaSimulator().sagaAbortedEventAfterAllTxEndedsEvents(globalTxId,
localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ omegaEventSender.getBlockingStub().onTxEvent(event);
+ });
+ await().atMost(1, SECONDS).until(() -> {
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ return sagaData !=null &&
sagaData.getLastState()==SagaActorState.COMPENSATED;
+ });
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ Assert.assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
+ Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+ Assert.assertTrue(sagaData.getBeginTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMPENSATED);
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED);
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMPENSATED);
+ }
+
+ @Test
+ public void omegaSendSagaTimeoutEventTest() {
+ omegaEventSender.onConnected();
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+
omegaEventSender.getOmegaEventSagaSimulator().omegaSendSagaTimeoutEvents(globalTxId,
localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ omegaEventSender.getBlockingStub().onTxEvent(event);
+ });
+ await().atMost(1, SECONDS).until(() -> {
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ return sagaData !=null &&
sagaData.getLastState()==SagaActorState.SUSPENDED;
+ });
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+ Assert.assertTrue(sagaData.getBeginTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED);
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED);
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED);
+ }
+
+ @Test
+ public void sagaActorTriggerTimeoutTest() {
+ omegaEventSender.onConnected();
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+ final int timeout = 5; // second
+
omegaEventSender.getOmegaEventSagaSimulator().sagaActorTriggerTimeoutEvents(globalTxId,
localTxId_1, localTxId_2, localTxId_3, timeout).stream().forEach( event -> {
+ omegaEventSender.getBlockingStub().onTxEvent(event);
+ });
+ await().atMost(timeout + 1, SECONDS).until(() -> {
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ return sagaData !=null &&
sagaData.getLastState()==SagaActorState.SUSPENDED;
+ });
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+ Assert.assertTrue(sagaData.getBeginTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED);
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED);
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED);
+ }
+
+ @Test
+ public void successfulWithTxConcurrentTest() {
+ omegaEventSender.onConnected();
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+
omegaEventSender.getOmegaEventSagaSimulator().successfulWithTxConcurrentEvents(globalTxId,
localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ omegaEventSender.getBlockingStub().onTxEvent(event);
+ });
+ await().atMost(1, SECONDS).until(() -> {
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ return sagaData !=null &&
sagaData.getLastState()==SagaActorState.COMMITTED;
+ });
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+ Assert.assertTrue(sagaData.getBeginTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED);
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED);
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED);
+ }
+
+ @Test
+ public void successfulWithTxConcurrentCrossTest() {
+ omegaEventSender.onConnected();
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+
omegaEventSender.getOmegaEventSagaSimulator().successfulWithTxConcurrentCrossEvents(globalTxId,
localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ omegaEventSender.getBlockingStub().onTxEvent(event);
+ });
+ await().atMost(1, SECONDS).until(() -> {
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ return sagaData !=null &&
sagaData.getLastState()==SagaActorState.COMMITTED;
+ });
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+ Assert.assertTrue(sagaData.getBeginTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED);
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED);
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED);
+ }
+
+ @Test
+ public void lastTxAbortedEventWithTxConcurrentTest() {
+ omegaEventSender.onConnected();
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+
omegaEventSender.getOmegaEventSagaSimulator().lastTxAbortedEventWithTxConcurrentEvents(globalTxId,
localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ omegaEventSender.getBlockingStub().onTxEvent(event);
+ });
+ await().atMost(1, SECONDS).until(() -> {
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ return sagaData !=null &&
sagaData.getLastState()==SagaActorState.COMPENSATED;
+ });
+ SagaData sagaData =
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+ Assert.assertTrue(sagaData.getBeginTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMPENSATED);
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED);
+
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.FAILED);
+ }
}
diff --git
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
index 6e3c3ce..22b5cac 100644
---
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
+++
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
@@ -41,6 +41,26 @@ public class OmegaEventSagaSimulator {
return sagaEvents;
}
+ public List<GrpcTxEvent> firstTxAbortedEvents(String globalTxId, String
localTxId_1){
+ List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(sagaStartedEvent(globalTxId));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId,
"service a".getBytes(), "method a"));
+ sagaEvents.add(txAbortedEvent(globalTxId, localTxId_1, globalTxId,
"service a".getBytes(), "method a"));
+ sagaEvents.add(sagaAbortedEvent(globalTxId));
+ return sagaEvents;
+ }
+
+ public List<GrpcTxEvent> middleTxAbortedEvents(String globalTxId, String
localTxId_1, String localTxId_2){
+ List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(sagaStartedEvent(globalTxId));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId,
"service a".getBytes(), "method a"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service
a".getBytes(), "method a"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId,
"service b".getBytes(), "method b"));
+ sagaEvents.add(txAbortedEvent(globalTxId, localTxId_2, globalTxId,
"service b".getBytes(), "method b"));
+ sagaEvents.add(sagaAbortedEvent(globalTxId));
+ return sagaEvents;
+ }
+
public List<GrpcTxEvent> lastTxAbortedEvents(String globalTxId, String
localTxId_1, String localTxId_2, String localTxId_3){
List<GrpcTxEvent> sagaEvents = new ArrayList<>();
sagaEvents.add(sagaStartedEvent(globalTxId));
@@ -54,12 +74,108 @@ public class OmegaEventSagaSimulator {
return sagaEvents;
}
+ public List<GrpcTxEvent>
receivedRemainingEventAfterFirstTxAbortedEvents(String globalTxId, String
localTxId_1, String localTxId_2, String localTxId_3){
+ List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(sagaStartedEvent(globalTxId));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId,
"service a".getBytes(), "method a"));
+ sagaEvents.add(txAbortedEvent(globalTxId, localTxId_1, globalTxId,
"service a".getBytes(), "method a"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId,
"service b".getBytes(), "method b"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service
b".getBytes(), "method b"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId,
"service c".getBytes(), "method c"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_3, globalTxId, "service
c".getBytes(), "method c"));
+ sagaEvents.add(sagaAbortedEvent(globalTxId));
+ return sagaEvents;
+ }
+
+ public List<GrpcTxEvent> sagaAbortedEventAfterAllTxEndedsEvents(String
globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+ List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(sagaStartedEvent(globalTxId));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId,
"service a".getBytes(), "method a"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service
a".getBytes(), "method a"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId,
"service b".getBytes(), "method b"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service
b".getBytes(), "method b"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId,
"service c".getBytes(), "method c"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_3, globalTxId, "service
c".getBytes(), "method c"));
+ sagaEvents.add(sagaAbortedEvent(globalTxId));
+ return sagaEvents;
+ }
+
+ public List<GrpcTxEvent> omegaSendSagaTimeoutEvents(String globalTxId,
String localTxId_1, String localTxId_2, String localTxId_3){
+ List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(sagaStartedEvent(globalTxId));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId,
"service a".getBytes(), "method a"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service
a".getBytes(), "method a"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId,
"service b".getBytes(), "method b"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service
b".getBytes(), "method b"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId,
"service c".getBytes(), "method c"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_3, globalTxId, "service
c".getBytes(), "method c"));
+ sagaEvents.add(sagaTimeoutEvent(globalTxId));
+ return sagaEvents;
+ }
+
+ public List<GrpcTxEvent> sagaActorTriggerTimeoutEvents(String globalTxId,
String localTxId_1, String localTxId_2, String localTxId_3, int timeout){
+ List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(sagaStartedEvent(globalTxId,timeout));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId,
"service a".getBytes(), "method a"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service
a".getBytes(), "method a"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId,
"service b".getBytes(), "method b"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service
b".getBytes(), "method b"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId,
"service c".getBytes(), "method c"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_3, globalTxId, "service
c".getBytes(), "method c"));
+ return sagaEvents;
+ }
+
+ public List<GrpcTxEvent> successfulWithTxConcurrentEvents(String globalTxId,
String localTxId_1, String localTxId_2, String localTxId_3){
+ List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(sagaStartedEvent(globalTxId));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId,
"service a".getBytes(), "method a"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId,
"service b".getBytes(), "method b"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId,
"service c".getBytes(), "method c"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service
a".getBytes(), "method a"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service
b".getBytes(), "method b"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_3, globalTxId, "service
c".getBytes(), "method c"));
+ sagaEvents.add(sagaEndedEvent(globalTxId));
+ return sagaEvents;
+ }
+
+ public List<GrpcTxEvent> successfulWithTxConcurrentCrossEvents(String
globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+ List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(sagaStartedEvent(globalTxId));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId,
"service a".getBytes(), "method a"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId,
"service b".getBytes(), "method b"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service
a".getBytes(), "method a"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId,
"service c".getBytes(), "method c"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service
b".getBytes(), "method b"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_3, globalTxId, "service
c".getBytes(), "method c"));
+ sagaEvents.add(sagaEndedEvent(globalTxId));
+ return sagaEvents;
+ }
+
+ public List<GrpcTxEvent> lastTxAbortedEventWithTxConcurrentEvents(String
globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+ List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(sagaStartedEvent(globalTxId));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId,
"service a".getBytes(), "method a"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId,
"service b".getBytes(), "method b"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId,
"service c".getBytes(), "method c"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service
a".getBytes(), "method a"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service
b".getBytes(), "method b"));
+ sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId,
"service c".getBytes(), "method c"));
+ sagaEvents.add(sagaAbortedEvent(globalTxId));
+ return sagaEvents;
+ }
+
private GrpcTxEvent sagaStartedEvent(String globalTxId) {
return eventOf(EventType.SagaStartedEvent, globalTxId, globalTxId,
null, new byte[0], "", 0, "",
0);
}
+ private GrpcTxEvent sagaStartedEvent(String globalTxId, int timeout) {
+ return eventOf(EventType.SagaStartedEvent, globalTxId, globalTxId,
+ null, new byte[0], "", timeout, "",
+ 0);
+ }
+
private GrpcTxEvent sagaEndedEvent(String globalTxId) {
return eventOf(EventType.SagaEndedEvent, globalTxId, globalTxId,
null, new byte[0], "", 0, "",
@@ -72,6 +188,12 @@ public class OmegaEventSagaSimulator {
0);
}
+ private GrpcTxEvent sagaTimeoutEvent(String globalTxId) {
+ return eventOf(EventType.SagaTimeoutEvent, globalTxId, globalTxId,
+ null, new byte[0], "", 0, "",
+ 0);
+ }
+
private GrpcTxEvent txStartedEvent(String globalTxId,
String localTxId, String parentTxId, byte[] payloads, String
compensationMethod) {
return eventOf(EventType.TxStartedEvent, globalTxId, localTxId,
diff --git
a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
index 774b2e9..df985f4 100644
---
a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
+++
b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
@@ -24,5 +24,6 @@ public enum EventType {
TxAbortedEvent,
TxCompensatedEvent,
SagaEndedEvent,
- SagaAbortedEvent
+ SagaAbortedEvent,
+ SagaTimeoutEvent
}