This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch retry_creation in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/retry_creation by this push: new 8092542 Use Action based retry mechanism 8092542 is described below commit 809254240b3e0c03b7cd1cc18efc442d33dbf3d9 Author: Sanjeev Kulkarni <sanj...@streaml.io> AuthorDate: Mon Mar 4 10:15:50 2019 -0800 Use Action based retry mechanism --- .../functions/runtime/KubernetesRuntime.java | 87 +++++++-------- .../pulsar/functions/runtime/RuntimeUtils.java | 105 ------------------ .../org/apache/pulsar/functions/utils/Actions.java | 121 +++++++++++++++++++++ .../pulsar/functions/utils/ActionsTest.java} | 43 ++++---- .../pulsar/functions/worker/FunctionActioner.java | 14 +-- .../pulsar/functions/worker/SchedulerManager.java | 56 ++++++---- 6 files changed, 230 insertions(+), 196 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index 3fc0c69..3c0468d 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -60,6 +60,7 @@ import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceControlGrpc; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; +import org.apache.pulsar.functions.utils.Actions; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.Utils; @@ -360,7 +361,7 @@ public class KubernetesRuntime implements Runtime { String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()); - RuntimeUtils.Actions.Action createService = RuntimeUtils.Actions.Action.builder() + Actions.Action createService = 
Actions.Action.builder() .actionName(String.format("Submitting service for function %s", fqfn)) .numRetries(NUM_RETRIES) .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS) @@ -372,25 +373,25 @@ public class KubernetesRuntime implements Runtime { // already exists if (e.getCode() == HTTP_CONFLICT) { log.warn("Service already present for function {}", fqfn); - return RuntimeUtils.Actions.ActionResult.builder().success(true).build(); + return Actions.ActionResult.builder().success(true).build(); } String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage(); - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(false) .errorMsg(errorMsg) .build(); } - return RuntimeUtils.Actions.ActionResult.builder().success(true).build(); + return Actions.ActionResult.builder().success(true).build(); }) .build(); AtomicBoolean success = new AtomicBoolean(false); - RuntimeUtils.Actions.newBuilder() + Actions.newBuilder() .addAction(createService.toBuilder() - .onSuccess(() -> success.set(true)) + .onSuccess((ignored) -> success.set(true)) .build()) .run(); @@ -432,7 +433,7 @@ public class KubernetesRuntime implements Runtime { String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()); - RuntimeUtils.Actions.Action createStatefulSet = RuntimeUtils.Actions.Action.builder() + Actions.Action createStatefulSet = Actions.Action.builder() .actionName(String.format("Submitting statefulset for function %s", fqfn)) .numRetries(NUM_RETRIES) .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS) @@ -444,25 +445,25 @@ public class KubernetesRuntime implements Runtime { // already exists if (e.getCode() == HTTP_CONFLICT) { log.warn("Statefulset already present for function {}", fqfn); - return RuntimeUtils.Actions.ActionResult.builder().success(true).build(); + return Actions.ActionResult.builder().success(true).build(); } String errorMsg = e.getResponseBody() != null ? 
e.getResponseBody() : e.getMessage(); - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(false) .errorMsg(errorMsg) .build(); } - return RuntimeUtils.Actions.ActionResult.builder().success(true).build(); + return Actions.ActionResult.builder().success(true).build(); }) .build(); AtomicBoolean success = new AtomicBoolean(false); - RuntimeUtils.Actions.newBuilder() + Actions.newBuilder() .addAction(createStatefulSet.toBuilder() - .onSuccess(() -> success.set(true)) + .onSuccess((ignored) -> success.set(true)) .build()) .run(); @@ -479,7 +480,7 @@ public class KubernetesRuntime implements Runtime { options.setPropagationPolicy("Foreground"); String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()); - RuntimeUtils.Actions.Action deleteStatefulSet = RuntimeUtils.Actions.Action.builder() + Actions.Action deleteStatefulSet = Actions.Action.builder() .actionName(String.format("Deleting statefulset for function %s", fqfn)) .numRetries(NUM_RETRIES) .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS) @@ -498,16 +499,16 @@ public class KubernetesRuntime implements Runtime { // if already deleted if (e.getCode() == HTTP_NOT_FOUND) { log.warn("Statefulset for function {} does not exist", fqfn); - return RuntimeUtils.Actions.ActionResult.builder().success(true).build(); + return Actions.ActionResult.builder().success(true).build(); } String errorMsg = e.getResponseBody() != null ? 
e.getResponseBody() : e.getMessage(); - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(false) .errorMsg(errorMsg) .build(); } catch (IOException e) { - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(false) .errorMsg(e.getMessage()) .build(); @@ -516,9 +517,9 @@ public class KubernetesRuntime implements Runtime { // if already deleted if (response.code() == HTTP_NOT_FOUND) { log.warn("Statefulset for function {} does not exist", fqfn); - return RuntimeUtils.Actions.ActionResult.builder().success(true).build(); + return Actions.ActionResult.builder().success(true).build(); } else { - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(response.isSuccessful()) .errorMsg(response.message()) .build(); @@ -527,7 +528,7 @@ public class KubernetesRuntime implements Runtime { .build(); - RuntimeUtils.Actions.Action waitForStatefulSetDeletion = RuntimeUtils.Actions.Action.builder() + Actions.Action waitForStatefulSetDeletion = Actions.Action.builder() .actionName(String.format("Waiting for statefulset for function %s to complete deletion", fqfn)) // set retry period to be about 2x the graceshutdown time .numRetries(NUM_RETRIES * 2) @@ -540,16 +541,16 @@ public class KubernetesRuntime implements Runtime { } catch (ApiException e) { // statefulset is gone if (e.getCode() == HTTP_NOT_FOUND) { - return RuntimeUtils.Actions.ActionResult.builder().success(true).build(); + return Actions.ActionResult.builder().success(true).build(); } String errorMsg = e.getResponseBody() != null ? 
e.getResponseBody() : e.getMessage(); - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(false) .errorMsg(errorMsg) .build(); } - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(false) .errorMsg(response.getStatus().toString()) .build(); @@ -557,7 +558,7 @@ public class KubernetesRuntime implements Runtime { .build(); // Need to wait for all pods to die so we can cleanup subscriptions. - RuntimeUtils.Actions.Action waitForStatefulPodsToTerminate = RuntimeUtils.Actions.Action.builder() + Actions.Action waitForStatefulPodsToTerminate = Actions.Action.builder() .actionName(String.format("Waiting for pods for function %s to terminate", fqfn)) .numRetries(NUM_RETRIES * 2) .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS * 2) @@ -575,19 +576,19 @@ public class KubernetesRuntime implements Runtime { } catch (ApiException e) { String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage(); - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(false) .errorMsg(errorMsg) .build(); } if (response.getItems().size() > 0) { - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(false) .errorMsg(response.getItems().size() + " pods still alive.") .build(); } else { - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(true) .build(); } @@ -596,19 +597,19 @@ public class KubernetesRuntime implements Runtime { AtomicBoolean success = new AtomicBoolean(false); - RuntimeUtils.Actions.newBuilder() + Actions.newBuilder() .addAction(deleteStatefulSet.toBuilder() .continueOn(true) .build()) .addAction(waitForStatefulSetDeletion.toBuilder() .continueOn(false) - .onSuccess(() -> success.set(true)) + .onSuccess((ignored) -> success.set(true)) .build()) .addAction(deleteStatefulSet.toBuilder() .continueOn(true) .build()) 
.addAction(waitForStatefulSetDeletion.toBuilder() - .onSuccess(() -> success.set(true)) + .onSuccess((ignored) -> success.set(true)) .build()) .run(); @@ -616,7 +617,7 @@ public class KubernetesRuntime implements Runtime { throw new RuntimeException(String.format("Failed to delete statefulset for function %s", fqfn)); } else { // wait for pods to terminate - RuntimeUtils.Actions.newBuilder() + Actions.newBuilder() .addAction(waitForStatefulPodsToTerminate) .run(); } @@ -630,7 +631,7 @@ public class KubernetesRuntime implements Runtime { String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()); String serviceName = createJobName(instanceConfig.getFunctionDetails()); - RuntimeUtils.Actions.Action deleteService = RuntimeUtils.Actions.Action.builder() + Actions.Action deleteService = Actions.Action.builder() .actionName(String.format("Deleting service for function %s", fqfn)) .numRetries(NUM_RETRIES) .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS) @@ -648,16 +649,16 @@ public class KubernetesRuntime implements Runtime { // if already deleted if (e.getCode() == HTTP_NOT_FOUND) { log.warn("Service for function {} does not exist", fqfn); - return RuntimeUtils.Actions.ActionResult.builder().success(true).build(); + return Actions.ActionResult.builder().success(true).build(); } String errorMsg = e.getResponseBody() != null ? 
e.getResponseBody() : e.getMessage(); - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(false) .errorMsg(errorMsg) .build(); } catch (IOException e) { - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(false) .errorMsg(e.getMessage()) .build(); @@ -666,9 +667,9 @@ public class KubernetesRuntime implements Runtime { // if already deleted if (response.code() == HTTP_NOT_FOUND) { log.warn("Service for function {} does not exist", fqfn); - return RuntimeUtils.Actions.ActionResult.builder().success(true).build(); + return Actions.ActionResult.builder().success(true).build(); } else { - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(response.isSuccessful()) .errorMsg(response.message()) .build(); @@ -676,7 +677,7 @@ public class KubernetesRuntime implements Runtime { }) .build(); - RuntimeUtils.Actions.Action waitForServiceDeletion = RuntimeUtils.Actions.Action.builder() + Actions.Action waitForServiceDeletion = Actions.Action.builder() .actionName(String.format("Waiting for statefulset for function %s to complete deletion", fqfn)) .numRetries(NUM_RETRIES) .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS) @@ -689,15 +690,15 @@ public class KubernetesRuntime implements Runtime { } catch (ApiException e) { // statefulset is gone if (e.getCode() == HTTP_NOT_FOUND) { - return RuntimeUtils.Actions.ActionResult.builder().success(true).build(); + return Actions.ActionResult.builder().success(true).build(); } String errorMsg = e.getResponseBody() != null ? 
e.getResponseBody() : e.getMessage(); - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(false) .errorMsg(errorMsg) .build(); } - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(false) .errorMsg(response.getStatus().toString()) .build(); @@ -705,19 +706,19 @@ public class KubernetesRuntime implements Runtime { .build(); AtomicBoolean success = new AtomicBoolean(false); - RuntimeUtils.Actions.newBuilder() + Actions.newBuilder() .addAction(deleteService.toBuilder() .continueOn(true) .build()) .addAction(waitForServiceDeletion.toBuilder() .continueOn(false) - .onSuccess(() -> success.set(true)) + .onSuccess((ignored) -> success.set(true)) .build()) .addAction(deleteService.toBuilder() .continueOn(true) .build()) .addAction(waitForServiceDeletion.toBuilder() - .onSuccess(() -> success.set(true)) + .onSuccess((ignored) -> success.set(true)) .build()) .run(); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 9862b0a..95f10ae 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -258,109 +258,4 @@ public class RuntimeUtils { return result.toString(); } - public static class Actions { - private List<Action> actions = new LinkedList<>(); - - @Data - @Builder(toBuilder=true) - public static class Action { - private String actionName; - private int numRetries = 1; - private Supplier<ActionResult> supplier; - private long sleepBetweenInvocationsMs = 500; - private Boolean continueOn; - private Runnable onFail; - private Runnable onSuccess; - - public void verifyAction() { - if (isBlank(actionName)) { - throw new RuntimeException("Action name is empty!"); - } - if 
(supplier == null) { - throw new RuntimeException("Supplier is not specified!"); - } - } - } - - @Data - @Builder - public static class ActionResult { - private boolean success; - private String errorMsg; - } - - private Actions() { - - } - - - public Actions addAction(Action action) { - action.verifyAction(); - this.actions.add(action); - return this; - } - - public static Actions newBuilder() { - return new Actions(); - } - - public int numActions() { - return actions.size(); - } - - public void run() throws InterruptedException { - Iterator<Action> it = this.actions.iterator(); - while(it.hasNext()) { - Action action = it.next(); - - boolean success; - try { - success = runAction(action); - } catch (Exception e) { - log.error("Uncaught exception thrown when running action [ {} ]:", action.getActionName(), e); - success = false; - } - if (action.getContinueOn() != null - && success == action.getContinueOn()) { - continue; - } else { - // terminate - break; - } - } - } - - private boolean runAction(Action action) throws InterruptedException { - for (int i = 0; i< action.getNumRetries(); i++) { - - ActionResult actionResult = action.getSupplier().get(); - - if (actionResult.isSuccess()) { - log.info("Sucessfully completed action [ {} ]", action.getActionName()); - if (action.getOnSuccess() != null) { - action.getOnSuccess().run(); - } - return true; - } else { - if (actionResult.getErrorMsg() != null) { - log.warn("Error completing action [ {} ] :- {} - [ATTEMPT] {}/{}", - action.getActionName(), - actionResult.getErrorMsg(), - i + 1, action.getNumRetries()); - } else { - log.warn("Error completing action [ {} ] [ATTEMPT] {}/{}", - action.getActionName(), - i + 1, action.getNumRetries()); - } - - Thread.sleep(action.sleepBetweenInvocationsMs); - } - } - log.error("Failed completing action [ {} ]. 
Giving up!", action.getActionName()); - if (action.getOnFail() != null) { - action.getOnFail().run(); - } - return false; - } - } }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java
new file mode 100644
index 0000000..640a977
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
+/**
+ * Runs a sequence of named, retryable {@link Action}s.
+ *
+ * <p>Actions run in the order they were added. Each action's supplier is invoked up to
+ * {@code numRetries} times, sleeping {@code sleepBetweenInvocationsMs} between failed
+ * attempts, until it reports success. After an action finishes, {@link #run()} moves on to
+ * the next action only if the action's {@code continueOn} equals its outcome; otherwise,
+ * including when {@code continueOn} is {@code null}, the remaining actions are skipped.
+ */
+@Slf4j
+public class Actions {
+    private final List<Action> actions = new LinkedList<>();
+
+    /** A named unit of work that is retried until it succeeds or runs out of attempts. */
+    @Data
+    @Builder(toBuilder = true)
+    public static class Action {
+        /** Name used in log messages; must not be blank. */
+        private String actionName;
+        /**
+         * Maximum number of attempts. {@code @Builder.Default} is required: without it Lombok's
+         * builder ignores the initializer and the default silently becomes 0 attempts.
+         */
+        @Builder.Default
+        private int numRetries = 1;
+        /** Performs one attempt and reports its outcome; must not be null. */
+        private Supplier<ActionResult> supplier;
+        /** Pause between consecutive attempts, in ms ({@code @Builder.Default} for the same reason). */
+        @Builder.Default
+        private long sleepBetweenInvocationsMs = 500;
+        /** Outcome that lets {@link Actions#run()} go on to the next action; null always stops. */
+        private Boolean continueOn;
+        /** Called once, with the last attempt's result, after every attempt has failed. */
+        private Consumer<ActionResult> onFail;
+        /** Called once, with the result of the attempt that succeeded. */
+        private Consumer<ActionResult> onSuccess;
+
+        /**
+         * Checks that the action can be run.
+         *
+         * @throws RuntimeException if the action name is blank or no supplier is set
+         */
+        public void verifyAction() {
+            if (isBlank(actionName)) {
+                throw new RuntimeException("Action name is empty!");
+            }
+            if (supplier == null) {
+                throw new RuntimeException("Supplier is not specified!");
+            }
+        }
+    }
+
+    /** Outcome of a single attempt of an {@link Action}. */
+    @Data
+    @Builder
+    public static class ActionResult {
+        /** Whether the attempt succeeded. */
+        private boolean success;
+        /** Optional reason for a failed attempt; included in the retry log line. */
+        private String errorMsg;
+        /** Optional value produced by the attempt, passed on to onSuccess / onFail. */
+        private Object result;
+    }
+
+    private Actions() {
+    }
+
+    /**
+     * Appends an action to the sequence.
+     *
+     * @param action the action to append
+     * @return this instance, for chaining
+     * @throws RuntimeException if {@link Action#verifyAction()} rejects the action
+     */
+    public Actions addAction(Action action) {
+        action.verifyAction();
+        this.actions.add(action);
+        return this;
+    }
+
+    /** Creates an empty action sequence. */
+    public static Actions newBuilder() {
+        return new Actions();
+    }
+
+    /** Returns the number of actions added so far. */
+    public int numActions() {
+        return actions.size();
+    }
+
+    /**
+     * Runs the actions in order, stopping as described in the class documentation.
+     *
+     * @throws InterruptedException if the thread is interrupted while waiting between attempts;
+     *                              the remaining actions are then not run
+     */
+    public void run() throws InterruptedException {
+        for (Action action : this.actions) {
+            boolean success;
+            try {
+                success = runAction(action);
+            } catch (InterruptedException e) {
+                // Propagate as declared: the generic catch below would swallow it, and Thread.sleep
+                // has already cleared the interrupt flag, so the interruption would be lost.
+                throw e;
+            } catch (Exception e) {
+                log.error("Uncaught exception thrown when running action [ {} ]:", action.getActionName(), e);
+                success = false;
+            }
+            if (action.getContinueOn() == null || success != action.getContinueOn()) {
+                // terminate
+                break;
+            }
+        }
+    }
+
+    /**
+     * Attempts one action up to {@code numRetries} times.
+     *
+     * @return true if an attempt succeeded, false once every attempt has failed
+     * @throws InterruptedException if interrupted while sleeping between attempts
+     */
+    private boolean runAction(Action action) throws InterruptedException {
+        ActionResult lastResult = null;
+        for (int i = 0; i < action.getNumRetries(); i++) {
+            ActionResult actionResult = action.getSupplier().get();
+            if (actionResult == null) {
+                // A supplier returning nothing counts as a failed attempt instead of causing an NPE.
+                actionResult = ActionResult.builder()
+                        .success(false)
+                        .errorMsg("Supplier returned no result")
+                        .build();
+            }
+            lastResult = actionResult;
+
+            if (actionResult.isSuccess()) {
+                log.info("Sucessfully completed action [ {} ]", action.getActionName());
+                if (action.getOnSuccess() != null) {
+                    action.getOnSuccess().accept(actionResult);
+                }
+                return true;
+            }
+
+            if (actionResult.getErrorMsg() != null) {
+                log.warn("Error completing action [ {} ] :- {} - [ATTEMPT] {}/{}",
+                        action.getActionName(),
+                        actionResult.getErrorMsg(),
+                        i + 1, action.getNumRetries());
+            } else {
+                log.warn("Error completing action [ {} ] [ATTEMPT] {}/{}",
+                        action.getActionName(),
+                        i + 1, action.getNumRetries());
+            }
+
+            // Sleep only if another attempt follows; sleeping after the last one only delays failure.
+            if (i + 1 < action.getNumRetries()) {
+                Thread.sleep(action.getSleepBetweenInvocationsMs());
+            }
+        }
+
+        log.error("Failed completing action [ {} ]. Giving up!", action.getActionName());
+        if (action.getOnFail() != null) {
+            // Hand over the last observed result. Calling the supplier again here would repeat a
+            // possibly side-effecting operation after we have already given up.
+            if (lastResult == null) {
+                lastResult = ActionResult.builder()
+                        .success(false)
+                        .errorMsg("Action was never attempted (numRetries <= 0)")
+                        .build();
+            }
+            action.getOnFail().accept(lastResult);
+        }
+        return false;
+    }
+}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java similarity index 75% rename from pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java rename to pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java index 5d78599..2ada089 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java @@ -16,8 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.functions.runtime; +package org.apache.pulsar.functions.utils; +import org.apache.pulsar.functions.utils.Actions; import org.testng.annotations.Test; import java.util.function.Supplier; @@ -29,23 +30,23 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -public class RuntimeUtilsTest { +public class ActionsTest { @Test public void testActions() throws InterruptedException { // Test for success - Supplier<RuntimeUtils.Actions.ActionResult> supplier1 = mock(Supplier.class); - when(supplier1.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(true).build()); + Supplier<Actions.ActionResult> supplier1 = mock(Supplier.class); + when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(true).build()); - Supplier<RuntimeUtils.Actions.ActionResult> supplier2 = mock(Supplier.class); - when(supplier2.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(true).build()); + Supplier<Actions.ActionResult> supplier2 =
mock(Supplier.class); + when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build()); Runnable onFail = mock(Runnable.class); Runnable onSucess = mock(Runnable.class); - RuntimeUtils.Actions.Action action1 = spy( - RuntimeUtils.Actions.Action.builder() + Actions.Action action1 = spy( + Actions.Action.builder() .actionName("action1") .numRetries(10) .sleepBetweenInvocationsMs(100) @@ -55,15 +56,15 @@ public class RuntimeUtilsTest { .onSuccess(onSucess) .build()); - RuntimeUtils.Actions.Action action2 = spy( - RuntimeUtils.Actions.Action.builder() + Actions.Action action2 = spy( + Actions.Action.builder() .actionName("action2") .numRetries(20) .sleepBetweenInvocationsMs(200) .supplier(supplier2) .build()); - RuntimeUtils.Actions actions = RuntimeUtils.Actions.newBuilder() + Actions actions = Actions.newBuilder() .addAction(action1) .addAction(action2); actions.run(); @@ -77,16 +78,16 @@ public class RuntimeUtilsTest { // test only run 1 action supplier1 = mock(Supplier.class); - when(supplier1.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(true).build()); + when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(true).build()); supplier2 = mock(Supplier.class); - when(supplier2.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(true).build()); + when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build()); onFail = mock(Runnable.class); onSucess = mock(Runnable.class); action1 = spy( - RuntimeUtils.Actions.Action.builder() + Actions.Action.builder() .actionName("action1") .numRetries(10) .sleepBetweenInvocationsMs(100) @@ -97,7 +98,7 @@ public class RuntimeUtilsTest { .build()); action2 = spy( - RuntimeUtils.Actions.Action.builder() + Actions.Action.builder() .actionName("action2") .numRetries(20) .sleepBetweenInvocationsMs(200) @@ -106,7 +107,7 @@ public class RuntimeUtilsTest { .onSuccess(onSucess) .build()); - actions = RuntimeUtils.Actions.newBuilder() + 
actions = Actions.newBuilder() .addAction(action1) .addAction(action2); actions.run(); @@ -120,16 +121,16 @@ public class RuntimeUtilsTest { // test retry supplier1 = mock(Supplier.class); - when(supplier1.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(false).build()); + when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(false).build()); supplier2 = mock(Supplier.class); - when(supplier2.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(true).build()); + when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build()); onFail = mock(Runnable.class); onSucess = mock(Runnable.class); action1 = spy( - RuntimeUtils.Actions.Action.builder() + Actions.Action.builder() .actionName("action1") .numRetries(10) .sleepBetweenInvocationsMs(10) @@ -140,14 +141,14 @@ public class RuntimeUtilsTest { .build()); action2 = spy( - RuntimeUtils.Actions.Action.builder() + Actions.Action.builder() .actionName("action2") .numRetries(20) .sleepBetweenInvocationsMs(200) .supplier(supplier2) .build()); - actions = RuntimeUtils.Actions.newBuilder() + actions = Actions.newBuilder() .addAction(action1) .addAction(action2); actions.run(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 1d1014e..ae006cc 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -45,7 +45,7 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.RuntimeSpawner; -import org.apache.pulsar.functions.runtime.RuntimeUtils; +import 
org.apache.pulsar.functions.utils.Actions; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.io.ConnectorUtils; @@ -61,8 +61,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -283,9 +281,9 @@ public class FunctionActioner { : functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails().getSource().getSubscriptionName(); try { - RuntimeUtils.Actions.newBuilder() + Actions.newBuilder() .addAction( - RuntimeUtils.Actions.Action.builder() + Actions.Action.builder() .actionName(String.format("Cleaning up subscriptions for function %s", fqfn)) .numRetries(10) .sleepBetweenInvocationsMs(1000) @@ -300,7 +298,7 @@ public class FunctionActioner { } } catch (PulsarAdminException e) { if (e instanceof PulsarAdminException.NotFoundException) { - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(true) .build(); } else { @@ -319,14 +317,14 @@ public class FunctionActioner { } String errorMsg = e.getHttpError() != null ? 
e.getHttpError() : e.getMessage(); - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(false) .errorMsg(String.format("%s - existing consumers: %s", errorMsg, existingConsumers)) .build(); } } - return RuntimeUtils.Actions.ActionResult.builder() + return Actions.ActionResult.builder() .success(true) .build(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index f50acc3..e19258c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -31,6 +31,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -46,6 +47,7 @@ import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.proto.Function.Instance; +import org.apache.pulsar.functions.utils.Actions; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.worker.scheduler.IScheduler;
@@ -96,26 +98,67 @@ public class SchedulerManager implements AutoCloseable {
     }
 
+    /**
+     * Creates the producer used to publish to the function assignment topic, making up to 5
+     * attempts with a 10 second pause between them.
+     *
+     * @throws RuntimeException if no producer could be created, or if the thread was interrupted
+     *                          (the interrupt status is restored in that case)
+     */
     private static Producer<byte[]> createProducer(PulsarClient client, WorkerConfig config) {
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        for (int i = 0; i < 6; i++) {
-            try {
-                return client.newProducer().topic(config.getFunctionAssignmentTopic())
-                        .enableBatching(false)
-                        .blockIfQueueFull(true)
-                        .compressionType(CompressionType.LZ4)
-                        .sendTimeout(0, TimeUnit.MILLISECONDS)
-                        .createAsync().get(10, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                log.error("Exception while at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
-            }
-            try {
-                Thread.sleep(10000);
-            } catch (InterruptedException e) {
-            }
-        }
-        throw new RuntimeException("Can't create a producer on assignment topic "
-                + config.getFunctionAssignmentTopic() + " in " + stopwatch.elapsed(TimeUnit.SECONDS)
-                + " seconds, fail fast ...");
+        String topic = config.getFunctionAssignmentTopic();
+        Actions.Action createProducerAction = Actions.Action.builder()
+                .actionName(String.format("Creating producer for assignment topic %s", topic))
+                .numRetries(5)
+                .sleepBetweenInvocationsMs(10000)
+                .supplier(() -> {
+                    try {
+                        Producer<byte[]> producer = client.newProducer().topic(topic)
+                                .enableBatching(false)
+                                .blockIfQueueFull(true)
+                                .compressionType(CompressionType.LZ4)
+                                .sendTimeout(0, TimeUnit.MILLISECONDS)
+                                .createAsync().get(10, TimeUnit.SECONDS);
+                        return Actions.ActionResult.builder().success(true).result(producer).build();
+                    } catch (InterruptedException e) {
+                        // By convention a method throwing InterruptedException clears the flag;
+                        // restore it so the retry loop and our caller can still observe it.
+                        Thread.currentThread().interrupt();
+                        return Actions.ActionResult.builder()
+                                .success(false)
+                                .errorMsg("Interrupted while creating producer")
+                                .build();
+                    } catch (Exception e) {
+                        log.error("Exception while at creating producer to topic {}", topic, e);
+                        return Actions.ActionResult.builder()
+                                .success(false)
+                                .errorMsg(e.toString())
+                                .build();
+                    }
+                })
+                .build();
+
+        AtomicReference<Producer<byte[]>> producer = new AtomicReference<>();
+        try {
+            Actions.newBuilder()
+                    .addAction(createProducerAction.toBuilder()
+                            .onSuccess((actionResult) -> {
+                                @SuppressWarnings("unchecked")
+                                Producer<byte[]> created = (Producer<byte[]>) actionResult.getResult();
+                                producer.set(created);
+                            })
+                            .build())
+                    .run();
+        } catch (InterruptedException e) {
+            // Restore the interrupt status and fail fast instead of silently swallowing it.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted while creating a producer on assignment topic "
+                    + topic, e);
+        }
+        if (producer.get() == null) {
+            throw new RuntimeException("Can't create a producer on assignment topic "
+                    + topic);
+        }
+        return producer.get();
     }
 
     public Future<?> schedule() {