User: user57  
  Date: 01/07/31 22:19:48

  Modified:    src/main/org/jbossmq/server Tag: jboss_buildmagic
                        ClientConsumer.java JBossMQService.java
                        JBossMQServiceMBean.java JMSDestination.java
                        JMSQueue.java JMSServer.java JMSTopic.java
                        StateManager.java StateManagerMBean.java
  Log:
   o updated from HEAD
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.10.2.1  +52 -27    jbossmq/src/main/org/jbossmq/server/ClientConsumer.java
  
  Index: ClientConsumer.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/ClientConsumer.java,v
  retrieving revision 1.10
  retrieving revision 1.10.2.1
  diff -u -r1.10 -r1.10.2.1
  --- ClientConsumer.java       2001/07/28 00:30:16     1.10
  +++ ClientConsumer.java       2001/08/01 05:19:48     1.10.2.1
  @@ -109,6 +109,9 @@
        {
                cat.debug("Queueing outbound message: "+message);
   
  +             if( !enabled )
  +                     return;
  +
                LinkedList l = (LinkedList)destinationSubscriptions.get( 
message.getJMSDestination() );
                if( l == null )
                        throw new JMSException("No subscription found for that 
destination.");
  @@ -138,35 +141,29 @@
                cat.debug("Adding subscription for: "+req);
                req.dc = dc;
                
  +             JMSDestination 
queue=(JMSDestination)server.getJMSDestination(req.destination);
  +             if (queue==null) throw new JMSException("This destination does not 
exist !");
  +
  +             SubscriptionState subState = new SubscriptionState( req );
  +             subState.dest = queue.addSubscriber(req);
  +                             
                synchronized (subscriptions ) {
  -                     SubscriptionState subState = new SubscriptionState( req );
  -                     
                        HashMap subscriptionsClone = (HashMap)subscriptions.clone();
                        subscriptionsClone.put(new Integer(req.subscriptionId), 
subState );
                        subscriptions = subscriptionsClone;
                                
                        LinkedList ll = (LinkedList)destinationSubscriptions.get( 
req.destination );
                        if( ll == null ) {
  -
  -                             JMSDestination 
queue=(JMSDestination)server.getJMSDestination(req.destination);
  -                             if (queue==null) throw new JMSException("This 
destination does not exist !");
  -                             
                                ll = new LinkedList();
  -                             ll.add( subState );
  -                             
  -                             HashMap destinationSubscriptionsClone = 
(HashMap)destinationSubscriptions.clone();
  -                             destinationSubscriptionsClone.put(req.destination, ll 
);                                
  -                             destinationSubscriptions = 
destinationSubscriptionsClone;
  -                             subState.dest = queue.addConsumer(req, this);
  -                             
  -                     } else {
  -                             LinkedList llClone = (LinkedList)ll.clone();
  -                             llClone.add( req );
  -                             
  -                             HashMap destinationSubscriptionsClone = 
(HashMap)destinationSubscriptions.clone();
  -                             destinationSubscriptionsClone.put(req.destination, 
llClone);
  -                             destinationSubscriptions = 
destinationSubscriptionsClone;
  +                             queue.addConsumer(req,this);
                        }
  +                     else
  +                             ll = (LinkedList)ll.clone();
  +                     ll.add( subState );
  +                             
  +                     HashMap destinationSubscriptionsClone = 
(HashMap)destinationSubscriptions.clone();
  +                     destinationSubscriptionsClone.put(req.destination, ll );       
                         
  +                     destinationSubscriptions = destinationSubscriptionsClone;
                }                       
        }
   
  @@ -243,15 +240,31 @@
                JMSDestination queue = 
server.getJMSDestination(req.subscription.destination);
                if( queue == null )
                        throw new JMSException("The subscription's destination does 
not exist");
  +
  +             if( enabled && req.subscription.actsLikeAQueue ) {
  +                     SpyMessage message = req.dest.receiveNoWait(req.subscription);
   
  -             // Is it a receiveNoWait()
  -             if( wait == -1 && req.subscription.actsLikeAQueue ) {
  -                     return req.dest.receiveNoWait(req.subscription);
  +                     if( message != null ) {
  +                             AcknowledgementRequest ack = new 
AcknowledgementRequest();
  +                             ack.destination = message.getJMSDestination();
  +                             ack.messageID = message.getJMSMessageID();
  +                             ack.subscriberId = subscriberId;
  +                             ack.isAck = false;
  +
  +                             synchronized (unacknowledgedMessages) {
  +                                     unacknowledgedMessages.put(ack, message);
  +                             }
  +                             return message;
  +                     }
  +             }
  +
  +             // If not receiveNoWait()
  +             if( wait != -1 ) {
  +                     // Notify the queue. It could be waiting for a consumer
  +                     req.subscription.receiving = true;
  +                     req.dest.notifyMessageAvailable();
                }
   
  -             // Notify the queue. It could be waiting for a consumer
  -             req.subscription.receiving = true;
  -             req.dest.notifyMessageAvailable();
                return null;            
        }
   
  @@ -330,6 +343,9 @@
   
                cat.debug(""+this+"->scanExclusiveQueue(queue="+queue+")");
                
  +             if( !enabled )
  +                     return false;
  +
                Iterator i = queue.messages.iterator();
                while( i.hasNext() ) {
                        
  @@ -381,6 +397,15 @@
        public void setEnabled(boolean enabled) {
                cat.debug(""+this+"->setEnabled(enabled="+enabled+")");
                this.enabled = enabled;
  +             if(enabled){
  +                     // queues might be waiting for messages.
  +                     for(Iterator it = 
destinationSubscriptions.keySet().iterator();it.hasNext();){
  +                             SpyDestination destination = (SpyDestination) 
it.next();
  +                             JMSDestination dest = 
server.getJMSDestination(destination);
  +                             if(dest != null)
  +                                     dest.notifyMessageAvailable();
  +                     }
  +             }
        }
   
        public String toString() {
  
  
  
  1.5.2.1   +181 -217  jbossmq/src/main/org/jbossmq/server/JBossMQService.java
  
  Index: JBossMQService.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/JBossMQService.java,v
  retrieving revision 1.5
  retrieving revision 1.5.2.1
  diff -u -r1.5 -r1.5.2.1
  --- JBossMQService.java       2001/07/28 00:30:16     1.5
  +++ JBossMQService.java       2001/08/01 05:19:48     1.5.2.1
  @@ -49,7 +49,7 @@
   
        
        public String getName() {
  -             return "JBossMQ.Server";
  +             return "JBossMQ";
        }
        
   
  @@ -80,50 +80,21 @@
                jmsServer = new JMSServer();
        }
   
  -     public SpyQueue createQueue(String name) throws JMSException
  +     public void createQueue(String name) throws Exception
        {
  -             /*
  -             cat.debug("new queue : "+name);
  -     
  -             SpyQueue newQueue=new SpyQueue(name);
  -             if (destinations.containsKey(newQueue)) throw new JMSException("This 
queue already exists !");
  -             
  -             JMSDestination queue=new JMSDestination(newQueue,null,this);
  -             
  -             InitialContext ctx = null;
  -             Context subcontext = null;
  -                             
  -             try{
  -               //Get an InitialContext
  -               ctx = new InitialContext();
  -               
  -               //get the queues subcontext
  -               subcontext = (Context)ctx.lookup("queue");
  -               
  -               //add this queue to the jndi queue context
  -               subcontext.rebind(name,newQueue);
   
  +             try {
  +                     
  +                     ObjectName objectName = new ObjectName( 
"JBossMQ:serivce=Queue,name="+name );
  +                     
getServer().createMBean("org.jbossmq.server.QueueManager",objectName);
  +                     getServer().invoke(objectName,"init", new Object[] {}, new 
String[] {});
  +                     getServer().invoke(objectName,"start", new Object[] {}, new 
String[] {});
  +                     
  +             } catch ( Exception e ) {
  +                     category.warn("Could not create the destination: ", e );
  +                     throw e;
                }
  -             catch(Exception e){
  -               //remove the queue from the destinations
  -               synchronized (destinations) {
  -                 HashMap newMap=(HashMap)destinations.clone();
  -                 newMap.remove(newQueue);
  -                 destinations=newMap;
  -               }
  -               JMSException newE = new JMSException("Exception unbinding Queue from 
the JNDI queue context");
  -               newE.setLinkedException(e);
  -               throw newE;
  -             }
                
  -             //Add this new JMSServerQueue to the list
  -             synchronized (destinations) {
  -                     HashMap newMap=(HashMap)destinations.clone();
  -                     newMap.put(newQueue,queue);     
  -                     destinations=newMap;
  -             }
  -             */
  -
                /*
                //check to see if this queue already exists in the config structure
                boolean queueExists = false;
  @@ -173,53 +144,23 @@
                */
                
                //return newQueue;
  -             return null;
        }
   
        // Administration calls 
  -     public SpyTopic createTopic(String name) throws JMSException
  +     public void createTopic(String name) throws Exception
        {
  -             /*
  -             cat.debug("new topic : "+name);
  -     
  -             SpyTopic newTopic=new SpyTopic(name);
  -             if (destinations.containsKey(newTopic)) throw new JMSException("This 
topic already exists !");
  -             
  -             JMSDestination queue=new JMSDestination(newTopic,null,this);
  -             
  -             InitialContext ctx = null;
  -             Context subcontext = null;
  -
  -             try{
  -               //Get an InitialContext
  -               ctx = new InitialContext();
  -               
  -               //get the topics subcontext
  -               subcontext = (Context)ctx.lookup("topic");
  -               
  -               //add this topic to the jndi topic context
  -               subcontext.rebind(name,newTopic);
  -
  -             }
  -             catch(Exception e){
  -               //remove the topic from the destinations
  -               synchronized (destinations) {
  -                 HashMap newMap=(HashMap)destinations.clone();
  -                 newMap.remove(newTopic);    
  -                 destinations=newMap;
  -               }
  -               JMSException newE = new JMSException("Exception binding Topic to the 
JNDI topic context");
  -               newE.setLinkedException(e);
  -               throw newE;
  -             }
  -
  -             //Add this new JMSServerQueue to the list
  -             synchronized (destinations) {
  -                     HashMap newMap=(HashMap)destinations.clone();
  -                     newMap.put(newTopic,queue);     
  -                     destinations=newMap;
  +             try {
  +                     
  +                     ObjectName objectName = new ObjectName( 
"JBossMQ:serivce=Topic,name="+name );
  +                     
getServer().createMBean("org.jbossmq.server.TopicManager",objectName);
  +                     getServer().invoke(objectName,"init", new Object[] {}, new 
String[] {});
  +                     getServer().invoke(objectName,"start", new Object[] {}, new 
String[] {});
  +                     
  +             } catch ( Exception e ) {
  +                     category.warn("Could not create the destination: ", e );
  +                     throw e;
                }
  -             */
  +             
                /*
   
                //check to see if this topic already exists in the config structure
  @@ -268,145 +209,168 @@
                }
                return newTopic;
                */
  -             return null;
                
        }
  -
  -             public void destroyQueue(String name) throws JMSException
  -     {
  -             /*
  -             cat.debug("destroy queue : "+name);
  -     
  -             SpyQueue destroyQueue=new SpyQueue(name);
  -             if (!destinations.containsKey(destroyQueue)) throw new 
JMSException("This queue doesn't exist!");
  -             
  -             JMSDestination queue=new JMSDestination(destroyQueue,null,this);
  -             
  -             InitialContext ctx = null;
  -             Context subcontext = null;
   
  -             try{
  -               //Get an InitialContext
  -               ctx = new InitialContext();
  -               
  -               //get the queues subcontext
  -               subcontext = (Context)ctx.lookup("queue");
  -               
  -               //remove this queue to the jndi queue context
  -               subcontext.unbind(name);
  -
  -             }
  -             catch(Exception e){
  -               JMSException newE = new JMSException("Exception unbinding Queue from 
the JNDI queue context");
  -               newE.setLinkedException(e);
  -               throw newE;
  -             }
  -
  -             //remove this new JMSServerQueue from the list
  -             synchronized (destinations) {
  -                     HashMap newMap=(HashMap)destinations.clone();
  -                     newMap.remove(destroyQueue);    
  -                     destinations=newMap;
  -             }
  -             */
  -
  -             /*
  -             //check to see if this queue already exists in the config structure
  -             //and remove it if it does
  -             XElement element = null;
  +     public void destroyQueue(String name) throws Exception {
  +             try {
  +                     
  +                     ObjectName objectName = new ObjectName( 
"JBossMQ:serivce=Queue,name="+name );
  +                     getServer().invoke(objectName,"stop", new Object[] {}, new 
String[] {});
  +                     getServer().invoke(objectName,"destroy", new Object[] {}, new 
String[] {});
  +                     getServer().unregisterMBean(objectName);
  +                     
  +             } catch ( Exception e ) {
  +                     category.warn("Could not destory the destination: ", e );
  +                     throw e;
  +             }
  +             
  +         /*
  +         cat.debug("destroy queue : "+name);
  +         
  +         SpyQueue destroyQueue=new SpyQueue(name);
  +         if (!destinations.containsKey(destroyQueue)) throw new JMSException("This 
queue doesn't exist!");
  +         
  +         JMSDestination queue=new JMSDestination(destroyQueue,null,this);
  +         
  +         InitialContext ctx = null;
  +         Context subcontext = null;
  +         
  +         try{
  +           //Get an InitialContext
  +           ctx = new InitialContext();
  +           
  +           //get the queues subcontext
  +           subcontext = (Context)ctx.lookup("queue");
  +           
  +           //remove this queue to the jndi queue context
  +           subcontext.unbind(name);
  +         
  +         }
  +         catch(Exception e){
  +           JMSException newE = new JMSException("Exception unbinding Queue from the 
JNDI queue context");
  +           newE.setLinkedException(e);
  +           throw newE;
  +         }
  +         
  +         //remove this new JMSServerQueue from the list
  +         synchronized (destinations) {
  +             HashMap newMap=(HashMap)destinations.clone();
  +             newMap.remove(destroyQueue);    
  +             destinations=newMap;
  +         }
  +         */
  +
  +         /*
  +         //check to see if this queue already exists in the config structure
  +         //and remove it if it does
  +         XElement element = null;
  +         
  +         try{
  +           Enumeration enum = serverConfig.getElementsNamed("Queue");
  +         
  +           while( enum.hasMoreElements() ) {
  +             element = (XElement)enum.nextElement();
  +             if(name.equals(element.getField("Name"))){
  +               //remove this queue element from the config structure
  +               element.removeFromParent();
  +               break;
  +             }
  +             
  +           }
  +             
  +           //save the new state in the config file
  +           //saveConfig();
  +           
  +           
  +         }
  +         catch(Exception ioe){
  +           ResourceAllocationException newE = new 
ResourceAllocationException("Exception saving updated configuration state");
  +           newE.setLinkedException(ioe);
  +           throw newE;
  +         }
  +         */
   
  -             try{
  -               Enumeration enum = serverConfig.getElementsNamed("Queue");
  -             
  -               while( enum.hasMoreElements() ) {
  -                 element = (XElement)enum.nextElement();
  -                 if(name.equals(element.getField("Name"))){
  -                   //remove this queue element from the config structure
  -                   element.removeFromParent();
  -                   break;
  -                 }
  -                 
  -               }
  -                 
  -               //save the new state in the config file
  -               //saveConfig();
  -               
  -               
  -             }
  -             catch(Exception ioe){
  -               ResourceAllocationException newE = new 
ResourceAllocationException("Exception saving updated configuration state");
  -               newE.setLinkedException(ioe);
  -               throw newE;
  -             }
  -             */              
  -             
        }
  -
  -     public void destroyTopic(String name) throws JMSException
  -     {
  -                     /*
  -             cat.debug("destroy topic : "+name);
  -     
  -             SpyTopic destroyTopic=new SpyTopic(name);
  -             if (!destinations.containsKey(destroyTopic)) throw new 
JMSException("This topic doesn't exist!");
  -             
  -             JMSDestination queue=new JMSDestination(destroyTopic,null,this);
  -             
  -             InitialContext ctx = null;
  -             Context subcontext = null;
   
  -             try{
  -               //Get an InitialContext
  -               ctx = new InitialContext();
  -               
  -               //get the topics subcontext
  -               subcontext = (Context)ctx.lookup("topic");
  -               
  -               //remove this topic to the jndi topic context
  -               subcontext.unbind(name);
  +     public void destroyTopic(String name) throws Exception {
  +             try {
  +                     
  +                     ObjectName objectName = new ObjectName( 
"JBossMQ:serivce=Topic,name="+name );
  +                     getServer().invoke(objectName,"stop", new Object[] {}, new 
String[] {});
  +                     getServer().invoke(objectName,"destroy", new Object[] {}, new 
String[] {});
  +                     getServer().unregisterMBean(objectName);
  +                     
  +             } catch ( Exception e ) {
  +                     category.warn("Could not destory the destination: ", e );
  +                     throw e;
  +             }
  +             
  +         /*
  +         cat.debug("destroy queue : "+name);
  +         
  +         SpyQueue destroyQueue=new SpyQueue(name);
  +         if (!destinations.containsKey(destroyQueue)) throw new JMSException("This 
queue doesn't exist!");
  +         
  +         JMSDestination queue=new JMSDestination(destroyQueue,null,this);
  +         
  +         InitialContext ctx = null;
  +         Context subcontext = null;
  +         
  +         try{
  +           //Get an InitialContext
  +           ctx = new InitialContext();
  +           
  +           //get the queues subcontext
  +           subcontext = (Context)ctx.lookup("queue");
  +           
  +           //remove this queue to the jndi queue context
  +           subcontext.unbind(name);
  +         
  +         }
  +         catch(Exception e){
  +           JMSException newE = new JMSException("Exception unbinding Queue from the 
JNDI queue context");
  +           newE.setLinkedException(e);
  +           throw newE;
  +         }
  +         
  +         //remove this new JMSServerQueue from the list
  +         synchronized (destinations) {
  +             HashMap newMap=(HashMap)destinations.clone();
  +             newMap.remove(destroyQueue);    
  +             destinations=newMap;
  +         }
  +         */
  +
  +         /*
  +         //check to see if this queue already exists in the config structure
  +         //and remove it if it does
  +         XElement element = null;
  +         
  +         try{
  +           Enumeration enum = serverConfig.getElementsNamed("Queue");
  +         
  +           while( enum.hasMoreElements() ) {
  +             element = (XElement)enum.nextElement();
  +             if(name.equals(element.getField("Name"))){
  +               //remove this queue element from the config structure
  +               element.removeFromParent();
  +               break;
  +             }
  +             
  +           }
  +             
  +           //save the new state in the config file
  +           //saveConfig();
  +           
  +           
  +         }
  +         catch(Exception ioe){
  +           ResourceAllocationException newE = new 
ResourceAllocationException("Exception saving updated configuration state");
  +           newE.setLinkedException(ioe);
  +           throw newE;
  +         }
  +         */
   
  -             }
  -             catch(Exception e){
  -               JMSException newE = new JMSException("Exception unbinding Topic from 
the JNDI topic context");
  -               newE.setLinkedException(e);
  -               throw newE;
  -             }
  -
  -             //remove this new JMSServerQueue from the list
  -             synchronized (destinations) {
  -                     HashMap newMap=(HashMap)destinations.clone();
  -                     newMap.remove(destroyTopic);    
  -                     destinations=newMap;
  -             }
  -             */
  -             
  -             /*
  -             //check to see if this topic already exists in the config structure
  -             //and remove it if it does
  -             XElement element = null;
  -
  -             try{
  -               Enumeration enum = serverConfig.getElementsNamed("Topic");
  -             
  -               while( enum.hasMoreElements() ) {
  -                 element = (XElement)enum.nextElement();
  -                 if(name.equals(element.getField("Name"))){
  -                   //remove this topic element from the config structure
  -                   element.removeFromParent();
  -                   break;
  -                 }
  -                 
  -               }
  -                 
  -               //save the new state in the config file
  -               //saveConfig();
  -             }
  -             catch(Exception ioe){
  -               ResourceAllocationException newE = new 
ResourceAllocationException("Exception saving updated configuration state");
  -               newE.setLinkedException(ioe);
  -               throw newE;
  -             }
  -             */              
  -             
        }
   }
  
  
  
  1.6.2.1   +53 -1     jbossmq/src/main/org/jbossmq/server/JBossMQServiceMBean.java
  
  Index: JBossMQServiceMBean.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/JBossMQServiceMBean.java,v
  retrieving revision 1.6
  retrieving revision 1.6.2.1
  diff -u -r1.6 -r1.6.2.1
  --- JBossMQServiceMBean.java  2001/07/28 00:30:16     1.6
  +++ JBossMQServiceMBean.java  2001/08/01 05:19:48     1.6.2.1
  @@ -75,4 +75,56 @@
      // Public --------------------------------------------------------
   
     JMSServer getJMSServer();    
  +
  +   // Public --------------------------------------------------------
  +
  +     
  +   // Public --------------------------------------------------------
  +
  +     
  +   // Public --------------------------------------------------------
  +
  +     
  +   // Public --------------------------------------------------------
  +
  +  public void createQueue(String name) throws Exception;
  +
  +   // Public --------------------------------------------------------
  +
  +     
  +   // Public --------------------------------------------------------
  +
  +     
  +   // Public --------------------------------------------------------
  +
  +     
  +   // Public --------------------------------------------------------
  +
  +  public void createTopic(String name) throws Exception;
  +
  +   // Public --------------------------------------------------------
  +
  +     
  +   // Public --------------------------------------------------------
  +
  +     
  +   // Public --------------------------------------------------------
  +
  +     
  +   // Public --------------------------------------------------------
  +
  +  public void destroyQueue(String name) throws Exception;
  +
  +   // Public --------------------------------------------------------
  +
  +     
  +   // Public --------------------------------------------------------
  +
  +     
  +   // Public --------------------------------------------------------
  +
  +     
  +   // Public --------------------------------------------------------
  +
  +  public void destroyTopic(String name) throws Exception;
   }
  
  
  
  1.8.2.1   +4 -2      jbossmq/src/main/org/jbossmq/server/JMSDestination.java
  
  Index: JMSDestination.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/JMSDestination.java,v
  retrieving revision 1.8
  retrieving revision 1.8.2.1
  diff -u -r1.8 -r1.8.2.1
  --- JMSDestination.java       2001/07/28 00:30:16     1.8
  +++ JMSDestination.java       2001/08/01 05:19:48     1.8.2.1
  @@ -86,7 +86,9 @@
        org.apache.log4j.Category cat;
   
        // Package protected ---------------------------------------------
  -     abstract JMSDestination addConsumer(Subscription sub, ClientConsumer c) throws 
JMSException;
  +     abstract void addConsumer(Subscription sub,ClientConsumer c) throws 
JMSException;
  +
  +     abstract JMSDestination addSubscriber(Subscription sub) throws JMSException;
   
        abstract public void notifyMessageAvailable();
        
  
  
  
  1.1.2.1   +7 -3      jbossmq/src/main/org/jbossmq/server/JMSQueue.java
  
  Index: JMSQueue.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/JMSQueue.java,v
  retrieving revision 1.1
  retrieving revision 1.1.2.1
  diff -u -r1.1 -r1.1.2.1
  --- JMSQueue.java     2001/07/28 00:33:38     1.1
  +++ JMSQueue.java     2001/08/01 05:19:48     1.1.2.1
  @@ -111,9 +111,13 @@
        ExclusiveQueue exclusiveQueue;
   
        // Package protected ---------------------------------------------
  -     JMSDestination addConsumer(Subscription sub,ClientConsumer c) throws 
JMSException {
  -             cat.debug("Adding consumer: "+c+")");
  +     void addConsumer(Subscription sub,ClientConsumer c) throws JMSException {
  +             cat.debug("Adding consumer: "+c);
                exclusiveQueue.addConsumer(c);
  +     }
  +
  +     JMSDestination addSubscriber(Subscription sub) throws JMSException {
  +             cat.debug("Adding subscriber: "+sub);
                return this;
        }
   
  
  
  
  1.15.2.1  +17 -10    jbossmq/src/main/org/jbossmq/server/JMSServer.java
  
  Index: JMSServer.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/JMSServer.java,v
  retrieving revision 1.15
  retrieving revision 1.15.2.1
  diff -u -r1.15 -r1.15.2.1
  --- JMSServer.java    2001/07/28 00:30:16     1.15
  +++ JMSServer.java    2001/08/01 05:19:48     1.15.2.1
  @@ -46,8 +46,6 @@
    */
   public class JMSServer 
   {
  -
  -
        /////////////////////////////////////////////////////////////////////
        // Attributes
        /////////////////////////////////////////////////////////////////////
  @@ -217,13 +215,13 @@
        //A connection has sent a new message
        public void addMessage(ConnectionToken dc, SpyMessage val, Long txId) throws 
JMSException 
        {
  -     
                cat.debug("INCOMING: (TX="+txId+")"+dc.getClientID()+" => 
"+val.getJMSDestination());   
                JMSDestination 
queue=(JMSDestination)destinations.get(val.getJMSDestination());
                if (queue==null) throw new JMSException("This destination does not 
exist !");   
  +
                //Add the message to the queue
  +             val.setReadOnlyMode();
                queue.addMessage(val, txId);
  -             
        }
        
        /**
  @@ -376,11 +374,14 @@
        //A connection object wants to subscribe to a Destination
        public void subscribe(ConnectionToken dc, Subscription sub) throws JMSException
        {
  -             cat.debug("Server: 
subscribe(dest="+sub.destination+",idConnection="+dc.getClientID()+")");
  -
  -             ClientConsumer ClientConsumer = getClientConsumer(dc);
  -
  -             ClientConsumer.addSubscription(sub);
  +             try {
  +                     cat.debug("Server: 
subscribe(dest="+sub.destination+",idConnection="+dc.getClientID()+")");
  +                     ClientConsumer ClientConsumer = getClientConsumer(dc);
  +                     ClientConsumer.addSubscription(sub);
  +             } catch ( JMSException e ) {
  +                     cat.debug("Exception:", e);
  +                     throw e;
  +             }
                
        }
   
  @@ -515,11 +516,17 @@
        stateManager = newStateManager;
   }
   
  +     public static final String JBOSS_VESION = "JBossMQ ver. 0.9b";
  +
        public void restoreMessage(SpyMessage message) 
        {
                JMSDestination 
queue=(JMSDestination)destinations.get(message.getJMSDestination());
                if (queue==null) throw new RuntimeException("This destination does not 
exist!");
                //Add the message to the queue
                queue.restoreMessage(message);
  +     }
  +
  +     public String toString() {
  +             return JBOSS_VESION;
        }
   }
  
  
  
  1.1.2.1   +15 -5     jbossmq/src/main/org/jbossmq/server/JMSTopic.java
  
  Index: JMSTopic.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/JMSTopic.java,v
  retrieving revision 1.1
  retrieving revision 1.1.2.1
  diff -u -r1.1 -r1.1.2.1
  --- JMSTopic.java     2001/07/28 00:33:38     1.1
  +++ JMSTopic.java     2001/08/01 05:19:48     1.1.2.1
  @@ -112,8 +112,20 @@
   
   
        // Package protected ---------------------------------------------
  -     JMSDestination addConsumer(Subscription sub,ClientConsumer c) throws 
JMSException {
  +     void addConsumer(Subscription sub,ClientConsumer c) throws JMSException {
  +             cat.debug("Adding consumer: "+c);
  +             SpyTopic topic = (SpyTopic)sub.destination;
  +             DurableSubcriptionID id = topic.getDurableSubscriptionID();
  +             if( id!=null ) {
  +                     JMSQueue queue = getDurableSubscription(id);
  +                     queue.addConsumer(sub,c);
  +             }
  +             else {
  +                     sharedQueue.addConsumer(c);
  +             }
  +     }
   
  +     JMSDestination addSubscriber(Subscription sub) throws JMSException {
                SpyTopic topic = (SpyTopic)sub.destination;
                DurableSubcriptionID id = topic.getDurableSubscriptionID();
                if( id!=null ) {
  @@ -129,11 +141,9 @@
                                }
                        }
                                
  -                     return queue.addConsumer(sub, c);
  +                     return queue.addSubscriber(sub);
                                                
                } else {                
  -                     cat.debug("Adding consumer: "+c);
  -                     sharedQueue.addConsumer(c);
                        return this;
                }
        }
  
  
  
  1.4.2.1   +93 -84    jbossmq/src/main/org/jbossmq/server/StateManager.java
  
  Index: StateManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/StateManager.java,v
  retrieving revision 1.4
  retrieving revision 1.4.2.1
  diff -u -r1.4 -r1.4.2.1
  --- StateManager.java 2001/07/28 00:30:16     1.4
  +++ StateManager.java 2001/08/01 05:19:48     1.4.2.1
  @@ -200,9 +200,14 @@
                                
                                category.debug("Restarting Durable Subscription: 
"+clientId+","+name+","+topicName);
                                SpyTopic topic=new SpyTopic(topicName);
  -
                                JMSTopic dest = 
(JMSTopic)server.getJMSDestination(topic);
  -                             dest.createDurableSubscription(new 
DurableSubcriptionID(clientId, name));
  +                             if( dest == null ) {
  +                                     category.warn("Subscription topic of not 
found: "+topicName);
  +                                     category.warn("Subscription cannot be 
initialized: "+clientId+","+name);
  +                                     element.removeFromParent();
  +                             } else {
  +                                     dest.createDurableSubscription(new 
DurableSubcriptionID(clientId, name));
  +                             }
   
                        } catch (JMSException e ) {
                                category.error("Could not initialize a durable 
subscription for : Client Id="+clientId+", Name="+name+", Topic Name="+topicName,e);
  @@ -238,88 +243,92 @@
        stateFile = newStateFile;
   }
   
  -     public void setDurableSubscription(JMSServer server, DurableSubcriptionID sub, 
SpyTopic topic) throws JMSException {
  -             category.debug("Checking durable subscription: "+sub+", on topic: 
"+topic); 
  -             try {
  -                     //Set the known Ids
  -                     Enumeration enum = stateConfig.getElementsNamed("User");
  -                     while( enum.hasMoreElements() ) {
  -
  -                             // Match the User.Name
  -                             XElement user = (XElement)enum.nextElement();
  -                             if( !user.containsField("Id") || 
!user.getField("Id").equals(sub.getClientID()) ) 
  -                                     continue;
  -
  -                             category.debug("Found a matching ClientID 
configuration section."); 
  -                                     
  -
  -                             XElement subscription=null;
  -
  -                             // Match the User/DurableSubscription.Name
  -                             Enumeration enum2 = 
user.getElementsNamed("DurableSubscription");
  -                             while( enum2.hasMoreElements() ) {
  -                                     XElement t = (XElement)enum2.nextElement();
  -                                     if( 
t.getField("Name").equals(sub.getSubscriptionName())) {
  -                                             subscription = t;
  -                                             break;
  -                                     }
  -                             }
  +     public String displayStateConfig() throws Exception {
  +             return stateConfig.toString();
  +     }
   
  -                             if( subscription == null ) {
  -                                     category.debug("The subscription was not 
previously registered."); 
  -                                     // it was not previously registered...
  -                                     if( topic == null )
  -                                             return;
  -
  -                                     subscription = new 
XElement("DurableSubscription");
  -                                     subscription.addField("Name", 
sub.getSubscriptionName());
  -                                     subscription.addField("TopicName", 
topic.getName());
  -                                     user.addElement(subscription);
  -                                                                                    
 
  -                                     JMSTopic dest = 
(JMSTopic)server.getJMSDestination(topic);
  -                                     dest.createDurableSubscription(sub);
  -                                     
  -                                     saveConfig();
  -                                     
  -                             } else {
  -                                     category.debug("The subscription was 
previously registered."); 
  -                                     // it was previously registered...
  -                                     if( 
subscription.getField("TopicName").equals(topic.getName()) ) {
  -                                             // and it is the same as before, do 
nothing.
  -                                             return;
  -                                     } else {
  -                                             category.debug("But the topic was 
different, changing the subscription."); 
  -                                             // we have to change the 
subscription...
  -                                             SpyTopic prevTopic = new SpyTopic( 
subscription.getField("TopicName") );
  -                                             JMSTopic dest = 
(JMSTopic)server.getJMSDestination(prevTopic);                                         
 
  -                                             dest.destoryDurableSubscription(sub);
  -
  -                                             if( topic == null ) {
  -                                                     
subscription.removeFromParent();
  -                                             } else {
  -                                                     
subscription.setField("TopicName", topic.getName());
  -                                                     dest = 
(JMSTopic)server.getJMSDestination(topic);
  -                                                     
dest.createDurableSubscription(sub);
  -                                             }
  -                                             
  -                                             saveConfig();
  -                                     }
  -                                     // Subscription existed
  -                                     return;
  -                             }
  +     public void setDurableSubscription(JMSServer server, DurableSubcriptionID sub, 
SpyTopic topic) throws JMSException {
  +         category.debug("Checking durable subscription: " + sub + ", on topic: " + 
topic);
  +         try {
  +             //Set the known Ids
  +             Enumeration enum= stateConfig.getElementsNamed("User");
  +             while (enum.hasMoreElements()) {
  +
  +                 // Match the User.Name
  +                 XElement user= (XElement) enum.nextElement();
  +                 if (!user.containsField("Id") || 
!user.getField("Id").equals(sub.getClientID()))
  +                     continue;
  +
  +                 category.debug("Found a matching ClientID configuration section.");
  +
  +                 XElement subscription= null;
  +
  +                 // Match the User/DurableSubscription.Name
  +                 Enumeration enum2= user.getElementsNamed("DurableSubscription");
  +                 while (enum2.hasMoreElements()) {
  +                     XElement t= (XElement) enum2.nextElement();
  +                     if (t.getField("Name").equals(sub.getSubscriptionName())) {
  +                         subscription= t;
  +                         break;
  +                     }
  +                 }
  +
  +                 if (subscription == null) {
  +                     category.debug("The subscription was not previously 
registered.");
  +                     // it was not previously registered...
  +                     if (topic == null)
  +                         return;
  +
  +                     subscription= new XElement("DurableSubscription");
  +                     subscription.addField("Name", sub.getSubscriptionName());
  +                     subscription.addField("TopicName", topic.getName());
  +                     user.addElement(subscription);
  +
  +                     JMSTopic dest= (JMSTopic) server.getJMSDestination(topic);
  +                     dest.createDurableSubscription(sub);
  +
  +                     saveConfig();
  +
  +                 } else {
  +                     category.debug("The subscription was previously registered.");
  +                     // it was previously registered...
  +                     if 
(subscription.getField("TopicName").equals(topic.getName())) {
  +                         // and it is the same as before, do nothing.
  +                         return;
  +                     } else {
  +                         category.debug("But the topic was different, changing the 
subscription.");
  +                         // we have to change the subscription...
  +                         SpyTopic prevTopic= new 
SpyTopic(subscription.getField("TopicName"));
  +                         JMSTopic dest= (JMSTopic) 
server.getJMSDestination(prevTopic);
  +                         dest.destoryDurableSubscription(sub);
  +
  +                         if (topic == null) {
  +                             subscription.removeFromParent();
  +                         } else {
  +                             subscription.setField("TopicName", topic.getName());
  +                             dest= (JMSTopic) server.getJMSDestination(topic);
  +                             dest.createDurableSubscription(sub);
  +                         }
  +
  +                         saveConfig();
  +                     }
  +                     // Subscription existed
  +                 }
  +                             return;
  +             }
  +             
  +                     // Could not find that user..
  +                     throw new JMSException("ClientID '" + sub.getClientID() + "' 
cannot create durable subscriptions.");
  +                     
  +         } catch (java.io.IOException e) {
  +             JMSException newE= new JMSException("Could not setup the durable 
subscription");
  +             newE.setLinkedException(e);
  +             throw newE;
  +         } catch (org.jbossmq.xml.XElementException e) {
  +             JMSException newE= new JMSException("Could not setup the durable 
subscription");
  +             newE.setLinkedException(e);
  +             throw newE;
  +         }
   
  -                             // Could not find that user..
  -                             throw new JMSException("ClientID 
'"+sub.getClientID()+"' cannot create durable subscriptions.");
  -                     }
  -             } catch ( java.io.IOException e ) {
  -                     JMSException newE = new JMSException("Could not setup the 
durable subscription");
  -                     newE.setLinkedException(e);
  -                     throw newE;
  -             } catch ( org.jbossmq.xml.XElementException e ) {
  -                     JMSException newE = new JMSException("Could not setup the 
durable subscription");
  -                     newE.setLinkedException(e);
  -                     throw newE;
  -             }
  -             
        }
   }
  
  
  
  1.3.2.1   +3 -3      jbossmq/src/main/org/jbossmq/server/StateManagerMBean.java
  
  Index: StateManagerMBean.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/StateManagerMBean.java,v
  retrieving revision 1.3
  retrieving revision 1.3.2.1
  diff -u -r1.3 -r1.3.2.1
  --- StateManagerMBean.java    2001/07/28 00:30:16     1.3
  +++ StateManagerMBean.java    2001/08/01 05:19:48     1.3.2.1
  @@ -60,8 +60,6 @@
   public interface StateManagerMBean
      extends org.jboss.util.ServiceMBean
   {
  -
  -     
      // Public --------------------------------------------------------
   
        
  @@ -75,4 +73,6 @@
   public java.lang.String getStateFile();
   
   public void setStateFile(java.lang.String newStateFile);
  +
  +     public String displayStateConfig() throws Exception;
   }
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to