Author: gatfora
Date: Tue Jun 24 01:19:37 2008
New Revision: 671058

URL: http://svn.apache.org/viewvc?rev=671058&view=rev
Log:
Applying patches from SANDESHA2-161 and SANDESHA2-162, thanks David, Sara

Modified:
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?rev=671058&r1=671057&r2=671058&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Tue Jun 24 01:19:37 2008
@@ -291,24 +291,19 @@
 
                        SandeshaThread sender = storageManager.getSender();
                        WorkerLock lock = sender.getWorkerLock();
-                       
+
                        String workId = matchingMessage.getMessageID();
-                       SenderWorker worker = null;
-                       synchronized(lock){
-                               while (lock.isWorkPresent(workId)) {
-                                       try {
-                                               //wait on the lock.
-                                               lock.wait();
-                                       } catch (InterruptedException e) {
-                                                       e.printStackTrace();
-                                       }
+                       SenderWorker worker = new 
SenderWorker(pollMessage.getConfigurationContext(), matchingMessage, 
pollMessage.getRMSpecVersion());
+                       worker.setLock(lock);
+                       worker.setWorkId(workId);
+                       while (!lock.addWork(workId, worker)) {
+                               try {
+                                       // wait on the lock.
+                                       lock.awaitRemoval(workId);
+                               } catch (InterruptedException e) {
+                                       e.printStackTrace();
                                }
-                               
-                               worker = new SenderWorker 
(pollMessage.getConfigurationContext(), matchingMessage, 
pollMessage.getRMSpecVersion());
-                               worker.setLock(lock);
-                               worker.setWorkId(workId);
-                               
-                               lock.addWork(workId, worker);
+
                        }
                        
                        setTransportProperties (returnMessage, pollMessage);

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?rev=671058&r1=671057&r2=671058&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java Tue Jun 24 01:19:37 2008
@@ -20,13 +20,12 @@
 package org.apache.sandesha2.storage.inmemory;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beans.RMBean;
@@ -44,7 +43,6 @@
        private InMemoryStorageManager manager;
        private String threadName;
        private ArrayList enlistedBeans = new ArrayList();
-       private InMemoryTransaction waitingForTran = null;
        private boolean sentMessages = false;
        private boolean active = true;
        private Thread thread;
@@ -73,61 +71,54 @@
        public boolean isActive () {
                return active;
        }
+       
+       private class DummyTransaction extends ReentrantLock implements 
Transaction {
+
+               public void commit() throws SandeshaStorageException {
+                       throw new SandeshaStorageException("Not supported");
+               }
+
+               public boolean isActive() {
+                       // TODO Auto-generated method stub
+                       return false;
+               }
+
+               public void rollback() throws SandeshaStorageException {
+                       throw new SandeshaStorageException("Not supported");
+               }
 
+       }
+       
        public void enlist(RMBean bean) throws SandeshaStorageException {
                if(LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled()) log.debug("Entry: InMemoryTransaction::enlist, " + bean);
-               if(bean != null) {
+               if (bean != null) {
+                       DummyTransaction tran = null;
                        synchronized (bean) {
-                               InMemoryTransaction other = 
(InMemoryTransaction) bean.getTransaction();
-                               while(other != null && other != this) {
-                                       // Put ourselves into the list of 
waiters
-                                       waitingForTran = other;
-
-                                       // Look to see if there is a loop in 
the chain of waiters
-                                       if(!enlistedBeans.isEmpty()) {
-                                               HashSet set = new HashSet();
-                                               set.add(this);
-                                               while(other != null) {
-                                                       if(set.contains(other)) 
{
-                                                               String message 
= SandeshaMessageHelper.getMessage(SandeshaMessageKeys.deadlock, 
this.toString(), bean.toString());
-                                                               
SandeshaStorageException e = new SandeshaStorageException(message);
-                                                               
-                                                               // Do our best 
to get out of the way of the other work in the system
-                                                               waitingForTran 
= null;
-                                                               releaseLocks();
-                                                               
-                                                               
if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) 
log.debug(message, e);
-                                                               throw e;
-                                                       }
-                                                       set.add(other);
-                                                       other = 
other.waitingForTran;
-                                               }
-                                       }
-                                       
-                                       boolean warn = false;
+                               tran = (DummyTransaction) bean.getTransaction();
+                               if (tran == null) {
+                                       tran = new DummyTransaction();
+                                       bean.setTransaction(tran);
+                               }
+                       }
+
+                       boolean locked = false;
+                       while (!locked) {
+                               locked = tran.tryLock();
+                               if (!locked) {
+
                                        try {
-                                               
if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) 
log.debug("This " + this + " waiting for " + waitingForTran);
-                                               long pre = 
System.currentTimeMillis();
-                                               bean.wait(5000); 
-                                               long post = 
System.currentTimeMillis();
-                                               if ((post - pre) > 50000)
-                                                       warn = true;
-                                       } catch(InterruptedException e) {
-                                               // Do nothing
-                                       }
-                                       other = (InMemoryTransaction) 
bean.getTransaction();
-                                       if (other != null && warn) {
-                                               //we have been waiting for a 
long time - this might imply a three way deadlock so error condition
-                                               
if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) 
log.debug("possible deadlock :" + this.toString() + " : " + bean.toString());
+                                               locked = tran.tryLock(5, 
TimeUnit.SECONDS);
+                                               if (!locked) {
+                                                       if 
(log.isDebugEnabled())
+                                                               
log.debug("Waiting for bean lock 5 seconds");
+                                               }
+                                       } catch (InterruptedException e) {
+                                               e.printStackTrace();
                                        }
                                }
-                               
-                               waitingForTran = null;
-                               if(other == null) {
-                                       if(LoggingControl.isAnyTracingEnabled() 
&& log.isDebugEnabled()) log.debug(this + " locking bean");
-                                       bean.setTransaction(this);
-                                       enlistedBeans.add(bean);
-                               }
+
+                               enlistedBeans.add(bean);
+
                        }
                }
                
@@ -141,10 +132,8 @@
                Iterator beans = enlistedBeans.iterator();
                while(beans.hasNext()) {
                        RMBean bean = (RMBean) beans.next();
-                       synchronized (bean) {
-                               bean.setTransaction(null);
-                               bean.notifyAll();
-                       }
+                       DummyTransaction tran = (DummyTransaction) 
bean.getTransaction();
+                       tran.unlock();
                }
                enlistedBeans.clear();
                
@@ -179,3 +168,4 @@
 
 
 
+

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java?rev=671058&r1=671057&r2=671058&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java Tue Jun 24 01:19:37 2008
@@ -19,7 +19,8 @@
 
 package org.apache.sandesha2.workers;
 
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,44 +29,81 @@
 public class WorkerLock {
 
   static final Log log = LogFactory.getLog(WorkerLock.class);
-  private HashMap locks = new HashMap();
-       
+  private ConcurrentHashMap locks = new ConcurrentHashMap();
+  
   public WorkerLock () {
   }
-       
-  public synchronized boolean addWork (String work, Object owner) {
-       if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) 
log.debug("Enter: WorkerLock::addWork " + work + ", " + owner);
-    if(locks.containsKey(work)){
-       if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) 
log.debug("Exit: WorkerLock::addWork " + false);
-       return false;
-    }
-    locks.put(work, owner);
-    if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) 
log.debug("Exit: WorkerLock::addWork " + true);
-       return true;
-  }
-       
-       public synchronized void removeWork (String work) {
-               if(LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled()) log.debug("Enter: WorkerLock::removeWork " + work);
-               locks.remove(work);
-               
-               //wake up some thread that is waiting on this lock.
-               this.notify();
-               
-               if(LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled()) log.debug("Exit: WorkerLock::removeWork");
-       }
-       
-       public synchronized boolean isWorkPresent (String work) {
-         if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) 
log.debug("Enter: WorkerLock::isWorkPresent " + work);
-         boolean value = locks.containsKey(work);
-         if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) 
log.debug("Exit: WorkerLock::isWorkPresent " + value);
-         return value;
-       }
-       
-        public synchronized boolean ownsLock(String work, Object owner) {
-               if(LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled()) log.debug("Enter: WorkerLock::ownsLock " + work + " ," + 
owner);
-           Object realOwner = locks.get(work);
-           if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) 
log.debug("Exit: WorkerLock::ownsLock " + Boolean.valueOf(realOwner == owner));
-           return realOwner == owner;
-         }
 
