Author: berndf
Date: Fri Mar 12 23:04:58 2010
New Revision: 922450
URL: http://svn.apache.org/viewvc?rev=922450&view=rev
Log:
VYSPER-179: relay message to either only highest prio sessions, or to all
non-negative sessions (default), see
http://xmpp.org/internet-drafts/draft-ietf-xmpp-3921bis-05.html#rules-barejid-resource-message
includes improvements in delivery error handling
Modified:
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java
Modified:
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
---
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java
(original)
+++
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java
Fri Mar 12 23:04:58 2010
@@ -22,6 +22,8 @@ package org.apache.vysper.xmpp.delivery.
import org.apache.vysper.xmpp.stanza.Stanza;
import org.apache.vysper.xmpp.delivery.failure.DeliveryException;
+import java.util.List;
+
/**
* there are many reasons why a stanza may fail to deliver: remote server not
answering, local addressee has
* become unavailable, the server has no more resources to process etc.
@@ -39,6 +41,6 @@ public interface DeliveryFailureStrategy
* @param deliveryException - optional: exception which occured during the
failed delivery
* @throws org.apache.vysper.xmpp.delivery.failure.DeliveryException -
exception which occured during failure strategy execution.
*/
- public void process(Stanza failedToDeliverStanza, DeliveryException
deliveryException) throws DeliveryException;
+ public void process(Stanza failedToDeliverStanza, List<DeliveryException>
deliveryException) throws DeliveryException;
}
Modified:
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
---
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java
(original)
+++
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java
Fri Mar 12 23:04:58 2010
@@ -19,10 +19,10 @@
*/
package org.apache.vysper.xmpp.delivery.failure;
-import org.apache.vysper.xmpp.delivery.failure.DeliveryFailureStrategy;
-import org.apache.vysper.xmpp.delivery.failure.DeliveryException;
import org.apache.vysper.xmpp.stanza.Stanza;
+import java.util.List;
+
/**
*
* @author The Apache MINA Project ([email protected])
@@ -31,7 +31,7 @@ public class IgnoreFailureStrategy imple
public final static IgnoreFailureStrategy IGNORE_FAILURE_STRATEGY = new
IgnoreFailureStrategy();
- public void process(Stanza failedToDeliverStanza, DeliveryException
deliveryException) {
+ public void process(Stanza failedToDeliverStanza, List<DeliveryException>
deliveryException) throws DeliveryException {
// do nothing
}
}
Modified:
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
---
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java
(original)
+++
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java
Fri Mar 12 23:04:58 2010
@@ -26,6 +26,9 @@ import static org.apache.vysper.xmpp.sta
import org.apache.vysper.xmpp.addressing.Entity;
import org.apache.vysper.compliance.SpecCompliance;
import org.apache.vysper.compliance.SpecCompliant;
+
+import java.util.List;
+
import static org.apache.vysper.compliance.SpecCompliant.ComplianceStatus.*;
import static org.apache.vysper.compliance.SpecCompliant.ComplianceCoverage.*;
@@ -46,7 +49,7 @@ public class ReturnErrorToSenderFailureS
@SpecCompliant(spec="rfc3921bis-08", section = "8.3.2", status =
NOT_STARTED, coverage = UNKNOWN),
@SpecCompliant(spec="rfc3921bis-08", section = "4.3", status =
NOT_STARTED, coverage = UNKNOWN)
})
- public void process(Stanza failedToDeliverStanza, DeliveryException
deliveryException) throws DeliveryException {
+ public void process(Stanza failedToDeliverStanza, List<DeliveryException>
deliveryExceptions) throws DeliveryException {
StanzaErrorCondition stanzaErrorCondition =
StanzaErrorCondition.SERVICE_UNAVAILABLE;
StanzaErrorType errorType = StanzaErrorType.CANCEL;
@@ -59,7 +62,11 @@ public class ReturnErrorToSenderFailureS
return; // do not answer these
}
- if (deliveryException != null) {
+ if (deliveryExceptions == null) {
+ XMPPCoreStanza error =
XMPPCoreStanza.getWrapper(ServerErrorResponses.getInstance().getStanzaError(stanzaErrorCondition,
failedCoreStanza, errorType, "stanza could not be delivered", "en", null));
+ stanzaRelay.relay(error.getTo(), error,
IgnoreFailureStrategy.IGNORE_FAILURE_STRATEGY);
+ } else if (deliveryExceptions.size() == 1) {
+ DeliveryException deliveryException = deliveryExceptions.get(0);
if (deliveryException instanceof LocalRecipientOfflineException) {
// TODO implement 8.2.3 here
stanzaErrorCondition =
StanzaErrorCondition.RECIPIENT_UNAVAILABLE;
@@ -73,11 +80,11 @@ public class ReturnErrorToSenderFailureS
if (failedCoreStanza instanceof PresenceStanza) {
final PresenceStanzaType presenceStanzaType =
((PresenceStanza) failedCoreStanza).getPresenceType();
if (presenceStanzaType == null ||
- presenceStanzaType == SUBSCRIBED ||
- presenceStanzaType == UNSUBSCRIBE ||
- presenceStanzaType == UNSUBSCRIBED ||
- presenceStanzaType == UNAVAILABLE ||
- presenceStanzaType == ERROR) {
+ presenceStanzaType == SUBSCRIBED ||
+ presenceStanzaType == UNSUBSCRIBE ||
+ presenceStanzaType == UNSUBSCRIBED ||
+ presenceStanzaType == UNAVAILABLE ||
+ presenceStanzaType == ERROR) {
return; // silently ignore
}
// TODO what happens with PROBE? 8.1 is silent here, but
see 4.3
@@ -92,9 +99,8 @@ public class ReturnErrorToSenderFailureS
}
}
}
+ } else if (deliveryExceptions.size() > 1) {
+ throw new RuntimeException("cannot return to sender for multiple
failed deliveries");
}
-
- XMPPCoreStanza error =
XMPPCoreStanza.getWrapper(ServerErrorResponses.getInstance().getStanzaError(stanzaErrorCondition,
failedCoreStanza, errorType, "stanza could not be delivered", "en", null));
- stanzaRelay.relay(error.getTo(), error,
IgnoreFailureStrategy.IGNORE_FAILURE_STRATEGY);
}
}
Modified:
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
---
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java
(original)
+++
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java
Fri Mar 12 23:04:58 2010
@@ -20,9 +20,7 @@
package org.apache.vysper.xmpp.delivery.inbound;
import org.apache.vysper.storage.StorageProviderRegistry;
-import org.apache.vysper.storage.jcr.JcrStorageProviderRegistry;
import org.apache.vysper.xmpp.addressing.Entity;
-import org.apache.vysper.xmpp.addressing.EntityImpl;
import org.apache.vysper.xmpp.authorization.AccountManagement;
import org.apache.vysper.xmpp.protocol.SessionStateHolder;
import org.apache.vysper.xmpp.protocol.StanzaHandler;
@@ -50,6 +48,7 @@ import org.apache.vysper.compliance.Spec
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -58,6 +57,7 @@ import java.util.concurrent.ThreadPoolEx
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* relays all 'incoming' stanzas to internal sessions, acts as a 'stage' by
using a ThreadPoolExecutor
@@ -130,14 +130,14 @@ public class DeliveringInboundStanzaRela
public RelayResult call() {
RelayResult relayResult = deliver();
- if (relayResult == null || relayResult.isRelayed()) return
relayResult;
+ if (relayResult == null || !relayResult.hasProcessingErrors())
return relayResult;
return runFailureStrategy(relayResult);
}
private RelayResult runFailureStrategy(RelayResult relayResult) {
if (deliveryFailureStrategy != null) {
try {
- deliveryFailureStrategy.process(stanza,
relayResult.getProcessingError());
+ deliveryFailureStrategy.process(stanza,
relayResult.getProcessingErrors());
} catch (DeliveryException e) {
return new RelayResult(e);
} catch (RuntimeException e) {
@@ -196,7 +196,8 @@ public class DeliveringInboundStanzaRela
switch (messageStanzaType) {
case CHAT:
case NORMAL:
- return relayToBestSession(false);
+ return
serverRuntimeContext.getServerFeatures().isDeliveringMessageToHighestPriorityResourcesOnly()
?
+ relayToBestSessions(false) :
relayToAllSessions(0);
case ERROR:
// silently ignore
@@ -229,7 +230,7 @@ public class DeliveringInboundStanzaRela
// TODO cannot deliver presence with type AVAIL or UNAVAIL:
silently ignore
// TODO cannot deliver presence with type SUBSCRIBE: see
3921bis section 3.1.3
// TODO cannot deliver presence with type (UN)SUBSCRIBED,
UNSUBSCRIBE: silently ignore
- return relayToBestSession(false);
+ return relayToBestSessions(false);
} else if (MessageStanza.isOfType(stanza)) {
MessageStanza messageStanza = (MessageStanza)xmppStanza;
MessageStanzaType messageStanzaType =
messageStanza.getMessageType();
@@ -238,11 +239,11 @@ public class DeliveringInboundStanzaRela
messageStanzaType ==
MessageStanzaType.NORMAL;
// TODO cannot deliver ERROR: silently ignore
// TODO cannot deliver GROUPCHAT: service n/a
- return relayToBestSession(fallbackToBareJIDAllowed);
+ return relayToBestSessions(fallbackToBareJIDAllowed);
} else if (IQStanza.isOfType(stanza)) {
// TODO no resource matches: service n/a
- return relayToBestSession(false);
+ return relayToBestSessions(false);
}
// for any other type of stanza
@@ -262,54 +263,67 @@ public class DeliveringInboundStanzaRela
}
}
- protected RelayResult relayToBestSession(final boolean
fallbackToBareJIDAllowed) {
- SessionContext receivingSession =
resourceRegistry.getHighestPrioSession(receiver, PRIO_THRESHOLD);
+ protected RelayResult relayToBestSessions(final boolean
fallbackToBareJIDAllowed) {
+ List<SessionContext> receivingSessions =
resourceRegistry.getHighestPrioSessions(receiver, PRIO_THRESHOLD);
- if (receivingSession == null && receiver.isResourceSet() &&
fallbackToBareJIDAllowed) {
+ if (receivingSessions.size() == 0 && receiver.isResourceSet() &&
fallbackToBareJIDAllowed) {
// no concrete session for this resource has been found
// fall back to bare JID
- receivingSession =
resourceRegistry.getHighestPrioSession(receiver.getBareJID(), PRIO_THRESHOLD);
+ receivingSessions =
resourceRegistry.getHighestPrioSessions(receiver.getBareJID(), PRIO_THRESHOLD);
}
- if (receivingSession == null) {
+ if (receivingSessions.size() == 0) {
return relayNotPossible();
}
-
- if (receivingSession.getState() != SessionState.AUTHENTICATED) {
- return new RelayResult(new DeliveryException("no relay to
non-authenticated sessions"));
- }
- try {
- StanzaHandler stanzaHandler =
receivingSession.getServerRuntimeContext().getHandler(stanza);
- INBOUND_STANZA_PROTOCOL_WORKER.processStanza(receivingSession,
sessionStateHolder, stanza, stanzaHandler);
- } catch (Exception e) {
- return new RelayResult(new DeliveryException(e));
+
+ RelayResult relayResult = new RelayResult();
+ for (SessionContext receivingSession : receivingSessions) {
+ if (receivingSession.getState() != SessionState.AUTHENTICATED)
{
+ relayResult.addProcessingError(new DeliveryException("no
relay to non-authenticated sessions"));
+ continue;
+ }
+ try {
+ StanzaHandler stanzaHandler =
receivingSession.getServerRuntimeContext().getHandler(stanza);
+
INBOUND_STANZA_PROTOCOL_WORKER.processStanza(receivingSession,
sessionStateHolder, stanza, stanzaHandler);
+ } catch (Exception e) {
+ relayResult.addProcessingError(new DeliveryException("no
relay to non-authenticated sessions"));
+ continue;
+ }
+
}
- return new RelayResult(); // return success result
+ return relayResult;
}
protected RelayResult relayToAllSessions() {
- // the individual results are currently only recorded pro forma
- List<RelayResult> relayResults = new ArrayList<RelayResult>();
-
- List<SessionContext> receivingSessions =
resourceRegistry.getSessions(receiver);
+ return relayToAllSessions(null);
+ }
+
+ protected RelayResult relayToAllSessions(Integer prioThreshold) {
+
+ List<SessionContext> receivingSessions = prioThreshold == null ?
+
resourceRegistry.getSessions(receiver) :
+
resourceRegistry.getSessions(receiver, 0);
if (receivingSessions.size() > 1) {
logger.warn("multiplexing: {} sessions will be processing {}
", receivingSessions.size(), stanza);
}
+
+ RelayResult relayResult = new RelayResult();
+
for (SessionContext sessionContext : receivingSessions) {
if (sessionContext.getState() != SessionState.AUTHENTICATED) {
- relayResults.add(new RelayResult(new
DeliveryException("no relay to non-authenticated sessions")));
+ relayResult.addProcessingError(new DeliveryException("no
relay to non-authenticated sessions"));
continue;
}
try {
StanzaHandler stanzaHandler =
sessionContext.getServerRuntimeContext().getHandler(stanza);
INBOUND_STANZA_PROTOCOL_WORKER.processStanza(sessionContext,
sessionStateHolder, stanza, stanzaHandler);
} catch (Exception e) {
- relayResults.add(new RelayResult(new
DeliveryException(e)));
+ relayResult.addProcessingError(new DeliveryException(e));
}
}
-
- return new RelayResult(); // return success result
+
+ return relayResult; // return success result
}
}
@@ -327,24 +341,36 @@ public class DeliveringInboundStanzaRela
}
private static class RelayResult {
- private DeliveryException processingError;
- private boolean relayed;
+ private List<DeliveryException> processingErrors = null;
+ private AtomicInteger relayed = new AtomicInteger(0);
- public RelayResult(DeliveryException processingError) {
- this.processingError = processingError;
- this.relayed = false;
+ public RelayResult() {
+ // empty
}
- public RelayResult() {
- this.relayed = true;
+ public RelayResult(DeliveryException processingError) {
+ addProcessingError(processingError);
}
- public DeliveryException getProcessingError() {
- return processingError;
+ /*package*/ void addProcessingError(DeliveryException processingError)
{
+ if (processingError == null) processingErrors = new
ArrayList<DeliveryException>();
+ processingErrors.add(processingError);
}
public boolean isRelayed() {
- return relayed;
+ return relayed.get() > 0;
+ }
+
+ public List<DeliveryException> getProcessingErrors() {
+ if (processingErrors == null) {
+ return Collections.<DeliveryException>emptyList();
+ } else {
+ return Collections.unmodifiableList(processingErrors);
+ }
+ }
+
+ public boolean hasProcessingErrors() {
+ return processingErrors != null && processingErrors.size() > 0;
}
}
Modified:
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
---
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java
(original)
+++
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java
Fri Mar 12 23:04:58 2010
@@ -56,6 +56,16 @@ public class ServerFeatures {
*/
private int authenticationRetries = 3;
+ /**
+ * decide to which client's resources a message will be delivered:
+ * (a) to the highest-priority available resource(s)
+ * (b) to all available resources with non-negative presence priority
+ *
+ * see
http://xmpp.org/internet-drafts/draft-ietf-xmpp-3921bis-05.html#rules-barejid-resource-message
+ *
+ */
+ private boolean deliverMessageToHighestPriorityResourcesOnly = false;
+
public ServerFeatures() {
// default constructor
}
@@ -103,4 +113,12 @@ public class ServerFeatures {
public void setRelayingToFederationServers(boolean
relayToFederationServers) {
this.relayToFederationServers = relayToFederationServers;
}
+
+ public boolean isDeliveringMessageToHighestPriorityResourcesOnly() {
+ return deliverMessageToHighestPriorityResourcesOnly;
+ }
+
+ public void setDeliverMessageToHighestPriorityResourcesOnly(boolean
deliverMessageToHighestPriorityResourcesOnly) {
+ this.deliverMessageToHighestPriorityResourcesOnly =
deliverMessageToHighestPriorityResourcesOnly;
+ }
}
Modified:
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
---
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java
(original)
+++
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java
Fri Mar 12 23:04:58 2010
@@ -278,21 +278,44 @@ public class ResourceRegistry {
return sessionContexts;
}
+ /**
+ * retrieves sessions with same or above threshold
+ *
+ * @param entity all session for the bare jid will be considered.
+ * @param prioThreshold only resources will be returned having same or
higher priority. a common value
+ * for the threshold is 0 (zero), which is also the default when param is
NULL.
+ * @return returns the sessions matching the given JID (bare) with same or
higher priority
+ */
+ public List<SessionContext> getSessions(Entity entity, Integer
prioThreshold) {
+ if (prioThreshold == null) prioThreshold = 0;
+ List<SessionContext> results = new ArrayList<SessionContext>();
+
+ List<String> boundResourceIds = getBoundResources(entity, true);
+ for (String resourceId : boundResourceIds) {
+ SessionData sessionData = boundResources.get(resourceId);
+ if (sessionData == null) continue;
+
+ if (sessionData.priority >= prioThreshold) {
+ results.add(sessionData.context);
+ }
+ }
+ return results;
+ }
/**
- * retrieves the highest priorizes session for this entity.
+ * retrieves the highest prioritized session(s) for this entity.
*
* @param entity if this is not a bare JID, only the session for the JID's
resource part will be returned, without
- * looking at other sessions for the resource's bare JID. otherwise, in
case of a full JID, it will return the
- * highest priorized session.
- * @param prioThreshold if not NULL, only resources will be returned
having same or higher priority. a common value
- * for the threshold is 0 (zero).
- * @return for a bare JID, it will return the hightest priorized session.
for a full JID, it will return the
+ * looking at other sessions for the resource's bare JID. otherwise, in
case of a full JID, it will return the
+ * highest prioritized sessions.
+ * @param prioThreshold if not NULL, only resources will be returned
having same or higher priority. a common value
+ * for the threshold is 0 (zero).
+ * @return for a bare JID, it will return the highest prioritized
sessions. for a full JID, it will return the
* related session.
*/
- public SessionContext getHighestPrioSession(Entity entity, Integer
prioThreshold) {
- Integer currentPrio = Integer.MIN_VALUE;
- SessionData result = null;
+ public List<SessionContext> getHighestPrioSessions(Entity entity, Integer
prioThreshold) {
+ Integer currentPrio = prioThreshold == null ? Integer.MIN_VALUE :
prioThreshold;
+ List<SessionContext> results = new ArrayList<SessionContext>();
boolean isResourceSet = entity.isResourceSet();
@@ -301,20 +324,24 @@ public class ResourceRegistry {
SessionData sessionData = boundResources.get(resourceId);
if (sessionData == null) continue;
- if (isResourceSet) return sessionData.context; // no prio checks,
take the first proper one
+ if (isResourceSet) {
+ // if resource id matches, there can only be one result
+ // this overrides even parameter prio threshold
+ results.clear();
+ results.add(sessionData.context);
+ return results;
+ }
if (sessionData.priority > currentPrio) {
+ results.clear(); // discard all accumulated lower prio sessions
currentPrio = sessionData.priority;
- result = sessionData;
+ results.add(sessionData.context);
+ } else if(sessionData.priority.intValue() ==
currentPrio.intValue()) {
+ results.add(sessionData.context);
}
}
- if (prioThreshold != null && prioThreshold > currentPrio) {
- // no session over threshold
- return null;
- }
-
- return result == null ? null : result.context;
+ return results;
}
/**
Modified:
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
---
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java
(original)
+++
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java
Fri Mar 12 23:04:58 2010
@@ -29,10 +29,12 @@ import org.apache.vysper.xmpp.authorizat
import org.apache.vysper.xmpp.authorization.AccountManagement;
import org.apache.vysper.xmpp.delivery.failure.DeliveryException;
import org.apache.vysper.xmpp.delivery.failure.IgnoreFailureStrategy;
+import org.apache.vysper.xmpp.server.DefaultServerRuntimeContext;
import org.apache.vysper.xmpp.server.SessionState;
import org.apache.vysper.xmpp.server.TestSessionContext;
import org.apache.vysper.xmpp.stanza.Stanza;
import org.apache.vysper.xmpp.stanza.StanzaBuilder;
+import org.apache.vysper.xmpp.state.resourcebinding.BindException;
import org.apache.vysper.xmpp.state.resourcebinding.ResourceRegistry;
/**
@@ -98,4 +100,86 @@ public class DeliveringStanzaRelayTestCa
throw e;
}
}
+
+ public void testRelayToTwoRecepients_DeliverToALL() throws
EntityFormatException, XMLSemanticError, DeliveryException, BindException {
+ DefaultServerRuntimeContext serverRuntimeContext = new
DefaultServerRuntimeContext(null, null);
+
+ // !! DeliverMessageToHighestPriorityResourcesOnly = FALSE
+
serverRuntimeContext.getServerFeatures().setDeliverMessageToHighestPriorityResourcesOnly(false);
+
+ stanzaRelay.setServerRuntimeContext(serverRuntimeContext);
+
+ EntityImpl fromEntity = EntityImpl.parse("[email protected]");
+
+ EntityImpl toEntity = EntityImpl.parse("[email protected]");
+
+
+ TestSessionContext sessionContextToEntity_1_prio3 =
createSessionForTo(toEntity, 3); // NON-NEGATIVE
+ TestSessionContext sessionContextToEntity_2_prio0 =
createSessionForTo(toEntity, 0); // NON-NEGATIVE
+ TestSessionContext sessionContextToEntity_3_prio3 =
createSessionForTo(toEntity, 3); // NON-NEGATIVE
+ TestSessionContext sessionContextToEntity_4_prioMinus =
createSessionForTo(toEntity, -1); // not receiving, negative
+
+ Stanza stanza = StanzaBuilder.createMessageStanza(fromEntity,
toEntity, "en", "Hello").build();
+
+ try {
+ stanzaRelay.relay(toEntity, stanza, new IgnoreFailureStrategy());
+ Stanza recordedStanza_1 =
sessionContextToEntity_1_prio3.getNextRecordedResponse(100);
+ assertNotNull("stanza 1 delivered", recordedStanza_1);
+ Stanza recordedStanza_2 =
sessionContextToEntity_2_prio0.getNextRecordedResponse(100);
+ assertNotNull("stanza 2 delivered", recordedStanza_2);
+ Stanza recordedStanza_3 =
sessionContextToEntity_3_prio3.getNextRecordedResponse(100);
+ assertNotNull("stanza 3 delivered", recordedStanza_3);
+ Stanza recordedStanza_4 =
sessionContextToEntity_4_prioMinus.getNextRecordedResponse(100);
+ assertNull("stanza 4 delivered", recordedStanza_4);
+ } catch (DeliveryException e) {
+ throw e;
+ }
+
+ }
+
+ public void testRelayToTwoRecepients_DeliverToHIGHEST() throws
EntityFormatException, XMLSemanticError, DeliveryException, BindException {
+ DefaultServerRuntimeContext serverRuntimeContext = new
DefaultServerRuntimeContext(null, null);
+
+ // !! DeliverMessageToHighestPriorityResourcesOnly = TRUE
+
serverRuntimeContext.getServerFeatures().setDeliverMessageToHighestPriorityResourcesOnly(true);
+
+ stanzaRelay.setServerRuntimeContext(serverRuntimeContext);
+
+ EntityImpl fromEntity = EntityImpl.parse("[email protected]");
+
+ EntityImpl toEntity = EntityImpl.parse("[email protected]");
+
+
+ TestSessionContext sessionContextToEntity_1_prio3 =
createSessionForTo(toEntity, 3); // HIGHEST PRIO
+ TestSessionContext sessionContextToEntity_2_prio0 =
createSessionForTo(toEntity, 1); // not receiving
+ TestSessionContext sessionContextToEntity_3_prio3 =
createSessionForTo(toEntity, 3); // HIGHEST PRIO
+ TestSessionContext sessionContextToEntity_4_prioMinus =
createSessionForTo(toEntity, -1); // not receiving
+
+ Stanza stanza = StanzaBuilder.createMessageStanza(fromEntity,
toEntity, "en", "Hello").build();
+
+ try {
+ stanzaRelay.relay(toEntity, stanza, new IgnoreFailureStrategy());
+ Stanza recordedStanza_1 =
sessionContextToEntity_1_prio3.getNextRecordedResponse(100);
+ assertNotNull("stanza 1 delivered", recordedStanza_1);
+ Stanza recordedStanza_2 =
sessionContextToEntity_2_prio0.getNextRecordedResponse(100);
+ assertNull("stanza 2 not delivered", recordedStanza_2);
+ Stanza recordedStanza_3 =
sessionContextToEntity_3_prio3.getNextRecordedResponse(100);
+ assertNotNull("stanza 3 delivered", recordedStanza_3);
+ Stanza recordedStanza_4 =
sessionContextToEntity_4_prioMinus.getNextRecordedResponse(100);
+ assertNull("stanza 4 not delivered", recordedStanza_4);
+ } catch (DeliveryException e) {
+ throw e;
+ }
+
+ }
+
+ private TestSessionContext createSessionForTo(EntityImpl toEntity, final
int priority) {
+ TestSessionContext sessionContextToEntity =
TestSessionContext.createSessionContext(toEntity);
+ sessionContextToEntity.setSessionState(SessionState.AUTHENTICATED);
+ String toEntityRes =
resourceRegistry.bindSession(sessionContextToEntity);
+ resourceRegistry.setResourcePriority(toEntityRes, priority);
+ return sessionContextToEntity;
+ }
+
+
}
Modified:
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
---
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java
(original)
+++
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java
Fri Mar 12 23:04:58 2010
@@ -93,20 +93,37 @@ public class ResourceRegistryTestCase ex
assertEquals(2, resourceList.size());
assertTrue(sessionList.contains(sessionContext1));
assertTrue(sessionList.contains(sessionContext2));
-
- SessionContext hightestPrioSession =
resourceRegistry.getHighestPrioSession(entity, null);
- assertSame(resourceRegistry.getSessionContext(resourceId2),
hightestPrioSession);
-
+
+ List<SessionContext> highestPrioSessions =
resourceRegistry.getHighestPrioSessions(entity, null);
+ assertEquals(1, highestPrioSessions.size());
+ SessionContext highestPrioSession = highestPrioSessions.get(0);
+ assertSame(resourceRegistry.getSessionContext(resourceId2),
highestPrioSession);
+
resourceRegistry.setResourcePriority(resourceId1, 2); // make this
highes prio
- hightestPrioSession = resourceRegistry.getHighestPrioSession(entity,
null);
- assertSame(resourceRegistry.getSessionContext(resourceId1),
hightestPrioSession);
+ highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity,
null);
+ assertEquals(1, highestPrioSessions.size());
+ highestPrioSession = highestPrioSessions.get(0);
+ assertSame(resourceRegistry.getSessionContext(resourceId1),
highestPrioSession);
+
+ highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity,
2); // still highest prio
+ assertEquals(1, highestPrioSessions.size());
+ highestPrioSession = highestPrioSessions.get(0);
+ assertSame(resourceRegistry.getSessionContext(resourceId1),
highestPrioSession);
+
+ highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity,
3); // now, all prios are below threshold
+ assertEquals(0, highestPrioSessions.size());
+
+ resourceRegistry.setResourcePriority(resourceId1, 4); // both are same
+ resourceRegistry.setResourcePriority(resourceId2, 4); // both are same
+ highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity,
3);
+ assertEquals(2, highestPrioSessions.size());
+ highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity,
4);
+ assertEquals(2, highestPrioSessions.size());
+ highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity,
5);
+ assertEquals(0, highestPrioSessions.size());
+
+
sessionContext1.getServerRuntimeContext().getServerFeatures().setDeliverMessageToHighestPriorityResourcesOnly(false);
- hightestPrioSession = resourceRegistry.getHighestPrioSession(entity,
2); // still highest prio
- assertSame(resourceRegistry.getSessionContext(resourceId1),
hightestPrioSession);
-
- hightestPrioSession = resourceRegistry.getHighestPrioSession(entity,
3); // now, all prios are below threshold
- assertNull(hightestPrioSession);
-
}
public void testAddOneEntityMultipleResources_TolerateResourceIds() throws
EntityFormatException {