Author: berndf
Date: Fri Mar 12 23:04:58 2010
New Revision: 922450

URL: http://svn.apache.org/viewvc?rev=922450&view=rev
Log:
VYSPER-179: relay message to either only highest prio sessions, or to all 
non-negative sessions (default), see 
http://xmpp.org/internet-drafts/draft-ietf-xmpp-3921bis-05.html#rules-barejid-resource-message
includes improvements in delivery error handling

Modified:
    
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java
    
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java
    
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java
    
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java
    
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java
    
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java
    
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java
    
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java

Modified: 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java
 (original)
+++ 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java
 Fri Mar 12 23:04:58 2010
@@ -22,6 +22,8 @@ package org.apache.vysper.xmpp.delivery.
 import org.apache.vysper.xmpp.stanza.Stanza;
 import org.apache.vysper.xmpp.delivery.failure.DeliveryException;
 
+import java.util.List;
+
 /**
  * there are many reasons why a stanza may fail to deliver: remote server not 
answering, local addressee has
  * become unavailable, the server has no more resources to process etc.
@@ -39,6 +41,6 @@ public interface DeliveryFailureStrategy
      * @param deliveryException - optional: exception which occured during the 
failed delivery
      * @throws org.apache.vysper.xmpp.delivery.failure.DeliveryException - 
exception which occured during failure strategy execution.
      */
-    public void process(Stanza failedToDeliverStanza, DeliveryException 
deliveryException) throws DeliveryException;
+    public void process(Stanza failedToDeliverStanza, List<DeliveryException> 
deliveryException) throws DeliveryException;
 
 }

Modified: 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java
 (original)
+++ 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java
 Fri Mar 12 23:04:58 2010
@@ -19,10 +19,10 @@
  */
 package org.apache.vysper.xmpp.delivery.failure;
 
-import org.apache.vysper.xmpp.delivery.failure.DeliveryFailureStrategy;
-import org.apache.vysper.xmpp.delivery.failure.DeliveryException;
 import org.apache.vysper.xmpp.stanza.Stanza;
 
