This is an automated email from the ASF dual-hosted git repository. zhanglei pushed a commit to branch SCB-1321 in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit f59e348f5f4d181bfa3fcaa6bea137fc5675c942 Author: Lei Zhang <[email protected]> AuthorDate: Tue Jul 2 17:16:27 2019 +0800 SCB-1321 Add SagaActor call compensate --- alpha/alpha-fsm/pom.xml | 4 + .../pack/alpha/fsm/FsmAutoConfiguration.java | 3 + .../servicecomb/pack/alpha/fsm/SagaActor.java | 62 +++++++++---- .../apache/servicecomb/pack/alpha/fsm/TxState.java | 2 +- .../pack/alpha/fsm/domain/AddTxEventDomain.java | 42 ++++++++- ...mponsitedEvent.java => TxCompensatedEvent.java} | 16 ++-- .../pack/alpha/fsm/event/TxStartedEvent.java | 103 +++++++++++++++++++++ .../pack/alpha/fsm/event/base/TxEvent.java | 18 ++++ .../fsm/event/consumer/SagaEventConsumer.java | 5 +- .../servicecomb/pack/alpha/fsm/model/TxEntity.java | 70 ++++++++++++++ .../fsm/spring/integration/akka/LogExtension.java | 31 ------- ...ogExtensionImpl.java => SagaDataExtension.java} | 25 +++-- .../integration/akka/SpringAkkaExtension.java | 77 +++++++++++++++ .../servicecomb/pack/alpha/fsm/SagaActorTest.java | 32 +++---- .../pack/alpha/fsm/SagaEventSender.java | 50 +++++----- .../pack/alpha/fsm/SagaIntegrationTest.java | 26 +++--- .../apache/servicecomb/pack/common/EventType.java | 3 +- 17 files changed, 447 insertions(+), 122 deletions(-) diff --git a/alpha/alpha-fsm/pom.xml b/alpha/alpha-fsm/pom.xml index 8788430..b6c09d3 100644 --- a/alpha/alpha-fsm/pom.xml +++ b/alpha/alpha-fsm/pom.xml @@ -63,6 +63,10 @@ <artifactId>pack-common</artifactId> </dependency> <dependency> + <groupId>org.apache.servicecomb.pack</groupId> + <artifactId>alpha-core</artifactId> + </dependency> + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java index c5082ab..c92804d 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java +++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java @@ -17,6 +17,8 @@ package org.apache.servicecomb.pack.alpha.fsm; +import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SPRING_EXTENSION_PROVIDER; + import akka.actor.ActorSystem; import com.google.common.eventbus.EventBus; import com.typesafe.config.Config; @@ -38,6 +40,7 @@ public class FsmAutoConfiguration { @Bean public ActorSystem actorSystem(ConfigurableApplicationContext applicationContext, ConfigurableEnvironment environment) { ActorSystem system = ActorSystem.create("alpha-akka", akkaConfiguration(applicationContext,environment)); + SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext); return system; } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java index d22cb79..72be072 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java @@ -32,13 +32,14 @@ import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent; import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent; import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent; import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.TxCompensatedEvent; import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedCheckInternalEvent; -import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent; import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent; import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent; import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity; -import 
org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension; +import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension; +import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; @@ -81,8 +82,13 @@ public class SagaActor extends when(SagaActorState.READY, matchEvent(TxStartedEvent.class, SagaData.class, (event, data) -> { - AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(), - event.getLocalTxId()); + AddTxEventDomain domainEvent = new AddTxEventDomain( + event.getServiceName(), + event.getInstanceId(), + event.getParentTxId(), + event.getLocalTxId(), + event.getPayloads(), + event.getCompensationMethod()); if (data.getExpirationTime() > 0) { return goTo(SagaActorState.PARTIALLY_ACTIVE) .applying(domainEvent) @@ -131,8 +137,13 @@ public class SagaActor extends } ).event(TxStartedEvent.class, (event, data) -> { - AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(), - event.getLocalTxId()); + AddTxEventDomain domainEvent = new AddTxEventDomain( + event.getServiceName(), + event.getInstanceId(), + event.getParentTxId(), + event.getLocalTxId(), + event.getPayloads(), + event.getCompensationMethod()); if (data.getExpirationTime() > 0) { return stay() .applying(domainEvent) @@ -165,8 +176,13 @@ public class SagaActor extends when(SagaActorState.PARTIALLY_COMMITTED, matchEvent(TxStartedEvent.class, (event, data) -> { - AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(), - event.getLocalTxId()); + AddTxEventDomain domainEvent = new AddTxEventDomain( + event.getServiceName(), + event.getInstanceId(), + event.getParentTxId(), + event.getLocalTxId(), + event.getPayloads(), + event.getCompensationMethod()); if (data.getExpirationTime() > 0) { return goTo(SagaActorState.PARTIALLY_ACTIVE) .applying(domainEvent) @@ -230,7 +246,7 @@ public 
class SagaActor extends .replying(data) .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); } - ).event(TxComponsitedEvent.class, SagaData.class, + ).event(TxCompensatedEvent.class, SagaData.class, (event, data) -> { UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), event.getLocalTxId(), TxState.COMPENSATED); @@ -240,8 +256,9 @@ public class SagaActor extends } ).event(TxComponsitedCheckInternalEvent.class, SagaData.class, (event, data) -> { - if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0) - || hasCommittedTx(data)) { +// if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0) +// || hasCommittedTx(data)) { + if (hasCompensationSentTx(data)) { return stay().replying(data); } else { return goTo(SagaActorState.COMPENSATED) @@ -255,7 +272,7 @@ public class SagaActor extends if (hasCommittedTx(data)) { SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.FAILED); return stay().replying(data).applying(domainEvent); - } else if(hasCompensationSentTx(data)){ + } else if (hasCompensationSentTx(data)) { return stay().replying(data); } else { SagaEndedDomain domainEvent = new SagaEndedDomain( @@ -268,8 +285,13 @@ public class SagaActor extends } ).event(TxStartedEvent.class, SagaData.class, (event, data) -> { - AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(), - event.getLocalTxId()); + AddTxEventDomain domainEvent = new AddTxEventDomain( + event.getServiceName(), + event.getInstanceId(), + event.getParentTxId(), + event.getLocalTxId(), + event.getPayloads(), + event.getCompensationMethod()); return stay().applying(domainEvent); } ).event(TxEndedEvent.class, SagaData.class, @@ -332,7 +354,7 @@ public class SagaActor extends whenUnhandled( matchAnyEvent((event, data) -> { LOG.error("Unhandled event {}", event); - return goTo(SagaActorState.SUSPENDED).replying(data); + return stay(); }) ); @@ -340,7 +362,7 @@ public class SagaActor extends 
matchState(null, null, (from, to) -> { if (stateData().getGlobalTxId() != null) { stateData().setLastState(to); - LogExtension.LogExtensionProvider.get(getContext().getSystem()) + SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem()) .putSagaData(stateData().getGlobalTxId(), stateData()); } LOG.info("transition {} {} -> {}", getSelf(), from, to); @@ -353,7 +375,7 @@ public class SagaActor extends LOG.info("stop {} {}", data.getGlobalTxId(), state); data.setTerminated(true); data.setLastState(state); - LogExtension.LogExtensionProvider.get(getContext().getSystem()) + SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem()) .putSagaData(data.getGlobalTxId(), data); } ) @@ -372,8 +394,13 @@ public class SagaActor extends AddTxEventDomain domainEvent = (AddTxEventDomain) event; if (!data.getTxEntityMap().containsKey(domainEvent.getLocalTxId())) { TxEntity txEntity = TxEntity.builder() + .serviceName(domainEvent.getServiceName()) + .instanceId(domainEvent.getInstanceId()) + .globalTxId(data.getGlobalTxId()) .localTxId(domainEvent.getLocalTxId()) .parentTxId(domainEvent.getParentTxId()) + .compensationMethod(domainEvent.getCompensationMethod()) + .payloads(domainEvent.getPayloads()) .state(domainEvent.getState()) .build(); data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity); @@ -451,6 +478,7 @@ public class SagaActor extends // increments the compensation running counter by one data.getCompensationRunningCounter().incrementAndGet(); //TODO call omega compensate method + SpringAkkaExtension.SPRING_EXTENSION_PROVIDER.get(context().system()).compensate(txEntity); LOG.info("compensate {}", txEntity.getLocalTxId()); txEntity.setState(TxState.COMPENSATION_SENT); } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java index af02ce8..655db30 100644 --- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java @@ -21,6 +21,6 @@ public enum TxState { ACTIVE, FAILED, COMMITTED, - COMPENSATION_SENT, // The compensation method has been called to wait for TxComponsitedEvent + COMPENSATION_SENT, // The compensation method has been called to wait for TxCompensatedEvent COMPENSATED } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java index c7c65a3..af5936d 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java @@ -20,13 +20,37 @@ package org.apache.servicecomb.pack.alpha.fsm.domain; import org.apache.servicecomb.pack.alpha.fsm.TxState; public class AddTxEventDomain implements DomainEvent { + private String serviceName; + private String instanceId; private String parentTxId; private String localTxId; private TxState state = TxState.ACTIVE; + private String compensationMethod; + private byte[] payloads; - public AddTxEventDomain(String parentTxId, String localTxId) { + public AddTxEventDomain(String serviceName, String instanceId, String parentTxId, String localTxId, byte[] payloads, String compensationMethod) { + this.serviceName = serviceName; + this.instanceId = instanceId; this.parentTxId = parentTxId; this.localTxId = localTxId; + this.compensationMethod = compensationMethod; + this.payloads = payloads; + } + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public String getInstanceId() { + return instanceId; + } + + public void setInstanceId(String instanceId) { + this.instanceId = instanceId; 
} public String getParentTxId() { @@ -52,4 +76,20 @@ public class AddTxEventDomain implements DomainEvent { public void setState(TxState state) { this.state = state; } + + public String getCompensationMethod() { + return compensationMethod; + } + + public void setCompensationMethod(String compensationMethod) { + this.compensationMethod = compensationMethod; + } + + public byte[] getPayloads() { + return payloads; + } + + public void setPayloads(byte[] payloads) { + this.payloads = payloads; + } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxCompensatedEvent.java similarity index 77% rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxCompensatedEvent.java index 1230fff..c007993 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxCompensatedEvent.java @@ -19,7 +19,7 @@ package org.apache.servicecomb.pack.alpha.fsm.event; import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent; -public class TxComponsitedEvent extends TxEvent { +public class TxCompensatedEvent extends TxEvent { public static Builder builder() { return new Builder(); @@ -27,29 +27,29 @@ public class TxComponsitedEvent extends TxEvent { public static final class Builder { - private TxComponsitedEvent txComponsitedEvent; + private TxCompensatedEvent txCompensatedEvent; private Builder() { - txComponsitedEvent = new TxComponsitedEvent(); + txCompensatedEvent = new TxCompensatedEvent(); } public Builder parentTxId(String parentTxId) { - txComponsitedEvent.setParentTxId(parentTxId); + txCompensatedEvent.setParentTxId(parentTxId); return this; } public Builder localTxId(String 
localTxId) { - txComponsitedEvent.setLocalTxId(localTxId); + txCompensatedEvent.setLocalTxId(localTxId); return this; } public Builder globalTxId(String globalTxId) { - txComponsitedEvent.setGlobalTxId(globalTxId); + txCompensatedEvent.setGlobalTxId(globalTxId); return this; } - public TxComponsitedEvent build() { - return txComponsitedEvent; + public TxCompensatedEvent build() { + return txCompensatedEvent; } } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java index bf259d0..5f173ef 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java @@ -17,9 +17,77 @@ package org.apache.servicecomb.pack.alpha.fsm.event; +import java.util.Date; import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent; public class TxStartedEvent extends TxEvent { + private String serviceName; + private String instanceId; + private String compensationMethod; + private byte[] payloads; + private Date creationTime; + private String retryMethod; + private int retries; + + @Override + public String getServiceName() { + return serviceName; + } + + @Override + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + @Override + public String getInstanceId() { + return instanceId; + } + + @Override + public void setInstanceId(String instanceId) { + this.instanceId = instanceId; + } + + public String getCompensationMethod() { + return compensationMethod; + } + + public void setCompensationMethod(String compensationMethod) { + this.compensationMethod = compensationMethod; + } + + public byte[] getPayloads() { + return payloads; + } + + public void setPayloads(byte[] payloads) { + this.payloads = payloads; + } + + public Date getCreationTime() { + return 
creationTime; + } + + public void setCreationTime(Date creationTime) { + this.creationTime = creationTime; + } + + public String getRetryMethod() { + return retryMethod; + } + + public void setRetryMethod(String retryMethod) { + this.retryMethod = retryMethod; + } + + public int getRetries() { + return retries; + } + + public void setRetries(int retries) { + this.retries = retries; + } public static Builder builder() { return new Builder(); @@ -48,6 +116,41 @@ public class TxStartedEvent extends TxEvent { return this; } + public Builder compensationMethod(String compensationMethod) { + txStartedEvent.setCompensationMethod(compensationMethod); + return this; + } + + public Builder payloads(byte[] payloads) { + txStartedEvent.setPayloads(payloads); + return this; + } + + public Builder serviceName(String serviceName) { + txStartedEvent.setServiceName(serviceName); + return this; + } + + public Builder instanceId(String instanceId) { + txStartedEvent.setInstanceId(instanceId); + return this; + } + + public Builder creationTime(Date creationTime) { + txStartedEvent.setCreationTime(creationTime); + return this; + } + + public Builder retryMethod(String retryMethod) { + txStartedEvent.setRetryMethod(retryMethod); + return this; + } + + public Builder retries(int retries) { + txStartedEvent.setRetries(retries); + return this; + } + public TxStartedEvent build() { return txStartedEvent; } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java index ceaa242..61c3656 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java @@ -18,6 +18,8 @@ package org.apache.servicecomb.pack.alpha.fsm.event.base; public abstract class TxEvent extends BaseEvent { + private String serviceName; + private 
String instanceId; private String parentTxId; private String localTxId; @@ -37,6 +39,22 @@ public abstract class TxEvent extends BaseEvent { this.localTxId = localTxId; } + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public String getInstanceId() { + return instanceId; + } + + public void setInstanceId(String instanceId) { + this.instanceId = instanceId; + } + @Override public String toString() { return this.getClass().getSimpleName() + "{" + diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java index aae9419..7ddbc1a 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java @@ -37,19 +37,18 @@ import scala.concurrent.Future; public class SagaEventConsumer { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - public static final Timeout TIMEOUT = new Timeout(1000, TimeUnit.MILLISECONDS); + public static final Timeout TIMEOUT = new Timeout(5, TimeUnit.SECONDS); @Autowired ActorSystem system; /** - * Receive saga message + * Receive fsm message * */ @Subscribe public void receiveSagaEvent(BaseEvent event) throws Exception { LOG.info("receive {} ", event.toString()); try{ - //TODO Write-Ahead Logging ActorRef saga; String actorPath = "/user/" + event.getGlobalTxId(); Optional<ActorRef> optional = this.getActorRefFromPath(actorPath); diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java index 1eb3020..0122adc 100644 --- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java @@ -21,11 +21,32 @@ import java.io.Serializable; import org.apache.servicecomb.pack.alpha.fsm.TxState; public class TxEntity implements Serializable { + private String serviceName; + private String instanceId; + private String globalTxId; private long beginTime = System.currentTimeMillis(); private long endTime; private String parentTxId; private String localTxId; private TxState state; + private String compensationMethod; + private byte[] payloads; + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public String getInstanceId() { + return instanceId; + } + + public void setInstanceId(String instanceId) { + this.instanceId = instanceId; + } public long getBeginTime() { return beginTime; @@ -43,6 +64,14 @@ public class TxEntity implements Serializable { this.endTime = endTime; } + public String getGlobalTxId() { + return globalTxId; + } + + public void setGlobalTxId(String globalTxId) { + this.globalTxId = globalTxId; + } + public String getParentTxId() { return parentTxId; } @@ -67,6 +96,22 @@ public class TxEntity implements Serializable { this.state = state; } + public String getCompensationMethod() { + return compensationMethod; + } + + public void setCompensationMethod(String compensationMethod) { + this.compensationMethod = compensationMethod; + } + + public byte[] getPayloads() { + return payloads; + } + + public void setPayloads(byte[] payloads) { + this.payloads = payloads; + } + public static Builder builder() { return new Builder(); } @@ -89,6 +134,11 @@ public class TxEntity implements Serializable { return this; } + public Builder globalTxId(String globalTxId) { + txEntity.setGlobalTxId(globalTxId); + return this; + } + public Builder parentTxId(String parentTxId) { 
txEntity.setParentTxId(parentTxId); return this; @@ -99,11 +149,31 @@ public class TxEntity implements Serializable { return this; } + public Builder compensationMethod(String compensationMethod) { + txEntity.setCompensationMethod(compensationMethod); + return this; + } + + public Builder payloads(byte[] payloads) { + txEntity.setPayloads(payloads); + return this; + } + public Builder state(TxState state) { txEntity.setState(state); return this; } + public Builder serviceName(String serviceName) { + txEntity.setServiceName(serviceName); + return this; + } + + public Builder instanceId(String instanceId) { + txEntity.setInstanceId(instanceId); + return this; + } + public TxEntity build() { return txEntity; } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtension.java deleted file mode 100644 index 9c4d27d..0000000 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtension.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka; - -import akka.actor.AbstractExtensionId; -import akka.actor.ExtendedActorSystem; - -public class LogExtension extends AbstractExtensionId<LogExtensionImpl> { - - public static final LogExtension LogExtensionProvider = new LogExtension(); - - @Override - public LogExtensionImpl createExtension(ExtendedActorSystem system) { - return new LogExtensionImpl(); - } -} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtensionImpl.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java similarity index 56% rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtensionImpl.java rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java index 4bf00b4..774a2e2 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtensionImpl.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java @@ -17,19 +17,32 @@ package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka; +import akka.actor.AbstractExtensionId; +import akka.actor.ExtendedActorSystem; import akka.actor.Extension; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; +import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SagaDataExt; -public class LogExtensionImpl implements Extension { - private Map<String, SagaData> sagaDataMap = new ConcurrentHashMap(); +public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> { - public void putSagaData(String globalTxId, SagaData sagaData){ - sagaDataMap.put(globalTxId, sagaData); + public static final 
SagaDataExtension SAGA_DATA_EXTENSION_PROVIDER = new SagaDataExtension(); + + @Override + public SagaDataExt createExtension(ExtendedActorSystem system) { + return new SagaDataExt(); } - public SagaData getSagaData(String globalTxId){ - return sagaDataMap.get(globalTxId); + public static class SagaDataExt implements Extension { + private Map<String, SagaData> sagaDataMap = new ConcurrentHashMap(); + + public void putSagaData(String globalTxId, SagaData sagaData){ + sagaDataMap.put(globalTxId, sagaData); + } + + public SagaData getSagaData(String globalTxId){ + return sagaDataMap.get(globalTxId); + } } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SpringAkkaExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SpringAkkaExtension.java new file mode 100644 index 0000000..6994084 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SpringAkkaExtension.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka; + +import static org.apache.servicecomb.pack.common.EventType.TxStartedEvent; + +import akka.actor.AbstractExtensionId; +import akka.actor.ExtendedActorSystem; +import akka.actor.Extension; +import java.lang.invoke.MethodHandles; +import org.apache.servicecomb.pack.alpha.core.OmegaCallback; +import org.apache.servicecomb.pack.alpha.core.TxEvent; +import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity; +import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SpringExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; + +public class SpringAkkaExtension extends AbstractExtensionId<SpringExt> { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static final SpringAkkaExtension SPRING_EXTENSION_PROVIDER = new SpringAkkaExtension(); + + @Override + public SpringExt createExtension(ExtendedActorSystem system) { + return new SpringExt(); + } + + public static class SpringExt implements Extension { + + private static final String omegaCallbackBeanName = "omegaCallback"; + private volatile ApplicationContext applicationContext; + private OmegaCallback omegaCallback; + + public void compensate(TxEntity txEntity) { + if (applicationContext != null) { + if (applicationContext.containsBean(omegaCallbackBeanName)) { + omegaCallback = applicationContext.getBean(omegaCallbackBeanName, OmegaCallback.class); + TxEvent event = new TxEvent( + txEntity.getServiceName(), + txEntity.getInstanceId(), + txEntity.getGlobalTxId(), + txEntity.getLocalTxId(), + txEntity.getParentTxId(), + TxStartedEvent.name(), + txEntity.getCompensationMethod(), + txEntity.getPayloads()); + omegaCallback.compensate(event); + LOG.info(omegaCallback.toString()); + } else { + LOG.warn("Spring Bean {} doesn't exist in ApplicationContext", omegaCallbackBeanName); + } + } else 
{ + LOG.warn("Spring ApplicationContext is null"); + } + } + + public void initialize(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + } +} diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java index 313c526..71962df 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java @@ -31,7 +31,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; -import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension; +import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -186,7 +186,7 @@ public class SagaActorTest { transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); - //expectTerminated(saga); + //expectTerminated(fsm); ActorRef recoveredSaga = system.actorOf(SagaActor.props(persistenceId), "recoveredSaga"); watch(recoveredSaga); @@ -352,7 +352,7 @@ public class SagaActorTest { * 3. TxEndedEvent-11 * 4. TxStartedEvent-12 * 5. TxAbortedEvent-12 - * 6. TxComponsitedEvent-11 + * 6. TxCompensatedEvent-11 * 7. SagaAbortedEvent-1 */ @Test @@ -414,8 +414,8 @@ public class SagaActorTest { * 5. TxEndedEvent-12 * 6. TxStartedEvent-13 * 7. TxAbortedEvent-13 - * 8. TxComponsitedEvent-11 - * 9. TxComponsitedEvent-12 + * 8. TxCompensatedEvent-11 + * 9. TxCompensatedEvent-12 * 10. SagaAbortedEvent-1 */ @Test @@ -485,8 +485,8 @@ public class SagaActorTest { * 5. TxEndedEvent-12 * 6. TxStartedEvent-13 * 7. TxAbortedEvent-13 - * 8. 
TxComponsitedEvent-11 - * 9. TxComponsitedEvent-12 + * 8. TxCompensatedEvent-11 + * 9. TxCompensatedEvent-12 * 10. SagaAbortedEvent-1 */ @Test @@ -538,7 +538,7 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); - sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);//expectMsgClass(SagaData.class); + sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);//expectMsgClass(SagaData.class); assertEquals(sagaData.getGlobalTxId(), globalTxId); assertEquals(sagaData.getTxEntityMap().size(), 3); assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); @@ -558,8 +558,8 @@ public class SagaActorTest { * 5. TxEndedEvent-12 * 6. TxStartedEvent-13 * 7. TxEndedEvent-13 - * 8. TxComponsitedEvent-12 - * 9. TxComponsitedEvent-13 + * 8. TxCompensatedEvent-12 + * 9. TxCompensatedEvent-13 * 10. SagaAbortedEvent-1 */ @Test @@ -619,9 +619,9 @@ public class SagaActorTest { * 6. TxStartedEvent-13 * 7. TxEndedEvent-13 * 8. SagaAbortedEvent-1 - * 9. TxComponsitedEvent-11 - * 8. TxComponsitedEvent-12 - * 9. TxComponsitedEvent-13 + * 9. TxCompensatedEvent-11 + * 8. TxCompensatedEvent-12 + * 9. TxCompensatedEvent-13 */ @Test public void sagaAbortedEventAfterAllTxEndedTest() { @@ -673,7 +673,7 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); - SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getGlobalTxId(), globalTxId); assertEquals(sagaData.getTxEntityMap().size(), 3); assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); @@ -940,8 +940,8 @@ public class SagaActorTest { * 3. TxEndedEvent-11 * 5. TxEndedEvent-12 * 7. 
TxAbortedEvent-13 - * 8. TxComponsitedEvent-11 - * 9. TxComponsitedEvent-12 + * 8. TxCompensatedEvent-11 + * 9. TxCompensatedEvent-12 * 10. SagaAbortedEvent-1 */ @Test diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java index d303f86..925bbc2 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java @@ -24,7 +24,7 @@ import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent; import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent; import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent; import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent; -import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.TxCompensatedEvent; import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent; import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent; import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; @@ -79,7 +79,7 @@ public class SagaEventSender { * 3. TxEndedEvent-11 * 4. TxStartedEvent-12 * 5. TxAbortedEvent-12 - * 6. TxComponsitedEvent-11 + * 6. TxCompensatedEvent-11 * 7. 
SagaAbortedEvent-1 */ public static List<BaseEvent> middleTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2){ @@ -89,7 +89,7 @@ public class SagaEventSender { sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build()); return sagaEvents; } @@ -102,8 +102,8 @@ public class SagaEventSender { * 5. TxEndedEvent-12 * 6. TxStartedEvent-13 * 7. TxAbortedEvent-13 - * 8. TxComponsitedEvent-11 - * 9. TxComponsitedEvent-12 + * 8. TxCompensatedEvent-11 + * 9. TxCompensatedEvent-12 * 10. 
SagaAbortedEvent-1 */ public static List<BaseEvent> lastTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ @@ -115,8 +115,8 @@ public class SagaEventSender { sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build()); return sagaEvents; } @@ -130,8 +130,8 @@ public class SagaEventSender { * 6. TxStartedEvent-13 * 7. TxAbortedEvent-13 * 8. SagaAbortedEvent-1 - * 9. TxComponsitedEvent-11 - * 10. TxComponsitedEvent-12 + * 9. TxCompensatedEvent-11 + * 10. 
TxCompensatedEvent-12 */ public static List<BaseEvent> sagaAbortedEventBeforeTxComponsitedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ List<BaseEvent> sagaEvents = new ArrayList<>(); @@ -143,8 +143,8 @@ public class SagaEventSender { sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build()); - sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); return sagaEvents; } @@ -156,8 +156,8 @@ public class SagaEventSender { * 5. TxEndedEvent-12 * 6. TxStartedEvent-13 * 7. TxEndedEvent-13 - * 8. TxComponsitedEvent-12 - * 9. TxComponsitedEvent-13 + * 8. TxCompensatedEvent-12 + * 9. TxCompensatedEvent-13 * 10. 
SagaAbortedEvent-1 */ public static List<BaseEvent> receivedRemainingEventAfterFirstTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ @@ -169,8 +169,8 @@ public class SagaEventSender { sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build()); return sagaEvents; } @@ -184,9 +184,9 @@ public class SagaEventSender { * 6. TxStartedEvent-13 * 7. TxEndedEvent-13 * 8. SagaAbortedEvent-1 - * 9. TxComponsitedEvent-11 - * 8. TxComponsitedEvent-12 - * 9. TxComponsitedEvent-13 + * 9. TxCompensatedEvent-11 + * 10. TxCompensatedEvent-12 + * 11. 
TxCompensatedEvent-13 */ public static List<BaseEvent> sagaAbortedEventAfterAllTxEndedsEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ List<BaseEvent> sagaEvents = new ArrayList<>(); @@ -198,9 +198,9 @@ public class SagaEventSender { sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build()); - sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); return sagaEvents; } @@ -302,8 +302,8 @@ public class SagaEventSender { * 3. TxEndedEvent-11 * 5. TxEndedEvent-12 * 7. TxAbortedEvent-13 - * 8. TxComponsitedEvent-11 - * 9. TxComponsitedEvent-12 + * 8. TxCompensatedEvent-11 + * 9. TxCompensatedEvent-12 * 10. 
SagaAbortedEvent-1 */ public static List<BaseEvent> lastTxAbortedEventWithTxConcurrentEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ @@ -315,8 +315,8 @@ public class SagaEventSender { sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build()); return sagaEvents; } diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java index 85c8b18..9a65bfb 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java @@ -24,7 +24,7 @@ import akka.actor.ActorSystem; import com.google.common.eventbus.EventBus; import java.util.UUID; import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; -import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension; +import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension; import org.junit.Test; import 
org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; @@ -58,7 +58,7 @@ public class SagaIntegrationTest { }); await().atMost(1, SECONDS).until(() -> { - SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMMITTED && sagaData.getBeginTime() > 0 @@ -82,7 +82,7 @@ public class SagaIntegrationTest { }); await().atMost(1, SECONDS).until(() -> { - SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMPENSATED && sagaData.getBeginTime() > 0 @@ -105,7 +105,7 @@ public class SagaIntegrationTest { }); await().atMost(1, SECONDS).until(() -> { - SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMPENSATED && sagaData.getBeginTime() > 0 @@ -130,7 +130,7 @@ public class SagaIntegrationTest { }); await().atMost(1, SECONDS).until(() -> { - SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMPENSATED && sagaData.getBeginTime() > 0 @@ -156,7 +156,7 @@ public class SagaIntegrationTest { }); await().atMost(1, SECONDS).until(() -> { - SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); + SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMPENSATED && sagaData.getBeginTime() > 0 @@ -182,7 +182,7 @@ public class SagaIntegrationTest { }); await().atMost(1, SECONDS).until(() -> { - SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMPENSATED && sagaData.getBeginTime() > 0 @@ -208,7 +208,7 @@ public class SagaIntegrationTest { }); await().atMost(1, SECONDS).until(() -> { - SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMPENSATED && sagaData.getBeginTime() > 0 @@ -234,7 +234,7 @@ public class SagaIntegrationTest { }); await().atMost(1, SECONDS).until(() -> { - SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.SUSPENDED && sagaData.getBeginTime() > 0 @@ -261,7 +261,7 @@ public class SagaIntegrationTest { }); await().atMost(timeout+1, SECONDS).until(() -> { - SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.SUSPENDED && sagaData.getBeginTime() > 0 @@ -287,7 +287,7 @@ public class SagaIntegrationTest { }); await().atMost(1, SECONDS).until(() -> { - SagaData sagaData = 
LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMMITTED && sagaData.getBeginTime() > 0 @@ -313,7 +313,7 @@ public class SagaIntegrationTest { }); await().atMost(1, SECONDS).until(() -> { - SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMMITTED && sagaData.getBeginTime() > 0 @@ -339,7 +339,7 @@ public class SagaIntegrationTest { }); await().atMost(1, SECONDS).until(() -> { - SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMPENSATED && sagaData.getBeginTime() > 0 diff --git a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java index f3d0585..774b2e9 100644 --- a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java +++ b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java @@ -23,5 +23,6 @@ public enum EventType { TxEndedEvent, TxAbortedEvent, TxCompensatedEvent, - SagaEndedEvent + SagaEndedEvent, + SagaAbortedEvent }
