This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 5740699 Cleanup logic in JavaInstanceRunnable close method (#3932) 5740699 is described below commit 5740699b834439651dc57c87edac2a3002202921 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Thu Mar 28 16:23:30 2019 -0700 Cleanup logic in JavaInstanceRunnable close method (#3932) * Cleanup logic in JavaInstanceRunnable close method * Added comments --- .../functions/instance/JavaInstanceRunnable.java | 25 ++++++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index e210eb6..8a60537 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -456,11 +456,16 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { return record; } + /** + * NOTE: this method is be syncrhonized because it is potentially called by two different places + * one inside the run/finally clause and one inside the ThreadRuntime::stop + */ @Override - public void close() { + synchronized public void close() { if (stats != null) { stats.close(); + stats = null; } if (source != null) { @@ -468,8 +473,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { source.close(); } catch (Throwable e) { log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e); - } + source = null; } if (sink != null) { @@ -478,10 +483,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } catch (Throwable e) { log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e); } + sink = null; } if (null != javaInstance) { javaInstance.close(); + javaInstance = null; } // kill the state table @@ -495,13 +502,17 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { log.warn("Failed to close state storage client", cause); return null; }); + storageClient = null; } - // once the thread quits, clean up the instance - fnCache.unregisterFunctionInstance( - instanceConfig.getFunctionId(), - instanceConfig.getInstanceName()); - log.info("Unloading JAR files for function {}", instanceConfig); + if (instanceCache != null) { + // once the thread quits, clean up the instance + fnCache.unregisterFunctionInstance( + instanceConfig.getFunctionId(), + instanceConfig.getInstanceName()); + log.info("Unloading JAR files for function {}", instanceConfig); + instanceCache = null; + } } public InstanceCommunication.MetricsData getAndResetMetrics() {