This is an automated email from the ASF dual-hosted git repository. zhanglei pushed a commit to branch SCB-1321 in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 0ed1b152a4c30bcb6e89964462267e8fee3cc83e Author: Lei Zhang <[email protected]> AuthorDate: Tue Jul 2 17:33:20 2019 +0800 SCB-1321 Add GrpcSagaEventService for SagaActor Events --- alpha/alpha-server/pom.xml | 16 ++ .../servicecomb/pack/alpha/server/AlphaConfig.java | 21 +++ .../pack/alpha/server/fsm/GrpcOmegaCallback.java | 50 +++++++ .../alpha/server/fsm/GrpcSagaEventService.java | 130 ++++++++++++++++ .../src/main/resources/application.yaml | 9 ++ .../alpha/server/fsm/AlphaIntegrationFsmTest.java | 134 +++++++++++++++++ .../alpha/server/fsm/OmegaEventSagaSimulator.java | 159 ++++++++++++++++++++ .../pack/alpha/server/fsm/OmegaEventSender.java | 163 +++++++++++++++++++++ alpha/alpha-server/src/test/resources/log4j2.xml | 30 ++++ pom.xml | 5 + 10 files changed, 717 insertions(+) diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml index 7f40401..df2b47d 100644 --- a/alpha/alpha-server/pom.xml +++ b/alpha/alpha-server/pom.xml @@ -87,11 +87,21 @@ </dependency> <dependency> <groupId>org.apache.servicecomb.pack</groupId> + <artifactId>alpha-fsm</artifactId> + </dependency> + <dependency> + <groupId>org.apache.servicecomb.pack</groupId> <artifactId>alpha-spring-cloud-starter-consul</artifactId> </dependency> <dependency> <groupId>org.apache.servicecomb.pack</groupId> <artifactId>alpha-spring-cloud-starter-zookeeper</artifactId> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.servicecomb.pack</groupId> @@ -159,6 +169,12 @@ <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> + <exclusions> + <exclusion> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-logging</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.hsqldb</groupId> diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java index 211ee07..221bfe5 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java @@ -29,6 +29,7 @@ import javax.annotation.PreDestroy; import com.google.common.eventbus.EventBus; import org.apache.servicecomb.pack.alpha.core.*; +import org.apache.servicecomb.pack.alpha.server.fsm.GrpcSagaEventService; import org.apache.servicecomb.pack.alpha.server.tcc.GrpcTccEventService; import org.apache.servicecomb.pack.alpha.server.tcc.callback.TccPendingTaskRunner; import org.apache.servicecomb.pack.alpha.server.tcc.service.TccEventScanner; @@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.domain.EntityScan; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationEventPublisher; @@ -146,6 +148,7 @@ public class AlphaConfig { } @Bean + @ConditionalOnProperty(name = "alpha.model.actor.enabled", havingValue = "false", matchIfMissing = true) ServerStartable serverStartable(GrpcServerConfig serverConfig, TxConsistentService txConsistentService, Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService, TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus) throws IOException { @@ -162,6 +165,24 @@ public class AlphaConfig { return bootstrap; } + @Bean + @ConditionalOnProperty(name= "alpha.model.actor.enabled", havingValue = "true") + ServerStartable serverStartableMy(GrpcServerConfig serverConfig, + Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService, + TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, @Qualifier("sagaEventBus") EventBus sagaEventBus) throws IOException { + ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus, + new GrpcSagaEventService(sagaEventBus, omegaCallbacks), grpcTccEventService); + new Thread(bootstrap::start).start(); + tccPendingTaskRunner.start(); + tccEventScanner.start(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + tccPendingTaskRunner.shutdown(); + tccEventScanner.shutdown(); + })); + + return bootstrap; + } + @PostConstruct void init() { new PendingTaskRunner(pendingCompensations, delay).run(); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java new file mode 100644 index 0000000..f5299da --- /dev/null +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.server.fsm; + +import com.google.protobuf.ByteString; +import io.grpc.stub.StreamObserver; +import org.apache.servicecomb.pack.alpha.core.OmegaCallback; +import org.apache.servicecomb.pack.alpha.core.TxEvent; +import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand; + +class GrpcOmegaCallback implements OmegaCallback { + + private final StreamObserver<GrpcCompensateCommand> observer; + + GrpcOmegaCallback(StreamObserver<GrpcCompensateCommand> observer) { + this.observer = observer; + } + + @Override + public void compensate(TxEvent event) { + GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder() + .setGlobalTxId(event.globalTxId()) + .setLocalTxId(event.localTxId()) + .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId()) + .setCompensationMethod(event.compensationMethod()) + .setPayloads(ByteString.copyFrom(event.payloads())) + .build(); + observer.onNext(command); + } + + @Override + public void disconnect() { + observer.onCompleted(); + } +} diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java new file mode 100644 index 0000000..2daac50 --- /dev/null +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.server.fsm; + +import static java.util.Collections.emptyMap; + +import com.google.common.eventbus.EventBus; +import io.grpc.stub.StreamObserver; +import java.lang.invoke.MethodHandles; +import java.util.Date; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import kamon.annotation.Trace; +import org.apache.servicecomb.pack.alpha.core.OmegaCallback; +import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.common.EventType; +import org.apache.servicecomb.pack.contract.grpc.GrpcAck; +import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand; +import org.apache.servicecomb.pack.contract.grpc.GrpcServiceConfig; +import org.apache.servicecomb.pack.contract.grpc.GrpcTxEvent; +import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GrpcSagaEventService extends TxEventServiceImplBase { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final GrpcAck ALLOW = GrpcAck.newBuilder().setAborted(false).build(); + private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build(); + + private final Map<String, Map<String, OmegaCallback>> omegaCallbacks; + private final EventBus sagaEventBus; + + public GrpcSagaEventService(EventBus sagaEventBus, + Map<String, Map<String, OmegaCallback>> omegaCallbacks) { + this.sagaEventBus = sagaEventBus; + this.omegaCallbacks = omegaCallbacks; + } + + @Override + @Trace("alphaConnected") + public void onConnected( + GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) { + omegaCallbacks + .computeIfAbsent(request.getServiceName(), key -> new ConcurrentHashMap<>()) + .put(request.getInstanceId(), new GrpcOmegaCallback(responseObserver)); + } + + // TODO: 2018/1/5 connect is async and disconnect is sync, meaning callback may not be registered on disconnected + @Override + @Trace("alphaDisconnected") + public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) { + OmegaCallback callback = omegaCallbacks.getOrDefault(request.getServiceName(), emptyMap()) + .remove(request.getInstanceId()); + + if (callback != null) { + callback.disconnect(); + } + + responseObserver.onNext(ALLOW); + responseObserver.onCompleted(); + } + + @Override + @Trace("onTransactionEvent") + public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) { + LOG.info("onText {}",message); + boolean ok = true; + BaseEvent event = null; + if (message.getType().equals(EventType.SagaStartedEvent.name())) { + event = org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent.builder() + .globalTxId(message.getGlobalTxId()) + .timeout(message.getTimeout()).build(); + } else if (message.getType().equals(EventType.SagaEndedEvent.name())) { + event = org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent.builder() + .globalTxId(message.getGlobalTxId()).build(); + } else if (message.getType().equals(EventType.SagaAbortedEvent.name())) { + event = org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent.builder() + .globalTxId(message.getGlobalTxId()).build(); + } else if (message.getType().equals(EventType.TxStartedEvent.name())) { + event = org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent.builder() + .serviceName(message.getServiceName()) + .instanceId(message.getInstanceId()) + .creationTime(new Date()) + .globalTxId(message.getGlobalTxId()) + .localTxId(message.getLocalTxId()) + .parentTxId(message.getParentTxId().isEmpty() ? null : message.getParentTxId()) + .compensationMethod(message.getCompensationMethod()) + .retryMethod(message.getRetryMethod()) + .retries(message.getRetries()) + .payloads(message.getPayloads().toByteArray()).build(); + } else if (message.getType().equals(EventType.TxEndedEvent.name())) { + event = org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent.builder() + .globalTxId(message.getGlobalTxId()) + .parentTxId(message.getParentTxId()) + .localTxId(message.getLocalTxId()).build(); + } else if (message.getType().equals(EventType.TxAbortedEvent.name())) { + event = org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent.builder() + .globalTxId(message.getGlobalTxId()) + .parentTxId(message.getParentTxId()) + .localTxId(message.getLocalTxId()).build(); + } else if (message.getType().equals(EventType.TxCompensatedEvent.name())) { + event = org.apache.servicecomb.pack.alpha.fsm.event.TxCompensatedEvent.builder() + .globalTxId(message.getGlobalTxId()) + .parentTxId(message.getParentTxId()) + .localTxId(message.getLocalTxId()).build(); + } else { + ok = false; + } + if (event != null) { + this.sagaEventBus.post(event); + } + responseObserver.onNext(ok ? ALLOW : REJECT); + responseObserver.onCompleted(); + } +} diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml index 55283d3..01b03f4 100644 --- a/alpha/alpha-server/src/main/resources/application.yaml +++ b/alpha/alpha-server/src/main/resources/application.yaml @@ -90,3 +90,12 @@ spring: properties: eclipselink: ddl-generation: none + +--- +spring: + profiles: akka-persistence-mem +akkaConfig: + akka.persistence.journal.plugin: akka.persistence.journal.inmem + akka.persistence.journal.leveldb.dir: target/example/journal + akka.persistence.snapshot-store.plugin: akka.persistence.snapshot-store.local + akka.persistence.snapshot-store.local.dir: target/example/snapshots \ No newline at end of file diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java new file mode 100644 index 0000000..f98474d --- /dev/null +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.server.fsm; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; + +import akka.actor.ActorSystem; +import io.grpc.netty.NettyChannelBuilder; +import java.util.Map; +import java.util.UUID; +import org.apache.servicecomb.pack.alpha.core.OmegaCallback; +import org.apache.servicecomb.pack.alpha.fsm.SagaActorState; +import org.apache.servicecomb.pack.alpha.fsm.TxState; +import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; +import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension; +import org.apache.servicecomb.pack.alpha.server.AlphaApplication; +import org.apache.servicecomb.pack.alpha.server.AlphaConfig; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class}, + properties = { + "alpha.server.host=0.0.0.0", + "alpha.server.port=8090", + "alpha.event.pollingInterval=1", + "spring.main.allow-bean-definition-overriding=true", + "alpha.model.actor.enabled=true", + "spring.profiles.active=akka-persistence-mem" + }) +public class AlphaIntegrationFsmTest { + private static final OmegaEventSender omegaEventSender = OmegaEventSender.builder().build(); + private static final int port = 8090; + + @Autowired(required = false) + ActorSystem system; + + @Autowired + private Map<String, Map<String, OmegaCallback>> omegaCallbacks; + + @BeforeClass + public static void beforeClass() { + omegaEventSender.configClient(NettyChannelBuilder.forAddress("localhost", port).usePlaintext().build()); + } + + @AfterClass + public static void afterClass() throws Exception { + omegaEventSender.shutdown(); + } + + @Before + public void before() { + omegaEventSender.setOmegaCallbacks(omegaCallbacks); + omegaEventSender.reset(); + } + + @After + public void after() { + omegaEventSender.onDisconnected(); + } + + @Test + public void successfulTest() { + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + final String localTxId_3 = UUID.randomUUID().toString(); + omegaEventSender.onConnected(); + omegaEventSender.getOmegaEventSagaSimulator().sagaSuccessfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + omegaEventSender.getBlockingStub().onTxEvent(event); + }); + await().atMost(1, SECONDS).until(() -> omegaEventSender.getOmegaCallbacks() != null); + await().atMost(1, SECONDS).until(() -> SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId) != null); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + Assert.assertEquals(sagaData.getLastState(),SagaActorState.COMMITTED); + Assert.assertEquals(sagaData.getTxEntityMap().size(),3); + Assert.assertTrue(sagaData.getBeginTime() > 0); + Assert.assertTrue(sagaData.getEndTime() > 0); + Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime()); + Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED); + Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED); + Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED); + } + + @Test + public void lastTxAbortedEventTest() { + omegaEventSender.onConnected(); + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + final String localTxId_3 = UUID.randomUUID().toString(); + omegaEventSender.getOmegaEventSagaSimulator().lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + omegaEventSender.getBlockingStub().onTxEvent(event); + }); + await().atMost(1, SECONDS).until(() -> omegaEventSender.getOmegaCallbacks() != null); + await().atMost(1, SECONDS).until(() -> { + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + }); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + Assert.assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED); + Assert.assertEquals(sagaData.getTxEntityMap().size(),3); + Assert.assertTrue(sagaData.getBeginTime() > 0); + Assert.assertTrue(sagaData.getEndTime() > 0); + Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime()); + Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMPENSATED); + Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED); + Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.FAILED); + } +} diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java new file mode 100644 index 0000000..6e3c3ce --- /dev/null +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.server.fsm; + +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; +import org.apache.servicecomb.pack.common.EventType; +import org.apache.servicecomb.pack.contract.grpc.GrpcTxEvent; + +public class OmegaEventSagaSimulator { + private String serviceName; + private String instanceId; + + public List<GrpcTxEvent> sagaSuccessfulEvents(String globalTxId, String localTxId_1, + String localTxId_2, String localTxId_3) { + List<GrpcTxEvent> sagaEvents = new ArrayList<>(); + sagaEvents.add(sagaStartedEvent(globalTxId)); + sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a")); + sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a")); + sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b")); + sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b")); + sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c")); + sagaEvents.add(txEndedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c")); + sagaEvents.add(sagaEndedEvent(globalTxId)); + return sagaEvents; + } + + public List<GrpcTxEvent> lastTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ + List<GrpcTxEvent> sagaEvents = new ArrayList<>(); + sagaEvents.add(sagaStartedEvent(globalTxId)); + sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a")); + sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a")); + sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b")); + sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b")); + sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c")); + sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c")); + sagaEvents.add(sagaAbortedEvent(globalTxId)); + return sagaEvents; + } + + private GrpcTxEvent sagaStartedEvent(String globalTxId) { + return eventOf(EventType.SagaStartedEvent, globalTxId, globalTxId, + null, new byte[0], "", 0, "", + 0); + } + + private GrpcTxEvent sagaEndedEvent(String globalTxId) { + return eventOf(EventType.SagaEndedEvent, globalTxId, globalTxId, + null, new byte[0], "", 0, "", + 0); + } + + private GrpcTxEvent sagaAbortedEvent(String globalTxId) { + return eventOf(EventType.SagaAbortedEvent, globalTxId, globalTxId, + null, new byte[0], "", 0, "", + 0); + } + + private GrpcTxEvent txStartedEvent(String globalTxId, + String localTxId, String parentTxId, byte[] payloads, String compensationMethod) { + return eventOf(EventType.TxStartedEvent, globalTxId, localTxId, + parentTxId, payloads, compensationMethod, 0, "", + 0); + } + + private GrpcTxEvent txEndedEvent(String globalTxId, + String localTxId, String parentTxId, byte[] payloads, String compensationMethod) { + return eventOf(EventType.TxEndedEvent, globalTxId, localTxId, + parentTxId, payloads, compensationMethod, 0, "", + 0); + } + + private GrpcTxEvent txAbortedEvent(String globalTxId, + String localTxId, String parentTxId, byte[] payloads, String compensationMethod) { + return eventOf(EventType.TxAbortedEvent, globalTxId, localTxId, + parentTxId, payloads, compensationMethod, 0, "", + 0); + } + + public GrpcTxEvent txCompensatedEvent(String globalTxId, + String localTxId, String parentTxId) { + return eventOf(EventType.TxCompensatedEvent, globalTxId, localTxId, + parentTxId, new byte[0], "", 0, "", + 0); + } + + private GrpcTxEvent eventOf(EventType eventType, + String globalTxId, + String localTxId, + String parentTxId, + byte[] payloads, + String compensationMethod, + int timeout, + String retryMethod, + int retries) { + + return GrpcTxEvent.newBuilder() + .setServiceName(serviceName) + .setInstanceId(instanceId) + .setTimestamp(System.currentTimeMillis()) + .setGlobalTxId(globalTxId) + .setLocalTxId(localTxId) + .setParentTxId(parentTxId == null ? "" : parentTxId) + .setType(eventType.name()) + .setCompensationMethod(compensationMethod) + .setTimeout(timeout) + .setRetryMethod(retryMethod) + .setRetries(retries) + .setPayloads(ByteString.copyFrom(payloads)) + .build(); + } + + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + + private String serviceName; + private String instanceId; + + private Builder() { + } + + public Builder serviceName(String serviceName) { + this.serviceName = serviceName; + return this; + } + + public Builder instanceId(String instanceId) { + this.instanceId = instanceId; + return this; + } + + public OmegaEventSagaSimulator build() { + OmegaEventSagaSimulator omegaEventSagaSimulator = new OmegaEventSagaSimulator(); + omegaEventSagaSimulator.serviceName = this.serviceName; + omegaEventSagaSimulator.instanceId = this.instanceId; + return omegaEventSagaSimulator; + } + } +} diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java new file mode 100644 index 0000000..a6a5c3c --- /dev/null +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.server.fsm; + +import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; + +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Consumer; +import org.apache.servicecomb.pack.alpha.core.OmegaCallback; +import org.apache.servicecomb.pack.contract.grpc.GrpcAck; +import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand; +import org.apache.servicecomb.pack.contract.grpc.GrpcServiceConfig; +import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc; +import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub; +import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub; + +public class OmegaEventSender { + GrpcServiceConfig serviceConfig; + protected ManagedChannel clientChannel; + private TxEventServiceStub asyncStub; + private TxEventServiceBlockingStub blockingStub; + private final Queue<GrpcCompensateCommand> receivedCommands = new ConcurrentLinkedQueue<>(); + private final CompensationStreamObserver compensateResponseObserver = new CompensationStreamObserver( + this::onCompensation); + private Map<String, Map<String, OmegaCallback>> omegaCallbacks; + private OmegaEventSagaSimulator omegaEventSagaSimulator; + + private String serviceName; + private String instanceId; + + public void configClient(ManagedChannel clientChannel){ + this.clientChannel = clientChannel; + this.asyncStub = TxEventServiceGrpc.newStub(clientChannel); + this.blockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel); + } + + public void shutdown(){ + this.clientChannel.shutdown(); + this.clientChannel = null; + } + + public void onConnected(){ + serviceConfig = GrpcServiceConfig.newBuilder() + .setServiceName(serviceName) + .setInstanceId(instanceId) + .build(); + asyncStub.onConnected(serviceConfig, compensateResponseObserver); + omegaEventSagaSimulator = OmegaEventSagaSimulator.builder().serviceName(serviceName).instanceId(instanceId).build(); + + } + + public void onDisconnected(){ + blockingStub.onDisconnected(serviceConfig); + } + + public void setOmegaCallbacks( + Map<String, Map<String, OmegaCallback>> omegaCallbacks) { + this.omegaCallbacks = omegaCallbacks; + } + + public Queue<GrpcCompensateCommand> getReceivedCommands() { + return receivedCommands; + } + + public TxEventServiceBlockingStub getBlockingStub() { + return blockingStub; + } + + public Map<String, OmegaCallback> getOmegaCallbacks(){ + return omegaCallbacks.get(serviceName); + } + + public void reset(){ + receivedCommands.clear(); + } + + public OmegaEventSagaSimulator getOmegaEventSagaSimulator(){ + return omegaEventSagaSimulator; + } + + private class CompensationStreamObserver implements StreamObserver<GrpcCompensateCommand> { + private final Consumer<GrpcCompensateCommand> consumer; + private boolean completed = false; + + private CompensationStreamObserver() { + this(command -> {}); + } + + private CompensationStreamObserver(Consumer<GrpcCompensateCommand> consumer) { + this.consumer = consumer; + } + + @Override + public void onNext(GrpcCompensateCommand command) { + // intercept received command + consumer.accept(command); + receivedCommands.add(command); + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + completed = true; + } + + boolean isCompleted() { + return completed; + } + } + + private GrpcAck onCompensation(GrpcCompensateCommand command) { + return blockingStub.onTxEvent(omegaEventSagaSimulator.txCompensatedEvent(command.getGlobalTxId(),command.getLocalTxId(),command.getParentTxId())); + } + + public static OmegaEventSender.Builder builder() { + return new OmegaEventSender.Builder(); + } + + public static final class Builder { + + private String serviceName = uniquify("omega-serviceName"); + private String instanceId = uniquify("omega-instanceId"); + + public OmegaEventSender.Builder serviceName(String serviceName) { + this.serviceName = serviceName; + return this; + } + + public OmegaEventSender.Builder instanceId(String instanceId) { + this.instanceId = instanceId; + return this; + } + + public OmegaEventSender build() { + OmegaEventSender omegaEventSender = new OmegaEventSender(); + omegaEventSender.instanceId = this.instanceId; + omegaEventSender.serviceName = this.serviceName; + return omegaEventSender; + } + } +} diff --git a/alpha/alpha-server/src/test/resources/log4j2.xml b/alpha/alpha-server/src/test/resources/log4j2.xml new file mode 100644 index 0000000..58924c6 --- /dev/null +++ b/alpha/alpha-server/src/test/resources/log4j2.xml @@ -0,0 +1,30 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<Configuration status="WARN"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> + </Console> + </Appenders> + <Loggers> + <Root level="info"> + <AppenderRef ref="Console"/> + </Root> + </Loggers> +</Configuration> diff --git a/pom.xml b/pom.xml index 3339b22..3c4e7ea 100644 --- a/pom.xml +++ b/pom.xml @@ -317,6 +317,11 @@ </dependency> <dependency> <groupId>org.apache.servicecomb.pack</groupId> + <artifactId>alpha-fsm</artifactId> + <version>0.5.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.servicecomb.pack</groupId> <artifactId>alpha-server</artifactId> <version>0.5.0-SNAPSHOT</version> </dependency>
