jaliya 2005/01/10 04:19:03
Modified: sandesha/src/org/apache/sandesha/server
ServerStorageManager.java
Log:
Revision Changes Path
1.12 +18 -41
ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java
Index: ServerStorageManager.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -r1.11 -r1.12
--- ServerStorageManager.java 24 Dec 2004 04:47:42 -0000 1.11
+++ ServerStorageManager.java 10 Jan 2005 12:19:02 -0000 1.12
@@ -42,6 +42,11 @@
private String tempSeqId = null; // used by getNextMessageToProcess();
+ IServerDAO accessor;
+
+ public ServerStorageManager(){
+ accessor =
ServerDAOFactory.getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
+ }
/**
* A very important method. Makes life easy for the thread or thread pool
* that is using this. Every thread just have to create an instance of
@@ -52,9 +57,6 @@
*
*/
public RMMessageContext getNextMessageToProcess() {
- IServerDAO accessor = ServerDAOFactory
- .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
-
if (tempSeqId == null)
tempSeqId = accessor.getRandomSeqIdToProcess();
@@ -73,9 +75,7 @@
}
public void setAcknowledged(String seqID, long msgNumber) {
- IServerDAO accessor = ServerDAOFactory
- .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
- //TODO decide this in implementing the ServerSender.
+ //TODO decide this in implementing the ServerSender.
accessor.moveOutgoingMessageToBin(seqID, new Long(msgNumber));
}
@@ -89,14 +89,10 @@
* created.
*/
public boolean isSequenceExist(String sequenceID) {
- IServerDAO accessor = ServerDAOFactory
- .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
return accessor.isIncomingSequenceExists(sequenceID);
}
public boolean isResponseSequenceExist(String sequenceID) {
- IServerDAO accessor = ServerDAOFactory
- .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
//return accessor.isIncomingSequenceExists(sequenceID);
return accessor.isOutgoingSequenceExists(sequenceID);
}
@@ -106,8 +102,6 @@
* sender will use this.
*/
public RMMessageContext getNextMessageToSend() {
- IServerDAO accessor = ServerDAOFactory
- .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
RMMessageContext msg;
msg = accessor.getNextPriorityMessageContextToSend();
@@ -135,8 +129,6 @@
* Will be used to add a new Sequence Hash to the In Queue.
*/
public void addSequence(String sequenceId) {
- IServerDAO accessor = ServerDAOFactory
- .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
boolean result = accessor.addIncomingSequence(sequenceId);
if (!result)
@@ -148,10 +140,6 @@
* This will be used to send Acks.
*/
public Map getListOfMessageNumbers(String sequenceID) {
-
- IServerDAO accessor = ServerDAOFactory
- .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
-
Set st = accessor.getAllReceivedMsgNumsOfIncomingSeq(sequenceID);
Iterator it = st.iterator();
@@ -182,8 +170,6 @@
}
public boolean isMessageExist(String sequenceID, long messageNumber) {
- IServerDAO accessor = ServerDAOFactory
- .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
return accessor.isIncomingMessageExists(sequenceID, new Long(
messageNumber));
}
@@ -211,9 +197,6 @@
}
private void addPriorityMessage(RMMessageContext msg) {
- IServerDAO accessor = ServerDAOFactory
- .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
-
accessor.addPriorityMessage(msg);
}
@@ -224,17 +207,12 @@
* java.lang.String)
*/
public void setTemporaryOutSequence(String sequenceId, String
outSequenceId) {
- IServerDAO accessor = ServerDAOFactory
- .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
-
accessor.setOutSequence(sequenceId, outSequenceId);
accessor.setOutSequenceApproved(sequenceId, false);
}
public boolean setApprovedOutSequence(String oldOutsequenceId,
String newOutSequenceId) {
- IServerDAO accessor = ServerDAOFactory
- .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
boolean done = false;
String sequenceID =
accessor.getSequenceOfOutSequence(oldOutsequenceId);
@@ -252,8 +230,6 @@
}
public long getNextMessageNumber(String sequenceID) {
- IServerDAO accessor = ServerDAOFactory
- .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
long l = accessor.getNextOutgoingMessageNumber(sequenceID);
return l;
}
@@ -264,9 +240,7 @@
}
public void insertOutgoingMessage(RMMessageContext msg) {
- IServerDAO accessor = ServerDAOFactory
- .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
- String sequenceId = msg.getSequenceID();
+ String sequenceId = msg.getSequenceID();
boolean exists = accessor.isOutgoingSequenceExists(sequenceId);
if (!exists)
@@ -277,9 +251,6 @@
}
public void insertIncomingMessage(RMMessageContext rmMessageContext) {
- IServerDAO accessor = ServerDAOFactory
- .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
-
//No need to use this property
//RMHeaders rmHeaders =
// (RMHeaders) rmMessageContext.getMsgContext().getProperty(
@@ -328,16 +299,15 @@
* @see
org.apache.sandesha.IStorageManager#isAckComplete(java.lang.String)
*/
public boolean isAckComplete(String sequenceID) {
- // TODO Auto-generated method stub
- return false;
+ boolean result = accessor.compareAcksWithSequence(sequenceID);
//For client
+ return result;
}
/* (non-Javadoc)
* @see
org.apache.sandesha.IStorageManager#insertTerminateSeqMessage(org.apache.sandesha.RMMessageContext)
*/
- public void insertTerminateSeqMessage(RMMessageContext
terminateSeqMessage) {
- // TODO Auto-generated method stub
-
+ public void insertTerminateSeqMessage(RMMessageContext
terminateSeqMessage) {
+ accessor.addPriorityMessage(terminateSeqMessage);
}
/* (non-Javadoc)
@@ -357,4 +327,11 @@
}
+ /* (non-Javadoc)
+ * @see
org.apache.sandesha.IStorageManager#terminateSequence(java.lang.String)
+ */
+ public void terminateSequence(String sequenceID) {
+ // TODO Auto-generated method stub
+
+ }
}