This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 249fb5f6ec6505e42fe86230de07aa2d1c9ed1b7
Author: Lei Zhang <[email protected]>
AuthorDate: Wed Jul 3 10:05:28 2019 +0800

    SCB-1321 Record throwable stack to SagaData
---
 .../org/apache/servicecomb/pack/alpha/fsm/SagaActor.java   | 13 +++++++------
 .../pack/alpha/fsm/domain/UpdateTxEventDomain.java         | 12 +++++++++++-
 .../servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java   | 14 ++++++++++++++
 .../apache/servicecomb/pack/alpha/fsm/model/TxEntity.java  | 14 ++++++++++++++
 .../pack/alpha/server/fsm/GrpcSagaEventService.java        |  3 ++-
 .../pack/alpha/server/fsm/AlphaIntegrationFsmTest.java     |  6 ++----
 .../pack/alpha/server/fsm/OmegaEventSagaSimulator.java     | 12 ++++++------
 7 files changed, 56 insertions(+), 18 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index fc5477c..51d6c52 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -125,7 +125,7 @@ public class SagaActor extends
         matchEvent(TxEndedEvent.class, SagaData.class,
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new 
UpdateTxEventDomain(event.getParentTxId(),
-                  event.getLocalTxId(), TxState.COMMITTED);
+                  event.getLocalTxId(), TxState.COMMITTED, new byte[0]);
               if (data.getExpirationTime() > 0) {
                 return goTo(SagaActorState.PARTIALLY_COMMITTED)
                     .applying(domainEvent)
@@ -163,7 +163,7 @@ public class SagaActor extends
         ).event(TxAbortedEvent.class,
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new 
UpdateTxEventDomain(event.getParentTxId(),
-                  event.getLocalTxId(), TxState.FAILED);
+                  event.getLocalTxId(), TxState.FAILED, event.getPayloads());
               return goTo(SagaActorState.FAILED)
                   .applying(domainEvent);
             }
@@ -195,7 +195,7 @@ public class SagaActor extends
         ).event(TxEndedEvent.class,
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new 
UpdateTxEventDomain(event.getParentTxId(),
-                  event.getLocalTxId(), TxState.COMMITTED);
+                  event.getLocalTxId(), TxState.COMMITTED, new byte[0]);
               if (data.getExpirationTime() > 0) {
                 return stay()
                     .applying(domainEvent)
@@ -228,7 +228,7 @@ public class SagaActor extends
         ).event(TxAbortedEvent.class,
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new 
UpdateTxEventDomain(event.getParentTxId(),
-                  event.getLocalTxId(), TxState.FAILED);
+                  event.getLocalTxId(), TxState.FAILED, event.getPayloads());
               return goTo(SagaActorState.FAILED).applying(domainEvent);
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
@@ -249,7 +249,7 @@ public class SagaActor extends
         ).event(TxCompensatedEvent.class, SagaData.class,
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new 
UpdateTxEventDomain(event.getParentTxId(),
-                  event.getLocalTxId(), TxState.COMPENSATED);
+                  event.getLocalTxId(), TxState.COMPENSATED, new byte[0]);
               return stay().applying(domainEvent).andThen(exec(_data -> {
                 self().tell(TxComponsitedCheckInternalEvent.builder().build(), 
self());
               }));
