This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5740699  Cleanup logic in JavaInstanceRunnable close method (#3932)
5740699 is described below

commit 5740699b834439651dc57c87edac2a3002202921
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Thu Mar 28 16:23:30 2019 -0700

    Cleanup logic in JavaInstanceRunnable close method (#3932)
    
    * Cleanup logic in JavaInstanceRunnable close method
    
    * Added comments
---
 .../functions/instance/JavaInstanceRunnable.java   | 25 ++++++++++++++++------
 1 file changed, 18 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e210eb6..8a60537 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -456,11 +456,16 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         return record;
     }
 
+    /**
+     * NOTE: this method is synchronized because it is potentially called 
from two different places
+     *       one inside the run/finally clause and one inside the 
ThreadRuntime::stop
+     */
     @Override
-    public void close() {
+    synchronized public void close() {
 
         if (stats != null) {
             stats.close();
+            stats = null;
         }
 
         if (source != null) {
@@ -468,8 +473,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
                 source.close();
             } catch (Throwable e) {
                 log.error("Failed to close source {}", 
instanceConfig.getFunctionDetails().getSource().getClassName(), e);
-
             }
+            source = null;
         }
 
         if (sink != null) {
@@ -478,10 +483,12 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
             } catch (Throwable e) {
                 log.error("Failed to close sink {}", 
instanceConfig.getFunctionDetails().getSource().getClassName(), e);
             }
+            sink = null;
         }
 
         if (null != javaInstance) {
             javaInstance.close();
+            javaInstance = null;
         }
 
         // kill the state table
@@ -495,13 +502,17 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                     log.warn("Failed to close state storage client", cause);
                     return null;
                 });
+            storageClient = null;
         }
 
-        // once the thread quits, clean up the instance
-        fnCache.unregisterFunctionInstance(
-                instanceConfig.getFunctionId(),
-                instanceConfig.getInstanceName());
-        log.info("Unloading JAR files for function {}", instanceConfig);
+        if (instanceCache != null) {
+            // once the thread quits, clean up the instance
+            fnCache.unregisterFunctionInstance(
+                    instanceConfig.getFunctionId(),
+                    instanceConfig.getInstanceName());
+            log.info("Unloading JAR files for function {}", instanceConfig);
+            instanceCache = null;
+        }
     }
 
     public InstanceCommunication.MetricsData getAndResetMetrics() {

Reply via email to