User: hiram
Date: 00/12/11 21:58:47
Modified: src/java/org/spydermq/server SpyderMQServiceMBean.java
SpyderMQService.java StartServer.java
Added: src/java/org/spydermq/server JMSServerQueueReceiver.java
InvocationLayerFactory.java JMSServerQueue.java
JMSServerMBean.java JMSServer.java
Removed: src/java/org/spydermq/server RemoteControl.java Main.java
RemoteControlImpl.java
Log:
Several Changes:
- Invocation layer simplified by joining the DistributedQueueConnectionFactory and
DistributedTopicConnectionFactory into DistributedConnectionFactory
- Separated server code from client code ( server code moved to org.spydermq.server )
- All publish() calls are now sync to the provider.
- Added a Transaction class to better represent a commit/rollback request to the
server.
- Now have an InvocationLayerFactory so that we can potentially load multiple
invocation layers (OIL/UIL/RMI) at the same time.
Revision Changes Path
1.2 +25 -2 spyderMQ/src/java/org/spydermq/server/SpyderMQServiceMBean.java
Index: SpyderMQServiceMBean.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/SpyderMQServiceMBean.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyderMQServiceMBean.java 2000/11/29 00:19:39 1.1
+++ SpyderMQServiceMBean.java 2000/12/12 05:58:43 1.2
@@ -7,19 +7,42 @@
package org.spydermq.server;
+/*
+ * jBoss, the OpenSource EJB server
+ *
+ * Distributable under GPL license.
+ * See terms of license at gnu.org.
+ */
+
+/*
+ * jBoss, the OpenSource EJB server
+ *
+ * Distributable under GPL license.
+ * See terms of license at gnu.org.
+ */
+
/**
* <description>
* MBean interface for the SpyderMQ JMX service.
*
* @see <related>
* @author Vincent Sheffer ([EMAIL PROTECTED])
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public interface SpyderMQServiceMBean
extends org.jboss.util.ServiceMBean
{
// Constants -----------------------------------------------------
public static final String OBJECT_NAME = ":service=SpyderMQ";
-
+
+ // Public --------------------------------------------------------
+
+
+ // Public --------------------------------------------------------
+
+
+ // Public --------------------------------------------------------
+
+
// Public --------------------------------------------------------
}
1.2 +73 -73 spyderMQ/src/java/org/spydermq/server/SpyderMQService.java
Index: SpyderMQService.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/SpyderMQService.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyderMQService.java 2000/11/29 00:19:39 1.1
+++ SpyderMQService.java 2000/12/12 05:58:43 1.2
@@ -28,88 +28,88 @@
* @author Vincent Sheffer ([EMAIL PROTECTED])
* @author <a href="mailto:[EMAIL PROTECTED]">Juha Lindfors</a>
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SpyderMQService
extends ServiceMBeanSupport
implements SpyderMQServiceMBean, MBeanRegistration
{
// Constants -----------------------------------------------------
-
+
// Attributes ----------------------------------------------------
- MBeanServer mBeanServer = null;
- Object spyderMQServer = null;
+ MBeanServer mBeanServer = null;
+ Object spyderMQServer = null;
- public SpyderMQService() {
- }
-
- // Public --------------------------------------------------------
- public ObjectName getObjectName(MBeanServer server, ObjectName name)
- throws javax.management.MalformedObjectNameException {
- this.mBeanServer = server;
- return new ObjectName(OBJECT_NAME);
- }
-
- public String getName() {
- return "SpyderMQ";
- }
-
- public void startService()
- throws Exception {
- if (spyderMQServer == null) {
- final Log log = this.log;
-
+ public SpyderMQService() {
+ }
+
+ // Public --------------------------------------------------------
+ public ObjectName getObjectName(MBeanServer server, ObjectName name)
+ throws javax.management.MalformedObjectNameException {
+ this.mBeanServer = server;
+ return new ObjectName(OBJECT_NAME);
+ }
+
+ public String getName() {
+ return "SpyderMQ";
+ }
+
+ public void startService()
+ throws Exception {
+ if (spyderMQServer == null) {
+ final Log log = this.log;
+
try {
- log.log("Testing if SpyderMQ is present....");
- try {
- spyderMQServer =
Thread.currentThread().getContextClassLoader().loadClass("org.spydermq.server.StartServer").newInstance();
- log.log("OK");
- }catch(Exception e) {
- log.log("failed");
- log.log("SpyderMQ wasn't found:");
- log.debug(e.getMessage());
- return;
- }
-
- Class[] spyderMQArgsClasses = { MBeanServer.class };
- Object[] spyderMQArgs = { mBeanServer };
-
- Method startMethod = spyderMQServer.getClass().getMethod("start",
-
spyderMQArgsClasses);
-
- Logger.log("Starting SpyderMQ...");
- startMethod.invoke(spyderMQServer, spyderMQArgs);
- }
+ log.log("Testing if SpyderMQ is present....");
+ try {
+ spyderMQServer =
Thread.currentThread().getContextClassLoader().loadClass("org.spydermq.server.StartServer").newInstance();
+ log.log("OK");
+ }catch(Exception e) {
+ log.log("failed");
+ log.log("SpyderMQ wasn't found:");
+ log.debug(e.getMessage());
+ return;
+ }
+
+ Class[] spyderMQArgsClasses = { MBeanServer.class };
+ Object[] spyderMQArgs = { mBeanServer };
+
+ Method startMethod =
spyderMQServer.getClass().getMethod("start",
+
spyderMQArgsClasses);
+
+ Logger.log("Starting SpyderMQ...");
+ startMethod.invoke(spyderMQServer, spyderMQArgs);
+ }
catch (Exception e) {
- log.error("SpyderMQ failed");
- log.exception(e);
- }
- }
- }
-
- public void stopService() {
- Class [] spyderMQArgsClasses = null;
- Method stopMethod = null;
- Object [] spyderMQArgs = null;
-
- if (this.spyderMQServer != null) {
- try {
- spyderMQArgsClasses = new Class[0];
- stopMethod = this.spyderMQServer.getClass().getMethod("stop",
-
spyderMQArgsClasses);
- spyderMQArgs = new Object[0];
-
- // [FIXME] jpl
- // This causes some error messages on the console so
- // disabled for now
-
- //stopMethod.invoke(spyderMQServer, spyderMQArgs);
- this.spyderMQServer = null;
- } catch (Exception e) {
- log.error("SpyderMQ failed");
- log.exception(e);
- }
- }
- }
+ log.error("SpyderMQ failed");
+ log.exception(e);
+ }
+ }
+ }
+
+ public void stopService() {
+ Class [] spyderMQArgsClasses = null;
+ Method stopMethod = null;
+ Object [] spyderMQArgs = null;
+
+ if (this.spyderMQServer != null) {
+ try {
+ spyderMQArgsClasses = new Class[0];
+ stopMethod =
this.spyderMQServer.getClass().getMethod("stop",
+
spyderMQArgsClasses);
+ spyderMQArgs = new Object[0];
+
+ // [FIXME] jpl
+ // This causes some error messages on the console
so
+ // disabled for now
+
+ //stopMethod.invoke(spyderMQServer, spyderMQArgs);
+ this.spyderMQServer = null;
+ } catch (Exception e) {
+ log.error("SpyderMQ failed");
+ log.exception(e);
+ }
+ }
+ }
}
1.2 +219 -279 spyderMQ/src/java/org/spydermq/server/StartServer.java
Index: StartServer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/StartServer.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- StartServer.java 2000/08/25 02:30:25 1.1
+++ StartServer.java 2000/12/12 05:58:44 1.2
@@ -1,313 +1,253 @@
-package org.spydermq.server;
-
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
-
-import org.spydermq.JMSServer;
-import org.spydermq.security.SecurityManager;
-import org.spydermq.distributed.interfaces.DistributedJMSServer;
-import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
-import org.spydermq.distributed.interfaces.DistributedTopicConnectionFactory;
-import org.spydermq.distributed.interfaces.DistributedQueueConnectionFactory;
-import org.spydermq.distributed.JMSServerFactory;
-import org.spydermq.distributed.SpyTopicConnectionFactory;
-import org.spydermq.distributed.SpyQueueConnectionFactory;
-import org.spydermq.Log;
+package org.spydermq.server;
import javax.jms.TopicConnectionFactory;
import javax.jms.QueueConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Topic;
import javax.jms.Queue;
-
-import org.jnp.server.NamingServer;
import javax.naming.InitialContext;
import javax.naming.Context;
-import java.util.Properties;
-import java.io.InputStream;
-import java.util.StringTokenizer;
-
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.NotCompliantMBeanException;
import javax.management.MBeanRegistrationException;
import javax.management.InstanceAlreadyExistsException;
import javax.management.loading.*;
+
+import java.util.Properties;
+import java.io.InputStream;
+import java.util.StringTokenizer;
import java.net.*;
import java.util.Set;
import java.util.LinkedList;
import java.util.Iterator;
+import org.spydermq.security.SecurityManager;
+import org.spydermq.distributed.interfaces.DistributedJMSServer;
+import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
+import org.spydermq.SpyQueueConnectionFactory;
+import org.spydermq.server.JMSServer;
+import org.spydermq.distributed.interfaces.DistributedConnectionFactory;
+import org.spydermq.SpyTopicConnectionFactory;
+
/**
- * Class used to start a JMS service. This can be called from
- * inside another application to start JMS.
+ * Class used to start a JMS service. This can be called from inside another
+ * application to start the JMS provider.
*
* @author Norbert Lataille ([EMAIL PROTECTED])
- * @author Rich Johns ([EMAIL PROTECTED])
- * @author Vincent Sheffer ([EMAIL PROTECTED])
+ * @author Rich Johns ([EMAIL PROTECTED])
+ * @author Vincent Sheffer ([EMAIL PROTECTED])
+ * @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class StartServer implements Runnable
{
- private JMSServer theServer;
- private MBeanServer mBeanServer;
- private LinkedList serviceList = new LinkedList();
-
- SpyTopicConnectionFactory topicConnectionFactory;
- SpyQueueConnectionFactory queueConnectionFactory;
- DistributedTopicConnectionFactory distributedTopicConnectionFactory;
- DistributedQueueConnectionFactory distributedQueueConnectionFactory;
- DistributedJMSServerSetup theDistributedServerSetup;
- DistributedJMSServer theDistributedServer;
- SecurityManager securityManager;
-
- //DEBUG -- For the RemoteControl object
- private static RemoteControlImpl remoteControl;
-
- /**
- * Start the JMS server running in it's own thread.
- *
- * @param server An <code>MBeanServer</code> to
- * add the JMS related JMX services to.
- */
- public void start(MBeanServer server) {
- Thread thread = null;
-
- this.mBeanServer = server;
- if (thread == null) {
- thread = new Thread(this);
- thread.start();
- }
- }
-
- /**
- * Stop the JMS service. This stops the thread and
- * unregisters the JMX services related to JMS.
- */
- public void stop() {
- theServer.stopServer();
-
- /*
- * Need to unregister all of the JMS related JMX services.
- * TODO: Don't know how to do this just yet.
- */
- while (!theServer.isStopped()) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- }
- }
-
- /*
- * Now that we know the JMS server has stopped we
- * can unregister the JMS services.
- */
- unregisterServices();
- }
-
- public StartServer()
- {
- }
-
- /**
- * Register a JMX service. This is a wrapper method for
- * <code>MBeanServer.registerMBean</code> that adds the
- * service name to a list. When the JMS service is stopped
- * all of the services that SpyderMQ started are unregistered.
- *
- * @param obj The <code>Object</code> to register.
- * @param objName The name of the service. It is this name
- * that gets added to the list of services.
- */
- private void registerService(Object obj, ObjectName objName)
- throws NotCompliantMBeanException, MBeanRegistrationException,
- InstanceAlreadyExistsException {
- this.mBeanServer.registerMBean(obj, objName);
- if (!serviceList.contains(objName)) {
- serviceList.add(objName);
- }
- }
-
- /**
- * Unregister all of the JMX services from the <code>MBeanServer</code>.
- */
- private void unregisterServices() {
- Iterator iter = null;
-
- iter = serviceList.iterator();
- while (iter.hasNext()) {
- ObjectName objName = null;
-
- objName = (ObjectName) iter.next();
- Log.notice("Unregistering '" + objName + "'");
- try {
- this.mBeanServer.unregisterMBean(objName);
- } catch (Exception e) {
- Log.error("Cannot unregister the JMS service '" +
- objName + "': " + e.getMessage());
- Log.error(e);
- }
- }
- }
-
- public void run() {
- Log.notice("SpyderMQ [v0.5]");
-
- try {
-
- //Load the property file
- InputStream in =
getClass().getClassLoader().getResource("spyderMQ.properties").openStream();
- Properties cfg=new Properties();
- cfg.load(in);
- in.close();
-
- // By default we will start a JNDI Server. We won't
- // if user explicitly tells us not to.
- String noStartJNDI = (String)cfg.get("DoNotStartJNDI" );
- if((noStartJNDI == null) ||
- (noStartJNDI.trim().length() == 0)) {
- //Start the JNDI server
-
- Log.notice( "Starting JNDI Server (JNP)" );
- new org.jnp.server.Main().start();
- } else {
- // Need to warn user because if they fail to start
- // a JNDI server prior to starting spyder, they
- // will be confused by the resulting error.
- Log.notice("[Warning]: SpyderMQ.properties specifys NOT to start a
JNDI Server.");
- Log.notice(" If a JNDI Server is not running SpyderMQ
will not start.");
- }
-
- //Get an InitialContext
- InitialContext ctx=new InitialContext();
-
- //Create a SecurityManager object
- securityManager=new SecurityManager();
-
- //Create the JMSServer object
- theServer = new JMSServer(securityManager);
- theDistributedServerSetup =
JMSServerFactory.createJMSServer(theServer,cfg);
- theDistributedServer=theDistributedServerSetup.createClient();
- String
connectionReceiverCN=(String)cfg.get("ConnectionReceiverClassName");
- registerService(theServer, new ObjectName(JMSServer.OBJECT_NAME));
- registerService(theDistributedServerSetup,
- new
ObjectName("JMS:service=DistributedJMSServerSetup"));
-
- //Get the Topic properties
- String
topicConnectionFactoryCN=(String)cfg.get("DistributedTopicConnectionFactoryClassName");
- if (topicConnectionFactoryCN == null ||
- connectionReceiverCN == null) {
- throw new RuntimeException("Missing configuration parameters");
- }
-
- //Create the distributedTopicConnectionFactory object
- distributedTopicConnectionFactory =
(DistributedTopicConnectionFactory)Class.forName(topicConnectionFactoryCN).newInstance();
- distributedTopicConnectionFactory.setServer(theDistributedServer);
- distributedTopicConnectionFactory.setCRClassName(connectionReceiverCN);
- distributedTopicConnectionFactory.setSecurityManager(securityManager);
- registerService(distributedTopicConnectionFactory,
- new
ObjectName("JMS:service=DistributedTopicConnectionFactory"));
-
- //Create the topicConnectionFactory object
- topicConnectionFactory = new
SpyTopicConnectionFactory(distributedTopicConnectionFactory);
-
- //create the known topics
- Context subcontext=ctx.createSubcontext("topic");
- String topics=(String)cfg.get("knownTopics");
-
- if (topics!=null) {
-
- StringTokenizer st = new StringTokenizer(topics,", ");
-
- while (st.hasMoreElements()) {
- String name=(String)st.nextElement();
- Topic t=theServer.newTopic(name);
- subcontext.rebind(name,t);
- }
-
- } else Log.notice("Warning: no known Topics !");
-
- //Get the queue properties
- String
queueConnectionFactoryCN=(String)cfg.get("DistributedQueueConnectionFactoryClassName");
- if (queueConnectionFactoryCN==null) throw new RuntimeException("Missing
configuration parameter");
-
- //Create the distributedTopicConnectionFactory object
- distributedQueueConnectionFactory =
(DistributedQueueConnectionFactory)Class.forName(queueConnectionFactoryCN).newInstance();
- distributedQueueConnectionFactory.setServer(theDistributedServer);
- distributedQueueConnectionFactory.setCRClassName(connectionReceiverCN);
- distributedQueueConnectionFactory.setSecurityManager(securityManager);
- registerService(distributedQueueConnectionFactory,
- new
ObjectName("JMS:service=DistributedQueueConnectionFactory"));
-
- //Create the topicConnectionFactory object
- queueConnectionFactory = new
SpyQueueConnectionFactory(distributedQueueConnectionFactory);
-
- //create the known queues
- subcontext=ctx.createSubcontext("queue");
- String queues=(String)cfg.get("knownQueues");
-
- if (queues!=null) {
-
- StringTokenizer st = new StringTokenizer(queues,", ");
-
- while (st.hasMoreElements()) {
- String name=(String)st.nextElement();
- Queue q=theServer.newQueue(name);
- subcontext.rebind(name,q);
- }
-
- } else {
- Log.notice("Warning: no known Queues !");
- }
-
- //Set the known Ids
- String ids=(String)cfg.get("knownIds");
-
- if (ids!=null) {
-
- StringTokenizer st = new StringTokenizer(ids,", ");
-
- while (st.hasMoreElements()) {
- String read=(String)st.nextElement();
- int pos=read.indexOf(':');
- if (pos==-1) throw new JMSException("Bad configuration file
(missing separator in knownIds)");
- String name=read.substring(0,pos);
- String passwd=read.substring(pos+1);
- pos=passwd.indexOf(':');
-
- if (pos==-1) {
- Log.log("[JMSServer] new user : Login = "+name+", Id =
[none]");
- securityManager.addUser(name,passwd,null);
- } else {
- String ID=passwd.substring(pos+1);
- Log.log("[JMSServer] new user : Login = "+name+", Id =
"+ID);
- securityManager.addUser(name,passwd.substring(0,pos),ID);
- }
-
- }
-
- } else {
- Log.notice("Warning: no known Ids !");
- }
-
- //(re)bind the connection factories in the JNDI namespace
- ctx.rebind("TopicConnectionFactory",topicConnectionFactory);
- ctx.rebind("QueueConnectionFactory",queueConnectionFactory);
-
- //DEBUG -- Create a RemoteControl object
- //Administration calls will soon be done using JMX.
- remoteControl=new RemoteControlImpl(theServer);
- ctx.rebind("RemoteControl",remoteControl);
-
- } catch (Exception e) {
- Log.error("Cannot start the JMS server ! "+e.getMessage());
- Log.error(e);
- }
-
- }
+ private JMSServer theServer;
+ private MBeanServer mBeanServer;
+ private LinkedList serviceList = new LinkedList();
+ SecurityManager securityManager;
+
+ /**
+ * Start the JMS server running in it's own thread.
+ *
+ * @param server An <code>MBeanServer</code> to
+ * add the JMS related JMX services to.
+ */
+ public void start(MBeanServer server) {
+ Thread thread = null;
+
+ this.mBeanServer = server;
+ if (thread == null) {
+ thread = new Thread(this);
+ thread.start();
+ }
+ }
+
+ /**
+ * Stop the JMS service. This stops the thread and
+ * unregisters the JMX services related to JMS.
+ */
+ public void stop() {
+ theServer.stopServer();
+
+ /*
+ * Need to unregister all of the JMS related JMX services.
+ * TODO: Don't know how to do this just yet.
+ */
+ while (!theServer.isStopped()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ /*
+ * Now that we know the JMS server has stopped we
+ * can unregister the JMS services.
+ */
+ unregisterServices();
+ }
+
+ public StartServer()
+ {
+ }
+
+ /**
+ * Register a JMX service. This is a wrapper method for
+ * <code>MBeanServer.registerMBean</code> that adds the
+ * service name to a list. When the JMS service is stopped
+ * all of the services that SpyderMQ started are unregistered.
+ *
+ * @param obj The <code>Object</code> to register.
+ * @param objName The name of the service. It is this name
+ * that gets added to the list of services.
+ */
+ private void registerService(Object obj, ObjectName objName)
+ throws NotCompliantMBeanException, MBeanRegistrationException,
+ InstanceAlreadyExistsException {
+ this.mBeanServer.registerMBean(obj, objName);
+ if (!serviceList.contains(objName)) {
+ serviceList.add(objName);
+ }
+ }
+
+ /**
+ * Unregister all of the JMX services from the <code>MBeanServer</code>.
+ */
+ private void unregisterServices() {
+ Iterator iter = null;
+
+ iter = serviceList.iterator();
+ while (iter.hasNext()) {
+ ObjectName objName = null;
+
+ objName = (ObjectName) iter.next();
+ System.out.println("Unregistering '" + objName + "'");
+ try {
+ this.mBeanServer.unregisterMBean(objName);
+ } catch (Exception e) {
+ System.err.println("Cannot unregister the JMS service
'" +
+ objName + "': " + e.getMessage());
+ System.err.println(e);
+ }
+ }
+ }
+
+ public void run() {
+ System.out.println("SpyderMQ [v0.5]");
+
+ try {
+
+ //Load the property file
+ InputStream in =
getClass().getClassLoader().getResource("spyderMQ.properties").openStream();
+ Properties cfg=new Properties();
+ cfg.load(in);
+ in.close();
+
+ //Get an InitialContext
+ InitialContext ctx=new InitialContext();
+
+ //Create a SecurityManager object
+ securityManager=new SecurityManager();
+
+ //Create the JMSServer object
+ theServer = new JMSServer(securityManager);
+ registerService(theServer, new
ObjectName(JMSServer.OBJECT_NAME));
+
+ //create the known topics
+ Context subcontext=ctx.createSubcontext("topic");
+ String topics=(String)cfg.get("knownTopics");
+
+ if (topics!=null) {
+
+ StringTokenizer st = new StringTokenizer(topics,", ");
+
+ while (st.hasMoreElements()) {
+ String name=(String)st.nextElement();
+ Topic t=theServer.newTopic(name);
+ subcontext.rebind(name,t);
+ }
+
+ } else System.out.println("Warning: no known Topics !");
+
+ //create the known queues
+ subcontext=ctx.createSubcontext("queue");
+ String queues=(String)cfg.get("knownQueues");
+
+ if (queues!=null) {
+
+ StringTokenizer st = new StringTokenizer(queues,", ");
+
+ while (st.hasMoreElements()) {
+ String name=(String)st.nextElement();
+ Queue q=theServer.newQueue(name);
+ subcontext.rebind(name,q);
+ }
+
+ } else {
+ System.out.println("Warning: no known Queues !");
+ }
+
+ //Set the known Ids
+ String ids=(String)cfg.get("knownIds");
+
+ if (ids!=null) {
+
+ StringTokenizer st = new StringTokenizer(ids,", ");
+
+ while (st.hasMoreElements()) {
+ String read=(String)st.nextElement();
+ int pos=read.indexOf(':');
+ if (pos==-1) throw new JMSException("Bad
configuration file (missing separator in knownIds)");
+ String name=read.substring(0,pos);
+ String passwd=read.substring(pos+1);
+ pos=passwd.indexOf(':');
+
+ if (pos==-1) {
+ //System.out.println("[JMSServer] new
user : Login = "+name+", Id = [none]");
+
securityManager.addUser(name,passwd,null);
+ } else {
+ String ID=passwd.substring(pos+1);
+ //System.out.println("[JMSServer] new
user : Login = "+name+", Id = "+ID);
+
securityManager.addUser(name,passwd.substring(0,pos),ID);
+ }
+
+ }
+
+ } else {
+ System.out.println("Warning: no known Ids !");
+ }
+
+ //Set up the transports for the server
+ InvocationLayerFactory invocationLayerFactory= new
InvocationLayerFactory();
+ invocationLayerFactory.distributedJMSServerClassName =
(String)cfg.get("DistributedJMSServerClassName");
+ invocationLayerFactory.connectionReceiverClassName =
(String)cfg.get("ConnectionReceiverClassName");
+ invocationLayerFactory.distributedConnectionFactoryClassName =
(String)cfg.get("DistributedConnectionFactoryClassName");
+
+ invocationLayerFactory.createObjects(theServer);
+
+
registerService(invocationLayerFactory.distributedJMSServerSetup,
+ new ObjectName("JMS:service=DistributedJMSServerSetup"));
+
+
registerService(invocationLayerFactory.distributedConnectionFactory,
+ new ObjectName("JMS:service=DistributedConnectionFactory"));
+
+ //(re)bind the connection factories in the JNDI namespace
+
ctx.rebind("TopicConnectionFactory",invocationLayerFactory.spyTopicConnectionFactory);
+
ctx.rebind("QueueConnectionFactory",invocationLayerFactory.spyQueueConnectionFactory);
+
+
+ } catch (Exception e) {
+ System.err.println("Cannot start the JMS server !
"+e.getMessage());
+ System.err.println(e);
+ }
+
+ }
}
1.1
spyderMQ/src/java/org/spydermq/server/JMSServerQueueReceiver.java
Index: JMSServerQueueReceiver.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.server;
import javax.jms.DeliveryMode;
import org.spydermq.*;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
import java.util.HashMap;
import java.util.Iterator;
import java.io.Serializable;
/**
* This class manages a connection receiver for a JMSServerQueue.
* Keeps track of listening state and unacknowledged messages.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class JMSServerQueueReceiver implements Serializable {
// The queue messages will be comming from
public JMSServerQueue jmsSeverQueue;
// The connection mewssages will be going to
public SpyDistributedConnection dc;
// Used to know if the connection is accepting messages.
public boolean listeners;
public int receiveReuquests;
// Keeps track of the unacknowledged messages send to the connection.
public transient HashMap unacknowledgedMessages;
// Consturctor ---------------------------------------------------
public JMSServerQueueReceiver(JMSServerQueue serverQueue,
SpyDistributedConnection sdc) {
jmsSeverQueue = serverQueue;
dc = sdc;
listeners = false;
receiveReuquests = 0;
unacknowledgedMessages = new HashMap();
Log.log("A ServerQueueReceiver has been created for : " +
serverQueue.destination + "/" + dc.getClientID());
}
void acknowledge(String messageId, boolean ack) throws javax.jms.JMSException {
SpyMessage m;
synchronized (unacknowledgedMessages) {
m = (SpyMessage) unacknowledgedMessages.remove(messageId);
}
if (m == null)
return;
if (jmsSeverQueue.isTopic) {
// Not sure how we should handle the topic case.
// On a negative acknowledge, we don't want to
// add it back to the topic since other
// receivers might get a duplicate duplicate message.
} else {
// Was it a negative acknowledge??
if (!ack) {
Log.log("Restoring message: " + m.messageId);
jmsSeverQueue.restoreMessage(m);
} else {
if( m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT
) {
jmsSeverQueue.spyMessageLog.logRemoveMessage(m);
jmsSeverQueue.spyMessageLog.commit();
}
Log.log("Message Ack: " + m.messageId);
}
}
}
// The connection is accepting new messages if there
// is a listener or if the receiver has requested a message.
public boolean isListening() {
return listeners || receiveReuquests > 0;
}
// Called when we send one message.
public void removeReceiver() {
if (receiveReuquests == 0)
return;
receiveReuquests--;
if (receiveReuquests == 0 && !listeners) {
jmsSeverQueue.listeners--;
}
}
// This method gets invoked as a result of a receive() method call
// on a QueueReceiver
public void addReceiver(long wait) {
// TODO: figure out a way to make a reciver eligable for
// wait amount of time.
receiveReuquests++;
if (receiveReuquests == 1 && !listeners) {
jmsSeverQueue.listeners++;
}
}
public void setListening(boolean value) {
if (value == listeners)
return;
listeners = value;
if (value && receiveReuquests == 0)
jmsSeverQueue.listeners++;
if (!value && receiveReuquests == 0)
jmsSeverQueue.listeners--;
}
void sendMultipleMessages(SpyMessage mes[]) throws Exception {
Log.log("DISPATCH: " + mes.length + " messages => " +
dc.getClientID());
if (!jmsSeverQueue.isTopic) {
synchronized (unacknowledgedMessages) {
for (int i = 0; i < mes.length; i++) {
unacknowledgedMessages.put(mes[i].getJMSMessageID(), mes[i]);
}
}
}
dc.cr.receiveMultiple(jmsSeverQueue.destination, mes);
}
void sendOneMessage(SpyMessage mes) throws Exception {
Log.log("DISPATCH: Message: " + mes + " => " + dc.getClientID());
if (!jmsSeverQueue.isTopic) {
synchronized (unacknowledgedMessages) {
unacknowledgedMessages.put(mes.getJMSMessageID(), mes);
}
}
removeReceiver();
dc.cr.receive(jmsSeverQueue.destination, mes);
}
public void close() {
Log.log("A ServerQueueReceiver has been closed: " +
jmsSeverQueue.destination + "/" + dc.getClientID());
if (isListening())
jmsSeverQueue.listeners--;
synchronized (unacknowledgedMessages) {
Iterator iter = unacknowledgedMessages.values().iterator();
Log.log("Restoring " + unacknowledgedMessages.size() + "
messages");
while (iter.hasNext()) {
SpyMessage m = (SpyMessage) iter.next();
jmsSeverQueue.restoreMessage(m);
iter.remove();
}
}
}
}
1.1
spyderMQ/src/java/org/spydermq/server/InvocationLayerFactory.java
Index: InvocationLayerFactory.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.server;
import org.spydermq.Log;
import org.spydermq.server.JMSServer;
import org.spydermq.SpyConnection;
import org.spydermq.SpyDistributedConnection;
import org.spydermq.SpyQueueConnectionFactory;
import org.spydermq.SpyTopicConnectionFactory;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
import org.spydermq.distributed.interfaces.DistributedConnectionFactory;
import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import java.util.Properties;
import java.rmi.server.UnicastRemoteObject;
import java.rmi.Remote;
public class InvocationLayerFactory
{
    // Inputs: set these attributes before the createObjects() call.
    String connectionReceiverClassName;
    String distributedConnectionFactoryClassName;
    String distributedJMSServerClassName;

    // Outputs: these will be set after the createObjects() call.
    DistributedJMSServerSetup distributedJMSServerSetup;
    DistributedJMSServer distributedJMSServer;
    DistributedConnectionFactory distributedConnectionFactory;
    SpyQueueConnectionFactory spyQueueConnectionFactory;
    SpyTopicConnectionFactory spyTopicConnectionFactory;

    /**
     * Instantiates and wires the invocation-layer objects for the given
     * server, using the three class names configured on this factory.
     *
     * @param s the server the invocation layer is created for
     * @throws RuntimeException if any of the three class names is null
     * @throws Exception if a configured class cannot be loaded or
     *         instantiated, or if one of the created objects fails
     */
    public void createObjects(JMSServer s) throws Exception
    {
        // All three class names are mandatory.
        if (distributedJMSServerClassName == null) {
            throw new RuntimeException("Missing configuration parameters");
        }
        if (distributedConnectionFactoryClassName == null) {
            throw new RuntimeException("Missing configuration parameters");
        }
        if (connectionReceiverClassName == null) {
            throw new RuntimeException("Missing configuration parameters");
        }

        // Server-side setup object, plus the client handle it produces.
        Object serverSetup = instantiate(distributedJMSServerClassName);
        distributedJMSServerSetup = (DistributedJMSServerSetup) serverSetup;
        distributedJMSServerSetup.setServer(s);
        distributedJMSServer = distributedJMSServerSetup.createClient();

        // The single distributed connection factory shared by topics and queues.
        Object connectionFactory = instantiate(distributedConnectionFactoryClassName);
        distributedConnectionFactory = (DistributedConnectionFactory) connectionFactory;
        distributedConnectionFactory.setServer(distributedJMSServer);
        distributedConnectionFactory.setSecurityManager(s.getSecurityManager());
        distributedConnectionFactory.setConnectionReceiverClassName(connectionReceiverClassName);

        // Both JMS-facing factories wrap that same distributed factory.
        spyTopicConnectionFactory = new SpyTopicConnectionFactory(distributedConnectionFactory);
        spyQueueConnectionFactory = new SpyQueueConnectionFactory(distributedConnectionFactory);
    }

    // Loads the named class and creates an instance through its no-arg constructor.
    private static Object instantiate(String className) throws Exception
    {
        return Class.forName(className).newInstance();
    }
}
1.1 spyderMQ/src/java/org/spydermq/server/JMSServerQueue.java
Index: JMSServerQueue.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.server;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.DeliveryMode;
import org.spydermq.*;
import org.spydermq.persistence.SpyMessageLog;
import java.util.Iterator;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.TreeSet;
/**
* This class is a message queue which is stored (hashed by Destination) on the
* JMS provider
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class JMSServerQueue {
// Attributes ----------------------------------------------------
//the Destination of this queue
SpyDestination destination;
//DistributedConnection objs that have "registered" to this Destination
private HashMap subscribers;
//List of Pending messages
private TreeSet messages;
//Is a thread already working on this queue ?
//You cannot start two threads on the same destination (correct order of msgs)
private boolean threadWorking;
// Am I already in the task queue? It is useless to put the same destination in the task queue many times.
private boolean alreadyInTaskQueue;
//If this is linked to a temporaryDestination, temporaryDestination=DistributedConnection of the owner, otherwise it's null
SpyDistributedConnection temporaryDestination;
//The JMSServer object
private JMSServer server;
//Am I a queue or a topic
boolean isTopic;
//Nb of listeners for this Queue
int listeners;
//Counter used to number incoming messages. (Used to order the messages.)
long messageIdCounter = Long.MIN_VALUE;
// Keeps track of the last used connection so that we can do round robin distribution of p2p messages.
private JMSServerQueueReceiver lastUsedQueueReceiver;
// Should we use the round robin approach to pick the next receiver of a p2p message?
private boolean useRoundRobinMessageDistribution = true;
// Used to log the persistent messages.
SpyMessageLog spyMessageLog;
// Constructor ---------------------------------------------------
/**
 * Builds the server-side queue for one destination and replays the
 * messages recorded in its transaction log.
 *
 * @param dest      the destination served by this queue
 * @param temporary the owning connection when the destination is temporary, otherwise null
 * @param server    the JMSServer whose task queue this queue schedules itself on
 * @throws JMSException propagated from the destination name lookup or the log replay
 */
JMSServerQueue(SpyDestination dest, SpyDistributedConnection temporary, JMSServer server) throws JMSException
{
    // Identity of the queue and its owner.
    this.destination = dest;
    this.temporaryDestination = temporary;
    this.server = server;
    this.isTopic = (dest instanceof SpyTopic);

    // Empty runtime state; it must be in place before restoreMessage() is called below.
    this.subscribers = new HashMap();
    this.messages = new TreeSet();
    this.listeners = 0;
    this.threadWorking = false;
    this.alreadyInTaskQueue = false;

    // Replay the transaction log and move the id counter past every restored message.
    this.spyMessageLog = new SpyMessageLog(dest.getName() + "-transaction-log.dat");
    final SpyMessage[] recovered = spyMessageLog.rebuildMessagesFromLog();
    int index = 0;
    while (index < recovered.length) {
        final SpyMessage restored = recovered[index++];
        restoreMessage(restored);
        messageIdCounter = Math.max(messageIdCounter, restored.messageId + 1);
    }
    Log.notice("Restored " + recovered.length + " messages to " + dest.getName() + " from the transaction log");
}
// Package protected ---------------------------------------------
void addSubscriber(SpyDistributedConnection dc) throws JMSException
{
//We want to avoid removeSubscriber, addSubscriber or sendOneMessage
to work concurently
synchronized (destination) {
if
(temporaryDestination!=null&&!temporaryDestination.equals(dc)) throw new
JMSException("You cannot subscriber to this temporary destination");
Object qr = subscribers.get(dc.getClientID());
if( qr == null ) {
subscribers.put(dc.getClientID(),new
JMSServerQueueReceiver(this,dc));
}
}
}
void removeSubscriber(SpyDistributedConnection dc,Iterator i)
{
//We want to avoid removeSubscriber, addSubscriber or sendOneMessage
to work concurently
synchronized (destination) {
JMSServerQueueReceiver
qr=(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
if (qr==null) return;
qr.close();
if (i==null) {
if (subscribers.remove(dc.getClientID())==null)
Log.notice("WARNING: Could not remove "+dc.getClientID());
} else {
i.remove();
}
}
}
public void addMessage(SpyMessage mes) throws JMSException
{
//Add a message to the message list...
synchronized (messages)
{
//Add the message to the queue
//messages is now an ordered tree. The order depends
//first on the priority and then on messageId
mes.messageId = messageIdCounter++;
if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) {
spyMessageLog.logAddMessage(mes);
spyMessageLog.commit();
}
messages.add(mes);
if (isTopic) {
//if a thread is already working on this destination,
I don't have to myself to the taskqueue
if (!threadWorking) notifyWorkers();
} else {
if (listeners!=0&&!threadWorking) notifyWorkers();
}
}
}
//Clear the message queue
synchronized SpyMessage[] startWork()
{
if (threadWorking) throw new RuntimeException("One thread is already
working !");
synchronized (messages) {
SpyMessage[] mes=new SpyMessage[messages.size()];
mes=(SpyMessage[])messages.toArray(mes);
//<DEBUG>
messages.clear();
//</DEBUG>
threadWorking=true;
alreadyInTaskQueue=false;
return mes;
}
}
/**
 * Marks this destination as being worked on and removes the first pending message.
 *
 * @return the first message of the pending set, or null when nothing is pending
 */
synchronized SpyMessage startWorkQueue()
{
    synchronized (messages) {
        alreadyInTaskQueue = false;
        threadWorking = true;
        if (messages.isEmpty()) {
            return null;
        }
        final SpyMessage head = (SpyMessage) messages.first();
        messages.remove(head);
        return head;
    }
}
/**
 * Signals that the working thread is done with this destination and
 * reschedules the destination when messages are still pending.
 */
void endWork()
{
    threadWorking = false;
    synchronized (messages) {
        final boolean hasPending = !messages.isEmpty();
        // A queue is only rescheduled while at least one listener exists.
        final boolean deliverable = isTopic || listeners != 0;
        if (deliverable && hasPending) {
            notifyWorkers();
        }
    }
}
void sendOneMessage(SpyMessage mes)
{
//we can only add/remove a subscribers once the message is sent (
iterator is fail-fast )
synchronized (subscribers) {
if (subscribers.isEmpty()) return;
Iterator i=subscribers.values().iterator();
while (i.hasNext()) {
JMSServerQueueReceiver
qr=(JMSServerQueueReceiver)i.next();
try {
qr.sendOneMessage(mes);
} catch (Exception e) {
Log.notice("Cannot deliver this message to the
client: "+qr.dc.getClientID());
Log.notice(e);
handleConnectionFailure(qr.dc,i);
}
}
}
}
void sendMultipleMessages(SpyMessage mes[])
{
synchronized (subscribers) {
if (subscribers.isEmpty()) return;
Iterator i=subscribers.values().iterator();
while (i.hasNext()) {
JMSServerQueueReceiver
qr=(JMSServerQueueReceiver)i.next();
try {
qr.sendMultipleMessages(mes);
} catch (Exception e) {
Log.error("Cannot deliver those messages to
the client "+qr.dc.getClientID());
Log.error(e);
handleConnectionFailure(qr.dc,i);
}
}
}
}
//A connection is closing
void connectionClosing(SpyDistributedConnection dc)
{
if (!subscribers.containsKey(dc.getClientID())) return;
Log.notice("Warning: The DistributedConnection was still registered
for "+destination);
removeSubscriber(dc,null);
}
void notifyWorkers()
{
//It is useless to put many times the same destination in the task
queue
if (alreadyInTaskQueue) return;
synchronized (server.taskQueue) {
alreadyInTaskQueue=true;
server.taskQueue.addLast(this);
server.taskQueue.notify();
}
}
private void handleConnectionFailure(SpyDistributedConnection dc,Iterator i)
{
//We should try again :) This behavior should under control of a
Failure-Plugin
Log.error("I remove the Connection "+dc.getClientID()+" from the
subscribers list");
//Call JMSServer.ConnectionClosing(), but ask him not to check my list.
server.connectionClosing(dc,this);
//remove this connection from the list
removeSubscriber(dc,i);
}
void doMyJob() throws JMSException
{
if (isTopic) {
//Clear the message queue
SpyMessage[] msgs=startWork();
boolean msgRemoved=false;
for( int i=0; i < msgs.length; i++ ) {
if( msgs[i].getJMSDeliveryMode() ==
DeliveryMode.PERSISTENT ) {
spyMessageLog.logRemoveMessage(msgs[i]);
msgRemoved=true;
}
}
if( msgRemoved )
spyMessageLog.commit();
//Let the thread do its work
if (msgs.length == 1) {
if (!msgs[0].isOutdated()) {
sendOneMessage(msgs[0]);
}
} else if (msgs.length>1) {
//We can send multiple messages
sendMultipleMessages(msgs);
}
//Notify that it has finished its work : another thread can
start working on this queue
endWork();
} else {
Log.log("Dispatching messages");
synchronized (this) {
//In the Queue case, we synchronize on [this] to avoid
changes (listening modifications)
//while we are dispatching messages
while (true) {
//At first, find a receiver
//NL: We could find a better receiver (load
balancing ?)
//HC: Using Round Robin should provide some
load balancing
//Get the message ( if there is one message
pending )
SpyMessage mes=startWorkQueue();
if (mes==null) {
Log.log("Done dispatching messages: No
more message to send");
break;
}
if (mes.isOutdated()) {
if( mes.getJMSDeliveryMode() ==
DeliveryMode.PERSISTENT ) {
spyMessageLog.logRemoveMessage(mes);
spyMessageLog.commit();
}
continue;
}
// we may have to restore the
lastUsedQueueReceiver
// if message on the queue is not sent. (we
don't want to skip
// destination in the round robin)
JMSServerQueueReceiver qr;
if( useRoundRobinMessageDistribution ) {
qr=pickNextRoundRobinQueueReceiver();
} else {
qr=pickFirstFoundQueueReceiver();
}
if ( qr == null ) {
restoreMessage(mes);
Log.log("Done dispatching messages: No
receiver available for dispatch");
break;
}
//Send the message
try {
qr.sendOneMessage(mes);
} catch (NoReceiverException e) {
//Log.log(e);
Log.log("Got a NoReceiverException
from: "+qr.dc.getClientID());
restoreMessage(mes);
qr.setListening(false);
} catch (JMSException e) {
throw e;
} catch (Exception e) {
//This is a transport failure. We
should define our own Transport Failure class
//for a better execption catching
restoreMessage(mes);
Log.error("Cannot deliver this message
to the client "+qr.dc.getClientID());
Log.error(e);
handleConnectionFailure(qr.dc,null);
}
}
//Notify that it has finished its work : another
thread can start working on this queue
endWork();
}
}
}
void connectionListening(boolean mode,SpyDistributedConnection dc) throws
JMSException
{
// Before check
// Synchronized code : We want to avoid sending messages while we are
changing the connection status
synchronized (this) {
JMSServerQueueReceiver
qr=(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
if (qr==null) throw new JMSException("This
DistributedConnection is not registered");
qr.setListening(mode);
if (listeners!=0&&!threadWorking&&!alreadyInTaskQueue) {
synchronized (messages) {
if (!messages.isEmpty()) notifyWorkers();
}
}
}
}
public void acknowledge(SpyDistributedConnection dc, String messageId, boolean
isAck) throws JMSException {
JMSServerQueueReceiver qr =
(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
if( qr==null )
throw new JMSException("You have not subscribed to this
destination.");
qr.acknowledge(messageId, isAck);
}
/**
 * Returns the first receiver, in subscriber-map iteration order, that is
 * currently listening to this queue.
 *
 * @return a listening receiver, or null when the listener count is zero or
 *         no listening receiver is found
 */
private JMSServerQueueReceiver pickFirstFoundQueueReceiver() {
    JMSServerQueueReceiver chosen = null;
    if (listeners != 0) {
        final Iterator it = subscribers.values().iterator();
        while (chosen == null && it.hasNext()) {
            final JMSServerQueueReceiver candidate = (JMSServerQueueReceiver) it.next();
            if (candidate.isListening()) {
                chosen = candidate;
            }
        }
        if (chosen == null) {
            // The listener count said somebody was listening, yet nobody was found.
            Log.error("FIXME: The listeners count was invalid !");
        }
    }
    return chosen;
}
/**
* Get a JMSServerQueueReceiver object that is listening
* to this queue. If multiple objects are listening to the queue
* this multiple calls to this method will cycle through them in a round
* robin fasion.
*/
private JMSServerQueueReceiver pickNextRoundRobinQueueReceiver() {
// No valid next connection will exist, return null
if (listeners == 0)
return null;
Iterator i = subscribers.values().iterator();
JMSServerQueueReceiver firstFoundConnection = null;
boolean enableSelectNext = false;
while (i.hasNext()) {
JMSServerQueueReceiver t = (JMSServerQueueReceiver) i.next();
// Select the next valid connection if we are past the last
used connection
if (t == lastUsedQueueReceiver || lastUsedQueueReceiver ==
null)
enableSelectNext = true;
// Test to see if the connection is valid pick
if (t.isListening()) {
// Store the first valid connection since the last
used might be the last
// in the list
if (firstFoundConnection == null)
firstFoundConnection = t;
// Are we past the last used? then we have the next
item in the round robin
if (enableSelectNext && t != lastUsedQueueReceiver) {
lastUsedQueueReceiver = t;
return t;
}
}
}
// We got here because we did not find a valid item in the list after
the last
// used item, so lest use the first valid item
if (firstFoundConnection != null) {
lastUsedQueueReceiver = firstFoundConnection;
return firstFoundConnection;
} else {
Log.error("FIXME: The listeners count was invalid !");
return null;
}
}
//Used by QueueReceivers for receive(), receive(long wait), and receiveNoWait()
SpyMessage queueReceive(long wait, SpyDistributedConnection dc) throws
JMSException
{
// Synchronized code : We want to avoid sending messages while we are
changing the connection status
synchronized (this) {
JMSServerQueueReceiver
qr=(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
if (qr==null) throw new JMSException("This
DistributedConnection is not registered");
if( wait < 0 ) {
synchronized (messages) {
if (messages.size()==0) return null;
SpyMessage m = (SpyMessage)messages.first();
messages.remove(m);
return m;
}
} else {
qr.addReceiver(wait);
if (listeners!=0&&!threadWorking&&!alreadyInTaskQueue)
{
synchronized (messages) {
if (!messages.isEmpty())
notifyWorkers();
}
}
return null;
}
}
}
//Used to put a message that was previously added to the queue back in the queue
public void restoreMessage(SpyMessage mes)
{
//restore a message to the message list...
synchronized (messages) {
messages.add(mes);
if (isTopic) {
//if a thread is already working on this destination,
I don't have to myself to the taskqueue
if (!threadWorking) notifyWorkers();
} else {
if (listeners!=0&&!threadWorking) notifyWorkers();
}
}
}
}
1.1 spyderMQ/src/java/org/spydermq/server/JMSServerMBean.java
Index: JMSServerMBean.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.server;
import javax.jms.JMSException;
import org.spydermq.*;
/**
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public interface JMSServerMBean
{
    /**
     * Administrative operation: creates a topic.
     *
     * @param name name of the topic
     * @return the topic object
     * @throws JMSException if the topic cannot be created
     */
    SpyTopic newTopic(String name) throws JMSException;

    /**
     * Administrative operation: creates a queue.
     *
     * @param name name of the queue
     * @return the queue object
     * @throws JMSException if the queue cannot be created
     */
    SpyQueue newQueue(String name) throws JMSException;
}
1.1 spyderMQ/src/java/org/spydermq/server/JMSServer.java
Index: JMSServer.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.server;
import javax.jms.JMSException;
import javax.jms.Destination;
import javax.jms.TemporaryTopic;
import javax.jms.TemporaryQueue;
import javax.jms.Topic;
import javax.jms.Queue;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.Iterator;
import org.spydermq.*;
import org.spydermq.security.SecurityManager;
/**
* This class implements the JMS provider
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class JMSServer
implements Runnable, JMSServerMBean
{
// Constants -----------------------------------------------------
//number of threads in the pool (TO DO: this value should be dynamic)
final int NB_THREADS=1;
public static final String OBJECT_NAME = "JMS:service=JMSServer";
// Attributes ----------------------------------------------------
//messages pending for a Destination ( HashMap of JMSServerQueue objects )
public HashMap messageQueue;
//list of tasks pending ( linked list of JMSServerQueue objects )
LinkedList taskQueue; //look when we unregister a temporaryTopic/Queue
//last id given to a client
private int lastID;
//last id given to a temporary topic
private int lastTemporaryTopic;
//last id given to a temporary queue
private int lastTemporaryQueue;
//The security manager
SecurityManager securityManager;
/**
 * <code>true</code> when the server is running. <code>false</code> when the
 * server should stop running.
 * Volatile because it is written by the caller of stopServer() and read by
 * the worker threads in run().
 */
private volatile boolean alive = true;
/**
 * Because there can be a delay between killing the JMS service and the
 * service actually dying, this field is used to tell external classes
 * that that server has actually stopped.
 * Volatile because it is written by the worker threads and read by the
 * callers of isStopped().
 */
private volatile boolean stopped = true;
// Constructor ---------------------------------------------------
/**
 * Creates the JMS server and starts its pool of daemon worker threads.
 *
 * @param securityManager the security manager used to register and release client IDs
 */
public JMSServer(SecurityManager securityManager)
{
    taskQueue = new LinkedList();
    messageQueue = new HashMap();
    lastID = 1;
    lastTemporaryTopic = 1;
    // Initialised explicitly so that both temporary-destination counters start at 1.
    lastTemporaryQueue = 1;
    this.securityManager = securityManager;

    // Start the workers last: they run on this instance, so every field must
    // be assigned before the first thread can observe the object.
    for (int i = 0; i < NB_THREADS; i++) {
        Thread worker = new Thread(this);
        worker.setDaemon(true);
        worker.setName(Integer.toString(i));
        worker.start();
    }
}
/**
 * Reports whether the worker loop has terminated.
 *
 * @return the current value of the stopped flag: <code>false</code> once a
 *         worker has entered its loop, <code>true</code> before that and
 *         after the loop has ended
 */
public boolean isStopped() {
    final boolean hasStopped = stopped;
    return hasStopped;
}
// Public --------------------------------------------------------
//This is a correct threading system, but this is not ideal...
//We should let threads cycle through the JMSServerQueue list, and
synchronized on the queue they are working on.
public void run() {
while (alive) {
JMSServerQueue queue = null;
this.stopped = false;
//Wait (and sleep) until it can find something to do
synchronized (taskQueue) {
while (queue == null && alive) {
// size() is O(1) in LinkedList...
int size=taskQueue.size();
if (size!=0) {
//<DEBUG>
queue =
(JMSServerQueue)taskQueue.removeFirst();
//queue=(JMSServerQueue)taskQueue.getFirst();
//</DEBUG>
//One other thread can start
working on the task queue...
if (size > 1) {
taskQueue.notify();
}
} else {
try {
//Log.log("I'm going
to bed...");
taskQueue.wait(5000);
//Log.log("I wake up");
} catch (InterruptedException
e) {
}
}
}
}
if (alive) {
//Ask the queue to do its job
try {
queue.doMyJob();
} catch (JMSException e) {
Log.error(e);
}
}
}
Log.log("JMS service stopped.");
this.stopped = true;
}
/**
 * Asks the worker threads to terminate. Returns immediately; isStopped()
 * reports when the worker loop has actually ended.
 */
public void stopServer() {
    // The flag is cleared under the task-queue monitor so that workers, which
    // re-check it under the same monitor, are guaranteed to see the change.
    // notifyAll() wakes them right away instead of after their wait times out.
    synchronized (taskQueue) {
        this.alive = false;
        taskQueue.notifyAll();
    }
}
// Administration calls
public SpyTopic newTopic(String name) throws JMSException
{
Log.notice("[JMSServer] new topic : "+name);
SpyTopic newTopic=new SpyTopic(name);
if (messageQueue.containsKey(newTopic)) throw new JMSException("This
topic already exists !");
JMSServerQueue queue=new JMSServerQueue(newTopic,null,this);
//Add this new JMSServerQueue to the list
synchronized (messageQueue) {
HashMap newMap=(HashMap)messageQueue.clone();
newMap.put(newTopic,queue);
messageQueue=newMap;
}
return newTopic;
}
public SpyQueue newQueue(String name) throws JMSException
{
Log.notice("[JMSServer] new queue : "+name);
SpyQueue newQueue=new SpyQueue(name);
if (messageQueue.containsKey(newQueue)) throw new JMSException("This
queue already exists !");
JMSServerQueue queue=new JMSServerQueue(newQueue,null,this);
//Add this new JMSServerQueue to the list
synchronized (messageQueue) {
HashMap newMap=(HashMap)messageQueue.clone();
newMap.put(newQueue,queue);
messageQueue=newMap;
}
return newQueue;
}
// -----------------------------------------
// Callbacks for the invocation layer ------
// -----------------------------------------
//Get a new ClientID for a connection
/**
 * Generates a new client ID of the form "ID" followed by a number and
 * registers it with the security manager. When the security manager rejects
 * a candidate with a JMSException, the next number is tried.
 *
 * @return the registered client ID
 */
public String getID()
{
    while (true) {
        String candidate = "ID" + (lastID++);
        try {
            securityManager.addClientID(candidate);
            return candidate;
        } catch (JMSException e) {
            // Candidate rejected: fall through and try the next number.
            // Only JMSException is retried, so that a programming error
            // (e.g. a NullPointerException) surfaces instead of looping forever.
        }
    }
}
public synchronized SpyTopic createTopic(String name) throws JMSException
{
Log.log("createTopic("+name+")");
SpyTopic newTopic=new SpyTopic(name);
if (!messageQueue.containsKey(newTopic)) throw new JMSException("This
destination does not exist !");
return newTopic;
}
public synchronized SpyQueue createQueue(String name) throws JMSException
{
Log.log("createQueue("+name+")");
SpyQueue newQueue=new SpyQueue(name);
if (!messageQueue.containsKey(newQueue)) throw new JMSException("This
destination does not exist !");
return newQueue;
}
/**
 * Creates a temporary topic owned by the given connection and registers its
 * server queue. The name is "JMS_TT" followed by a running number.
 *
 * @param dc the owning connection
 * @return the new temporary topic
 * @throws JMSException if the server queue cannot be created
 */
public synchronized TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws JMSException
{
    final String topicName = "JMS_TT" + Integer.toString(lastTemporaryTopic++);
    final SpyTemporaryTopic topic = new SpyTemporaryTopic(topicName, dc);
    synchronized (messageQueue) {
        final JMSServerQueue queue = new JMSServerQueue(topic, dc, this);
        // Register in a fresh copy of the map, then swap the reference.
        final HashMap updated = (HashMap) messageQueue.clone();
        updated.put(topic, queue);
        messageQueue = updated;
    }
    return topic;
}
/**
 * Creates a temporary queue owned by the given connection and registers its
 * server queue. The name is "JMS_TQ" followed by a running number.
 *
 * @param dc the owning connection
 * @return the new temporary queue
 * @throws JMSException if the server queue cannot be created
 */
public synchronized TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws JMSException
{
    final String queueName = "JMS_TQ" + Integer.toString(lastTemporaryQueue++);
    final SpyTemporaryQueue newQueue = new SpyTemporaryQueue(queueName, dc);
    synchronized (messageQueue) {
        final JMSServerQueue sessionQueue = new JMSServerQueue(newQueue, dc, this);
        // Register in a fresh copy of the map, then swap the reference.
        final HashMap updated = (HashMap) messageQueue.clone();
        updated.put(newQueue, sessionQueue);
        messageQueue = updated;
    }
    return newQueue;
}
//A connection is closing [error or notification]
/**
 * Handles the closure of a connection: releases its client ID, drops the
 * temporary destinations it owns and notifies every other destination.
 *
 * @param dc      the closing connection; null is ignored
 * @param noCheck a server queue that must not be notified (the caller handles it itself), or null
 */
public synchronized void connectionClosing(SpyDistributedConnection dc, JMSServerQueue noCheck)
{
    Log.log("connectionClosing(dc="+dc+",noCheck="+noCheck+")");
    if (dc == null) {
        return;
    }

    // Unregister its clientID
    if (dc.getClientID() != null) {
        securityManager.removeID(dc.getClientID());
    }

    synchronized (messageQueue) {
        // The map is only cloned when a temporary destination really has to go.
        HashMap pruned = null;
        for (Iterator it = messageQueue.values().iterator(); it.hasNext(); ) {
            final JMSServerQueue sq = (JMSServerQueue) it.next();
            if (dc.equals(sq.temporaryDestination)) {
                // Temporary destination owned by the closing connection: drop it.
                if (pruned == null) {
                    pruned = (HashMap) messageQueue.clone();
                }
                pruned.remove(sq.destination);
            } else if (sq != noCheck) {
                // Remove the connection from that destination's subscriber list.
                sq.connectionClosing(dc);
            }
        }
        if (pruned != null) {
            messageQueue = pruned;
        }
    }
}
/**
 * Removes a destination from the server's destination map.
 *
 * @param dest the destination to remove
 */
public synchronized void deleteTemporaryDestination(SpyDestination dest)
{
    Log.log("JMSServer: deleteDestination(dest="+dest.toString()+")");
    synchronized (messageQueue) {
        // Work on a copy and swap the reference, like the other map updates.
        final HashMap remaining = (HashMap) messageQueue.clone();
        remaining.remove(dest);
        messageQueue = remaining;
    }
}
/**
 * Registers a client-supplied ID with the security manager.
 *
 * @param ID the client ID to register
 * @throws JMSException propagated from the security manager
 */
public void checkID(String ID) throws JMSException
{
    this.securityManager.addClientID(ID);
}
/**
 * Writes a log entry when this server instance is finalized.
 */
public void finalize()
{
    final String marker = "JMSServer.finalize()";
    Log.error(marker);
}
//Sent by a client to Ack or Nack a message.
public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem
item) throws JMSException
{
JMSServerQueue
serverQueue=(JMSServerQueue)messageQueue.get(item.jmsDestination);
if (serverQueue==null) throw new JMSException("Destination does not
exist: "+item.jmsDestination);
serverQueue.acknowledge(dc, item.jmsMessageID, item.isAck);
}
//A connection has sent a new message
public void addMessage(SpyDistributedConnection dc, SpyMessage val) throws
JMSException
{
Log.notice("INCOMING: "+dc.getClientID()+" => "+val.jmsDestination);
JMSServerQueue
queue=(JMSServerQueue)messageQueue.get(val.jmsDestination);
if (queue==null) throw new JMSException("This destination does not
exist !");
//Add the message to the queue
queue.addMessage(val);
}
public void connectionListening(SpyDistributedConnection dc,boolean
mode,Destination dest) throws JMSException
{
JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(dest);
if (serverQueue==null) throw new JMSException("This destination does
not exist !");
serverQueue.connectionListening(mode,dc);
}
/**
 * Returns the security manager this server was created with.
 *
 * @return the security manager
 */
public org.spydermq.security.SecurityManager getSecurityManager() {
    return this.securityManager;
}
//Used by QueueReceivers for receive(), receive(long wait), and receiveNoWait()
public SpyMessage queueReceive(SpyDistributedConnection dc,Queue queue, long
wait) throws JMSException
{
Log.log("JMSserver: queueReceive(queue="+queue+",wait="+wait+")");
JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(queue);
if (serverQueue==null) throw new JMSException("This destination does
not exist !");
return serverQueue.queueReceive(wait,dc);
}
//A connection object wants to subscribe to a Destination
public void subscribe(SpyDistributedConnection dc,Destination dest) throws
JMSException
{
Log.log("Server:
subscribe(dest="+dest.toString()+",idConnection="+dc.getClientID()+")");
JMSServerQueue queue=(JMSServerQueue)messageQueue.get(dest);
if (queue==null) throw new JMSException("This destination does not
exist !");
queue.addSubscriber(dc);
}
/**
 * Applies a client transaction: first adds every message it carries, then
 * processes every acknowledgement it carries.
 *
 * TODO: this has to be performed as a Unit Of Work. For now it is a quick
 * hack that works most of the time: a failure part-way through leaves the
 * earlier steps applied.
 *
 * @param dc the connection submitting the transaction
 * @param t  the transaction; its messages and acks arrays may each be null
 * @throws JMSException propagated from addMessage or acknowledge
 */
public void transact(SpyDistributedConnection dc, Transaction t) throws JMSException {
    final int messageCount = (t.messages == null) ? 0 : t.messages.length;
    for (int m = 0; m < messageCount; m++) {
        addMessage(dc, t.messages[m]);
    }

    final int ackCount = (t.acks == null) ? 0 : t.acks.length;
    for (int a = 0; a < ackCount; a++) {
        acknowledge(dc, t.acks[a]);
    }
}
public void unsubscribe(SpyDistributedConnection dc,Destination dest) throws
JMSException
{
Log.log("Server:
unsubscribe(dest="+dest.toString()+",idConnection="+dc.getClientID()+")");
JMSServerQueue queue=(JMSServerQueue)messageQueue.get(dest);
if (queue==null) throw new JMSException("This destination does not
exist !");
queue.removeSubscriber(dc,null);
}
}