You are correct about the behavior.  I had the same question a while
back, too.  Basically, the NotificationConsumerClient receives the
notification message from the producer based on the subscribed topic.
The consumer then goes through its list of MessageListeners and calls
accepts() on each one.  Every listener that returns true then receives
the message in its process() method.

By design, this allows each listener to apply its own filter logic to
the message before processing it.  If you want your message listener
to accept all messages, make it return true in accepts().  You can have
one listener process all messages, or you can have multiple listeners,
each processing a different type of message.

I originally thought about having the NotificationConsumerClient hold
the filters and map them to the corresponding listeners, so it wouldn't
have to call accepts() on each listener.  But because there are many
ways in which a message can be filtered (e.g. by EPR, content, source,
etc.), the logic is left to the listener to implement in accepts().
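
To illustrate the dispatch described above, here is a minimal,
self-contained sketch.  The Message, Listener, and Consumer types are
hypothetical stand-ins written only for this example, not the actual
Muse classes; the only thing taken from the discussion is the pattern
of offering each message to every listener via accepts() and handing
it to process() when accepts() returns true.

```java
import java.util.ArrayList;
import java.util.List;

class ListenerDispatchSketch {

    /** Hypothetical stand-in for a notification message: a topic plus a payload. */
    static final class Message {
        final String topic;
        final String payload;

        Message(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    /** Hypothetical listener contract mirroring the accepts()/process() pair. */
    interface Listener {
        boolean accepts(Message message);
        void process(Message message);
    }

    /** Hypothetical consumer: offers every message to every registered listener. */
    static final class Consumer {
        private final List<Listener> listeners = new ArrayList<>();

        void addListener(Listener listener) {
            listeners.add(listener);
        }

        void dispatch(Message message) {
            for (Listener listener : listeners) {
                // Each listener decides for itself whether it wants the message.
                if (listener.accepts(message)) {
                    listener.process(message);
                }
            }
        }
    }

    /** Accepts only messages on one topic and records their payloads. */
    static final class TopicListener implements Listener {
        private final String topic;
        final List<String> received = new ArrayList<>();

        TopicListener(String topic) {
            this.topic = topic;
        }

        public boolean accepts(Message message) {
            return topic.equals(message.topic);
        }

        public void process(Message message) {
            received.add(message.payload);
        }
    }

    /** Accepts every message, as when accepts() simply returns true. */
    static final class CatchAllListener implements Listener {
        final List<String> received = new ArrayList<>();

        public boolean accepts(Message message) {
            return true;
        }

        public void process(Message message) {
            received.add(message.payload);
        }
    }

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        TopicListener topicA = new TopicListener("TopicA");
        TopicListener topicB = new TopicListener("TopicB");
        CatchAllListener all = new CatchAllListener();
        consumer.addListener(topicA);
        consumer.addListener(topicB);
        consumer.addListener(all);

        consumer.dispatch(new Message("TopicA", "first"));
        consumer.dispatch(new Message("TopicB", "second"));

        System.out.println("TopicA listener: " + topicA.received);
        System.out.println("TopicB listener: " + topicB.received);
        System.out.println("Catch-all listener: " + all.received);
    }
}
```

In this sketch both topic listeners are offered both messages, but
each one only processes the message whose topic it accepts, while the
catch-all listener processes both.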


-----Original Message-----
From: dnguyen [mailto:[EMAIL PROTECTED]]
Sent: Tuesday, March 06, 2007 11:58 PM
To: [email protected]
Subject: Re: Problem updating subscription resource property.


Hi Dan.  I modified the wsn-producer to have 2 resources in the same war
with different topics on the producer side.  I modified the wsn-consumer
to have 2 different MessageListener(s) and to subscribe to separate
topics using TopicFilter.  Although both MessageListener(s) have their
own subscriptions, the NotificationMessage(s) are sent to the one
NotificationConsumerClient, which passes the messages to all registered
listeners, so both listeners get both messages.  This means that the
listeners have to filter in the accepts() method using the same filter
specified in the subscription.  Or am I doing something wrong?  I
checked the code, and it seems there is no way around this.


Daniel Jemiolo wrote:
> 
> It does have this operation, but there are two resource types in the
> wsn-producer sample: the producer resource and the subscription
> resource. You need to add the WSRP Set capability to the
> *subscription* resource's WSDL/definition. You can find the
> subscription portType in /wsdl/WS-BaseNotification-1_3.wsdl - you'll
> need to copy in the SetResourceProperties content.
> 
> As for your scenario, I believe what you want to do is have each
> consumer resource subscribe() with the producer, specifying their own
> filters each time. They will each get back their own subscription
> reference, and the filter will already be set to what they need.
> Trying to share one subscription among many consumer resources won't
> work for two reasons:
> 
> 1. A WSN subscription can only have one consumer
> (wsnt:ConsumerReference).
> 
> 2. Setting the filter and receiving notifications will not be an
> atomic operation, and the latter takes an unknown amount of time, so
> you have a big synchronization problem.
> 
> The right way to handle this is to use NotificationProducerClient in
> your ConsumerCapability.initialize() method to subscribe() using the
> filters you want. Then you don't have to filter on the consumer side -
> you subscribed with a given filter, so you know that you're going to
> get messages that meet that filter. You only need to change the filter
> (with SetResourceProperties) if a consumer resource wants to change
> the type of data it's processing, not because of something another
> consumer resource has done.
> 
> Dan
> 
> 
> 
> dnguyen <[EMAIL PROTECTED]> wrote on 02/28/2007 06:48:07 PM:
> 
>> 
>> Hi.  The wsn-producer sample has that operation in the
>> WsResource.wsdl already.  I am assuming that I do not need to create
>> one for wsn-consumer?
>> 
>> The scenario we are going for with the updates of the filter is that
>> multiple client applications will be using this one wsn-consumer WS,
>> since we were not sure of dynamic deployment of wsn-consumer WS for
>> distinct endpoints.  Each client would specify its own filter for
>> data, which would be passed to a "broker" (each client would also
>> have an associated ConsumerCapability that will have that client's
>> filter), which would then update the filter on the existing
>> subscription.  Since the notify call would contain the combined data
>> from the producer, the wsn-consumer WS would then pass the data to
>> each ConsumerCapability to be filtered again for their respective
>> clients.
>> 
>> We didn't want to do the filtering twice, but we don't see another
>> way at the moment.  Please let us know if there is a better way to do
>> this.
>> 
>> Regards,
>> Dong
>> 
>> 
>> Daniel Jemiolo wrote:
>> > 
>> > You would need to add the WSRP Set capability
>> > (http://docs.oasis-open.org/wsrf/rpw-2/Set), with impl class
>> > org.apache.muse.ws.resource.properties.set.impl.SimpleSetCapability.
>> > It will probably be easier if you add the WSRP SetResourceProperties
>> > operation to your WSDL and have wsdl2java handle all of the
>> > code/artifact generation. The SetResourceProperties WSDL content can
>> > be found in the 'wsrf' sample app (see /wsdl/WsResource.wsdl).
>> > 
>> > However, no one has ever tried to switch a subscription filter
>> > mid-lifecycle, and so I believe a serializer for the Filter type
>> > will also be needed. Let me work on this for a few minutes and
>> > assuming I can fix the scenario, I'll write back and have you try
>> > the nightly build (you'll still need to add SetResourceProperties to
>> > your WSDL).
>> > 
>> > Dan
>> > 
>> > 
>> > 
>> > dnguyen <[EMAIL PROTECTED]> wrote on 02/28/2007 04:39:00 PM:
>> > 
>> >> 
>> >> I am trying to update the filter in the SubscriptionClient through
>> >> the updateResourceProperty() method but not having any success.  I
>> >> get the error below:
>> >> 
>> >> 
>> >> ******************************
>> >> org.apache.muse.ws.addressing.soap.SoapFault: [ID = 'ActionNotSupported']
>> >> The resource at 'SubscriptionManager' does not expose an operation with
>> >> the WS-Action
>> >> 'http://docs.oasis-open.org/wsrf/rpw-2/SetResourceProperties/SetResourcePropertiesRequest'
>> >> through any of its capabilities.
>> >>         at org.apache.muse.core.AbstractResourceClient.invoke(AbstractResourceClient.java:279)
>> >>         at org.apache.muse.core.AbstractResourceClient.invoke(AbstractResourceClient.java:235)
>> >>         at org.apache.muse.ws.resource.remote.WsResourceClient.setResourceProperties(WsResourceClient.java:175)
>> >>         at org.apache.muse.ws.resource.remote.WsResourceClient.updateResourceProperty(WsResourceClient.java:202)
>> >>         at org.apache.muse.test.wsn.WsnTestClient2.main(Unknown Source)
>> >> ************************************
>> >> 
>> >> 
>> >> I then added this to the SubscriptionManager resource in the
>> >> wsn-producer muse.xml file:
>> >> 
>> >> 
>> >> *************************************
>> >>         <capability>
>> >>             <capability-uri>http://docs.oasis-open.org/wsrf/rpw-2/Get</capability-uri>
>> >>             <java-capability-class>org.apache.muse.ws.resource.properties.get.impl.SimpleGetCapability</java-capability-class>
>> >>         </capability>
>> >> ************************************
>> >> 
>> >> 
>> >> But I still get the same exception.  I looked in the source code
>> >> for anything related to SetResourceProperties but didn't find
>> >> anything helpful.
>> >> I read that the WS-N spec states that the impl can throw an
>> >> exception in this case, but it also says this is allowed.  I am not
>> >> sure if this is allowable in Muse, but the error message indicates
>> >> it is possible by adding the correct capability.  Also, I am not
>> >> sure I am doing this right.  The example I am using is below:
>> >> 
>> >> 
>> >> *******************************
>> >> package org.apache.muse.test.wsn;
>> >> 
>> >> import javax.xml.namespace.QName;
>> >> 
>> >> import java.net.InetAddress;
>> >> import java.net.URI;
>> >> import java.net.UnknownHostException;
>> >> 
>> >> import org.apache.muse.ws.addressing.EndpointReference;
>> >> import org.apache.muse.ws.notification.remote.NotificationProducerClient;
>> >> import org.apache.muse.ws.notification.remote.SubscriptionClient;
>> >> import org.apache.muse.ws.notification.impl.*;
>> >> import org.apache.muse.ws.notification.WsnConstants;
>> >> import org.apache.muse.util.xml.*;
>> >> 
>> >> 
>> >> public class WsnTestClient2
>> >> {
>> >>     public static URI getLocalAddress(String contextPath, int port)
>> >>         throws UnknownHostException
>> >>     {
>> >>         String ip = InetAddress.getLocalHost().getHostAddress();
>> >> 
>> >>         StringBuffer address = new StringBuffer();
>> >>         address.append("http://");
>> >>         address.append(ip);
>> >>         address.append(':');
>> >>         address.append(port);
>> >> 
>> >>         if (contextPath.charAt(0) != '/')
>> >>             address.append('/');
>> >> 
>> >>         address.append(contextPath);
>> >> 
>> >>         return URI.create(address.toString());
>> >>     }
>> >> 
>> >>     public static void main(String[] args)
>> >>     {
>> >>         try
>> >>         {
>> >>             //
>> >>             // change these to point to different applications/servers
>> >>             //
>> >>             String webAppRoot = "/isr-producer/services";
>> >>             int producer_port = Integer.parseInt(System.getProperty("producer_port", "8080"));
>> >>             int consumer_port = Integer.parseInt(System.getProperty("consumer_port", "8080"));
>> >> 
>> >>             //
>> >>             // create producer EPR/client, and use it to subscribe
>> >>             // the consumer to all messages
>> >>             //
>> >> 
>> >>             String contextPath = webAppRoot + "/IsrResource";
>> >>             URI address = (args.length == 0)
>> >>                 ? getLocalAddress(contextPath, producer_port)
>> >>                 : URI.create(args[0] + contextPath);
>> >>             EndpointReference epr = new EndpointReference(address);
>> >> 
>> >>             webAppRoot = "/isr-consumer/services";
>> >>             contextPath = webAppRoot + "/IsrConsumer";
>> >>             address = getLocalAddress(contextPath, consumer_port);
>> >>             EndpointReference consumer = new EndpointReference(address);
>> >> 
>> >>             //
>> >>             // null filter == send all messages to consumer
>> >>             //
>> >>             NotificationProducerClient producer = new NotificationProducerClient(epr);
>> >>             producer.setTrace(true);
>> >> 
>> >>             SubscriptionClient subscription = null;
>> >> 
>> >>             while( true )
>> >>             {
>> >>                 System.out.println( "s - subscribe, u - unsubscribe, v - view, f - update filter, q - quit" );
>> >> 
>> >>                 char choice = (char)System.in.read();
>> >> 
>> >>                 if( choice == 's' )
>> >>                 {
>> >>                     subscription = producer.subscribe(consumer, null, null);
>> >>                 }
>> >>                 else if( choice == 'u' )
>> >>                 {
>> >>                     subscription.destroy();
>> >>                 }
>> >>                 else if( choice == 'v' )
>> >>                 {
>> >> 
>> >>                     System.out.println(XmlUtils.toString(
>> >>                         subscription.getResourceProperty(WsnConstants.FILTER_QNAME)[0]));
>> >>                 }
>> >>                 else if( choice == 'f' )
>> >>                 {
>> >>                     QName messageName = new QName("http://ws.apache.org/muse/test/wsrf", "MyTopic", "tns");
>> >> 
>> >>                     subscription.updateResourceProperty(WsnConstants.FILTER_QNAME,
>> >>                         new Object[] { new TopicFilter(messageName) });
>> >>                 }
>> >>                 else if( choice == 'q' )
>> >>                 {
>> >>                     break;
>> >>                 }
>> >>             }
>> >>         }
>> >> 
>> >>         catch (Throwable error)
>> >>         {
>> >>             error.printStackTrace();
>> >>         }
>> >>     }
>> >> }
>> >> 
>> >> --
>> >> View this message in context:
>> >> http://www.nabble.com/Problem-updating-subscription-resource-property.-tf3323382.html#a9239032
>> >> Sent from the Muse User mailing list archive at Nabble.com.
>> >> 
>> >> 
>> >> ---------------------------------------------------------------------
>> >> To unsubscribe, e-mail: [EMAIL PROTECTED]
>> >> For additional commands, e-mail: [EMAIL PROTECTED]
>> >> 
>> 
>> --
>> View this message in context:
>> http://www.nabble.com/Problem-updating-subscription-resource-property.-tf3323382.html#a9241004
>> Sent from the Muse User mailing list archive at Nabble.com.
>> 

--
View this message in context:
http://www.nabble.com/Problem-updating-subscription-resource-property.-tf3323382.html#a9348119
Sent from the Muse User mailing list archive at Nabble.com.