@@ -295,7 +295,7 @@ public class SagaActor extends
         ).event(TxEndedEvent.class, SagaData.class,
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new 
UpdateTxEventDomain(event.getParentTxId(),
-                  event.getLocalTxId(), TxState.COMMITTED);
+                  event.getLocalTxId(), TxState.COMMITTED, new byte[0]);
               return stay().applying(domainEvent).andThen(exec(_data -> {
                 TxEntity txEntity = 
_data.getTxEntityMap().get(event.getLocalTxId());
                 // call compensate
@@ -414,6 +414,7 @@ public class SagaActor extends
         txEntity.setState(domainEvent.getState());
       } else if (domainEvent.getState() == TxState.FAILED) {
         txEntity.setState(domainEvent.getState());
+        txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads());
         data.getTxEntityMap().forEach((k, v) -> {
           if (v.getState() == TxState.COMMITTED) {
             // call compensate
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
index 839cfe0..2c8831c 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
@@ -23,11 +23,13 @@ public class UpdateTxEventDomain implements DomainEvent {
   private String parentTxId;
   private String localTxId;
   private TxState state;
+  private byte[] throwablePayLoads;
 
-  public UpdateTxEventDomain(String parentTxId, String localTxId, TxState 
state) {
+  public UpdateTxEventDomain(String parentTxId, String localTxId, TxState 
state, byte[] throwablePayLoads) {
     this.parentTxId = parentTxId;
     this.localTxId = localTxId;
     this.state = state;
+    this.throwablePayLoads = throwablePayLoads;
   }
 
   public String getParentTxId() {
@@ -53,4 +55,12 @@ public class UpdateTxEventDomain implements DomainEvent {
   public void setState(TxState state) {
     this.state = state;
   }
+
+  public byte[] getThrowablePayLoads() {
+    return throwablePayLoads;
+  }
+
+  public void setThrowablePayLoads(byte[] throwablePayLoads) {
+    this.throwablePayLoads = throwablePayLoads;
+  }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java
index 8ee61de..ca1f9e4 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java
@@ -21,6 +21,15 @@ import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
 
 public class TxAbortedEvent extends TxEvent {
 
+  private byte[] payloads;
+
+  public byte[] getPayloads() {
+    return payloads;
+  }
+
+  public void setPayloads(byte[] payloads) {
+    this.payloads = payloads;
+  }
 
   public static Builder builder() {
     return new Builder();
@@ -49,6 +58,11 @@ public class TxAbortedEvent extends TxEvent {
       return this;
     }
 
+    public Builder payloads(byte[] payloads) {
+      txAbortedEvent.setPayloads(payloads);
+      return this;
+    }
+
     public TxAbortedEvent build() {
       return txAbortedEvent;
     }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
index 0122adc..de849e2 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
@@ -31,6 +31,7 @@ public class TxEntity implements Serializable {
   private TxState state;
   private String compensationMethod;
   private byte[] payloads;
+  private byte[] throwablePayLoads;
 
   public String getServiceName() {
     return serviceName;
@@ -112,6 +113,14 @@ public class TxEntity implements Serializable {
     this.payloads = payloads;
   }
 
+  public byte[] getThrowablePayLoads() {
+    return throwablePayLoads;
+  }
+
+  public void setThrowablePayLoads(byte[] throwablePayLoads) {
+    this.throwablePayLoads = throwablePayLoads;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -159,6 +168,11 @@ public class TxEntity implements Serializable {
       return this;
     }
 
+    public Builder throwablePayLoads(byte[] throwablePayLoads) {
+      txEntity.setThrowablePayLoads(throwablePayLoads);
+      return this;
+    }
+
     public Builder state(TxState state) {
       txEntity.setState(state);
       return this;
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index a54ef83..2394ab2 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -115,7 +115,8 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
       event = 
org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent.builder()
           .globalTxId(message.getGlobalTxId())
           .parentTxId(message.getParentTxId())
-          .localTxId(message.getLocalTxId()).build();
+          .localTxId(message.getLocalTxId())
+          .payloads(message.getPayloads().toByteArray()).build();
     } else if (message.getType().equals(EventType.TxCompensatedEvent.name())) {
       event = 
org.apache.servicecomb.pack.alpha.fsm.event.TxCompensatedEvent.builder()
           .globalTxId(message.getGlobalTxId())
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index b366c53..3080e2c 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -97,10 +97,7 @@ public class AlphaIntegrationFsmTest {
       SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       return sagaData !=null && 
sagaData.getLastState()==SagaActorState.COMMITTED;
     });
-
-    //await().atMost(1, SECONDS).until(() -> 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId)
 != null);
     SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-    //Assert.assertEquals(sagaData.getLastState(),SagaActorState.COMMITTED);
     Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
     Assert.assertTrue(sagaData.getBeginTime() > 0);
     Assert.assertTrue(sagaData.getEndTime() > 0);
@@ -164,7 +161,7 @@ public class AlphaIntegrationFsmTest {
     
omegaEventSender.getOmegaEventSagaSimulator().lastTxAbortedEvents(globalTxId, 
localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
       omegaEventSender.getBlockingStub().onTxEvent(event);
     });
-    await().atMost(1, SECONDS).until(() -> {
+    await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       return sagaData !=null && 
sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
@@ -177,6 +174,7 @@ public class AlphaIntegrationFsmTest {
     
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMPENSATED);
     
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED);
     
Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.FAILED);
+    
Assert.assertArrayEquals(sagaData.getTxEntityMap().get(localTxId_3).getThrowablePayLoads(),NullPointerException.class.getName().getBytes());
   }
 
   @Test
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
index 8623953..52bcba5 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
@@ -45,7 +45,7 @@ public class OmegaEventSagaSimulator {
     List<GrpcTxEvent> sagaEvents = new ArrayList<>();
     sagaEvents.add(sagaStartedEvent(globalTxId));
     sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
-    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
+    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_1, globalTxId, 
NullPointerException.class.getName().getBytes(), "method a"));
     sagaEvents.add(sagaAbortedEvent(globalTxId));
     return sagaEvents;
   }
@@ -56,7 +56,7 @@ public class OmegaEventSagaSimulator {
     sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
     sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service 
a".getBytes(), "method a"));
     sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, 
"service b".getBytes(), "method b"));
-    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_2, globalTxId, 
"service b".getBytes(), "method b"));
+    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_2, globalTxId, 
NullPointerException.class.getName().getBytes(), "method b"));
     sagaEvents.add(sagaAbortedEvent(globalTxId));
     return sagaEvents;
   }
@@ -69,7 +69,7 @@ public class OmegaEventSagaSimulator {
     sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, 
"service b".getBytes(), "method b"));
     sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service 
b".getBytes(), "method b"));
     sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
-    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
+    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, 
NullPointerException.class.getName().getBytes(), "method c"));
     sagaEvents.add(sagaAbortedEvent(globalTxId));
     return sagaEvents;
   }
