This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit f59e348f5f4d181bfa3fcaa6bea137fc5675c942
Author: Lei Zhang <[email protected]>
AuthorDate: Tue Jul 2 17:16:27 2019 +0800

    SCB-1321 Add SagaActor call compensate
---
 alpha/alpha-fsm/pom.xml                            |   4 +
 .../pack/alpha/fsm/FsmAutoConfiguration.java       |   3 +
 .../servicecomb/pack/alpha/fsm/SagaActor.java      |  62 +++++++++----
 .../apache/servicecomb/pack/alpha/fsm/TxState.java |   2 +-
 .../pack/alpha/fsm/domain/AddTxEventDomain.java    |  42 ++++++++-
 ...mponsitedEvent.java => TxCompensatedEvent.java} |  16 ++--
 .../pack/alpha/fsm/event/TxStartedEvent.java       | 103 +++++++++++++++++++++
 .../pack/alpha/fsm/event/base/TxEvent.java         |  18 ++++
 .../fsm/event/consumer/SagaEventConsumer.java      |   5 +-
 .../servicecomb/pack/alpha/fsm/model/TxEntity.java |  70 ++++++++++++++
 .../fsm/spring/integration/akka/LogExtension.java  |  31 -------
 ...ogExtensionImpl.java => SagaDataExtension.java} |  25 +++--
 .../integration/akka/SpringAkkaExtension.java      |  77 +++++++++++++++
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  |  32 +++----
 .../pack/alpha/fsm/SagaEventSender.java            |  50 +++++-----
 .../pack/alpha/fsm/SagaIntegrationTest.java        |  26 +++---
 .../apache/servicecomb/pack/common/EventType.java  |   3 +-
 17 files changed, 447 insertions(+), 122 deletions(-)

diff --git a/alpha/alpha-fsm/pom.xml b/alpha/alpha-fsm/pom.xml
index 8788430..b6c09d3 100644
--- a/alpha/alpha-fsm/pom.xml
+++ b/alpha/alpha-fsm/pom.xml
@@ -63,6 +63,10 @@
       <artifactId>pack-common</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.servicecomb.pack</groupId>
+      <artifactId>alpha-core</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index c5082ab..c92804d 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -17,6 +17,8 @@
 
 package org.apache.servicecomb.pack.alpha.fsm;
 
+import static 
org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SPRING_EXTENSION_PROVIDER;
+
 import akka.actor.ActorSystem;
 import com.google.common.eventbus.EventBus;
 import com.typesafe.config.Config;