+import java.util.List;
+
 /**
  *
  * @author The Apache MINA Project ([email protected])
@@ -31,7 +31,7 @@ public class IgnoreFailureStrategy imple
 
     public final static IgnoreFailureStrategy IGNORE_FAILURE_STRATEGY = new 
IgnoreFailureStrategy();
 
-    public void process(Stanza failedToDeliverStanza, DeliveryException 
deliveryException) {
+    public void process(Stanza failedToDeliverStanza, List<DeliveryException> 
deliveryException) throws DeliveryException {
         // do nothing
     }
 }

Modified: 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java
 (original)
+++ 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java
 Fri Mar 12 23:04:58 2010
@@ -26,6 +26,9 @@ import static org.apache.vysper.xmpp.sta
 import org.apache.vysper.xmpp.addressing.Entity;
 import org.apache.vysper.compliance.SpecCompliance;
 import org.apache.vysper.compliance.SpecCompliant;
+
+import java.util.List;
+
 import static org.apache.vysper.compliance.SpecCompliant.ComplianceStatus.*;
 import static org.apache.vysper.compliance.SpecCompliant.ComplianceCoverage.*;
 
@@ -46,7 +49,7 @@ public class ReturnErrorToSenderFailureS
         @SpecCompliant(spec="rfc3921bis-08", section = "8.3.2", status = 
NOT_STARTED, coverage = UNKNOWN),
         @SpecCompliant(spec="rfc3921bis-08", section = "4.3", status = 
NOT_STARTED, coverage = UNKNOWN)
     })
-    public void process(Stanza failedToDeliverStanza, DeliveryException 
deliveryException) throws DeliveryException {
+    public void process(Stanza failedToDeliverStanza, List<DeliveryException> 
deliveryExceptions) throws DeliveryException {
 
         StanzaErrorCondition stanzaErrorCondition = 
StanzaErrorCondition.SERVICE_UNAVAILABLE;
         StanzaErrorType errorType = StanzaErrorType.CANCEL;
@@ -59,7 +62,11 @@ public class ReturnErrorToSenderFailureS
             return; // do not answer these
         }
 
-        if (deliveryException != null) {
+        if (deliveryExceptions == null) {
+            XMPPCoreStanza error = 
XMPPCoreStanza.getWrapper(ServerErrorResponses.getInstance().getStanzaError(stanzaErrorCondition,
 failedCoreStanza, errorType, "stanza could not be delivered", "en", null));
+            stanzaRelay.relay(error.getTo(), error, 
IgnoreFailureStrategy.IGNORE_FAILURE_STRATEGY);
+        } else if (deliveryExceptions.size() == 1) {
+            DeliveryException deliveryException = deliveryExceptions.get(0);
             if (deliveryException instanceof LocalRecipientOfflineException) {
                 // TODO implement 8.2.3 here
                 stanzaErrorCondition = 
StanzaErrorCondition.RECIPIENT_UNAVAILABLE;
@@ -73,11 +80,11 @@ public class ReturnErrorToSenderFailureS
                 if (failedCoreStanza instanceof PresenceStanza) {
                     final PresenceStanzaType presenceStanzaType = 
((PresenceStanza) failedCoreStanza).getPresenceType();
                     if (presenceStanzaType == null ||
-                        presenceStanzaType == SUBSCRIBED ||
-                        presenceStanzaType == UNSUBSCRIBE ||
-                        presenceStanzaType == UNSUBSCRIBED ||
-                        presenceStanzaType == UNAVAILABLE ||
-                        presenceStanzaType == ERROR) {
+                            presenceStanzaType == SUBSCRIBED ||
+                            presenceStanzaType == UNSUBSCRIBE ||
+                            presenceStanzaType == UNSUBSCRIBED ||
+                            presenceStanzaType == UNAVAILABLE ||
+                            presenceStanzaType == ERROR) {
                         return; // silently ignore
                     }
                     // TODO what happens with PROBE? 8.1 is silent here, but 
see 4.3
@@ -92,9 +99,8 @@ public class ReturnErrorToSenderFailureS
                     }
                 }
             }
+        } else if (deliveryExceptions.size() > 1) {
+            throw new RuntimeException("cannot return to sender for multiple 
failed deliveries");
         }
-
-        XMPPCoreStanza error = 
XMPPCoreStanza.getWrapper(ServerErrorResponses.getInstance().getStanzaError(stanzaErrorCondition,
 failedCoreStanza, errorType, "stanza could not be delivered", "en", null));
-        stanzaRelay.relay(error.getTo(), error, 
IgnoreFailureStrategy.IGNORE_FAILURE_STRATEGY);
     }
 }

Modified: 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java
 (original)
+++ 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java
 Fri Mar 12 23:04:58 2010
@@ -20,9 +20,7 @@
 package org.apache.vysper.xmpp.delivery.inbound;
 
 import org.apache.vysper.storage.StorageProviderRegistry;
-import org.apache.vysper.storage.jcr.JcrStorageProviderRegistry;
 import org.apache.vysper.xmpp.addressing.Entity;
-import org.apache.vysper.xmpp.addressing.EntityImpl;
 import org.apache.vysper.xmpp.authorization.AccountManagement;
 import org.apache.vysper.xmpp.protocol.SessionStateHolder;
 import org.apache.vysper.xmpp.protocol.StanzaHandler;
@@ -50,6 +48,7 @@ import org.apache.vysper.compliance.Spec
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -58,6 +57,7 @@ import java.util.concurrent.ThreadPoolEx
 import java.util.concurrent.TimeUnit;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * relays all 'incoming' stanzas to internal sessions, acts as a 'stage' by 
using a ThreadPoolExecutor
@@ -130,14 +130,14 @@ public class DeliveringInboundStanzaRela
 
         public RelayResult call() {
             RelayResult relayResult = deliver();
-            if (relayResult == null || relayResult.isRelayed()) return 
relayResult;
+            if (relayResult == null || !relayResult.hasProcessingErrors()) 
return relayResult;
             return runFailureStrategy(relayResult);
         }
 
         private RelayResult runFailureStrategy(RelayResult relayResult) {
             if (deliveryFailureStrategy != null) {
                 try {
-                    deliveryFailureStrategy.process(stanza, 
relayResult.getProcessingError());
+                    deliveryFailureStrategy.process(stanza, 
relayResult.getProcessingErrors());
                 } catch (DeliveryException e) {
                     return new RelayResult(e);
                 } catch (RuntimeException e) {
@@ -196,7 +196,8 @@ public class DeliveringInboundStanzaRela
                 switch (messageStanzaType) {
                     case CHAT:
                     case NORMAL:
-                        return relayToBestSession(false);
+                        return 
serverRuntimeContext.getServerFeatures().isDeliveringMessageToHighestPriorityResourcesOnly()
 ?
+                                relayToBestSessions(false) : 
relayToAllSessions(0);
                     
                     case ERROR:
                         // silently ignore
@@ -229,7 +230,7 @@ public class DeliveringInboundStanzaRela
                 // TODO cannot deliver presence with type  AVAIL or UNAVAIL: 
silently ignore
                 // TODO cannot deliver presence with type  SUBSCRIBE: see 
3921bis section 3.1.3
                 // TODO cannot deliver presence with type  (UN)SUBSCRIBED, 
UNSUBSCRIBE: silently ignore
-                return relayToBestSession(false);
+                return relayToBestSessions(false);
             } else if (MessageStanza.isOfType(stanza)) {
                 MessageStanza messageStanza = (MessageStanza)xmppStanza;
                 MessageStanzaType messageStanzaType = 
messageStanza.getMessageType();
@@ -238,11 +239,11 @@ public class DeliveringInboundStanzaRela
                                                    messageStanzaType == 
MessageStanzaType.NORMAL;
                 // TODO cannot deliver ERROR: silently ignore
                 // TODO cannot deliver GROUPCHAT: service n/a
-                return relayToBestSession(fallbackToBareJIDAllowed);
+                return relayToBestSessions(fallbackToBareJIDAllowed);
 
             } else if (IQStanza.isOfType(stanza)) {
                 // TODO no resource matches: service n/a
-                return relayToBestSession(false);
+                return relayToBestSessions(false);
             }
  
             // for any other type of stanza 
@@ -262,54 +263,67 @@ public class DeliveringInboundStanzaRela
             }
         }
 
-        protected RelayResult relayToBestSession(final boolean 
fallbackToBareJIDAllowed) {
-            SessionContext receivingSession = 
resourceRegistry.getHighestPrioSession(receiver, PRIO_THRESHOLD);
+        protected RelayResult relayToBestSessions(final boolean 
fallbackToBareJIDAllowed) {
+            List<SessionContext> receivingSessions = 
resourceRegistry.getHighestPrioSessions(receiver, PRIO_THRESHOLD);
 
-            if (receivingSession == null && receiver.isResourceSet() && 
fallbackToBareJIDAllowed) {
+            if (receivingSessions.size() == 0 && receiver.isResourceSet() && 
fallbackToBareJIDAllowed) {
                 // no concrete session for this resource has been found
                 // fall back to bare JID
-                receivingSession = 
resourceRegistry.getHighestPrioSession(receiver.getBareJID(), PRIO_THRESHOLD);
+                receivingSessions = 
resourceRegistry.getHighestPrioSessions(receiver.getBareJID(), PRIO_THRESHOLD);
             }
 
-            if (receivingSession == null) {
+            if (receivingSessions.size() == 0) {
                 return relayNotPossible();
             }
-            
-            if (receivingSession.getState() != SessionState.AUTHENTICATED) {
-                return new RelayResult(new DeliveryException("no relay to 
non-authenticated sessions"));
-            }
-            try {
-                StanzaHandler stanzaHandler = 
receivingSession.getServerRuntimeContext().getHandler(stanza);
-                INBOUND_STANZA_PROTOCOL_WORKER.processStanza(receivingSession, 
sessionStateHolder, stanza, stanzaHandler);
-            } catch (Exception e) {
-                return new RelayResult(new DeliveryException(e));
+
+            RelayResult relayResult = new RelayResult();
+            for (SessionContext receivingSession : receivingSessions) {
+                if (receivingSession.getState() != SessionState.AUTHENTICATED) 
{
+                    relayResult.addProcessingError(new DeliveryException("no 
relay to non-authenticated sessions"));
+                    continue;
+                }
+                try {
+                    StanzaHandler stanzaHandler = 
receivingSession.getServerRuntimeContext().getHandler(stanza);
+                    
INBOUND_STANZA_PROTOCOL_WORKER.processStanza(receivingSession, 
sessionStateHolder, stanza, stanzaHandler);
+                } catch (Exception e) {
+                    relayResult.addProcessingError(new DeliveryException("no 
relay to non-authenticated sessions"));
+                    continue;
+                }
+
             }
-            return new RelayResult(); // return success result
+            return relayResult;
         }
         
         protected RelayResult relayToAllSessions() {
-            // the individual results are currently only recorded pro forma
-            List<RelayResult> relayResults = new ArrayList<RelayResult>();
-            
-            List<SessionContext> receivingSessions = 
resourceRegistry.getSessions(receiver);
+            return relayToAllSessions(null);
+        }
+
+        protected RelayResult relayToAllSessions(Integer prioThreshold) {
+
+            List<SessionContext> receivingSessions = prioThreshold == null ?
+                                                            
resourceRegistry.getSessions(receiver) :
+                                                            
resourceRegistry.getSessions(receiver, 0);
 
             if (receivingSessions.size() > 1) {
                  logger.warn("multiplexing: {} sessions will be processing {} 
", receivingSessions.size(), stanza);
              }
+
+            RelayResult relayResult = new RelayResult();
+
              for (SessionContext sessionContext : receivingSessions) {
                  if (sessionContext.getState() != SessionState.AUTHENTICATED) {
-                     relayResults.add(new RelayResult(new 
DeliveryException("no relay to non-authenticated sessions")));
+                     relayResult.addProcessingError(new DeliveryException("no 
relay to non-authenticated sessions"));
                      continue;
                  }
                  try {
                      StanzaHandler stanzaHandler = 
sessionContext.getServerRuntimeContext().getHandler(stanza);
                      
INBOUND_STANZA_PROTOCOL_WORKER.processStanza(sessionContext, 
sessionStateHolder, stanza, stanzaHandler);
                  } catch (Exception e) {
-                     relayResults.add(new RelayResult(new 
DeliveryException(e)));
+                     relayResult.addProcessingError(new DeliveryException(e));
                  }
              }
-                
-             return new RelayResult(); // return success result
+
+            return relayResult; // return success result
          }
     }
 
@@ -327,24 +341,36 @@ public class DeliveringInboundStanzaRela
     }
 
     private static class RelayResult {
-        private DeliveryException processingError;
-        private boolean relayed;
+        private List<DeliveryException> processingErrors = null;
+        private AtomicInteger relayed = new AtomicInteger(0);
 
-        public RelayResult(DeliveryException processingError) {
-            this.processingError = processingError;
-            this.relayed = false;
+        public RelayResult() {
+            // empty
         }
 
-        public RelayResult() {
-            this.relayed = true;
+        public RelayResult(DeliveryException processingError) {
+            addProcessingError(processingError);
         }
 
-        public DeliveryException getProcessingError() {
-            return processingError;
+        /*package*/ void addProcessingError(DeliveryException processingError) 
{
+            if (processingError == null) processingErrors = new 
ArrayList<DeliveryException>();
+            processingErrors.add(processingError);
         }
 
         public boolean isRelayed() {
-            return relayed;
+            return relayed.get() > 0;
+        }
+
+        public List<DeliveryException> getProcessingErrors() {
+            if (processingErrors == null) {
+                return Collections.<DeliveryException>emptyList();
+            } else {
+                return Collections.unmodifiableList(processingErrors);
+            }
+        }
+
+        public boolean hasProcessingErrors() {
+            return processingErrors != null && processingErrors.size() > 0;
         }
     }
 

