[ 
https://issues.apache.org/jira/browse/SCB-909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631160#comment-16631160
 ] 

ASF GitHub Bot commented on SCB-909:
------------------------------------

WillemJiang closed pull request #315: SCB-909 Add fail fast mechanism when all 
alpha is unreachable.
URL: https://github.com/apache/incubator-servicecomb-saga/pull/315
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:

diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index b8c1be2b..bc4bc593 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -35,6 +35,7 @@
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
 import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
 import org.apache.servicecomb.saga.alpha.server.tcc.GrpcTccEventService;
+import 
org.apache.servicecomb.saga.alpha.server.tcc.callback.TccPendingTaskRunner;
 import org.apache.servicecomb.saga.alpha.server.tcc.service.TccTxEventService;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.domain.EntityScan;
@@ -99,7 +100,14 @@ TxConsistentService txConsistentService(
   }
 
   @Bean
-  GrpcTccEventService grpcTccEventService(TccTxEventService tccTxEventService) 
{
+  TccPendingTaskRunner tccPendingTaskRunner() {
+    return new TccPendingTaskRunner(delay);
+  }
+
+  @Bean
+  GrpcTccEventService grpcTccEventService(TccTxEventService tccTxEventService, 
TccPendingTaskRunner tccPendingTaskRunner) {
+    tccPendingTaskRunner.start();
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> 
tccPendingTaskRunner.shutdown()));
     return new GrpcTccEventService(tccTxEventService);
   }
 
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallbackWrapper.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallbackWrapper.java
index 64d624a2..475489dc 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallbackWrapper.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallbackWrapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.saga.alpha.server.tcc.callback;
 
+import java.util.concurrent.BlockingQueue;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.springframework.stereotype.Component;
@@ -24,14 +25,30 @@
 @Component
 public class OmegaCallbackWrapper implements OmegaCallback {
 
+  private final BlockingQueue<Runnable> pendingTasks;
+
+  public OmegaCallbackWrapper(
+      TccPendingTaskRunner tccPendingTaskRunner) {
+    this.pendingTasks = tccPendingTaskRunner.getPendingTasks();
+  }
+
+
   @Override
   public void invoke(ParticipatedEvent event, TransactionStatus status) {
-    OmegaCallback omegaCallback = 
OmegaCallbacksRegistry.retrieve(event.getServiceName(), event.getInstanceId());
+    OmegaCallback omegaCallback;
+    try {
+      omegaCallback = OmegaCallbacksRegistry.retrieve(event.getServiceName(), 
event.getInstanceId());
+    } catch (Exception e) {
+      pendingTasks.offer(() -> invoke(event, status));
+      throw e;
+    }
+
     try {
       omegaCallback.invoke(event, status);
-    } catch (Exception ex) {
+    } catch (Exception e) {
       OmegaCallbacksRegistry.remove(event.getServiceName(), 
event.getInstanceId());
-      throw ex;
+      pendingTasks.offer(() -> invoke(event, status));
+      throw e;
     }
   }
 }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
