[ 
https://issues.apache.org/jira/browse/SCB-909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644964#comment-16644964
 ] 

ASF GitHub Bot commented on SCB-909:
------------------------------------

WillemJiang closed pull request #318: SCB-909 Add alpha TCC event scanner for 
timeout and clean up completed globaltx
URL: https://github.com/apache/incubator-servicecomb-saga/pull/318
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index bc4bc593..ad39d62f 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -36,6 +36,7 @@
 import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
 import org.apache.servicecomb.saga.alpha.server.tcc.GrpcTccEventService;
 import 
org.apache.servicecomb.saga.alpha.server.tcc.callback.TccPendingTaskRunner;
+import org.apache.servicecomb.saga.alpha.server.tcc.service.TccEventScanner;
 import org.apache.servicecomb.saga.alpha.server.tcc.service.TccTxEventService;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.domain.EntityScan;
@@ -51,6 +52,9 @@
   @Value("${alpha.compensation.retry.delay:3000}")
   private int delay;
 
+  @Value("${alpha.tx.timeout-seconds:600}")
+  private int globalTxTimeoutSeconds;
+
   @Bean
   Map<String, Map<String, OmegaCallback>> omegaCallbacks() {
     return new ConcurrentHashMap<>();
@@ -105,18 +109,30 @@ TccPendingTaskRunner tccPendingTaskRunner() {
   }
 
   @Bean
-  GrpcTccEventService grpcTccEventService(TccTxEventService tccTxEventService, 
TccPendingTaskRunner tccPendingTaskRunner) {
-    tccPendingTaskRunner.start();
-    Runtime.getRuntime().addShutdownHook(new Thread(() -> 
tccPendingTaskRunner.shutdown()));
+  GrpcTccEventService grpcTccEventService(TccTxEventService tccTxEventService) 
{
     return new GrpcTccEventService(tccTxEventService);
   }
 
+  @Bean
+  TccEventScanner tccEventScanner(TccTxEventService tccTxEventService) {
+    return new TccEventScanner(tccTxEventService, delay, 
globalTxTimeoutSeconds);
+  }
+
   @Bean
   ServerStartable serverStartable(GrpcServerConfig serverConfig, 
TxConsistentService txConsistentService,
-      Map<String, Map<String, OmegaCallback>> omegaCallbacks, 
GrpcTccEventService grpcTccEventService) {
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks, 
GrpcTccEventService grpcTccEventService,
+      TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner 
tccEventScanner) {
     ServerStartable bootstrap = new GrpcStartable(serverConfig,
         new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks), 
grpcTccEventService);
     new Thread(bootstrap::start).start();
+
+    tccPendingTaskRunner.start();
+    tccEventScanner.start();
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      tccPendingTaskRunner.shutdown();
+      tccEventScanner.shutdown();
+    }));
+
     return bootstrap;
   }
 
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
index 1a55d198..dfd1e363 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
@@ -18,21 +18,15 @@
 package org.apache.servicecomb.saga.alpha.server.tcc.callback;
 
 import java.lang.invoke.MethodHandles;
-import java.util.List;
-
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
-import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEvent;
-import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
-import org.apache.servicecomb.saga.alpha.server.tcc.jpa.EventConverter;
 import 
