Author: cwiklik
Date: Fri Jul  8 19:53:41 2011
New Revision: 1144465

URL: http://svn.apache.org/viewvc?rev=1144465&view=rev
Log:
UIMA-2162 Fixes Races Condition. Modified to block parent CAS from calling Flow 
Controller's next() method before all its child CASes obtain their Flow objects.

Modified:
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java?rev=1144465&r1=1144464&r2=1144465&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
 Fri Jul  8 19:53:41 2011
@@ -30,6 +30,7 @@ import javax.jms.Message;
 import javax.jms.Session;
 
 import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.InProcessCache.CacheEntry;
 import org.apache.uima.aae.UIMAEE_Constants;
 import org.apache.uima.aae.UimaAsThreadFactory;
 import org.apache.uima.aae.UimaBlockingExecutor;
@@ -179,6 +180,13 @@ public class ConcurrentMessageListener i
           CasStateEntry parentEntry = 
controller.getLocalCache().lookupEntry(parentCasReferenceId);
           // increment number of child CASes this parent has in play
           parentEntry.incrementSubordinateCasInPlayCount();
+          //  increment a counter that counts number of child CASes that have 
no
+          //  flow object yet. The flow object is created for each child CAS 
from
+          //  the parent flow object. The method below will actually acquire a 
+          //  permit from a binary semaphore to force the parent to block until
+          //  the last of its children acquires its Flow object.
+          parentEntry.incrementOutstandingFlowCounter();
+
         } catch (Exception e) {
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
             

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1144465&r1=1144464&r2=1144465&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
 Fri Jul  8 19:53:41 2011
@@ -920,6 +920,7 @@ public class AggregateAnalysisEngineCont
           // be dropped in the delegate. Check Final Step logic.
           
getInProcessCache().getCacheEntryForCAS(aNewCasReferenceId).setNewCas(true,
                   getComponentName());
+          
getLocalCache().lookupEntry(anInputCasReferenceId).decrementOutstandingFlowCounter();
         } else {
           throw new AsynchAEException(
                   "Flow Object Not In Flow Cache. Expected Flow Object in 
FlowCache for Cas Reference Id:"
@@ -1107,18 +1108,27 @@ public class AggregateAnalysisEngineCont
           Endpoint lastDelegateEndpoint = 
casStateEntry.getLastDelegate().getEndpoint();
           // Check if this delegate is a Cas Multiplier and the parent CAS is 
to be processed last
           casStateEntry.setReplyReceived();
-          if (lastDelegateEndpoint.isCasMultiplier() && 
lastDelegateEndpoint.processParentLast()) {
-            synchronized (super.finalStepMux) {
-              // Determine if the CAS should be held until all its children 
leave this aggregate.
-              if (casStateEntry.getSubordinateCasInPlayCount() > 0) {
-                // This input CAS has child CASes still in play. It will 
remain in the cache
-                // until the last of the child CASes is released. Only than, 
the input CAS is
-                // is allowed to continue into the next step in the flow.
-                // The CAS has to be in final state
-                casStateEntry.setState(CacheEntry.FINAL_STATE);
-                // The input CAS will be interned until all children leave 
this aggregate
-                return;
+          if (lastDelegateEndpoint.isCasMultiplier()){
+            //  The following blocks until all child CASes acquire their Flow 
objects from the Flow
+            //  Controller. Release the semaphore immediately after acquiring 
it. This semaphore is 
+            //  no longer needed. This synchronization is only necessary for 
blocking the parent
+            //  CAS until all child CASes acquire their Flow objects.
+            casStateEntry.acquireFlowSemaphore();
+            casStateEntry.releaseFlowSemaphore();
+            if ( lastDelegateEndpoint.processParentLast()) {
+              synchronized (super.finalStepMux) {
+                // Determine if the CAS should be held until all its children 
leave this aggregate.
+                if (casStateEntry.getSubordinateCasInPlayCount() > 0) {
+                  // This input CAS has child CASes still in play. It will 
remain in the cache
+                  // until the last of the child CASes is released. Only than, 
the input CAS is
+                  // is allowed to continue into the next step in the flow.
+                  // The CAS has to be in final state
+                  casStateEntry.setState(CacheEntry.FINAL_STATE);
+                  // The input CAS will be interned until all children leave 
this aggregate
+                  return;
+                }
               }
+              
             }
           }
         }
@@ -2122,6 +2132,7 @@ public class AggregateAnalysisEngineCont
             parentController.getLocalCache().lookupEntry(inputCasId);
           if ( parentCasStateEntry != null ) {
             parentCasStateEntry.incrementSubordinateCasInPlayCount();
+            parentCasStateEntry.incrementOutstandingFlowCounter();
           }
         }
       }

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java?rev=1144465&r1=1144464&r2=1144465&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
 Fri Jul  8 19:53:41 2011
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.UIMAEE_Constants;
@@ -218,6 +220,17 @@ public class LocalCache extends Concurre
     
     private String hostIpProcessingCAS;
     
+    //  Binary semaphore used to make sure that child CASes obtain their Flow 
objects
+    //  before the parent CAS is handled by the Flow Controller. The single 
permit
+    //  is acquired by a child CAS when childCasOutstandingFlowCounter=0.    
+    private Semaphore flowSemaphore = new Semaphore(1);
+    
+    //  Counts how many child CASes need their Flow objects. Incremented 
immediately when a child
+    //  CAS arrives at the aggregate and decremented right after the flow is 
computed from the
+    //  parent CAS flow.
+    private AtomicInteger childCasOutstandingFlowCounter = new AtomicInteger();
+    
+    
     public String getHostIpProcessingCAS() {
       return hostIpProcessingCAS;
     }
@@ -379,5 +392,53 @@ public class LocalCache extends Concurre
     public List<Throwable> getErrors() {
       return exceptionList;
     }
+    /**
+     * Grab a permit from a binary semaphore blocking if the permit is not 
+     * available.
+     * 
+     * @throws InterruptedException - thrown if thread is interrupted
+     */
+    public void acquireFlowSemaphore() throws InterruptedException {
+      flowSemaphore.acquire();
+    }
+    /**
+     * Release a permit to the binary semaphore.
+     */
+    public void releaseFlowSemaphore() {
+      flowSemaphore.release();
+    }
+    /**
+     * Count the number of child CASes that have not yet obtained their Flow
+     * object. If this is the first child with no Flow object, grab a permit
+     * from a binary semaphore preventing parent CAS from calling Flow 
Controller's
+     * next() method. 
+     * 
+     */
+    public void incrementOutstandingFlowCounter() {
+      synchronized( flowSemaphore ) {
+        if ( childCasOutstandingFlowCounter.incrementAndGet() == 1  ) {
+          try {
+            acquireFlowSemaphore();
+          } catch( InterruptedException e) {
+          }
+        }
+      }
+    }
+    /**
+     * Decrement the number of child CASes with no Flow object. Releases
+     * the permit to the binary semaphore if all child CASes in-play obtain
+     * their Flow objects. This releases the parent CAS which can than continue
+     * with its Flow. 
+     */
+    public void decrementOutstandingFlowCounter() {
+      synchronized( flowSemaphore ) {
+        if( flowSemaphore.availablePermits() == 0 ) {
+          if ( childCasOutstandingFlowCounter.get() > 0 ) {
+            childCasOutstandingFlowCounter.decrementAndGet();
+          }
+          releaseFlowSemaphore();
+        }
+      }
+    }
   }
 }

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=1144465&r1=1144464&r2=1144465&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
 Fri Jul  8 19:53:41 2011
