This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 0ed1b152a4c30bcb6e89964462267e8fee3cc83e
Author: Lei Zhang <[email protected]>
AuthorDate: Tue Jul 2 17:33:20 2019 +0800

    SCB-1321 Add GrpcSagaEventService for SagaActor Events
---
 alpha/alpha-server/pom.xml                         |  16 ++
 .../servicecomb/pack/alpha/server/AlphaConfig.java |  21 +++
 .../pack/alpha/server/fsm/GrpcOmegaCallback.java   |  50 +++++++
 .../alpha/server/fsm/GrpcSagaEventService.java     | 130 ++++++++++++++++
 .../src/main/resources/application.yaml            |   9 ++
 .../alpha/server/fsm/AlphaIntegrationFsmTest.java  | 134 +++++++++++++++++
 .../alpha/server/fsm/OmegaEventSagaSimulator.java  | 159 ++++++++++++++++++++
 .../pack/alpha/server/fsm/OmegaEventSender.java    | 163 +++++++++++++++++++++
 alpha/alpha-server/src/test/resources/log4j2.xml   |  30 ++++
 pom.xml                                            |   5 +
 10 files changed, 717 insertions(+)

diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml
index 7f40401..df2b47d 100644
--- a/alpha/alpha-server/pom.xml
+++ b/alpha/alpha-server/pom.xml
@@ -87,11 +87,21 @@
     </dependency>
     <dependency>
       <groupId>org.apache.servicecomb.pack</groupId>
+      <artifactId>alpha-fsm</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.servicecomb.pack</groupId>
       <artifactId>alpha-spring-cloud-starter-consul</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.servicecomb.pack</groupId>
       <artifactId>alpha-spring-cloud-starter-zookeeper</artifactId>
+      <exclusions>
+        <exclusion>
+          <artifactId>slf4j-log4j12</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.servicecomb.pack</groupId>
@@ -159,6 +169,12 @@
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-test</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.springframework.boot</groupId>
+          <artifactId>spring-boot-starter-logging</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.hsqldb</groupId>
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
index 211ee07..221bfe5 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
@@ -29,6 +29,7 @@ import javax.annotation.PreDestroy;
 
 import com.google.common.eventbus.EventBus;
 import org.apache.servicecomb.pack.alpha.core.*;
+import org.apache.servicecomb.pack.alpha.server.fsm.GrpcSagaEventService;
 import org.apache.servicecomb.pack.alpha.server.tcc.GrpcTccEventService;
 import 
org.apache.servicecomb.pack.alpha.server.tcc.callback.TccPendingTaskRunner;
 import org.apache.servicecomb.pack.alpha.server.tcc.service.TccEventScanner;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.autoconfigure.domain.EntityScan;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationEventPublisher;
@@ -146,6 +148,7 @@ public class AlphaConfig {
   }
 
   @Bean