-}
\ No newline at end of file
+  
+       private static class Holder {
+               CountDownLatch latch = new CountDownLatch(1);
+
+               Object value;
+
+               public Holder(Object newValue) {
+                       value = newValue;
+               }
+
+               public void awaitRelease() throws InterruptedException {
+                       latch.await();
+               }
+
+               public void release() {
+                       latch.countDown();
+               }
+
+               public Object getValue() {
+                       return value;
+               }
+       }
+
+       public boolean addWork(String work, Object owner) {
+               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled()) log.debug("Enter: WorkerLock::addWork " + work + ", " + 
owner);
+       Holder h = new Holder(owner);
+               Object prev = locks.putIfAbsent(work, h);
+               boolean result = (prev == null);
+               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
+                       log.debug("Exit: WorkerLock::addWork " + result);
+               return result;
+       }
+
+       public void awaitRemoval(String work) throws InterruptedException {
+               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
+                       log.debug("Enter: WorkerLock::awaitRemoval " + work);
+               Holder h = (Holder) locks.get(work);
+               if (h != null) {
+                       h.awaitRelease();
+               }
+               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
+                       log.debug("Exit: WorkerLock::awaitRemoval");
+       }
+
+       public void removeWork(String work) {
+               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
+                       log.debug("Enter: WorkerLock::removeWork " + work);
+               Holder h = (Holder) locks.remove(work);
+               h.release();
+               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
+                       log.debug("Exit: WorkerLock::removeWork");
+       }
+
+       public boolean isWorkPresent(String work) {
+
+               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
+                       log.debug("Enter: WorkerLock::isWorkPresent " + work);
+               boolean value = locks.containsKey(work);
+               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
+                       log.debug("Exit: WorkerLock::isWorkPresent " + value);
+               return value;
+       }
+
+       public boolean ownsLock(String work, Object owner) {
+               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
+                       log.debug("Enter: WorkerLock::ownsLock " + work + " ," 
+ owner);
+               Holder h = (Holder) locks.get(work);
+               Object realOwner = (h != null ? h.getValue() : null);
+               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
+                       log.debug("Exit: WorkerLock::ownsLock " + 
Boolean.valueOf(realOwner == owner));
+               return realOwner == owner;
+       }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to