Bugs item #526696, was opened at 2002-03-06 23:18
You can respond by visiting:
https://sourceforge.net/tracker/?func=detail&atid=376685&aid=526696&group_id=22866
Category: JBossMQ
Group: v2.4 (stable)
Status: Open
Resolution: None
Priority: 5
Submitted By: Kelly McTiernan (kellymct)
Assigned to: Andreas Schaefer (schaefera)
Summary: session.recover() doesn't work
Initial Comment:
All operating systems.
Any version of Java.
N/A.
Call session.recover().
According to the JMS spec, a call to session.recover()
from a transacted session should throw an
IllegalStateException. This it does, with the message:
The session is transacted.
When called with a non-transacted session, however,
recover() still throws an IllegalStateException, this
time with the message:
The session is not transacted.
Looking into the code a little, it turns out that the
recover() method does a check for the session being
transacted, and throws the IllegalStateException with
the first message if it is. Then it proceeds to call
rollback(), which does a check for the session NOT
being transacted, and throws the IllegalStateException
with the second message if it's not.
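The control flow described above can be reproduced in a few lines. This is a minimal sketch, not the JBossMQ source: RecoverSketch, SketchSession and JmsIllegalStateException are hypothetical names (the last one stands in for javax.jms.IllegalStateException), and only the two guard checks from the report are modelled.

```java
class RecoverSketch {
    /** Hypothetical stand-in for javax.jms.IllegalStateException. */
    static class JmsIllegalStateException extends Exception {
        JmsIllegalStateException(String message) {
            super(message);
        }
    }

    /** Hypothetical, stripped-down session with only the two guards. */
    static class SketchSession {
        private final boolean transacted;

        SketchSession(boolean transacted) {
            this.transacted = transacted;
        }

        /** rollback() is only legal on a transacted session. */
        void rollback() throws JmsIllegalStateException {
            if (!transacted) {
                throw new JmsIllegalStateException("The session is not transacted");
            }
        }

        /** recover() is only legal on a non-transacted session. */
        void recover() throws JmsIllegalStateException {
            if (transacted) {
                throw new JmsIllegalStateException("The session is transacted");
            }
            // Reaching this line means the session is NOT transacted,
            // so delegating to rollback() can never succeed.
            rollback();
        }
    }

    /** Returns "ok" or the message of the exception thrown by recover(). */
    static String tryRecover(boolean transacted) {
        try {
            new SketchSession(transacted).recover();
            return "ok";
        } catch (JmsIllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("transacted:     " + tryRecover(true));
        System.out.println("non-transacted: " + tryRecover(false));
    }
}
```

In this sketch neither kind of session can ever complete recover(), which matches the behaviour reported here.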
----------------------------------------------------------------------
Comment By: Seth Sites (thesitesman)
Date: 2002-07-19 11:47
Message:
Logged In: YES
user_id=579275
This is my proposed fix for session.recover(). It also
handles redelivery of messages that are still
unacknowledged when a client's connection crashes.
I have tested this code in HEAD, and it did not break
any of the testsuite tests. It is a long fix and involves
17 files, so I hope all the diffs fit in here.
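For orientation before the diffs, the server-side idea (find the messages a subscriber has received but not acknowledged, flag them as redelivered, and put them back on the queue) can be sketched on its own. This is an illustrative sketch under stated assumptions, not the patch itself: RedeliverySketch, Message, pending and unacked are hypothetical names, and the real code works on AcknowledgementRequest and MessageReference objects inside BasicQueue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RedeliverySketch {
    /** Hypothetical message type carrying only a body and a redelivered flag. */
    static final class Message {
        final String body;
        boolean redelivered;

        Message(String body) {
            this.body = body;
        }
    }

    /** Messages waiting to be delivered. */
    private final Deque<Message> pending = new ArrayDeque<>();
    /** Delivered but unacknowledged messages, mapped to the receiving subscriber id. */
    private final Map<Message, Integer> unacked = new LinkedHashMap<>();

    void send(String body) {
        pending.addLast(new Message(body));
    }

    /** Delivers the next pending message and tracks it as unacknowledged. */
    Message receive(int subscriberId) {
        Message m = pending.pollFirst();
        if (m != null) {
            unacked.put(m, subscriberId);
        }
        return m;
    }

    void acknowledge(Message m) {
        unacked.remove(m);
    }

    /** Requeues every unacknowledged message of one subscriber; returns how many. */
    int recover(int subscriberId) {
        List<Message> toRestore = new ArrayList<>();
        Iterator<Map.Entry<Message, Integer>> it = unacked.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Message, Integer> entry = it.next();
            if (entry.getValue() == subscriberId) {
                toRestore.add(entry.getKey());
                it.remove(); // safe removal while iterating
            }
        }
        // Walk backwards so the restored messages keep their original order
        // at the head of the pending queue.
        for (int i = toRestore.size() - 1; i >= 0; i--) {
            Message m = toRestore.get(i);
            m.redelivered = true;
            pending.addFirst(m);
        }
        return toRestore.size();
    }

    public static void main(String[] args) {
        RedeliverySketch queue = new RedeliverySketch();
        queue.send("a");
        queue.send("b");
        Message a = queue.receive(1);
        queue.receive(1);
        queue.acknowledge(a);
        System.out.println("requeued: " + queue.recover(1));
        Message again = queue.receive(1);
        System.out.println(again.body + " redelivered=" + again.redelivered);
    }
}
```

The sketch collects matches first and requeues afterwards, so the tracking map is never modified outside the iterator while it is being walked.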
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/Connection.java,v
retrieving revision 1.20
diff -u -r1.20 Connection.java
--- Connection.java 2 May 2002 02:44:28 -0000 1.20
+++ Connection.java 19 Jul 2002 07:24:10 -0000
@@ -75,7 +75,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author <a href="[EMAIL PROTECTED]">Peter Antman</a>
- * @version $Revision: 1.20 $
+ * @version $Revision: 1.21 $
* @created August 16, 2001
*/
public class Connection implements java.io.Serializable, javax.jms.Connection
@@ -1139,7 +1139,20 @@
//This session should not be in the "destinations" object anymore.
//We could check this, though
}
-
+
+ void recover ( Subscription s )
+ throws JMSException
+ {
+ try
+ {
+ serverIL.recover(connectionToken, s);
+ }
+ catch ( Exception e )
+ {
+ throw new SpyJMSException("Cannot recover session", e);
+ }
+ }
+
void unsubscribe(DurableSubscriptionID id)
throws JMSException
{
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpySession.java,v
retrieving revision 1.14
diff -u -w -r1.14 SpySession.java
--- SpySession.java 25 May 2002 22:51:03 -0000 1.14
+++ SpySession.java 19 Jul 2002 07:26:39 -0000
@@ -37,7 +37,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.14 $
+ * @version $Revision: 1.15 $
*/
public abstract class SpySession implements Session, XASession {
@@ -385,7 +385,43 @@
throw new IllegalStateException( "The session is transacted" );
}
- rollback();
+ //stop message delivery
+ try
+ {
+ System.out.println( "stopped message delivery for " + connection.getClientID() );
+ connection.stop();
+ }
+ catch (JMSException e)
+ {
+ throw new SpyJMSException("Could not stop message delivery", e);
+ }
+
+ //we need a function to call on the server to recover
+ if ( acknowledgeMode == Session.CLIENT_ACKNOWLEDGE ) {
+ try
+ {
+ Iterator i = connection.subscriptions.values().iterator();
+ while ( i.hasNext() )
+ {
+ connection.recover ( ((SpyQueueReceiver)i.next()).getSubscription() );
+ }
+ }
+ catch ( Exception e )
+ {
+ throw new SpyJMSException ("Unable to recover session ", e );
+ }
+ }
+
+ //Restart the delivery sequence including all unacknowledged messages that had been previously delivered. Redelivered messages do not have to be delivered in exactly their original delivery order.
+ try
+ {
+ System.out.println( "restarted message delivery for " + connection.getClientID() );
+ connection.start();
+ }
+ catch ( JMSException e )
+ {
+ throw new SpyJMSException("Could not resume message delivery", e);
+ }
}
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/il/Invoker.java,v
retrieving revision 1.1
diff -w -u -r1.1 Invoker.java
--- Invoker.java 4 May 2002 03:07:49 -0000 1.1
+++ Invoker.java 19 Jul 2002 07:28:56 -0000
@@ -28,7 +28,7 @@
* @author <a href="[EMAIL PROTECTED]">Hiram Chirino</a>
* @author <a href="[EMAIL PROTECTED]">Norbert Lataille</a>
* @author <a href="[EMAIL PROTECTED]">Peter Antman</a>
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
* @created August 16, 2001
*/
public interface Invoker
@@ -264,5 +264,19 @@
* @exception JMSException if it can not find the subscription.
*/
public Subscription getSubscription(ConnectionToken dc,int subscriberId) throws JMSException;
+
+ /**
+ * recover the session.
+ *
+ * @exception JMSException if it can not find the subscription.
+ */
+ public void recover(ConnectionToken dc,Subscription s) throws JMSException;
+
+ /**
+ * recover all sessions for a failed connection.
+ *
+ * @exception JMSException if it can not find the subscription.
+ */
+ public void recoverAll(ConnectionToken dc) throws JMSException;
}
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/il/ServerIL.java,v
retrieving revision 1.6
diff -u -w -r1.6 ServerIL.java
--- ServerIL.java 4 May 2002 03:07:49 -0000 1.6
+++ ServerIL.java 19 Jul 2002 07:29:55 -0000
@@ -26,7 +26,7 @@
* @author <a href="[EMAIL PROTECTED]">Hiram Chirino</a>
* @author <a href="[EMAIL PROTECTED]">Norbert Lataille</a>
* @author <a href="[EMAIL PROTECTED]">Peter Antman</a>
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
* @created August 16, 2001
*/
public interface ServerIL
@@ -258,5 +258,14 @@
*/
public void ping(ConnectionToken dc, long clientTime)
throws Exception;
+
+
+ /**
+ * @param dc org.jboss.mq.ConnectionToken
+ * @param s org.jboss.mq.Subscription
+ * @exception java.lang.Exception The exception description.
+ */
+ public void recover(org.jboss.mq.ConnectionToken dc, org.jboss.mq.Subscription s)
+ throws java.lang.Exception;
}
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/il/uil/UILServerIL.java,v
retrieving revision 1.8
diff -u -w -r1.8 UILServerIL.java
--- UILServerIL.java 9 Mar 2002 06:11:47 -0000 1.8
+++ UILServerIL.java 19 Jul 2002 11:34:51 -0000
@@ -37,7 +34,7 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author <a href="[EMAIL PROTECTED]">Peter Antman</a>
- * @version $Revision: 1.8 $
+ * @version $Revision: 1.9 $
* @created August 16, 2001
*/
public class UILServerIL implements java.io.Serializable, Cloneable, org.jboss.mq.il.ServerIL
@@ -63,6 +60,7 @@
final static int m_checkUser = 20;
final static int m_ping = 21;
final static int m_authenticate = 22;
+ final static int m_recover = 23;
/**
* Description of the Field
@@ -563,5 +561,20 @@
Exception e = (Exception)in.readObject();
throw e;
}
+ }
+
+ /**
+ * #Description of the Method
+ *
+ * @return Description of the Returned Value
+ * @exception Exception Description of Exception
+ */
+ public synchronized void recover( ConnectionToken ct, org.jboss.mq.Subscription s)
+ throws Exception
+ {
+ checkConnection();
+ out.writeByte(m_recover);
+ out.writeObject(s);
+ waitAnswer();
}
}
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/il/uil/UILServerILService.java,v
retrieving revision 1.21
diff -u -w -r1.21 UILServerILService.java
--- UILServerILService.java 19 Jul 2002 03:53:11 -0000 1.21
+++ UILServerILService.java 19 Jul 2002 07:45:20 -0000
@@ -48,7 +48,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author <a href="[EMAIL PROTECTED]">Peter Antman</a>
- * @version $Revision: 1.21 $
+ * @version $Revision: 1.22 $
*
* @jmx:mbean extends="org.jboss.mq.il.ServerILJMXServiceMBean"
*/
@@ -83,6 +83,7 @@
final static int m_checkUser = 20;
final static int m_ping = 21;
final static int m_authenticate = 22;
+ final static int m_recover = 23;
final static int SO_TIMEOUT = 5000;
/**
@@ -221,8 +222,16 @@
{
break;
}
-
log.warn("Connection failure (1).", e);
+
+ try
+ {
+ server.recoverAll( connectionToken );
+ }
+ catch ( JMSException ex )
+ {
+ log.warn( "Could not recover sessions", ex );
+ }
break;
}
@@ -310,6 +319,9 @@
case m_authenticate:
result = server.authenticate((String)in.readObject(), (String)in.readObject());
break;
+ case m_recover:
+ server.recover(connectionToken, (Subscription)in.readObject());
+ break;
default:
throw new RemoteException("Bad method code !");
}
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/il/rmi/RMIServerIL.java,v
retrieving revision 1.6
diff -u -w -r1.6 RMIServerIL.java
--- RMIServerIL.java 4 May 2002 03:07:50 -0000 1.6
+++ RMIServerIL.java 19 Jul 2002 07:48:20 -0000
@@ -29,7 +26,7 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author <a href="[EMAIL PROTECTED]">Peter Antman</a>
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
* @created August 16, 2001
*/
public class RMIServerIL extends java.rmi.server.UnicastRemoteObject implements RMIServerILRemote
@@ -325,6 +322,18 @@
throws JMSException
{
server.ping(dc, clientTime);
+ }
+
+ /**
+ * #Description of the Method
+ *
+ * @return Description of the Returned Value
+ * @exception Exception Description of Exception
+ */
+ public void recover( ConnectionToken ct, Subscription s)
+ throws Exception
+ {
+ server.recover( ct, s );
}
}
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/il/oil/OILConstants.java,v
retrieving revision 1.2
diff -w -u -r1.2 OILConstants.java
--- OILConstants.java 6 Mar 2002 17:27:49 -0000 1.2
+++ OILConstants.java 19 Jul 2002 07:52:07 -0000
@@ -10,7 +10,7 @@
*
*
* @author Brian Weaver ([EMAIL PROTECTED])
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
* @created January 10, 2002
*/
final class OILConstants
@@ -49,6 +49,7 @@
final static int PONG = 28;
final static int CLOSE = 29;
final static int AUTHENTICATE = 30;
+ final static int RECOVER = 31;
}
/*
vim:tabstop=3:expandtab:shiftwidth=3
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/il/oil/OILServerIL.java,v
retrieving revision 1.9
diff -u -w -r1.9 OILServerIL.java
--- OILServerIL.java 9 Mar 2002 06:11:47 -0000 1.9
+++ OILServerIL.java 19 Jul 2002 07:53:27 -0000
@@ -35,7 +32,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
* @created August 16, 2001
*/
public final class OILServerIL
@@ -532,5 +529,20 @@
Exception e = (Exception)in.readObject();
throw e;
}
+ }
+
+ /**
+ * #Description of the Method
+ *
+ * @return Description of the Returned Value
+ * @exception Exception Description of Exception
+ */
+ public synchronized void recover( ConnectionToken ct, Subscription s)
+ throws Exception
+ {
+ checkConnection();
+ out.writeByte(OILConstants.RECOVER);
+ out.writeObject(s);
+ waitAnswer();
}
}
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/il/oil/OILServerILService.java,v
retrieving revision 1.23
diff -u -w -r1.23 OILServerILService.java
--- OILServerILService.java 19 Jul 2002 03:53:10 -0000 1.23
+++ OILServerILService.java 19 Jul 2002 07:56:35 -0000
@@ -48,7 +48,7 @@
* Implements the ServerILJMXService which is used to manage the JVM IL.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
- * @version $Revision: 1.23 $
+ * @version $Revision: 1.24 $
*
* @jmx:mbean extends="org.jboss.mq.il.ServerILJMXServiceMBean"
*/
@@ -207,6 +207,15 @@
break;
}
log.warn("Connection failure (1).", e);
+
+ try
+ {
+ server.recoverAll( connectionToken );
+ }
+ catch ( JMSException ex )
+ {
+ log.warn( "Could not recover sessions", ex );
+ }
break;
}
@@ -315,6 +324,10 @@
result = server.authenticate((String)in.readObject(), (String)in.readObject());
break;
+ case OILConstants.RECOVER:
+ server.recover( connectionToken, (Subscription)in.readObject() );
+ break;
+
default:
throw new RemoteException("Bad method code !");
}
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/il/jvm/JVMServerIL.java,v
retrieving revision 1.8
diff -u -w -r1.8 JVMServerIL.java
--- JVMServerIL.java 4 May 2002 03:07:50 -0000 1.8
+++ JVMServerIL.java 19 Jul 2002 08:02:45 -0000
@@ -29,7 +26,7 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author <a href="[EMAIL PROTECTED]">Peter Antman</a>
- * @version $Revision: 1.8 $
+ * @version $Revision: 1.9 $
* @created August 16, 2001
*/
public class JVMServerIL implements org.jboss.mq.il.ServerIL
@@ -339,5 +336,17 @@
throws JMSException
{
server.ping(dc, clientTime);
+ }
+
+ /**
+ * #Description of the Method
+ *
+ * @return Description of the Returned Value
+ * @exception Exception Description of Exception
+ */
+ public void recover( ConnectionToken ct, org.jboss.mq.Subscription s)
+ throws Exception
+ {
+ server.recover(ct, s);
}
}
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/BasicQueue.java,v
retrieving revision 1.20
diff -u -w -r1.20 BasicQueue.java
--- BasicQueue.java 24 May 2002 02:38:30 -0000 1.20
+++ BasicQueue.java 19 Jul 2002 11:05:37 -0000
@@ -36,7 +36,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author David Maplesden ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.20 $
+ * @version $Revision: 1.21 $
*/
//abstract public class BasicQueue implements Runnable {
public class BasicQueue {
@@ -375,6 +375,30 @@
}
}
+ public void recoverSession ( Subscription sub )
+ throws JMSException
+ {
+
+ if ( !hasUnackedMessages( sub.subscriptionId ) )
+ {
+ System.out.println("Subscriber " + sub.subscriptionId + " has no unacked messages.");
+ return;
+ }
+ synchronized (unacknowledgedMessages) {
+ for (Iterator it = unacknowledgedMessages.keySet().iterator(); it.hasNext();)
+ {
+ AcknowledgementRequest ack = (AcknowledgementRequest)it.next();
+ if (ack.subscriberId == sub.subscriptionId)
+ {
+ MessageReference messageRef = ( MessageReference )unacknowledgedMessages.get( ack );
+ messageRef.getMessage().setJMSRedelivered( true );
+
+ restoreMessage( messageRef );
+ }
+ }
+ }
+ }
+
protected void checkRemovedSubscribers(int subId) {
Integer id = new Integer(subId);
synchronized (removedSubscribers) {
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSDestination.java,v
retrieving revision 1.11
diff -u -w -r1.11 JMSDestination.java
--- JMSDestination.java 9 May 2002 04:19:21 -0000 1.11
+++ JMSDestination.java 19 Jul 2002 11:30:39 -0000
@@ -30,7 +30,7 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author David Maplesden ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.11 $
+ * @version $Revision: 1.12 $
*/
public abstract class JMSDestination {
@@ -75,6 +75,8 @@
public abstract void removeReceiver( Subscription sub );
public abstract void restoreMessage( MessageReference message );
+
+ public abstract void recoverSession (Subscription sub);
public abstract void clientConsumerStopped( ClientConsumer clientConsumer );
public abstract boolean isInUse();
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSDestinationManager.java,v
retrieving revision 1.2
diff -u -w -r1.2 JMSDestinationManager.java
--- JMSDestinationManager.java 4 May 2002 15:26:59 -0000 1.2
+++ JMSDestinationManager.java 19 Jul 2002 11:09:41 -0000
@@ -52,7 +52,7 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author David Maplesden ([EMAIL PROTECTED])
* @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class JMSDestinationManager extends JMSServerInterceptorSupport
{
@@ -587,6 +587,15 @@
{
//We should try again :) This behavior should under control of a Failure-Plugin
log.error("The connection to client " + dc.getClientID() + " failed.");
+ ClientConsumer cq = (ClientConsumer)clientConsumers.get(dc);
+
+ Iterator i = cq.subscriptions.keySet().iterator();
+ while ( i.hasNext() )
+ {
+ Integer subId = ( Integer )i.next();
+ Subscription sub = ( Subscription )cq.subscriptions.get( subId );
+ recover( dc, sub );
+ }
connectionClosing(dc);
}
@@ -883,5 +892,23 @@
public Subscription getSubscription(ConnectionToken dc,int subscriberId) throws JMSException {
ClientConsumer clientConsumer = getClientConsumer(dc);
return clientConsumer.getSubscription(subscriberId);
+ }
+
+ public void recover ( ConnectionToken dc, Subscription sub )
+ throws JMSException
+ {
+ JMSDestination dest = (JMSDestination)destinations.get(sub.destination);
+ if ( dest instanceof JMSQueue )
+ {
+ JMSQueue queue = (JMSQueue)dest;
+
+ queue.queue.recoverSession( sub );
+ }
+ }
+
+ public void recoverAll ( ConnectionToken dc )
+ throws JMSException
+ {
+ this.connectionFailure(dc);
}
}
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSServerInterceptor.java,v
retrieving revision 1.1
diff -u -w -r1.1 JMSServerInterceptor.java
--- JMSServerInterceptor.java 4 May 2002 03:07:52 -0000 1.1
+++ JMSServerInterceptor.java 19 Jul 2002 11:11:48 -0000
@@ -34,7 +34,7 @@
*
*
* @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public interface JMSServerInterceptor {
@@ -279,4 +279,18 @@
* @exception JMSException if it can not find the subscription.
*/
public Subscription getSubscription(ConnectionToken dc,int subscriberId) throws JMSException;
+
+ /**
+ * recover session.
+ *
+ * @exception JMSException if it can not find the subscription.
+ */
+ public void recover(ConnectionToken dc, Subscription sub) throws JMSException;
+
+ /**
+ * recover session.
+ *
+ * @exception JMSException if it can not find the subscription.
+ */
+ public void recoverAll(ConnectionToken dc) throws JMSException;
}
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSServerInterceptorSupport.java,v
retrieving revision 1.1
diff -u -w -r1.1 JMSServerInterceptorSupport.java
--- JMSServerInterceptorSupport.java 4 May 2002 03:07:52 -0000 1.1
+++ JMSServerInterceptorSupport.java 19 Jul 2002 11:12:49 -0000
@@ -30,7 +30,7 @@
* A pass through JMSServerInvoker.
*
* @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class JMSServerInterceptorSupport implements JMSServerInterceptor {
@@ -317,4 +317,15 @@
return nextInterceptor.getSubscription(dc,subscriberId);
}
+ public void recover(ConnectionToken dc, Subscription sub)
+ throws JMSException
+ {
+ nextInterceptor.recover( dc, sub );
+ }
+
+ public void recoverAll(ConnectionToken dc)
+ throws JMSException
+ {
+ nextInterceptor.recoverAll( dc );
+ }
} // JMSServerInvokerSupport
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSServerInvoker.java,v
retrieving revision 1.2
diff -u -w -r1.2 JMSServerInvoker.java
--- JMSServerInvoker.java 4 May 2002 03:07:52 -0000 1.2
+++ JMSServerInvoker.java 19 Jul 2002 11:14:10 -0000
@@ -33,7 +33,7 @@
* A pass through JMSServerInvoker.
*
* @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class JMSServerInvoker implements Invoker {
@@ -317,6 +317,18 @@
public Subscription getSubscription(ConnectionToken dc,int subscriberId) throws JMSException {
return nextInterceptor.getSubscription(dc,subscriberId);
+ }
+
+ public void recover(ConnectionToken dc, Subscription sub)
+ throws JMSException
+ {
+ nextInterceptor.recover(dc,sub);
+ }
+
+ public void recoverAll(ConnectionToken dc)
+ throws JMSException
+ {
+ nextInterceptor.recoverAll(dc);
}
If you need any further information, please let me know.
-Seth
----------------------------------------------------------------------