Modified: 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java
 (original)
+++ 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java
 Fri Mar 12 23:04:58 2010
@@ -56,6 +56,16 @@ public class ServerFeatures {
      */
     private int authenticationRetries = 3;
 
+    /**
+     * decide to which client's resources a message will be delivered:
+     * (a) to the highest-priority available resource(s)
+     * (b) to all available resources with non-negative presence priority 
+     *
+     * see 
http://xmpp.org/internet-drafts/draft-ietf-xmpp-3921bis-05.html#rules-barejid-resource-message
+     *
+     */
+    private boolean deliverMessageToHighestPriorityResourcesOnly = false; 
+
     public ServerFeatures() {
         // default constructor
     }
@@ -103,4 +113,12 @@ public class ServerFeatures {
     public void setRelayingToFederationServers(boolean 
relayToFederationServers) {
         this.relayToFederationServers = relayToFederationServers;
     }
+
+    public boolean isDeliveringMessageToHighestPriorityResourcesOnly() {
+        return deliverMessageToHighestPriorityResourcesOnly;
+    }
+
+    public void setDeliverMessageToHighestPriorityResourcesOnly(boolean 
deliverMessageToHighestPriorityResourcesOnly) {
+        this.deliverMessageToHighestPriorityResourcesOnly = 
deliverMessageToHighestPriorityResourcesOnly;
+    }
 }

Modified: 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java
 (original)
+++ 
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java
 Fri Mar 12 23:04:58 2010
@@ -278,21 +278,44 @@ public class ResourceRegistry {
                return sessionContexts;
        }
 
+    /**
+     * retrieves sessions with same or above threshold
+     *
+     * @param entity all session for the bare jid will be considered.
+     * @param prioThreshold only resources will be returned having same or 
higher priority. a common value
+     * for the threshold is 0 (zero), which is also the default when param is 
NULL.
+     * @return returns the sessions matching the given JID (bare) with same or 
higher priority
+     */
+    public List<SessionContext> getSessions(Entity entity, Integer 
prioThreshold) {
+        if (prioThreshold == null) prioThreshold = 0;
+        List<SessionContext> results = new ArrayList<SessionContext>();
+
+        List<String> boundResourceIds = getBoundResources(entity, true);
+               for (String resourceId : boundResourceIds) {
+            SessionData sessionData = boundResources.get(resourceId);
+            if (sessionData == null) continue;
+
+            if (sessionData.priority >= prioThreshold) {
+                results.add(sessionData.context);
+            }
+        }
+        return results;
+       }
     
     /**
-     * retrieves the highest priorizes session for this entity. 
+     * retrieves the highest prioritized session(s) for this entity.
      * 
      * @param entity if this is not a bare JID, only the session for the JID's 
resource part will be returned, without
-     * looking at other sessions for the resource's bare JID. otherwise, in 
case of a full JID, it will return the 
-     * highest priorized session.
-     * @param prioThreshold if not NULL, only resources will be returned 
having same or higher priority. a common value 
-     * for the threshold is 0 (zero). 
-     * @return for a bare JID, it will return the hightest priorized session. 
for a full JID, it will return the 
+     * looking at other sessions for the resource's bare JID. otherwise, in 
case of a full JID, it will return the
+     * highest prioritized sessions.
+     * @param prioThreshold if not NULL, only resources will be returned 
having same or higher priority. a common value
+     * for the threshold is 0 (zero).
+     * @return for a bare JID, it will return the highest prioritized 
sessions. for a full JID, it will return the
      * related session.
      */
-    public SessionContext getHighestPrioSession(Entity entity, Integer 
prioThreshold) {
-        Integer currentPrio = Integer.MIN_VALUE;
-        SessionData result = null;
+    public List<SessionContext> getHighestPrioSessions(Entity entity, Integer 
prioThreshold) {
+        Integer currentPrio = prioThreshold == null ? Integer.MIN_VALUE : 
prioThreshold;
+        List<SessionContext> results = new ArrayList<SessionContext>();
 
         boolean isResourceSet = entity.isResourceSet();
 
@@ -301,20 +324,24 @@ public class ResourceRegistry {
             SessionData sessionData = boundResources.get(resourceId);
             if (sessionData == null) continue;
             
-            if (isResourceSet) return sessionData.context; // no prio checks, 
take the first proper one
+            if (isResourceSet) {
+                // if resource id matches, there can only be one result
+                // this overrides even parameter prio threshold
+                results.clear();
+                results.add(sessionData.context);
+                return results;
+            }
             
             if (sessionData.priority > currentPrio) {
+                results.clear(); // discard all accumulated lower prio sessions
                 currentPrio = sessionData.priority;
-                result = sessionData;
+                results.add(sessionData.context);
+            } else if(sessionData.priority.intValue() == 
currentPrio.intValue()) {
+                results.add(sessionData.context);
             }
         }
 
-        if (prioThreshold != null && prioThreshold > currentPrio) {
-            // no session over threshold 
-            return null;
-        }
-
-               return result == null ? null : result.context;
+        return results;
        }
 
     /**

Modified: 
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- 
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java
 (original)
+++ 
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java
 Fri Mar 12 23:04:58 2010
@@ -29,10 +29,12 @@ import org.apache.vysper.xmpp.authorizat
 import org.apache.vysper.xmpp.authorization.AccountManagement;
 import org.apache.vysper.xmpp.delivery.failure.DeliveryException;
 import org.apache.vysper.xmpp.delivery.failure.IgnoreFailureStrategy;
+import org.apache.vysper.xmpp.server.DefaultServerRuntimeContext;
 import org.apache.vysper.xmpp.server.SessionState;
 import org.apache.vysper.xmpp.server.TestSessionContext;
 import org.apache.vysper.xmpp.stanza.Stanza;
 import org.apache.vysper.xmpp.stanza.StanzaBuilder;
+import org.apache.vysper.xmpp.state.resourcebinding.BindException;
 import org.apache.vysper.xmpp.state.resourcebinding.ResourceRegistry;
 
 /**
@@ -98,4 +100,86 @@ public class DeliveringStanzaRelayTestCa
             throw e;
         }
     }
+
+    public void testRelayToTwoRecepients_DeliverToALL() throws 
EntityFormatException, XMLSemanticError, DeliveryException, BindException {
+        DefaultServerRuntimeContext serverRuntimeContext = new 
DefaultServerRuntimeContext(null, null);
+        
+        // !! DeliverMessageToHighestPriorityResourcesOnly = FALSE
+        
serverRuntimeContext.getServerFeatures().setDeliverMessageToHighestPriorityResourcesOnly(false);
+
+        stanzaRelay.setServerRuntimeContext(serverRuntimeContext);
+
+        EntityImpl fromEntity = EntityImpl.parse("[email protected]");
+
+        EntityImpl toEntity = EntityImpl.parse("[email protected]");
+
+
+        TestSessionContext sessionContextToEntity_1_prio3 = 
createSessionForTo(toEntity, 3); // NON-NEGATIVE
+        TestSessionContext sessionContextToEntity_2_prio0 = 
createSessionForTo(toEntity, 0); // NON-NEGATIVE
+        TestSessionContext sessionContextToEntity_3_prio3 = 
createSessionForTo(toEntity, 3); // NON-NEGATIVE
+        TestSessionContext sessionContextToEntity_4_prioMinus = 
createSessionForTo(toEntity, -1); // not receiving, negative
+
+        Stanza stanza = StanzaBuilder.createMessageStanza(fromEntity, 
toEntity, "en", "Hello").build();
+
+        try {
+            stanzaRelay.relay(toEntity, stanza, new IgnoreFailureStrategy());
+            Stanza recordedStanza_1 = 
sessionContextToEntity_1_prio3.getNextRecordedResponse(100);
+            assertNotNull("stanza 1 delivered", recordedStanza_1);
+            Stanza recordedStanza_2 = 
sessionContextToEntity_2_prio0.getNextRecordedResponse(100);
+            assertNotNull("stanza 2 delivered", recordedStanza_2);
+            Stanza recordedStanza_3 = 
sessionContextToEntity_3_prio3.getNextRecordedResponse(100);
+            assertNotNull("stanza 3 delivered", recordedStanza_3);
+            Stanza recordedStanza_4 = 
sessionContextToEntity_4_prioMinus.getNextRecordedResponse(100);
+            assertNull("stanza 4 delivered", recordedStanza_4);
+        } catch (DeliveryException e) {
+            throw e;
+        }
+
+    }
+
+    public void testRelayToTwoRecepients_DeliverToHIGHEST() throws 
EntityFormatException, XMLSemanticError, DeliveryException, BindException {
+        DefaultServerRuntimeContext serverRuntimeContext = new 
DefaultServerRuntimeContext(null, null);
+
+        // !! DeliverMessageToHighestPriorityResourcesOnly = TRUE
+        
serverRuntimeContext.getServerFeatures().setDeliverMessageToHighestPriorityResourcesOnly(true);
+
+        stanzaRelay.setServerRuntimeContext(serverRuntimeContext);
+
+        EntityImpl fromEntity = EntityImpl.parse("[email protected]");
+
+        EntityImpl toEntity = EntityImpl.parse("[email protected]");
+
+
+        TestSessionContext sessionContextToEntity_1_prio3 = 
createSessionForTo(toEntity, 3); // HIGHEST PRIO
+        TestSessionContext sessionContextToEntity_2_prio0 = 
createSessionForTo(toEntity, 1); // not receiving
+        TestSessionContext sessionContextToEntity_3_prio3 = 
createSessionForTo(toEntity, 3); // HIGHEST PRIO
+        TestSessionContext sessionContextToEntity_4_prioMinus = 
createSessionForTo(toEntity, -1); // not receiving
+
+        Stanza stanza = StanzaBuilder.createMessageStanza(fromEntity, 
toEntity, "en", "Hello").build();
+
+        try {
+            stanzaRelay.relay(toEntity, stanza, new IgnoreFailureStrategy());
+            Stanza recordedStanza_1 = 
sessionContextToEntity_1_prio3.getNextRecordedResponse(100);
+            assertNotNull("stanza 1 delivered", recordedStanza_1);
+            Stanza recordedStanza_2 = 
sessionContextToEntity_2_prio0.getNextRecordedResponse(100);
+            assertNull("stanza 2 not delivered", recordedStanza_2);
+            Stanza recordedStanza_3 = 
sessionContextToEntity_3_prio3.getNextRecordedResponse(100);
+            assertNotNull("stanza 3 delivered", recordedStanza_3);
+            Stanza recordedStanza_4 = 
sessionContextToEntity_4_prioMinus.getNextRecordedResponse(100);
+            assertNull("stanza 4 not delivered", recordedStanza_4);
+        } catch (DeliveryException e) {
+            throw e;
+        }
+
+    }
+
+    private TestSessionContext createSessionForTo(EntityImpl toEntity, final 
int priority) {
+        TestSessionContext sessionContextToEntity = 
TestSessionContext.createSessionContext(toEntity);
+        sessionContextToEntity.setSessionState(SessionState.AUTHENTICATED);
+        String toEntityRes = 
resourceRegistry.bindSession(sessionContextToEntity);
+        resourceRegistry.setResourcePriority(toEntityRes, priority);
+        return sessionContextToEntity;
+    }
+
+
 }

Modified: 
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- 
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java
 (original)
+++ 
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java
 Fri Mar 12 23:04:58 2010
@@ -93,20 +93,37 @@ public class ResourceRegistryTestCase ex
         assertEquals(2, resourceList.size());
         assertTrue(sessionList.contains(sessionContext1));
         assertTrue(sessionList.contains(sessionContext2));
-        
-        SessionContext hightestPrioSession = 
resourceRegistry.getHighestPrioSession(entity, null);
-        assertSame(resourceRegistry.getSessionContext(resourceId2), 
hightestPrioSession);
-        
+
+        List<SessionContext> highestPrioSessions = 
resourceRegistry.getHighestPrioSessions(entity, null);
+        assertEquals(1, highestPrioSessions.size());
+        SessionContext highestPrioSession = highestPrioSessions.get(0);
+        assertSame(resourceRegistry.getSessionContext(resourceId2), 
highestPrioSession);
+
         resourceRegistry.setResourcePriority(resourceId1, 2); // make this 
highes prio
-        hightestPrioSession = resourceRegistry.getHighestPrioSession(entity, 
null);
-        assertSame(resourceRegistry.getSessionContext(resourceId1), 
hightestPrioSession);
+        highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, 
null);
+        assertEquals(1, highestPrioSessions.size());
+        highestPrioSession = highestPrioSessions.get(0);
+        assertSame(resourceRegistry.getSessionContext(resourceId1), 
highestPrioSession);
+
+        highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, 
2); // still highest prio
+        assertEquals(1, highestPrioSessions.size());
+        highestPrioSession = highestPrioSessions.get(0);
+        assertSame(resourceRegistry.getSessionContext(resourceId1), 
highestPrioSession);
+        
+        highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, 
3); // now, all prios are below threshold
+        assertEquals(0, highestPrioSessions.size());
+
+        resourceRegistry.setResourcePriority(resourceId1, 4); // both are same
+        resourceRegistry.setResourcePriority(resourceId2, 4); // both are same
+        highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, 
3);
+        assertEquals(2, highestPrioSessions.size());
+        highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, 
4);
+        assertEquals(2, highestPrioSessions.size());
+        highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, 
5);
+        assertEquals(0, highestPrioSessions.size());
+
+        
sessionContext1.getServerRuntimeContext().getServerFeatures().setDeliverMessageToHighestPriorityResourcesOnly(false);
 
-        hightestPrioSession = resourceRegistry.getHighestPrioSession(entity, 
2); // still highest prio
-        assertSame(resourceRegistry.getSessionContext(resourceId1), 
hightestPrioSession);
-        
-        hightestPrioSession = resourceRegistry.getHighestPrioSession(entity, 
3); // now, all prios are below threshold
-        assertNull(hightestPrioSession);
-        
     }
 
     public void testAddOneEntityMultipleResources_TolerateResourceIds() throws 
EntityFormatException {


Reply via email to