index 48cbdbae..1a55d198 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
@@ -47,6 +47,7 @@
   @Override
   public boolean execute(GlobalTxEvent request) {
     boolean result = true;
+    // TODO if tcc end event was triggered by many times, we should ensure 
coordinated event won't be invoke again.
     List<TccTxEvent> events = 
tccTxEventRepository.findByGlobalTxIdAndTxType(request.getGlobalTxId(), 
TccTxType.PARTICIPATED).orElse(
         Lists.newArrayList());
     for (TccTxEvent event : events) {
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccPendingTaskRunner.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccPendingTaskRunner.java
new file mode 100644
index 00000000..b847f650
--- /dev/null
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccPendingTaskRunner.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc.callback;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TccPendingTaskRunner {
+
+  private final ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
+
+  private final BlockingQueue<Runnable> pendingTasks = new 
LinkedBlockingQueue<>();
+
+  private final int delay;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public TccPendingTaskRunner(int delay) {
+    this.delay = delay;
+  }
+
+  public void start() {
+    scheduler.scheduleWithFixedDelay(() -> {
+      try {
+        pendingTasks.take().run();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        LOG.error(e.getMessage());
+      }
+    }, 0, delay, MILLISECONDS);
+  }
+
+  public void shutdown() {
+    scheduler.shutdown();
+  }
+
+  public BlockingQueue<Runnable> getPendingTasks() {
+    return pendingTasks;
+  }
+}
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngineTest.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngineTest.java
new file mode 100644
index 00000000..62789023
--- /dev/null
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngineTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyObject;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import io.grpc.stub.StreamObserver;
+import java.util.UUID;
+import org.apache.servicecomb.saga.alpha.server.AlphaApplication;
+import 
org.apache.servicecomb.saga.alpha.server.tcc.callback.OmegaCallbacksRegistry;
+import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccCallbackEngine;
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
+import org.apache.servicecomb.saga.alpha.server.tcc.service.TccTxEventService;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = {AlphaApplication.class},
+    properties = {
+        "alpha.server.host=0.0.0.0",
+        "alpha.server.port=8092",
+        "alpha.compensation.retry.delay=30"
+    })
+public class TccCallbackEngineTest {
+
+  @Autowired
+  private TccCallbackEngine tccCallbackEngine;
+
+  @Autowired
+  private TccTxEventService tccTxEventService;
+
+  private final String globalTxId = UUID.randomUUID().toString();
+  private final String localTxId = UUID.randomUUID().toString();
+  private final String parentTxId = UUID.randomUUID().toString();
+  private final String confirmMethod = "confirm";
+  private final String cancelMethod = "cancel";
+
+  private final String serviceName = uniquify("serviceName");
+  private final String instanceId = uniquify("instanceId");
+
+  private final GrpcServiceConfig serviceConfig = 
GrpcServiceConfig.newBuilder()
+      .setServiceName(serviceName)
+      .setInstanceId(instanceId)
+      .build();
+
+  private final GrpcServiceConfig serviceConfig2 = 
GrpcServiceConfig.newBuilder()
+      .setServiceName(serviceName)
+      .setInstanceId(uniquify("instanceId"))
+      .build();
+
+  private ParticipatedEvent participatedEvent;
+  private GlobalTxEvent tccEndEvent;
+
+  @Before
+  public void init() {
+    participatedEvent = new ParticipatedEvent(serviceName, instanceId, 
globalTxId, localTxId,
+        parentTxId, confirmMethod, cancelMethod, 
TransactionStatus.Succeed.name());
+
+    tccEndEvent = new GlobalTxEvent(serviceName, instanceId, globalTxId,
+        localTxId, parentTxId, TccTxType.ENDED.name(), 
TransactionStatus.Succeed.name());
+  }
+
+  @After
+  public void teardown() {
+  }
+
+  @Test
+  public void sendCoordinateCommandAfterTccEnd() {
+    StreamObserver responseObserver = mock(StreamObserver.class);
+    OmegaCallbacksRegistry.register(serviceConfig, responseObserver);
+
+    tccTxEventService.onParticipatedEvent(participatedEvent);
+    tccTxEventService.onParticipatedEvent(participatedEvent);
+
+    tccTxEventService.onTccEndedEvent(tccEndEvent);
+
+    verify(responseObserver).onNext(any());
+  }
+
+  @Test
+  public void sendCoordinateFailedForOmegaDown() throws InterruptedException {
+    StreamObserver responseObserver = mock(StreamObserver.class);
+    
doThrow(IllegalArgumentException.class).when(responseObserver).onNext(any());
+    OmegaCallbacksRegistry.register(serviceConfig, responseObserver);
+
+    tccTxEventService.onParticipatedEvent(participatedEvent);
+    boolean result = tccCallbackEngine.execute(tccEndEvent);
+    assertThat(result, is(false));
+
+    Thread.sleep(1000);
+    verify(responseObserver).onNext(any());
+
+    try {
+      OmegaCallbacksRegistry.retrieve(serviceName, instanceId);
+    } catch (Exception ex) {
+      assertThat(ex.getMessage().startsWith("No such omega callback found for 
service"), is(true));
+    }
+  }
+
+  @Test
+  public void doRetryCoordinateTillOmegaReceived() throws InterruptedException 
{
+    StreamObserver failedResponseObserver = mock(StreamObserver.class);
+    
doThrow(IllegalArgumentException.class).when(failedResponseObserver).onNext(any());
+    OmegaCallbacksRegistry.register(serviceConfig, failedResponseObserver);
+
+    tccTxEventService.onParticipatedEvent(participatedEvent);
+    boolean result = tccCallbackEngine.execute(tccEndEvent);
+    assertThat(result, is(false));
+
+    Thread.sleep(1000);
+
+    StreamObserver succeedResponseObserver = mock(StreamObserver.class);
+    OmegaCallbacksRegistry.register(serviceConfig2, succeedResponseObserver);
+
+    Thread.sleep(1000);
+    verify(failedResponseObserver).onNext(any());
+    verify(succeedResponseObserver).onNext(any());
+  }
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/GrpcOnErrorHandler.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/GrpcOnErrorHandler.java
index 7ab02a15..2a54f967 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/GrpcOnErrorHandler.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/GrpcOnErrorHandler.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.OmegaException;
 
@@ -32,10 +33,11 @@
 
   private final GrpcRetryContext grpcRetryContext;
 
-  public GrpcOnErrorHandler(BlockingQueue<Runnable> pendingTasks, 
Map<MessageSender, Long> senders) {
+  public GrpcOnErrorHandler(BlockingQueue<Runnable> pendingTasks,
+      Map<MessageSender, Long> senders, int timeoutSeconds) {
     this.pendingTasks = pendingTasks;
     this.senders = senders;
-    this.grpcRetryContext = new GrpcRetryContext();
+    this.grpcRetryContext = new GrpcRetryContext(timeoutSeconds);
   }
 
   public void handle(MessageSender messageSender) {
@@ -58,19 +60,29 @@ public GrpcRetryContext getGrpcRetryContext() {
 
   public static class GrpcRetryContext {
 
+    private final int timeoutSeconds;
+
     private final BlockingQueue<MessageSender> reconnectedSenders = new 
LinkedBlockingQueue<>();
 
     private final Supplier<MessageSender> defaultMessageSender = new 
Supplier<MessageSender>() {
       @Override
       public MessageSender get() {
         try {
-          return reconnectedSenders.take();
+          MessageSender messageSender = 
reconnectedSenders.poll(timeoutSeconds, TimeUnit.SECONDS);
+          if (null == messageSender) {
+            throw new OmegaException("Failed to get reconnected sender, all 
alpha server is down.");
+          }
+          return messageSender;
         } catch (InterruptedException e) {
           throw new OmegaException("Failed to get reconnected sender", e);
         }
       }
     };
 
+    public GrpcRetryContext(int timeoutSeconds) {
+      this.timeoutSeconds = timeoutSeconds;
+    }
+
     public BlockingQueue<MessageSender> getReconnectedSenders() {
       return reconnectedSenders;
     }
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/LoadBalanceContext.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/LoadBalanceContext.java
index 2918be85..e2425fc0 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/LoadBalanceContext.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/LoadBalanceContext.java
@@ -33,11 +33,11 @@
   private final GrpcOnErrorHandler grpcOnErrorHandler;
 
   public LoadBalanceContext(Map<MessageSender, Long> senders,
-      Collection<ManagedChannel> channels, int reconnectDelay) {
+      Collection<ManagedChannel> channels, int reconnectDelay, int 
timeoutSeconds) {
     this.senders = senders;
     this.channels = channels;
     this.pendingTaskRunner = new PendingTaskRunner(reconnectDelay);
-    this.grpcOnErrorHandler = new 
GrpcOnErrorHandler(pendingTaskRunner.getPendingTasks(), senders);
+    this.grpcOnErrorHandler = new 
GrpcOnErrorHandler(pendingTaskRunner.getPendingTasks(), senders, 
timeoutSeconds);
     pendingTaskRunner.start();
   }
 
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/LoadBalanceContextBuilder.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/LoadBalanceContextBuilder.java
index 3f3d542a..2768135c 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/LoadBalanceContextBuilder.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/LoadBalanceContextBuilder.java
@@ -49,14 +49,17 @@
 
   private final int reconnectDelay;
 
+  private final int timeoutSeconds;
+
   private final TransactionType transactionType;
 
   public LoadBalanceContextBuilder(TransactionType transactionType,
-      AlphaClusterConfig clusterConfig, ServiceConfig serviceConfig, int 
reconnectDelay) {
+      AlphaClusterConfig clusterConfig, ServiceConfig serviceConfig, int 
reconnectDelay, int timeoutSeconds) {
     this.transactionType = transactionType;
     this.clusterConfig = clusterConfig;
     this.serviceConfig = serviceConfig;
     this.reconnectDelay = reconnectDelay;
+    this.timeoutSeconds = timeoutSeconds;
   }
 
   public LoadBalanceContext build() {
@@ -67,7 +70,7 @@ public LoadBalanceContext build() {
     Optional<SslContext> sslContext = buildSslContext(clusterConfig);
     Map<MessageSender, Long> senders = new ConcurrentHashMap<>();
     Collection<ManagedChannel> channels = new 
ArrayList<>(clusterConfig.getAddresses().size());
-    LoadBalanceContext loadContext = new LoadBalanceContext(senders, channels, 
reconnectDelay);
+    LoadBalanceContext loadContext = new LoadBalanceContext(senders, channels, 
reconnectDelay, timeoutSeconds);
 
     for (String address : clusterConfig.getAddresses()) {
       ManagedChannel channel = buildChannel(address, sslContext);
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContextBuilderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalanceContextBuilderTest.java
similarity index 98%
rename from 
omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContextBuilderTest.java
rename to 
omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalanceContextBuilderTest.java
index 6be785bb..9d8ef1dc 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContextBuilderTest.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalanceContextBuilderTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+package org.apache.servicecomb.saga.omega.connector.grpc;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -66,9 +66,9 @@ public void setup() throws IOException {
     
when(clusterConfig.getAddresses()).thenReturn(Lists.newArrayList(addresses));
     when(clusterConfig.getTccMessageHandler()).thenReturn(tccMessageHandler);
     tccLoadBalanceContextBuilder =
-        new LoadBalanceContextBuilder(TransactionType.TCC, clusterConfig, 
serviceConfig, 30);
+        new LoadBalanceContextBuilder(TransactionType.TCC, clusterConfig, 
serviceConfig, 30, 10);
     sagaLoadBalanceContextBuilder =
-        new LoadBalanceContextBuilder(TransactionType.SAGA, clusterConfig, 
serviceConfig, 30);
+        new LoadBalanceContextBuilder(TransactionType.SAGA, clusterConfig, 
serviceConfig, 30,10);
   }
 
   @After
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceSenderTestBase.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalanceSenderTestBase.java
similarity index 96%
rename from 
omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceSenderTestBase.java
rename to 
omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalanceSenderTestBase.java
index a2691a7a..f7c81111 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceSenderTestBase.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalanceSenderTestBase.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+package org.apache.servicecomb.saga.omega.connector.grpc;
 
 import io.grpc.Server;
 import java.util.HashMap;
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/RetryableMessageSenderTest.java
similarity index 98%
rename from 
omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
rename to 
omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/RetryableMessageSenderTest.java
index 9f97eb62..6f297088 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/RetryableMessageSenderTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.saga.omega.connector.grpc;
+package org.apache.servicecomb.saga.omega.connector.grpc.saga;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalanceClusterMessageSenderWithTLSTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/SagaLoadBalanceSenderWithTLSTest.java
similarity index 93%
rename from 
omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalanceClusterMessageSenderWithTLSTest.java
rename to 
omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/SagaLoadBalanceSenderWithTLSTest.java
index c9d0e02e..4b53e83e 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalanceClusterMessageSenderWithTLSTest.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/SagaLoadBalanceSenderWithTLSTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.saga.omega.connector.grpc;
+package org.apache.servicecomb.saga.omega.connector.grpc.saga;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.awaitility.Awaitility.await;
@@ -38,16 +38,16 @@
 import java.util.Arrays;
 import java.util.concurrent.Callable;
 import javax.net.ssl.SSLException;
+import org.apache.servicecomb.saga.omega.connector.grpc.AlphaClusterConfig;
 import org.apache.servicecomb.saga.omega.connector.grpc.core.FastestSender;
 import 
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContext;
 import 
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContextBuilder;
 import org.apache.servicecomb.saga.omega.connector.grpc.core.TransactionType;
-import 
org.apache.servicecomb.saga.omega.connector.grpc.saga.SagaLoadBalanceSender;
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-public class LoadBalanceClusterMessageSenderWithTLSTest extends 
LoadBalancedClusterMessageSenderTestBase {
+public class SagaLoadBalanceSenderWithTLSTest extends 
SagaLoadBalancedSenderTestBase {
 
   @Override
   protected SagaLoadBalanceSender newMessageSender(String[] addresses) {
@@ -67,7 +67,7 @@ protected SagaLoadBalanceSender newMessageSender(String[] 
addresses) {
     LoadBalanceContext loadContext = new LoadBalanceContextBuilder(
         TransactionType.SAGA,
         clusterConfig,
-        new ServiceConfig(serviceName), 100).build();
+        new ServiceConfig(serviceName), 100, 4).build();
 
     return new SagaLoadBalanceSender(loadContext, new FastestSender());
   }
@@ -101,7 +101,7 @@ private static void startServerOnPort(int port) {
   }
 
   private static SslContextBuilder getSslContextBuilder() {
-    ClassLoader classLoader = 
LoadBalanceClusterMessageSenderWithTLSTest.class.getClassLoader();
+    ClassLoader classLoader = 
SagaLoadBalanceSenderWithTLSTest.class.getClassLoader();
     SslContextBuilder sslClientContextBuilder = SslContextBuilder.forServer(
         new File(classLoader.getResource("server.crt").getFile()),
         new File(classLoader.getResource("server.pem").getFile()))
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/SagaLoadBalancedSenderTest.java
similarity index 92%
rename from 
omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
rename to 
omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/SagaLoadBalancedSenderTest.java
index 1ffdb3b2..ce6b96e3 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/SagaLoadBalancedSenderTest.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.saga.omega.connector.grpc;
+package org.apache.servicecomb.saga.omega.connector.grpc.saga;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
-import static java.lang.Thread.State.WAITING;
+import static java.lang.Thread.State.TERMINATED;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.contains;
@@ -36,11 +36,11 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import org.apache.servicecomb.saga.omega.connector.grpc.AlphaClusterConfig;
 import org.apache.servicecomb.saga.omega.connector.grpc.core.FastestSender;
 import 
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContext;
 import 
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContextBuilder;
 import org.apache.servicecomb.saga.omega.connector.grpc.core.TransactionType;
-import 
org.apache.servicecomb.saga.omega.connector.grpc.saga.SagaLoadBalanceSender;
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.OmegaException;
@@ -52,7 +52,7 @@
 import org.junit.Test;
 import org.mockito.Mockito;
 
-public class LoadBalancedClusterMessageSenderTest extends 
LoadBalancedClusterMessageSenderTestBase {
+public class SagaLoadBalancedSenderTest extends SagaLoadBalancedSenderTestBase 
{
   @Override
   protected SagaLoadBalanceSender newMessageSender(String[] addresses) {
     AlphaClusterConfig clusterConfig = AlphaClusterConfig.builder()
@@ -67,7 +67,7 @@ protected SagaLoadBalanceSender newMessageSender(String[] 
addresses) {
     LoadBalanceContext loadContext = new LoadBalanceContextBuilder(
         TransactionType.SAGA,
         clusterConfig,
-        new ServiceConfig(serviceName), 100).build();
+        new ServiceConfig(serviceName), 100, 4).build();
 
     return new SagaLoadBalanceSender(loadContext, new FastestSender());
   }
@@ -157,7 +157,11 @@ public void stopSendingOnInterruption() throws Exception {
     Thread thread = new Thread(new Runnable() {
       @Override
       public void run() {
-        messageSender.send(event);
+        try {
+          messageSender.send(event);
+        } catch (Exception ex) {
+          assertThat(ex.getMessage().endsWith("Failed to get reconnected 
sender"), is(true));
+        }
       }
     });
     thread.start();
@@ -254,16 +258,20 @@ public void stopSendingWhenClusterIsDown() throws 
Exception {
     final Thread thread = new Thread(new Runnable() {
       @Override
       public void run() {
-        messageSender.send(event);
+        try {
+          messageSender.send(event);
+        } catch (OmegaException ex) {
+          assertThat(ex.getMessage().endsWith("all alpha server is down."), 
is(true));
+        }
       }
     });
     thread.start();
 
     // we don't want to keep sending on cluster down
-    await().atMost(2, SECONDS).until(new Callable<Boolean>() {
+    await().atMost(10, SECONDS).until(new Callable<Boolean>() {
       @Override
       public Boolean call() throws Exception {
-        return thread.isAlive() && thread.getState().equals(WAITING);
+        return thread.getState().equals(TERMINATED);
       }
     });
 
@@ -276,12 +284,14 @@ public Boolean call() throws Exception {
     startServerOnPort(8080);
     startServerOnPort(8090);
 
+   messageSender.send(event);
     await().atMost(2, SECONDS).until(new Callable<Boolean>() {
       @Override
       public Boolean call() throws Exception {
         return connected.get(8080).size() == 2 || connected.get(8090).size() 
== 2;
       }
     });
+
     await().atMost(2, SECONDS).until(new Callable<Boolean>() {
       @Override
       public Boolean call() throws Exception {
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTestBase.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
similarity index 98%
rename from 
omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTestBase.java
rename to 
omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
index 4e96bcd6..efefc156 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTestBase.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.saga.omega.connector.grpc;
+package org.apache.servicecomb.saga.omega.connector.grpc.saga;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static org.junit.Assert.fail;
@@ -46,7 +46,7 @@
 import io.grpc.Server;
 import io.grpc.stub.StreamObserver;
 
-public abstract class LoadBalancedClusterMessageSenderTestBase {
+public abstract class SagaLoadBalancedSenderTestBase {
   protected static final int[] ports = {8080, 8090};
 
   protected static final Map<Integer, Server> servers = new HashMap<>();
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccClientMessageSenderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcTccClientMessageSenderTest.java
similarity index 99%
rename from 
omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccClientMessageSenderTest.java
rename to 
omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcTccClientMessageSenderTest.java
index 931e2759..0c425b0e 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccClientMessageSenderTest.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcTccClientMessageSenderTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.saga.omega.connector.grpc;
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static org.hamcrest.MatcherAssert.assertThat;
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SagaLoadBalanceSenderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SagaLoadBalanceSenderTest.java
deleted file mode 100644
index 3adeee7d..00000000
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SagaLoadBalanceSenderTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
-
-import org.junit.After;
-import org.junit.Before;
-
-public class SagaLoadBalanceSenderTest {
-
-  @Before
-  public void setup() {
-
-  }
-
-  @After
-  public void teardown() {
-
-  }
-
-}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSenderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSenderTest.java
index ea7cebf5..569203d1 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSenderTest.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSenderTest.java
@@ -35,6 +35,7 @@
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.netty.NettyServerBuilder;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.Map;
@@ -43,6 +44,7 @@
 import java.util.concurrent.TimeUnit;
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.omega.connector.grpc.AlphaClusterConfig;
+import 
org.apache.servicecomb.saga.omega.connector.grpc.LoadBalanceSenderTestBase;
 import org.apache.servicecomb.saga.omega.connector.grpc.core.FastestSender;
 import 
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContext;
 import 
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContextBuilder;
@@ -126,7 +128,7 @@ public void setup() {
     when(clusterConfig.isEnableSSL()).thenReturn(false);
 
     loadContext =
-        new LoadBalanceContextBuilder(TransactionType.TCC, clusterConfig, 
serviceConfig, 30).build();
+        new LoadBalanceContextBuilder(TransactionType.TCC, clusterConfig, 
serviceConfig, 30, 4).build();
     tccLoadBalanceSender = new TccLoadBalanceSender(loadContext, new 
FastestSender());
     participatedEvent = new ParticipatedEvent(globalTxId, localTxId, 
parentTxId, confirmMethod, cancelMethod, TransactionStatus.Succeed);
     tccStartedEvent = new TccStartedEvent(globalTxId, localTxId);
@@ -210,6 +212,31 @@ public Boolean call() {
     assertThat(eventsMap.get(8080).size(), is(3));
   }
 
+  @Test
+  public void failFastWhenAllServerWasDown() throws IOException {
+    tccLoadBalanceSender.onConnected();
+    await().atMost(2, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+      @Override
+      public Boolean call() {
+        return connected.get(8080).size() == 1 && connected.get(8090).size() 
== 1;
+      }
+    });
+    assertThat((connected.get(8080).size() == 1 && connected.get(8090).size() 
== 1), is(true));
+
+    for (Server each : servers.values()) {
+      each.shutdownNow();
+    }
+
+    try {
+      tccLoadBalanceSender.participate(participatedEvent);
+    } catch (OmegaException ex) {
+      assertThat(ex.getMessage().endsWith("all alpha server is down."), 
is(true));
+    }
+    for (Integer each : ports) {
+      startServerOnPort(each);
+    }
+  }
+
   @Test(expected = OmegaException.class)
   public void participateFailedThenAbort() {
     TccMessageSender failedSender = mock(GrpcTccClientMessageSender.class);
@@ -284,5 +311,4 @@ public Boolean call() throws Exception {
     Assert.assertThat(connected.get(8080), contains("Connected " + 
serviceName, "Disconnected " + serviceName));
     Assert.assertThat(connected.get(8090), contains("Connected " + 
serviceName, "Disconnected " + serviceName));
   }
-
 }
diff --git 
a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
 
b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index b1fe7e95..02593e15 100644
--- 
a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ 
b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -108,12 +108,14 @@ AlphaClusterConfig alphaClusterConfig(
   LoadBalanceContext sagaLoadBalanceSenderContext(
       AlphaClusterConfig alphaClusterConfig,
       ServiceConfig serviceConfig,
-      @Value("${omega.connection.reconnectDelay:3000}") int reconnectDelay) {
+      @Value("${omega.connection.reconnectDelay:3000}") int reconnectDelay,
+      @Value("${omega.connection.sending.timeout:8}") int timeoutSeconds) {
     LoadBalanceContext loadBalanceSenderContext = new 
LoadBalanceContextBuilder(
         TransactionType.SAGA,
         alphaClusterConfig,
         serviceConfig,
-        reconnectDelay).build();
+        reconnectDelay,
+        timeoutSeconds).build();
     return loadBalanceSenderContext;
   }
 
@@ -135,12 +137,14 @@ public void run() {
   LoadBalanceContext loadBalanceSenderContext(
       AlphaClusterConfig alphaClusterConfig,
       ServiceConfig serviceConfig,
-      @Value("${omega.connection.reconnectDelay:3000}") int reconnectDelay) {
+      @Value("${omega.connection.reconnectDelay:3000}") int reconnectDelay,
+      @Value("${omega.connection.sending.timeout:8}") int timeoutSeconds) {
     LoadBalanceContext loadBalanceSenderContext = new 
LoadBalanceContextBuilder(
         TransactionType.TCC,
         alphaClusterConfig,
         serviceConfig,
-        reconnectDelay).build();
+        reconnectDelay,
+        timeoutSeconds).build();
     return loadBalanceSenderContext;
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add fault tolerance for service comb TCC
> ----------------------------------------
>
>                 Key: SCB-909
>                 URL: https://issues.apache.org/jira/browse/SCB-909
>             Project: Apache ServiceComb
>          Issue Type: New Feature
>          Components: Saga
>    Affects Versions: saga-0.3.0
>            Reporter: cherrylzhao
>            Assignee: cherrylzhao
>            Priority: Major
>             Fix For: saga-0.3.0
>
>
> TCC fault tolerance includes the following points.
> # omega can switch to another available alpha when sending a message fails.
> # if omega's resend logic (to a different alpha) also fails, omega can roll 
> back local data automatically.
> # alpha resends (to the same omega) when the ACK fails; if the resend also 
> fails, dirty data can be left in the database, which will be handled by the scanner.
> # design a transaction timeout mechanism: if the scanner finds that an event 
> will not complete within the expected time, it sends a compensate command to 
> omega; this command has a different type from the normal one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to