+  @ConditionalOnProperty(name = "alpha.model.actor.enabled", havingValue = 
"false", matchIfMissing = true)
   ServerStartable serverStartable(GrpcServerConfig serverConfig, 
TxConsistentService txConsistentService,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks, 
GrpcTccEventService grpcTccEventService,
       TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner 
tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus) throws 
IOException {
@@ -162,6 +165,24 @@ public class AlphaConfig {
     return bootstrap;
   }
 
+  @Bean
+  @ConditionalOnProperty(name= "alpha.model.actor.enabled", havingValue = 
"true")
+  ServerStartable serverStartableMy(GrpcServerConfig serverConfig,
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks, 
GrpcTccEventService grpcTccEventService,
+      TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner 
tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, 
@Qualifier("sagaEventBus") EventBus sagaEventBus) throws IOException {
+    ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus,
+        new GrpcSagaEventService(sagaEventBus, omegaCallbacks), 
grpcTccEventService);
+    new Thread(bootstrap::start).start();
+    tccPendingTaskRunner.start();
+    tccEventScanner.start();
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      tccPendingTaskRunner.shutdown();
+      tccEventScanner.shutdown();
+    }));
+
+    return bootstrap;
+  }
+
   @PostConstruct
   void init() {
     new PendingTaskRunner(pendingCompensations, delay).run();
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java
new file mode 100644
index 0000000..f5299da
--- /dev/null
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.server.fsm;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
+import org.apache.servicecomb.pack.alpha.core.TxEvent;
+import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
+
+class GrpcOmegaCallback implements OmegaCallback {
+
+  private final StreamObserver<GrpcCompensateCommand> observer;
+
+  GrpcOmegaCallback(StreamObserver<GrpcCompensateCommand> observer) {
+    this.observer = observer;
+  }
+
+  @Override
+  public void compensate(TxEvent event) {
+    GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
+        .setGlobalTxId(event.globalTxId())
+        .setLocalTxId(event.localTxId())
+        .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
+        .setCompensationMethod(event.compensationMethod())
+        .setPayloads(ByteString.copyFrom(event.payloads()))
+        .build();
+    observer.onNext(command);
+  }
+
+  @Override
+  public void disconnect() {
+    observer.onCompleted();
+  }
+}
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
new file mode 100644
index 0000000..2daac50
--- /dev/null
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.server.fsm;
+
+import static java.util.Collections.emptyMap;
+
+import com.google.common.eventbus.EventBus;
+import io.grpc.stub.StreamObserver;
+import java.lang.invoke.MethodHandles;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import kamon.annotation.Trace;
+import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.common.EventType;
+import org.apache.servicecomb.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.pack.contract.grpc.GrpcTxEvent;
+import 
org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GrpcSagaEventService extends TxEventServiceImplBase {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final GrpcAck ALLOW = 
GrpcAck.newBuilder().setAborted(false).build();
+  private static final GrpcAck REJECT = 
GrpcAck.newBuilder().setAborted(true).build();
+
+  private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
+  private final EventBus sagaEventBus;
+
+  public GrpcSagaEventService(EventBus sagaEventBus,
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
+    this.sagaEventBus = sagaEventBus;
+    this.omegaCallbacks = omegaCallbacks;
+  }
+
+  @Override
+  @Trace("alphaConnected")
+  public void onConnected(
+      GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> 
responseObserver) {
+    omegaCallbacks
+        .computeIfAbsent(request.getServiceName(), key -> new 
ConcurrentHashMap<>())
+        .put(request.getInstanceId(), new GrpcOmegaCallback(responseObserver));
+  }
+
+  // TODO: 2018/1/5 connect is async and disconnect is sync, meaning callback 
may not be registered on disconnected
+  @Override
+  @Trace("alphaDisconnected")
+  public void onDisconnected(GrpcServiceConfig request, 
StreamObserver<GrpcAck> responseObserver) {
+    OmegaCallback callback = 
omegaCallbacks.getOrDefault(request.getServiceName(), emptyMap())
+        .remove(request.getInstanceId());
+
+    if (callback != null) {
+      callback.disconnect();
+    }
+
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  @Trace("onTransactionEvent")
+  public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> 
responseObserver) {
+    LOG.info("onText {}",message);
+    boolean ok = true;
+    BaseEvent event = null;
+    if (message.getType().equals(EventType.SagaStartedEvent.name())) {
+      event = 
org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent.builder()
+          .globalTxId(message.getGlobalTxId())
+          .timeout(message.getTimeout()).build();
+    } else if (message.getType().equals(EventType.SagaEndedEvent.name())) {
+      event = 
org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent.builder()
+          .globalTxId(message.getGlobalTxId()).build();
+    } else if (message.getType().equals(EventType.SagaAbortedEvent.name())) {
+      event = 
org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent.builder()
+          .globalTxId(message.getGlobalTxId()).build();
+    } else if (message.getType().equals(EventType.TxStartedEvent.name())) {
+      event = 
org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent.builder()
+          .serviceName(message.getServiceName())
+          .instanceId(message.getInstanceId())
+          .creationTime(new Date())
+          .globalTxId(message.getGlobalTxId())
+          .localTxId(message.getLocalTxId())
+          .parentTxId(message.getParentTxId().isEmpty() ? null : 
message.getParentTxId())
+          .compensationMethod(message.getCompensationMethod())
+          .retryMethod(message.getRetryMethod())
+          .retries(message.getRetries())
+          .payloads(message.getPayloads().toByteArray()).build();
+    } else if (message.getType().equals(EventType.TxEndedEvent.name())) {
+      event = 
org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent.builder()
+          .globalTxId(message.getGlobalTxId())
+          .parentTxId(message.getParentTxId())
+          .localTxId(message.getLocalTxId()).build();
+    } else if (message.getType().equals(EventType.TxAbortedEvent.name())) {
+      event = 
org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent.builder()
+          .globalTxId(message.getGlobalTxId())
+          .parentTxId(message.getParentTxId())
+          .localTxId(message.getLocalTxId()).build();
+    } else if (message.getType().equals(EventType.TxCompensatedEvent.name())) {
+      event = 
org.apache.servicecomb.pack.alpha.fsm.event.TxCompensatedEvent.builder()
+          .globalTxId(message.getGlobalTxId())
+          .parentTxId(message.getParentTxId())
+          .localTxId(message.getLocalTxId()).build();
+    } else {
+      ok = false;
+    }
+    if (event != null) {
+      this.sagaEventBus.post(event);
+    }
+    responseObserver.onNext(ok ? ALLOW : REJECT);
+    responseObserver.onCompleted();
+  }
+}
diff --git a/alpha/alpha-server/src/main/resources/application.yaml 
b/alpha/alpha-server/src/main/resources/application.yaml
index 55283d3..01b03f4 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -90,3 +90,12 @@ spring:
     properties:
       eclipselink:
         ddl-generation: none
+
+---
+spring:
+  profiles: akka-persistence-mem
+akkaConfig:
+  akka.persistence.journal.plugin: akka.persistence.journal.inmem
+  akka.persistence.journal.leveldb.dir: target/example/journal
+  akka.persistence.snapshot-store.plugin: akka.persistence.snapshot-store.local
+  akka.persistence.snapshot-store.local.dir: target/example/snapshots
\ No newline at end of file
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
new file mode 100644
index 0000000..f98474d
--- /dev/null
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.server.fsm;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+
+import akka.actor.ActorSystem;
+import io.grpc.netty.NettyChannelBuilder;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
+import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
+import org.apache.servicecomb.pack.alpha.fsm.TxState;
+import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+import 
org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
+import org.apache.servicecomb.pack.alpha.server.AlphaApplication;
+import org.apache.servicecomb.pack.alpha.server.AlphaConfig;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
+    properties = {
+        "alpha.server.host=0.0.0.0",
+        "alpha.server.port=8090",
+        "alpha.event.pollingInterval=1",
+        "spring.main.allow-bean-definition-overriding=true",
+        "alpha.model.actor.enabled=true",
+        "spring.profiles.active=akka-persistence-mem"
+       })
+public class AlphaIntegrationFsmTest {
+  private static final OmegaEventSender omegaEventSender = 
OmegaEventSender.builder().build();
+  private static final int port = 8090;
+
+  @Autowired(required = false)
+  ActorSystem system;
+
+  @Autowired
+  private Map<String, Map<String, OmegaCallback>> omegaCallbacks;
+
+  @BeforeClass
+  public static void beforeClass() {
+    omegaEventSender.configClient(NettyChannelBuilder.forAddress("localhost", 
port).usePlaintext().build());
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    omegaEventSender.shutdown();
+  }
+
+  @Before
+  public void before() {
+    omegaEventSender.setOmegaCallbacks(omegaCallbacks);
+    omegaEventSender.reset();
+  }
+
+  @After
+  public void after() {
+    omegaEventSender.onDisconnected();
+  }
+
+  @Test
+  public void successfulTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    omegaEventSender.onConnected();
+    
omegaEventSender.getOmegaEventSagaSimulator().sagaSuccessfulEvents(globalTxId, 
localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+      omegaEventSender.getBlockingStub().onTxEvent(event);
+    });
+    await().atMost(1, SECONDS).until(() -> 
omegaEventSender.getOmegaCallbacks() != null);
+    await().atMost(1, SECONDS).until(() -> 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId)
 != null);
