User: user57 Date: 01/07/31 22:19:47 Modified: src/main/org/jbossmq Tag: jboss_buildmagic Connection.java SpyBytesMessage.java SpyConnection.java SpyMessage.java SpyMessageConsumer.java SpyQueueSender.java SpyQueueSession.java SpySession.java SpyTopicSession.java SpyXAConnection.java Subscription.java Log: o updated from HEAD Revision Changes Path No revision No revision 1.3.2.1 +15 -21 jbossmq/src/main/org/jbossmq/Connection.java Index: Connection.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/Connection.java,v retrieving revision 1.3 retrieving revision 1.3.2.1 diff -u -r1.3 -r1.3.2.1 --- Connection.java 2001/07/28 00:30:15 1.3 +++ Connection.java 2001/08/01 05:19:47 1.3.2.1 @@ -146,8 +146,6 @@ } catch (Exception e) { throw new SpyJMSException( "Cannot enable the connection with the JMS server",e); } - - changeModeStop(modeStop); } public void stop() throws JMSException { @@ -166,8 +164,6 @@ throw new SpyJMSException( "Cannot disable the connection with the JMS server",e); } - changeModeStop(modeStop); - } public synchronized void close() throws JMSException { @@ -247,19 +243,6 @@ } } - //notify his sessions that he has changed his stopped mode - synchronized void changeModeStop(boolean newValue) { - synchronized (createdSessions) { - - Iterator i = createdSessions.iterator(); - while (i.hasNext()) { - ((SpySession) i.next()).setStopMode(newValue); - } - - } - - } - //Called by a session when it is closing void sessionClosing(SpySession who) { synchronized (createdSessions) { @@ -278,7 +261,8 @@ try { clientID = serverIL.getID(); } catch (Exception e) { - throw new SpyJMSException( "Cannot get a client ID",e); + cat.debug("Server Exception: ", e); + throw new SpyJMSException( "Cannot get a client ID: "+e.getMessage(), e ); } } @@ -398,7 +382,12 @@ throw new IllegalStateException("The connection is closed"); try { - return serverIL.receive(connectionToken, sub.subscriptionId, wait); + SpyMessage message = serverIL.receive(connectionToken, sub.subscriptionId, wait); + if( message != null ) { + message.shouldAck = true; + message.routeToSubscriber = sub.subscriptionId; + } + return message; } catch (Exception e) { throw new SpyJMSException( "Cannot create a ConnectionReceiver",e); } @@ -620,7 +609,12 @@ while( iter.hasNext() ) { SpyConsumer consumer = (SpyConsumer)iter.next(); - consumer.addMessage(requests[i].message); + if( iter.hasNext() ) { + consumer.addMessage(requests[i].message.myClone()); + } + else { + consumer.addMessage(requests[i].message); + } consumersUsed.add(consumer); } 1.4.4.1 +126 -116 jbossmq/src/main/org/jbossmq/SpyBytesMessage.java Index: SpyBytesMessage.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyBytesMessage.java,v retrieving revision 1.4 retrieving revision 1.4.4.1 diff -u -r1.4 -r1.4.4.1 --- SpyBytesMessage.java 2001/05/17 23:46:29 1.4 +++ SpyBytesMessage.java 2001/08/01 05:19:47 1.4.4.1 @@ -20,26 +20,26 @@ /** * This class implements javax.jms.BytesMessage - * + * * @author Norbert Lataille ([EMAIL PROTECTED]) - * + * * @version $Revision$ */ -public class SpyBytesMessage - extends SpyMessage +public class SpyBytesMessage + extends SpyMessage implements Cloneable, BytesMessage { - + // Attributes ---------------------------------------------------- - private ByteArrayOutputStream ostream=null; - private DataOutputStream p=null; - private byte[] InternalArray=null; - private ByteArrayInputStream istream=null; - private DataInputStream m=null; + private transient ByteArrayOutputStream ostream=null; + private transient DataOutputStream p=null; + private transient ByteArrayInputStream istream=null; + private transient DataInputStream m=null; + private byte[] InternalArray=null; // Constructor --------------------------------------------------- - + public SpyBytesMessage() { msgReadOnly=false; @@ -49,272 +49,272 @@ // Public -------------------------------------------------------- - public boolean readBoolean() throws JMSException + public boolean readBoolean() throws JMSException { checkRead(); - try { + try { return m.readBoolean(); } catch (EOFException e) { throw new MessageEOFException(""); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - public byte readByte() throws JMSException + public byte readByte() throws JMSException { - checkRead(); - try { + checkRead(); + try { return m.readByte(); } catch (EOFException e) { throw new MessageEOFException(""); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - public int readUnsignedByte() throws JMSException + public int readUnsignedByte() throws JMSException { checkRead(); - try { + try { return m.readUnsignedByte(); } catch (EOFException e) { throw new MessageEOFException(""); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - public short readShort() throws JMSException + public short readShort() throws JMSException { checkRead(); - try { + try { return m.readShort(); } catch (EOFException e) { throw new MessageEOFException(""); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - - public int readUnsignedShort() throws JMSException + + public int readUnsignedShort() throws JMSException { checkRead(); - try { + try { return m.readUnsignedShort(); } catch (EOFException e) { throw new MessageEOFException(""); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - public char readChar() throws JMSException + public char readChar() throws JMSException { checkRead(); - try { + try { return m.readChar(); } catch (EOFException e) { throw new MessageEOFException(""); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - public int readInt() throws JMSException + public int readInt() throws JMSException { checkRead(); - try { + try { return m.readInt(); } catch (EOFException e) { throw new MessageEOFException(""); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - public long readLong() throws JMSException + public long readLong() throws JMSException { checkRead(); - try { + try { return m.readLong(); } catch (EOFException e) { throw new MessageEOFException(""); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - public float readFloat() throws JMSException + public float readFloat() throws JMSException { checkRead(); - try { + try { return m.readFloat(); } catch (EOFException e) { throw new MessageEOFException(""); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - - public double readDouble() throws JMSException + + public double readDouble() throws JMSException { checkRead(); - try { + try { return m.readDouble(); } catch (EOFException e) { throw new MessageEOFException(""); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - public String readUTF() throws JMSException + public String readUTF() throws JMSException { checkRead(); - try { + try { return m.readUTF(); } catch (EOFException e) { throw new MessageEOFException(""); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - - public int readBytes(byte[] value) throws JMSException + + public int readBytes(byte[] value) throws JMSException { checkRead(); - try { + try { return m.read(value); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - public int readBytes(byte[] value, int length) throws JMSException + public int readBytes(byte[] value, int length) throws JMSException { checkRead(); - try { + try { return m.read(value,0,length); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - public void writeBoolean(boolean value) throws JMSException + public void writeBoolean(boolean value) throws JMSException { if (msgReadOnly) throw new MessageNotWriteableException("the message body is read-only"); - try { + try { p.writeBoolean(value); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - public void writeByte(byte value) throws JMSException + public void writeByte(byte value) throws JMSException { if (msgReadOnly) throw new MessageNotWriteableException("the message body is read-only"); - try { + try { p.writeByte(value); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - public void writeShort(short value) throws JMSException + public void writeShort(short value) throws JMSException { if (msgReadOnly) throw new MessageNotWriteableException("the message body is read-only"); - try { + try { p.writeShort(value); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - - public void writeChar(char value) throws JMSException + + public void writeChar(char value) throws JMSException { if (msgReadOnly) throw new MessageNotWriteableException("the message body is read-only"); - try { + try { p.writeChar(value); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - - public void writeInt(int value) throws JMSException + + public void writeInt(int value) throws JMSException { if (msgReadOnly) throw new MessageNotWriteableException("the message body is read-only"); - try { + try { p.writeInt(value); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - public void writeLong(long value) throws JMSException + public void writeLong(long value) throws JMSException { if (msgReadOnly) throw new MessageNotWriteableException("the message body is read-only"); - try { + try { p.writeLong(value); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - public void writeFloat(float value) throws JMSException + public void writeFloat(float value) throws JMSException { if (msgReadOnly) throw new MessageNotWriteableException("the message body is read-only"); - try { + try { p.writeFloat(value); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - public void writeDouble(double value) throws JMSException + public void writeDouble(double value) throws JMSException { if (msgReadOnly) throw new MessageNotWriteableException("the message body is read-only"); - try { + try { p.writeDouble(value); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - public void writeUTF(String value) throws JMSException + public void writeUTF(String value) throws JMSException { if (msgReadOnly) throw new MessageNotWriteableException("the message body is read-only"); - try { + try { p.writeUTF(value); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - public void writeBytes(byte[] value) throws JMSException + public void writeBytes(byte[] value) throws JMSException { if (msgReadOnly) throw new MessageNotWriteableException("the message body is read-only"); - try { + try { p.write(value,0,value.length); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - - public void writeBytes(byte[] value, int offset, int length) throws JMSException + + public void writeBytes(byte[] value, int offset, int length) throws JMSException { if (msgReadOnly) throw new MessageNotWriteableException("the message body is read-only"); - try { + try { p.write(value,offset,length); } catch (IOException e) { throw new JMSException("IOException"); - } + } } - - public void writeObject(Object value) throws JMSException + + public void writeObject(Object value) throws JMSException { if (msgReadOnly) throw new MessageNotWriteableException("the message body is read-only"); - try { + try { if (value instanceof String) p.writeChars((String)value); else if (value instanceof Boolean) p.writeBoolean(((Boolean)value).booleanValue()); else if (value instanceof Byte) p.writeByte(((Byte)value).byteValue()); @@ -324,16 +324,16 @@ else if (value instanceof Float) p.writeFloat(((Float)value).floatValue()); else if (value instanceof Double) p.writeDouble(((Double)value).doubleValue()); else if (value instanceof byte[]) p.write((byte[])value,0,((byte[])value).length); - else throw new MessageFormatException("Invalid object for properties"); + else throw new MessageFormatException("Invalid object for properties"); } catch (IOException e) { throw new JMSException("IOException"); - } - - } + } - public void reset() throws JMSException + } + + public void reset() throws JMSException { - try { + try { if (!msgReadOnly) { p.flush(); InternalArray=ostream.toByteArray(); @@ -346,50 +346,60 @@ msgReadOnly = true; } catch (IOException e) { throw new JMSException("IOException"); - } + } } - + public void clearBody() throws JMSException { - try { + try { if (!msgReadOnly) ostream.close(); else istream.close(); } catch (IOException e) { //don't throw an exception - } - + } + ostream=new ByteArrayOutputStream(); p=new DataOutputStream(ostream); - InternalArray=null; + InternalArray=null; istream=null; m=null; - + super.clearBody(); } - // Package protected --------------------------------------------- - + // Package protected --------------------------------------------- + //We need to reset() since this message is going to be cloned/serialized public SpyMessage myClone() { try { - reset(); + reset(); return (SpyMessage)clone(); - } catch (Exception e) { + } catch (Exception e) { throw new RuntimeException("myClone failed !"); - } + } } - + // Private ------------------------------------------------------- + private void writeObject(java.io.ObjectOutputStream out) throws IOException { + if (!msgReadOnly) { + p.flush(); + InternalArray=ostream.toByteArray(); + } + out.writeObject(InternalArray); + } + private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { + InternalArray = (byte[])in.readObject(); + } - private void checkRead() throws JMSException + private void checkRead() throws JMSException { if (!msgReadOnly) throw new MessageNotWriteableException("readByte while the buffer is writeonly"); - + //We have just received/reset() the message, and the client is trying to read it if (istream==null||m==null) { istream = new ByteArrayInputStream(InternalArray); - m = new DataInputStream(istream); + m = new DataInputStream(istream); } } 1.13.2.1 +3 -3 jbossmq/src/main/org/jbossmq/SpyConnection.java Index: SpyConnection.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyConnection.java,v retrieving revision 1.13 retrieving revision 1.13.2.1 diff -u -r1.13 -r1.13.2.1 --- SpyConnection.java 2001/07/28 00:30:15 1.13 +++ SpyConnection.java 2001/08/01 05:19:47 1.13.2.1 @@ -47,7 +47,7 @@ { if (closed) throw new IllegalStateException("The connection is closed"); - TopicSession session=new SpyTopicSession(this,transacted,acknowledgeMode,modeStop); + TopicSession session=new SpyTopicSession(this,transacted,acknowledgeMode); //add the new session to the createdSessions list synchronized (createdSessions) { @@ -146,7 +146,7 @@ { if (closed) throw new IllegalStateException("The connection is closed"); - QueueSession session=new SpyQueueSession(this,transacted,acknowledgeMode,modeStop); + QueueSession session=new SpyQueueSession(this,transacted,acknowledgeMode); //add the new session to the createdSessions list synchronized (createdSessions) { 1.8.2.1 +3 -1 jbossmq/src/main/org/jbossmq/SpyMessage.java Index: SpyMessage.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyMessage.java,v retrieving revision 1.8 retrieving revision 1.8.2.1 diff -u -r1.8 -r1.8.2.1 --- SpyMessage.java 2001/07/28 00:30:15 1.8 +++ SpyMessage.java 2001/08/01 05:19:47 1.8.2.1 @@ -67,6 +67,8 @@ public transient boolean shouldAck; //For ordering in the JMSServerQueue (set on the server side) public transient long messageId; + //For some persistence mechanisms + public transient Object persistData; // Constructor --------------------------------------------------- 1.8.2.1 +44 -51 jbossmq/src/main/org/jbossmq/SpyMessageConsumer.java Index: SpyMessageConsumer.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyMessageConsumer.java,v retrieving revision 1.8 retrieving revision 1.8.2.1 diff -u -r1.8 -r1.8.2.1 --- SpyMessageConsumer.java 2001/07/28 00:30:15 1.8 +++ SpyMessageConsumer.java 2001/08/01 05:19:47 1.8.2.1 @@ -82,13 +82,22 @@ if (closed) throw new IllegalStateException("The MessageConsumer is closed"); + if (messageListener != null) throw new JMSException("A message listener is already registered"); - subscription.receiving = true; + if ( subscription.actsLikeAQueue ) { + while( true ) { + SpyMessage msg = session.connection.receive(subscription, 0); + if( msg == null ) + break; + Message mes = preProcessMessage( msg ); + if( mes != null ) + return mes; + } + } - if ( subscription.actsLikeAQueue ) - session.connection.receive(subscription, 0); + subscription.receiving = true; synchronized (messages) { @@ -96,12 +105,9 @@ while (true) { if (closed) return null; - if (!session.modeStop) { - Message mes = getMessage(); - if (mes != null) - return mes; - } else - cat.debug("the connection is stopped !"); + Message mes = getMessage(); + if (mes != null) + return mes; cat.debug("SpyMessageConsumer: receive in messages.wait()"); messages.wait(); @@ -118,20 +124,29 @@ } public Message receive(long timeOut) throws JMSException { + if (timeOut == 0) + return receive(); + if (closed) throw new IllegalStateException("The MessageConsumer is closed"); + if (messageListener != null) throw new JMSException("A message listener is already registered"); - if (timeOut == 0) - return receive(); - long endTime = System.currentTimeMillis() + timeOut; - subscription.receiving = true; + if ( subscription.actsLikeAQueue ) { + while( true ) { + SpyMessage msg = session.connection.receive(subscription, timeOut); + if( msg == null ) + break; + Message mes = preProcessMessage( msg ); + if( mes != null ) + return mes; + } + } - if ( subscription.actsLikeAQueue ) - session.connection.receive(subscription, timeOut); + subscription.receiving = true; synchronized (messages) { @@ -142,14 +157,10 @@ if (closed) return null; - if (!session.modeStop) { - Message mes = getMessage(); - if (mes != null) { - return mes; - } - - } else - cat.debug("the connection is stopped !"); + Message mes = getMessage(); + if (mes != null) { + return mes; + } long att = endTime - System.currentTimeMillis(); if (att <= 0) { @@ -173,31 +184,18 @@ public Message receiveNoWait() throws JMSException { if (closed) throw new IllegalStateException("The MessageConsumer is closed"); + if (messageListener != null) throw new JMSException("A message listener is already registered"); - - subscription.receiving = true; - try { - - if ( subscription.actsLikeAQueue ) { - if (session.modeStop) - return null; - - SpyMessage msg = session.connection.receive(getSubscription(), -1); - return preProcessMessage( msg ); - } - - synchronized (messages) { - while (true) { - if (session.modeStop) - return null; - return getMessage(); - } - } - } finally { - subscription.receiving = false; + if ( subscription.actsLikeAQueue ) { + SpyMessage msg = session.connection.receive(subscription, -1); + if( msg == null ) + return null; + return preProcessMessage( msg ); } + + return getMessage(); } public void close() throws JMSException { @@ -341,13 +339,8 @@ return null; SpyMessage mes = (SpyMessage) messages.removeFirst(); - - //the SAME Message object is put in different SessionQueues - //when we deliver it, we have to clone() it to insure independance - //HRC: could we avoid this if we know that we are delivering in P2P??? - SpyMessage message = mes.myClone(); - Message rc = preProcessMessage( message ); + Message rc = preProcessMessage( mes); // could happen if the message has expired. if( rc == null ) continue; 1.3.4.1 +2 -6 jbossmq/src/main/org/jbossmq/SpyQueueSender.java Index: SpyQueueSender.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyQueueSender.java,v retrieving revision 1.3 retrieving revision 1.3.4.1 diff -u -r1.3 -r1.3.4.1 --- SpyQueueSender.java 2001/05/20 23:38:18 1.3 +++ SpyQueueSender.java 2001/08/01 05:19:47 1.3.4.1 @@ -95,11 +95,7 @@ message = m; } - // Clone the message so we can make the outbound message read only - SpyMessage clone = ((SpyMessage)message).myClone(); - clone.setReadOnlyMode(); - //Send the message. - session.sendMessage(clone); + session.sendMessage((SpyMessage)message); } } 1.4.2.1 +5 -5 jbossmq/src/main/org/jbossmq/SpyQueueSession.java Index: SpyQueueSession.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyQueueSession.java,v retrieving revision 1.4 retrieving revision 1.4.2.1 diff -u -r1.4 -r1.4.2.1 --- SpyQueueSession.java 2001/07/16 02:51:44 1.4 +++ SpyQueueSession.java 2001/08/01 05:19:47 1.4.2.1 @@ -102,15 +102,15 @@ } - SpyQueueSession(Connection myConnection, boolean transacted, int acknowledgeMode, boolean stop) + SpyQueueSession(Connection myConnection, boolean transacted, int acknowledgeMode) { - this(myConnection,transacted,acknowledgeMode,stop, false); + this(myConnection,transacted,acknowledgeMode,false); } // Constructor --------------------------------------------------- - SpyQueueSession(Connection myConnection, boolean transacted, int acknowledgeMode, boolean stop, boolean xaSession) + SpyQueueSession(Connection myConnection, boolean transacted, int acknowledgeMode, boolean xaSession) { - super(myConnection,transacted,acknowledgeMode,stop, xaSession); + super(myConnection,transacted,acknowledgeMode,xaSession); } } 1.6.2.1 +4 -17 jbossmq/src/main/org/jbossmq/SpySession.java Index: SpySession.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpySession.java,v retrieving revision 1.6 retrieving revision 1.6.2.1 diff -u -r1.6 -r1.6.2.1 --- SpySession.java 2001/07/16 02:51:44 1.6 +++ SpySession.java 2001/08/01 05:19:47 1.6.2.1 @@ -54,8 +54,6 @@ //MessageConsumers created by this session protected HashSet consumers; - //Is my connection in stopped mode ? - protected boolean modeStop; //Is the session closed ? boolean closed; @@ -181,7 +179,7 @@ //if we are not in stopped mode, look at the incoming queue //Consisder if should be stopped because we are outside the XA transaction (start/end) boolean xaStop = spyXAResource!=null && currentTransactionId==null; - if (!(modeStop || xaStop)) { + if (!xaStop) { Iterator i; synchronized (consumers) { @@ -338,7 +336,7 @@ throw new IllegalStateException("The session is closed"); if( transacted ) { - connection.spyXAResourceManager.addMessage(currentTransactionId, m); + connection.spyXAResourceManager.addMessage(currentTransactionId, m.myClone()); } else { connection.sendToServer(m); } @@ -350,16 +348,6 @@ return spyXAResource; } - //The connection has changed its mode (stop() or start()) - //We have to wait until message delivery has stopped or wake up the thread - void setStopMode(boolean newValue) - { - - if (closed) throw new IllegalStateException("The session is closed"); - if (modeStop==newValue) return; - modeStop=newValue; - } - void addConsumer(SpyMessageConsumer who) throws JMSException { if (closed) throw new IllegalStateException("The session is closed"); @@ -401,13 +389,12 @@ // Constructor --------------------------------------------------- - SpySession(Connection conn, boolean trans, int acknowledge, boolean stop, boolean xaSession) + SpySession(Connection conn, boolean trans, int acknowledge, boolean xaSession) { connection=conn; transacted=trans; acknowledgeMode=acknowledge; - modeStop=stop; if( xaSession ) spyXAResource = new SpyXAResource(this); 1.5.2.1 +5 -5 jbossmq/src/main/org/jbossmq/SpyTopicSession.java Index: SpyTopicSession.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyTopicSession.java,v retrieving revision 1.5 retrieving revision 1.5.2.1 diff -u -r1.5 -r1.5.2.1 --- SpyTopicSession.java 2001/07/28 00:30:15 1.5 +++ SpyTopicSession.java 2001/08/01 05:19:47 1.5.2.1 @@ -121,15 +121,15 @@ } - SpyTopicSession(Connection myConnection, boolean transacted, int acknowledgeMode, boolean stop) + SpyTopicSession(Connection myConnection, boolean transacted, int acknowledgeMode) { - this(myConnection,transacted,acknowledgeMode,stop,false); + this(myConnection,transacted,acknowledgeMode,false); } // Constructor --------------------------------------------------- - SpyTopicSession(Connection myConnection, boolean transacted, int acknowledgeMode, boolean stop, boolean xaSession) + SpyTopicSession(Connection myConnection, boolean transacted, int acknowledgeMode, boolean xaSession) { - super(myConnection,transacted,acknowledgeMode,stop,xaSession); + super(myConnection,transacted,acknowledgeMode,xaSession); } } 1.2.2.1 +3 -3 jbossmq/src/main/org/jbossmq/SpyXAConnection.java Index: SpyXAConnection.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyXAConnection.java,v retrieving revision 1.2 retrieving revision 1.2.2.1 diff -u -r1.2 -r1.2.2.1 --- SpyXAConnection.java 2001/07/16 02:51:44 1.2 +++ SpyXAConnection.java 2001/08/01 05:19:47 1.2.2.1 @@ -51,7 +51,7 @@ if (closed) throw new IllegalStateException("The connection is closed"); - XAQueueSession session=new SpyQueueSession(this,true,0,modeStop,true); + XAQueueSession session=new SpyQueueSession(this,true,0,true); //add the new session to the createdSessions list synchronized (createdSessions) { @@ -85,7 +85,7 @@ if (closed) throw new IllegalStateException("The connection is closed"); - XATopicSession session = new SpyTopicSession(this, true, 0, modeStop, true); + XATopicSession session = new SpyTopicSession(this, true, 0, true); //add the new session to the createdSessions list synchronized (createdSessions) { createdSessions.add(session); 1.5.2.1 +32 -20 jbossmq/src/main/org/jbossmq/Subscription.java Index: Subscription.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/Subscription.java,v retrieving revision 1.5 retrieving revision 1.5.2.1 diff -u -r1.5 -r1.5.2.1 --- Subscription.java 2001/07/28 00:30:15 1.5 +++ Subscription.java 2001/08/01 05:19:47 1.5.2.1 @@ -15,9 +15,9 @@ /** * This class contians all the data needed to for a the provider to * to determine if a message can be routed to a consumer. - * + * * @author Hiram Chirino ([EMAIL PROTECTED]) - * + * * @version $Revision$ */ public class Subscription @@ -31,18 +31,18 @@ public String messageSelector; // Should this message destroy the subscription? public boolean destroyDurableSubscription; - - // Topics might not want locally produced messages public boolean noLocal; + // Does this subscription look like a queue? + public boolean actsLikeAQueue; - // Transient Values + // Transient Values public transient Selector selector; public transient ConnectionToken dc; public transient boolean listening; public transient boolean receiving; - // Determines the consumer would accept the message. + // Determines the consumer would accept the message. public boolean accepts( SpyMessage message, boolean exclusive ) throws javax.jms.JMSException { Selector ms = getSelector(); @@ -59,39 +59,51 @@ // But if the subscriber is durable, then it acts like a Queue if( actsLikeAQueue ) { - + if( !exclusive ) return false; if( !listening && !receiving ) return false; - + } - + } else { - + if( !exclusive ) - return false; + return false; // In the Queue case we only deliver if it is currently // has a listner or is receiving if( !listening && !receiving ) return false; } - + return true; - - } - public boolean actsLikeAQueue; + } - // Determines the consumer would accept the message. + // Determines the consumer would accept the message. public Selector getSelector() throws javax.jms.JMSException { if( messageSelector == null ) return null; - - if( selector==null ) + + if( selector==null ) selector = new Selector(messageSelector); + + return selector; + } + + public Subscription myClone(){ + Subscription result = new Subscription(); + + //only need to clone non-transient fields for our purposes. + result.subscriptionId = subscriptionId; + result.destination = destination; + result.messageSelector = messageSelector; + result.destroyDurableSubscription = destroyDurableSubscription; + result.noLocal = noLocal; + result.actsLikeAQueue = actsLikeAQueue; - return selector; + return result; } } _______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] http://lists.sourceforge.net/lists/listinfo/jboss-development