jaliya 2005/02/24 05:51:15
Modified: sandesha/src/org/apache/sandesha Constants.java
RMInitiator.java RMTransport.java
sandesha/src/org/apache/sandesha/client
ClientPropertyValidator.java
ClientStorageManager.java RMSender.java
RMService.java
sandesha/src/org/apache/sandesha/server Sender.java
sandesha/src/org/apache/sandesha/storage/dao
ISandeshaDAO.java SandeshaDatabaseDAO.java
SandeshaQueueDAO.java
sandesha/src/org/apache/sandesha/storage/queue
SandeshaQueue.java
Log:
Refactored the code
Revision Changes Path
1.30 +6 -0 ws-fx/sandesha/src/org/apache/sandesha/Constants.java
Index: Constants.java
===================================================================
RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/Constants.java,v
retrieving revision 1.29
retrieving revision 1.30
diff -u -r1.29 -r1.30
--- Constants.java 21 Feb 2005 12:08:21 -0000 1.29
+++ Constants.java 24 Feb 2005 13:51:13 -0000 1.30
@@ -44,6 +44,8 @@
*/
public static final int SOURCE_LISTEN_PORT = 8090;
+ public static final int DEFAULR_CLIENT_SIDE_LISTENER_PORT=8090;
+
/**
* Namespace for wsu.
*/
@@ -266,6 +268,10 @@
public static final int CLIENT_RESPONSE_CHECKING_INTERVAL = 500;
+
+ public static final long CLIENT_WAIT_PERIOD_FOR_COMPLETE=10000l;
+
+
public interface FaultMessages {
public static final String SERVER_INTERNAL_ERROR="Server Interanal
Error";
1.4 +56 -26 ws-fx/sandesha/src/org/apache/sandesha/RMInitiator.java
Index: RMInitiator.java
===================================================================
RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/RMInitiator.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- RMInitiator.java 22 Feb 2005 13:49:02 -0000 1.3
+++ RMInitiator.java 24 Feb 2005 13:51:13 -0000 1.4
@@ -16,27 +16,22 @@
*/
package org.apache.sandesha;
-import org.apache.axis.Handler;
-import org.apache.axis.SimpleChain;
-import org.apache.axis.deployment.wsdd.WSDDDocument;
import org.apache.axis.deployment.wsdd.WSDDDeployment;
-import org.apache.axis.configuration.SimpleProvider;
-import org.apache.axis.description.JavaServiceDesc;
+import org.apache.axis.deployment.wsdd.WSDDDocument;
import org.apache.axis.handlers.soap.SOAPService;
-import org.apache.axis.message.addressing.handler.AddressingHandler;
+import org.apache.axis.server.AxisServer;
import org.apache.axis.transport.http.SimpleAxisServer;
import org.apache.sandesha.client.ClientStorageManager;
import org.apache.sandesha.server.RMInvoker;
import org.apache.sandesha.server.Sender;
import org.apache.sandesha.server.ServerStorageManager;
-import org.apache.sandesha.ws.rm.handlers.RMServerRequestHandler;
import org.apache.sandesha.ws.rm.providers.RMProvider;
+import org.apache.util.PropertyLoader;
import org.w3c.dom.Document;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.DocumentBuilder;
import javax.xml.namespace.QName;
-import java.io.IOException;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
import java.io.File;
import java.net.ServerSocket;
@@ -53,14 +48,17 @@
private static boolean senderStarted = false;
private static boolean listenerStarted = false;
private static SimpleAxisServer sas = null;
+ private static Thread thSender;
+ private static Thread thInvoker;
+ private static Sender sender;
public static IStorageManager init(boolean client) {
if (client) {
IStorageManager storageManager = new ClientStorageManager();
if (!senderStarted) {
System.out.println("INFO: Sender Thread started .....\n");
- Sender sender = new Sender(storageManager);
- Thread thSender = new Thread(sender);
+ sender = new Sender(storageManager);
+ thSender = new Thread(sender);
thSender.setDaemon(false);
senderStarted = true;
thSender.start();
@@ -78,7 +76,7 @@
if (!rmInvokerStarted) {
System.out.println("INFO: RMInvoker thread started ....\n");
RMInvoker rmInvoker = new RMInvoker();
- Thread thInvoker = new Thread(rmInvoker);
+ thInvoker = new Thread(rmInvoker);
thInvoker.setDaemon(true);
rmInvokerStarted = true;
thInvoker.start();
@@ -95,33 +93,65 @@
}
}
+ public static RMStatus stopClient() {
+
+ //This should check whether we have received all the acks or
reponses if any
+ IStorageManager storageManager = new ClientStorageManager();
+
+// while(!storageManager.isAllSequenceComplete()){
+// try {
+// System.out.println("Checking to stop the
client......................");
+// Thread.sleep(1000);
+// } catch (InterruptedException e) {
+// e.printStackTrace(); //To change body of catch statement
use File | Settings | File Templates.
+// }
+// }
+
+ if (listenerStarted)
+ sas.stop();
+ try {
+ Thread.sleep(Constants.CLIENT_WAIT_PERIOD_FOR_COMPLETE);
+ } catch (InterruptedException e) {
+ e.printStackTrace(); //To change body of catch statement use
File | Settings | File Templates.
+ }
+ sender.setRunning(false);
+ return new RMStatus();
+
+
+ }
+
private static void startListener() {
- /*
- try {
- SimpleAxisServer sas = new SimpleAxisServer();
+
+ try {
+ System.out.println("Sandesha Client Side Listener Started ....");
+ sas = new SimpleAxisServer();
DocumentBuilderFactory dbf =
DocumentBuilderFactory.newInstance();
dbf.setNamespaceAware(true);
DocumentBuilder db = dbf.newDocumentBuilder();
- //Need to change the path
- Document doc = db.parse(new File("config\\listener-config.wsdd"));
-
//"C:/SandeshaPrj/SandeshaIMPL/PrjWorkspace/bin/server-config.wsdd"));
+ Document doc = db.parse(new
File("config/client-listener-config.wsdd"));
WSDDDocument wsdddoc = new WSDDDocument(doc);
WSDDDeployment wsdddep = wsdddoc.getDeployment();
sas.setMyConfig(wsdddep);
- //Set the port 9090 to the SimpleAxisServer.
+ //Set the port 9090 to the SimpleAxisServer.
+ sas.getMyConfig().configureEngine(new AxisServer());
+ SOAPService service = sas.getMyConfig().getService(new
QName("RMService"));
+ RMProvider rmP = (RMProvider) service.getPivotHandler();
+ rmP.setClient(true);
+ System.out.println(" Service " + service.getPivotHandler());
- sas.getMyConfig().getServiceByNamespaceURI("RMService");
-
-
- sas.setServerSocket(new ServerSocket(9090));
+ sas.setServerSocket(new
ServerSocket(PropertyLoader.getClientSideListenerPort()));
Thread serverThread = new Thread(sas);
serverThread.start();
} catch (Exception e) {
e.printStackTrace();
}
- */
+
+
+
+
+/*
sas = new SimpleAxisServer();
@@ -170,7 +200,7 @@
} catch (IOException ex) {
ex.printStackTrace();
}
-
+ */
}
}
\ No newline at end of file
1.2 +17 -7 ws-fx/sandesha/src/org/apache/sandesha/RMTransport.java
Index: RMTransport.java
===================================================================
RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/RMTransport.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- RMTransport.java 18 Feb 2005 13:00:33 -0000 1.1
+++ RMTransport.java 24 Feb 2005 13:51:13 -0000 1.2
@@ -1,11 +1,21 @@
-package org.apache.sandesha;
+/*
+* Copyright 1999-2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you may not
+* use this file except in compliance with the License. You may obtain a copy
of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations
under
+* the License.
+*
+*/
-/**
- * @author JEkanayake
- *
- * To change the template for this generated type comment go to
- * Window>Preferences>Java>Code Generation>Code and Comments
- */
+package org.apache.sandesha;
import org.apache.axis.AxisEngine;
import org.apache.axis.AxisFault;
1.10 +6 -3
ws-fx/sandesha/src/org/apache/sandesha/client/ClientPropertyValidator.java
Index: ClientPropertyValidator.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/client/ClientPropertyValidator.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- ClientPropertyValidator.java 22 Feb 2005 13:49:03 -0000 1.9
+++ ClientPropertyValidator.java 24 Feb 2005 13:51:13 -0000 1.10
@@ -20,6 +20,7 @@
import org.apache.axis.client.Call;
import org.apache.sandesha.Constants;
import org.apache.sandesha.RMMessageContext;
+import org.apache.util.PropertyLoader;
import javax.xml.namespace.QName;
import java.net.InetAddress;
@@ -45,7 +46,10 @@
String replyTo = getReplyTo(call);
try {
- sourceURL = getSourceURL(call);
+ if(from==null)
+ sourceURL = getSourceURL(call);
+ else
+ sourceURL=from;
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
throw new AxisFault(e.getMessage());
@@ -105,7 +109,6 @@
private static boolean getSync(Call call) {
Boolean synchronous = (Boolean) call.getProperty("sync");
-
if (synchronous != null) {
return synchronous.booleanValue();
} else
@@ -131,7 +134,7 @@
String sourceURI = null;
InetAddress addr = InetAddress.getLocalHost();
- sourceURI = "http://" + addr.getHostAddress() + ":" +
Constants.SOURCE_ADDRESS_PORT
+ sourceURI = "http://" + addr.getHostAddress() + ":" +
PropertyLoader.getClientSideListenerPort()
+ "/axis/services/RMService";
//sourceURI="http://192.248.18.51:8080/axis/services/RMService";
1.22 +11 -3
ws-fx/sandesha/src/org/apache/sandesha/client/ClientStorageManager.java
Index: ClientStorageManager.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/client/ClientStorageManager.java,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -r1.21 -r1.22
--- ClientStorageManager.java 21 Feb 2005 12:08:21 -0000 1.21
+++ ClientStorageManager.java 24 Feb 2005 13:51:13 -0000 1.22
@@ -28,6 +28,7 @@
import org.apache.sandesha.ws.rm.RMHeaders;
import java.util.Map;
+import java.util.Iterator;
/**
* @author Jaliya
@@ -141,7 +142,9 @@
msg = accessor.getNextOutgoingMsgContextToSend();
if (msg == null) {
- msg = accessor.getNextLowPriorityMessageContextToSend(); //
checks whether all the request messages hv been acked
+ msg = accessor.getNextLowPriorityMessageContextToSend();
+
+ // checks whether all the request messages hv been acked
}
return msg;
}
@@ -271,8 +274,13 @@
* @see org.apache.sandesha.IStorageManager#isAllSequenceComplete()
*/
public boolean isAllSequenceComplete() {
- // TODO Auto-generated method stub
- return false;
+ boolean result=false;
+ Iterator ite=accessor.getAllOutgoingSequences();
+ while(ite.hasNext()){
+ result=isAckComplete((String)ite.next());
+ }
+
+ return result;
}
/* (non-Javadoc)
1.26 +19 -194
ws-fx/sandesha/src/org/apache/sandesha/client/RMSender.java
Index: RMSender.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/client/RMSender.java,v
retrieving revision 1.25
retrieving revision 1.26
diff -u -r1.25 -r1.26
--- RMSender.java 21 Feb 2005 12:08:21 -0000 1.25
+++ RMSender.java 24 Feb 2005 13:51:13 -0000 1.26
@@ -17,58 +17,38 @@
package org.apache.sandesha.client;
import org.apache.axis.AxisFault;
-import org.apache.axis.Message;
import org.apache.axis.MessageContext;
-import org.apache.axis.client.Call;
-import org.apache.axis.components.uuid.UUIDGen;
-import org.apache.axis.components.uuid.UUIDGenFactory;
import org.apache.axis.handlers.BasicHandler;
-import org.apache.axis.message.SOAPEnvelope;
-import org.apache.axis.message.SOAPHeaderElement;
-import org.apache.axis.message.addressing.*;
-import org.apache.axis.transport.http.SimpleAxisServer;
-import org.apache.axis.types.URI;
-import org.apache.axis.types.URI.MalformedURIException;
import org.apache.sandesha.Constants;
-import org.apache.sandesha.*;
-import org.apache.sandesha.server.Sender;
-
-import java.util.Iterator;
-import java.util.Vector;
+import org.apache.sandesha.IStorageManager;
+import org.apache.sandesha.RMMessageContext;
+import org.apache.util.RMMessageCreator;
public class RMSender extends BasicHandler {
- private static boolean senderStarted = false;
- private static boolean serverStarted = false;
private IStorageManager storageManager;
- private static SimpleAxisServer sas = null;
- private static Sender sender = null;
- private static Thread senderThread = null;
public void invoke(MessageContext msgContext) throws AxisFault {
//TODO This we need to check.
- MessageContext newMsgContext = cloneMsgContext(msgContext);
- RMMessageContext requestMesssageContext =
getRMMessageContext(newMsgContext);
- //Initialize the storage manager. We are in the client side
- //So initialize the client Storage Manager.
+ //Initialize the storage manager. We are in the client side So
initialize the client Storage Manager.
storageManager = new ClientStorageManager();
- RMInitiator.initClient(requestMesssageContext.getSync());
+ // RMInitiator.initClient(requestMesssageContext.getSync());
try {
+ RMMessageContext requestMesssageContext =
RMMessageCreator.createServiceRequestMsg(msgContext);
String sequenceID = requestMesssageContext.getSequenceID();
- AddressingHeaders addrHeaders =
getAddressingHeaders(requestMesssageContext);
- long msgNo=requestMesssageContext.getMsgNumber();
-
+ //AddressingHeaders addrHeaders =
getAddressingHeaders(requestMesssageContext);
+ long msgNo = requestMesssageContext.getMsgNumber();
+
if (msgNo == 1) {
- requestMesssageContext =
processFirstMessage(requestMesssageContext, addrHeaders,
- requestMesssageContext.getSync());
+ requestMesssageContext =
processFirstRequestMessage(requestMesssageContext,
requestMesssageContext.getSync());
} else {
- requestMesssageContext =
processNonFirstMessage(requestMesssageContext, addrHeaders,
requestMesssageContext.getSync());
+ requestMesssageContext =
processRequestMessage(requestMesssageContext);
}
if (requestMesssageContext.isLastMessage()) {
-
storageManager.insertTerminateSeqMessage(getTerminateSeqMessage(requestMesssageContext));
+
storageManager.insertTerminateSeqMessage(RMMessageCreator.createTerminateSeqMsg(requestMesssageContext));
}
if (requestMesssageContext.isHasResponse() &&
!requestMesssageContext.getSync()) {
@@ -77,180 +57,38 @@
//TODO Need to check for errors in the queue.
//If the queue has an error message, then need to report
it
// to client.
-
- responseMessageContext =
checkTheQueueForResponse(sequenceID,
- requestMesssageContext.getMessageID());
+ responseMessageContext =
checkTheQueueForResponse(sequenceID, requestMesssageContext.getMessageID());
Thread.sleep(Constants.CLIENT_RESPONSE_CHECKING_INTERVAL);
}
-
-
- Vector headers =
responseMessageContext.getMsgContext().getRequestMessage().getSOAPEnvelope().getHeaders();
- Iterator ite = headers.iterator();
-
- while (ite.hasNext()) {
- SOAPHeaderElement headerElement = (SOAPHeaderElement)
ite.next();
- headerElement.setMustUnderstand(false);
- headerElement.setProcessed(true);
- }
-
-
msgContext.setResponseMessage(responseMessageContext.getMsgContext()
- .getRequestMessage());
} else {
boolean gotAck = false;
while (!gotAck) {
- gotAck =
checkTheQueueForAck(requestMesssageContext.getSequenceID(),
+ gotAck =
checkTheQueueForAck(requestMesssageContext.getSequenceID(),
requestMesssageContext.getMessageID());
Thread.sleep(Constants.CLIENT_RESPONSE_CHECKING_INTERVAL);
}
-
msgContext.setResponseMessage(null);
-
}
-
} catch (Exception ex) {
ex.printStackTrace();
}
-
- }
-
- /**
- * @return
- */
- private RMMessageContext getTerminateSeqMessage(RMMessageContext
rmMessageContext) {
- RMMessageContext terSeqRMMsgContext = new RMMessageContext();
- MessageContext terSeqMsgContext = new
MessageContext(rmMessageContext.getMsgContext().getAxisEngine());
- terSeqRMMsgContext.setSequenceID(rmMessageContext.getSequenceID());
-
terSeqRMMsgContext.setAddressingHeaders(rmMessageContext.getAddressingHeaders());
-
terSeqRMMsgContext.setOutGoingAddress(rmMessageContext.getOutGoingAddress());
- terSeqRMMsgContext.setMsgContext(terSeqMsgContext);
-
terSeqRMMsgContext.setMessageType(Constants.MSG_TYPE_TERMINATE_SEQUENCE);
- // TODO Auto-generated method stub
- return terSeqRMMsgContext;
- }
-
- /**
- * @param msgContext
- * @return @throws
- * AxisFault
- */
- private MessageContext cloneMsgContext(MessageContext msgContext) throws
AxisFault {
- MessageContext clone = new
MessageContext(msgContext.getAxisEngine());
- String str = msgContext.getRequestMessage().getSOAPPartAsString();
- Message msg = new Message(str);
- clone.setRequestMessage(msg);
- RMMessageContext.copyMessageContext(msgContext, clone);
- return clone;
}
- private RMMessageContext getCreateSeqRMContext(RMMessageContext
rmMsgContext,
- AddressingHeaders
addrHeaders, String uuid) throws MalformedURIException {
- //String toAddress = (String)
- // msgContext.getProperty(MessageContext.TRANS_URL);
- MessageContext msgContext = rmMsgContext.getMsgContext();
- String toAddress = rmMsgContext.getOutGoingAddress();
-
- //Set the action
- Action action = new Action(new
URI(Constants.ACTION_CREATE_SEQUENCE));
- addrHeaders.setAction(action);
-
- //Create the RMMessageContext to hold the create Sequence Request.
- RMMessageContext createSeqRMMsgContext = new RMMessageContext();
- createSeqRMMsgContext.setAddressingHeaders(addrHeaders);
- createSeqRMMsgContext.setSync(rmMsgContext.getSync());
-
- //Set the outgoing address these need to be corrected.
- createSeqRMMsgContext.setOutGoingAddress(toAddress);
- SOAPEnvelope resEnvelope =
EnvelopeCreator.createCreateSequenceEnvelope(uuid,
- createSeqRMMsgContext, Constants.CLIENT);
- MessageContext createSeqMsgContext = new
MessageContext(msgContext.getAxisEngine());
-
- //This should be a clone operation.
- RMMessageContext.copyMessageContext(msgContext, createSeqMsgContext);
- createSeqMsgContext.setRequestMessage(new Message(resEnvelope));
- createSeqRMMsgContext.setMsgContext(createSeqMsgContext);
-
- //Set the message type
-
createSeqRMMsgContext.setMessageType(Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST);
- return createSeqRMMsgContext;
- }
-
-
- private AddressingHeaders getAddressingHeaders(RMMessageContext
rmMsgContext)
- throws MalformedURIException {
-
- // MessageContext msgContext= rmMsgContext.getMsgContext();
- //Variable to hold the status of the asynchronous or synchronous
state.
- boolean sync = rmMsgContext.getSync();
- AddressingHeaders addrHeaders = new AddressingHeaders();
- From from = null;
- ReplyTo replyTo = null;
- String fromURL = rmMsgContext.getFrom();
- String replyToURL = rmMsgContext.getReplyTo();
-
- //Need to use the anonymous_URI if the client is synchronous.
- if (!sync) {
- from = new From(new Address(rmMsgContext.getSourceURL()));
- addrHeaders.setFrom(from);
-
- if (replyToURL != null) {
- replyTo = new ReplyTo(new Address(replyToURL));
- addrHeaders.setReplyTo(replyTo);
- } else {
- replyTo = new ReplyTo(new
Address(rmMsgContext.getSourceURL()));
- addrHeaders.setReplyTo(replyTo);
- }
-
- } else {
- from = new From(new Address(Constants.ANONYMOUS_URI));
- addrHeaders.setFrom(from);
- if (rmMsgContext.isHasResponse()) {
- replyTo = new ReplyTo(new Address(replyToURL));
- addrHeaders.setReplyTo(replyTo);
- }
-
- }
- //Set the target endpoint URL
- To to = new To(new Address(rmMsgContext.getOutGoingAddress()));
- addrHeaders.setTo(to);
- return addrHeaders;
- }
-
- private RMMessageContext processFirstMessage(RMMessageContext
reqRMMsgContext,
- AddressingHeaders
addrHeaders, boolean sync) throws Exception {
- long nextMsgNumber = reqRMMsgContext.getMsgNumber();
- UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
- //Set the tempUUID
- String tempUUID = uuidGen.nextUUID();
- RMMessageContext createSeqRMMsgContext =
getCreateSeqRMContext(reqRMMsgContext, addrHeaders, tempUUID);
- createSeqRMMsgContext.setMessageID(Constants.UUID + tempUUID);
- //Create a sequence first.
-
+ private RMMessageContext processFirstRequestMessage(RMMessageContext
reqRMMsgContext, boolean sync) throws Exception {
+ RMMessageContext createSeqRMMsgContext =
RMMessageCreator.createCreateSeqMsg(reqRMMsgContext);
storageManager.addOutgoingSequence(reqRMMsgContext.getSequenceID());
-
storageManager.setTemporaryOutSequence(reqRMMsgContext.getSequenceID(),Constants.UUID
+ tempUUID);
+
storageManager.setTemporaryOutSequence(reqRMMsgContext.getSequenceID(),
createSeqRMMsgContext.getMessageID());
//Set the processing state to the RMMessageContext
createSeqRMMsgContext.setSync(sync);
storageManager.addCreateSequenceRequest(createSeqRMMsgContext);
- reqRMMsgContext.setAddressingHeaders(addrHeaders);
- reqRMMsgContext.setOutGoingAddress(addrHeaders.getTo().toString());
- reqRMMsgContext.setMsgNumber(nextMsgNumber);
- reqRMMsgContext.setMessageType(Constants.MSG_TYPE_SERVICE_REQUEST);
- reqRMMsgContext.setMessageID(Constants.UUID+ uuidGen.nextUUID());
- storageManager.insertOutgoingMessage(reqRMMsgContext);
+ processRequestMessage(reqRMMsgContext);
return reqRMMsgContext;
}
- private RMMessageContext processNonFirstMessage(RMMessageContext
reqRMMsgContext,
- AddressingHeaders
addrHeaders, boolean sync) throws Exception {
- UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
- reqRMMsgContext.setAddressingHeaders(addrHeaders);
- reqRMMsgContext.setOutGoingAddress(addrHeaders.getTo().toString());
- reqRMMsgContext.setMessageType(Constants.MSG_TYPE_SERVICE_REQUEST);
- reqRMMsgContext.setMessageID(Constants.UUID + uuidGen.nextUUID());
- //Set the processing state of the RMMessageContext
- reqRMMsgContext.setSync(sync);
+ private RMMessageContext processRequestMessage(RMMessageContext
reqRMMsgContext) throws Exception {
storageManager.insertOutgoingMessage(reqRMMsgContext);
return reqRMMsgContext;
}
@@ -265,19 +103,6 @@
}
- private RMMessageContext getRMMessageContext(MessageContext msgContext)
throws AxisFault {
- RMMessageContext requestMesssageContext = new RMMessageContext();
- //Get the message information from the client.
- Call call = (Call) msgContext.getProperty(MessageContext.CALL);
- //If the property specified by the client is not valid
- //an AxisFault will be sent at this point.
- requestMesssageContext = ClientPropertyValidator.validate(call);
- requestMesssageContext.setOutGoingAddress((String)
msgContext.getProperty(MessageContext.TRANS_URL));
- requestMesssageContext.setMsgContext(msgContext);
- return requestMesssageContext;
-
- }
-
}
1.2 +1 -0
ws-fx/sandesha/src/org/apache/sandesha/client/RMService.java
Index: RMService.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/client/RMService.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- RMService.java 16 Feb 2005 04:46:42 -0000 1.1
+++ RMService.java 24 Feb 2005 13:51:13 -0000 1.2
@@ -20,4 +20,5 @@
public class RMService {
//The service is used as a dummy entity in
//the client side only.
+
}
1.23 +133 -237 ws-fx/sandesha/src/org/apache/sandesha/server/Sender.java
Index: Sender.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/Sender.java,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -r1.22 -r1.23
--- Sender.java 20 Feb 2005 08:49:45 -0000 1.22
+++ Sender.java 24 Feb 2005 13:51:13 -0000 1.23
@@ -40,6 +40,8 @@
* @author JEkanayake
*/
public class Sender implements Runnable {
+ public boolean running = true;
+
private IStorageManager storageManager;
public Sender() {
@@ -50,9 +52,17 @@
this.storageManager = storageManager;
}
+ public boolean isRunning() {
+ return running;
+ }
+
+ public void setRunning(boolean running) {
+ this.running = running;
+ }
+
public void run() {
- while (true) {
+ while (running) {
long startTime = System.currentTimeMillis();
boolean hasMessages = true;
//Take a messge from the storage and check whether we can send
it.
@@ -62,94 +72,33 @@
hasMessages = false;
} else {
//Send the message.
- switch (rmMessageContext.getMessageType()) {
- case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST:
- {
- System.out.println("INFO: SENDING CREATE
SEQUENCE REQUEST ....");
- if
((rmMessageContext.getReTransmissionCount() <=
Constants.MAXIMUM_RETRANSMISSION_COUNT)
- && ((System.currentTimeMillis() -
rmMessageContext
- .getLastPrecessedTime()) >
Constants.RETRANSMISSION_INTERVAL)) {
-
sendCreateSequenceRequest(rmMessageContext);
- } else {
- //TODO REPORT ERROR
- }
- break;
- }
- case Constants.MSG_TYPE_CREATE_SEQUENCE_RESPONSE:
- {
- System.out.println("INFO: SENDING CREATE
SEQUENCE RESPONSE ....");
- if
((rmMessageContext.getReTransmissionCount() <=
Constants.MAXIMUM_RETRANSMISSION_COUNT)
- && ((System.currentTimeMillis() -
rmMessageContext
- .getLastPrecessedTime()) >
Constants.RETRANSMISSION_INTERVAL)) {
-
sendCreateSequenceResponse(rmMessageContext);
- } else {
- //TODO REPORT ERROR
- }
- break;
- }
- case Constants.MSG_TYPE_TERMINATE_SEQUENCE:
- {
- System.out.println("INFO: SENDING TERMINATE
SEQUENCE REQUEST ....");
- if
((rmMessageContext.getReTransmissionCount() <=
Constants.MAXIMUM_RETRANSMISSION_COUNT)
- && ((System.currentTimeMillis() -
rmMessageContext
- .getLastPrecessedTime()) >
Constants.RETRANSMISSION_INTERVAL)) {
-
sendTerminateSequenceRequest(rmMessageContext);
- } else {
- //TODO REPORT ERROR
- }
- break;
- }
- case Constants.MSG_TYPE_ACKNOWLEDGEMENT:
- {
- System.out.println("INFO: SENDING
ACKNOWLEDGEMENT ....\n");
- if
((rmMessageContext.getReTransmissionCount() <=
Constants.MAXIMUM_RETRANSMISSION_COUNT)
- && ((System.currentTimeMillis() -
rmMessageContext
- .getLastPrecessedTime()) >
Constants.RETRANSMISSION_INTERVAL)) {
- sendAcknowldgement(rmMessageContext);
- } else {
- //TODO REPORT ERROR
- }
- break;
- }
- case Constants.MSG_TYPE_SERVICE_REQUEST:
- {
- System.out.println("INFO: SENDING REQUEST
MESSAGE ....\n");
- if
((rmMessageContext.getReTransmissionCount() <=
Constants.MAXIMUM_RETRANSMISSION_COUNT)
- && ((System.currentTimeMillis() -
rmMessageContext
- .getLastPrecessedTime()) >
Constants.RETRANSMISSION_INTERVAL)) {
- sendServiceRequest(rmMessageContext);
- } else { //TODO REPORT ERROR
- }
- break;
- }
- case Constants.MSG_TYPE_SERVICE_RESPONSE:
- {
- System.out.println("INFO: SENDING RESPONSE
MESSAGE ....\n");
- if
((rmMessageContext.getReTransmissionCount() <=
Constants.MAXIMUM_RETRANSMISSION_COUNT)
- && ((System.currentTimeMillis() -
rmMessageContext
- .getLastPrecessedTime()) >
Constants.RETRANSMISSION_INTERVAL)) {
- sendServiceResponse(rmMessageContext);
- } else {
- //TODO REPORT ERROR
- }
- break;
- }
+ if ((rmMessageContext.getReTransmissionCount() <=
Constants.MAXIMUM_RETRANSMISSION_COUNT)
+ && ((System.currentTimeMillis() -
rmMessageContext
+ .getLastPrecessedTime()) >
Constants.RETRANSMISSION_INTERVAL)) {
+ try {
+ sendMessage(rmMessageContext);
+ } catch (AxisFault e) {
+ e.printStackTrace();
+ } catch (SOAPException e) {
+ e.printStackTrace();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ } else {
+ //TODO REPORT ERROR
}
-
}
} while (hasMessages);
long timeGap = System.currentTimeMillis() - startTime;
if ((timeGap - Constants.SENDER_SLEEP_TIME) <= 0) {
try {
-
System.out.print("|"); //Sender THREAD IS SLEEPING
// -----------XXX----------\n");
Thread.sleep(Constants.SENDER_SLEEP_TIME - timeGap);
} catch (Exception ex) {
ex.printStackTrace();
}
-
}
}
}
@@ -157,120 +106,60 @@
/**
* @param rmMessageContext
*/
- private void sendTerminateSequenceRequest(RMMessageContext
rmMessageContext) {
+ private void sendTerminateSequenceRequest(RMMessageContext
rmMessageContext) throws Exception {
SOAPEnvelope terSeqEnv =
EnvelopeCreator.createTerminatSeqMessage(rmMessageContext);
Message terSeqMsg = new Message(terSeqEnv);
rmMessageContext.getMsgContext().setRequestMessage(terSeqMsg);
Call call;
- try {
-
rmMessageContext.setLastPrecessedTime(System.currentTimeMillis());
- rmMessageContext
-
.setReTransmissionCount(rmMessageContext.getReTransmissionCount() + 1);
- call = prepareCall(rmMessageContext);
- call.invoke();
- if (call.getResponseMessage() != null) {
- RMHeaders rmHeaders = new RMHeaders();
-
rmHeaders.fromSOAPEnvelope(call.getResponseMessage().getSOAPEnvelope());
- rmMessageContext.setRMHeaders(rmHeaders);
- AddressingHeaders addrHeaders = new
AddressingHeaders(call.getResponseMessage()
- .getSOAPEnvelope());
- rmMessageContext.setAddressingHeaders(addrHeaders);
-
rmMessageContext.getMsgContext().setResponseMessage(call.getResponseMessage());
- IRMMessageProcessor messagePrcessor =
RMMessageProcessorIdentifier
- .getMessageProcessor(rmMessageContext,
storageManager);
- messagePrcessor.processMessage(rmMessageContext);
- }
- } catch (AxisFault e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (SOAPException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
+ rmMessageContext.setLastPrecessedTime(System.currentTimeMillis());
+
rmMessageContext.setReTransmissionCount(rmMessageContext.getReTransmissionCount()
+ 1);
+ call = prepareCall(rmMessageContext);
+ call.invoke();
+ processResponseMessage(call, rmMessageContext);
}
- private void sendServiceResponse(RMMessageContext rmMessageContext) {
+ private void sendServiceResponse(RMMessageContext rmMessageContext)
throws Exception {
SOAPEnvelope responseEnvelope = null;
responseEnvelope =
EnvelopeCreator.createServiceResponseEnvelope(rmMessageContext);
- //org.apache.axis.MessageContext resMsgCtx=new
org.apache.axis.MessageContext(rmMessageContext.getMsgContext().getAxisEngine());
rmMessageContext.getMsgContext().setRequestMessage(new
Message(responseEnvelope));
rmMessageContext.getMsgContext().setResponseMessage(new
Message(responseEnvelope));
- //resMsgCtx.setRequestMessage(new Message(responseEnvelope));
- try {
- Service service = new Service();
- Call call = (Call) service.createCall();
-
//call.setTargetEndpointAddress(rmMessageContext.getOutGoingAddress());
-
call.setTargetEndpointAddress(rmMessageContext.getAddressingHeaders().getReplyTo()
- .getAddress().toString());
- //NOTE: WE USE THE REQUEST MESSAGE TO SEND THE RESPONSE.
- String soapMsg =
rmMessageContext.getMsgContext().getRequestMessage().getSOAPPartAsString();
- call.setRequestMessage(new Message(soapMsg));
-
rmMessageContext.setLastPrecessedTime(System.currentTimeMillis());
- rmMessageContext
-
.setReTransmissionCount(rmMessageContext.getReTransmissionCount() + 1);
- //We are not expecting the ack over the
- // same HTTP connection.
-
storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),rmMessageContext.getMsgNumber());
-
- call.invoke();
-
//System.out.println(call.getResponseMessage().getSOAPPartAsString());
-
-
- } catch (ServiceException e1) {
- System.err.println("ERROR: SENDING RESPONSE MESSAGE ....");
- e1.printStackTrace();
- } catch (AxisFault af) {
- af.printStackTrace();
- }
+ Service service = new Service();
+ Call call = (Call) service.createCall();
+
call.setTargetEndpointAddress(rmMessageContext.getAddressingHeaders().getReplyTo().getAddress().toString());
+ //NOTE: WE USE THE REQUEST MESSAGE TO SEND THE RESPONSE.
+ String soapMsg =
rmMessageContext.getMsgContext().getRequestMessage().getSOAPPartAsString();
+ call.setRequestMessage(new Message(soapMsg));
+
+ rmMessageContext.setLastPrecessedTime(System.currentTimeMillis());
+
rmMessageContext.setReTransmissionCount(rmMessageContext.getReTransmissionCount()
+ 1);
+ //We are not expecting the ack over the same connection
+ storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),
rmMessageContext.getMsgNumber());
+ call.invoke();
}
- private void sendCreateSequenceRequest(RMMessageContext
rmMessageContext) {
+ private void sendCreateSequenceRequest(RMMessageContext
rmMessageContext) throws Exception {
if (rmMessageContext.getMsgContext().getRequestMessage() == null) {
//The code should not come to this point.
System.err.println("ERROR: NULL REQUEST MESSAGE");
} else {
Call call;
- try {
-
rmMessageContext.setLastPrecessedTime(System.currentTimeMillis());
- rmMessageContext
-
.setReTransmissionCount(rmMessageContext.getReTransmissionCount() + 1);
- call = prepareCall(rmMessageContext);
- call.invoke();
- if (call.getResponseMessage() != null) {
- RMHeaders rmHeaders = new RMHeaders();
-
rmHeaders.fromSOAPEnvelope(call.getResponseMessage().getSOAPEnvelope());
- rmMessageContext.setRMHeaders(rmHeaders);
- AddressingHeaders addrHeaders = new
AddressingHeaders(call.getResponseMessage()
- .getSOAPEnvelope());
- rmMessageContext.setAddressingHeaders(addrHeaders);
-
rmMessageContext.getMsgContext().setResponseMessage(call.getResponseMessage());
- IRMMessageProcessor messagePrcessor =
RMMessageProcessorIdentifier
- .getMessageProcessor(rmMessageContext,
storageManager);
- messagePrcessor.processMessage(rmMessageContext);
- }
- } catch (AxisFault e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (SOAPException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+
+
rmMessageContext.setLastPrecessedTime(System.currentTimeMillis());
+ rmMessageContext
+
.setReTransmissionCount(rmMessageContext.getReTransmissionCount() + 1);
+ call = prepareCall(rmMessageContext);
+ call.invoke();
+
+ processResponseMessage(call, rmMessageContext);
}
}
- private void sendCreateSequenceResponse(RMMessageContext
rmMessageContext) {
+ private void sendCreateSequenceResponse(RMMessageContext
rmMessageContext) throws Exception {
//Here there is no concept of sending synchronous
CreateSequenceRequest
// response.
//i.e. we are not expecting any response for this.
@@ -278,23 +167,16 @@
//The code should not come to this point.
System.err.println("ERROR: NULL REQUEST MESSAGE");
} else {
- try {
-
rmMessageContext.setLastPrecessedTime(System.currentTimeMillis());
- rmMessageContext
-
.setReTransmissionCount(rmMessageContext.getReTransmissionCount() + 1);
- Call call = prepareCall(rmMessageContext);
-
call.setRequestMessage(rmMessageContext.getMsgContext().getResponseMessage());
- call.invoke();
-
- } catch (ServiceException e) {
- e.printStackTrace();
- } catch (AxisFault e) {
- e.printStackTrace();
- }
+
rmMessageContext.setLastPrecessedTime(System.currentTimeMillis());
+ rmMessageContext
+
.setReTransmissionCount(rmMessageContext.getReTransmissionCount() + 1);
+ Call call = prepareCall(rmMessageContext);
+
call.setRequestMessage(rmMessageContext.getMsgContext().getResponseMessage());
+ call.invoke();
}
}
- private void sendAcknowldgement(RMMessageContext rmMessageContext) {
+ private void sendAcknowldgement(RMMessageContext rmMessageContext)
throws Exception {
// Here there is no concept of sending synchronous
CreateSequenceRequest
// resposne.
if (rmMessageContext.getMsgContext().getResponseMessage() == null) {
@@ -302,19 +184,12 @@
} else {
rmMessageContext.setLastPrecessedTime(System.currentTimeMillis());
rmMessageContext.setReTransmissionCount(rmMessageContext.getReTransmissionCount()
+ 1);
- try {
- Call call = prepareCall(rmMessageContext);
-
call.setRequestMessage(rmMessageContext.getMsgContext().getResponseMessage());
- call.invoke();
- if (call.getResponseMessage() != null) {
- System.out.println("RESPONSE MESSAGE IS NOT NULL");
-
System.out.println(call.getResponseMessage().getSOAPEnvelope().toString());
- }
- } catch (ServiceException e1) {
- System.err.println("ERROR: SERVICE EXCEPTION WHEN SENDING
RESPONSE");
- e1.printStackTrace();
- } catch (AxisFault e) {
- e.printStackTrace();
+ Call call = prepareCall(rmMessageContext);
+
call.setRequestMessage(rmMessageContext.getMsgContext().getResponseMessage());
+ call.invoke();
+ if (call.getResponseMessage() != null) {
+ System.out.println("RESPONSE MESSAGE IS NOT NULL");
+
System.out.println(call.getResponseMessage().getSOAPEnvelope().toString());
}
}
}
@@ -334,8 +209,6 @@
call.setClientHandlers(null, sc);
-
//call.setRequestMessage(rmMessageContext.getMsgContext().getRequestMessage());
-
if (rmMessageContext.getMsgContext().getRequestMessage() != null) {
String soapMsg =
rmMessageContext.getMsgContext().getRequestMessage().getSOAPPartAsString();
call.setRequestMessage(new Message(soapMsg));
@@ -343,7 +216,7 @@
return call;
}
- private void sendServiceRequest(RMMessageContext rmMessageContext) {
+ private void sendServiceRequest(RMMessageContext rmMessageContext)
throws Exception {
if (rmMessageContext.getMsgContext().getRequestMessage() == null) {
System.err.println("ERROR: NULL REQUEST MESSAGE");
} else {
@@ -355,51 +228,18 @@
rmMessageContext.setReTransmissionCount(rmMessageContext.getReTransmissionCount()
+ 1);
if (rmMessageContext.getSync()) {
Call call;
- try {
- call = prepareCall(rmMessageContext);
+ call = prepareCall(rmMessageContext);
+ //CHECK THIS
+
storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),
rmMessageContext.getMsgNumber());
+ call.invoke();
+ processResponseMessage(call, rmMessageContext);
- //CHECK THIS
-
storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),rmMessageContext.getMsgNumber());
- call.invoke();
- if (call.getResponseMessage() != null) {
- System.out.println("RESPONSE MESSAGE IS NOT NULL");
- RMHeaders rmHeaders = new RMHeaders();
-
rmHeaders.fromSOAPEnvelope(call.getResponseMessage().getSOAPEnvelope());
- rmMessageContext.setRMHeaders(rmHeaders);
- AddressingHeaders addrHeaders = new
AddressingHeaders(call
- .getResponseMessage().getSOAPEnvelope());
- rmMessageContext.setAddressingHeaders(addrHeaders);
-
rmMessageContext.getMsgContext().setResponseMessage(call.getResponseMessage());
- IRMMessageProcessor messageProcessor =
RMMessageProcessorIdentifier
- .getMessageProcessor(rmMessageContext,
storageManager);
- messageProcessor.processMessage(rmMessageContext);
- System.out.println(messageProcessor);
- }
- } catch (AxisFault e) {
- // TODO Auto-generated catch block
- System.err.println("ERROR: SENDING REQUEST ....");
- e.printStackTrace();
- } catch (SOAPException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
} else {
- try {
- Call call = prepareCall(rmMessageContext);
- //Send the createSequnceRequest Asynchronously.
-
storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),rmMessageContext.getMsgNumber());
- call.invoke();
- } catch (AxisFault e) {
- System.err.println("ERROR: SENDING REQUEST ....");
- e.printStackTrace();
- } catch (ServiceException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ Call call = prepareCall(rmMessageContext);
+ //Send the createSequnceRequest Asynchronously.
+
storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),
rmMessageContext.getMsgNumber());
+ call.invoke();
}
@@ -407,4 +247,60 @@
}
+ private void sendMessage(RMMessageContext rmMessageContext) throws
Exception {
+ switch (rmMessageContext.getMessageType()) {
+ case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST:
+ {
+ System.out.println("INFO: SENDING CREATE SEQUENCE
REQUEST ....");
+ sendCreateSequenceRequest(rmMessageContext);
+ break;
+ }
+ case Constants.MSG_TYPE_CREATE_SEQUENCE_RESPONSE:
+ {
+ System.out.println("INFO: SENDING CREATE SEQUENCE
RESPONSE ....");
+ sendCreateSequenceResponse(rmMessageContext);
+ break;
+ }
+ case Constants.MSG_TYPE_TERMINATE_SEQUENCE:
+ {
+ System.out.println("INFO: SENDING TERMINATE SEQUENCE
REQUEST ....");
+ sendTerminateSequenceRequest(rmMessageContext);
+ break;
+ }
+ case Constants.MSG_TYPE_ACKNOWLEDGEMENT:
+ {
+ System.out.println("INFO: SENDING ACKNOWLEDGEMENT
....\n");
+ sendAcknowldgement(rmMessageContext);
+ break;
+ }
+ case Constants.MSG_TYPE_SERVICE_REQUEST:
+ {
+ System.out.println("INFO: SENDING REQUEST MESSAGE
....\n");
+ sendServiceRequest(rmMessageContext);
+ break;
+ }
+ case Constants.MSG_TYPE_SERVICE_RESPONSE:
+ {
+ System.out.println("INFO: SENDING RESPONSE MESSAGE
....\n");
+ sendServiceResponse(rmMessageContext);
+ break;
+ }
+ }
+ }
+
+ private void processResponseMessage(Call call, RMMessageContext
rmMessageContext) throws Exception {
+ if (call.getResponseMessage() != null) {
+ RMHeaders rmHeaders = new RMHeaders();
+
rmHeaders.fromSOAPEnvelope(call.getResponseMessage().getSOAPEnvelope());
+ rmMessageContext.setRMHeaders(rmHeaders);
+ AddressingHeaders addrHeaders = new
AddressingHeaders(call.getResponseMessage()
+ .getSOAPEnvelope());
+ rmMessageContext.setAddressingHeaders(addrHeaders);
+
rmMessageContext.getMsgContext().setResponseMessage(call.getResponseMessage());
+ IRMMessageProcessor messagePrcessor =
RMMessageProcessorIdentifier
+ .getMessageProcessor(rmMessageContext, storageManager);
+ messagePrcessor.processMessage(rmMessageContext);
+ }
+ }
+
}
\ No newline at end of file
1.5 +6 -3
ws-fx/sandesha/src/org/apache/sandesha/storage/dao/ISandeshaDAO.java
Index: ISandeshaDAO.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/dao/ISandeshaDAO.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- ISandeshaDAO.java 20 Feb 2005 17:42:38 -0000 1.4
+++ ISandeshaDAO.java 24 Feb 2005 13:51:14 -0000 1.5
@@ -20,6 +20,7 @@
import org.apache.sandesha.storage.queue.SandeshaQueue;
import java.util.Set;
+import java.util.Iterator;
/**
* @author Chamikara Jayalath
@@ -108,7 +109,9 @@
//Two methods below will be used to get this key from the actual
sequenceid.
public String getKeyFromIncomingSequenceId(String incomingSeqID);
- public String getKeyFromOutgoingSequenceId(String outgoingSeqID);
-
-
+ public String getKeyFromOutgoingSequenceId(String outgoingSeqID);
+
+ public Iterator getAllOutgoingSequences();
+
+
}
\ No newline at end of file
1.5 +5 -0
ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaDatabaseDAO.java
Index: SandeshaDatabaseDAO.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaDatabaseDAO.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- SandeshaDatabaseDAO.java 20 Feb 2005 17:42:38 -0000 1.4
+++ SandeshaDatabaseDAO.java 24 Feb 2005 13:51:14 -0000 1.5
@@ -22,6 +22,7 @@
import org.apache.sandesha.storage.queue.IncomingSequence;
import java.util.Set;
+import java.util.Iterator;
/**
* @author Chamikara Jayalath
@@ -386,4 +387,8 @@
// TODO Auto-generated method stub
return null;
}
+
+ public Iterator getAllOutgoingSequences() {
+ return null; //To change body of implemented methods use File |
Settings | File Templates.
+ }
}
\ No newline at end of file
1.6 +6 -0
ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaQueueDAO.java
Index: SandeshaQueueDAO.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaQueueDAO.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- SandeshaQueueDAO.java 20 Feb 2005 17:42:38 -0000 1.5
+++ SandeshaQueueDAO.java 24 Feb 2005 13:51:14 -0000 1.6
@@ -26,6 +26,7 @@
import java.util.Random;
import java.util.Set;
import java.util.Vector;
+import java.util.Iterator;
/**
* @author Chamikara Jayalath
@@ -443,4 +444,9 @@
SandeshaQueue sq = SandeshaQueue.getInstance();
return sq.getKeyFromOutgoingSequenceId(seqID);
}
+
+ public Iterator getAllOutgoingSequences() {
+ SandeshaQueue sq=SandeshaQueue.getInstance();
+ return sq.getAllOutgoingSequences();
+ }
}
1.6 +5 -0
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java
Index: SandeshaQueue.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- SandeshaQueue.java 20 Feb 2005 17:42:38 -0000 1.5
+++ SandeshaQueue.java 24 Feb 2005 13:51:14 -0000 1.6
@@ -65,6 +65,11 @@
/**
* This will not replace messages automatically.
*/
+
+ public Iterator getAllOutgoingSequences(){
+ return outgoingMap.keySet().iterator();
+ }
+
public boolean addMessageToIncomingSequence(String seqId, Long messageNo,
RMMessageContext msgCon) throws QueueException {
boolean successful = false;