This is an automated email from the ASF dual-hosted git repository. zhanglei pushed a commit to branch SCB-1321 in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 249fb5f6ec6505e42fe86230de07aa2d1c9ed1b7 Author: Lei Zhang <[email protected]> AuthorDate: Wed Jul 3 10:05:28 2019 +0800 SCB-1321 Record throwable stack to SagaData --- .../org/apache/servicecomb/pack/alpha/fsm/SagaActor.java | 13 +++++++------ .../pack/alpha/fsm/domain/UpdateTxEventDomain.java | 12 +++++++++++- .../servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java | 14 ++++++++++++++ .../apache/servicecomb/pack/alpha/fsm/model/TxEntity.java | 14 ++++++++++++++ .../pack/alpha/server/fsm/GrpcSagaEventService.java | 3 ++- .../pack/alpha/server/fsm/AlphaIntegrationFsmTest.java | 6 ++---- .../pack/alpha/server/fsm/OmegaEventSagaSimulator.java | 12 ++++++------ 7 files changed, 56 insertions(+), 18 deletions(-) diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java index fc5477c..51d6c52 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java @@ -125,7 +125,7 @@ public class SagaActor extends matchEvent(TxEndedEvent.class, SagaData.class, (event, data) -> { UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), - event.getLocalTxId(), TxState.COMMITTED); + event.getLocalTxId(), TxState.COMMITTED, new byte[0]); if (data.getExpirationTime() > 0) { return goTo(SagaActorState.PARTIALLY_COMMITTED) .applying(domainEvent) @@ -163,7 +163,7 @@ public class SagaActor extends ).event(TxAbortedEvent.class, (event, data) -> { UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), - event.getLocalTxId(), TxState.FAILED); + event.getLocalTxId(), TxState.FAILED, event.getPayloads()); return goTo(SagaActorState.FAILED) .applying(domainEvent); } @@ -195,7 +195,7 @@ public class SagaActor extends ).event(TxEndedEvent.class, (event, data) -> { UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), - event.getLocalTxId(), TxState.COMMITTED); + event.getLocalTxId(), TxState.COMMITTED, new byte[0]); if (data.getExpirationTime() > 0) { return stay() .applying(domainEvent) @@ -228,7 +228,7 @@ public class SagaActor extends ).event(TxAbortedEvent.class, (event, data) -> { UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), - event.getLocalTxId(), TxState.FAILED); + event.getLocalTxId(), TxState.FAILED, event.getPayloads()); return goTo(SagaActorState.FAILED).applying(domainEvent); } ).event(Arrays.asList(StateTimeout()), SagaData.class, @@ -249,7 +249,7 @@ public class SagaActor extends ).event(TxCompensatedEvent.class, SagaData.class, (event, data) -> { UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), - event.getLocalTxId(), TxState.COMPENSATED); + event.getLocalTxId(), TxState.COMPENSATED, new byte[0]); return stay().applying(domainEvent).andThen(exec(_data -> { self().tell(TxComponsitedCheckInternalEvent.builder().build(), self()); })); @@ -295,7 +295,7 @@ public class SagaActor extends ).event(TxEndedEvent.class, SagaData.class, (event, data) -> { UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), - event.getLocalTxId(), TxState.COMMITTED); + event.getLocalTxId(), TxState.COMMITTED, new byte[0]); return stay().applying(domainEvent).andThen(exec(_data -> { TxEntity txEntity = _data.getTxEntityMap().get(event.getLocalTxId()); // call compensate @@ -414,6 +414,7 @@ public class SagaActor extends txEntity.setState(domainEvent.getState()); } else if (domainEvent.getState() == TxState.FAILED) { txEntity.setState(domainEvent.getState()); + txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads()); data.getTxEntityMap().forEach((k, v) -> { if (v.getState() == TxState.COMMITTED) { // call compensate diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java index 839cfe0..2c8831c 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java @@ -23,11 +23,13 @@ public class UpdateTxEventDomain implements DomainEvent { private String parentTxId; private String localTxId; private TxState state; + private byte[] throwablePayLoads; - public UpdateTxEventDomain(String parentTxId, String localTxId, TxState state) { + public UpdateTxEventDomain(String parentTxId, String localTxId, TxState state, byte[] throwablePayLoads) { this.parentTxId = parentTxId; this.localTxId = localTxId; this.state = state; + this.throwablePayLoads = throwablePayLoads; } public String getParentTxId() { @@ -53,4 +55,12 @@ public class UpdateTxEventDomain implements DomainEvent { public void setState(TxState state) { this.state = state; } + + public byte[] getThrowablePayLoads() { + return throwablePayLoads; + } + + public void setThrowablePayLoads(byte[] throwablePayLoads) { + this.throwablePayLoads = throwablePayLoads; + } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java index 8ee61de..ca1f9e4 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java @@ -21,6 +21,15 @@ import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent; public class TxAbortedEvent extends TxEvent { + private byte[] payloads; + + public byte[] getPayloads() { + return payloads; + } + + public void setPayloads(byte[] payloads) { + this.payloads = payloads; + } public static Builder builder() { return new Builder(); @@ -49,6 +58,11 @@ public class TxAbortedEvent extends TxEvent { return this; } + public Builder payloads(byte[] payloads) { + txAbortedEvent.setPayloads(payloads); + return this; + } + public TxAbortedEvent build() { return txAbortedEvent; } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java index 0122adc..de849e2 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java @@ -31,6 +31,7 @@ public class TxEntity implements Serializable { private TxState state; private String compensationMethod; private byte[] payloads; + private byte[] throwablePayLoads; public String getServiceName() { return serviceName; @@ -112,6 +113,14 @@ public class TxEntity implements Serializable { this.payloads = payloads; } + public byte[] getThrowablePayLoads() { + return throwablePayLoads; + } + + public void setThrowablePayLoads(byte[] throwablePayLoads) { + this.throwablePayLoads = throwablePayLoads; + } + public static Builder builder() { return new Builder(); } @@ -159,6 +168,11 @@ public class TxEntity implements Serializable { return this; } + public Builder throwablePayLoads(byte[] throwablePayLoads) { + txEntity.setThrowablePayLoads(throwablePayLoads); + return this; + } + public Builder state(TxState state) { txEntity.setState(state); return this; diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java index a54ef83..2394ab2 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java @@ -115,7 +115,8 @@ public class GrpcSagaEventService extends TxEventServiceImplBase { event = org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent.builder() .globalTxId(message.getGlobalTxId()) .parentTxId(message.getParentTxId()) - .localTxId(message.getLocalTxId()).build(); + .localTxId(message.getLocalTxId()) + .payloads(message.getPayloads().toByteArray()).build(); } else if (message.getType().equals(EventType.TxCompensatedEvent.name())) { event = org.apache.servicecomb.pack.alpha.fsm.event.TxCompensatedEvent.builder() .globalTxId(message.getGlobalTxId()) diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java index b366c53..3080e2c 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java @@ -97,10 +97,7 @@ public class AlphaIntegrationFsmTest { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED; }); - - //await().atMost(1, SECONDS).until(() -> SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId) != null); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - //Assert.assertEquals(sagaData.getLastState(),SagaActorState.COMMITTED); Assert.assertEquals(sagaData.getTxEntityMap().size(),3); Assert.assertTrue(sagaData.getBeginTime() > 0); Assert.assertTrue(sagaData.getEndTime() > 0); @@ -164,7 +161,7 @@ public class AlphaIntegrationFsmTest { omegaEventSender.getOmegaEventSagaSimulator().lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { omegaEventSender.getBlockingStub().onTxEvent(event); }); - await().atMost(1, SECONDS).until(() -> { + await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; }); @@ -177,6 +174,7 @@ public class AlphaIntegrationFsmTest { Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMPENSATED); Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED); Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.FAILED); + Assert.assertArrayEquals(sagaData.getTxEntityMap().get(localTxId_3).getThrowablePayLoads(),NullPointerException.class.getName().getBytes()); } @Test diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java index 8623953..52bcba5 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java @@ -45,7 +45,7 @@ public class OmegaEventSagaSimulator { List<GrpcTxEvent> sagaEvents = new ArrayList<>(); sagaEvents.add(sagaStartedEvent(globalTxId)); sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a")); - sagaEvents.add(txAbortedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a")); + sagaEvents.add(txAbortedEvent(globalTxId, localTxId_1, globalTxId, NullPointerException.class.getName().getBytes(), "method a")); sagaEvents.add(sagaAbortedEvent(globalTxId)); return sagaEvents; } @@ -56,7 +56,7 @@ public class OmegaEventSagaSimulator { sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a")); sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a")); sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b")); - sagaEvents.add(txAbortedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b")); + sagaEvents.add(txAbortedEvent(globalTxId, localTxId_2, globalTxId, NullPointerException.class.getName().getBytes(), "method b")); sagaEvents.add(sagaAbortedEvent(globalTxId)); return sagaEvents; } @@ -69,7 +69,7 @@ public class OmegaEventSagaSimulator { sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b")); sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b")); sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c")); - sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c")); + sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, NullPointerException.class.getName().getBytes(), "method c")); sagaEvents.add(sagaAbortedEvent(globalTxId)); return sagaEvents; } @@ -78,7 +78,7 @@ public class OmegaEventSagaSimulator { List<GrpcTxEvent> sagaEvents = new ArrayList<>(); sagaEvents.add(sagaStartedEvent(globalTxId)); sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a")); - sagaEvents.add(txAbortedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a")); + sagaEvents.add(txAbortedEvent(globalTxId, localTxId_1, globalTxId, NullPointerException.class.getName().getBytes(), "method a")); sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b")); sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b")); sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c")); @@ -159,7 +159,7 @@ public class OmegaEventSagaSimulator { sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c")); sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a")); sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b")); - sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c")); + sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, NullPointerException.class.getName().getBytes(), "method c")); sagaEvents.add(sagaAbortedEvent(globalTxId)); return sagaEvents; } @@ -173,7 +173,7 @@ public class OmegaEventSagaSimulator { sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b")); sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b")); sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c")); - sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c")); + sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, NullPointerException.class.getName().getBytes(), "method c")); sagaEvents.add(sagaAbortedEvent(globalTxId)); return sagaEvents; }
