Author: mckierna
Date: Wed Jul 30 12:20:57 2008
New Revision: 681179

URL: http://svn.apache.org/viewvc?rev=681179&view=rev
Log:
See https://issues.apache.org/jira/browse/SANDESHA2-171, Thanks Sara

Modified:
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?rev=681179&r1=681178&r2=681179&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
 Wed Jul 30 12:20:57 2008
@@ -36,6 +36,7 @@
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.Sandesha2Constants.MessageTypes;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.policy.SandeshaPolicyBean;
@@ -68,10 +69,14 @@
        boolean processedMessage = false;
 
        long lastHousekeeping = 0;
+       
+       long lastRanCleanup = 0;
 
        private static int HOUSEKEEPING_INTERVAL = 20000;
 
        private ConcurrentHashMap ackMap = new ConcurrentHashMap();
+       
+       private ConcurrentHashMap warnedAlreadyOrphans = new 
ConcurrentHashMap();
 
        private static class AckHolder {
                public long tts = 0;
@@ -563,19 +568,57 @@
                                        String message = null;
                                        String internalSequenceID = 
bean.getInternalSequenceID();
                                        String sequenceID = 
bean.getSequenceID();
-                                       if (bean.getMessageType() == 
Sandesha2Constants.MessageTypes.APPLICATION)
-                                               message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, sequenceID, 
internalSequenceID);
-                                       else {
-                                               String messageType = 
Integer.toString(bean.getMessageType());
-                                               message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPollingProtocol, 
messageType, sequenceID, internalSequenceID);
+                               
+                                       
if(!warnedAlreadyOrphans.containsKey(sequenceID)){ // we only want to do 
log.warn once per orphaned sequenceId
+                                       
+                                               if (bean.getMessageType() == 
Sandesha2Constants.MessageTypes.APPLICATION)                                    
   
+                                                       message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, sequenceID, 
internalSequenceID);                              
+                                               else
+                                               {
+                                                       String messageType = 
Integer.toString(bean.getMessageType());
+                                                       message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPollingProtocol, 
messageType, sequenceID, internalSequenceID);
+                                               }
+                                               
warnedAlreadyOrphans.put(sequenceID, System.currentTimeMillis());
+                                               log.warn(message);
+                                       }
+                               }
+                               
+                               // If client shuts down too quickly, 
termination messages get orphaned and this has an impact on performance.  
+                               // Will delete these once they have been 
recognised as orphans.
+                               int messageType = bean.getMessageType();
+                               if(MessageTypes.TERMINATE_SEQ ==  messageType 
|| MessageTypes.TERMINATE_SEQ_RESPONSE ==  messageType){
+                                       String id = bean.getSequenceID(); // 
get this again as it is an error case
+                                       if(log.isDebugEnabled()) 
log.debug("Sender::checkForOrphanMessages.  Orphaned message of type 
TERMINATE_SEQ or TERMINATE_SEQ_RESPONSE found.  Deleting this message with a 
sequence ID of : " + id);
+                                       manager.getSenderBeanMgr().delete(id);
+                                       
+                                       
+                               } else {                                        
+                                       // Update the bean so that we won't 
emit another message for another TRANSPORT_WAIT_TIME
+                                       
bean.setTimeToSend(System.currentTimeMillis());
+                                       manager.getSenderBeanMgr().update(bean);
+                               }
+                       
+                               // clean up warnedAlreadyOrphans list when it 
gets big - currently over a thousand entries, or every 10 minutes
+                               // delete everything over an hour old
+                               long currentTime = System.currentTimeMillis();
+                               if(lastRanCleanup == 0){
+                                       lastRanCleanup = 
System.currentTimeMillis();
+                               }
+                               if( warnedAlreadyOrphans.size() > 1000 || 
currentTime > (lastRanCleanup + 600000)){
+                                       if(log.isDebugEnabled()) 
log.debug("Sender::checkForOrphanMessages.  Cleaning up list of orphans");
+                                       long timeAnHourAgo = currentTime - 
3600000; 
+                                       Iterator it = 
warnedAlreadyOrphans.keySet().iterator();
+                                       while(it.hasNext()){
+                                               Object key = it.next();
+                                               long ageOfThisOrphan = 
((Long)warnedAlreadyOrphans.get(key)).longValue();
+                                               if (ageOfThisOrphan < 
timeAnHourAgo) {
+                                                       
warnedAlreadyOrphans.remove(key);
+                                               }
+                                               
                                        }
-                                       log.warn(message);
+                                       lastRanCleanup = 
System.currentTimeMillis();
                                }
 
-                               // Update the bean so that we won't emit 
another message for
-                               // another TRANSPORT_WAIT_TIME
-                               bean.setTimeToSend(System.currentTimeMillis());
-                               manager.getSenderBeanMgr().update(bean);
                        }
 
                        if (tran != null && tran.isActive())
@@ -595,4 +638,6 @@
                if (log.isDebugEnabled())
                        log.debug("Exit: Sender::checkForOrphanMessages");
        }
+
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to