org.apache.servicecomb.saga.alpha.server.tcc.service.TccTxEventRepository;
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-
-import com.google.common.collect.Lists;
 @Component
 public class TccCallbackEngine implements CallbackEngine {
 
@@ -46,23 +40,18 @@
 
   @Override
   public boolean execute(GlobalTxEvent request) {
-    boolean result = true;
-    // TODO if tcc end event was triggered by many times, we should ensure 
coordinated event won't be invoke again.
-    List<TccTxEvent> events = 
tccTxEventRepository.findByGlobalTxIdAndTxType(request.getGlobalTxId(), 
TccTxType.PARTICIPATED).orElse(
-        Lists.newArrayList());
-    for (TccTxEvent event : events) {
-      ParticipatedEvent participatedEvent = 
EventConverter.convertToParticipatedEvent(event);
-      try {
-        // only invoke the event is succeed
-        if (event.getStatus().equals(TransactionStatus.Succeed.toString())) {
-          omegaCallbackWrapper.invoke(participatedEvent, 
TransactionStatus.valueOf(request.getStatus()));
-        }
-      } catch (Exception ex) {
-        logError(participatedEvent, ex);
-        result = false;
-      }
-    }
-    return result;
+    AtomicBoolean result = new AtomicBoolean(true);
+    
tccTxEventRepository.findParticipatedByGlobalTxId(request.getGlobalTxId()).ifPresent(e
 ->
+        e.stream().filter(d -> 
d.getStatus().equals(TransactionStatus.Succeed.name())).forEach(p -> {
+          try {
+            omegaCallbackWrapper.invoke(p, 
TransactionStatus.valueOf(request.getStatus()));
+          } catch (Exception ex) {
+            logError(p, ex);
+            result.set(false);
+          }
+        })
+    );
+    return result.get();
   }
 
   private void logError(ParticipatedEvent event, Exception ex) {
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/jpa/GlobalTxEventRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/jpa/GlobalTxEventRepository.java
index 439e0f53..8c3023b1 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/jpa/GlobalTxEventRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/jpa/GlobalTxEventRepository.java
@@ -17,8 +17,12 @@
 
 package org.apache.servicecomb.saga.alpha.server.tcc.jpa;
 
+import java.util.Date;
 import java.util.List;
 import java.util.Optional;
+import javax.transaction.Transactional;
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
 
@@ -30,4 +34,17 @@
   @Query(value = "SELECT t FROM GlobalTxEvent AS t WHERE t.globalTxId = ?1 and 
t.localTxId = ?2 and t.txType = ?3")
   Optional<GlobalTxEvent> findByUniqueKey(String globalTxId, String localTxId, 
String txType);
 
+  @Query(value = "SELECT t FROM GlobalTxEvent AS t WHERE t.creationTime < ?1 
and t.txType = ?2 order by t.creationTime asc")
+  Optional<List<GlobalTxEvent>> findTimeoutGlobalTx(Date deadLine, String 
txType, Pageable pageable);
+
+  @Query(value = "SELECT t.globalTxId from GlobalTxEvent as t GROUP BY 
t.globalTxId HAVING COUNT(t.globalTxId) = 2 "
+      + "AND NOT EXISTS (select 1 from  ParticipatedEvent as b where 
b.globalTxId = t.globalTxId)"
+  )
+  Optional<List<String>> findCompletedGlobalTx(Pageable pageable);
+
+  @Transactional
+  @Modifying(clearAutomatically = true)
+  @Query(value = "DELETE FROM GlobalTxEvent as t where t.globalTxId in (?1)")
+  void deleteByGlobalId(String globalTxId);
+
 }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java
index cce95faa..3bdd873d 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java
@@ -19,6 +19,7 @@
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -26,15 +27,15 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
-
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.EventConverter;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
-import org.apache.servicecomb.saga.alpha.server.tcc.jpa.EventConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.Profile;
+import org.springframework.data.domain.Pageable;
 import org.springframework.stereotype.Component;
 
 @Component
@@ -55,6 +56,10 @@ public void saveParticipatedEvent(ParticipatedEvent event) {
     save(EventConverter.convertToTccTxEvent(event));
   }
 
+  @Override
+  public void coordinated(TccTxEvent event) {
+  }
+
   @Override
   public void save(TccTxEvent event) {
     tccEventMap
@@ -72,6 +77,16 @@ public void save(TccTxEvent event) {
     }
   }
 
+  @Override
+  public Optional<List<ParticipatedEvent>> findParticipatedByGlobalTxId(String 
globalTxId) {
+    return Optional.of(
+        findByGlobalTxId(globalTxId)
+            .orElse(new ArrayList<>()).stream()
+        .filter(e -> TccTxType.PARTICIPATED.name().equals(e.getTxType()))
+        
.map(EventConverter::convertToParticipatedEvent).collect(Collectors.toList())
+    );
+  }
+
   @Override
   public Optional<List<TccTxEvent>> findByGlobalTxIdAndTxType(String 
globalTxId, TccTxType tccTxType) {
     Set<TccTxEvent> events = tccEventMap.get(globalTxId);
@@ -94,11 +109,21 @@ public void save(TccTxEvent event) {
     }
   }
 
+  @Override
+  public Optional<List<GlobalTxEvent>> findTimeoutGlobalTx(Date deadLine, 
String txType, Pageable pageable) {
+    return Optional.empty();
+  }
+
+  @Override
+  public void clearCompletedGlobalTx(Pageable pageable) {
+
+  }
+
   @Override
   public Iterable<TccTxEvent> findAll() {
     List<TccTxEvent> events = new ArrayList<>();
-    for (String golableTxId : tccEventMap.keySet()) {
-      events.addAll(tccEventMap.get(golableTxId));
+    for (String globalTxId : tccEventMap.keySet()) {
+      events.addAll(tccEventMap.get(globalTxId));
     }
     return events;
   }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java
index b9fe2a39..9e24a369 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java
@@ -17,9 +17,10 @@
 
 package org.apache.servicecomb.saga.alpha.server.tcc.service;
 
+import java.util.Date;
 import java.util.List;
 import java.util.Optional;
-
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.EventConverter;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
 import 
org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEventRepository;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
@@ -27,10 +28,11 @@
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEventDBRepository;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
-import org.apache.servicecomb.saga.alpha.server.tcc.jpa.EventConverter;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Profile;
+import org.springframework.data.domain.Pageable;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 
 @Component
 @Profile("!memory")
@@ -46,6 +48,7 @@
   TccTxEventDBRepository tccTxEventDBRepository;
 
   @Override
+  @Transactional
   public void saveGlobalTxEvent(GlobalTxEvent event) {
     globalTxEventRepository.save(event);
     // saveTccEventHere
@@ -53,12 +56,22 @@ public void saveGlobalTxEvent(GlobalTxEvent event) {
   }
 
   @Override
+  @Transactional
   public void saveParticipatedEvent(ParticipatedEvent event) {
     participatedEventRepository.save(event);
     // saveTccEventHere
     tccTxEventDBRepository.save(EventConverter.convertToTccTxEvent(event));
   }
 
+  @Override
+  @Transactional
+  public void coordinated(TccTxEvent event) {
+    participatedEventRepository.findByUniqueKey(event.getGlobalTxId(), 
event.getLocalTxId()).ifPresent((e) -> {
+      participatedEventRepository.delete(e);
+      tccTxEventDBRepository.save(event);
+    });
+  }
+
   @Override
   public void save(TccTxEvent event) {
     tccTxEventDBRepository.save(event);
@@ -69,6 +82,11 @@ public void save(TccTxEvent event) {
     return tccTxEventDBRepository.findByGlobalTxId(globalTxId);
   }
 
+  @Override
+  public Optional<List<ParticipatedEvent>> findParticipatedByGlobalTxId(String 
globalTxId) {
+    return participatedEventRepository.findByGlobalTxId(globalTxId);
+  }
+
   @Override
   public Optional<List<TccTxEvent>> findByGlobalTxIdAndTxType(String 
globalTxId, TccTxType tccTxType) {
     return tccTxEventDBRepository.findByGlobalTxIdAndTxType(globalTxId, 
tccTxType.name());
@@ -79,6 +97,17 @@ public void save(TccTxEvent event) {
     return tccTxEventDBRepository.findByUniqueKey(globalTxId, localTxId, 
tccTxType.name());
   }
 
+  @Override
+  public Optional<List<GlobalTxEvent>> findTimeoutGlobalTx(Date deadLine, 
String txType, Pageable pageable) {
+    return globalTxEventRepository.findTimeoutGlobalTx(deadLine, txType, 
pageable);
+  }
+
+  @Override
+  public void clearCompletedGlobalTx(Pageable pageable) {
+    globalTxEventRepository.findCompletedGlobalTx(pageable).ifPresent(e -> 
e.forEach(t ->
+        globalTxEventRepository.deleteByGlobalId(t)));
+    }
+
   @Override
   public Iterable<TccTxEvent> findAll() {
     return tccTxEventDBRepository.findAll();
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccEventScanner.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccEventScanner.java
new file mode 100644
index 00000000..f5c930fe
--- /dev/null
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccEventScanner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc.service;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Date;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TccEventScanner {
+
+  private final ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
+
+  private final TccTxEventService tccTxEventService;
+
+  private final int delay;
+
+  private final long globalTxTimeoutSeconds;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public TccEventScanner(TccTxEventService tccTxEventService, int delay, long 
globalTxTimeoutSeconds) {
+    this.tccTxEventService = tccTxEventService;
+    this.delay = delay;
+    this.globalTxTimeoutSeconds = globalTxTimeoutSeconds;
+  }
+
+  public void start() {
+    scheduler.scheduleWithFixedDelay(() -> {
+      tccTxEventService.handleTimeoutTx(new Date(System.currentTimeMillis() - 
SECONDS.toMillis(globalTxTimeoutSeconds)), 1);
+      tccTxEventService.clearCompletedGlobalTx(1);
+    }, 0, delay, MILLISECONDS);
+  }
+
+  public void shutdown() {
+    scheduler.shutdown();
+  }
+}
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java
index 3e7ba7ee..ebc19ce4 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java
@@ -17,13 +17,14 @@
 
 package org.apache.servicecomb.saga.alpha.server.tcc.service;
 
+import java.util.Date;
 import java.util.List;
 import java.util.Optional;
-
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
+import org.springframework.data.domain.Pageable;
 
 public interface TccTxEventRepository {
 
@@ -31,14 +32,22 @@
 
   void saveParticipatedEvent(ParticipatedEvent event);
 
+  void coordinated(TccTxEvent event);
+
   void save(TccTxEvent event);
 
   Optional<List<TccTxEvent>> findByGlobalTxId(String globalTxId);
 
+  Optional<List<ParticipatedEvent>> findParticipatedByGlobalTxId(String 
globalTxId);
+
   Optional<List<TccTxEvent>> findByGlobalTxIdAndTxType(String globalTxId, 
TccTxType tccTxType);
 
   Optional<TccTxEvent> findByUniqueKey(String globalTxId, String localTxId, 
TccTxType tccTxType);
 
+  Optional<List<GlobalTxEvent>> findTimeoutGlobalTx(Date deadLine, String 
txType, Pageable pageable);
+
+  void clearCompletedGlobalTx(Pageable pageable);
+
   Iterable<TccTxEvent> findAll();
 
   void deleteAll();
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventService.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventService.java
index 9e7edf51..a109a826 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventService.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventService.java
@@ -18,14 +18,16 @@
 package org.apache.servicecomb.saga.alpha.server.tcc.service;
 
 import java.lang.invoke.MethodHandles;
-
+import java.util.Date;
 import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccCallbackEngine;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
+import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.data.domain.PageRequest;
 import org.springframework.stereotype.Component;
 
 @Component
@@ -90,9 +92,8 @@ public boolean onTccEndedEvent(GlobalTxEvent globalTxEvent) {
           globalTxEvent.getGlobalTxId(), globalTxEvent.getLocalTxId(), 
globalTxEvent.getTxType(), ex);
       return false;
     }
-    // Just return the excution result back
+    // Just return the execution result back
     return tccCallbackEngine.execute(globalTxEvent);
-
   }
 
   public boolean onCoordinatedEvent(TccTxEvent tccTxEvent) {
@@ -101,7 +102,7 @@ public boolean onCoordinatedEvent(TccTxEvent tccTxEvent) {
         tccTxEvent.getGlobalTxId(), tccTxEvent.getLocalTxId(), 
tccTxEvent.getParentTxId(),
         tccTxEvent.getTxType(), tccTxEvent.getServiceName(), 
tccTxEvent.getInstanceId());
     try {
-      tccTxEventRepository.save(tccTxEvent);
+      tccTxEventRepository.coordinated(tccTxEvent);
     } catch (Exception ex) {
       LOG.warn("Add coordinatedEvent triggered exception, globalTxId:{}, 
localTxId:{} ",
           tccTxEvent.getGlobalTxId(), tccTxEvent.getLocalTxId(), ex);
@@ -110,4 +111,22 @@ public boolean onCoordinatedEvent(TccTxEvent tccTxEvent) {
     return true;
   }
 
-}
+  public void handleTimeoutTx(Date deadLine, int size) {
+    tccTxEventRepository.findTimeoutGlobalTx(deadLine, 
TccTxType.STARTED.name(), new PageRequest(0, size))
+        .ifPresent(e -> e.forEach(t -> {
+          GlobalTxEvent globalTxEvent = new GlobalTxEvent(
+              t.getServiceName(),
+              t.getInstanceId(),
+              t.getGlobalTxId(),
+              t.getLocalTxId(),
+              t.getParentTxId(),
+              TccTxType.END_TIMEOUT.name(),
+              TransactionStatus.Failed.name());
+          onTccEndedEvent(globalTxEvent);
+        }));
+  }
+
+  public void clearCompletedGlobalTx(int size) {
+    tccTxEventRepository.clearCompletedGlobalTx(new PageRequest(0, size));
+  }
+}
\ No newline at end of file
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/MemoryAlphaTccServerTest.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/MemoryAlphaTccServerTest.java
index 42bd0713..c7da54d1 100644
--- 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/MemoryAlphaTccServerTest.java
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/MemoryAlphaTccServerTest.java
@@ -18,36 +18,23 @@
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import io.grpc.netty.NettyChannelBuilder;
-import org.apache.servicecomb.saga.alpha.server.AlphaApplication;
 import org.junit.BeforeClass;
 import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.junit4.SpringRunner;
 
 @RunWith(SpringRunner.class)
-@SpringBootTest(classes = {AlphaApplication.class},
+@SpringBootTest(classes = {TccApplication.class, TccConfiguration.class},
     properties = {
         "alpha.server.host=0.0.0.0",
-        "alpha.server.port=8090"
+        "alpha.server.port=8190"
     })
 @ActiveProfiles("memory")
 public class MemoryAlphaTccServerTest extends AlphaTccServerTestBase {
 
   @BeforeClass
   public static void setupClientChannel() {
-    clientChannel = NettyChannelBuilder.forAddress("localhost", 
8090).usePlaintext().build();
+    clientChannel = NettyChannelBuilder.forAddress("localhost", 
8190).usePlaintext().build();
   }
-
-  /*@Autowired
-  @Qualifier("defaultTccTxEventFacade")
-  private TccTxEventFacade tccTxEventFacade;
-
-  @Override
-  public TccTxEventFacade getTccTxEventFacade() {
-    return tccTxEventFacade;
-  }*/
-  
 }
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/RdbAlphaTccServerTest.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/RdbAlphaTccServerTest.java
index 04244385..f4cd431f 100644
--- 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/RdbAlphaTccServerTest.java
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/RdbAlphaTccServerTest.java
@@ -18,17 +18,13 @@
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import io.grpc.netty.NettyChannelBuilder;
-import org.apache.servicecomb.saga.alpha.server.AlphaApplication;
 import org.junit.BeforeClass;
 import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.junit4.SpringRunner;
 
 @RunWith(SpringRunner.class)
-@SpringBootTest(classes = {AlphaApplication.class},
+@SpringBootTest(classes = {TccApplication.class, TccConfiguration.class},
     properties = {
         "alpha.server.host=0.0.0.0",
         "alpha.server.port=8091"
@@ -39,7 +35,4 @@
   public static void setupClientChannel() {
     clientChannel = NettyChannelBuilder.forAddress("localhost", 
8091).usePlaintext().build();
   }
-
-
-
 }
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccApplication.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccApplication.java
new file mode 100644
index 00000000..d24425cc
--- /dev/null
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccApplication.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc;
+
+import org.apache.servicecomb.saga.alpha.server.GrpcServerConfig;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Import;
+
+@SpringBootApplication(scanBasePackageClasses = GrpcTccEventService.class)
+@Import(GrpcServerConfig.class)
+public class TccApplication {
+  public static void main(String[] args) {
+    SpringApplication.run(TccApplication.class, args);
+  }
+}
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngineTest.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngineTest.java
index 62789023..1d085978 100644
--- 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngineTest.java
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngineTest.java
@@ -21,15 +21,12 @@
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyObject;
-import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
 import io.grpc.stub.StreamObserver;
 import java.util.UUID;
-import org.apache.servicecomb.saga.alpha.server.AlphaApplication;
 import 
org.apache.servicecomb.saga.alpha.server.tcc.callback.OmegaCallbacksRegistry;
 import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccCallbackEngine;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
@@ -47,7 +44,7 @@
 import org.springframework.test.context.junit4.SpringRunner;
 
 @RunWith(SpringRunner.class)
-@SpringBootTest(classes = {AlphaApplication.class},
+@SpringBootTest(classes = {TccApplication.class, TccConfiguration.class},
     properties = {
         "alpha.server.host=0.0.0.0",
         "alpha.server.port=8092",
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccConfiguration.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccConfiguration.java
new file mode 100644
index 00000000..fd934191
--- /dev/null
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccConfiguration.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc;
+
+import org.apache.servicecomb.saga.alpha.server.GrpcServerConfig;
+import org.apache.servicecomb.saga.alpha.server.GrpcStartable;
+import org.apache.servicecomb.saga.alpha.server.ServerStartable;
+import 
org.apache.servicecomb.saga.alpha.server.tcc.callback.TccPendingTaskRunner;
+import org.apache.servicecomb.saga.alpha.server.tcc.service.TccEventScanner;
+import org.apache.servicecomb.saga.alpha.server.tcc.service.TccTxEventService;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class TccConfiguration {
+
+  @Value("${alpha.compensation.retry.delay:3000}")
+  private int delay;
+
+  @Value("${alpha.tx.timeout-seconds:600}")
+  private int globalTxTimeoutSeconds;
+
+  @Bean
+  TccPendingTaskRunner tccPendingTaskRunner() {
+    return new TccPendingTaskRunner(delay);
+  }
+
+  @Bean
+  GrpcTccEventService grpcTccEventService(TccTxEventService tccTxEventService) 
{
+    return new GrpcTccEventService(tccTxEventService);
+  }
+
+  @Bean
+  TccEventScanner tccEventScanner(TccTxEventService tccTxEventService) {
+    return new TccEventScanner(tccTxEventService, delay, 
globalTxTimeoutSeconds);
+  }
+
+  @Bean
+  ServerStartable serverStartable(GrpcServerConfig serverConfig, 
GrpcTccEventService grpcTccEventService,
+      TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner 
tccEventScanner) {
+    ServerStartable bootstrap = new GrpcStartable(serverConfig, 
grpcTccEventService);
+    new Thread(bootstrap::start).start();
+
+    tccPendingTaskRunner.start();
+    tccEventScanner.start();
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      tccPendingTaskRunner.shutdown();
+      tccEventScanner.shutdown();
+    }));
+
+    return bootstrap;
+  }
+}
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventServiceTest.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventServiceTest.java
new file mode 100644
index 00000000..6993d799
--- /dev/null
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventServiceTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc.service;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import io.grpc.stub.StreamObserver;
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+import org.apache.servicecomb.saga.alpha.server.tcc.TccApplication;
+import org.apache.servicecomb.saga.alpha.server.tcc.TccConfiguration;
+import 
org.apache.servicecomb.saga.alpha.server.tcc.callback.OmegaCallbacksRegistry;
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
+import 
org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEventRepository;
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
+import 
org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEventRepository;
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEvent;
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = {TccApplication.class, TccConfiguration.class}, 
properties = {
+    "spring.jpa.show-sql=true"
+})
+public class TccTxEventServiceTest {
+
+  @Autowired
+  private TccTxEventService tccTxEventService;
+
+  @Autowired
+  private TccTxEventRepository tccTxEventRepository;
+
+  @Autowired
+  private GlobalTxEventRepository globalTxEventRepository;
+
+  @Autowired
+  private ParticipatedEventRepository participatedEventRepository;
+
+  private final String globalTxId = uniquify("globalTxId");
+  private final String localTxId = uniquify("localTxId");
+  private final String parentTxId = uniquify("parentTxId");
+  private final String confirmMethod = "confirm";
+  private final String cancelMethod = "cancel";
+  private final String serviceName = uniquify("serviceName");
+  private final String instanceId = uniquify("instanceId");
+
+  private final GrpcServiceConfig serviceConfig = 
GrpcServiceConfig.newBuilder()
+      .setServiceName(serviceName)
+      .setInstanceId(instanceId)
+      .build();
+
+  private GlobalTxEvent tccStartEvent;
+  private ParticipatedEvent participatedEvent;
+  private GlobalTxEvent tccEndEvent;
+  private TccTxEvent coordinateEvent;
+
+  @Before
+  public void setup() {
+    tccStartEvent = newGlobalTxEvent(TccTxType.STARTED, globalTxId, 
TransactionStatus.Succeed);
+    participatedEvent = newParticipateEvent(globalTxId, 
TransactionStatus.Succeed);
+    tccEndEvent = newGlobalTxEvent(TccTxType.ENDED, globalTxId, 
TransactionStatus.Succeed);
+    coordinateEvent = newTccTxEvent(TccTxType.COORDINATED, globalTxId, 
TransactionStatus.Succeed);
+  }
+
+  @After
+  public void teardown() {
+  }
+
+  @Test
+  public void onlyCoordinateParticipatedEventOnce() {
+    StreamObserver<GrpcTccCoordinateCommand> observer = 
mock(StreamObserver.class);
+    OmegaCallbacksRegistry.register(serviceConfig, observer);
+
+    tccTxEventService.onTccStartedEvent(tccStartEvent);
+    tccTxEventService.onParticipatedEvent(participatedEvent);
+    tccTxEventService.onTccEndedEvent(tccEndEvent);
+    tccTxEventService.onCoordinatedEvent(coordinateEvent);
+
+    verify(observer).onNext(any());
+
+    // if end command was send by twice, coordinate should only be executed 
once.
+    tccTxEventService.onTccEndedEvent(tccEndEvent);
+    verify(observer).onNext(any());
+  }
+
+  @Test
+  public void handleTimeoutGlobalTraction() throws InterruptedException {
+    StreamObserver<GrpcTccCoordinateCommand> observer = 
mock(StreamObserver.class);
+    OmegaCallbacksRegistry.register(serviceConfig, observer);
+
+    tccTxEventService.onTccStartedEvent(tccStartEvent);
+    tccTxEventService.onParticipatedEvent(participatedEvent);
+
+    Thread.sleep(3000l);
+    Date deadLine = new Date(System.currentTimeMillis() - SECONDS.toMillis(2));
+    tccTxEventService.handleTimeoutTx(deadLine, 1);
+
+    // global tx has timeout, so participated event will be coordinated 
through cancel.
+    Optional<GlobalTxEvent> timeoutEvent = 
globalTxEventRepository.findByUniqueKey(globalTxId, localTxId, 
TccTxType.END_TIMEOUT.name());
+    assertThat(timeoutEvent.isPresent(), is(true));
+    assertThat(timeoutEvent.get().getStatus(), 
is(TransactionStatus.Failed.name()));
+    assertThat(timeoutEvent.get().getTxType(), 
is(TccTxType.END_TIMEOUT.name()));
+    assertThat(timeoutEvent.get().getGlobalTxId(), is(globalTxId));
+    assertThat(timeoutEvent.get().getLocalTxId(), is(localTxId));
+    assertThat(timeoutEvent.get().getParentTxId(), is(parentTxId));
+    assertThat(timeoutEvent.get().getServiceName(), is(serviceName));
+    verify(observer).onNext(any());
+
+    Optional<List<TccTxEvent>> events = 
tccTxEventRepository.findByGlobalTxId(globalTxId);
+    assertThat(events.get().size(), is(3));
+  }
+
+  @Test
+  public void clearUpCompletedTxFromGlobalTxTable() {
+    StreamObserver<GrpcTccCoordinateCommand> observer = 
mock(StreamObserver.class);
+    OmegaCallbacksRegistry.register(serviceConfig, observer);
+
+    tccTxEventService.onTccStartedEvent(tccStartEvent);
+    tccTxEventService.onParticipatedEvent(participatedEvent);
+    tccTxEventService.onTccEndedEvent(tccEndEvent);
+    tccTxEventService.onCoordinatedEvent(coordinateEvent);
+
+    tccTxEventService.clearCompletedGlobalTx(1);
+
+    
assertThat(participatedEventRepository.findByGlobalTxId(globalTxId).isPresent(),
 is(false));
+    
assertThat(globalTxEventRepository.findByGlobalTxId(globalTxId).isPresent(), 
is(false));
+
+    Optional<List<TccTxEvent>> events = 
tccTxEventRepository.findByGlobalTxId(globalTxId);
+    assertThat(events.get().size(), is(4));
+  }
+
+  @Test
+  public void clearUpCompletedTxFromGlobalTxTableMoreThanOne() {
+    StreamObserver<GrpcTccCoordinateCommand> observer = 
mock(StreamObserver.class);
+    OmegaCallbacksRegistry.register(serviceConfig, observer);
+
+    // one global tx
+    tccTxEventService.onTccStartedEvent(tccStartEvent);
+    tccTxEventService.onParticipatedEvent(participatedEvent);
+    tccTxEventService.onTccEndedEvent(tccEndEvent);
+    tccTxEventService.onCoordinatedEvent(coordinateEvent);
+
+    // another global tx
+    String globalTxId_2 = uniquify("globalTxId");
+    tccTxEventService.onTccStartedEvent(newGlobalTxEvent(TccTxType.STARTED, 
globalTxId_2, TransactionStatus.Succeed));
+    tccTxEventService.onParticipatedEvent(newParticipateEvent(globalTxId_2, 
TransactionStatus.Succeed));
+    tccTxEventService.onTccEndedEvent(newGlobalTxEvent(TccTxType.ENDED, 
globalTxId_2, TransactionStatus.Succeed));
+    tccTxEventService.onCoordinatedEvent(newTccTxEvent(TccTxType.COORDINATED, 
globalTxId_2, TransactionStatus.Succeed));
+
+    tccTxEventService.clearCompletedGlobalTx(2);
+
+    
assertThat(participatedEventRepository.findByGlobalTxId(globalTxId).isPresent(),
 is(false));
+    
assertThat(globalTxEventRepository.findByGlobalTxId(globalTxId).isPresent(), 
is(false));
+
+    Optional<List<TccTxEvent>> events = 
tccTxEventRepository.findByGlobalTxId(globalTxId);
+    assertThat(events.get().size(), is(4));
+
+    events = tccTxEventRepository.findByGlobalTxId(globalTxId_2);
+    assertThat(events.get().size(), is(4));
+
+  }
+
+  private ParticipatedEvent newParticipateEvent(String globalTxId, 
TransactionStatus transactionStatus) {
+    return new ParticipatedEvent(serviceName, instanceId, globalTxId, 
localTxId,
+        parentTxId, confirmMethod, cancelMethod, transactionStatus.name());
+  }
+
+  private GlobalTxEvent newGlobalTxEvent(TccTxType tccTxType, String 
globalTxId, TransactionStatus transactionStatus) {
+    return new GlobalTxEvent(serviceName, instanceId, globalTxId,
+        localTxId, parentTxId, tccTxType.name(), transactionStatus.name());
+  }
+
+  private TccTxEvent newTccTxEvent(TccTxType tccTxType, String globalTxId, 
TransactionStatus transactionStatus) {
+    return new TccTxEvent(serviceName, instanceId, globalTxId,
+        localTxId, parentTxId, tccTxType.name(), transactionStatus.name());
+  }
+}
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventServiceTransactionTest.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventServiceTransactionTest.java
new file mode 100644
index 00000000..e1694f81
--- /dev/null
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventServiceTransactionTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc.service;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+
+import java.util.List;
+import java.util.Optional;
+import org.apache.servicecomb.saga.alpha.server.tcc.TccApplication;
+import org.apache.servicecomb.saga.alpha.server.tcc.TccConfiguration;
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
+import 
org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEventRepository;
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
+import 
org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEventRepository;
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEvent;
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEventDBRepository;
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = {TccApplication.class, TccConfiguration.class})
+public class TccTxEventServiceTransactionTest {
+
+  @Autowired
+  private TccTxEventService tccTxEventService;
+
+  @MockBean
+  private TccTxEventDBRepository tccTxEventDBRepository;
+
+  @Autowired
+  private ParticipatedEventRepository participatedEventRepository;
+
+  @Autowired
+  private GlobalTxEventRepository globalTxEventRepository;
+
+  private final String globalTxId = uniquify("globalTxId");
+  private final String localTxId = uniquify("localTxId");
+  private final String parentTxId = uniquify("parentTxId");
+  private final String confirmMethod = "confirm";
+  private final String cancelMethod = "cancel";
+  private final String serviceName = uniquify("serviceName");
+  private final String instanceId = uniquify("instanceId");
+
+  private GlobalTxEvent tccStartEvent;
+  private ParticipatedEvent participatedEvent;
+  private GlobalTxEvent tccEndEvent;
+  private TccTxEvent coordinateEvent;
+
+  @Before
+  public void setup() {
+    tccStartEvent = new GlobalTxEvent(serviceName, instanceId, globalTxId,
+        localTxId, parentTxId, TccTxType.STARTED.name(), 
TransactionStatus.Succeed.name());
+
+    participatedEvent = new ParticipatedEvent(serviceName, instanceId, 
globalTxId, localTxId,
+        parentTxId, confirmMethod, cancelMethod, 
TransactionStatus.Succeed.name());
+
+    tccEndEvent = new GlobalTxEvent(serviceName, instanceId, globalTxId,
+        localTxId, parentTxId, TccTxType.ENDED.name(), 
TransactionStatus.Succeed.name());
+
+    coordinateEvent = new TccTxEvent(serviceName, instanceId, globalTxId,
+        localTxId, parentTxId, TccTxType.COORDINATED.name(), 
TransactionStatus.Succeed.name());
+  }
+
+  @After
+  public void teardown() {
+  }
+
+  @Test
+  public void rollbackAfterSaveTccTxEventDbFailure() {
+    
doThrow(NullPointerException.class).when(tccTxEventDBRepository).save((TccTxEvent)
 any());
+
+    tccTxEventService.onTccStartedEvent(tccStartEvent);
+    Optional<List<GlobalTxEvent>> startEvents = 
globalTxEventRepository.findByGlobalTxId(globalTxId);
+    assertThat(startEvents.isPresent(), is(false));
+
+    tccTxEventService.onParticipatedEvent(participatedEvent);
+    Optional<List<ParticipatedEvent>> participates = 
participatedEventRepository.findByGlobalTxId(globalTxId);
+    assertThat(participates.isPresent(), is(false));
+
+    tccTxEventService.onTccEndedEvent(tccEndEvent);
+    Optional<List<GlobalTxEvent>> endEvents = 
globalTxEventRepository.findByGlobalTxId(globalTxId);
+    assertThat(endEvents.isPresent(), is(false));
+
+    participatedEventRepository.save(participatedEvent);
+    tccTxEventService.onCoordinatedEvent(coordinateEvent);
+    participates = participatedEventRepository.findByGlobalTxId(globalTxId);
+    assertThat(participates.isPresent(), is(true));
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add fault tolerance for service comb TCC
> ----------------------------------------
>
>                 Key: SCB-909
>                 URL: https://issues.apache.org/jira/browse/SCB-909
>             Project: Apache ServiceComb
>          Issue Type: New Feature
>          Components: Saga
>    Affects Versions: saga-0.3.0
>            Reporter: cherrylzhao
>            Assignee: cherrylzhao
>            Priority: Major
>             Fix For: saga-0.3.0
>
>
> TCC fault tolerance incude following point.
> # omega can switch to another available alpha when sending message failed.
> # if omega resend logic (different alpha) failed, omega can rollback local 
> data automatically.
> # alpha do resend logic (same omega) when ACK failed, if resend failed, dirty 
> data can left in database, this will be handled by scanner.
> # design transaction timeout mechanics, if scanner found event won't 
> completed within expected time, sending componsate command to omega, it has 
> different type compare with normal.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to