This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch retry_creation
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/retry_creation by this push:
     new 8092542  Use Action based retry mechanism
8092542 is described below

commit 809254240b3e0c03b7cd1cc18efc442d33dbf3d9
Author: Sanjeev Kulkarni <sanj...@streaml.io>
AuthorDate: Mon Mar 4 10:15:50 2019 -0800

    Use Action based retry mechanism
---
 .../functions/runtime/KubernetesRuntime.java       |  87 +++++++--------
 .../pulsar/functions/runtime/RuntimeUtils.java     | 105 ------------------
 .../org/apache/pulsar/functions/utils/Actions.java | 121 +++++++++++++++++++++
 .../pulsar/functions/utils/ActionsTest.java}       |  43 ++++----
 .../pulsar/functions/worker/FunctionActioner.java  |  14 +--
 .../pulsar/functions/worker/SchedulerManager.java  |  56 ++++++----
 6 files changed, 230 insertions(+), 196 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 3fc0c69..3c0468d 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -60,6 +60,7 @@ import 
org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.Actions;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Utils;
 
@@ -360,7 +361,7 @@ public class KubernetesRuntime implements Runtime {
 
         String fqfn = 
FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
 
-        RuntimeUtils.Actions.Action createService = 
RuntimeUtils.Actions.Action.builder()
+        Actions.Action createService = Actions.Action.builder()
                 .actionName(String.format("Submitting service for function 
%s", fqfn))
                 .numRetries(NUM_RETRIES)
                 .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
@@ -372,25 +373,25 @@ public class KubernetesRuntime implements Runtime {
                         // already exists
                         if (e.getCode() == HTTP_CONFLICT) {
                             log.warn("Service already present for function 
{}", fqfn);
-                            return 
RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+                            return 
Actions.ActionResult.builder().success(true).build();
                         }
 
                         String errorMsg = e.getResponseBody() != null ? 
e.getResponseBody() : e.getMessage();
-                        return RuntimeUtils.Actions.ActionResult.builder()
+                        return Actions.ActionResult.builder()
                                 .success(false)
                                 .errorMsg(errorMsg)
                                 .build();
                     }
 
-                    return 
RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+                    return 
Actions.ActionResult.builder().success(true).build();
                 })
                 .build();
 
 
         AtomicBoolean success = new AtomicBoolean(false);
-        RuntimeUtils.Actions.newBuilder()
+        Actions.newBuilder()
                 .addAction(createService.toBuilder()
-                        .onSuccess(() -> success.set(true))
+                        .onSuccess((ignored) -> success.set(true))
                         .build())
                 .run();
 
@@ -432,7 +433,7 @@ public class KubernetesRuntime implements Runtime {
 
         String fqfn = 
FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
 
-        RuntimeUtils.Actions.Action createStatefulSet = 
RuntimeUtils.Actions.Action.builder()
+        Actions.Action createStatefulSet = Actions.Action.builder()
                 .actionName(String.format("Submitting statefulset for function 
%s", fqfn))
                 .numRetries(NUM_RETRIES)
                 .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
@@ -444,25 +445,25 @@ public class KubernetesRuntime implements Runtime {
                         // already exists
                         if (e.getCode() == HTTP_CONFLICT) {
                             log.warn("Statefulset already present for function 
{}", fqfn);
-                            return 
RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+                            return 
Actions.ActionResult.builder().success(true).build();
                         }
 
                         String errorMsg = e.getResponseBody() != null ? 
e.getResponseBody() : e.getMessage();
-                        return RuntimeUtils.Actions.ActionResult.builder()
+                        return Actions.ActionResult.builder()
                                 .success(false)
                                 .errorMsg(errorMsg)
                                 .build();
                     }
 
-                    return 
RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+                    return 
Actions.ActionResult.builder().success(true).build();
                 })
                 .build();
 
 
         AtomicBoolean success = new AtomicBoolean(false);
-        RuntimeUtils.Actions.newBuilder()
+        Actions.newBuilder()
                 .addAction(createStatefulSet.toBuilder()
-                        .onSuccess(() -> success.set(true))
+                        .onSuccess((ignored) -> success.set(true))
                         .build())
                 .run();
 
@@ -479,7 +480,7 @@ public class KubernetesRuntime implements Runtime {
         options.setPropagationPolicy("Foreground");
 
         String fqfn = 
FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
-        RuntimeUtils.Actions.Action deleteStatefulSet = 
RuntimeUtils.Actions.Action.builder()
+        Actions.Action deleteStatefulSet = Actions.Action.builder()
                 .actionName(String.format("Deleting statefulset for function 
%s", fqfn))
                 .numRetries(NUM_RETRIES)
                 .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
@@ -498,16 +499,16 @@ public class KubernetesRuntime implements Runtime {
                         // if already deleted
                         if (e.getCode() == HTTP_NOT_FOUND) {
                             log.warn("Statefulset for function {} does not 
exist", fqfn);
-                            return 
RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+                            return 
Actions.ActionResult.builder().success(true).build();
                         }
 
                         String errorMsg = e.getResponseBody() != null ? 
e.getResponseBody() : e.getMessage();
-                        return RuntimeUtils.Actions.ActionResult.builder()
+                        return Actions.ActionResult.builder()
                                 .success(false)
                                 .errorMsg(errorMsg)
                                 .build();
                     } catch (IOException e) {
-                        return RuntimeUtils.Actions.ActionResult.builder()
+                        return Actions.ActionResult.builder()
                                 .success(false)
                                 .errorMsg(e.getMessage())
                                 .build();
@@ -516,9 +517,9 @@ public class KubernetesRuntime implements Runtime {
                     // if already deleted
                     if (response.code() == HTTP_NOT_FOUND) {
                         log.warn("Statefulset for function {} does not exist", 
fqfn);
-                        return 
RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+                        return 
Actions.ActionResult.builder().success(true).build();
                     } else {
-                        return RuntimeUtils.Actions.ActionResult.builder()
+                        return Actions.ActionResult.builder()
                                 .success(response.isSuccessful())
                                 .errorMsg(response.message())
                                 .build();
@@ -527,7 +528,7 @@ public class KubernetesRuntime implements Runtime {
                 .build();
 
 
-        RuntimeUtils.Actions.Action waitForStatefulSetDeletion = 
RuntimeUtils.Actions.Action.builder()
+        Actions.Action waitForStatefulSetDeletion = Actions.Action.builder()
                 .actionName(String.format("Waiting for statefulset for 
function %s to complete deletion", fqfn))
                 // set retry period to be about 2x the graceshutdown time
                 .numRetries(NUM_RETRIES * 2)
@@ -540,16 +541,16 @@ public class KubernetesRuntime implements Runtime {
                     } catch (ApiException e) {
                         // statefulset is gone
                         if (e.getCode() == HTTP_NOT_FOUND) {
-                            return 
RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+                            return 
Actions.ActionResult.builder().success(true).build();
                         }
 
                         String errorMsg = e.getResponseBody() != null ? 
e.getResponseBody() : e.getMessage();
-                        return RuntimeUtils.Actions.ActionResult.builder()
+                        return Actions.ActionResult.builder()
                                 .success(false)
                                 .errorMsg(errorMsg)
                                 .build();
                     }
-                    return RuntimeUtils.Actions.ActionResult.builder()
+                    return Actions.ActionResult.builder()
                             .success(false)
                             .errorMsg(response.getStatus().toString())
                             .build();
@@ -557,7 +558,7 @@ public class KubernetesRuntime implements Runtime {
                 .build();
 
         // Need to wait for all pods to die so we can cleanup subscriptions.
-        RuntimeUtils.Actions.Action waitForStatefulPodsToTerminate = 
RuntimeUtils.Actions.Action.builder()
+        Actions.Action waitForStatefulPodsToTerminate = 
Actions.Action.builder()
                 .actionName(String.format("Waiting for pods for function %s to 
terminate", fqfn))
                 .numRetries(NUM_RETRIES * 2)
                 .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS * 2)
@@ -575,19 +576,19 @@ public class KubernetesRuntime implements Runtime {
                     } catch (ApiException e) {
 
                         String errorMsg = e.getResponseBody() != null ? 
e.getResponseBody() : e.getMessage();
-                        return RuntimeUtils.Actions.ActionResult.builder()
+                        return Actions.ActionResult.builder()
                                 .success(false)
                                 .errorMsg(errorMsg)
                                 .build();
                     }
 
                     if (response.getItems().size() > 0) {
-                        return RuntimeUtils.Actions.ActionResult.builder()
+                        return Actions.ActionResult.builder()
                                 .success(false)
                                 .errorMsg(response.getItems().size() + " pods 
still alive.")
                                 .build();
                     } else {
-                        return RuntimeUtils.Actions.ActionResult.builder()
+                        return Actions.ActionResult.builder()
                                 .success(true)
                                 .build();
                     }
@@ -596,19 +597,19 @@ public class KubernetesRuntime implements Runtime {
 
 
         AtomicBoolean success = new AtomicBoolean(false);
-        RuntimeUtils.Actions.newBuilder()
+        Actions.newBuilder()
                 .addAction(deleteStatefulSet.toBuilder()
                         .continueOn(true)
                         .build())
                 .addAction(waitForStatefulSetDeletion.toBuilder()
                         .continueOn(false)
-                        .onSuccess(() -> success.set(true))
+                        .onSuccess((ignored) -> success.set(true))
                         .build())
                 .addAction(deleteStatefulSet.toBuilder()
                         .continueOn(true)
                         .build())
                 .addAction(waitForStatefulSetDeletion.toBuilder()
-                        .onSuccess(() -> success.set(true))
+                        .onSuccess((ignored) -> success.set(true))
                         .build())
                 .run();
 
@@ -616,7 +617,7 @@ public class KubernetesRuntime implements Runtime {
             throw new RuntimeException(String.format("Failed to delete 
statefulset for function %s", fqfn));
         } else {
             // wait for pods to terminate
-            RuntimeUtils.Actions.newBuilder()
+            Actions.newBuilder()
                     .addAction(waitForStatefulPodsToTerminate)
                     .run();
         }
@@ -630,7 +631,7 @@ public class KubernetesRuntime implements Runtime {
         String fqfn = 
FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
         String serviceName = 
createJobName(instanceConfig.getFunctionDetails());
 
-        RuntimeUtils.Actions.Action deleteService = 
RuntimeUtils.Actions.Action.builder()
+        Actions.Action deleteService = Actions.Action.builder()
                 .actionName(String.format("Deleting service for function %s", 
fqfn))
                 .numRetries(NUM_RETRIES)
                 .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
@@ -648,16 +649,16 @@ public class KubernetesRuntime implements Runtime {
                         // if already deleted
                         if (e.getCode() == HTTP_NOT_FOUND) {
                             log.warn("Service for function {} does not exist", 
fqfn);
-                            return 
RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+                            return 
Actions.ActionResult.builder().success(true).build();
                         }
 
                         String errorMsg = e.getResponseBody() != null ? 
e.getResponseBody() : e.getMessage();
-                        return RuntimeUtils.Actions.ActionResult.builder()
+                        return Actions.ActionResult.builder()
                                 .success(false)
                                 .errorMsg(errorMsg)
                                 .build();
                     } catch (IOException e) {
-                        return RuntimeUtils.Actions.ActionResult.builder()
+                        return Actions.ActionResult.builder()
                                 .success(false)
                                 .errorMsg(e.getMessage())
                                 .build();
@@ -666,9 +667,9 @@ public class KubernetesRuntime implements Runtime {
                     // if already deleted
                     if (response.code() == HTTP_NOT_FOUND) {
                         log.warn("Service for function {} does not exist", 
fqfn);
-                        return 
RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+                        return 
Actions.ActionResult.builder().success(true).build();
                     } else {
-                        return RuntimeUtils.Actions.ActionResult.builder()
+                        return Actions.ActionResult.builder()
                                 .success(response.isSuccessful())
                                 .errorMsg(response.message())
                                 .build();
@@ -676,7 +677,7 @@ public class KubernetesRuntime implements Runtime {
                 })
                 .build();
 
-        RuntimeUtils.Actions.Action waitForServiceDeletion = 
RuntimeUtils.Actions.Action.builder()
+        Actions.Action waitForServiceDeletion = Actions.Action.builder()
                 .actionName(String.format("Waiting for statefulset for 
function %s to complete deletion", fqfn))
                 .numRetries(NUM_RETRIES)
                 .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
@@ -689,15 +690,15 @@ public class KubernetesRuntime implements Runtime {
                     } catch (ApiException e) {
                         // statefulset is gone
                         if (e.getCode() == HTTP_NOT_FOUND) {
-                            return 
RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+                            return 
Actions.ActionResult.builder().success(true).build();
                         }
                         String errorMsg = e.getResponseBody() != null ? 
e.getResponseBody() : e.getMessage();
-                        return RuntimeUtils.Actions.ActionResult.builder()
+                        return Actions.ActionResult.builder()
                                 .success(false)
                                 .errorMsg(errorMsg)
                                 .build();
                     }
-                    return RuntimeUtils.Actions.ActionResult.builder()
+                    return Actions.ActionResult.builder()
                             .success(false)
                             .errorMsg(response.getStatus().toString())
                             .build();
@@ -705,19 +706,19 @@ public class KubernetesRuntime implements Runtime {
                 .build();
 
         AtomicBoolean success = new AtomicBoolean(false);
-        RuntimeUtils.Actions.newBuilder()
+        Actions.newBuilder()
                 .addAction(deleteService.toBuilder()
                         .continueOn(true)
                         .build())
                 .addAction(waitForServiceDeletion.toBuilder()
                         .continueOn(false)
-                        .onSuccess(() -> success.set(true))
+                        .onSuccess((ignored) -> success.set(true))
                         .build())
                 .addAction(deleteService.toBuilder()
                         .continueOn(true)
                         .build())
                 .addAction(waitForServiceDeletion.toBuilder()
-                        .onSuccess(() -> success.set(true))
+                        .onSuccess((ignored) -> success.set(true))
                         .build())
                 .run();
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 9862b0a..95f10ae 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -258,109 +258,4 @@ public class RuntimeUtils {
         return result.toString();
     }
 
-    public static class Actions {
-        private List<Action> actions = new LinkedList<>();
-
-        @Data
-        @Builder(toBuilder=true)
-        public static class Action {
-            private String actionName;
-            private int numRetries = 1;
-            private Supplier<ActionResult> supplier;
-            private long sleepBetweenInvocationsMs = 500;
-            private Boolean continueOn;
-            private Runnable onFail;
-            private Runnable onSuccess;
-
-            public void verifyAction() {
-                if (isBlank(actionName)) {
-                    throw new RuntimeException("Action name is empty!");
-                }
-                if (supplier == null) {
-                    throw new RuntimeException("Supplier is not specified!");
-                }
-            }
-        }
-
-        @Data
-        @Builder
-        public static class ActionResult {
-            private boolean success;
-            private String errorMsg;
-        }
-
-        private Actions() {
-
-        }
-
-
-        public Actions addAction(Action action) {
-            action.verifyAction();
-            this.actions.add(action);
-            return this;
-        }
-
-        public static Actions newBuilder() {
-            return new Actions();
-        }
-
-        public int numActions() {
-            return actions.size();
-        }
-
-        public void run() throws InterruptedException {
-            Iterator<Action> it = this.actions.iterator();
-            while(it.hasNext()) {
-                Action action  = it.next();
-
-                boolean success;
-                try {
-                    success = runAction(action);
-                } catch (Exception e) {
-                    log.error("Uncaught exception thrown when running action [ 
{} ]:", action.getActionName(), e);
-                    success = false;
-                }
-                if (action.getContinueOn() != null
-                        && success == action.getContinueOn()) {
-                    continue;
-                } else {
-                    // terminate
-                    break;
-                }
-            }
-        }
-
-        private boolean runAction(Action action) throws InterruptedException {
-            for (int i = 0; i< action.getNumRetries(); i++) {
-
-                ActionResult actionResult = action.getSupplier().get();
-
-                if (actionResult.isSuccess()) {
-                    log.info("Sucessfully completed action [ {} ]", 
action.getActionName());
-                    if (action.getOnSuccess() != null) {
-                        action.getOnSuccess().run();
-                    }
-                    return true;
-                } else {
-                    if (actionResult.getErrorMsg() != null) {
-                        log.warn("Error completing action [ {} ] :- {} - 
[ATTEMPT] {}/{}",
-                                action.getActionName(),
-                                actionResult.getErrorMsg(),
-                                i + 1, action.getNumRetries());
-                    } else {
-                        log.warn("Error completing action [ {} ] [ATTEMPT] 
{}/{}",
-                                action.getActionName(),
-                                i + 1, action.getNumRetries());
-                    }
-
-                    Thread.sleep(action.sleepBetweenInvocationsMs);
-                }
-            }
-            log.error("Failed completing action [ {} ]. Giving up!", 
action.getActionName());
-            if (action.getOnFail() != null) {
-                action.getOnFail().run();
-            }
-            return false;
-        }
-    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java
new file mode 100644
index 0000000..640a977
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
+/**
+ * An ordered chain of {@link Action}s, each retried until it succeeds or runs out of attempts.
+ *
+ * <p>Build a chain with {@link #newBuilder()} and {@link #addAction(Action)}, then execute it
+ * with {@link #run()}. After each action finishes, its {@code continueOn} setting decides
+ * whether the chain proceeds to the next action or stops.
+ */
+@Slf4j
+public class Actions {
+    private final List<Action> actions = new LinkedList<>();
+
+    /** A single named, retryable step of the chain. */
+    @Data
+    @Builder(toBuilder=true)
+    public static class Action {
+        /** Name used in log messages; must not be blank. */
+        private String actionName;
+        /**
+         * Maximum number of times the supplier is invoked. {@code @Builder.Default} is needed
+         * because a plain field initializer is ignored by the Lombok-generated builder.
+         */
+        @Builder.Default
+        private int numRetries = 1;
+        /** The work to perform; invoked once per attempt. */
+        private Supplier<ActionResult> supplier;
+        /** Pause after a failed attempt, in milliseconds. */
+        @Builder.Default
+        private long sleepBetweenInvocationsMs = 500;
+        /**
+         * Outcome on which the chain moves on to the next action: {@code true} continues on
+         * success, {@code false} continues on failure, {@code null} always stops here.
+         */
+        private Boolean continueOn;
+        /** Optional callback receiving the last failed result once all attempts are used up. */
+        private Consumer<ActionResult> onFail;
+        /** Optional callback receiving the successful result. */
+        private Consumer<ActionResult> onSuccess;
+
+        /**
+         * Checks that the action can be run.
+         *
+         * @throws RuntimeException if the action name is blank or no supplier is set
+         */
+        public void verifyAction() {
+            if (isBlank(actionName)) {
+                throw new RuntimeException("Action name is empty!");
+            }
+            if (supplier == null) {
+                throw new RuntimeException("Supplier is not specified!");
+            }
+        }
+    }
+
+    /** Outcome of one invocation of an action's supplier. */
+    @Data
+    @Builder
+    public static class ActionResult {
+        private boolean success;
+        /** Optional description of the failure; logged together with the attempt counter. */
+        private String errorMsg;
+        /** Optional payload made available to the action's callbacks. */
+        private Object result;
+    }
+
+    private Actions() {
+    }
+
+    /**
+     * Appends an action to the chain.
+     *
+     * @param action the action to add
+     * @return this instance, for chaining
+     * @throws RuntimeException if the action fails {@link Action#verifyAction()}
+     */
+    public Actions addAction(Action action) {
+        action.verifyAction();
+        this.actions.add(action);
+        return this;
+    }
+
+    /** Creates an empty chain. */
+    public static Actions newBuilder() {
+        return new Actions();
+    }
+
+    /** Returns the number of actions added so far. */
+    public int numActions() {
+        return actions.size();
+    }
+
+    /**
+     * Runs the actions in insertion order until one of them ends the chain.
+     *
+     * <p>An action that throws an unchecked exception is logged and treated as failed. The
+     * chain continues only while each action's outcome equals its {@code continueOn} value.
+     *
+     * @throws InterruptedException if the thread is interrupted while an action is running
+     */
+    public void run() throws InterruptedException {
+        Iterator<Action> it = this.actions.iterator();
+        while (it.hasNext()) {
+            Action action = it.next();
+
+            boolean success;
+            try {
+                success = runAction(action);
+            } catch (InterruptedException e) {
+                // Propagate interruption instead of logging it as an ordinary action failure.
+                throw e;
+            } catch (Exception e) {
+                log.error("Uncaught exception thrown when running action [ {} ]:", action.getActionName(), e);
+                success = false;
+            }
+
+            // Stop unless the outcome matches the action's continueOn setting.
+            Boolean continueOn = action.getContinueOn();
+            if (continueOn == null || success != continueOn) {
+                break;
+            }
+        }
+    }
+
+    /**
+     * Invokes the action's supplier up to {@code numRetries} times, stopping at the first success.
+     *
+     * @param action the action to run
+     * @return {@code true} if an attempt succeeded, {@code false} if every attempt failed
+     * @throws InterruptedException if interrupted while sleeping after a failed attempt
+     */
+    private boolean runAction(Action action) throws InterruptedException {
+        // Last failed result; stays null only when numRetries allows no attempt at all.
+        ActionResult lastResult = null;
+        for (int i = 0; i < action.getNumRetries(); i++) {
+            ActionResult actionResult = action.getSupplier().get();
+
+            if (actionResult.isSuccess()) {
+                log.info("Sucessfully completed action [ {} ]", action.getActionName());
+                if (action.getOnSuccess() != null) {
+                    action.getOnSuccess().accept(actionResult);
+                }
+                return true;
+            }
+
+            lastResult = actionResult;
+            if (actionResult.getErrorMsg() != null) {
+                log.warn("Error completing action [ {} ] :- {} - [ATTEMPT] {}/{}",
+                        action.getActionName(),
+                        actionResult.getErrorMsg(),
+                        i + 1, action.getNumRetries());
+            } else {
+                log.warn("Error completing action [ {} ] [ATTEMPT] {}/{}",
+                        action.getActionName(),
+                        i + 1, action.getNumRetries());
+            }
+
+            Thread.sleep(action.getSleepBetweenInvocationsMs());
+        }
+        log.error("Failed completing action [ {} ]. Giving up!", action.getActionName());
+        if (action.getOnFail() != null) {
+            // Hand over the result of the last attempt. Calling the supplier again here would
+            // execute the action one more time than numRetries allows.
+            action.getOnFail().accept(lastResult);
+        }
+        return false;
+    }
+}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
similarity index 75%
rename from pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
rename to pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
index 5d78599..2ada089 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
@@ -16,8 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.utils;
 
+import org.apache.pulsar.functions.utils.Actions;
 import org.testng.annotations.Test;
 
 import java.util.function.Supplier;
@@ -29,23 +30,23 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 
-public class RuntimeUtilsTest {
+public class ActionsTest {
 
     @Test
     public void testActions() throws InterruptedException {
 
         // Test for success
-        Supplier<RuntimeUtils.Actions.ActionResult> supplier1 = 
mock(Supplier.class);
-        
when(supplier1.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(true).build());
+        Supplier<Actions.ActionResult> supplier1 = mock(Supplier.class);
+        
when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
 
-        Supplier<RuntimeUtils.Actions.ActionResult> supplier2 = 
mock(Supplier.class);
-        
when(supplier2.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(true).build());
+        Supplier<Actions.ActionResult> supplier2 = mock(Supplier.class);
+        
when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
 
         Runnable onFail = mock(Runnable.class);
         Runnable onSucess = mock(Runnable.class);
 
-        RuntimeUtils.Actions.Action action1 = spy(
-                RuntimeUtils.Actions.Action.builder()
+        Actions.Action action1 = spy(
+                Actions.Action.builder()
                         .actionName("action1")
                         .numRetries(10)
                         .sleepBetweenInvocationsMs(100)
@@ -55,15 +56,15 @@ public class RuntimeUtilsTest {
                         .onSuccess(onSucess)
                         .build());
 
-        RuntimeUtils.Actions.Action action2 = spy(
-                RuntimeUtils.Actions.Action.builder()
+        Actions.Action action2 = spy(
+                Actions.Action.builder()
                         .actionName("action2")
                         .numRetries(20)
                         .sleepBetweenInvocationsMs(200)
                         .supplier(supplier2)
                         .build());
 
-        RuntimeUtils.Actions actions = RuntimeUtils.Actions.newBuilder()
+        Actions actions = Actions.newBuilder()
                 .addAction(action1)
                 .addAction(action2);
         actions.run();
@@ -77,16 +78,16 @@ public class RuntimeUtilsTest {
         // test only run 1 action
 
         supplier1 = mock(Supplier.class);
-        
when(supplier1.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(true).build());
+        
when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
 
         supplier2 = mock(Supplier.class);
-        
when(supplier2.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(true).build());
+        
when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
 
         onFail = mock(Runnable.class);
         onSucess = mock(Runnable.class);
 
         action1 = spy(
-                RuntimeUtils.Actions.Action.builder()
+                Actions.Action.builder()
                         .actionName("action1")
                         .numRetries(10)
                         .sleepBetweenInvocationsMs(100)
@@ -97,7 +98,7 @@ public class RuntimeUtilsTest {
                         .build());
 
         action2 = spy(
-                RuntimeUtils.Actions.Action.builder()
+                Actions.Action.builder()
                         .actionName("action2")
                         .numRetries(20)
                         .sleepBetweenInvocationsMs(200)
@@ -106,7 +107,7 @@ public class RuntimeUtilsTest {
                         .onSuccess(onSucess)
                         .build());
 
-        actions = RuntimeUtils.Actions.newBuilder()
+        actions = Actions.newBuilder()
                 .addAction(action1)
                 .addAction(action2);
         actions.run();
@@ -120,16 +121,16 @@ public class RuntimeUtilsTest {
         // test retry
 
         supplier1 = mock(Supplier.class);
-        when(supplier1.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(false).build());
+        when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(false).build());
 
         supplier2 = mock(Supplier.class);
-        when(supplier2.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(true).build());
+        when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
 
         onFail = mock(Runnable.class);
         onSucess = mock(Runnable.class);
 
         action1 = spy(
-                RuntimeUtils.Actions.Action.builder()
+                Actions.Action.builder()
                         .actionName("action1")
                         .numRetries(10)
                         .sleepBetweenInvocationsMs(10)
@@ -140,14 +141,14 @@ public class RuntimeUtilsTest {
                         .build());
 
         action2 = spy(
-                RuntimeUtils.Actions.Action.builder()
+                Actions.Action.builder()
                         .actionName("action2")
                         .numRetries(20)
                         .sleepBetweenInvocationsMs(200)
                         .supplier(supplier2)
                         .build());
 
-        actions = RuntimeUtils.Actions.newBuilder()
+        actions = Actions.newBuilder()
                 .addAction(action1)
                 .addAction(action2);
         actions.run();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 1d1014e..ae006cc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.runtime.RuntimeUtils;
+import org.apache.pulsar.functions.utils.Actions;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 
@@ -61,8 +61,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -283,9 +281,9 @@ public class FunctionActioner {
                             : functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails().getSource().getSubscriptionName();
 
                     try {
-                        RuntimeUtils.Actions.newBuilder()
+                        Actions.newBuilder()
                                 .addAction(
-                                        RuntimeUtils.Actions.Action.builder()
+                                        Actions.Action.builder()
                                                 .actionName(String.format("Cleaning up subscriptions for function %s", fqfn))
                                                 .numRetries(10)
                                                 .sleepBetweenInvocationsMs(1000)
@@ -300,7 +298,7 @@ public class FunctionActioner {
                                                         }
                                                     } catch (PulsarAdminException e) {
                                                         if (e instanceof PulsarAdminException.NotFoundException) {
-                                                            return RuntimeUtils.Actions.ActionResult.builder()
+                                                            return Actions.ActionResult.builder()
                                                                     .success(true)
                                                                     .build();
                                                         } else {
@@ -319,14 +317,14 @@ public class FunctionActioner {
                                                             }
 
                                                             String errorMsg = e.getHttpError() != null ? e.getHttpError() : e.getMessage();
-                                                            return RuntimeUtils.Actions.ActionResult.builder()
+                                                            return Actions.ActionResult.builder()
                                                                     .success(false)
                                                                     .errorMsg(String.format("%s - existing consumers: %s", errorMsg, existingConsumers))
                                                                     .build();
                                                         }
                                                     }
 
-                                                    return RuntimeUtils.Actions.ActionResult.builder()
+                                                    return Actions.ActionResult.builder()
                                                             .success(true)
                                                             .build();
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index f50acc3..e19258c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -46,6 +47,7 @@ import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Function.Instance;
+import org.apache.pulsar.functions.utils.Actions;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.worker.scheduler.IScheduler;
@@ -96,26 +98,42 @@ public class SchedulerManager implements AutoCloseable {
     }
 
     private static Producer<byte[]> createProducer(PulsarClient client, WorkerConfig config) {
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        for (int i = 0; i < 6; i++) {
-            try {
-                return client.newProducer().topic(config.getFunctionAssignmentTopic())
-                    .enableBatching(false)
-                    .blockIfQueueFull(true)
-                    .compressionType(CompressionType.LZ4)
-                    .sendTimeout(0, TimeUnit.MILLISECONDS)
-                    .createAsync().get(10, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                log.error("Exception while at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
-            }
-            try {
-                Thread.sleep(10000);
-            } catch (InterruptedException e) {
-            }
+        Actions.Action createProducerAction = Actions.Action.builder()
+                .actionName(String.format("Creating producer for assignment topic %s", config.getFunctionAssignmentTopic()))
+                .numRetries(5)
+                .sleepBetweenInvocationsMs(10000)
+                .supplier(() -> {
+                    try {
+                        Producer<byte[]> producer = client.newProducer().topic(config.getFunctionAssignmentTopic())
+                                .enableBatching(false)
+                                .blockIfQueueFull(true)
+                                .compressionType(CompressionType.LZ4)
+                                .sendTimeout(0, TimeUnit.MILLISECONDS)
+                                .createAsync().get(10, TimeUnit.SECONDS);
+                        return Actions.ActionResult.builder().success(true).result(producer).build();
+                    } catch (Exception e) {
+                        log.error("Exception while at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
+                        return Actions.ActionResult.builder()
+                                .success(false)
+                                .build();
+                    }
+                })
+                .build();
+        AtomicReference<Producer<byte[]>> producer = new AtomicReference<>();
+        try {
+            Actions.newBuilder()
+                    .addAction(createProducerAction.toBuilder()
+                            .onSuccess((actionResult) -> producer.set((Producer<byte[]>) actionResult.getResult()))
+                            .build())
+                    .run();
+        } catch (InterruptedException e) {
+
+        }
+        if (producer.get() == null) {
+            throw new RuntimeException("Can't create a producer on assignment topic "
+                    + config.getFunctionAssignmentTopic());
         }
-        throw new RuntimeException("Can't create a producer on assignment topic "
-            + config.getFunctionAssignmentTopic() + " in " + stopwatch.elapsed(TimeUnit.SECONDS)
-            + " seconds, fail fast ...");
+        return producer.get();
     }
 
     public Future<?> schedule() {

Reply via email to