Author: cwiklik Date: Mon May 20 17:12:02 2019 New Revision: 1859570 URL: http://svn.apache.org/viewvc?rev=1859570&view=rev Log: UIMA-6046 modified to always release noWorkLock to prevent a hang
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java?rev=1859570&r1=1859569&r2=1859570&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java (original) +++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java Mon May 20 17:12:02 2019 @@ -220,40 +220,45 @@ public class DefaultServiceProtocolHandl } IMetaTaskTransaction metaTransaction = null; - while (running) { - metaTransaction = sendAndReceive(transaction); - if (metaTransaction.getMetaTask() != null - && metaTransaction.getMetaTask().getUserSpaceTask() != null) { - return metaTransaction; - } + try { + while (running) { + metaTransaction = sendAndReceive(transaction); + if (metaTransaction.getMetaTask() != null + && metaTransaction.getMetaTask().getUserSpaceTask() != null) { + return metaTransaction; + } - // If the first thread to get the lock poll for work and unlock when work found - // If don't immediately get the lock then wait for the lock to be released when - // work becomes available, - // and immediately release the lock and loop back to retry - boolean firstLocker = noWorkLock.tryLock(); - if (!firstLocker) { - noWorkLock.lock(); - noWorkLock.unlock(); - continue; - } + // If the first thread to get the lock poll for work and unlock when work found + // If don't immediately get the lock then wait for the lock to be released when + // work becomes available, + // and immediately release the lock and loop back to retry + boolean firstLocker = noWorkLock.tryLock(); + if (!firstLocker) { + noWorkLock.lock(); + noWorkLock.unlock(); + continue; + } - // If the first one here hold the lock and sleep before retrying - if (logger.isLoggable(Level.INFO)) { - logger.log(Level.INFO, "Driver is out of tasks - waiting for " - + noTaskStrategy.getWaitTimeInMillis() + "ms before trying again "); - } - while (running) { - noTaskStrategy.handleNoTaskSupplied(); - metaTransaction = sendAndReceive(transaction); - if (metaTransaction.getMetaTask() != null - && metaTransaction.getMetaTask().getUserSpaceTask() != null) { - noWorkLock.unlock(); - return metaTransaction; - } - } + // If the first one here hold the lock and sleep before retrying + if (logger.isLoggable(Level.INFO)) { + logger.log(Level.INFO, "Driver is out of tasks - waiting for " + + noTaskStrategy.getWaitTimeInMillis() + "ms before trying again "); + } + while (running) { + noTaskStrategy.handleNoTaskSupplied(); + metaTransaction = sendAndReceive(transaction); + if (metaTransaction.getMetaTask() != null + && metaTransaction.getMetaTask().getUserSpaceTask() != null) { + noWorkLock.unlock(); + return metaTransaction; + } + } + } + } finally { + if ( noWorkLock.isHeldByCurrentThread() ) { + noWorkLock.unlock(); + } } - ; return metaTransaction; // When shutting down } @@ -374,7 +379,8 @@ public class DefaultServiceProtocolHandl @Override public void run() { - delegateStop(); + //delegateStop(); + stop(); } }).start(); running = false; @@ -454,7 +460,7 @@ public class DefaultServiceProtocolHandl logger.log(Level.INFO, this.getClass().getName() + " quiesceAndStop() called"); // change state of transport to not running but keep connection open // so that other threads can quiesce (send results) - transport.stop(true); +// transport.stop(true); quiescing = true; running = false; @@ -471,6 +477,7 @@ public class DefaultServiceProtocolHandl try { // wait for process threads to terminate stopLatch.await(); + transport.stop(true); } catch (Exception e) { } Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java?rev=1859570&r1=1859569&r2=1859570&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java (original) +++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java Mon May 20 17:12:02 2019 @@ -55,7 +55,6 @@ import org.apache.uima.ducc.ps.service.t import org.apache.uima.ducc.ps.service.transport.ITargetURI; import org.apache.uima.ducc.ps.service.transport.TransportException; import org.apache.uima.ducc.ps.service.transport.TransportStats; -import org.apache.uima.ducc.ps.service.transport.XStreamUtils; import org.apache.uima.ducc.ps.service.transport.target.NoOpTargetURI; import org.apache.uima.ducc.ps.service.transport.target.TargetURIFactory; import org.apache.uima.ducc.ps.service.utils.Utils; @@ -63,7 +62,6 @@ import org.apache.uima.util.Level; import org.apache.uima.util.Logger; import com.thoughtworks.xstream.XStream; -import com.thoughtworks.xstream.io.xml.DomDriver; public class HttpServiceTransport implements IServiceTransport { private Logger logger = UIMAFramework.getLogger(HttpServiceTransport.class); @@ -106,14 +104,7 @@ public class HttpServiceTransport implem private volatile boolean log = true; private AtomicLong xstreamTime = new AtomicLong(); - // private ThreadLocal<HashMap<Long, XStream>> localXStream = new ThreadLocal<HashMap<Long, - // XStream>>() { - // @Override - // protected HashMap<Long, XStream> initialValue() { - // return new HashMap<Long, XStream>(); - // } - // }; - + public HttpServiceTransport(IRegistryClient registryClient, int scaleout) throws ServiceException { this.registryClient = registryClient; @@ -227,15 +218,10 @@ public class HttpServiceTransport implem } private void addCommonHeaders(HttpPost method) { - // synchronized( HttpServiceTransport.class ) { - method.setHeader("IP", nodeIP); method.setHeader("Hostname", nodeName); method.setHeader("ThreadID", String.valueOf(Thread.currentThread().getId())); method.setHeader("PID", pid); - - // } - } private HttpEntity wrapRequest(String serializedRequest) { @@ -356,6 +342,9 @@ public class HttpServiceTransport implem simulatedException); mockExceptionGenerator.throwSimulatedException(); } else { +// if ( stopping ) { +// throw new TransportException("Service is Stopping "); +// } transaction = doPost(postMethod, localXStream); } } catch (IOException | URISyntaxException ex) { @@ -397,9 +386,10 @@ public class HttpServiceTransport implem System.out.println(Utils.getTimestamp() + ">>>>>>> " + Utils.getShortClassname(this.getClass()) + " stop() called - mode:" + (quiesce == true ? "quiesce" : "stop")); logger.log(Level.INFO, this.getClass().getName() + " stop() called"); - System.out.println(" ########################################3 Total time in XStream:" - + (xstreamTime.get() / 1000) + " secs"); if (!quiesce && cMgr != null) { + System.out.println(Utils.getTimestamp() + ">>>>>>> " + + Utils.getShortClassname(this.getClass()) + " stopping connection mgr"); + cMgr.shutdown(); System.out.println(Utils.getTimestamp() + ">>>>>>> " + Utils.getShortClassname(this.getClass()) + " stopped connection mgr");