This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git


The following commit(s) were added to refs/heads/SCB-1321 by this push:
     new a620a1f  SCB-1321 Improve test cases for AlphaIntegrationFsmTest
a620a1f is described below

commit a620a1f7756c2ff6d4442186e70052b8b21fa04d
Author: Lei Zhang <[email protected]>
AuthorDate: Tue Jul 2 21:47:07 2019 +0800

    SCB-1321 Improve test cases for AlphaIntegrationFsmTest
---
 .../alpha/server/fsm/GrpcSagaEventService.java     |   3 +
 .../src/main/resources/application.yaml            |  16 +-
 .../alpha/server/fsm/AlphaIntegrationFsmTest.java  | 218 ++++++++++++++++++++-
 .../alpha/server/fsm/OmegaEventSagaSimulator.java  | 122 ++++++++++++
 .../apache/servicecomb/pack/common/EventType.java  |   3 +-
 5 files changed, 358 insertions(+), 4 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index 2daac50..a54ef83 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -91,6 +91,9 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
     } else if (message.getType().equals(EventType.SagaAbortedEvent.name())) {
       event = 
org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent.builder()
           .globalTxId(message.getGlobalTxId()).build();
+    } else if (message.getType().equals(EventType.SagaTimeoutEvent.name())) {
+      event = 
org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent.builder()
+          .globalTxId(message.getGlobalTxId()).build();
     } else if (message.getType().equals(EventType.TxStartedEvent.name())) {
       event = 
org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent.builder()
           .serviceName(message.getServiceName())
diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml
index 01b03f4..a9946ed 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -98,4 +98,18 @@ akkaConfig:
   akka.persistence.journal.plugin: akka.persistence.journal.inmem
   akka.persistence.journal.leveldb.dir: target/example/journal
   akka.persistence.snapshot-store.plugin: akka.persistence.snapshot-store.local
-  akka.persistence.snapshot-store.local.dir: target/example/snapshots
\ No newline at end of file
+  akka.persistence.snapshot-store.local.dir: target/example/snapshots
+
+---
+spring:
+  profiles: akka-persistence-redis
+akkaConfig:
+  akka.persistence.journal.plugin: akka-persistence-redis.journal
+  akka.persistence.snapshot-store.plugin: akka-persistence-redis.snapshot
+  akka-persistence-redis:
+    redis:
+      mode: simple
+      host: localhost
+      port: 6379
+      database: 0
+      #password:
\ No newline at end of file
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index f98474d..d4d3e99 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -50,7 +50,7 @@ import org.springframework.test.context.junit4.SpringRunner;
         "alpha.event.pollingInterval=1",
         "spring.main.allow-bean-definition-overriding=true",
         "alpha.model.actor.enabled=true",
-        "spring.profiles.active=akka-persistence-mem"
+        "spring.profiles.active=akka-persistence-redis"
        })
 public class AlphaIntegrationFsmTest {
   private static final OmegaEventSender omegaEventSender = 
OmegaEventSender.builder().build();
@@ -107,6 +107,50 @@ public class AlphaIntegrationFsmTest {
   }
 
   @Test
+  public void firstTxAbortedEventTest() {
+    omegaEventSender.onConnected();
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    
omegaEventSender.getOmegaEventSagaSimulator().firstTxAbortedEvents(globalTxId, 
localTxId_1).stream().forEach( event -> {
+      omegaEventSender.getBlockingStub().onTxEvent(event);
+    });
+    await().atMost(1, SECONDS).until(() -> {
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      return sagaData !=null && 
sagaData.getLastState()==SagaActorState.COMPENSATED;
+    });
+    SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+    Assert.assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
+    Assert.assertEquals(sagaData.getTxEntityMap().size(),1);
+    Assert.assertTrue(sagaData.getBeginTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.FAILED);
+  }
+
+  @Test
+  public void middleTxAbortedEventTest() {
+    omegaEventSender.onConnected();
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    
omegaEventSender.getOmegaEventSagaSimulator().middleTxAbortedEvents(globalTxId, 
localTxId_1, localTxId_2).stream().forEach( event -> {
+      omegaEventSender.getBlockingStub().onTxEvent(event);
+    });
+    await().atMost(1, SECONDS).until(() -> {
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      return sagaData !=null && 
sagaData.getLastState()==SagaActorState.COMPENSATED;
+    });
+    SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+    Assert.assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
+    Assert.assertEquals(sagaData.getTxEntityMap().size(),2);
+    Assert.assertTrue(sagaData.getBeginTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMPENSATED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.FAILED);
+  }
+
+  @Test
   public void lastTxAbortedEventTest() {
     omegaEventSender.onConnected();
     final String globalTxId = UUID.randomUUID().toString();
@@ -116,7 +160,6 @@ public class AlphaIntegrationFsmTest {
     
omegaEventSender.getOmegaEventSagaSimulator().lastTxAbortedEvents(globalTxId, 
localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
       omegaEventSender.getBlockingStub().onTxEvent(event);
     });
-    await().atMost(1, SECONDS).until(() -> 
omegaEventSender.getOmegaCallbacks() != null);
     await().atMost(1, SECONDS).until(() -> {
       SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       return sagaData !=null && 
sagaData.getLastState()==SagaActorState.COMPENSATED;
@@ -131,4 +174,175 @@ public class AlphaIntegrationFsmTest {
     
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED);
     
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.FAILED);
   }
+
+  @Test
+  public void receivedRemainingEventAfterFirstTxAbortedEventTest() {
+    omegaEventSender.onConnected();
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    
omegaEventSender.getOmegaEventSagaSimulator().receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId,
 localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+      omegaEventSender.getBlockingStub().onTxEvent(event);
+    });
+    await().atMost(1, SECONDS).until(() -> {
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      return sagaData !=null && 
sagaData.getLastState()==SagaActorState.COMPENSATED;
+    });
+    SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+    Assert.assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
+    Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+    Assert.assertTrue(sagaData.getBeginTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.FAILED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMPENSATED);
+  }
+
+  @Test
+  public void sagaAbortedEventAfterAllTxEndedTest() {
+    omegaEventSender.onConnected();
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    
omegaEventSender.getOmegaEventSagaSimulator().sagaAbortedEventAfterAllTxEndedsEvents(globalTxId,
 localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+      omegaEventSender.getBlockingStub().onTxEvent(event);
+    });
+    await().atMost(1, SECONDS).until(() -> {
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      return sagaData !=null && 
sagaData.getLastState()==SagaActorState.COMPENSATED;
+    });
+    SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+    Assert.assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
+    Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+    Assert.assertTrue(sagaData.getBeginTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMPENSATED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMPENSATED);
+  }
+
+  @Test
+  public void omegaSendSagaTimeoutEventTest() {
+    omegaEventSender.onConnected();
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    
omegaEventSender.getOmegaEventSagaSimulator().omegaSendSagaTimeoutEvents(globalTxId,
 localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+      omegaEventSender.getBlockingStub().onTxEvent(event);
+    });
+    await().atMost(1, SECONDS).until(() -> {
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      return sagaData !=null && 
sagaData.getLastState()==SagaActorState.SUSPENDED;
+    });
+    SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+    Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+    Assert.assertTrue(sagaData.getBeginTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED);
+  }
+
+  @Test
+  public void sagaActorTriggerTimeoutTest() {
+    omegaEventSender.onConnected();
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    final int timeout = 5; // second
+    
omegaEventSender.getOmegaEventSagaSimulator().sagaActorTriggerTimeoutEvents(globalTxId,
 localTxId_1, localTxId_2, localTxId_3, timeout).stream().forEach( event -> {
+      omegaEventSender.getBlockingStub().onTxEvent(event);
+    });
+    await().atMost(timeout + 1, SECONDS).until(() -> {
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      return sagaData !=null && 
sagaData.getLastState()==SagaActorState.SUSPENDED;
+    });
+    SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+    Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+    Assert.assertTrue(sagaData.getBeginTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED);
+  }
+
+  @Test
+  public void successfulWithTxConcurrentTest() {
+    omegaEventSender.onConnected();
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    
omegaEventSender.getOmegaEventSagaSimulator().successfulWithTxConcurrentEvents(globalTxId,
 localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+      omegaEventSender.getBlockingStub().onTxEvent(event);
+    });
+    await().atMost(1, SECONDS).until(() -> {
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      return sagaData !=null && 
sagaData.getLastState()==SagaActorState.COMMITTED;
+    });
+    SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+    Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+    Assert.assertTrue(sagaData.getBeginTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED);
+  }
+
+  @Test
+  public void successfulWithTxConcurrentCrossTest() {
+    omegaEventSender.onConnected();
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    
omegaEventSender.getOmegaEventSagaSimulator().successfulWithTxConcurrentCrossEvents(globalTxId,
 localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+      omegaEventSender.getBlockingStub().onTxEvent(event);
+    });
+    await().atMost(1, SECONDS).until(() -> {
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      return sagaData !=null && 
sagaData.getLastState()==SagaActorState.COMMITTED;
+    });
+    SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+    Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+    Assert.assertTrue(sagaData.getBeginTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED);
+  }
+
+  @Test
+  public void lastTxAbortedEventWithTxConcurrentTest() {
+    omegaEventSender.onConnected();
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    
omegaEventSender.getOmegaEventSagaSimulator().lastTxAbortedEventWithTxConcurrentEvents(globalTxId,
 localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+      omegaEventSender.getBlockingStub().onTxEvent(event);
+    });
+    await().atMost(1, SECONDS).until(() -> {
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      return sagaData !=null && 
sagaData.getLastState()==SagaActorState.COMPENSATED;
+    });
+    SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+    Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+    Assert.assertTrue(sagaData.getBeginTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMPENSATED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.FAILED);
+  }
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
index 6e3c3ce..22b5cac 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
@@ -41,6 +41,26 @@ public class OmegaEventSagaSimulator {
     return sagaEvents;
   }
 
