This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new efe5109b When the action has no 'continueOn', continue to invoke subsequent actions. (#11679) efe5109b is described below commit efe5109bc4afcf803d802cf195ef226cd31fb8ba Author: Bharani Chadalavada <bharanic....@gmail.com> AuthorDate: Tue Aug 17 15:58:12 2021 -0700 When the action has no 'continueOn', continue to invoke subsequent actions. (#11679) Co-authored-by: Bharani Chadalavada <bchadalav...@splunk.com> --- .../org/apache/pulsar/functions/utils/Actions.java | 13 +- .../apache/pulsar/functions/utils/ActionsTest.java | 201 +++++++++++++-------- 2 files changed, 131 insertions(+), 83 deletions(-) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java index a451c08..05a6356 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java @@ -94,12 +94,13 @@ public class Actions { log.error("Uncaught exception thrown when running action [ {} ]:", action.getActionName(), e); success = false; } - if (action.getContinueOn() != null - && success == action.getContinueOn()) { - continue; - } else { - // terminate - break; + if (action.getContinueOn() != null) { + if (success == action.getContinueOn()) { + continue; + } else { + // terminate + break; + } } } } diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java index f728f67..904b6e3 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java @@ -34,7 +34,7 @@ import static org.testng.Assert.assertEquals; public class ActionsTest { @Test - public void testActions() throws InterruptedException { + public void testActionsSuccess() throws InterruptedException { // Test for success Supplier<Actions.ActionResult> supplier1 = mock(Supplier.class); @@ -47,27 +47,27 @@ public class ActionsTest { java.util.function.Consumer<Actions.ActionResult> onSucess = mock(java.util.function.Consumer.class); Actions.Action action1 = spy( - Actions.Action.builder() - .actionName("action1") - .numRetries(10) - .sleepBetweenInvocationsMs(100) - .supplier(supplier1) - .continueOn(true) - .onFail(onFail) - .onSuccess(onSucess) - .build()); + Actions.Action.builder() + .actionName("action1") + .numRetries(10) + .sleepBetweenInvocationsMs(100) + .supplier(supplier1) + .continueOn(true) + .onFail(onFail) + .onSuccess(onSucess) + .build()); Actions.Action action2 = spy( - Actions.Action.builder() - .actionName("action2") - .numRetries(20) - .sleepBetweenInvocationsMs(200) - .supplier(supplier2) - .build()); + Actions.Action.builder() + .actionName("action2") + .numRetries(20) + .sleepBetweenInvocationsMs(200) + .supplier(supplier2) + .build()); Actions actions = Actions.newBuilder() - .addAction(action1) - .addAction(action2); + .addAction(action1) + .addAction(action2); actions.run(); assertEquals(actions.numActions(), 2); @@ -75,42 +75,44 @@ public class ActionsTest { verify(onFail, times(0)).accept(any()); verify(onSucess, times(1)).accept(any()); verify(supplier2, times(1)).get(); + } - // test only run 1 action - supplier1 = mock(Supplier.class); + @Test + public void testActionsOneAction() throws InterruptedException { + // test only run 1 action + Supplier<Actions.ActionResult> supplier1 = mock(Supplier.class); when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(true).build()); - supplier2 = mock(Supplier.class); + Supplier<Actions.ActionResult> supplier2 = mock(Supplier.class); when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build()); - onFail = mock(java.util.function.Consumer.class); - onSucess = mock(java.util.function.Consumer.class); - - action1 = spy( - Actions.Action.builder() - .actionName("action1") - .numRetries(10) - .sleepBetweenInvocationsMs(100) - .supplier(supplier1) - .continueOn(false) - .onFail(onFail) - .onSuccess(onSucess) - .build()); - - action2 = spy( - Actions.Action.builder() - .actionName("action2") - .numRetries(20) - .sleepBetweenInvocationsMs(200) - .supplier(supplier2) - .onFail(onFail) - .onSuccess(onSucess) - .build()); - - actions = Actions.newBuilder() - .addAction(action1) - .addAction(action2); + java.util.function.Consumer<Actions.ActionResult> onFail = mock(java.util.function.Consumer.class); + java.util.function.Consumer<Actions.ActionResult> onSucess = mock(java.util.function.Consumer.class); + + Actions.Action action1 = spy( + Actions.Action.builder() + .actionName("action1") + .numRetries(10) + .sleepBetweenInvocationsMs(100) + .supplier(supplier1) + .continueOn(false) + .onFail(onFail) + .onSuccess(onSucess) + .build()); + Actions.Action action2 = spy( + Actions.Action.builder() + .actionName("action2") + .numRetries(20) + .sleepBetweenInvocationsMs(200) + .supplier(supplier2) + .onFail(onFail) + .onSuccess(onSucess) + .build()); + + Actions actions = Actions.newBuilder() + .addAction(action1) + .addAction(action2); actions.run(); assertEquals(actions.numActions(), 2); @@ -118,40 +120,44 @@ public class ActionsTest { verify(onFail, times(0)).accept(any()); verify(onSucess, times(1)).accept(any()); verify(supplier2, times(0)).get(); + } - // test retry + @Test + public void testActionsRetry() throws InterruptedException { + + // test retry - supplier1 = mock(Supplier.class); + Supplier<Actions.ActionResult> supplier1 = mock(Supplier.class); when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(false).build()); - supplier2 = mock(Supplier.class); + Supplier<Actions.ActionResult> supplier2 = mock(Supplier.class); when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build()); - onFail = mock(java.util.function.Consumer.class); - onSucess = mock(java.util.function.Consumer.class); - - action1 = spy( - Actions.Action.builder() - .actionName("action1") - .numRetries(10) - .sleepBetweenInvocationsMs(10) - .supplier(supplier1) - .continueOn(false) - .onFail(onFail) - .onSuccess(onSucess) - .build()); - - action2 = spy( - Actions.Action.builder() - .actionName("action2") - .numRetries(20) - .sleepBetweenInvocationsMs(200) - .supplier(supplier2) - .build()); - - actions = Actions.newBuilder() - .addAction(action1) - .addAction(action2); + java.util.function.Consumer<Actions.ActionResult> onFail = mock(java.util.function.Consumer.class); + java.util.function.Consumer<Actions.ActionResult> onSucess = mock(java.util.function.Consumer.class); + + Actions.Action action1 = spy( + Actions.Action.builder() + .actionName("action1") + .numRetries(10) + .sleepBetweenInvocationsMs(10) + .supplier(supplier1) + .continueOn(false) + .onFail(onFail) + .onSuccess(onSucess) + .build()); + + Actions.Action action2 = spy( + Actions.Action.builder() + .actionName("action2") + .numRetries(20) + .sleepBetweenInvocationsMs(200) + .supplier(supplier2) + .build()); + + Actions actions = Actions.newBuilder() + .addAction(action1) + .addAction(action2); actions.run(); assertEquals(actions.numActions(), 2); @@ -159,6 +165,47 @@ public class ActionsTest { verify(onFail, times(1)).accept(any()); verify(onSucess, times(0)).accept(any()); verify(supplier2, times(1)).get(); + } + @Test + public void testActionsNoContinueOn() throws InterruptedException { + // No continueOn + Supplier<Actions.ActionResult>supplier1 = mock(Supplier.class); + when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(true).build()); + + Supplier<Actions.ActionResult> supplier2 = mock(Supplier.class); + when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build()); + + java.util.function.Consumer<Actions.ActionResult> onFail = mock(java.util.function.Consumer.class); + java.util.function.Consumer<Actions.ActionResult> onSucess = mock(java.util.function.Consumer.class); + + Actions.Action action1 = spy( + Actions.Action.builder() + .actionName("action1") + .numRetries(10) + .sleepBetweenInvocationsMs(100) + .supplier(supplier1) + .onFail(onFail) + .onSuccess(onSucess) + .build()); + + Actions.Action action2 = spy( + Actions.Action.builder() + .actionName("action2") + .numRetries(20) + .sleepBetweenInvocationsMs(200) + .supplier(supplier2) + .build()); + + Actions actions = Actions.newBuilder() + .addAction(action1) + .addAction(action2); + actions.run(); + + assertEquals(actions.numActions(), 2); + verify(supplier1, times(1)).get(); + verify(onFail, times(0)).accept(any()); + verify(onSucess, times(1)).accept(any()); + verify(supplier2, times(1)).get(); } -} +} \ No newline at end of file