This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 31f7d70 Allow stats operations not to be blocked in functions (#9005) 31f7d70 is described below commit 31f7d70bd4fdde4c8a272714dece8d806e31ebae Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Sun Dec 20 11:14:18 2020 -0800 Allow stats operations not to be blocked in functions (#9005) Co-authored-by: Jerry Peng <jer...@splunk.com> --- .../functions/instance/JavaInstanceRunnable.java | 133 +++++++++++++-------- 1 file changed, 85 insertions(+), 48 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index fa68656..fe476f7 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -29,6 +29,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import lombok.Getter; import lombok.extern.slf4j.Slf4j; import net.jodah.typetools.TypeResolver; @@ -86,7 +89,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { // input topic consumer & output topic producer private final PulsarClientImpl client; - //private final Map<String, PulsarClient> pulsarClientMap; private LogAppender logAppender; @@ -122,6 +124,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private ClassLoader functionClassLoader; private String narExtractionDirectory; + // a flag to determine if member variables have been initialized as part of setup(). 
+ // used for out of band API calls like operations involving stats + private transient boolean isInitialized = false; + + // a read write lock for stats operations + private ReadWriteLock statsLock = new ReentrantReadWriteLock(); + public JavaInstanceRunnable(InstanceConfig instanceConfig, FunctionCacheManager fnCache, String jarFile, @@ -216,6 +225,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { setupLogHandler(); javaInstance = new JavaInstance(contextImpl, object, instanceConfig); + + // to signal member variables are initialized + isInitialized = true; } ContextImpl setupContext() { @@ -404,12 +416,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } /** - * NOTE: this method is be syncrhonized because it is potentially called by two different places + * NOTE: this method is synchronized because it is potentially called by two different places * one inside the run/finally clause and one inside the ThreadRuntime::stop */ @Override synchronized public void close() { + isInitialized = false; + if (stats != null) { stats.close(); stats = null; @@ -466,49 +480,67 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } } - synchronized public String getStatsAsString() throws IOException { - if (stats != null) { - return stats.getStatsAsString(); - } else { - return ""; + public String getStatsAsString() throws IOException { + if (isInitialized) { + try { + statsLock.readLock().lock(); + return stats.getStatsAsString(); + } finally { + statsLock.readLock().unlock(); + } } + return ""; } - // This method is synchronized because it is using the stats variable - synchronized public InstanceCommunication.MetricsData getAndResetMetrics() { - InstanceCommunication.MetricsData metricsData = internalGetMetrics(); - internalResetMetrics(); - return metricsData; + public InstanceCommunication.MetricsData getAndResetMetrics() { + if (isInitialized) { + try { + statsLock.writeLock().lock(); + 
InstanceCommunication.MetricsData metricsData = internalGetMetrics(); + internalResetMetrics(); + return metricsData; + } finally { + statsLock.writeLock().unlock(); + } + } + return InstanceCommunication.MetricsData.getDefaultInstance(); } - // This method is synchronized because it is using the stats and javaInstance variables - synchronized public InstanceCommunication.MetricsData getMetrics() { - return internalGetMetrics(); + public InstanceCommunication.MetricsData getMetrics() { + if (isInitialized) { + try { + statsLock.readLock().lock(); + return internalGetMetrics(); + } finally { + statsLock.readLock().unlock(); + } + } + return InstanceCommunication.MetricsData.getDefaultInstance(); } - // This method is synchronized because it is using the stats and javaInstance variables - synchronized public void resetMetrics() { - internalResetMetrics(); + public void resetMetrics() { + if (isInitialized) { + try { + statsLock.writeLock().lock(); + internalResetMetrics(); + } finally { + statsLock.writeLock().unlock(); + } + } } private InstanceCommunication.MetricsData internalGetMetrics() { InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder(); - if (javaInstance != null) { - Map<String, Double> userMetrics = javaInstance.getMetrics(); - if (userMetrics != null) { - bldr.putAllUserMetrics(userMetrics); - } + Map<String, Double> userMetrics = javaInstance.getMetrics(); + if (userMetrics != null) { + bldr.putAllUserMetrics(userMetrics); } return bldr.build(); } private void internalResetMetrics() { - if (stats != null) { stats.reset(); - } - if (javaInstance != null) { javaInstance.resetMetrics(); - } } private Builder createMetricsDataBuilder() { @@ -531,28 +563,33 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { return bldr; } - // This method is synchronized because it is using the stats variable - synchronized public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() { + public 
InstanceCommunication.FunctionStatus.Builder getFunctionStatus() { InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder(); - if (stats != null) { - functionStatusBuilder.setNumReceived((long) stats.getTotalRecordsReceived()); - functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully()); - functionStatusBuilder.setNumUserExceptions((long) stats.getTotalUserExceptions()); - stats.getLatestUserExceptions().forEach(ex -> { - functionStatusBuilder.addLatestUserExceptions(ex); - }); - functionStatusBuilder.setNumSystemExceptions((long) stats.getTotalSysExceptions()); - stats.getLatestSystemExceptions().forEach(ex -> { - functionStatusBuilder.addLatestSystemExceptions(ex); - }); - stats.getLatestSourceExceptions().forEach(ex -> { - functionStatusBuilder.addLatestSourceExceptions(ex); - }); - stats.getLatestSinkExceptions().forEach(ex -> { - functionStatusBuilder.addLatestSinkExceptions(ex); - }); - functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency()); - functionStatusBuilder.setLastInvocationTime((long) stats.getLastInvocation()); + if (isInitialized) { + try { + statsLock.readLock().lock(); + + functionStatusBuilder.setNumReceived((long) stats.getTotalRecordsReceived()); + functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully()); + functionStatusBuilder.setNumUserExceptions((long) stats.getTotalUserExceptions()); + stats.getLatestUserExceptions().forEach(ex -> { + functionStatusBuilder.addLatestUserExceptions(ex); + }); + functionStatusBuilder.setNumSystemExceptions((long) stats.getTotalSysExceptions()); + stats.getLatestSystemExceptions().forEach(ex -> { + functionStatusBuilder.addLatestSystemExceptions(ex); + }); + stats.getLatestSourceExceptions().forEach(ex -> { + functionStatusBuilder.addLatestSourceExceptions(ex); + }); + stats.getLatestSinkExceptions().forEach(ex -> { + 
functionStatusBuilder.addLatestSinkExceptions(ex); + }); + functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency()); + functionStatusBuilder.setLastInvocationTime((long) stats.getLastInvocation()); + } finally { + statsLock.readLock().unlock(); + } } return functionStatusBuilder; }