Author: cwiklik Date: Fri Jul 8 19:53:41 2011 New Revision: 1144465 URL: http://svn.apache.org/viewvc?rev=1144465&view=rev Log: UIMA-2162 Fixes Race Condition. Modified to block parent CAS from calling Flow Controller's next() method before all its child CASes obtain their Flow objects.
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java?rev=1144465&r1=1144464&r2=1144465&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java Fri Jul 8 19:53:41 2011 @@ -30,6 +30,7 @@ import javax.jms.Message; import javax.jms.Session; import org.apache.uima.UIMAFramework; +import org.apache.uima.aae.InProcessCache.CacheEntry; import org.apache.uima.aae.UIMAEE_Constants; import org.apache.uima.aae.UimaAsThreadFactory; import org.apache.uima.aae.UimaBlockingExecutor; @@ -179,6 +180,13 @@ public class ConcurrentMessageListener i CasStateEntry parentEntry = controller.getLocalCache().lookupEntry(parentCasReferenceId); // increment number of child CASes this parent has in play parentEntry.incrementSubordinateCasInPlayCount(); + // increment a counter that counts number of child CASes that have no + // flow object yet. The flow object is created for each child CAS from + // the parent flow object. 
The method below will actually acquire a + // permit from a binary semaphore to force the parent to block until + // the last of its children acquires its Flow object. + parentEntry.incrementOutstandingFlowCounter(); + } catch (Exception e) { if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1144465&r1=1144464&r2=1144465&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Fri Jul 8 19:53:41 2011 @@ -920,6 +920,7 @@ public class AggregateAnalysisEngineCont // be dropped in the delegate. Check Final Step logic. getInProcessCache().getCacheEntryForCAS(aNewCasReferenceId).setNewCas(true, getComponentName()); + getLocalCache().lookupEntry(anInputCasReferenceId).decrementOutstandingFlowCounter(); } else { throw new AsynchAEException( "Flow Object Not In Flow Cache. Expected Flow Object in FlowCache for Cas Reference Id:" @@ -1107,18 +1108,27 @@ public class AggregateAnalysisEngineCont Endpoint lastDelegateEndpoint = casStateEntry.getLastDelegate().getEndpoint(); // Check if this delegate is a Cas Multiplier and the parent CAS is to be processed last casStateEntry.setReplyReceived(); - if (lastDelegateEndpoint.isCasMultiplier() && lastDelegateEndpoint.processParentLast()) { - synchronized (super.finalStepMux) { - // Determine if the CAS should be held until all its children leave this aggregate. 
- if (casStateEntry.getSubordinateCasInPlayCount() > 0) { - // This input CAS has child CASes still in play. It will remain in the cache - // until the last of the child CASes is released. Only than, the input CAS is - // is allowed to continue into the next step in the flow. - // The CAS has to be in final state - casStateEntry.setState(CacheEntry.FINAL_STATE); - // The input CAS will be interned until all children leave this aggregate - return; + if (lastDelegateEndpoint.isCasMultiplier()){ + // The following blocks until all child CASes acquire their Flow objects from the Flow + // Controller. Release the semaphore immediately after acquiring it. This semaphore is + // no longer needed. This synchronization is only necessary for blocking the parent + // CAS until all child CASes acquire their Flow objects. + casStateEntry.acquireFlowSemaphore(); + casStateEntry.releaseFlowSemaphore(); + if ( lastDelegateEndpoint.processParentLast()) { + synchronized (super.finalStepMux) { + // Determine if the CAS should be held until all its children leave this aggregate. + if (casStateEntry.getSubordinateCasInPlayCount() > 0) { + // This input CAS has child CASes still in play. It will remain in the cache + // until the last of the child CASes is released. Only than, the input CAS is + // is allowed to continue into the next step in the flow. 
+ // The CAS has to be in final state + casStateEntry.setState(CacheEntry.FINAL_STATE); + // The input CAS will be interned until all children leave this aggregate + return; + } } + } } } @@ -2122,6 +2132,7 @@ public class AggregateAnalysisEngineCont parentController.getLocalCache().lookupEntry(inputCasId); if ( parentCasStateEntry != null ) { parentCasStateEntry.incrementSubordinateCasInPlayCount(); + parentCasStateEntry.incrementOutstandingFlowCounter(); } } } Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java?rev=1144465&r1=1144464&r2=1144465&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java (original) +++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java Fri Jul 8 19:53:41 2011 @@ -25,6 +25,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.uima.UIMAFramework; import org.apache.uima.aae.UIMAEE_Constants; @@ -218,6 +220,17 @@ public class LocalCache extends Concurre private String hostIpProcessingCAS; + // Binary semaphore used to make sure that child CASes obtain their Flow objects + // before the parent CAS is handled by the Flow Controller. The single permit + // is acquired by a child CAS when childCasOutstandingFlowCounter=0. + private Semaphore flowSemaphore = new Semaphore(1); + + // Counts how many child CASes need their Flow objects. Incremented immediately when a child + // CAS arrives at the aggregate and decremented right after the flow is computed from the + // parent CAS flow. 
+ private AtomicInteger childCasOutstandingFlowCounter = new AtomicInteger(); + + public String getHostIpProcessingCAS() { return hostIpProcessingCAS; } @@ -379,5 +392,53 @@ public class LocalCache extends Concurre public List<Throwable> getErrors() { return exceptionList; } + /** + * Grab a permit from a binary semaphore blocking if the permit is not + * available. + * + * @throws InterruptedException - thrown if thread is interrupted + */ + public void acquireFlowSemaphore() throws InterruptedException { + flowSemaphore.acquire(); + } + /** + * Release a permit to the binary semaphore. + */ + public void releaseFlowSemaphore() { + flowSemaphore.release(); + } + /** + * Count the number of child CASes that have not yet obtained their Flow + * object. If this is the first child with no Flow object, grab a permit + * from a binary semaphore preventing parent CAS from calling Flow Controller's + * next() method. + * + */ + public void incrementOutstandingFlowCounter() { + synchronized( flowSemaphore ) { + if ( childCasOutstandingFlowCounter.incrementAndGet() == 1 ) { + try { + acquireFlowSemaphore(); + } catch( InterruptedException e) { + } + } + } + } + /** + * Decrement the number of child CASes with no Flow object. Releases + * the permit to the binary semaphore if all child CASes in-play obtain + * their Flow objects. This releases the parent CAS which can than continue + * with its Flow. 
+ */ + public void decrementOutstandingFlowCounter() { + synchronized( flowSemaphore ) { + if( flowSemaphore.availablePermits() == 0 ) { + if ( childCasOutstandingFlowCounter.get() > 0 ) { + childCasOutstandingFlowCounter.decrementAndGet(); + } + releaseFlowSemaphore(); + } + } + } } } Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=1144465&r1=1144464&r2=1144465&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Fri Jul 8 19:53:41 2011 @@ -23,6 +23,7 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Timer; import java.util.TimerTask; import java.util.Map.Entry; @@ -50,6 +51,7 @@ import org.apache.uima.aae.spi.transport import org.apache.uima.aae.spi.transport.UimaTransport; import org.apache.uima.analysis_engine.AnalysisEngine; import org.apache.uima.analysis_engine.AnalysisEngineDescription; +import org.apache.uima.analysis_engine.AnalysisEngineManagement; import org.apache.uima.analysis_engine.CasIterator; import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData; import org.apache.uima.cas.CAS; @@ -171,7 +173,8 @@ public class PrimitiveAnalysisEngineCont sharedInitSemaphore.acquire(); // Parse the descriptor in the calling thread. 
rSpecifier = UimaClassFactory.produceResourceSpecifier(super.aeDescriptor); - AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(rSpecifier, paramsMap); + + AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(rSpecifier, paramsMap); // Call to produceAnalysisEngine() may take a long time to complete. While this // method was executing, the service may have been stopped. Before continuing // check if the service has been stopped. If so, destroy AE instance and return. @@ -517,7 +520,7 @@ public class PrimitiveAnalysisEngineCont stackDumpTimer = null; // nullify timer instance so that we dont have to worry about // it in case an exception happens below } - + // Store how long it took to call processAndOutputNewCASes() totalProcessTime = (super.getCpuTime() - time); long sequence = 1; @@ -657,6 +660,7 @@ public class PrimitiveAnalysisEngineCont childCasStateEntry.setInputCasReferenceId(aCasReferenceId); // Increment number of child CASes generated from the input CAS parentCasStateEntry.incrementSubordinateCasInPlayCount(); + parentCasStateEntry.incrementOutstandingFlowCounter(); // Associate input CAS with the new CAS newEntry.setInputCasReferenceId(aCasReferenceId);