@@ -38,6 +40,7 @@ public class FsmAutoConfiguration {
   @Bean
   public ActorSystem actorSystem(ConfigurableApplicationContext 
applicationContext, ConfigurableEnvironment environment) {
     ActorSystem system = ActorSystem.create("alpha-akka", 
akkaConfiguration(applicationContext,environment));
+    SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
     return system;
   }
 
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index d22cb79..72be072 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -32,13 +32,14 @@ import 
org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxCompensatedEvent;
 import 
org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedCheckInternalEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
-import 
org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension;
+import 
org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
+import 
org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
@@ -81,8 +82,13 @@ public class SagaActor extends
     when(SagaActorState.READY,
         matchEvent(TxStartedEvent.class, SagaData.class,
             (event, data) -> {
-              AddTxEventDomain domainEvent = new 
AddTxEventDomain(event.getParentTxId(),
-                  event.getLocalTxId());
+              AddTxEventDomain domainEvent = new AddTxEventDomain(
+                  event.getServiceName(),
+                  event.getInstanceId(),
+                  event.getParentTxId(),
+                  event.getLocalTxId(),
+                  event.getPayloads(),
+                  event.getCompensationMethod());
               if (data.getExpirationTime() > 0) {
                 return goTo(SagaActorState.PARTIALLY_ACTIVE)
                     .applying(domainEvent)
@@ -131,8 +137,13 @@ public class SagaActor extends
             }
         ).event(TxStartedEvent.class,
             (event, data) -> {
-              AddTxEventDomain domainEvent = new 
AddTxEventDomain(event.getParentTxId(),
-                  event.getLocalTxId());
+              AddTxEventDomain domainEvent = new AddTxEventDomain(
+                  event.getServiceName(),
+                  event.getInstanceId(),
+                  event.getParentTxId(),
+                  event.getLocalTxId(),
+                  event.getPayloads(),
+                  event.getCompensationMethod());
               if (data.getExpirationTime() > 0) {
                 return stay()
                     .applying(domainEvent)
@@ -165,8 +176,13 @@ public class SagaActor extends
     when(SagaActorState.PARTIALLY_COMMITTED,
         matchEvent(TxStartedEvent.class,
             (event, data) -> {
-              AddTxEventDomain domainEvent = new 
AddTxEventDomain(event.getParentTxId(),
-                  event.getLocalTxId());
+              AddTxEventDomain domainEvent = new AddTxEventDomain(
+                  event.getServiceName(),
+                  event.getInstanceId(),
+                  event.getParentTxId(),
+                  event.getLocalTxId(),
+                  event.getPayloads(),
+                  event.getCompensationMethod());
               if (data.getExpirationTime() > 0) {
                 return goTo(SagaActorState.PARTIALLY_ACTIVE)
                     .applying(domainEvent)
@@ -230,7 +246,7 @@ public class SagaActor extends
                   .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
-        ).event(TxComponsitedEvent.class, SagaData.class,
+        ).event(TxCompensatedEvent.class, SagaData.class,
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new 
UpdateTxEventDomain(event.getParentTxId(),
                   event.getLocalTxId(), TxState.COMPENSATED);
@@ -240,8 +256,9 @@ public class SagaActor extends
             }
         ).event(TxComponsitedCheckInternalEvent.class, SagaData.class,
             (event, data) -> {
-              if ((!data.isTerminated() && 
data.getCompensationRunningCounter().intValue() > 0)
-                  || hasCommittedTx(data)) {
+//              if ((!data.isTerminated() && 
data.getCompensationRunningCounter().intValue() > 0)
+//                  || hasCommittedTx(data)) {
+              if (hasCompensationSentTx(data)) {
                 return stay().replying(data);
               } else {
                 return goTo(SagaActorState.COMPENSATED)
@@ -255,7 +272,7 @@ public class SagaActor extends
               if (hasCommittedTx(data)) {
                 SagaEndedDomain domainEvent = new 
SagaEndedDomain(SagaActorState.FAILED);
                 return stay().replying(data).applying(domainEvent);
-              } else if(hasCompensationSentTx(data)){
+              } else if (hasCompensationSentTx(data)) {
                 return stay().replying(data);
               } else {
                 SagaEndedDomain domainEvent = new SagaEndedDomain(
@@ -268,8 +285,13 @@ public class SagaActor extends
             }
         ).event(TxStartedEvent.class, SagaData.class,
             (event, data) -> {
-              AddTxEventDomain domainEvent = new 
AddTxEventDomain(event.getParentTxId(),
-                  event.getLocalTxId());
+              AddTxEventDomain domainEvent = new AddTxEventDomain(
+                  event.getServiceName(),
+                  event.getInstanceId(),
+                  event.getParentTxId(),
+                  event.getLocalTxId(),
+                  event.getPayloads(),
+                  event.getCompensationMethod());
               return stay().applying(domainEvent);
             }
         ).event(TxEndedEvent.class, SagaData.class,
@@ -332,7 +354,7 @@ public class SagaActor extends
     whenUnhandled(
         matchAnyEvent((event, data) -> {
           LOG.error("Unhandled event {}", event);
-          return goTo(SagaActorState.SUSPENDED).replying(data);
+          return stay();
         })
     );
 
@@ -340,7 +362,7 @@ public class SagaActor extends
         matchState(null, null, (from, to) -> {
           if (stateData().getGlobalTxId() != null) {
             stateData().setLastState(to);
-            LogExtension.LogExtensionProvider.get(getContext().getSystem())
+            
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
                 .putSagaData(stateData().getGlobalTxId(), stateData());
           }
           LOG.info("transition {} {} -> {}", getSelf(), from, to);
@@ -353,7 +375,7 @@ public class SagaActor extends
               LOG.info("stop {} {}", data.getGlobalTxId(), state);
               data.setTerminated(true);
               data.setLastState(state);
-              LogExtension.LogExtensionProvider.get(getContext().getSystem())
+              
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
                   .putSagaData(data.getGlobalTxId(), data);
             }
         )
@@ -372,8 +394,13 @@ public class SagaActor extends
       AddTxEventDomain domainEvent = (AddTxEventDomain) event;
       if (!data.getTxEntityMap().containsKey(domainEvent.getLocalTxId())) {
         TxEntity txEntity = TxEntity.builder()
+            .serviceName(domainEvent.getServiceName())
+            .instanceId(domainEvent.getInstanceId())
+            .globalTxId(data.getGlobalTxId())
             .localTxId(domainEvent.getLocalTxId())
             .parentTxId(domainEvent.getParentTxId())
+            .compensationMethod(domainEvent.getCompensationMethod())
+            .payloads(domainEvent.getPayloads())
             .state(domainEvent.getState())
             .build();
         data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity);
@@ -451,6 +478,7 @@ public class SagaActor extends
     // increments the compensation running counter by one
     data.getCompensationRunningCounter().incrementAndGet();
     //TODO call omega compensate method
+    
SpringAkkaExtension.SPRING_EXTENSION_PROVIDER.get(context().system()).compensate(txEntity);
     LOG.info("compensate {}", txEntity.getLocalTxId());
     txEntity.setState(TxState.COMPENSATION_SENT);
   }
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
index af02ce8..655db30 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
@@ -21,6 +21,6 @@ public enum TxState {
   ACTIVE,
   FAILED,
   COMMITTED,
-  COMPENSATION_SENT, // The compensation method has been called to wait for 
TxComponsitedEvent
+  COMPENSATION_SENT, // The compensation method has been called to wait for 
TxCompensatedEvent
   COMPENSATED
 }
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
index c7c65a3..af5936d 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
@@ -20,13 +20,37 @@ package org.apache.servicecomb.pack.alpha.fsm.domain;
 import org.apache.servicecomb.pack.alpha.fsm.TxState;
 
 public class AddTxEventDomain implements DomainEvent {
+  private String serviceName;
+  private String instanceId;
   private String parentTxId;
   private String localTxId;
   private TxState state = TxState.ACTIVE;
+  private String compensationMethod;
+  private byte[] payloads;
 
-  public AddTxEventDomain(String parentTxId, String localTxId) {
+  public AddTxEventDomain(String serviceName, String instanceId, String 
parentTxId, String localTxId, byte[] payloads, String compensationMethod) {
+    this.serviceName = serviceName;
+    this.instanceId = instanceId;
     this.parentTxId = parentTxId;
     this.localTxId = localTxId;
+    this.compensationMethod = compensationMethod;
+    this.payloads = payloads;
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
   }
 
   public String getParentTxId() {
@@ -52,4 +76,20 @@ public class AddTxEventDomain implements DomainEvent {
   public void setState(TxState state) {
     this.state = state;
   }
+
+  public String getCompensationMethod() {
+    return compensationMethod;
+  }
+
+  public void setCompensationMethod(String compensationMethod) {
+    this.compensationMethod = compensationMethod;
+  }
+
+  public byte[] getPayloads() {
+    return payloads;
+  }
+
+  public void setPayloads(byte[] payloads) {
+    this.payloads = payloads;
+  }
 }
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxCompensatedEvent.java
similarity index 77%
rename from 
alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java
rename to 
alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxCompensatedEvent.java
index 1230fff..c007993 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxCompensatedEvent.java
@@ -19,7 +19,7 @@ package org.apache.servicecomb.pack.alpha.fsm.event;
 
 import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
 
-public class TxComponsitedEvent extends TxEvent {
+public class TxCompensatedEvent extends TxEvent {
 
   public static Builder builder() {
     return new Builder();
@@ -27,29 +27,29 @@ public class TxComponsitedEvent extends TxEvent {
 
   public static final class Builder {
 
-    private TxComponsitedEvent txComponsitedEvent;
+    private TxCompensatedEvent txCompensatedEvent;
 
     private Builder() {
-      txComponsitedEvent = new TxComponsitedEvent();
+      txCompensatedEvent = new TxCompensatedEvent();
     }
 
     public Builder parentTxId(String parentTxId) {
-      txComponsitedEvent.setParentTxId(parentTxId);
+      txCompensatedEvent.setParentTxId(parentTxId);
       return this;
     }
 
     public Builder localTxId(String localTxId) {
-      txComponsitedEvent.setLocalTxId(localTxId);
+      txCompensatedEvent.setLocalTxId(localTxId);
       return this;
     }
 
     public Builder globalTxId(String globalTxId) {
-      txComponsitedEvent.setGlobalTxId(globalTxId);
+      txCompensatedEvent.setGlobalTxId(globalTxId);
       return this;
     }
 
-    public TxComponsitedEvent build() {
-      return txComponsitedEvent;
+    public TxCompensatedEvent build() {
+      return txCompensatedEvent;
     }
   }
 }
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java
index bf259d0..5f173ef 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java
@@ -17,9 +17,77 @@
 
 package org.apache.servicecomb.pack.alpha.fsm.event;
 
+import java.util.Date;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
 
 public class TxStartedEvent extends TxEvent {
+  private String serviceName;
+  private String instanceId;
+  private String compensationMethod;
+  private byte[] payloads;
+  private Date creationTime;
+  private String retryMethod;
+  private int retries;
+
+  @Override
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  @Override
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  @Override
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  @Override
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
+
+  public String getCompensationMethod() {
+    return compensationMethod;
+  }
+
+  public void setCompensationMethod(String compensationMethod) {
+    this.compensationMethod = compensationMethod;
+  }
+
+  public byte[] getPayloads() {
+    return payloads;
+  }
+
+  public void setPayloads(byte[] payloads) {
+    this.payloads = payloads;
+  }
+
+  public Date getCreationTime() {
+    return creationTime;
+  }
+
+  public void setCreationTime(Date creationTime) {
+    this.creationTime = creationTime;
+  }
+
+  public String getRetryMethod() {
+    return retryMethod;
+  }
+
+  public void setRetryMethod(String retryMethod) {
+    this.retryMethod = retryMethod;
+  }
+
+  public int getRetries() {
+    return retries;
+  }
+
+  public void setRetries(int retries) {
+    this.retries = retries;
+  }
 
   public static Builder builder() {
     return new Builder();
@@ -48,6 +116,41 @@ public class TxStartedEvent extends TxEvent {
       return this;
     }
 
+    public Builder compensationMethod(String compensationMethod) {
+      txStartedEvent.setCompensationMethod(compensationMethod);
+      return this;
+    }
+
+    public Builder payloads(byte[] payloads) {
+      txStartedEvent.setPayloads(payloads);
+      return this;
+    }
+
+    public Builder serviceName(String serviceName) {
+      txStartedEvent.setServiceName(serviceName);
+      return this;
+    }
+
+    public Builder instanceId(String instanceId) {
+      txStartedEvent.setInstanceId(instanceId);
+      return this;
+    }
+
+    public Builder creationTime(Date creationTime) {
+      txStartedEvent.setCreationTime(creationTime);
+      return this;
+    }
+
+    public Builder retryMethod(String retryMethod) {
+      txStartedEvent.setRetryMethod(retryMethod);
+      return this;
+    }
+
+    public Builder retries(int retries) {
+      txStartedEvent.setRetries(retries);
+      return this;
+    }
+
     public TxStartedEvent build() {
       return txStartedEvent;
     }
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java
index ceaa242..61c3656 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java
@@ -18,6 +18,8 @@
 package org.apache.servicecomb.pack.alpha.fsm.event.base;
 
 public abstract class TxEvent extends BaseEvent {
+  private String serviceName;
+  private String instanceId;
   private String parentTxId;
   private String localTxId;
 
@@ -37,6 +39,22 @@ public abstract class TxEvent extends BaseEvent {
     this.localTxId = localTxId;
   }
 
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
+
   @Override
   public String toString() {
     return this.getClass().getSimpleName() + "{" +
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
index aae9419..7ddbc1a 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
@@ -37,19 +37,18 @@ import scala.concurrent.Future;
 public class SagaEventConsumer {
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  public static final Timeout TIMEOUT = new Timeout(1000, 
TimeUnit.MILLISECONDS);
+  public static final Timeout TIMEOUT = new Timeout(5, TimeUnit.SECONDS);
 
   @Autowired
   ActorSystem system;
 
   /**
-   * Receive saga message
+   * Receive fsm message
    * */
   @Subscribe
   public void receiveSagaEvent(BaseEvent event) throws Exception {
     LOG.info("receive {} ", event.toString());
     try{
-      //TODO Write-Ahead Logging
       ActorRef saga;
       String actorPath = "/user/" + event.getGlobalTxId();
       Optional<ActorRef> optional = this.getActorRefFromPath(actorPath);
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
index 1eb3020..0122adc 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
@@ -21,11 +21,32 @@ import java.io.Serializable;
 import org.apache.servicecomb.pack.alpha.fsm.TxState;
 
 public class TxEntity implements Serializable {
+  private String serviceName;
+  private String instanceId;
+  private String globalTxId;
   private long beginTime = System.currentTimeMillis();
   private long endTime;
   private String parentTxId;
   private String localTxId;
   private TxState state;
+  private String compensationMethod;
+  private byte[] payloads;
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
 
   public long getBeginTime() {
     return beginTime;
@@ -43,6 +64,14 @@ public class TxEntity implements Serializable {
     this.endTime = endTime;
   }
 
+  public String getGlobalTxId() {
+    return globalTxId;
+  }
+
+  public void setGlobalTxId(String globalTxId) {
+    this.globalTxId = globalTxId;
+  }
+
   public String getParentTxId() {
     return parentTxId;
   }
@@ -67,6 +96,22 @@ public class TxEntity implements Serializable {
     this.state = state;
   }
 
+  public String getCompensationMethod() {
+    return compensationMethod;
+  }
+
+  public void setCompensationMethod(String compensationMethod) {
+    this.compensationMethod = compensationMethod;
+  }
+
+  public byte[] getPayloads() {
+    return payloads;
+  }
+
+  public void setPayloads(byte[] payloads) {
+    this.payloads = payloads;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -89,6 +134,11 @@ public class TxEntity implements Serializable {
       return this;
     }
 
+    public Builder globalTxId(String globalTxId) {
+      txEntity.setGlobalTxId(globalTxId);
+      return this;
+    }
+
     public Builder parentTxId(String parentTxId) {
       txEntity.setParentTxId(parentTxId);
       return this;
@@ -99,11 +149,31 @@ public class TxEntity implements Serializable {
       return this;
     }
 
+    public Builder compensationMethod(String compensationMethod) {
+      txEntity.setCompensationMethod(compensationMethod);
+      return this;
+    }
+
+    public Builder payloads(byte[] payloads) {
+      txEntity.setPayloads(payloads);
+      return this;
+    }
+
     public Builder state(TxState state) {
       txEntity.setState(state);
       return this;
     }
 
+    public Builder serviceName(String serviceName) {
+      txEntity.setServiceName(serviceName);
+      return this;
+    }
+
+    public Builder instanceId(String instanceId) {
+      txEntity.setInstanceId(instanceId);
+      return this;
+    }
+
     public TxEntity build() {
       return txEntity;
     }
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtension.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtension.java
deleted file mode 100644
index 9c4d27d..0000000
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtension.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
-
-import akka.actor.AbstractExtensionId;
-import akka.actor.ExtendedActorSystem;
-
-public class LogExtension extends AbstractExtensionId<LogExtensionImpl> {
-
-  public static final LogExtension LogExtensionProvider = new LogExtension();
-
-  @Override
-  public LogExtensionImpl createExtension(ExtendedActorSystem system) {
-    return new LogExtensionImpl();
-  }
-}
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtensionImpl.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
similarity index 56%
rename from 
alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtensionImpl.java
rename to 
alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
index 4bf00b4..774a2e2 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtensionImpl.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
@@ -17,19 +17,32 @@
 
 package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
 
+import akka.actor.AbstractExtensionId;
+import akka.actor.ExtendedActorSystem;
 import akka.actor.Extension;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+import 
org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SagaDataExt;
 
-public class LogExtensionImpl implements Extension {
-  private Map<String, SagaData> sagaDataMap = new ConcurrentHashMap();
+public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
 
-  public void putSagaData(String globalTxId, SagaData sagaData){
-    sagaDataMap.put(globalTxId, sagaData);
+  public static final SagaDataExtension SAGA_DATA_EXTENSION_PROVIDER = new 
SagaDataExtension();
+
+  @Override
+  public SagaDataExt createExtension(ExtendedActorSystem system) {
+    return new SagaDataExt();
   }
 
-  public SagaData getSagaData(String globalTxId){
-    return sagaDataMap.get(globalTxId);
+  public static class SagaDataExt implements Extension {
+    private Map<String, SagaData> sagaDataMap = new ConcurrentHashMap();
+
+    public void putSagaData(String globalTxId, SagaData sagaData){
+      sagaDataMap.put(globalTxId, sagaData);
+    }
+
+    public SagaData getSagaData(String globalTxId){
+      return sagaDataMap.get(globalTxId);
+    }
   }
 }
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SpringAkkaExtension.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SpringAkkaExtension.java
new file mode 100644
index 0000000..6994084
--- /dev/null
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SpringAkkaExtension.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
+
+import static org.apache.servicecomb.pack.common.EventType.TxStartedEvent;
+
+import akka.actor.AbstractExtensionId;
+import akka.actor.ExtendedActorSystem;
+import akka.actor.Extension;
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
+import org.apache.servicecomb.pack.alpha.core.TxEvent;
+import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
+import 
org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SpringExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationContext;
+
+public class SpringAkkaExtension extends AbstractExtensionId<SpringExt> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final SpringAkkaExtension SPRING_EXTENSION_PROVIDER = new 
SpringAkkaExtension();
+
+  @Override
+  public SpringExt createExtension(ExtendedActorSystem system) {
+    return new SpringExt();
+  }
+
+  public static class SpringExt implements Extension {
+
+    private static final String omegaCallbackBeanName = "omegaCallback";
+    private volatile ApplicationContext applicationContext;
+    private OmegaCallback omegaCallback;
+
+    public void compensate(TxEntity txEntity) {
+      if (applicationContext != null) {
+        if (applicationContext.containsBean(omegaCallbackBeanName)) {
+          omegaCallback = applicationContext.getBean(omegaCallbackBeanName, 
OmegaCallback.class);
+          TxEvent event = new TxEvent(
+              txEntity.getServiceName(),
+              txEntity.getInstanceId(),
+              txEntity.getGlobalTxId(),
+              txEntity.getLocalTxId(),
+              txEntity.getParentTxId(),
+              TxStartedEvent.name(),
+              txEntity.getCompensationMethod(),
+              txEntity.getPayloads());
+          omegaCallback.compensate(event);
+          LOG.info(omegaCallback.toString());
+        } else {
+          LOG.warn("Spring Bean {} doesn't exist in ApplicationContext", 
omegaCallbackBeanName);
+        }
+      } else {
+        LOG.warn("Spring ApplicationContext is null");
+      }
+    }
+
+    public void initialize(ApplicationContext applicationContext) {
+      this.applicationContext = applicationContext;
+    }
+  }
+}
diff --git 
a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
 
b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 313c526..71962df 100644
--- 
a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ 
b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -31,7 +31,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
-import 
org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension;
+import 
org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -186,7 +186,7 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, 
SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
 
-      //expectTerminated(saga);
+      //expectTerminated(fsm);
 
       ActorRef recoveredSaga = system.actorOf(SagaActor.props(persistenceId), 
"recoveredSaga");
       watch(recoveredSaga);
@@ -352,7 +352,7 @@ public class SagaActorTest {
    * 3. TxEndedEvent-11
    * 4. TxStartedEvent-12
    * 5. TxAbortedEvent-12
-   * 6. TxComponsitedEvent-11
+   * 6. TxCompensatedEvent-11
    * 7. SagaAbortedEvent-1
    */
   @Test
@@ -414,8 +414,8 @@ public class SagaActorTest {
    * 5. TxEndedEvent-12
    * 6. TxStartedEvent-13
    * 7. TxAbortedEvent-13
-   * 8. TxComponsitedEvent-11
-   * 9. TxComponsitedEvent-12
+   * 8. TxCompensatedEvent-11
+   * 9. TxCompensatedEvent-12
    * 10. SagaAbortedEvent-1
    */
   @Test
@@ -485,8 +485,8 @@ public class SagaActorTest {
    * 5. TxEndedEvent-12
    * 6. TxStartedEvent-13
    * 7. TxAbortedEvent-13
-   * 8. TxComponsitedEvent-11
-   * 9. TxComponsitedEvent-12
+   * 8. TxCompensatedEvent-11
+   * 9. TxCompensatedEvent-12
    * 10. SagaAbortedEvent-1
    */
   @Test
@@ -538,7 +538,7 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      sagaData = 
LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);//expectMsgClass(SagaData.class);
+      sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);//expectMsgClass(SagaData.class);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), 
TxState.COMPENSATED);
@@ -558,8 +558,8 @@ public class SagaActorTest {
    * 5. TxEndedEvent-12
    * 6. TxStartedEvent-13
    * 7. TxEndedEvent-13
-   * 8. TxComponsitedEvent-12
-   * 9. TxComponsitedEvent-13
+   * 8. TxCompensatedEvent-12
+   * 9. TxCompensatedEvent-13
    * 10. SagaAbortedEvent-1
    */
   @Test
@@ -619,9 +619,9 @@ public class SagaActorTest {
    * 6. TxStartedEvent-13
    * 7. TxEndedEvent-13
    * 8. SagaAbortedEvent-1
-   * 9. TxComponsitedEvent-11
-   * 8. TxComponsitedEvent-12
-   * 9. TxComponsitedEvent-13
+   * 9. TxCompensatedEvent-11
+   * 8. TxCompensatedEvent-12
+   * 9. TxCompensatedEvent-13
    */
   @Test
   public void sagaAbortedEventAfterAllTxEndedTest() {
@@ -673,7 +673,7 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      SagaData sagaData = 
LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), 
TxState.COMPENSATED);
@@ -940,8 +940,8 @@ public class SagaActorTest {
    * 3. TxEndedEvent-11
    * 5. TxEndedEvent-12
    * 7. TxAbortedEvent-13
-   * 8. TxComponsitedEvent-11
-   * 9. TxComponsitedEvent-12
+   * 8. TxCompensatedEvent-11
+   * 9. TxCompensatedEvent-12
    * 10. SagaAbortedEvent-1
    */
   @Test
diff --git 
a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
 
b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
index d303f86..925bbc2 100644
--- 
a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
+++ 
b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
@@ -24,7 +24,7 @@ import 
org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxCompensatedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
@@ -79,7 +79,7 @@ public class SagaEventSender {
    * 3. TxEndedEvent-11
    * 4. TxStartedEvent-12
    * 5. TxAbortedEvent-12
-   * 6. TxComponsitedEvent-11
+   * 6. TxCompensatedEvent-11
    * 7. SagaAbortedEvent-1
    */
   public static List<BaseEvent> middleTxAbortedEvents(String globalTxId, 
String localTxId_1, String localTxId_2){
@@ -89,7 +89,7 @@ public class SagaEventSender {
     
sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
     
sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     
sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
-    
sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    
sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
     sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
     return sagaEvents;
   }
@@ -102,8 +102,8 @@ public class SagaEventSender {
    * 5. TxEndedEvent-12
    * 6. TxStartedEvent-13
    * 7. TxAbortedEvent-13
-   * 8. TxComponsitedEvent-11
-   * 9. TxComponsitedEvent-12
+   * 8. TxCompensatedEvent-11
+   * 9. TxCompensatedEvent-12
    * 10. SagaAbortedEvent-1
    */
   public static List<BaseEvent> lastTxAbortedEvents(String globalTxId, String 
localTxId_1, String localTxId_2, String localTxId_3){
@@ -115,8 +115,8 @@ public class SagaEventSender {
     
sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     
sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     
sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
-    
sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
-    
sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    
sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    
sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
     return sagaEvents;
   }
@@ -130,8 +130,8 @@ public class SagaEventSender {
    * 6. TxStartedEvent-13
    * 7. TxAbortedEvent-13
    * 8. SagaAbortedEvent-1
-   * 9. TxComponsitedEvent-11
-   * 10. TxComponsitedEvent-12
+   * 9. TxCompensatedEvent-11
+   * 10. TxCompensatedEvent-12
    */
   public static List<BaseEvent> 
sagaAbortedEventBeforeTxComponsitedEvents(String globalTxId, String 
localTxId_1, String localTxId_2, String localTxId_3){
     List<BaseEvent> sagaEvents = new ArrayList<>();
@@ -143,8 +143,8 @@ public class SagaEventSender {
     
sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     
sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
-    
sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
-    
sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    
sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    
sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     return sagaEvents;
   }
 
@@ -156,8 +156,8 @@ public class SagaEventSender {
    * 5. TxEndedEvent-12
    * 6. TxStartedEvent-13
    * 7. TxEndedEvent-13
-   * 8. TxComponsitedEvent-12
-   * 9. TxComponsitedEvent-13
+   * 8. TxCompensatedEvent-12
+   * 9. TxCompensatedEvent-13
    * 10. SagaAbortedEvent-1
    */
   public static List<BaseEvent> 
receivedRemainingEventAfterFirstTxAbortedEvents(String globalTxId, String 
localTxId_1, String localTxId_2, String localTxId_3){
@@ -169,8 +169,8 @@ public class SagaEventSender {
     
sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     
sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     
sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
-    
sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
-    
sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    
sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    
sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
     return sagaEvents;
   }
@@ -184,9 +184,9 @@ public class SagaEventSender {
    * 6. TxStartedEvent-13
    * 7. TxEndedEvent-13
    * 8. SagaAbortedEvent-1
-   * 9. TxComponsitedEvent-11
-   * 8. TxComponsitedEvent-12
-   * 9. TxComponsitedEvent-13
+   * 9. TxCompensatedEvent-11
+   * 8. TxCompensatedEvent-12
+   * 9. TxCompensatedEvent-13
    */
   public static List<BaseEvent> sagaAbortedEventAfterAllTxEndedsEvents(String 
globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
     List<BaseEvent> sagaEvents = new ArrayList<>();
@@ -198,9 +198,9 @@ public class SagaEventSender {
     
sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     
sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
-    
sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
-    
sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
-    
sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    
sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    
sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    
sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     return sagaEvents;
   }
 
@@ -302,8 +302,8 @@ public class SagaEventSender {
    * 3. TxEndedEvent-11
    * 5. TxEndedEvent-12
    * 7. TxAbortedEvent-13
-   * 8. TxComponsitedEvent-11
-   * 9. TxComponsitedEvent-12
+   * 8. TxCompensatedEvent-11
+   * 9. TxCompensatedEvent-12
    * 10. SagaAbortedEvent-1
    */
   public static List<BaseEvent> 
lastTxAbortedEventWithTxConcurrentEvents(String globalTxId, String localTxId_1, 
String localTxId_2, String localTxId_3){
@@ -315,8 +315,8 @@ public class SagaEventSender {
     
sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
     
sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     
sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
-    
sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
-    
sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    
sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    
sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
     return sagaEvents;
   }
diff --git 
a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
 
b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index 85c8b18..9a65bfb 100644
--- 
a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ 
b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -24,7 +24,7 @@ import akka.actor.ActorSystem;
 import com.google.common.eventbus.EventBus;
 import java.util.UUID;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
-import 
org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension;
+import 
org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -58,7 +58,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = 
LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMMITTED
             && sagaData.getBeginTime() > 0
@@ -82,7 +82,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = 
LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
             && sagaData.getBeginTime() > 0
@@ -105,7 +105,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = 
LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
             && sagaData.getBeginTime() > 0
@@ -130,7 +130,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = 
LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
             && sagaData.getBeginTime() > 0
@@ -156,7 +156,7 @@ public class SagaIntegrationTest {
       });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = 
LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
             && sagaData.getBeginTime() > 0
@@ -182,7 +182,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = 
LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
             && sagaData.getBeginTime() > 0
@@ -208,7 +208,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = 
LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
             && sagaData.getBeginTime() > 0
@@ -234,7 +234,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = 
LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.SUSPENDED
             && sagaData.getBeginTime() > 0
@@ -261,7 +261,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(timeout+1, SECONDS).until(() -> {
-      SagaData sagaData = 
LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.SUSPENDED
             && sagaData.getBeginTime() > 0
@@ -287,7 +287,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = 
LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMMITTED
             && sagaData.getBeginTime() > 0
@@ -313,7 +313,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = 
LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMMITTED
             && sagaData.getBeginTime() > 0
@@ -339,7 +339,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = 
LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
             && sagaData.getBeginTime() > 0
diff --git 
a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java 
b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
index f3d0585..774b2e9 100644
--- 
a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
+++ 
b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
@@ -23,5 +23,6 @@ public enum EventType {
   TxEndedEvent,
   TxAbortedEvent,
   TxCompensatedEvent,
-  SagaEndedEvent
+  SagaEndedEvent,
+  SagaAbortedEvent
 }

Reply via email to