[ 
https://issues.apache.org/jira/browse/SCB-224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16457481#comment-16457481
 ] 

ASF GitHub Bot commented on SCB-224:
------------------------------------

WillemJiang closed pull request #138: SCB-224 retry sub-transaction on failure
URL: https://github.com/apache/incubator-servicecomb-saga/pull/138
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/acceptance-tests/acceptance-pack/src/test/java/org/apache/servicecomb/saga/PackStepdefs.java
 
b/acceptance-tests/acceptance-pack/src/test/java/org/apache/servicecomb/saga/PackStepdefs.java
index 4f2f729a..f9af438a 100644
--- 
a/acceptance-tests/acceptance-pack/src/test/java/org/apache/servicecomb/saga/PackStepdefs.java
+++ 
b/acceptance-tests/acceptance-pack/src/test/java/org/apache/servicecomb/saga/PackStepdefs.java
@@ -41,7 +41,7 @@
 import cucumber.api.java8.En;
 
 public class PackStepdefs implements En {
-  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private static final String ALPHA_REST_ADDRESS = "alpha.rest.address";
   private static final String CAR_SERVICE_ADDRESS = "car.service.address";
@@ -73,7 +73,7 @@ public PackStepdefs() {
     });
 
     Given("^Install the byteman script ([A-Za-z0-9_\\.]+) to ([A-Za-z]+) 
Service$", (String script, String service) -> {
-      log.info("Install the byteman script {} to {} service", script, service);
+      LOG.info("Install the byteman script {} to {} service", script, service);
       List<String> rules = new ArrayList<>();
       rules.add("target/test-classes/" + script);
       Submit bm = getBytemanSubmit(service);
@@ -81,7 +81,7 @@ public PackStepdefs() {
     });
 
     When("^User ([A-Za-z]+) requests to book ([0-9]+) cars and ([0-9]+) rooms 
(success|fail)$", (username, cars, rooms, result) -> {
-      log.info("Received request from user {} to book {} cars and {} rooms", 
username, cars, rooms);
+      LOG.info("Received request from user {} to book {} cars and {} rooms", 
username, cars, rooms);
 
       Response resp = given()
           .pathParam("name", username)
@@ -116,7 +116,7 @@ public PackStepdefs() {
 
   @After
   public void cleanUp() {
-    log.info("Cleaning up services");
+    LOG.info("Cleaning up services");
     for (String address : addresses) {
       given()
           .when()
@@ -135,7 +135,7 @@ public void cleanUp() {
       try {
         bm.deleteAllRules();
       } catch (Exception e) {
-        log.warn("Fail to delete the byteman rules " + e);
+        LOG.warn("Fail to delete the byteman rules " + e);
       }
     }
   }
@@ -156,7 +156,7 @@ private void dataMatches(String address, DataTable 
dataTable, Consumer<Map<Strin
       return;
     }
 
-    log.info("Retrieved data {} from service", actualMaps);
+    LOG.info("Retrieved data {} from service", actualMaps);
     dataTable.diff(DataTable.create(actualMaps));
   }
 
@@ -180,12 +180,12 @@ private void probe(String address) {
     if (isEmpty(infoURI)) {
       infoURI = "/info";
     }
-    log.info("The info service uri is " + infoURI);
+    LOG.info("The info service uri is " + infoURI);
     probe(address, infoURI);
   }
 
   private void probe(String address, String infoURI) {
-    log.info("Connecting to service address {}", address);
+    LOG.info("Connecting to service address {}", address);
     given()
         .when()
         .get(address + infoURI)
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 0f016d38..6c8f3708 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -76,7 +76,7 @@ private Command(long id,
     this.lastModified = new Date();
   }
 
-  private Command(long id,
+  public Command(long id,
       String serviceName,
       String instanceId,
       String globalTxId,
@@ -115,11 +115,11 @@ public String localTxId() {
     return localTxId;
   }
 
-  String parentTxId() {
+  public String parentTxId() {
     return parentTxId;
   }
 
-  String compensationMethod() {
+  public String compensationMethod() {
     return compensationMethod;
   }
 
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index d3fba313..31688709 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -31,7 +31,7 @@
 import org.slf4j.LoggerFactory;
 
 public class EventScanner implements Runnable {
-  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final byte[] EMPTY_PAYLOAD = new byte[0];
 
   private final ScheduledExecutorService scheduler;
@@ -83,7 +83,7 @@ private void pollEvents() {
   private void findTimeoutEvents() {
     eventRepository.findTimeoutEvents()
         .forEach(event -> {
-          log.info("Found timeout event {}", event);
+          LOG.info("Found timeout event {}", event);
           timeoutRepository.save(txTimeoutOf(event));
         });
   }
@@ -93,9 +93,9 @@ private void updateTimeoutStatus() {
   }
 
   private void saveUncompensatedEventsToCommands() {
-    
eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId)
-        .ifPresent(event -> {
-          log.info("Found uncompensated event {}", event);
+    
eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, 
TxEndedEvent.name())
+        .forEach(event -> {
+          LOG.info("Found uncompensated event {}", event);
           nextEndedEventId = event.id();
           commandRepository.saveCompensationCommands(event.globalTxId());
         });
@@ -104,7 +104,7 @@ private void saveUncompensatedEventsToCommands() {
   private void updateCompensatedCommands() {
     
eventRepository.findFirstCompensatedEventByIdGreaterThan(nextCompensatedEventId)
         .ifPresent(event -> {
-          log.info("Found compensated event {}", event);
+          LOG.info("Found compensated event {}", event);
           nextCompensatedEventId = event.id();
           updateCompensationStatus(event);
         });
@@ -114,13 +114,13 @@ private void deleteDuplicateSagaEndedEvents() {
     try {
       eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
     } catch (Exception e) {
-      log.warn("Failed to delete duplicate event", e);
+      LOG.warn("Failed to delete duplicate event", e);
     }
   }
 
   private void updateCompensationStatus(TxEvent event) {
     commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
-    log.info("Transaction with globalTxId {} and localTxId {} was compensated",
+    LOG.info("Transaction with globalTxId {} and localTxId {} was compensated",
         event.globalTxId(),
         event.localTxId());
 
@@ -129,7 +129,7 @@ private void updateCompensationStatus(TxEvent event) {
 
   private void abortTimeoutEvents() {
     timeoutRepository.findFirstTimeout().forEach(timeout -> {
-      log.info("Found timeout event {} to abort", timeout);
+      LOG.info("Found timeout event {} to abort", timeout);
 
       eventRepository.save(toTxAbortedEvent(timeout));
 
@@ -152,7 +152,7 @@ private void markSagaEnded(TxEvent event) {
 
   private void markGlobalTxEnd(TxEvent event) {
     eventRepository.save(toSagaEndedEvent(event));
-    log.info("Marked end of transaction with globalTxId {}", 
event.globalTxId());
+    LOG.info("Marked end of transaction with globalTxId {}", 
event.globalTxId());
   }
 
   private TxEvent toTxAbortedEvent(TxTimeout timeout) {
@@ -182,7 +182,7 @@ private TxEvent toSagaEndedEvent(TxEvent event) {
   private void compensate() {
     commandRepository.findFirstCommandToCompensate()
         .forEach(command -> {
-          log.info("Compensating transaction with globalTxId {} and localTxId 
{}",
+          LOG.info("Compensating transaction with globalTxId {} and localTxId 
{}",
               command.globalTxId(),
               command.localTxId());
 
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
index 3b27c148..9556d7ca 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
@@ -24,7 +24,7 @@
 import org.slf4j.LoggerFactory;
 
 public class PushBackOmegaCallback implements OmegaCallback {
-  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final BlockingQueue<Runnable> pendingCompensations;
   private final OmegaCallback underlying;
@@ -45,11 +45,12 @@ public void compensate(TxEvent event) {
   }
 
   private void logError(TxEvent event, Exception e) {
-    log.error(
-        "Failed to compensate service [{}] instance [{}] with method [{}], 
global tx id [{}] and local tx id [{}]",
+    LOG.error(
+        "Failed to {} service [{}] instance [{}] with method [{}], global tx 
id [{}] and local tx id [{}]",
+        event.retries() == 0 ? "compensate" : "retry",
         event.serviceName(),
         event.instanceId(),
-        event.compensationMethod(),
+        event.retries() == 0 ? event.compensationMethod() : 
event.retryMethod(),
         event.globalTxId(),
         event.localTxId(),
         e);
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 968e5b78..9a7f82ba 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -23,14 +23,13 @@
 
 import java.lang.invoke.MethodHandles;
 import java.util.Arrays;
-import java.util.Date;
 import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TxConsistentService {
-  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final TxEventRepository eventRepository;
 
@@ -42,20 +41,11 @@ public TxConsistentService(TxEventRepository 
eventRepository) {
 
   public boolean handle(TxEvent event) {
     if (types.contains(event.type()) && isGlobalTxAborted(event)) {
-      log.info("Transaction event {} rejected, because its parent with 
globalTxId {} was already aborted", event.type(), event.globalTxId());
+      LOG.info("Transaction event {} rejected, because its parent with 
globalTxId {} was already aborted",
+          event.type(), event.globalTxId());
       return false;
     }
 
-    if (SagaEndedEvent.name().equals(event.type()) && 
!event.expiryTime().equals(new Date(TxEvent.MAX_TIMESTAMP))) {
-      // if we get the SagaEndedEvent and the expiryTime is not MAX_TIME, we 
need to check if it is timeout
-      if (eventRepository.findTimeoutEvents().stream()
-          .filter(txEvent -> txEvent.globalTxId().equals(event.globalTxId()))
-          .count() == 1) {
-        log.warn("Transaction {} is timeout and will be handled by the event 
scanner", event.globalTxId());
-        return false;
-      }
-    }
-
     eventRepository.save(event);
 
     return true;
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 76dfca75..17b059ce 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -47,6 +47,8 @@
   private String type;
   private String compensationMethod;
   private Date expiryTime;
+  private String retryMethod;
+  private int retries;
   private byte[] payloads;
 
   private TxEvent() {
@@ -63,6 +65,8 @@ public TxEvent(TxEvent event) {
         event.type,
         event.compensationMethod,
         event.expiryTime,
+        event.retryMethod,
+        event.retries,
         event.payloads);
   }
 
@@ -75,7 +79,8 @@ public TxEvent(
       String type,
       String compensationMethod,
       byte[] payloads) {
-    this(serviceName, instanceId, new Date(), globalTxId, localTxId, 
parentTxId, type, compensationMethod, 0, payloads);
+    this(serviceName, instanceId, new Date(), globalTxId, localTxId, 
parentTxId, type, compensationMethod, 0, "", 0,
+        payloads);
   }
 
   public TxEvent(
@@ -87,9 +92,11 @@ public TxEvent(
       String type,
       String compensationMethod,
       int timeout,
+      String retryMethod,
+      int retries,
       byte[] payloads) {
     this(-1L, serviceName, instanceId, new Date(), globalTxId, localTxId, 
parentTxId, type, compensationMethod, timeout,
-        payloads);
+        retryMethod, retries, payloads);
   }
 
   public TxEvent(
@@ -102,9 +109,11 @@ public TxEvent(
       String type,
       String compensationMethod,
       int timeout,
+      String retryMethod,
+      int retries,
       byte[] payloads) {
     this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, 
parentTxId, type, compensationMethod,
-        timeout, payloads);
+        timeout, retryMethod, retries, payloads);
   }
 
   TxEvent(Long surrogateId,
@@ -117,10 +126,14 @@ public TxEvent(
       String type,
       String compensationMethod,
       int timeout,
+      String retryMethod,
+      int retries,
       byte[] payloads) {
     this(surrogateId, serviceName, instanceId, creationTime, globalTxId, 
localTxId, parentTxId, type,
         compensationMethod,
         timeout == 0 ? new Date(MAX_TIMESTAMP) : new 
Date(creationTime.getTime() + SECONDS.toMillis(timeout)),
+        retryMethod,
+        retries,
         payloads);
   }
 
@@ -134,6 +147,8 @@ public TxEvent(
       String type,
       String compensationMethod,
       Date expiryTime,
+      String retryMethod,
+      int retries,
       byte[] payloads) {
     this.surrogateId = surrogateId;
     this.serviceName = serviceName;
@@ -145,6 +160,8 @@ public TxEvent(
     this.type = type;
     this.compensationMethod = compensationMethod;
     this.expiryTime = expiryTime;
+    this.retryMethod = retryMethod;
+    this.retries = retries;
     this.payloads = payloads;
   }
 
@@ -192,6 +209,14 @@ public Date expiryTime() {
     return expiryTime;
   }
 
+  public String retryMethod() {
+    return retryMethod;
+  }
+
+  public int retries() {
+    return retries;
+  }
+
   @Override
   public String toString() {
     return "TxEvent{" +
@@ -204,7 +229,9 @@ public String toString() {
         ", parentTxId='" + parentTxId + '\'' +
         ", type='" + type + '\'' +
         ", compensationMethod='" + compensationMethod + '\'' +
-        ", expiryTime='" + expiryTime + '\'' +
+        ", expiryTime=" + expiryTime +
+        ", retryMethod='" + retryMethod + '\'' +
+        ", retries=" + retries +
         '}';
   }
 }
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index 9eceaddd..c481226e 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -97,7 +97,7 @@
    * @param id
    * @return
    */
-  Optional<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id);
+  List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String 
type);
 
   /**
    * Find a {@link TxEvent} which satisfies below requirements:
diff --git 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index da360665..8faf0e88 100644
--- 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -73,8 +73,8 @@ public void save(TxEvent event) {
     }
 
     @Override
-    public Optional<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long 
id) {
-      return Optional.empty();
+    public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, 
String type) {
+      return emptyList();
     }
 
     @Override
@@ -132,6 +132,19 @@ public void skipTxStartedEvent_IfGlobalTxAlreadyFailed() {
     assertThat(events.size(), is(2));
   }
 
+  @Test
+  public void skipSagaEndedEvent_IfGlobalTxAlreadyFailed() {
+    String localTxId1 = UUID.randomUUID().toString();
+    events.add(eventOf(SagaStartedEvent, localTxId1));
+    events.add(eventOf(TxAbortedEvent, localTxId1));
+
+    TxEvent event = eventOf(SagaEndedEvent, localTxId1);
+
+    consistentService.handle(event);
+
+    assertThat(events.size(), is(2));
+  }
+
   private TxEvent newEvent(EventType eventType) {
     return new TxEvent(serviceName, instanceId, globalTxId, localTxId, 
parentTxId, eventType.name(), compensationMethod,
         payloads);
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index 737fd117..53110bf5 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -31,12 +31,25 @@
 
 public interface CommandEntityRepository extends CrudRepository<Command, Long> 
{
 
+  @Transactional
+  @Modifying(clearAutomatically = true)
+  @Query("UPDATE org.apache.servicecomb.saga.alpha.core.Command c "
+      + "SET c.status = :toStatus "
+      + "WHERE c.globalTxId = :globalTxId "
+      + "  AND c.localTxId = :localTxId "
+      + "  AND c.status = :fromStatus")
+  void updateStatusByGlobalTxIdAndLocalTxId(
+      @Param("fromStatus") String fromStatus,
+      @Param("toStatus") String toStatus,
+      @Param("globalTxId") String globalTxId,
+      @Param("localTxId") String localTxId);
+
   @Transactional
   @Modifying(clearAutomatically = true)
   @Query("UPDATE org.apache.servicecomb.saga.alpha.core.Command c "
       + "SET c.status = :status "
       + "WHERE c.globalTxId = :globalTxId "
-      + "AND c.localTxId = :localTxId")
+      + "  AND c.localTxId = :localTxId")
   void updateStatusByGlobalTxIdAndLocalTxId(
       @Param("status") String status,
       @Param("globalTxId") String globalTxId,
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index 5a952818..a54fa66c 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -42,7 +42,7 @@ public void compensate(TxEvent event) {
         .setGlobalTxId(event.globalTxId())
         .setLocalTxId(event.localTxId())
         .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
-        .setCompensateMethod(event.compensationMethod())
+        .setCompensationMethod(event.compensationMethod())
         .setPayloads(ByteString.copyFrom(event.payloads()))
         .build();
     observer.onNext(command);
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index ee7e2e44..a3137b45 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -85,6 +85,8 @@ public void onTxEvent(GrpcTxEvent message, 
StreamObserver<GrpcAck> responseObser
         message.getType(),
         message.getCompensationMethod(),
         message.getTimeout(),
+        message.getRetryMethod(),
+        message.getRetries(),
         message.getPayloads().toByteArray()
     ));
 
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 086f88ec..4aa30d14 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -35,7 +35,7 @@
 import org.slf4j.LoggerFactory;
 
 public class SpringCommandRepository implements CommandRepository {
-  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final TxEventEnvelopeRepository eventRepository;
   private final CommandEntityRepository commandRepository;
@@ -57,13 +57,13 @@ public void saveCompensationCommands(String globalTxId) {
     }
 
     for (Command command : commands.values()) {
-      log.info("Saving compensation command {}", command);
+      LOG.info("Saving compensation command {}", command);
       try {
         commandRepository.save(command);
       } catch (Exception e) {
-        log.warn("Failed to save some command {}", command);
+        LOG.warn("Failed to save some command {}", command);
       }
-      log.info("Saved compensation command {}", command);
+      LOG.info("Saved compensation command {}", command);
     }
   }
 
@@ -85,6 +85,7 @@ public void markCommandAsDone(String globalTxId, String 
localTxId) {
 
     commands.forEach(command ->
         commandRepository.updateStatusByGlobalTxIdAndLocalTxId(
+            NEW.name(),
             PENDING.name(),
             command.globalTxId(),
             command.localTxId()));
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index cae64567..e48a7806 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -17,17 +17,14 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
+import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
+
 import java.util.List;
 import java.util.Optional;
 
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
 import org.springframework.data.domain.PageRequest;
-import org.springframework.util.CollectionUtils;
-
-import javax.swing.text.html.Option;
-
-import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 
 class SpringTxEventRepository implements TxEventRepository {
   private static final PageRequest SINGLE_TX_EVENT_REQUEST = new 
PageRequest(0, 1);
@@ -63,12 +60,8 @@ public void save(TxEvent event) {
   }
 
   @Override
-  public Optional<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id) 
{
-    List<TxEvent> result = 
eventRepo.findFirstUncompensatedEventByIdGreaterThan(id, 
SINGLE_TX_EVENT_REQUEST);
-    if (CollectionUtils.isEmpty(result)) {
-      return Optional.empty();
-    }
-    return Optional.of(result.get(0));
+  public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, 
String type) {
+    return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, 
SINGLE_TX_EVENT_REQUEST);
   }
 
   @Override
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
index 71951393..53b44431 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
@@ -31,7 +31,8 @@
 import org.springframework.data.domain.PageRequest;
 
 public class SpringTxTimeoutRepository implements TxTimeoutRepository {
-  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private final TxTimeoutEntityRepository timeoutRepo;
 
   SpringTxTimeoutRepository(TxTimeoutEntityRepository timeoutRepo) {
@@ -43,7 +44,7 @@ public void save(TxTimeout timeout) {
     try {
       timeoutRepo.save(timeout);
     } catch (Exception ignored) {
-      log.warn("Failed to save some timeout {}", timeout);
+      LOG.warn("Failed to save some timeout {}", timeout);
     }
   }
 
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 470caa57..6a8a2637 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -35,7 +35,17 @@
       + "WHERE t.type = 'TxAbortedEvent' AND NOT EXISTS( "
       + "  SELECT t1.globalTxId FROM TxEvent t1"
       + "  WHERE t1.globalTxId = t.globalTxId "
-      + "    AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent'))")
+      + "    AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent')) AND NOT EXISTS 
( "
+      + "  SELECT t3.globalTxId FROM TxEvent t3 "
+      + "  WHERE t3.globalTxId = t.globalTxId "
+      + "    AND t3.localTxId = t.localTxId "
+      + "    AND t3.surrogateId != t.surrogateId "
+      + "    AND t3.creationTime > t.creationTime) AND (("
+      + "SELECT MIN(t2.retries) FROM TxEvent t2 "
+      + "WHERE t2.globalTxId = t.globalTxId "
+      + "  AND t2.localTxId = t.localTxId "
+      + "  AND t2.type = 'TxStartedEvent') = 0 "
+      + "OR t.globalTxId = t.localTxId)")
   Optional<TxEvent> findFirstAbortedGlobalTxByType();
 
   @Query("SELECT t FROM TxEvent t "
@@ -56,9 +66,13 @@
 
   @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
       + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, 
"
-      + "t.type, t.compensationMethod, t.payloads"
+      + "t.type, t.compensationMethod, t.payloads "
       + ") FROM TxEvent t "
-      + "WHERE t.globalTxId = ?1 AND t.type = ?2")
+      + "WHERE t.globalTxId = ?1 AND t.type = ?2 "
+      + "  AND ( SELECT MIN(t1.retries) FROM TxEvent t1 "
+      + "  WHERE t1.globalTxId = t.globalTxId "
+      + "    AND t1.localTxId = t.localTxId "
+      + "    AND t1.type IN ('TxStartedEvent', 'SagaStartedEvent') ) = 0 ")
   List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String 
type);
 
   @Query("SELECT t FROM TxEvent t "
@@ -78,19 +92,25 @@
   List<TxEvent> 
findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 
   @Query("SELECT t FROM TxEvent t "
-      + "WHERE t.type = 'TxEndedEvent' AND t.surrogateId > ?1 AND EXISTS ( "
-      + "  SELECT t1.globalTxId"
-      + "  FROM TxEvent t1 "
+      + "WHERE t.type = ?1 AND t.surrogateId > ?2 AND EXISTS ( "
+      + "  SELECT t1.globalTxId FROM TxEvent t1 "
       + "  WHERE t1.globalTxId = t.globalTxId "
-      + "  AND t1.type = 'TxAbortedEvent'"
-      + ") AND NOT EXISTS ( "
-      + "  SELECT t2.globalTxId"
-      + "  FROM TxEvent t2 "
-      + "  WHERE t2.globalTxId = t.globalTxId "
-      + "  AND t2.localTxId = t.localTxId "
-      + "  AND t2.type = 'TxCompensatedEvent') "
+      + "    AND t1.type = 'TxAbortedEvent' AND NOT EXISTS ( "
+      + "    SELECT t2.globalTxId FROM TxEvent t2 "
+      + "    WHERE t2.globalTxId = t1.globalTxId "
+      + "      AND t2.localTxId = t1.localTxId "
+      + "      AND t2.type = 'TxStartedEvent' "
+      + "      AND t2.creationTime > t1.creationTime)) AND NOT EXISTS ( "
+      + "  SELECT t3.globalTxId FROM TxEvent t3 "
+      + "  WHERE t3.globalTxId = t.globalTxId "
+      + "    AND t3.localTxId = t.localTxId "
+      + "    AND t3.type = 'TxCompensatedEvent') AND ( "
+      + "  SELECT MIN(t4.retries) FROM TxEvent t4 "
+      + "  WHERE t4.globalTxId = t.globalTxId "
+      + "    AND t4.localTxId = t.localTxId "
+      + "    AND t4.type = 'TxStartedEvent' ) = 0 "
       + "ORDER BY t.surrogateId ASC")
-  List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long surrogateId, 
Pageable pageable);
+  List<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long 
surrogateId, Pageable pageable);
 
   Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long 
surrogateId);
 
diff --git a/alpha/alpha-server/src/main/resources/schema-mysql.sql 
b/alpha/alpha-server/src/main/resources/schema-mysql.sql
index b0bc8d71..3806f6d1 100644
--- a/alpha/alpha-server/src/main/resources/schema-mysql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-mysql.sql
@@ -41,6 +41,8 @@ CREATE TABLE IF NOT EXISTS Command (
   localTxId varchar(36) NOT NULL,
   parentTxId varchar(36) DEFAULT NULL,
   compensationMethod varchar(256) NOT NULL,
+  retryMethod varchar(256) NOT NULL,
+  retries int NOT NULL DEFAULT 0,
   payloads varbinary(10240),
   status varchar(12),
   lastModified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
@@ -49,6 +51,7 @@ CREATE TABLE IF NOT EXISTS Command (
   INDEX saga_commands_index (surrogateId, eventId, globalTxId, localTxId, 
status)
 ) DEFAULT CHARSET=utf8;
 
+
 CREATE TABLE IF NOT EXISTS TxTimeout (
   surrogateId bigint NOT NULL AUTO_INCREMENT,
   eventId bigint NOT NULL UNIQUE,
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql 
b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index 41815eeb..4ecb1b48 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -26,6 +26,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
   expiryTime timestamp(6) NOT NULL,
+  retryMethod varchar(256) NOT NULL,
+  retries int NOT NULL DEFAULT 0,
   payloads bytea
 );
 
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 8b2672ce..10bb2dd0 100644
--- 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -92,6 +92,8 @@
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
   private final String compensationMethod = getClass().getCanonicalName();
+
+  private final String retryMethod = uniquify("retryMethod");
   private final String serviceName = uniquify("serviceName");
   private final String instanceId = uniquify("instanceId");
 
@@ -128,7 +130,9 @@
   private TxConsistentService consistentService;
 
   private static final Queue<GrpcCompensateCommand> receivedCommands = new 
ConcurrentLinkedQueue<>();
-  private final CompensateStreamObserver compensateResponseObserver = new 
CompensateStreamObserver(this::onCompensation);
+
+  private final CompensationStreamObserver compensateResponseObserver = new 
CompensationStreamObserver(
+      this::onCompensation);
 
   @AfterClass
   public static void tearDown() throws Exception {
@@ -205,7 +209,7 @@ public void closeStreamOfDisconnectedClientOnly() {
     await().atMost(1, SECONDS).until(() -> 
omegaCallbacks.containsKey(serviceConfig.getServiceName()));
 
     GrpcServiceConfig anotherServiceConfig = someServiceConfig();
-    CompensateStreamObserver anotherResponseObserver = new 
CompensateStreamObserver();
+    CompensationStreamObserver anotherResponseObserver = new 
CompensationStreamObserver();
     
TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, 
anotherResponseObserver);
 
     await().atMost(1, SECONDS).until(() -> 
omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
@@ -247,7 +251,7 @@ public void 
compensateImmediatelyWhenGlobalTxAlreadyAborted() throws Exception {
     assertThat(command.getGlobalTxId(), is(globalTxId));
     assertThat(command.getLocalTxId(), is(localTxId));
     assertThat(command.getParentTxId(), is(parentTxId));
-    assertThat(command.getCompensateMethod(), is(compensationMethod));
+    assertThat(command.getCompensationMethod(), is(compensationMethod));
     assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
   }
 
@@ -269,9 +273,9 @@ public void doNotCompensateDuplicateTxOnFailure() {
 
     assertThat(receivedCommands, contains(
         
GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
-            .setCompensateMethod("method 
b").setPayloads(ByteString.copyFrom("service b".getBytes())).build(),
+            .setCompensationMethod("method 
b").setPayloads(ByteString.copyFrom("service b".getBytes())).build(),
         
GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
-            .setCompensateMethod("method 
a").setPayloads(ByteString.copyFrom("service a".getBytes())).build()
+            .setCompensationMethod("method 
a").setPayloads(ByteString.copyFrom("service a".getBytes())).build()
     ));
   }
 
@@ -289,7 +293,7 @@ public void getCompensateCommandOnFailure() {
     assertThat(command.getGlobalTxId(), is(globalTxId));
     assertThat(command.getLocalTxId(), is(localTxId));
     assertThat(command.getParentTxId(), is(parentTxId));
-    assertThat(command.getCompensateMethod(), is(compensationMethod));
+    assertThat(command.getCompensationMethod(), is(compensationMethod));
     assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
   }
 
@@ -301,7 +305,7 @@ public void compensateOnlyFailedGlobalTransaction() {
 
     // simulates connection from another service with different globalTxId
     GrpcServiceConfig anotherServiceConfig = someServiceConfig();
-    
TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, new 
CompensateStreamObserver());
+    
TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, new 
CompensationStreamObserver());
 
     TxEventServiceBlockingStub anotherBlockingStub = 
TxEventServiceGrpc.newBlockingStub(clientChannel);
     anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, 
UUID.randomUUID().toString()));
@@ -403,7 +407,7 @@ public void abortTimeoutSagaStartedEvent() {
   @Test
   public void abortTimeoutTxStartedEvent() {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
-    blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, 
globalTxId));
+    blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, 
globalTxId, null));
     blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, 
globalTxId, 1));
 
     await().atMost(2, SECONDS).until(() -> {
@@ -429,6 +433,26 @@ public void abortTimeoutTxStartedEvent() {
     });
   }
 
+  @Test
+  public void doNotCompensateRetryingEvents() throws InterruptedException {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 
1));
+    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
+    blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 
0));
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
+
+    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 4);
+
+    List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
+    assertThat(events.size(), is(4));
+    assertThat(events.get(0).type(), is(TxStartedEvent.name()));
+    assertThat(events.get(1).type(), is(TxAbortedEvent.name()));
+    assertThat(events.get(2).type(), is(TxStartedEvent.name()));
+    assertThat(events.get(3).type(), is(TxEndedEvent.name()));
+
+    assertThat(receivedCommands.isEmpty(), is(true));
+  }
+
   private boolean waitTillTimeoutDone() {
     for (TxTimeout txTimeout : timeoutEntityRepository.findAll()) {
       if (txTimeout.status().equals(DONE.name())) {
@@ -443,8 +467,8 @@ private GrpcAck onCompensation(GrpcCompensateCommand 
command) {
         eventOf(TxCompensatedEvent,
             command.getLocalTxId(),
             command.getParentTxId(),
-            new byte[0],
-            command.getCompensateMethod()));
+            command.getPayloads().toByteArray(),
+            command.getCompensationMethod()));
   }
 
   private GrpcServiceConfig someServiceConfig() {
@@ -467,23 +491,35 @@ private TxEvent someTxAbortEvent(String serviceName, 
String instanceId) {
   }
 
   private GrpcTxEvent someGrpcEventWithTimeout(EventType type, String 
localTxId, String parentTxId, int timeout) {
-    return eventOf(type, globalTxId, localTxId, parentTxId, 
payload.getBytes(), getClass().getCanonicalName(), timeout);
+    return eventOf(type, globalTxId, localTxId, parentTxId, 
payload.getBytes(), getClass().getCanonicalName(), timeout,
+        "", 0);
+  }
+
+  private GrpcTxEvent someGrpcEventWithRetry(EventType type, String 
retryMethod, int retries) {
+    return eventOf(type, globalTxId, localTxId, parentTxId, 
payload.getBytes(), compensationMethod, 0,
+        retryMethod, retries);
   }
 
   private GrpcTxEvent someGrpcEvent(EventType type) {
-    return eventOf(type, localTxId, parentTxId, payload.getBytes(), 
getClass().getCanonicalName());
+    return someGrpcEvent(type, localTxId);
   }
 
-  private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId) {
+  private GrpcTxEvent someGrpcEvent(EventType type, String localTxId) {
     return someGrpcEvent(type, globalTxId, localTxId);
   }
 
   private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String 
localTxId) {
-    return eventOf(type, globalTxId, localTxId, parentTxId, 
payload.getBytes(), getClass().getCanonicalName(), 0);
+    return someGrpcEvent(type, globalTxId, localTxId, parentTxId);
+  }
+
+  private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String 
localTxId, String parentTxId) {
+    return eventOf(type, globalTxId, localTxId, parentTxId, 
payload.getBytes(), getClass().getCanonicalName(), 0, "",
+        0);
   }
 
-  private GrpcTxEvent eventOf(EventType eventType, String localTxId, String 
parentTxId, byte[] payloads, String compensationMethod) {
-    return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, 
compensationMethod, 0);
+  private GrpcTxEvent eventOf(EventType eventType, String localTxId, String 
parentTxId, byte[] payloads,
+      String compensationMethod) {
+    return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, 
compensationMethod, 0, "", 0);
   }
 
   private GrpcTxEvent eventOf(EventType eventType,
@@ -492,7 +528,9 @@ private GrpcTxEvent eventOf(EventType eventType,
       String parentTxId,
       byte[] payloads,
       String compensationMethod,
-      int timeout) {
+      int timeout,
+      String retryMethod,
+      int retries) {
 
     return GrpcTxEvent.newBuilder()
         .setServiceName(serviceName)
@@ -504,19 +542,21 @@ private GrpcTxEvent eventOf(EventType eventType,
         .setType(eventType.name())
         .setCompensationMethod(compensationMethod)
         .setTimeout(timeout)
+        .setRetryMethod(retryMethod)
+        .setRetries(retries)
         .setPayloads(ByteString.copyFrom(payloads))
         .build();
   }
 
-  private static class CompensateStreamObserver implements 
StreamObserver<GrpcCompensateCommand> {
+  private static class CompensationStreamObserver implements 
StreamObserver<GrpcCompensateCommand> {
     private final Consumer<GrpcCompensateCommand> consumer;
     private boolean completed = false;
 
-    private CompensateStreamObserver() {
+    private CompensationStreamObserver() {
       this(command -> {});
     }
 
-    private CompensateStreamObserver(Consumer<GrpcCompensateCommand> consumer) 
{
+    private CompensationStreamObserver(Consumer<GrpcCompensateCommand> 
consumer) {
       this.consumer = consumer;
     }
 
diff --git a/alpha/alpha-server/src/test/resources/schema.sql 
b/alpha/alpha-server/src/test/resources/schema.sql
index a10a4e02..8d708990 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -26,6 +26,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
   expiryTime TIMESTAMP NOT NULL,
+  retryMethod varchar(256) NOT NULL,
+  retries int DEFAULT 0 NOT NULL,
   payloads varbinary(10240)
 );
 
diff --git 
a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/CommandEnvelopeRepository.java
 
b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/CommandEnvelopeRepository.java
new file mode 100644
index 00000000..ad8ae3ae
--- /dev/null
+++ 
b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/CommandEnvelopeRepository.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.integration.pack.tests;
+
+import org.apache.servicecomb.saga.alpha.core.Command;
+import org.springframework.data.repository.CrudRepository;
+
+interface CommandEnvelopeRepository extends CrudRepository<Command, Long> {
+}
diff --git 
a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
 
b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
index 2bdd5878..e497cec7 100644
--- 
a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
+++ 
b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
@@ -77,4 +77,12 @@ public GreetingController(GreetingService greetingService, 
RestTemplate restTemp
   ResponseEntity<String> goodNight(@RequestParam String name) {
     return ResponseEntity.ok("Good night, " + name);
   }
+
+  @SagaStart
+  @GetMapping("/open")
+  ResponseEntity<String> open(@RequestParam String name, @RequestParam int 
retries) {
+    String greetings = greetingService.greet(name);
+    String status = greetingService.open(name, retries);
+    return ResponseEntity.ok(greetings + "; " + status);
+  }
 }
diff --git 
a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
 
b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
index 69a86f6d..554dc15a 100644
--- 
a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
+++ 
b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
@@ -27,6 +27,9 @@
 class GreetingService {
   private final Queue<String> compensated;
 
+  private final int MAX_COUNT = 3;
+  private int failedCount = 1;
+
   @Autowired
   GreetingService(Queue<String> compensated) {
     this.compensated = compensated;
@@ -59,8 +62,27 @@ String apologize(String name) {
     return appendMessage("My bad, please take the window instead, " + name);
   }
 
+  @Compensable(retries = MAX_COUNT, compensationMethod = "close")
+  String open(String name, int retries) {
+    if (failedCount < retries) {
+      failedCount += 1;
+      throw new IllegalStateException("You know when the zoo opens, " + name);
+    }
+    resetCount();
+    return "Welcome to visit the zoo, " + name;
+  }
+
+  String close(String name, int retries) {
+    resetCount();
+    return appendMessage("Sorry, the zoo has already closed, " + name);
+  }
+
   private String appendMessage(String message) {
     compensated.add(message);
     return message;
   }
+
+  void resetCount() {
+    this.failedCount = 1;
+  }
 }
diff --git 
a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
 
b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
index b3045e38..9199ff64 100644
--- 
a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
+++ 
b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
@@ -56,14 +56,23 @@
   private OmegaContext omegaContext;
 
   @Autowired
-  private TxEventEnvelopeRepository repository;
+  private TxEventEnvelopeRepository eventRepo;
+
+  @Autowired
+  private CommandEnvelopeRepository commandRepo;
 
   @Autowired
   private Queue<String> compensatedMessages;
 
+  @Autowired
+  private GreetingService greetingService;
+
   @After
   public void tearDown() throws Exception {
-    repository.deleteAll();
+    eventRepo.deleteAll();
+    commandRepo.deleteAll();
+    compensatedMessages.clear();
+    greetingService.resetCount();
   }
 
   @Test(timeout = 5000)
@@ -75,11 +84,11 @@ public void updatesTxStateToAlpha() throws Exception {
     assertThat(entity.getStatusCode(), is(OK));
     assertThat(entity.getBody(), is("Greetings, mike; Bonjour, mike"));
 
-    List<String> distinctGlobalTxIds = repository.findDistinctGlobalTxId();
+    List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
     assertThat(distinctGlobalTxIds.size(), is(1));
 
     String globalTxId = distinctGlobalTxIds.get(0);
-    List<TxEvent> events = 
repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
+    List<TxEvent> events = 
eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
 
     assertThat(events.size(), is(6));
 
@@ -136,13 +145,13 @@ public void compensatesFailedGlobalTransaction() throws 
Exception {
 
     assertThat(entity.getStatusCode(), is(INTERNAL_SERVER_ERROR));
 
-    await().atMost(2, SECONDS).until(() -> repository.count() == 7);
+    await().atMost(2, SECONDS).until(() -> eventRepo.count() == 7);
 
-    List<String> distinctGlobalTxIds = repository.findDistinctGlobalTxId();
+    List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
     assertThat(distinctGlobalTxIds.size(), is(1));
 
     String globalTxId = distinctGlobalTxIds.get(0);
-    List<TxEvent> events = 
repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
+    List<TxEvent> events = 
eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
     assertThat(events.size(), is(7));
 
     TxEvent sagaStartedEvent = events.get(0);
@@ -184,11 +193,11 @@ public void updatesEmbeddedTxStateToAlpha() throws 
Exception {
     assertThat(entity.getStatusCode(), is(OK));
     assertThat(entity.getBody(), is("Good morning, Bonjour, mike"));
 
-    List<String> distinctGlobalTxIds = repository.findDistinctGlobalTxId();
+    List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
     assertThat(distinctGlobalTxIds.size(), is(1));
 
     String globalTxId = distinctGlobalTxIds.get(0);
-    List<TxEvent> events = 
repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
+    List<TxEvent> events = 
eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
 
     assertThat(events.size(), is(6));
 
@@ -220,4 +229,69 @@ public void updatesEmbeddedTxStateToAlpha() throws 
Exception {
 
     assertThat(compensatedMessages.isEmpty(), is(true));
   }
+
+  @Test(timeout = 5000)
+  public void retrySubTransactionSuccess() {
+    ResponseEntity<String> entity = 
restTemplate.getForEntity("/open?name={name}&retries={retries}",
+        String.class,
+        "eric",
+        2);
+
+    assertThat(entity.getStatusCode(), is(OK));
+    assertThat(entity.getBody(), is("Greetings, eric; Welcome to visit the 
zoo, eric"));
+
+    await().atMost(3, SECONDS).until(() -> eventRepo.count() == 8);
+
+    List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
+    assertThat(distinctGlobalTxIds.size(), is(1));
+
+    String globalTxId = distinctGlobalTxIds.get(0);
+    List<TxEvent> events = 
eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
+    assertThat(events.size(), is(8));
+
+    assertThat(events.get(0).type(), is("SagaStartedEvent"));
+    assertThat(events.get(1).type(), is("TxStartedEvent"));
+    assertThat(events.get(2).type(), is("TxEndedEvent"));
+    assertThat(events.get(3).type(), is("TxStartedEvent"));
+    assertThat(events.get(4).type(), is("TxAbortedEvent"));
+    assertThat(events.get(5).type(), is("TxStartedEvent"));
+    assertThat(events.get(6).type(), is("TxEndedEvent"));
+    assertThat(events.get(7).type(), is("SagaEndedEvent"));
+
+    assertThat(compensatedMessages.isEmpty(), is(true));
+  }
+
+  @Test(timeout = 5000)
+  public void compensateWhenRetryReachesMaximum() throws InterruptedException {
+    // retries 3 times and then compensate
+    ResponseEntity<String> entity = 
restTemplate.getForEntity("/open?name={name}&retries={retries}",
+        String.class,
+        TRESPASSER,
+        5);
+
+    assertThat(entity.getStatusCode(), is(INTERNAL_SERVER_ERROR));
+
+    await().atMost(3, SECONDS).until(() -> eventRepo.count() == 11);
+
+    List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
+    assertThat(distinctGlobalTxIds.size(), is(1));
+
+    String globalTxId = distinctGlobalTxIds.get(0);
+    List<TxEvent> events = 
eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
+    assertThat(events.size(), is(11));
+
+    assertThat(events.get(0).type(), is("SagaStartedEvent"));
+    assertThat(events.get(1).type(), is("TxStartedEvent"));
+    assertThat(events.get(2).type(), is("TxEndedEvent"));
+    assertThat(events.get(3).type(), is("TxStartedEvent"));
+    assertThat(events.get(4).type(), is("TxAbortedEvent"));
+    assertThat(events.get(5).type(), is("TxStartedEvent"));
+    assertThat(events.get(6).type(), is("TxAbortedEvent"));
+    assertThat(events.get(7).type(), is("TxStartedEvent"));
+    assertThat(events.get(8).type(), is("TxAbortedEvent"));
+    assertThat(events.get(9).type(), is("TxCompensatedEvent"));
+    assertThat(events.get(10).type(), is("SagaEndedEvent"));
+
+    assertThat(compensatedMessages, contains("Goodbye, " + TRESPASSER));
+  }
 }
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 5712f576..b33eeb38 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -42,13 +42,13 @@
 import io.grpc.ManagedChannel;
 
 public class GrpcClientMessageSender implements MessageSender {
-
   private final String target;
   private final TxEventServiceStub asyncEventService;
 
   private final MessageSerializer serializer;
 
   private final TxEventServiceBlockingStub blockingEventService;
+
   private final GrpcCompensateStreamObserver compensateStreamObserver;
   private final GrpcServiceConfig serviceConfig;
 
@@ -65,7 +65,8 @@ public GrpcClientMessageSender(
     this.blockingEventService = TxEventServiceGrpc.newBlockingStub(channel);
     this.serializer = serializer;
 
-    this.compensateStreamObserver = new GrpcCompensateStreamObserver(handler, 
errorHandlerFactory.apply(this), deserializer);
+    this.compensateStreamObserver =
+        new GrpcCompensateStreamObserver(handler, 
errorHandlerFactory.apply(this), deserializer);
     this.serviceConfig = serviceConfig(serviceConfig.serviceName(), 
serviceConfig.instanceId());
   }
 
@@ -103,6 +104,8 @@ private GrpcTxEvent convertEvent(TxEvent event) {
         .setType(event.type().name())
         .setTimeout(event.timeout())
         .setCompensationMethod(event.compensationMethod())
+        .setRetryMethod(event.retryMethod() == null ? "" : event.retryMethod())
+        .setRetries(event.retries())
         .setPayloads(payloads);
 
     return builder.build();
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index 3cf46f86..9d9c3128 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -46,14 +46,14 @@
 
   @Override
   public void onNext(GrpcCompensateCommand command) {
-    LOG.info("Received compensate command, global tx id: {}, local tx id: {}, 
compensate method: {}",
-        command.getGlobalTxId(), command.getLocalTxId(), 
command.getCompensateMethod());
+    LOG.info("Received compensate command, global tx id: {}, local tx id: {}, 
compensation method: {}",
+        command.getGlobalTxId(), command.getLocalTxId(), 
command.getCompensationMethod());
 
     messageHandler.onReceive(
         command.getGlobalTxId(),
         command.getLocalTxId(),
         command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
-        command.getCompensateMethod(),
+        command.getCompensationMethod(),
         deserializer.deserialize(command.getPayloads().toByteArray()));
   }
 
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 9a78a620..afff8e73 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -48,7 +48,7 @@
 import io.grpc.ManagedChannelBuilder;
 
 public class LoadBalancedClusterMessageSender implements MessageSender {
-  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final Map<MessageSender, Long> senders = new ConcurrentHashMap<>();
   private final Collection<ManagedChannel> channels;
 
@@ -104,7 +104,7 @@ public void onConnected() {
       try {
         sender.onConnected();
       } catch (Exception e) {
-        log.error("Failed connecting to alpha at {}", sender.target(), e);
+        LOG.error("Failed connecting to alpha at {}", sender.target(), e);
       }
     });
   }
@@ -115,7 +115,7 @@ public void onDisconnected() {
       try {
         sender.onDisconnected();
       } catch (Exception e) {
-        log.error("Failed disconnecting from alpha at {}", sender.target(), e);
+        LOG.error("Failed disconnecting from alpha at {}", sender.target(), e);
       }
     });
   }
@@ -140,7 +140,7 @@ public AlphaResponse send(TxEvent event) {
       } catch (OmegaException e) {
         throw e;
       } catch (Exception e) {
-        log.error("Retry sending event {} due to failure", event, e);
+        LOG.error("Retry sending event {} due to failure", event, e);
 
         // very large latency on exception
         senders.put(messageSender, Long.MAX_VALUE);
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
index f019d107..02571fde 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
@@ -26,7 +26,7 @@
 import org.slf4j.LoggerFactory;
 
 class PushBackReconnectRunnable implements Runnable {
-  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final MessageSender messageSender;
   private final Map<MessageSender, Long> senders;
   private final BlockingQueue<Runnable> pendingTasks;
@@ -47,14 +47,14 @@
   @Override
   public void run() {
     try {
-      log.info("Retry connecting to alpha at {}", messageSender.target());
+      LOG.info("Retry connecting to alpha at {}", messageSender.target());
       messageSender.onDisconnected();
       messageSender.onConnected();
       senders.put(messageSender, 0L);
       connectedSenders.offer(messageSender);
-      log.info("Retry connecting to alpha at {} is successful", 
messageSender.target());
+      LOG.info("Retry connecting to alpha at {} is successful", 
messageSender.target());
     } catch (Exception e) {
-      log.error("Failed to reconnect to alpha at {}", messageSender.target(), 
e);
+      LOG.error("Failed to reconnect to alpha at {}", messageSender.target(), 
e);
       pendingTasks.offer(this);
     }
   }
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 788cf968..e8b2f345 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -90,15 +90,15 @@
 
   private final List<String> compensated = new ArrayList<>();
 
-  private final MessageHandler handler = (globalTxId, localTxId, parentTxId, 
compensationMethod, payloads) ->
-      compensated.add(globalTxId);
+  private final MessageHandler handler = (globalTxId, localTxId, parentTxId, 
compensationMethod,
+      payloads) -> compensated.add(globalTxId);
 
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
   private final String parentTxId = uniquify("parentTxId");
   private final String compensationMethod = getClass().getCanonicalName();
   private final TxEvent event = new TxEvent(EventType.TxStartedEvent, 
globalTxId, localTxId, parentTxId,
-      compensationMethod, 0, "blah");
+      compensationMethod, 0, "", 0, "blah");
 
   private final String serviceName = uniquify("serviceName");
   private final String[] addresses = {"localhost:8080", "localhost:8090"};
@@ -189,7 +189,7 @@ public void resetLatencyOnReconnection() throws Exception {
     await().atMost(3, SECONDS).until(() -> compensated.contains(globalTxId));
   }
 
-  @Test (timeout = 1000)
+  @Test(timeout = 1000)
   public void stopSendingOnInterruption() throws Exception {
     MessageSender underlying = Mockito.mock(MessageSender.class);
     doThrow(RuntimeException.class).when(underlying).send(event);
@@ -300,7 +300,7 @@ public void stopSendingWhenClusterIsDown() throws Exception 
{
   public void forwardSendResult() {
     assertThat(messageSender.send(event).aborted(), is(false));
 
-    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, 
parentTxId, "reject", 0, "blah");
+    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, 
parentTxId, "reject", 0, "", 0, "blah");
     assertThat(messageSender.send(rejectEvent).aborted(), is(true));
   }
 
@@ -335,6 +335,7 @@ private int killServerReceivedMessage() {
     private final Queue<String> connected;
     private final Queue<TxEvent> events;
     private final int delay;
+
     private StreamObserver<GrpcCompensateCommand> responseObserver;
 
     private MyTxEventService(Queue<String> connected, Queue<TxEvent> events, 
int delay) {
@@ -357,7 +358,9 @@ public void onTxEvent(GrpcTxEvent request, 
StreamObserver<GrpcAck> responseObser
           request.getLocalTxId(),
           request.getParentTxId(),
           request.getCompensationMethod(),
-          0,
+          request.getTimeout(),
+          request.getRetryMethod(),
+          request.getRetries(),
           new String(request.getPayloads().toByteArray())));
 
       sleep();
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
index 95bda85f..38564150 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
@@ -42,7 +42,8 @@
 
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
-  private final TxStartedEvent event = new TxStartedEvent(globalTxId, 
localTxId, null, "method x", 0);
+
+  private final TxStartedEvent event = new TxStartedEvent(globalTxId, 
localTxId, null, "method x", 0, null, 0);
 
   @Test
   public void sendEventWhenSenderIsAvailable() {
diff --git 
a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
 
b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
index 067af920..cf16888a 100644
--- 
a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
+++ 
b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
@@ -28,6 +28,7 @@
 
 public class CompensationContext {
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private final Map<String, CompensationContextInternal> contexts = new 
ConcurrentHashMap<>();
 
   public void addCompensationContext(Method compensationMethod, Object target) 
{
@@ -35,7 +36,7 @@ public void addCompensationContext(Method compensationMethod, 
Object target) {
     contexts.put(compensationMethod.toString(), new 
CompensationContextInternal(target, compensationMethod));
   }
 
-  public void compensate(String globalTxId, String localTxId, String 
compensationMethod, Object... payloads) {
+  public void apply(String globalTxId, String localTxId, String 
compensationMethod, Object... payloads) {
     CompensationContextInternal contextInternal = 
contexts.get(compensationMethod);
 
     try {
@@ -43,7 +44,7 @@ public void compensate(String globalTxId, String localTxId, 
String compensationM
       LOG.info("Compensated transaction with global tx id [{}], local tx id 
[{}]", globalTxId, localTxId);
     } catch (IllegalAccessException | InvocationTargetException e) {
       LOG.error(
-          "Pre-checking for compensate method " + 
contextInternal.compensationMethod.toString()
+          "Pre-checking for compensation method " + 
contextInternal.compensationMethod.toString()
               + " was somehow skipped, did you forget to configure compensable 
method checking on service startup?",
           e);
     }
@@ -51,6 +52,7 @@ public void compensate(String globalTxId, String localTxId, 
String compensationM
 
   private static final class CompensationContextInternal {
     private final Object target;
+
     private final Method compensationMethod;
 
     private CompensationContextInternal(Object target, Method 
compensationMethod) {
diff --git 
a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
 
b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
index 338751cd..7d7d45ff 100644
--- 
a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
+++ 
b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
@@ -26,6 +26,7 @@
 class CompensableAnnotationProcessor implements BeanPostProcessor {
 
   private final OmegaContext omegaContext;
+
   private final CompensationContext compensationContext;
 
   CompensableAnnotationProcessor(OmegaContext omegaContext, 
CompensationContext compensationContext) {
diff --git 
a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
 
b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
index 6c0c3331..90d8b060 100644
--- 
a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
+++ 
b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
@@ -31,6 +31,7 @@
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final Object bean;
+
   private final CompensationContext compensationContext;
 
   CompensableMethodCheckingCallback(Object bean, CompensationContext 
compensationContext) {
@@ -47,9 +48,13 @@ public void doWith(Method method) throws 
IllegalArgumentException {
     String compensationMethod = 
method.getAnnotation(Compensable.class).compensationMethod();
 
     try {
-      Method signature = bean.getClass().getDeclaredMethod(compensationMethod, 
method.getParameterTypes());
-      compensationContext.addCompensationContext(signature, bean);
-      LOG.debug("Found compensation method [{}] in {}", compensationMethod, 
bean.getClass().getCanonicalName());
+      compensationContext.addCompensationContext(method, bean);
+
+      if (!compensationMethod.isEmpty()) {
+        Method signature = 
bean.getClass().getDeclaredMethod(compensationMethod, 
method.getParameterTypes());
+        compensationContext.addCompensationContext(signature, bean);
+        LOG.debug("Found compensation method [{}] in {}", compensationMethod, 
bean.getClass().getCanonicalName());
+      }
     } catch (NoSuchMethodException e) {
       throw new OmegaException(
           "No such compensation method [" + compensationMethod + "] found in " 
+ bean.getClass().getCanonicalName(),
diff --git 
a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
 
b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index 5358db5a..4fd41881 100644
--- 
a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++ 
b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -34,7 +34,7 @@
 public class TransactionAspectConfig {
 
   @Bean
-  MessageHandler messageHandler(MessageSender sender, CompensationContext 
context) {
+  MessageHandler messageHandler(MessageSender sender, CompensationContext 
context, OmegaContext omegaContext) {
     return new CompensationMessageHandler(sender, context);
   }
 
@@ -51,7 +51,8 @@ TransactionAspect transactionAspect(MessageSender sender, 
OmegaContext context)
   }
 
   @Bean
-  CompensableAnnotationProcessor compensableAnnotationProcessor(OmegaContext 
omegaContext, CompensationContext compensationContext) {
+  CompensableAnnotationProcessor compensableAnnotationProcessor(OmegaContext 
omegaContext,
+      CompensationContext compensationContext) {
     return new CompensableAnnotationProcessor(omegaContext, 
compensationContext);
   }
 }
diff --git 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 2ec42a51..6505bfcb 100644
--- 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -24,10 +24,13 @@
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER;
 import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
@@ -106,12 +109,18 @@
 
   private String compensationMethod;
 
+  private String compensationMethod2;
+
+  private String retryMethod;
+
   @Before
   public void setUp() throws Exception {
     when(idGenerator.nextId()).thenReturn(newLocalTxId, anotherLocalTxId);
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(globalTxId);
+    retryMethod = TransactionalUserService.class.getDeclaredMethod("add", 
User.class, int.class).toString();
     compensationMethod = 
TransactionalUserService.class.getDeclaredMethod("delete", 
User.class).toString();
+    compensationMethod2 = 
TransactionalUserService.class.getDeclaredMethod("delete", User.class, 
int.class).toString();
   }
 
   @After
@@ -119,6 +128,7 @@ public void tearDown() throws Exception {
     messages.clear();
     userRepository.deleteAll();
     omegaContext.clear();
+    userService.resetCount();
   }
 
   @AfterClass
@@ -130,8 +140,9 @@ public void sendsUserToRemote_AroundTransaction() throws 
Exception {
     User user = userService.add(this.user);
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, "", 0,
+                user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString()},
         toArray(messages)
     );
@@ -151,8 +162,9 @@ public void sendsAbortEvent_OnSubTransactionFailure() 
throws Exception {
     }
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, illegalUser).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, "", 0,
+                illegalUser).toString(),
             new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, throwable).toString()},
         toArray(messages)
     );
@@ -173,10 +185,11 @@ public void compensateOnTransactionException() throws 
Exception {
     assertThat(userRepository.findByUsername(anotherUser.username()), 
is(nullValue()));
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, "", 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, 0, anotherUser).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, 0, "", 0,
+                anotherUser).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod).toString(),
             new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString(),
             new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod).toString()
@@ -185,6 +198,60 @@ public void compensateOnTransactionException() throws 
Exception {
     );
   }
 
+  @Test
+  public void retryTillSuccess() {
+    try {
+      userService.add(user, 1);
+    } catch (Exception e) {
+      fail("unexpected exception throw: " + e);
+    }
+
+    assertThat(messages.size(), is(4));
+
+    assertThat(messages.get(0),
+        is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod2, 0, retryMethod, 2, user, 1)
+            .toString()));
+
+    String abortedEvent = messages.get(1);
+    assertThat(abortedEvent, allOf(containsString("TxAbortedEvent"), 
containsString("Retry harder")));
+
+    assertThat(messages.get(2),
+        is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod2, 0, retryMethod, 1, user, 1)
+            .toString()));
+    assertThat(messages.get(3),
+        is(new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod2).toString()));
+
+    assertThat(userRepository.count(), is(1L));
+    userRepository.findAll().forEach(user -> assertThat(user, is(this.user)));
+  }
+
+  @Test
+  public void retryReachesMaximumThenThrowsException() {
+    try {
+      userService.add(user, 3);
+      expectFailing(IllegalStateException.class);
+    } catch (IllegalStateException e) {
+      assertThat(e.getMessage(), is("Retry harder"));
+    }
+
+    assertThat(messages.size(), is(4));
+    assertThat(messages.get(0),
+        is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod2, 0, retryMethod, 2, user, 3)
+            .toString()));
+
+    String abortedEvent1 = messages.get(1);
+    assertThat(abortedEvent1, allOf(containsString("TxAbortedEvent"), 
containsString("Retry harder")));
+
+    assertThat(messages.get(2),
+        is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod2, 0, retryMethod, 1, user, 3)
+            .toString()));
+
+    String abortedEvent2 = messages.get(3);
+    assertThat(abortedEvent2, allOf(containsString("TxAbortedEvent"), 
containsString("Retry harder")));
+
+    assertThat(userRepository.count(), is(0L));
+  }
+
   @Test
   public void passesOmegaContextThroughDifferentThreads() throws Exception {
     new Thread(() -> userService.add(user)).start();
@@ -195,10 +262,10 @@ public void passesOmegaContextThroughDifferentThreads() 
throws Exception {
     waitTillSavedUser(usernameJack);
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, "", 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, 0, jack).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, 0, "", 0, jack).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod).toString()},
         toArray(messages)
     );
@@ -214,10 +281,10 @@ public void passesOmegaContextInThreadPool() throws 
Exception {
     waitTillSavedUser(usernameJack);
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, "", 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, 0, jack).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, 0, "", 0, jack).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod).toString()},
         toArray(messages)
     );
@@ -236,8 +303,8 @@ public void passesOmegaContextThroughReactiveX() throws 
Exception {
     waitTillSavedUser(username);
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, "", 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString()},
         toArray(messages)
     );
@@ -255,11 +322,10 @@ public void passesOmegaContextAmongActors() throws 
Exception {
 
     assertArrayEquals(
         new String[] {
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, "", 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString()},
         toArray(messages)
     );
-
     actorSystem.terminate();
   }
 
@@ -295,7 +361,7 @@ public Receive createReceive() {
     private final List<String> messages = new ArrayList<>();
 
     @Bean
-    CompensationContext compensationContext() {
+    CompensationContext recoveryContext() {
       return new CompensationContext();
     }
 
diff --git 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionalUserService.java
 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionalUserService.java
index c98c6ea5..0618109f 100644
--- 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionalUserService.java
+++ 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionalUserService.java
@@ -17,21 +17,26 @@
 
 package org.apache.servicecomb.saga.omega.transaction.spring;
 
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
-
 @Component
 class TransactionalUserService {
   static final String ILLEGAL_USER = "Illegal User";
   private final UserRepository userRepository;
 
+  private int count = 0;
+
   @Autowired
   TransactionalUserService(UserRepository userRepository) {
     this.userRepository = userRepository;
   }
 
+  void resetCount() {
+    this.count = 0;
+  }
+
   @Compensable(compensationMethod = "delete")
   User add(User user) {
     if (ILLEGAL_USER.equals(user.username())) {
@@ -43,4 +48,19 @@ User add(User user) {
   void delete(User user) {
     userRepository.delete(user);
   }
+
+  @Compensable(retries = 2, compensationMethod = "delete")
+  User add(User user, int count) {
+    if (this.count < count) {
+      this.count += 1;
+      throw new IllegalStateException("Retry harder");
+    }
+    resetCount();
+    return userRepository.save(user);
+  }
+
+  void delete(User user, int count) {
+    resetCount();
+    userRepository.delete(user);
+  }
 }
diff --git 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/User.java
 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/User.java
index c5c3d84b..da9d4b2a 100644
--- 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/User.java
+++ 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/User.java
@@ -62,7 +62,7 @@ public boolean equals(Object o) {
       return false;
     }
     User user = (User) o;
-    return id == user.id &&
+    return id.equals(user.id) &&
         Objects.equals(username, user.username) &&
         Objects.equals(email, user.email);
   }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index 53e51581..588d6604 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -24,20 +24,20 @@
   private final MessageSender sender;
 
   CompensableInterceptor(OmegaContext context, MessageSender sender) {
-    this.context = context;
     this.sender = sender;
+    this.context = context;
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, int timeout, Object... message) {
-    return sender.send(new TxStartedEvent(
-        context.globalTxId(), context.localTxId(), parentTxId, 
compensationMethod, timeout, message));
+  public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, int timeout, String retriesMethod,
+      int retries, Object... message) {
+    return sender.send(new TxStartedEvent(context.globalTxId(), 
context.localTxId(), parentTxId, compensationMethod,
+        timeout, retriesMethod, retries, message));
   }
 
   @Override
   public void postIntercept(String parentTxId, String compensationMethod) {
     sender.send(new TxEndedEvent(context.globalTxId(), context.localTxId(), 
parentTxId, compensationMethod));
-
   }
 
   @Override
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
index 46c1e9b3..fe2eea5f 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
@@ -21,6 +21,7 @@
 
 public class CompensationMessageHandler implements MessageHandler {
   private final MessageSender sender;
+
   private final CompensationContext context;
 
   public CompensationMessageHandler(MessageSender sender, CompensationContext 
context) {
@@ -29,8 +30,9 @@ public CompensationMessageHandler(MessageSender sender, 
CompensationContext cont
   }
 
   @Override
-  public void onReceive(String globalTxId, String localTxId, String 
parentTxId, String compensationMethod, Object... payloads) {
-    context.compensate(globalTxId, localTxId, compensationMethod, payloads);
+  public void onReceive(String globalTxId, String localTxId, String 
parentTxId, String compensationMethod,
+      Object... payloads) {
+    context.apply(globalTxId, localTxId, compensationMethod, payloads);
     sender.send(new TxCompensatedEvent(globalTxId, localTxId, parentTxId, 
compensationMethod));
   }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecovery.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecovery.java
new file mode 100644
index 00000000..08449813
--- /dev/null
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecovery.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+
+import javax.transaction.InvalidTransactionException;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DefaultRecovery is used to execute business logic once.
+ * The corresponding events will report to alpha server before and after the 
execution of business logic.
+ * If there are errors while executing the business logic, a TxAbortedEvent 
will be reported to alpha.
+ *
+ *                 pre                       post
+ *     request --------- 2.business logic --------- response
+ *                 \                          |
+ * 1.TxStartedEvent \                        | 3.TxEndedEvent
+ *                   \                      |
+ *                    ----------------------
+ *                            alpha
+ */
+public class DefaultRecovery implements RecoveryPolicy {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @Override
+  public Object apply(ProceedingJoinPoint joinPoint, Compensable compensable, 
CompensableInterceptor interceptor,
+      OmegaContext context, String parentTxId, int retries) throws Throwable {
+    Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+    LOG.debug("Intercepting compensable method {} with context {}", 
method.toString(), context);
+
+    String compensationSignature =
+        compensable.compensationMethod().isEmpty() ? "" : 
compensationMethodSignature(joinPoint, compensable, method);
+
+    String retrySignature = (retries != 0 || compensationSignature.isEmpty()) 
? method.toString() : "";
+
+    AlphaResponse response = interceptor.preIntercept(parentTxId, 
compensationSignature, compensable.timeout(),
+        retrySignature, retries, joinPoint.getArgs());
+    if (response.aborted()) {
+      String abortedLocalTxId = context.localTxId();
+      context.setLocalTxId(parentTxId);
+      throw new InvalidTransactionException("Abort sub transaction " + 
abortedLocalTxId +
+          " because global transaction " + context.globalTxId() + " has 
already aborted.");
+    }
+
+    try {
+      Object result = joinPoint.proceed();
+      interceptor.postIntercept(parentTxId, compensationSignature);
+
+      return result;
+    } catch (Throwable throwable) {
+      interceptor.onError(parentTxId, compensationSignature, throwable);
+      throw throwable;
+    }
+  }
+
+  String compensationMethodSignature(ProceedingJoinPoint joinPoint, 
Compensable compensable, Method method)
+      throws NoSuchMethodException {
+    return joinPoint.getTarget()
+        .getClass()
+        .getDeclaredMethod(compensable.compensationMethod(), 
method.getParameterTypes())
+        .toString();
+  }
+}
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index bb2cca4b..285d5498 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -20,7 +20,8 @@
 public interface EventAwareInterceptor {
   EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
     @Override
-    public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, int timeout, Object... message) {
+    public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, int timeout, String retriesMethod,
+        int retries, Object... message) {
       return new AlphaResponse(false);
     }
 
@@ -33,7 +34,8 @@ public void onError(String parentTxId, String 
compensationMethod, Throwable thro
     }
   };
 
-  AlphaResponse preIntercept(String parentTxId, String compensationMethod, int 
timeout, Object... message);
+  AlphaResponse preIntercept(String parentTxId, String compensationMethod, int 
timeout, String retriesMethod,
+      int retries, Object... message);
 
   void postIntercept(String parentTxId, String compensationMethod);
 
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecovery.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecovery.java
new file mode 100644
index 00000000..d1a28c2e
--- /dev/null
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecovery.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+
+import javax.transaction.InvalidTransactionException;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ForwardRecovery is used to execute business logic with the given retries 
times.
+ * If retries is above 0, it will retry the given times at most.
+ * If retries == -1, it will retry forever until interrupted.
+ */
+public class ForwardRecovery extends DefaultRecovery {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // TODO: 2018/03/10 we do not support retry with timeout yet
+  @Override
+  public Object apply(ProceedingJoinPoint joinPoint, Compensable compensable, 
CompensableInterceptor interceptor,
+      OmegaContext context, String parentTxId, int retries) throws Throwable {
+    Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+    int remains = retries;
+    try {
+      while (true) {
+        try {
+          return super.apply(joinPoint, compensable, interceptor, context, 
parentTxId, remains);
+        } catch (Throwable throwable) {
+          if (throwable instanceof InvalidTransactionException) {
+            throw throwable;
+          }
+
+          remains = remains == -1 ? -1 : remains - 1;
+          if (remains == 0) {
+            LOG.error(
+                "Retried sub tx failed maximum times, global tx id: {}, local 
tx id: {}, method: {}, retried times: {}",
+                context.globalTxId(), context.localTxId(), method.toString(), 
retries);
+            throw throwable;
+          }
+
+          LOG.warn("Retrying sub tx failed, global tx id: {}, local tx id: {}, 
method: {}, remains: {}",
+              context.globalTxId(), context.localTxId(), method.toString(), 
remains);
+          Thread.sleep(compensable.retryDelayInMilliseconds());
+        }
+      }
+    } catch (InterruptedException e) {
+      String errorMessage = "Failed to handle tx because it is interrupted, 
global tx id: " + context.globalTxId()
+          + ", local tx id: " + context.localTxId() + ", method: " + 
method.toString();
+      LOG.error(errorMessage);
+      interceptor.onError(parentTxId, compensationMethodSignature(joinPoint, 
compensable, method), e);
+      throw new OmegaException(errorMessage);
+    }
+  }
+}
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicy.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicy.java
new file mode 100644
index 00000000..bc1d4d8d
--- /dev/null
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicy.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+
+public interface RecoveryPolicy {
+  Object apply(ProceedingJoinPoint joinPoint, Compensable compensable, 
CompensableInterceptor interceptor,
+      OmegaContext context, String parentTxId, int retries) throws Throwable;
+}
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicyFactory.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicyFactory.java
new file mode 100644
index 00000000..f59ac2b3
--- /dev/null
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicyFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+public class RecoveryPolicyFactory {
+  private static final RecoveryPolicy DEFAULT_RECOVERY = new DefaultRecovery();
+
+  private static final RecoveryPolicy FORWARD_RECOVERY = new ForwardRecovery();
+
+  /**
+   * If retries == 0, use the default recovery to execute only once.
+   * If retries > 0, it will use the forward recovery and retry the given 
times at most.
+   * If retries == -1, it will use the forward recovery and retry forever 
until interrupted.
+   */
+  static RecoveryPolicy getRecoveryPolicy(int retries) {
+    return retries != 0 ? FORWARD_RECOVERY : DEFAULT_RECOVERY;
+  }
+}
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
index 8c70e3a1..2e28b5e1 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
@@ -20,8 +20,7 @@
 import org.apache.servicecomb.saga.common.EventType;
 
 public class SagaEndedEvent extends TxEvent {
-
-  public SagaEndedEvent(String globalTxId, String localTxId) {
-    super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0);
+  SagaEndedEvent(String globalTxId, String localTxId) {
+    super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0, "", 0);
   }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index b7afcf53..486f28cc 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -32,7 +32,8 @@
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, int timeout, Object... message) {
+  public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, int timeout, String retriesMethod,
+      int retries, Object... message) {
     try {
       return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), 
omegaContext.localTxId(), timeout));
     } catch (OmegaException e) {
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index 388f237d..8722deba 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -47,7 +47,7 @@ Object advise(ProceedingJoinPoint joinPoint, SagaStart 
sagaStart) throws Throwab
     initializeOmegaContext();
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
 
-    sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), 
method.toString(), sagaStart.timeout());
+    sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), 
method.toString(), sagaStart.timeout(), "", 0);
     LOG.debug("Initialized context {} before execution of method {}", context, 
method.toString());
 
     try {
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
index cb76a265..0e87a97d 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
@@ -22,6 +22,6 @@
 public class SagaStartedEvent extends TxEvent {
   public SagaStartedEvent(String globalTxId, String localTxId, int timeout) {
     // use "" instead of null as compensationMethod requires not null in sql
-    super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "", 
timeout);
+    super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "", 
timeout, "", 0);
   }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 932b9901..f7a98ee7 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -20,8 +20,6 @@
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
 
-import javax.transaction.InvalidTransactionException;
-
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 import org.aspectj.lang.ProceedingJoinPoint;
@@ -47,42 +45,17 @@ public TransactionAspect(MessageSender sender, OmegaContext 
context) {
   
@Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable
 * *(..)) && @annotation(compensable)")
   Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws 
Throwable {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
-    LOG.debug("Intercepting compensable method {} with context {}", 
method.toString(), context);
-
-    String signature = compensationMethodSignature(joinPoint, compensable, 
method);
-
     String localTxId = context.localTxId();
     context.newLocalTxId();
-
-    AlphaResponse response = interceptor.preIntercept(localTxId, signature, 
compensable.timeout(), joinPoint.getArgs());
-    if (response.aborted()) {
-      String abortedLocalTxId = context.localTxId();
-      context.setLocalTxId(localTxId);
-      throw new InvalidTransactionException("Abort sub transaction " + 
abortedLocalTxId +
-          " because global transaction " + context.globalTxId() + " has 
already aborted.");
-    }
     LOG.debug("Updated context {} for compensable method {} ", context, 
method.toString());
 
+    int retries = compensable.retries();
+    RecoveryPolicy recoveryPolicy = 
RecoveryPolicyFactory.getRecoveryPolicy(retries);
     try {
-      Object result = joinPoint.proceed();
-      interceptor.postIntercept(localTxId, signature);
-
-      return result;
-    } catch (Throwable throwable) {
-      interceptor.onError(localTxId, signature, throwable);
-      throw throwable;
+      return recoveryPolicy.apply(joinPoint, compensable, interceptor, 
context, localTxId, retries);
     } finally {
       context.setLocalTxId(localTxId);
       LOG.debug("Restored context back to {}", context);
     }
   }
-
-  private String compensationMethodSignature(ProceedingJoinPoint joinPoint, 
Compensable compensable, Method method)
-      throws NoSuchMethodException {
-
-    return joinPoint.getTarget()
-        .getClass()
-        .getDeclaredMethod(compensable.compensationMethod(), 
method.getParameterTypes())
-        .toString();
-  }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
index d6aa5333..f0bac541 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
@@ -24,7 +24,8 @@
 
 public class TxAbortedEvent extends TxEvent {
   public TxAbortedEvent(String globalTxId, String localTxId, String 
parentTxId, String compensationMethod, Throwable throwable) {
-    super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, 0, stackTrace(throwable));
+    super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, 0, "", 0,
+        stackTrace(throwable));
   }
 
   private static String stackTrace(Throwable e) {
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
index 8e288dfb..cd709e40 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
@@ -21,6 +21,6 @@
 
 public class TxCompensatedEvent extends TxEvent {
   public TxCompensatedEvent(String globalTxId, String localTxId, String 
parentTxId, String compensationMethod) {
-    super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, 0);
+    super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, 0, "", 0);
   }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
index 8d6666a6..f702c438 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
@@ -21,6 +21,6 @@
 
 public class TxEndedEvent extends TxEvent {
   public TxEndedEvent(String globalTxId, String localTxId, String parentTxId, 
String compensationMethod) {
-    super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, 0);
+    super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, 0, "", 0);
   }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
index 34be420e..a158af19 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
@@ -32,16 +32,21 @@
   private final int timeout;
   private final Object[] payloads;
 
+  private final String retryMethod;
+  private final int retries;
+
   public TxEvent(EventType type, String globalTxId, String localTxId, String 
parentTxId, String compensationMethod,
-      int timeout, Object... payloads) {
+      int timeout, String retryMethod, int retries, Object... payloads) {
     this.timestamp = System.currentTimeMillis();
     this.type = type;
+    this.globalTxId = globalTxId;
     this.localTxId = localTxId;
     this.parentTxId = parentTxId;
     this.compensationMethod = compensationMethod;
-    this.payloads = payloads;
-    this.globalTxId = globalTxId;
     this.timeout = timeout;
+    this.retryMethod = retryMethod;
+    this.retries = retries;
+    this.payloads = payloads;
   }
 
   public long timestamp() {
@@ -76,6 +81,14 @@ public int timeout() {
     return timeout;
   }
 
+  public String retryMethod() {
+    return retryMethod;
+  }
+
+  public int retries() {
+    return retries;
+  }
+
   @Override
   public String toString() {
     return type.name() + "{" +
@@ -83,7 +96,9 @@ public String toString() {
         ", localTxId='" + localTxId + '\'' +
         ", parentTxId='" + parentTxId + '\'' +
         ", compensationMethod='" + compensationMethod + '\'' +
-        ", timeout='" + timeout + '\'' +
+        ", timeout=" + timeout +
+        ", retryMethod='" + retryMethod + '\'' +
+        ", retries=" + retries +
         ", payloads=" + Arrays.toString(payloads) +
         '}';
   }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
index 4732d952..5d2ae127 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
@@ -21,8 +21,9 @@
 
 public class TxStartedEvent extends TxEvent {
 
-  public TxStartedEvent(String globalTxId, String localTxId, String parentTxId,
-      String compensationMethod, int timeout, Object... payloads) {
-    super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, timeout, payloads);
+  public TxStartedEvent(String globalTxId, String localTxId, String 
parentTxId, String compensationMethod,
+      int timeout, String retryMethod, int retries, Object... payloads) {
+    super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, timeout, retryMethod,
+        retries, payloads);
   }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
index 11ba7c71..78c4b91b 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
@@ -36,8 +36,10 @@
 @Retention(RetentionPolicy.RUNTIME)
 public @interface Compensable {
 
+  int retries() default 0;
+
   /**
-   * Compensation method name, should not be null.<br>
+   * Compensation method name.<br>
    * A compensation method should satisfy below requirements:
    * <ol>
    *   <li>has same parameter list as @Compensable method's</li>
@@ -48,7 +50,9 @@
    *
    * @return
    */
-  String compensationMethod();
+  String compensationMethod() default "";
+
+  int retryDelayInMilliseconds() default 0;
 
   /**
    * <code>@Compensable</code> method timeout, in seconds. <br>
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index 0ef9d4dc..1e01ea59 100644
--- 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -25,6 +25,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.UUID;
 
 import org.apache.servicecomb.saga.common.EventType;
@@ -47,6 +48,8 @@
   };
 
   private final String message = uniquify("message");
+
+  private final String retryMethod = uniquify("retryMethod");
   private final String compensationMethod = getClass().getCanonicalName();
 
   @SuppressWarnings("unchecked")
@@ -62,13 +65,16 @@ public void setUp() throws Exception {
 
   @Test
   public void sendsTxStartedEventBefore() throws Exception {
-    interceptor.preIntercept(parentTxId, compensationMethod, 0, message);
+    int retries = new Random().nextInt();
+    interceptor.preIntercept(parentTxId, compensationMethod, 0, retryMethod, 
retries, message);
 
     TxEvent event = messages.get(0);
 
     assertThat(event.globalTxId(), is(globalTxId));
     assertThat(event.localTxId(), is(localTxId));
     assertThat(event.parentTxId(), is(parentTxId));
+    assertThat(event.retries(), is(retries));
+    assertThat(event.retryMethod(), is(retryMethod));
     assertThat(event.type(), is(EventType.TxStartedEvent));
     assertThat(event.compensationMethod(), is(compensationMethod));
     assertThat(asList(event.payloads()), contains(message));
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
index 0b33d4b8..d5d5de55 100644
--- 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
@@ -28,6 +28,7 @@
 
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.omega.context.CompensationContext;
+import org.junit.Before;
 import org.junit.Test;
 
 public class CompensationMessageHandlerTest {
@@ -41,14 +42,21 @@
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
   private final String parentTxId = uniquify("parentTxId");
+
   private final String compensationMethod = getClass().getCanonicalName();
   private final String payload = uniquify("blah");
 
   private final CompensationContext context = mock(CompensationContext.class);
+
   private final CompensationMessageHandler handler = new 
CompensationMessageHandler(sender, context);
 
+  @Before
+  public void setUp() {
+    events.clear();
+  }
+
   @Test
-  public void sendsEventOnCompensationCompleted() throws Exception {
+  public void sendsCompensatedEventOnCompensationCompleted() {
     handler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, 
payload);
 
     assertThat(events.size(), is(1));
@@ -61,6 +69,6 @@ public void sendsEventOnCompensationCompleted() throws 
Exception {
     assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
     assertThat(event.payloads().length, is(0));
 
-    verify(context).compensate(globalTxId, localTxId, compensationMethod, 
payload);
+    verify(context).apply(globalTxId, localTxId, compensationMethod, payload);
   }
 }
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecoveryTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecoveryTest.java
new file mode 100644
index 00000000..75062bc8
--- /dev/null
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecoveryTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import javax.transaction.InvalidTransactionException;
+
+import org.apache.servicecomb.saga.common.EventType;
+import org.apache.servicecomb.saga.omega.context.IdGenerator;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DefaultRecoveryTest {
+  private final List<TxEvent> messages = new ArrayList<>();
+
+  private final String globalTxId = UUID.randomUUID().toString();
+
+  private final String localTxId = UUID.randomUUID().toString();
+
+  private final String parentTxId = UUID.randomUUID().toString();
+
+  private final String newLocalTxId = UUID.randomUUID().toString();
+
+  private final RuntimeException oops = new RuntimeException("oops");
+
+  @SuppressWarnings("unchecked")
+  private final IdGenerator<String> idGenerator = mock(IdGenerator.class);
+
+  private final OmegaContext omegaContext = new OmegaContext(idGenerator);
+
+  private final ProceedingJoinPoint joinPoint = 
mock(ProceedingJoinPoint.class);
+
+  private final MethodSignature methodSignature = mock(MethodSignature.class);
+
+  private final Compensable compensable = mock(Compensable.class);
+
+  private final MessageSender sender = e -> {
+    messages.add(e);
+    return new AlphaResponse(false);
+  };
+
+  private final CompensableInterceptor interceptor = new 
CompensableInterceptor(omegaContext, sender);
+
+  private final RecoveryPolicy recoveryPolicy = new DefaultRecovery();
+
+  @Before
+  public void setUp() throws Exception {
+    when(idGenerator.nextId()).thenReturn(newLocalTxId);
+    when(joinPoint.getSignature()).thenReturn(methodSignature);
+    when(joinPoint.getTarget()).thenReturn(this);
+
+    
when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
+    when(compensable.compensationMethod()).thenReturn("doNothing");
+    when(compensable.retries()).thenReturn(0);
+
+    omegaContext.setGlobalTxId(globalTxId);
+    omegaContext.setLocalTxId(localTxId);
+  }
+
+  @Test
+  public void recordEndedEventWhenSuccess() throws Throwable {
+    when(joinPoint.proceed()).thenReturn(null);
+    recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, 
parentTxId, 0);
+
+    assertThat(messages.size(), is(2));
+
+    TxEvent startedEvent = messages.get(0);
+    assertThat(startedEvent.globalTxId(), is(globalTxId));
+    assertThat(startedEvent.localTxId(), is(localTxId));
+    assertThat(startedEvent.parentTxId(), is(parentTxId));
+    assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent.retries(), is(0));
+    assertThat(startedEvent.retryMethod(), is(""));
+
+    TxEvent endedEvent = messages.get(1);
+    assertThat(endedEvent.globalTxId(), is(globalTxId));
+    assertThat(endedEvent.localTxId(), is(localTxId));
+    assertThat(endedEvent.parentTxId(), is(parentTxId));
+    assertThat(endedEvent.type(), is(EventType.TxEndedEvent));
+  }
+
+  @Test
+  public void recordAbortedEventWhenFailed() throws Throwable {
+    when(joinPoint.proceed()).thenThrow(oops);
+
+    try {
+      recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, 
parentTxId, 0);
+      expectFailing(RuntimeException.class);
+    } catch (RuntimeException e) {
+      assertThat(e.getMessage(), is("oops"));
+    }
+
+    assertThat(messages.size(), is(2));
+
+    TxEvent startedEvent = messages.get(0);
+    assertThat(startedEvent.globalTxId(), is(globalTxId));
+    assertThat(startedEvent.localTxId(), is(localTxId));
+    assertThat(startedEvent.parentTxId(), is(parentTxId));
+    assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent.retries(), is(0));
+    assertThat(startedEvent.retryMethod(), is(""));
+
+    TxEvent abortedEvent = messages.get(1);
+    assertThat(abortedEvent.globalTxId(), is(globalTxId));
+    assertThat(abortedEvent.localTxId(), is(localTxId));
+    assertThat(abortedEvent.parentTxId(), is(parentTxId));
+    assertThat(abortedEvent.type(), is(EventType.TxAbortedEvent));
+  }
+
+  @Test
+  public void returnImmediatelyWhenReceivedRejectResponse() {
+    MessageSender sender = mock(MessageSender.class);
+    when(sender.send(any())).thenReturn(new AlphaResponse(true));
+
+    CompensableInterceptor interceptor = new 
CompensableInterceptor(omegaContext, sender);
+
+    try {
+      recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, 
parentTxId, 0);
+      expectFailing(InvalidTransactionException.class);
+    } catch (InvalidTransactionException e) {
+      assertThat(e.getMessage().contains("Abort sub transaction"), is(true));
+    } catch (Throwable throwable) {
+      fail("unexpected exception throw: " + throwable);
+    }
+
+    verify(sender, times(1)).send(any());
+  }
+
+  @Test
+  public void recordRetryMethodWhenRetriesIsSet() throws Throwable {
+    int retries = new Random().nextInt(Integer.MAX_VALUE - 1) + 1;
+    when(compensable.retries()).thenReturn(retries);
+
+    recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, 
parentTxId, retries);
+
+    TxEvent startedEvent = messages.get(0);
+
+    assertThat(startedEvent.globalTxId(), is(globalTxId));
+    assertThat(startedEvent.localTxId(), is(localTxId));
+    assertThat(startedEvent.parentTxId(), is(parentTxId));
+    assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent.retries(), is(retries));
+    assertThat(startedEvent.retryMethod(), 
is(this.getClass().getDeclaredMethod("doNothing").toString()));
+  }
+
+  private String doNothing() {
+    return "doNothing";
+  }
+}
\ No newline at end of file
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecoveryTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecoveryTest.java
new file mode 100644
index 00000000..76fe55a0
--- /dev/null
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecoveryTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import javax.transaction.InvalidTransactionException;
+
+import org.apache.servicecomb.saga.common.EventType;
+import org.apache.servicecomb.saga.omega.context.IdGenerator;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ForwardRecoveryTest {
+  private final List<TxEvent> messages = new ArrayList<>();
+
+  private final String globalTxId = UUID.randomUUID().toString();
+
+  private final String localTxId = UUID.randomUUID().toString();
+
+  private final String parentTxId = UUID.randomUUID().toString();
+
+  private final String newLocalTxId = UUID.randomUUID().toString();
+
+  private final RuntimeException oops = new RuntimeException("oops");
+
+  @SuppressWarnings("unchecked")
+  private final IdGenerator<String> idGenerator = mock(IdGenerator.class);
+
+  private final OmegaContext omegaContext = new OmegaContext(idGenerator);
+
+  private final ProceedingJoinPoint joinPoint = 
mock(ProceedingJoinPoint.class);
+
+  private final MethodSignature methodSignature = mock(MethodSignature.class);
+
+  private final Compensable compensable = mock(Compensable.class);
+
+  private final MessageSender sender = e -> {
+    messages.add(e);
+    return new AlphaResponse(false);
+  };
+
+  private final CompensableInterceptor interceptor = new 
CompensableInterceptor(omegaContext, sender);
+
+  private final RecoveryPolicy recoveryPolicy = new ForwardRecovery();
+
+  private volatile OmegaException exception;
+
+  @Before
+  public void setUp() throws Exception {
+    when(idGenerator.nextId()).thenReturn(newLocalTxId);
+    when(joinPoint.getSignature()).thenReturn(methodSignature);
+    when(joinPoint.getTarget()).thenReturn(this);
+
+    
when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
+    when(compensable.compensationMethod()).thenReturn("doNothing");
+    when(compensable.retries()).thenReturn(0);
+
+    omegaContext.setGlobalTxId(globalTxId);
+    omegaContext.setLocalTxId(localTxId);
+  }
+
+  @Test
+  public void forwardExceptionWhenGlobalTxAborted() {
+    MessageSender sender = mock(MessageSender.class);
+    when(sender.send(any())).thenReturn(new AlphaResponse(true));
+
+    CompensableInterceptor interceptor = new 
CompensableInterceptor(omegaContext, sender);
+
+    try {
+      recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, 
parentTxId, 0);
+      expectFailing(InvalidTransactionException.class);
+    } catch (InvalidTransactionException e) {
+      assertThat(e.getMessage().contains("Abort sub transaction"), is(true));
+    } catch (Throwable throwable) {
+      fail("unexpected exception throw: " + throwable);
+    }
+
+    verify(sender, times(1)).send(any());
+  }
+
+  @Test
+  public void throwExceptionWhenRetryReachesMaximum() throws Throwable {
+    when(compensable.retries()).thenReturn(2);
+    when(joinPoint.proceed()).thenThrow(oops);
+
+    try {
+      recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, 
parentTxId, 2);
+      expectFailing(RuntimeException.class);
+    } catch (RuntimeException e) {
+      assertThat(e.getMessage(), is("oops"));
+    }
+
+    assertThat(messages.size(), is(4));
+    assertThat(messages.get(0).type(), is(EventType.TxStartedEvent));
+    assertThat(messages.get(1).type(), is(EventType.TxAbortedEvent));
+    assertThat(messages.get(2).type(), is(EventType.TxStartedEvent));
+    assertThat(messages.get(3).type(), is(EventType.TxAbortedEvent));
+  }
+
+  @Test
+  public void keepRetryingTillInterrupted() throws Throwable {
+    when(compensable.retries()).thenReturn(-1);
+    when(compensable.retryDelayInMilliseconds()).thenReturn(1000);
+    when(joinPoint.proceed()).thenThrow(oops);
+
+    Thread thread = new Thread(() -> {
+      try {
+        recoveryPolicy.apply(joinPoint, compensable, interceptor, 
omegaContext, parentTxId, -1);
+        expectFailing(OmegaException.class);
+      } catch (OmegaException e) {
+        exception = e;
+      } catch (Throwable throwable) {
+        fail("unexpected exception throw: " + throwable);
+      }
+    });
+    thread.start();
+
+    thread.interrupt();
+    thread.join();
+
+    assertThat(exception.getMessage().contains("Failed to handle tx because it 
is interrupted"), is(true));
+  }
+
+  private String doNothing() {
+    return "doNothing";
+  }
+}
\ No newline at end of file
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index cc84fc57..8c496f63 100644
--- 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -65,7 +65,7 @@ public void setUp() throws Exception {
 
   @Test
   public void sendsSagaStartedEvent() {
-    sagaStartAnnotationProcessor.preIntercept(null, null, 0);
+    sagaStartAnnotationProcessor.preIntercept(null, null, 0, null, 0);
 
     TxEvent event = messages.get(0);
 
@@ -99,7 +99,7 @@ public void transformInterceptedException() {
     doThrow(exception).when(sender).send(any());
 
     try {
-      sagaStartAnnotationProcessor.preIntercept(null, null, 0);
+      sagaStartAnnotationProcessor.preIntercept(null, null, 0, null, 0);
       expectFailing(TransactionalException.class);
     } catch (TransactionalException e) {
       assertThat(e.getMessage(), is("exception"));
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index 31d148fe..0aa9549f 100644
--- 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -20,18 +20,13 @@
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
-import javax.transaction.InvalidTransactionException;
-
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -40,13 +35,11 @@
 import org.aspectj.lang.reflect.MethodSignature;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 public class TransactionAspectTest {
   private final List<TxEvent> messages = new ArrayList<>();
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
-
   private final String newLocalTxId = UUID.randomUUID().toString();
 
   private final MessageSender sender = e -> {
@@ -71,6 +64,7 @@ public void setUp() throws Exception {
 
     
when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
     when(compensable.compensationMethod()).thenReturn("doNothing");
+    when(compensable.retries()).thenReturn(0);
 
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(localTxId);
@@ -86,6 +80,8 @@ public void newLocalTxIdInCompensable() throws Throwable {
     assertThat(startedEvent.localTxId(), is(newLocalTxId));
     assertThat(startedEvent.parentTxId(), is(localTxId));
     assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent.retries(), is(0));
+    assertThat(startedEvent.retryMethod().isEmpty(), is(true));
 
     TxEvent endedEvent = messages.get(1);
 
@@ -123,20 +119,84 @@ public void restoreContextOnCompensableError() throws 
Throwable {
   }
 
   @Test
-  public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable {
-    MessageSender sender = mock(MessageSender.class);
-    when(sender.send(any())).thenReturn(new AlphaResponse(true));
+  public void retryReachesMaximumAndForwardException() throws Throwable {
+    RuntimeException oops = new RuntimeException("oops");
+    when(joinPoint.proceed()).thenThrow(oops);
+    when(compensable.retries()).thenReturn(3);
 
-    TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
     try {
       aspect.advise(joinPoint, compensable);
-      expectFailing(InvalidTransactionException.class);
-    } catch (InvalidTransactionException e) {
-      System.out.println(e.getMessage());
-      assertThat(e.getMessage().contains("Abort sub transaction"), is(true));
+      expectFailing(RuntimeException.class);
+    } catch (RuntimeException e) {
+      assertThat(e.getMessage(), is("oops"));
     }
 
-    verify(sender, times(1)).send(any());
+    assertThat(messages.size(), is(6));
+
+    TxEvent startedEvent1 = messages.get(0);
+    assertThat(startedEvent1.globalTxId(), is(globalTxId));
+    assertThat(startedEvent1.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent1.parentTxId(), is(localTxId));
+    assertThat(startedEvent1.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent1.retries(), is(3));
+    assertThat(startedEvent1.retryMethod(), 
is(this.getClass().getDeclaredMethod("doNothing").toString()));
+
+    assertThat(messages.get(1).type(), is(EventType.TxAbortedEvent));
+
+    TxEvent startedEvent2 = messages.get(2);
+    assertThat(startedEvent2.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent2.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent2.retries(), is(2));
+
+    assertThat(messages.get(3).type(), is(EventType.TxAbortedEvent));
+
+    TxEvent startedEvent3 = messages.get(4);
+    assertThat(startedEvent3.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent3.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent3.retries(), is(1));
+
+    assertThat(messages.get(5).type(), is(EventType.TxAbortedEvent));
+
+    assertThat(omegaContext.globalTxId(), is(globalTxId));
+    assertThat(omegaContext.localTxId(), is(localTxId));
+  }
+
+  @Test
+  public void keepRetryingTillSuccess() throws Throwable {
+    RuntimeException oops = new RuntimeException("oops");
+    when(joinPoint.proceed()).thenThrow(oops).thenThrow(oops).thenReturn(null);
+    when(compensable.retries()).thenReturn(-1);
+
+    aspect.advise(joinPoint, compensable);
+
+    assertThat(messages.size(), is(6));
+
+    TxEvent startedEvent1 = messages.get(0);
+    assertThat(startedEvent1.globalTxId(), is(globalTxId));
+    assertThat(startedEvent1.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent1.parentTxId(), is(localTxId));
+    assertThat(startedEvent1.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent1.retries(), is(-1));
+    assertThat(startedEvent1.retryMethod(), 
is(this.getClass().getDeclaredMethod("doNothing").toString()));
+
+    assertThat(messages.get(1).type(), is(EventType.TxAbortedEvent));
+
+    TxEvent startedEvent2 = messages.get(2);
+    assertThat(startedEvent2.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent2.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent2.retries(), is(-1));
+
+    assertThat(messages.get(3).type(), is(EventType.TxAbortedEvent));
+
+    TxEvent startedEvent3 = messages.get(4);
+    assertThat(startedEvent3.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent3.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent3.retries(), is(-1));
+
+    assertThat(messages.get(5).type(), is(EventType.TxEndedEvent));
+
+    assertThat(omegaContext.globalTxId(), is(globalTxId));
+    assertThat(omegaContext.localTxId(), is(localTxId));
   }
 
   private String doNothing() {
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto 
b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 3944eee9..d2c6f77c 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -22,9 +22,11 @@ option java_package = 
"org.apache.servicecomb.saga.pack.contract.grpc";
 option java_outer_classname = "TxEventProto";
 
 service TxEventService {
-  rpc OnConnected (GrpcServiceConfig) returns (stream GrpcCompensateCommand) {}
+  rpc OnConnected (GrpcServiceConfig) returns (stream GrpcCompensateCommand) {
+  }
   rpc OnTxEvent (GrpcTxEvent) returns (GrpcAck) {}
-  rpc OnDisconnected (GrpcServiceConfig) returns (GrpcAck){}
+  rpc OnDisconnected (GrpcServiceConfig) returns (GrpcAck) {
+  }
 }
 
 message GrpcServiceConfig {
@@ -47,12 +49,15 @@ message GrpcTxEvent {
   string serviceName = 8;
   string instanceId = 9;
   int32 timeout = 10;
+  int32 retries = 11;
+  string retryMethod = 12;
 }
 
 message GrpcCompensateCommand {
   string globalTxId = 1;
   string localTxId = 2;
   string parentTxId = 3;
-  string compensateMethod = 4;
+  string compensationMethod = 4;
   bytes payloads = 5;
-}
\ No newline at end of file
+}
+


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> [pack] retry sub-transaction on failure
> ---------------------------------------
>
>                 Key: SCB-224
>                 URL: https://issues.apache.org/jira/browse/SCB-224
>             Project: Apache ServiceComb
>          Issue Type: New Feature
>          Components: Saga
>            Reporter: Yin Xiang
>            Assignee: Eric Lee
>            Priority: Major
>             Fix For: saga-0.2.0
>
>
> as a user, i want to retry transaction in my service, so that it can always 
> be done eventually.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to