Author: amilas
Date: Wed May 13 06:50:43 2009
New Revision: 774237
URL: http://svn.apache.org/viewvc?rev=774237&view=rev
Log:
reformat the code
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?rev=774237&r1=774236&r2=774237&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
Wed May 13 06:50:43 2009
@@ -76,39 +76,40 @@
/**
* Prosesses incoming MakeConnection request messages.
* A message is selected by the set of SenderBeans that are waiting to
be sent.
- * This is processed using a SenderWorker.
+ * This is processed using a SenderWorker.
*/
public boolean processInMessage(RMMsgContext rmMsgCtx, Transaction
transaction) throws AxisFault {
- if(LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled()) log.debug("Enter:
MakeConnectionProcessor::processInMessage " +
rmMsgCtx.getSOAPEnvelope().getBody());
+ if (LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled())
+ log.debug("Enter:
MakeConnectionProcessor::processInMessage " +
rmMsgCtx.getSOAPEnvelope().getBody());
try {
-
+
MakeConnection makeConnection =
rmMsgCtx.getMakeConnection();
-
+
String address = makeConnection.getAddress();
Identifier identifier = makeConnection.getIdentifier();
-
+
// If there is no address or identifier - make the
MissingSelection Fault.
if (address == null && identifier == null)
FaultManager.makeMissingSelectionFault(rmMsgCtx);
-
+
if (makeConnection.getUnexpectedElement() != null)
FaultManager.makeUnsupportedSelectionFault(rmMsgCtx,
makeConnection.getUnexpectedElement());
- //some initial setup
+ //some initial setup
ConfigurationContext configurationContext =
rmMsgCtx.getConfigurationContext();
- StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
+ StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configurationContext,
configurationContext.getAxisConfiguration());
SecurityManager secManager =
SandeshaUtil.getSecurityManager(configurationContext);
SecurityToken token =
secManager.getSecurityToken(rmMsgCtx.getMessageContext());
-
+
//we want to find valid sender beans
List<RMSequenceBean> possibleBeans = new
ArrayList<RMSequenceBean>();
int possibleBeanIndex = -10;
SenderBean findSenderBean = new SenderBean();
boolean secured = false;
- if(token!=null && identifier==null){
+ if (token != null && identifier == null) {
secured = true;
- if(LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled()) log.debug("token found " + token);
+ if (LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled()) log.debug("token found " + token);
//this means we have to scope our search for
sender beans that belong to sequences that own the same token
String data =
secManager.getTokenRecoveryData(token);
//first look for RMS beans
@@ -117,200 +118,206 @@
finderRMS.setToEPR(address);
List<RMSBean> tempList2 =
storageManager.getRMSBeanMgr().find(finderRMS);
possibleBeans.addAll(tempList2);
-
+
//try looking for RMD beans too
RMDBean finderRMD = new RMDBean();
finderRMD.setSecurityTokenData(data);
finderRMD.setToAddress(address);
List<RMDBean> tempList =
storageManager.getRMDBeanMgr().find(finderRMD);
-
+
//combine these two into one list
possibleBeans.addAll(tempList);
-
+
int size = possibleBeans.size();
-
- if(size>0){
+
+ if (size > 0) {
//select one at random: TODO better
method?
- Random random = new Random ();
+ Random random = new Random();
possibleBeanIndex =
random.nextInt(size);
- RMSequenceBean selectedSequence =
(RMSequenceBean)possibleBeans.get(possibleBeanIndex);
+ RMSequenceBean selectedSequence =
(RMSequenceBean) possibleBeans.get(possibleBeanIndex);
findSenderBean.setSequenceID(selectedSequence.getSequenceID());
- if(LoggingControl.isAnyTracingEnabled()
&& log.isDebugEnabled()) log.debug("sequence selected " +
findSenderBean.getSequenceID());
- }
- else{
+ if
(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("sequence selected "
+ findSenderBean.getSequenceID());
+ } else {
//we cannot match a RMD with the
correct security credentials so we cannot process this msg under RSP
- if(LoggingControl.isAnyTracingEnabled()
&& log.isDebugEnabled()) log.debug("Exit:
MakeConnectionProcessor::processInMessage : no RM sequence bean with security
credentials" );
- return false;
+ if
(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("Exit:
MakeConnectionProcessor::processInMessage : no RM sequence bean with security
credentials");
+ return false;
}
-
+
// Commit this transaction to clear up held
RMS/RMDBeans
- if (transaction != null &&
transaction.isActive())
- transaction.commit();
-
+ if (transaction != null &&
transaction.isActive())
+ transaction.commit();
+
// Get a new transaction
- transaction = storageManager.getTransaction();
- }
+ transaction = storageManager.getTransaction();
+ }
//lookup a sender bean
SenderBeanMgr senderBeanMgr =
storageManager.getSenderBeanMgr();
-
+
//selecting the set of SenderBeans that suit the given
criteria.
findSenderBean.setSend(true);
findSenderBean.setTransportAvailable(false);
-
- if (address!=null)
+
+ if (address != null)
findSenderBean.setToAddress(address);
- if (identifier!=null){
- if(LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled()) log.debug("identifier set, this violates RSP " +
identifier);
+ if (identifier != null) {
+ if (LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled())
+ log.debug("identifier set, this
violates RSP " + identifier);
findSenderBean.setSequenceID(identifier.getIdentifier());
}
-
+
SenderBean senderBean = null;
boolean pending = false;
- while (true){
+ while (true) {
// Set the time to send field to be now
findSenderBean.setTimeToSend(System.currentTimeMillis());
-
+
//finding the beans that go with the criteria
of the passed SenderBean
//The reSend flag is ignored for this
selection, so there is no need to
//set it.
Collection<SenderBean> collection =
senderBeanMgr.find(findSenderBean);
-
+
//removing beans that does not pass the resend
test
- for (Iterator<SenderBean>
it=collection.iterator();it.hasNext();) {
+ for (Iterator<SenderBean> it =
collection.iterator(); it.hasNext();) {
SenderBean bean = (SenderBean)
it.next();
- if (!bean.isReSend() &&
bean.getSentCount()>0)
+ if (!bean.isReSend() &&
bean.getSentCount() > 0)
it.remove();
}
-
+
//selecting a bean to send RANDOMLY. TODO-
Should use a better mechanism.
int size = collection.size();
- int itemToPick=-1;
-
+ int itemToPick = -1;
+
pending = false;
- if (size>0) {
- Random random = new Random ();
+ if (size > 0) {
+ Random random = new Random();
itemToPick = random.nextInt(size);
}
-
- if (size>1)
+
+ if (size > 1)
pending = true; //there are more than
one message to be delivered using the makeConnection.
- //So
the MessagePending header should have value true;
-
+ //So the MessagePending header should have
value true;
+
Iterator<SenderBean> it = collection.iterator();
-
+
senderBean = null;
- for (int item=0;item<size;item++) {
- senderBean = (SenderBean) it.next();
- if (item==itemToPick)
+ for (int item = 0; item < size; item++) {
+ senderBean = (SenderBean) it.next();
+ if (item == itemToPick)
break;
}
-
- if (senderBean==null) {
+
+ if (senderBean == null) {
//If secured try another sequence
//Remove old one from the list and pick
another random one
- if(secured){
+ if (secured) {
possibleBeans.remove(possibleBeanIndex);
int possBeansSize =
possibleBeans.size();
-
- if(possBeansSize > 0){
+
+ if (possBeansSize > 0) {
//select one at random:
TODO better method?
- Random random = new
Random ();
+ Random random = new
Random();
possibleBeanIndex =
random.nextInt(possBeansSize);
- RMSequenceBean
selectedSequence = (RMSequenceBean)possibleBeans.get(possibleBeanIndex);
+ RMSequenceBean
selectedSequence = (RMSequenceBean) possibleBeans.get(possibleBeanIndex);
findSenderBean.setSequenceID(selectedSequence.getSequenceID());
-
if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("sequence selected " + findSenderBean.getSequenceID());
- }
- else {
-
if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Exit: MakeConnectionProcessor::processInMessage, no matching message
found");
+ if
(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+
log.debug("sequence selected " + findSenderBean.getSequenceID());
+ } else {
+ if
(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+
log.debug("Exit: MakeConnectionProcessor::processInMessage, no matching message
found");
return false;
}
} else {
-
if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Exit: MakeConnectionProcessor::processInMessage, no matching message
found");
+ if
(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("Exit:
MakeConnectionProcessor::processInMessage, no matching message found");
return false;
- }
+ }
} else {
- break;
- }
+ break;
+ }
}
- if (transaction != null && transaction.isActive()) {
+ if (transaction != null && transaction.isActive()) {
transaction.commit();
transaction = storageManager.getTransaction();
}
replyToPoll(rmMsgCtx, senderBean, storageManager,
pending, makeConnection.getNamespaceValue(), transaction);
-
+
} finally {
if (transaction != null && transaction.isActive()) {
transaction.rollback();
}
}
- if(LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled()) log.debug("Exit:
MakeConnectionProcessor::processInMessage");
+ if (LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled())
+ log.debug("Exit:
MakeConnectionProcessor::processInMessage");
return false;
}
-
+
public static void replyToPoll(RMMsgContext pollMessage,
- SenderBean matchingMessage,
- StorageManager storageManager,
- boolean pending,
- String makeConnectionNamespace,
- Transaction transaction)
- throws AxisFault
- {
- if(LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled()) log.debug("Enter: MakeConnectionProcessor::replyToPoll");
+ SenderBean
matchingMessage,
+
StorageManager storageManager,
+ boolean
pending,
+ String
makeConnectionNamespace,
+ Transaction
transaction)
+ throws AxisFault {
+ if (LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled())
+ log.debug("Enter:
MakeConnectionProcessor::replyToPoll");
TransportOutDescription transportOut =
pollMessage.getMessageContext().getTransportOut();
- if (transportOut==null) {
+ if (transportOut == null) {
String message = SandeshaMessageHelper.getMessage(
SandeshaMessageKeys.cantSendMakeConnectionNoTransportOut);
- if(LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled()) log.debug(message);
- throw new SandeshaException (message);
+ if (LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled()) log.debug(message);
+ throw new SandeshaException(message);
}
-
+
String messageStorageKey =
matchingMessage.getMessageContextRefKey();
- MessageContext returnMessage =
storageManager.retrieveMessageContext(messageStorageKey,pollMessage.getConfigurationContext());
- if (returnMessage==null) {
+ MessageContext returnMessage =
storageManager.retrieveMessageContext(messageStorageKey,
pollMessage.getConfigurationContext());
+ if (returnMessage == null) {
String message = "Cannot find the message stored with
the key:" + messageStorageKey;
- if(LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled()) log.debug(message);
+ if (LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled()) log.debug(message);
// Someone else has either removed the sender &
message, or another make connection got here first.
return;
}
-
- if(pending) addMessagePendingHeader(returnMessage,
makeConnectionNamespace);
+
+ if (pending) addMessagePendingHeader(returnMessage,
makeConnectionNamespace);
boolean continueSending = true;
RMMsgContext returnRMMsg =
MsgInitializer.initializeMessage(returnMessage);
- if(returnRMMsg.getRMNamespaceValue()==null){
- //this is the case when a stored application response
msg was not sucecsfully returned
+ if (returnRMMsg.getRMNamespaceValue() == null) {
+ //this is the case when a stored application response
msg was not sucecsfully returned
//on the sending transport's backchannel. Since the msg
was stored without a sequence header
//we need to lookup the namespace using the RMS bean
- if(LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled()) log.debug("Looking up rmNamespace from RMS bean");
+ if (LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled())
+ log.debug("Looking up rmNamespace from RMS
bean");
String sequenceID = matchingMessage.getSequenceID();
- if(sequenceID!=null){
+ if (sequenceID != null) {
RMSBean rmsBean = new RMSBean();
rmsBean.setSequenceID(sequenceID);
rmsBean =
storageManager.getRMSBeanMgr().findUnique(rmsBean);
- if(rmsBean!=null){
+ if (rmsBean != null) {
returnRMMsg.setRMNamespaceValue(SpecSpecificConstants.getRMNamespaceValue(rmsBean.getRMVersion()));
- }
- else{
- //we will never be able to reply to
this msg - at the moment the best bet is
+ } else {
+ //we will never be able to reply to
this msg - at the moment the best bet is
//to not process the reply anymore
- if(LoggingControl.isAnyTracingEnabled()
&& log.isDebugEnabled()) log.debug("Could not find RMS bean for polled msg");
+ if
(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("Could not find RMS
bean for polled msg");
continueSending = false;
//also remove the sender bean so that
we do not select this again
storageManager.getSenderBeanMgr().delete(matchingMessage.getMessageID());
}
}
}
-
- if(continueSending){
+
+ if (continueSending) {
// Commit the current transaction, so that the
SenderWorker can do it's own locking
// this transaction should be commited out before
gettting the worker lock.
// otherwise a dead lock can happen.
- if(transaction != null && transaction.isActive())
transaction.commit();
-
+ if (transaction != null && transaction.isActive())
transaction.commit();
+
SandeshaThread sender = storageManager.getSender();
WorkerLock lock = sender.getWorkerLock();
@@ -327,40 +334,40 @@
}
}
-
- setTransportProperties (returnMessage, pollMessage);
-
+
+ setTransportProperties(returnMessage, pollMessage);
+
// Link the response to the request
AxisOperation operation =
SpecSpecificConstants.getWSRMOperation(Sandesha2Constants.MessageTypes.POLL_RESPONSE_MESSAGE,
pollMessage.getRMSpecVersion(),
pollMessage.getMessageContext().getAxisService());
- OperationContext context = new OperationContext
(operation, pollMessage.getMessageContext().getServiceContext());
-
+ OperationContext context = new
OperationContext(operation,
pollMessage.getMessageContext().getServiceContext());
+
context.addMessageContext(returnMessage);
returnMessage.setServiceContext(null);
returnMessage.setOperationContext(context);
-
+
returnMessage.setProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE,
Boolean.TRUE);
returnMessage.setProperty(RequestResponseTransport.TRANSPORT_CONTROL,
pollMessage.getProperty(RequestResponseTransport.TRANSPORT_CONTROL));
-
//running the MakeConnection through a SenderWorker.
//This will allow Sandesha2 to consider both of
following senarios equally.
// 1. A message being sent by the Sender thread.
// 2. A message being sent as a reply to an
MakeConnection.
-
+
worker.setMessage(returnRMMsg);
- worker.run();
-
+ worker.run();
+
TransportUtils.setResponseWritten(pollMessage.getMessageContext(), true);
}
-
- if(LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled()) log.debug("Exit: MakeConnectionProcessor::replyToPoll");
+
+ if (LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled())
+ log.debug("Exit: MakeConnectionProcessor::replyToPoll");
}
-
- private static void addMessagePendingHeader (MessageContext
returnMessage, String namespace) {
+
+ private static void addMessagePendingHeader(MessageContext
returnMessage, String namespace) {
MessagePending messagePending = new MessagePending();
messagePending.setPending(true);
- if(returnMessage.getEnvelope().getHeader() == null){
+ if (returnMessage.getEnvelope().getHeader() == null) {
int SOAPVersion = Sandesha2Constants.SOAPVersion.v1_1;
if (!returnMessage.isSOAP11())
SOAPVersion =
Sandesha2Constants.SOAPVersion.v1_2;
@@ -375,13 +382,13 @@
return false;
}
- private static void setTransportProperties (MessageContext
returnMessage, RMMsgContext makeConnectionMessage) {
-
returnMessage.setProperty(MessageContext.TRANSPORT_OUT,makeConnectionMessage.getProperty(MessageContext.TRANSPORT_OUT));
-
returnMessage.setProperty(Constants.OUT_TRANSPORT_INFO,makeConnectionMessage.getProperty(Constants.OUT_TRANSPORT_INFO));
-
- Object contentType =
makeConnectionMessage.getProperty(Constants.Configuration.CONTENT_TYPE);
- returnMessage.setProperty(Constants.Configuration.CONTENT_TYPE,
contentType);
+ private static void setTransportProperties(MessageContext
returnMessage, RMMsgContext makeConnectionMessage) {
+ returnMessage.setProperty(MessageContext.TRANSPORT_OUT,
makeConnectionMessage.getProperty(MessageContext.TRANSPORT_OUT));
+ returnMessage.setProperty(Constants.OUT_TRANSPORT_INFO,
makeConnectionMessage.getProperty(Constants.OUT_TRANSPORT_INFO));
+
+ Object contentType =
makeConnectionMessage.getProperty(Constants.Configuration.CONTENT_TYPE);
+ returnMessage.setProperty(Constants.Configuration.CONTENT_TYPE,
contentType);
-
returnMessage.setTransportOut(makeConnectionMessage.getMessageContext().getTransportOut());
+
returnMessage.setTransportOut(makeConnectionMessage.getMessageContext().getTransportOut());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]