@@ -78,7 +78,7 @@ public class OmegaEventSagaSimulator {
     List<GrpcTxEvent> sagaEvents = new ArrayList<>();
     sagaEvents.add(sagaStartedEvent(globalTxId));
     sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
-    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_1, globalTxId, 
"service a".getBytes(), "method a"));
+    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_1, globalTxId, 
NullPointerException.class.getName().getBytes(), "method a"));
     sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, 
"service b".getBytes(), "method b"));
     sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service 
b".getBytes(), "method b"));
     sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
@@ -159,7 +159,7 @@ public class OmegaEventSagaSimulator {
     sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
     sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service 
a".getBytes(), "method a"));
     sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service 
b".getBytes(), "method b"));
-    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
+    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, 
NullPointerException.class.getName().getBytes(), "method c"));
     sagaEvents.add(sagaAbortedEvent(globalTxId));
     return sagaEvents;
   }
@@ -173,7 +173,7 @@ public class OmegaEventSagaSimulator {
     sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, 
"service b".getBytes(), "method b"));
     sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service 
b".getBytes(), "method b"));
     sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
-    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, 
"service c".getBytes(), "method c"));
+    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, 
NullPointerException.class.getName().getBytes(), "method c"));
     sagaEvents.add(sagaAbortedEvent(globalTxId));
     return sagaEvents;
   }

Reply via email to