Hi devs,

When invoking an in-only service via an endpoint, we need to set the OUT_ONLY
property in the message context. If we do not, callbacks accumulate in the
SynapseCallbackReceiver, which keeps them in a map to correlate requests
and responses.

Setting the property works fine in the success scenario. The problem arises
when the actual service is unavailable: in that case the Synapse endpoint does
not get suspended. In the OUT_ONLY fault scenario no callback is registered,
so the relevant fault handler is never called. As a result, the endpoint(s)
are not notified of a fault in an OUT_ONLY scenario. So this is a
Synapse-level bug.
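
To make the problem concrete, here is a minimal standalone sketch of the behaviour described above. It is only a simplified model: the Endpoint and CallbackStore classes and the sendToUnavailableService method are hypothetical stand-ins, not the real Synapse types.

```java
import java.util.HashMap;
import java.util.Map;

public class OutOnlyFaultSketch {

    /** Hypothetical stand-in for an endpoint that is suspended on a fault. */
    static class Endpoint {
        boolean suspended = false;

        void onFault() {
            suspended = true;
        }
    }

    /** Hypothetical stand-in for the map that correlates requests and responses. */
    static class CallbackStore {
        final Map<String, Endpoint> callbacks = new HashMap<>();

        void register(String messageId, Endpoint endpoint) {
            callbacks.put(messageId, endpoint);
        }

        /** A fault can only reach the endpoint if a callback was registered. */
        boolean deliverFault(String messageId) {
            Endpoint endpoint = callbacks.remove(messageId);
            if (endpoint == null) {
                return false; // no callback, so nobody is notified
            }
            endpoint.onFault();
            return true;
        }
    }

    /** Simulates a send to an unavailable service; returns true if the endpoint was notified. */
    static boolean sendToUnavailableService(CallbackStore store, Endpoint endpoint,
                                            String messageId, boolean outOnly) {
        if (!outOnly) {
            // Assumed current behaviour: a callback is registered only for non out-only sends.
            store.register(messageId, endpoint);
        }
        return store.deliverFault(messageId);
    }

    public static void main(String[] args) {
        CallbackStore store = new CallbackStore();
        Endpoint outInEndpoint = new Endpoint();
        Endpoint outOnlyEndpoint = new Endpoint();

        sendToUnavailableService(store, outInEndpoint, "msg-1", false);
        sendToUnavailableService(store, outOnlyEndpoint, "msg-2", true);

        System.out.println("out-in endpoint suspended: " + outInEndpoint.suspended);
        System.out.println("out-only endpoint suspended: " + outOnlyEndpoint.suspended);
    }
}
```

In this model the out-in endpoint ends up suspended while the out-only endpoint is left untouched, which is the behaviour reported above.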

I looked into this issue and created a patch to resolve it. I'm attaching
the patch for review. If the devs are OK with the patch, I'll create
a JIRA and attach it there.
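
Roughly, the idea in the patch is to always register a callback and to drop it again once a 202 Accepted arrives. The sketch below is a simplified standalone model of that idea, not the patch itself; the Receiver and Endpoint classes and their method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CallbackCleanupSketch {

    /** Hypothetical stand-in for a Synapse endpoint. */
    static class Endpoint {
        boolean suspended = false;

        void onFault() {
            suspended = true;
        }
    }

    /** Hypothetical, simplified model of the callback receiver. */
    static class Receiver {
        private final Map<String, Endpoint> callbackStore = new ConcurrentHashMap<>();

        /** A callback is registered for every send, out-only included. */
        void register(String messageId, Endpoint endpoint) {
            callbackStore.put(messageId, endpoint);
        }

        /** 202 Accepted: no response will follow, so drop the callback. */
        void onAccepted(String messageId) {
            callbackStore.remove(messageId);
        }

        /** Fault: the callback is still registered, so the endpoint is notified. */
        void onFault(String messageId) {
            Endpoint endpoint = callbackStore.remove(messageId);
            if (endpoint != null) {
                endpoint.onFault();
            }
        }

        int pendingCallbacks() {
            return callbackStore.size();
        }
    }

    public static void main(String[] args) {
        Receiver receiver = new Receiver();
        Endpoint healthy = new Endpoint();
        Endpoint unavailable = new Endpoint();

        receiver.register("msg-1", healthy);
        receiver.onAccepted("msg-1");   // success path: callback removed

        receiver.register("msg-2", unavailable);
        receiver.onFault("msg-2");      // fault path: endpoint notified

        System.out.println("pending callbacks: " + receiver.pendingCallbacks());
        System.out.println("healthy suspended: " + healthy.suspended);
        System.out.println("unavailable suspended: " + unavailable.suspended);
    }
}
```

The point of the design is that the callback map does not grow in the success case, while the fault path still has a callback through which the endpoint can be notified.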

Thanks,
Charith
--
Charith Dhanushka Wickramarachchi
http://charithwiki.blogspot.com/
Index: modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
===================================================================
--- modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java (revision 1037626)
+++ modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java (working copy)
@@ -715,8 +715,11 @@
if (responseMsgCtx == null || outMsgCtx.getOptions().isUseSeparateListener()) {
// This means that we received a 202 accepted for an out-only ,
- // for which we do not need a dummy message anyway
- return;
+ // Since we need to notify the SynapseCallbackReceiver to remove the
+ // callbacks registered, we set a custom property
+ outMsgCtx.setProperty(NhttpConstants.HTTP_202_RECEIVED, "true");
+ mr.receive(outMsgCtx);
+ return; // responseMsgCtx may be null here, so do not fall through
}
responseMsgCtx.setServerSide(true);
responseMsgCtx.setDoingREST(outMsgCtx.isDoingREST());
Index: modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
===================================================================
--- modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java (revision 1037626)
+++ modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java (working copy)
@@ -128,4 +128,11 @@
public static final String NHTTP_INPUT_STREAM = "nhttp.input.stream";
/** Output stram of the message is set to this message context property */
public static final String NHTTP_OUTPUT_STREAM = "nhttp.output.stream";
+
+ /**
+ * Message context property set to "true" when a 202 Accepted is received for an out-only message
+ */
+
+ public static final String HTTP_202_RECEIVED = "HTTP_202_RECEIVED";
+
}
Index: modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java
===================================================================
--- modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java (revision 1037626)
+++ modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java (working copy)
@@ -304,16 +304,11 @@
axisCfgCtx, (AxisServiceGroup) anoymousService.getParent());
ServiceContext serviceCtx = sgc.getServiceContext(anoymousService);
- boolean outOnlyMessage = "true".equals(synapseOutMessageContext.getProperty(
- SynapseConstants.OUT_ONLY)) || WSDL2Constants.MEP_URI_IN_ONLY.equals(
- originalInMsgCtx.getOperationContext()
- .getAxisOperation().getMessageExchangePattern());
+
// get a reference to the DYNAMIC operation of the Anonymous Axis2 service
- AxisOperation axisAnonymousOperation = anoymousService.getOperation(
- outOnlyMessage ?
- new QName(AnonymousServiceFactory.OUT_ONLY_OPERATION) :
- new QName(AnonymousServiceFactory.OUT_IN_OPERATION));
+ AxisOperation axisAnonymousOperation =
+ anoymousService.getOperation(new QName(AnonymousServiceFactory.OUT_IN_OPERATION));
Options clientOptions = MessageHelper.cloneOptions(originalInMsgCtx.getOptions());
clientOptions.setUseSeparateListener(separateListener);
@@ -369,23 +364,23 @@
axisOutMsgCtx.setProperty(SynapseConstants.SEND_TIMEOUT, endpoint.getTimeoutDuration());
}
- if (!outOnlyMessage) {
- // always set a callback as we decide if the send it blocking or non blocking within
- // the MEP client. This does not cause an overhead, as we simply create a 'holder'
- // object with a reference to the outgoing synapse message context
- // synapseOutMessageContext
- AsyncCallback callback = new AsyncCallback(synapseOutMessageContext);
- if (endpoint != null) {
- // set the timeout time and the timeout action to the callback, so that the
- // TimeoutHandler can detect timed out callbacks and take approprite action.
- callback.setTimeOutOn(System.currentTimeMillis() + endpoint.getTimeoutDuration());
- callback.setTimeOutAction(endpoint.getTimeoutAction());
- } else {
- callback.setTimeOutOn(System.currentTimeMillis());
- }
- mepClient.setCallback(callback);
+
+ // always set a callback as we decide if the send is blocking or non blocking within
+ // the MEP client. This does not cause an overhead, as we simply create a 'holder'
+ // object with a reference to the outgoing synapse message context
+ // synapseOutMessageContext
+ AsyncCallback callback = new AsyncCallback(synapseOutMessageContext);
+ if (endpoint != null) {
+ // set the timeout time and the timeout action to the callback, so that the
+ // TimeoutHandler can detect timed out callbacks and take appropriate action.
+ callback.setTimeOutOn(System.currentTimeMillis() + endpoint.getTimeoutDuration());
+ callback.setTimeOutAction(endpoint.getTimeoutAction());
+ } else {
+ callback.setTimeOutOn(System.currentTimeMillis());
}
+ mepClient.setCallback(callback);
+
// with the nio transport, this causes the listener not to write a 202
// Accepted response, as this implies that Synapse does not yet know if
// a 202 or 200 response would be written back.
Index: modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java
===================================================================
--- modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java (revision 1037626)
+++ modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java (working copy)
@@ -96,6 +96,21 @@
}
/**
+ * Remove the callback registered with the given message id.
+ *
+ * @param msgId the message id whose callback should be removed
+ */
+ private void removeCallBack(String msgId) {
+ if (callbackStore.containsKey(msgId)) {
+ callbackStore.remove(msgId);
+ if (log.isDebugEnabled()) {
+ log.debug("Callback registered with message id : " + msgId + " removed from the " +
+ "callback store since we got an accepted notification");
+ }
+ }
+ }
+
+ /**
* Everytime a response message is received this method gets invoked. It will then select
* the outgoing *Synapse* message context for the reply we received, and determine what action
* to take at the Synapse level
@@ -107,6 +122,17 @@
String messageID = null;
+ /**
+ * In an in-only scenario, if the client receives an HTTP 202 Accepted we need to
+ * remove the callback(s) registered for that request.
+ * This check tests whether the message was sent in that scenario and removes the callback
+ */
+ if (messageCtx.getProperty("HTTP_202_RECEIVED") != null && "true".equals(
+ messageCtx.getProperty("HTTP_202_RECEIVED"))) {
+ removeCallBack(messageCtx.getMessageID());
+ return;
+ }
+
if (messageCtx.getOptions() != null && messageCtx.getOptions().getRelatesTo() != null) {
// never take a chance with a NPE at this stage.. so check at each level :-)
Options options = messageCtx.getOptions();