@@ -23,6 +23,7 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.Map.Entry;
@@ -50,6 +51,7 @@ import org.apache.uima.aae.spi.transport
 import org.apache.uima.aae.spi.transport.UimaTransport;
 import org.apache.uima.analysis_engine.AnalysisEngine;
 import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+import org.apache.uima.analysis_engine.AnalysisEngineManagement;
 import org.apache.uima.analysis_engine.CasIterator;
 import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
 import org.apache.uima.cas.CAS;
@@ -171,7 +173,8 @@ public class PrimitiveAnalysisEngineCont
       sharedInitSemaphore.acquire();
       // Parse the descriptor in the calling thread.
       rSpecifier = 
UimaClassFactory.produceResourceSpecifier(super.aeDescriptor);
-        AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(rSpecifier, 
paramsMap);
+
+      AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(rSpecifier, 
paramsMap);
         //  Call to produceAnalysisEngine() may take a long time to complete. 
While this
         //  method was executing, the service may have been stopped. Before 
continuing 
         //  check if the service has been stopped. If so, destroy AE instance 
and return.
@@ -517,7 +520,7 @@ public class PrimitiveAnalysisEngineCont
          stackDumpTimer = null;   // nullify timer instance so that we dont 
have to worry about
           // it in case an exception happens below
       }
-
+      
       // Store how long it took to call processAndOutputNewCASes()
       totalProcessTime = (super.getCpuTime() - time);
       long sequence = 1;
@@ -657,6 +660,7 @@ public class PrimitiveAnalysisEngineCont
         childCasStateEntry.setInputCasReferenceId(aCasReferenceId);
         // Increment number of child CASes generated from the input CAS
         parentCasStateEntry.incrementSubordinateCasInPlayCount();
+        parentCasStateEntry.incrementOutstandingFlowCounter();
 
         // Associate input CAS with the new CAS
         newEntry.setInputCasReferenceId(aCasReferenceId);


Reply via email to