This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ded9086  Consolidate timer threads in functions (#3109)
ded9086 is described below

commit ded9086237ef6f8cf5d17676a2056bb7ff2daad3
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Mon Dec 3 17:47:13 2018 -0800

    Consolidate timer threads in functions (#3109)
    
    * Use instanceCache schedulerExecutorService for timer
    
    * fixing process runtime
    
    * addressing comments
---
 .../functions/instance/FunctionStatsManager.java   |  5 ++-
 .../pulsar/functions/instance/InstanceCache.java   | 13 +++++--
 .../functions/instance/JavaInstanceRunnable.java   |  2 +-
 .../pulsar/functions/runtime/JavaInstanceMain.java | 28 +++++++--------
 .../pulsar/functions/runtime/ProcessRuntime.java   | 27 +++++++-------
 .../pulsar/functions/runtime/RuntimeSpawner.java   | 41 +++++++++++-----------
 6 files changed, 61 insertions(+), 55 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index f9a3c77..9059f79 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -393,6 +393,9 @@ public class FunctionStatsManager implements AutoCloseable {
 
     @Override
     public void close() {
-        scheduledFuture.cancel(false);
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+            scheduledFuture = null;
+        }
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
index 937c273..fe7e049 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
@@ -18,17 +18,24 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
+import lombok.Getter;
+
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 
 public class InstanceCache {
 
     private static InstanceCache instance;
 
-    public final ScheduledExecutorService executor;
+    @Getter
+    private final ScheduledExecutorService scheduledExecutorService;
 
     private InstanceCache() {
-        executor = Executors.newSingleThreadScheduledExecutor();
+        ThreadFactory namedThreadFactory =
+                new DefaultThreadFactory("function-timer-thread");
+        scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(namedThreadFactory);
     }
 
     public static InstanceCache getInstanceCache() {
@@ -43,7 +50,7 @@ public class InstanceCache {
     public static void shutdown() {
         synchronized (InstanceCache.class) {
             if (instance != null) {
-                instance.executor.shutdown();
+                instance.scheduledExecutorService.shutdown();
             }
             instance = null;
         }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index d302e6a..e55fabc 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -216,7 +216,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
             if (this.collectorRegistry == null) {
                 this.collectorRegistry = new CollectorRegistry();
             }
-            this.stats = new FunctionStatsManager(this.collectorRegistry, 
this.metricsLabels, this.instanceCache.executor);
+            this.stats = new FunctionStatsManager(this.collectorRegistry, 
this.metricsLabels, this.instanceCache.getScheduledExecutorService());
 
             ContextImpl contextImpl = setupContext();
             javaInstance = setupJavaInstance(contextImpl);
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index f0be511..3a0a404 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -34,6 +34,7 @@ import io.prometheus.client.exporter.HTTPServer;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.instance.InstanceCache;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -49,6 +50,7 @@ import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -122,8 +124,8 @@ public class JavaInstanceMain implements AutoCloseable {
     private RuntimeSpawner runtimeSpawner;
     private ThreadRuntimeFactory containerFactory;
     private Long lastHealthCheckTs = null;
-    private ScheduledExecutorService timer;
     private HTTPServer metricsServer;
+    private ScheduledFuture healthCheckTimer;
 
     public JavaInstanceMain() { }
 
@@ -215,18 +217,14 @@ public class JavaInstanceMain implements AutoCloseable {
         metricsServer = new HTTPServer(new InetSocketAddress(metrics_port), 
collectorRegistry, true);
 
         if (expectedHealthCheckInterval > 0) {
-            timer = Executors.newSingleThreadScheduledExecutor();
-            timer.scheduleAtFixedRate(new TimerTask() {
-                @Override
-                public void run() {
-                    try {
-                        if (System.currentTimeMillis() - lastHealthCheckTs > 3 
* expectedHealthCheckInterval * 1000) {
-                            log.info("Haven't received health check from 
spawner in a while. Stopping instance...");
-                            close();
-                        }
-                    } catch (Exception e) {
-                        log.error("Error occurred when checking for latest 
health check", e);
+            healthCheckTimer = 
InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(()
 -> {
+                try {
+                    if (System.currentTimeMillis() - lastHealthCheckTs > 3 * 
expectedHealthCheckInterval * 1000) {
+                        log.info("Haven't received health check from spawner 
in a while. Stopping instance...");
+                        close();
                     }
+                } catch (Exception e) {
+                    log.error("Error occurred when checking for latest health 
check", e);
                 }
             }, expectedHealthCheckInterval * 1000, expectedHealthCheckInterval 
* 1000, TimeUnit.MILLISECONDS);
         }
@@ -260,8 +258,8 @@ public class JavaInstanceMain implements AutoCloseable {
             if (runtimeSpawner != null) {
                 runtimeSpawner.close();
             }
-            if (timer != null) {
-                timer.shutdown();
+            if (healthCheckTimer != null) {
+                healthCheckTimer.cancel(false);
             }
             if (containerFactory != null) {
                 containerFactory.close();
@@ -269,6 +267,8 @@ public class JavaInstanceMain implements AutoCloseable {
             if (metricsServer != null) {
                 metricsServer.stop();
             }
+
+            InstanceCache.shutdown();
         } catch (Exception ex) {
             System.err.println(ex);
         }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 4edac17..70e7a3a 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -30,6 +30,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.instance.InstanceCache;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -45,6 +46,7 @@ import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -64,7 +66,7 @@ class ProcessRuntime implements Runtime {
     private Throwable deathException;
     private ManagedChannel channel;
     private InstanceControlGrpc.InstanceControlFutureStub stub;
-    private ScheduledExecutorService timer;
+    private ScheduledFuture timer;
     private InstanceConfig instanceConfig;
     private final Long expectedHealthCheckInterval;
     private final SecretsProviderConfigurator secretsProviderConfigurator;
@@ -138,19 +140,14 @@ class ProcessRuntime implements Runtime {
                     .build();
             stub = InstanceControlGrpc.newFutureStub(channel);
 
-            timer = Executors.newSingleThreadScheduledExecutor();
-            timer.scheduleAtFixedRate(new TimerTask() {
-
-                @Override
-                public void run() {
-                    CompletableFuture<InstanceCommunication.HealthCheckResult> 
result = healthCheck();
-                    try {
-                        result.get();
-                    } catch (Exception e) {
-                        log.error("Health check failed for {}-{}",
-                                instanceConfig.getFunctionDetails().getName(),
-                                instanceConfig.getInstanceId(), e);
-                    }
+            timer = 
InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(()
 -> {
+                CompletableFuture<InstanceCommunication.HealthCheckResult> 
result = healthCheck();
+                try {
+                    result.get();
+                } catch (Exception e) {
+                    log.error("Health check failed for {}-{}",
+                            instanceConfig.getFunctionDetails().getName(),
+                            instanceConfig.getInstanceId(), e);
                 }
             }, expectedHealthCheckInterval, expectedHealthCheckInterval, 
TimeUnit.SECONDS);
         }
@@ -164,7 +161,7 @@ class ProcessRuntime implements Runtime {
     @Override
     public void stop() {
         if (timer != null) {
-            timer.shutdown();
+            timer.cancel(false);
         }
         if (process != null) {
             process.destroyForcibly();
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index 6b5abce..aa1784a 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -27,12 +27,15 @@ import java.io.IOException;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.instance.InstanceCache;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -50,7 +53,7 @@ public class RuntimeSpawner implements AutoCloseable {
 
     @Getter
     private Runtime runtime;
-    private Timer processLivenessCheckTimer;
+    private ScheduledFuture processLivenessCheckTimer;
     private int numRestarts;
     private long instanceLivenessCheckFreqMs;
     private Throwable runtimeDeathException;
@@ -79,27 +82,23 @@ public class RuntimeSpawner implements AutoCloseable {
 
         // monitor function runtime to make sure it is running.  If not, 
restart the function runtime
         if (!runtimeFactory.externallyManaged() && instanceLivenessCheckFreqMs 
> 0) {
-            processLivenessCheckTimer = new Timer();
-            processLivenessCheckTimer.scheduleAtFixedRate(new TimerTask() {
-                @Override
-                public void run() {
-                    Runtime runtime = RuntimeSpawner.this.runtime;
-                    if (runtime != null && !runtime.isAlive()) {
-                        log.error("{}/{}/{}-{} Function Container is dead with 
exception.. restarting", details.getTenant(),
-                                details.getNamespace(), details.getName(), 
runtime.getDeathException());
-                        // Just for the sake of sanity, just destroy the 
runtime
-                        try {
-                            runtime.stop();
-                            runtimeDeathException = 
runtime.getDeathException();
-                            runtime.start();
-                        } catch (Exception e) {
-                            log.error("{}/{}/{}-{} Function Restart failed", 
details.getTenant(),
-                                    details.getNamespace(), details.getName(), 
e, e);
-                        }
-                        numRestarts++;
+            processLivenessCheckTimer = 
InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(()
 -> {
+                Runtime runtime = RuntimeSpawner.this.runtime;
+                if (runtime != null && !runtime.isAlive()) {
+                    log.error("{}/{}/{}-{} Function Container is dead with 
exception.. restarting", details.getTenant(),
+                            details.getNamespace(), details.getName(), 
runtime.getDeathException());
+                    // Just for the sake of sanity, just destroy the runtime
+                    try {
+                        runtime.stop();
+                        runtimeDeathException = runtime.getDeathException();
+                        runtime.start();
+                    } catch (Exception e) {
+                        log.error("{}/{}/{}-{} Function Restart failed", 
details.getTenant(),
+                                details.getNamespace(), details.getName(), e, 
e);
                     }
+                    numRestarts++;
                 }
-            }, instanceLivenessCheckFreqMs, instanceLivenessCheckFreqMs);
+            }, instanceLivenessCheckFreqMs, instanceLivenessCheckFreqMs, 
TimeUnit.MILLISECONDS);
         }
     }
 
@@ -139,7 +138,7 @@ public class RuntimeSpawner implements AutoCloseable {
     public void close() {
         // cancel liveness checker before stopping runtime.
         if (processLivenessCheckTimer != null) {
-            processLivenessCheckTimer.cancel();
+            processLivenessCheckTimer.cancel(false);
             processLivenessCheckTimer = null;
         }
         if (null != runtime) {

Reply via email to