+    SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+    Assert.assertEquals(sagaData.getLastState(),SagaActorState.COMMITTED);
+    Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+    Assert.assertTrue(sagaData.getBeginTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED);
+  }
+
+  @Test
+  public void lastTxAbortedEventTest() {
+    omegaEventSender.onConnected();
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    
omegaEventSender.getOmegaEventSagaSimulator().lastTxAbortedEvents(globalTxId, 
localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+      omegaEventSender.getBlockingStub().onTxEvent(event);
+    });
+    await().atMost(1, SECONDS).until(() -> 
omegaEventSender.getOmegaCallbacks() != null);
+    await().atMost(1, SECONDS).until(() -> {
+      SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      return sagaData !=null && 
sagaData.getLastState()==SagaActorState.COMPENSATED;
+    });
+    SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+    Assert.assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
+    Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+    Assert.assertTrue(sagaData.getBeginTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMPENSATED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED);
+    
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.FAILED);
+  }
+}
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
new file mode 100644
index 0000000..6e3c3ce
--- /dev/null
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.server.fsm;
+
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.servicecomb.pack.common.EventType;
+import org.apache.servicecomb.pack.contract.grpc.GrpcTxEvent;
+
+public class OmegaEventSagaSimulator {
+  private String serviceName;
+  private String instanceId;
+
+  public List<GrpcTxEvent> sagaSuccessfulEvents(String globalTxId, String 
localTxId_1,
+      String localTxId_2, String localTxId_3) {
+    List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(sagaStartedEvent(globalTxId));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service 
a".getBytes(), "method a"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, 
"service b".getBytes(), "method b"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service 
b".getBytes(), "method b"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_3, globalTxId, "service 
c".getBytes(), "method c"));
+    sagaEvents.add(sagaEndedEvent(globalTxId));
+    return sagaEvents;
+  }
+
+  public List<GrpcTxEvent> lastTxAbortedEvents(String globalTxId, String 
localTxId_1, String localTxId_2, String localTxId_3){
+    List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(sagaStartedEvent(globalTxId));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service 
a".getBytes(), "method a"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, 
"service b".getBytes(), "method b"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service 
b".getBytes(), "method b"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
+    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
+    sagaEvents.add(sagaAbortedEvent(globalTxId));
+    return sagaEvents;
+  }
+
+  private GrpcTxEvent sagaStartedEvent(String globalTxId) {
+    return eventOf(EventType.SagaStartedEvent, globalTxId, globalTxId,
+        null, new byte[0], "", 0, "",
+        0);
+  }
+
+  private GrpcTxEvent sagaEndedEvent(String globalTxId) {
+    return eventOf(EventType.SagaEndedEvent, globalTxId, globalTxId,
+        null, new byte[0], "", 0, "",
+        0);
+  }
+
+  private GrpcTxEvent sagaAbortedEvent(String globalTxId) {
+    return eventOf(EventType.SagaAbortedEvent, globalTxId, globalTxId,
+        null, new byte[0], "", 0, "",
+        0);
+  }
+
+  private GrpcTxEvent txStartedEvent(String globalTxId,
+      String localTxId, String parentTxId, byte[] payloads, String 
compensationMethod) {
+    return eventOf(EventType.TxStartedEvent, globalTxId, localTxId,
+        parentTxId, payloads, compensationMethod, 0, "",
+        0);
+  }
+
+  private GrpcTxEvent txEndedEvent(String globalTxId,
+      String localTxId, String parentTxId, byte[] payloads, String 
compensationMethod) {
+    return eventOf(EventType.TxEndedEvent, globalTxId, localTxId,
+        parentTxId, payloads, compensationMethod, 0, "",
+        0);
+  }
+
+  private GrpcTxEvent txAbortedEvent(String globalTxId,
+      String localTxId, String parentTxId, byte[] payloads, String 
compensationMethod) {
+    return eventOf(EventType.TxAbortedEvent, globalTxId, localTxId,
+        parentTxId, payloads, compensationMethod, 0, "",
+        0);
+  }
+
+  public GrpcTxEvent txCompensatedEvent(String globalTxId,
+      String localTxId, String parentTxId) {
+    return eventOf(EventType.TxCompensatedEvent, globalTxId, localTxId,
+        parentTxId,  new byte[0], "", 0, "",
+        0);
+  }
+
+  private GrpcTxEvent eventOf(EventType eventType,
+      String globalTxId,
+      String localTxId,
+      String parentTxId,
+      byte[] payloads,
+      String compensationMethod,
+      int timeout,
+      String retryMethod,
+      int retries) {
+
+    return GrpcTxEvent.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .setTimestamp(System.currentTimeMillis())
+        .setGlobalTxId(globalTxId)
+        .setLocalTxId(localTxId)
+        .setParentTxId(parentTxId == null ? "" : parentTxId)
+        .setType(eventType.name())
+        .setCompensationMethod(compensationMethod)
+        .setTimeout(timeout)
+        .setRetryMethod(retryMethod)
+        .setRetries(retries)
+        .setPayloads(ByteString.copyFrom(payloads))
+        .build();
+  }
+
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static final class Builder {
+
+    private String serviceName;
+    private String instanceId;
+
+    private Builder() {
+    }
+
+    public Builder serviceName(String serviceName) {
+      this.serviceName = serviceName;
+      return this;
+    }
+
+    public Builder instanceId(String instanceId) {
+      this.instanceId = instanceId;
+      return this;
+    }
+
+    public OmegaEventSagaSimulator build() {
+      OmegaEventSagaSimulator omegaEventSagaSimulator = new 
OmegaEventSagaSimulator();
+      omegaEventSagaSimulator.serviceName = this.serviceName;
+      omegaEventSagaSimulator.instanceId = this.instanceId;
+      return omegaEventSagaSimulator;
+    }
+  }
+}
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java
new file mode 100644
index 0000000..a6a5c3c
--- /dev/null
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.server.fsm;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
+import org.apache.servicecomb.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc;
+import 
org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
+import 
org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
+
+public class OmegaEventSender {
+  GrpcServiceConfig serviceConfig;
+  protected ManagedChannel clientChannel;
+  private TxEventServiceStub asyncStub;
+  private TxEventServiceBlockingStub blockingStub;
+  private final Queue<GrpcCompensateCommand> receivedCommands = new 
ConcurrentLinkedQueue<>();
+  private final CompensationStreamObserver compensateResponseObserver = new 
CompensationStreamObserver(
+      this::onCompensation);
+  private Map<String, Map<String, OmegaCallback>> omegaCallbacks;
+  private OmegaEventSagaSimulator omegaEventSagaSimulator;
+
+  private String serviceName;
+  private String instanceId;
+
+  public void configClient(ManagedChannel clientChannel){
+    this.clientChannel = clientChannel;
+    this.asyncStub = TxEventServiceGrpc.newStub(clientChannel);
+    this.blockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
+  }
+
+  public void shutdown(){
+    this.clientChannel.shutdown();
+    this.clientChannel = null;
+  }
+
+  public void onConnected(){
+    serviceConfig = GrpcServiceConfig.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .build();
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    omegaEventSagaSimulator = 
OmegaEventSagaSimulator.builder().serviceName(serviceName).instanceId(instanceId).build();
+
+  }
+
+  public void onDisconnected(){
+    blockingStub.onDisconnected(serviceConfig);
+  }
+
+  public void setOmegaCallbacks(
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
+    this.omegaCallbacks = omegaCallbacks;
+  }
+
+  public Queue<GrpcCompensateCommand> getReceivedCommands() {
+    return receivedCommands;
+  }
+
+  public TxEventServiceBlockingStub getBlockingStub() {
+    return blockingStub;
+  }
+
+  public Map<String, OmegaCallback> getOmegaCallbacks(){
+    return omegaCallbacks.get(serviceName);
+  }
+
+  public void reset(){
+    receivedCommands.clear();
+  }
+
+  public OmegaEventSagaSimulator getOmegaEventSagaSimulator(){
+    return omegaEventSagaSimulator;
+  }
+
+  private class CompensationStreamObserver implements 
StreamObserver<GrpcCompensateCommand> {
+    private final Consumer<GrpcCompensateCommand> consumer;
+    private boolean completed = false;
+
+    private CompensationStreamObserver() {
+      this(command -> {});
+    }
+
+    private CompensationStreamObserver(Consumer<GrpcCompensateCommand> 
consumer) {
+      this.consumer = consumer;
+    }
+
+    @Override
+    public void onNext(GrpcCompensateCommand command) {
+      // intercept received command
+      consumer.accept(command);
+      receivedCommands.add(command);
+    }
+
+    @Override
+    public void onError(Throwable t) {
+    }
+
+    @Override
+    public void onCompleted() {
+      completed = true;
+    }
+
+    boolean isCompleted() {
+      return completed;
+    }
+  }
+
+  private GrpcAck onCompensation(GrpcCompensateCommand command) {
+    return 
blockingStub.onTxEvent(omegaEventSagaSimulator.txCompensatedEvent(command.getGlobalTxId(),command.getLocalTxId(),command.getParentTxId()));
+  }
+
+  public static OmegaEventSender.Builder builder() {
+    return new OmegaEventSender.Builder();
+  }
+
+  public static final class Builder {
+
+    private String serviceName = uniquify("omega-serviceName");
+    private String instanceId = uniquify("omega-instanceId");
+
+    public OmegaEventSender.Builder serviceName(String serviceName) {
+      this.serviceName = serviceName;
+      return this;
+    }
+
+    public OmegaEventSender.Builder instanceId(String instanceId) {
+      this.instanceId = instanceId;
+      return this;
+    }
+
+    public OmegaEventSender build() {
+      OmegaEventSender omegaEventSender = new OmegaEventSender();
+      omegaEventSender.instanceId = this.instanceId;
+      omegaEventSender.serviceName = this.serviceName;
+      return omegaEventSender;
+    }
+  }
+}
diff --git a/alpha/alpha-server/src/test/resources/log4j2.xml 
b/alpha/alpha-server/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..58924c6
--- /dev/null
+++ b/alpha/alpha-server/src/test/resources/log4j2.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - 
%msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/pom.xml b/pom.xml
index 3339b22..3c4e7ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -317,6 +317,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.servicecomb.pack</groupId>
+        <artifactId>alpha-fsm</artifactId>
+        <version>0.5.0-SNAPSHOT</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.servicecomb.pack</groupId>
         <artifactId>alpha-server</artifactId>
         <version>0.5.0-SNAPSHOT</version>
       </dependency>

Reply via email to