+  public List<GrpcTxEvent> firstTxAbortedEvents(String globalTxId, String 
localTxId_1){
+    List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(sagaStartedEvent(globalTxId));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
+    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
+    sagaEvents.add(sagaAbortedEvent(globalTxId));
+    return sagaEvents;
+  }
+
+  public List<GrpcTxEvent> middleTxAbortedEvents(String globalTxId, String 
localTxId_1, String localTxId_2){
+    List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(sagaStartedEvent(globalTxId));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service 
a".getBytes(), "method a"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, 
"service b".getBytes(), "method b"));
+    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_2, globalTxId, 
"service b".getBytes(), "method b"));
+    sagaEvents.add(sagaAbortedEvent(globalTxId));
+    return sagaEvents;
+  }
+
   public List<GrpcTxEvent> lastTxAbortedEvents(String globalTxId, String 
localTxId_1, String localTxId_2, String localTxId_3){
     List<GrpcTxEvent> sagaEvents = new ArrayList<>();
     sagaEvents.add(sagaStartedEvent(globalTxId));
@@ -54,12 +74,108 @@ public class OmegaEventSagaSimulator {
     return sagaEvents;
   }
 
+  public List<GrpcTxEvent> 
receivedRemainingEventAfterFirstTxAbortedEvents(String globalTxId, String 
localTxId_1, String localTxId_2, String localTxId_3){
+    List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(sagaStartedEvent(globalTxId));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
+    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, 
"service b".getBytes(), "method b"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service 
b".getBytes(), "method b"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_3, globalTxId, "service 
c".getBytes(), "method c"));
+    sagaEvents.add(sagaAbortedEvent(globalTxId));
+    return sagaEvents;
+  }
+
+  public List<GrpcTxEvent> sagaAbortedEventAfterAllTxEndedsEvents(String 
globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(sagaStartedEvent(globalTxId));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service 
a".getBytes(), "method a"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, 
"service b".getBytes(), "method b"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service 
b".getBytes(), "method b"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_3, globalTxId, "service 
c".getBytes(), "method c"));
+    sagaEvents.add(sagaAbortedEvent(globalTxId));
+    return sagaEvents;
+  }
+
+  public List<GrpcTxEvent> omegaSendSagaTimeoutEvents(String globalTxId, 
String localTxId_1, String localTxId_2, String localTxId_3){
+    List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(sagaStartedEvent(globalTxId));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service 
a".getBytes(), "method a"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, 
"service b".getBytes(), "method b"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service 
b".getBytes(), "method b"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_3, globalTxId, "service 
c".getBytes(), "method c"));
+    sagaEvents.add(sagaTimeoutEvent(globalTxId));
+    return sagaEvents;
+  }
+
+  public List<GrpcTxEvent> sagaActorTriggerTimeoutEvents(String globalTxId, 
String localTxId_1, String localTxId_2, String localTxId_3, int timeout){
+    List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(sagaStartedEvent(globalTxId,timeout));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service 
a".getBytes(), "method a"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, 
"service b".getBytes(), "method b"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service 
b".getBytes(), "method b"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_3, globalTxId, "service 
c".getBytes(), "method c"));
+    return sagaEvents;
+  }
+
+  public List<GrpcTxEvent> successfulWithTxConcurrentEvents(String globalTxId, 
String localTxId_1, String localTxId_2, String localTxId_3){
+    List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(sagaStartedEvent(globalTxId));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, 
"service b".getBytes(), "method b"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service 
a".getBytes(), "method a"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service 
b".getBytes(), "method b"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_3, globalTxId, "service 
c".getBytes(), "method c"));
+    sagaEvents.add(sagaEndedEvent(globalTxId));
+    return sagaEvents;
+  }
+
+  public List<GrpcTxEvent> successfulWithTxConcurrentCrossEvents(String 
globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(sagaStartedEvent(globalTxId));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, 
"service b".getBytes(), "method b"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service 
a".getBytes(), "method a"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service 
b".getBytes(), "method b"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_3, globalTxId, "service 
c".getBytes(), "method c"));
+    sagaEvents.add(sagaEndedEvent(globalTxId));
+    return sagaEvents;
+  }
+
+  public List<GrpcTxEvent> lastTxAbortedEventWithTxConcurrentEvents(String 
globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(sagaStartedEvent(globalTxId));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, 
"service b".getBytes(), "method b"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service 
a".getBytes(), "method a"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service 
b".getBytes(), "method b"));
+    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
+    sagaEvents.add(sagaAbortedEvent(globalTxId));
+    return sagaEvents;
+  }
+
   private GrpcTxEvent sagaStartedEvent(String globalTxId) {
     return eventOf(EventType.SagaStartedEvent, globalTxId, globalTxId,
         null, new byte[0], "", 0, "",
         0);
   }
 
+  private GrpcTxEvent sagaStartedEvent(String globalTxId, int timeout) {
+    return eventOf(EventType.SagaStartedEvent, globalTxId, globalTxId,
+        null, new byte[0], "", timeout, "",
+        0);
+  }
+
   private GrpcTxEvent sagaEndedEvent(String globalTxId) {
     return eventOf(EventType.SagaEndedEvent, globalTxId, globalTxId,
         null, new byte[0], "", 0, "",
@@ -72,6 +188,12 @@ public class OmegaEventSagaSimulator {
         0);
   }
 
+  private GrpcTxEvent sagaTimeoutEvent(String globalTxId) {
+    return eventOf(EventType.SagaTimeoutEvent, globalTxId, globalTxId,
+        null, new byte[0], "", 0, "",
+        0);
+  }
+
   private GrpcTxEvent txStartedEvent(String globalTxId,
       String localTxId, String parentTxId, byte[] payloads, String 
compensationMethod) {
     return eventOf(EventType.TxStartedEvent, globalTxId, localTxId,
diff --git a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
index 774b2e9..df985f4 100644
--- a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
+++ b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
@@ -24,5 +24,6 @@ public enum EventType {
   TxAbortedEvent,
   TxCompensatedEvent,
   SagaEndedEvent,
-  SagaAbortedEvent
+  SagaAbortedEvent,
+  SagaTimeoutEvent
 }

Reply via email to