Author: chathura_ce
Date: Mon Apr 16 06:35:20 2007
New Revision: 529235

URL: http://svn.apache.org/viewvc?view=rev&rev=529235
Log:
Implemented the response timeout functionality. Users can now specify the
timeout duration and timeout action in the endpoint configuration, so that all
messages flowing through that endpoint are subject to the specified timeouts.
Synapse can either simply discard the responses to timed-out messages or
activate an error sequence when a timeout occurs.

Added:
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/TimeoutHandler.java
Modified:
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Constants.java
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointFactory.java
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointSerializer.java
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AsyncCallback.java
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/utils/EndpointDefinition.java

Modified: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Constants.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Constants.java?view=diff&rev=529235&r1=529234&r2=529235
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Constants.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Constants.java
 Mon Apr 16 06:35:20 2007
@@ -149,8 +149,7 @@
     int STATISTICS_ON =1;
     /** The statistics state-unset */
     int STATISTICS_UNSET=2;
-    
-    /** */
+
     String SYNAPSE_ERROR ="syapse_error" ;
 
     /** key for lookup sequence statistics stack */
@@ -172,5 +171,26 @@
     int  PROXYSERVICE_STATISTICS = 1;
 
     /** Endpoint statistics category*/
-    int ENDPOINT_STATISTICS = 2;    
+    int ENDPOINT_STATISTICS = 2;
+
+    /**
+     * Don't do anything for response timeouts. This means an infinite timeout. This is the
+     * default action if the timeout configuration is not explicitly set.
+     */
+    int NONE = 100;
+
+    /** Discard the callback if the timeout for the response is expired */
+    int DISCARD = 101;
+
+    /**
+     * Discard the callback and activate the specified fault sequence if the timeout for the
+     * response has expired.
+     */
+    int DISCARD_AND_FAULT = 102;
+
+    /**
+     * Error codes for message sending
+     */
+    String TIME_OUT = "500";
+    String SENDING_FAULT = "600";
 }

Modified: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointFactory.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointFactory.java?view=diff&rev=529235&r1=529234&r2=529235
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointFactory.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointFactory.java
 Mon Apr 16 06:35:20 2007
@@ -39,6 +39,11 @@
  *   <address uri="url" [format="soap|pox"] [optimize="mtom|swa"]>
  *      .. extensibility ..
  *
+ *      <timeout>
+ *          <duration>duration in milliseconds</duration>
+ *          <action>discard | fault</action>
+ *      </timeout>
+ * 
  *      <enableRM [policy="key"]/>+ <enableSec [policy="key"]/>+ 
<enableAddressing
  *      separateListener="true|false"/>+
  *   </address>
@@ -185,6 +190,33 @@
                     org.apache.synapse.config.xml.Constants.NULL_NAMESPACE, 
"policy"));
             if (policy != null) {
                 endpoint.setWsRMPolicyKey(policy.getAttributeValue());
+            }
+        }
+
+        // set the timeout configuration
+        OMElement timeout = elem.getFirstChildWithName(new QName(
+                org.apache.synapse.config.xml.Constants.SYNAPSE_NAMESPACE, 
"timeout"));
+        if (timeout != null) {
+            OMElement duration = timeout.getFirstChildWithName(new QName(
+                    org.apache.synapse.config.xml.Constants.SYNAPSE_NAMESPACE, 
"duration"));
+            if (duration != null) {
+                String d = duration.getText();
+                if (d != null) {
+                    endpoint.setTimeoutDuration(new Long(d).longValue());
+                }
+            }
+
+            OMElement action = timeout.getFirstChildWithName(new QName(
+                    org.apache.synapse.config.xml.Constants.SYNAPSE_NAMESPACE, 
"action"));
+            if (action != null) {
+                String a = action.getText();
+                if (a != null) {
+                    if (a.equalsIgnoreCase("discard")) {
+                        endpoint.setTimeoutAction(Constants.DISCARD);
+                    } else if (a.equalsIgnoreCase("fault")) {
+                        endpoint.setTimeoutAction(Constants.DISCARD_AND_FAULT);
+                    }
+                }
             }
         }
 

Modified: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointSerializer.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointSerializer.java?view=diff&rev=529235&r1=529234&r2=529235
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointSerializer.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointSerializer.java
 Mon Apr 16 06:35:20 2007
@@ -53,6 +53,11 @@
  *    <enableSec/>+
  *    <enableAddressing/>+
  *
+ *    <timeout>
+ *      <duration>duration in milliseconds</duration>
+ *      <action>discard | fault</action>
+ *    </timeout>
+ *
  *  </address>
  * </endpoint>
  */
@@ -149,6 +154,27 @@
                         "policy", null, endpt.getWsSecPolicyKey()));
             }
             address.addChild(sec);
+        }
+
+        // serialize the timeout configuration only if a timeout action is set
+        if (endpt.getTimeoutAction() != Constants.NONE) {
+            OMElement timeout = fac.createOMElement("timeout", Constants.SYNAPSE_OMNAMESPACE);
+
+            OMElement duration = fac.createOMElement("duration", Constants.SYNAPSE_OMNAMESPACE);
+            duration.setText(Long.toString(endpt.getTimeoutDuration()));
+            timeout.addChild(duration);
+
+            OMElement action = fac.createOMElement("action", Constants.SYNAPSE_OMNAMESPACE);
+            if (endpt.getTimeoutAction() == Constants.DISCARD) {
+                action.setText("discard");
+            } else if (endpt.getTimeoutAction() == Constants.DISCARD_AND_FAULT) {
+                action.setText("fault");
+            }
+            timeout.addChild(action);
+
+            // attach the timeout element to the address; without this the timeout
+            // configuration is built but never written out
+            address.addChild(timeout);
         }
 
         return address;

Modified: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AsyncCallback.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AsyncCallback.java?view=diff&rev=529235&r1=529234&r2=529235
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AsyncCallback.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AsyncCallback.java
 Mon Apr 16 06:35:20 2007
@@ -22,6 +22,7 @@
 import org.apache.axis2.client.async.Callback;
 import org.apache.axis2.client.async.AsyncResult;
 import org.apache.synapse.MessageContext;
+import org.apache.synapse.Constants;
 
 /**
  * This class only "holds" the Synapse out message context for the Synapse 
callback message
@@ -31,6 +32,17 @@
 
     MessageContext synapseOutMsgCtx = null;
 
+    /**
+     * Time to timeout this callback.
+     */
+    private long timeOutOn;
+
+    /**
+     * Action to perform when timeout occurs.
+     */
+    private int timeOutAction = Constants.NONE;
+
+
     public AsyncCallback(org.apache.synapse.MessageContext synapseOutMsgCtx) {
         this.synapseOutMsgCtx = synapseOutMsgCtx;
     }
@@ -45,5 +57,21 @@
 
     public MessageContext getSynapseOutMsgCtx() {
         return synapseOutMsgCtx;
+    }
+
+    public long getTimeOutOn() {
+        return timeOutOn;
+    }
+
+    public void setTimeOutOn(long timeOutOn) {
+        this.timeOutOn = timeOutOn;
+    }
+
+    public int getTimeOutAction() {
+        return timeOutAction;
+    }
+
+    public void setTimeOutAction(int timeOutAction) {
+        this.timeOutAction = timeOutAction;
     }
 }

Modified: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java?view=diff&rev=529235&r1=529234&r2=529235
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java
 Mon Apr 16 06:35:20 2007
@@ -172,7 +172,14 @@
         // always set a callback as we decide if the send it blocking or non 
blocking within
         // the MEP client. This does not cause an overhead, as we simply 
create a 'holder'
         // object with a reference to the outgoing synapse message context 
synapseOutMessageContext
-        mepClient.setCallback(new AsyncCallback(synapseOutMessageContext));
+        AsyncCallback callback = new AsyncCallback(synapseOutMessageContext);
+        if (endpoint != null) {
+            // set the timeout time and the timeout action on the callback, so that the
+            // TimeoutHandler can detect timed out callbacks and take appropriate action.
+            callback.setTimeOutOn(System.currentTimeMillis() + 
endpoint.getTimeoutDuration());
+            callback.setTimeOutAction(endpoint.getTimeoutAction());
+        }
+        mepClient.setCallback(callback);
         
         mepClient.execute(false);
 

Modified: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java?view=diff&rev=529235&r1=529234&r2=529235
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java
 Mon Apr 16 06:35:20 2007
@@ -40,8 +40,18 @@
 
     private Map callbackStore;  // this will be made thread safe within the 
constructor
 
+    /**
+     * Timer to schedule the timeout task.
+     */
+    private Timer timeOutTimer = null;
+
     public SynapseCallbackReceiver() {
         callbackStore = Collections.synchronizedMap(new HashMap());
+
+        // create the Timer object and a TimeoutHandler task. Schedule it to run every second from here
+        TimeoutHandler timeoutHandler = new TimeoutHandler(callbackStore);
+        timeOutTimer = new Timer(true);
+        timeOutTimer.schedule(timeoutHandler, 0, 1000);
     }
 
     public void addCallback(String MsgID, Callback callback) {
@@ -100,6 +110,10 @@
                 if (e == null) {
                     e = new Exception(fault.toString());
                 }
+
+                // set an error code to the message context, so that error 
sequences can filter
+                // using that property to determine the cause of error
+                synapseOutMsgCtx.setProperty("error-code", 
Constants.SENDING_FAULT);
 
                 ((FaultHandler) 
faultStack.pop()).handleFault(synapseOutMsgCtx, e);
             }

Added: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/TimeoutHandler.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/TimeoutHandler.java?view=auto&rev=529235
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/TimeoutHandler.java
 (added)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/TimeoutHandler.java
 Mon Apr 16 06:35:20 2007
@@ -0,0 +1,136 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.core.axis2;
+
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.Constants;
+import org.apache.synapse.FaultHandler;
+import org.apache.synapse.mediators.transform.FaultMediator;
+import org.apache.synapse.mediators.MediatorFaultHandler;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.TimerTask;
+
+/**
+ * A timer task that is invoked at a fixed interval to check the timeouts of the callbacks stored
+ * in the SynapseCallbackReceiver and to remove all expired callbacks. The timeout of a callback
+ * is stored as an absolute time, not as a duration, so the invocation interval of this task
+ * does not affect the correctness of the timeouts, although longer intervals introduce a larger
+ * error between the actual timeout and the specified timeout.
+ *
+ * For each invocation this reads the current time once and compares it against the timeout of
+ * every callback. The time is currently System.currentTimeMillis() on all Java versions, which
+ * matches the clock used to compute the timeout when the callback is created; using
+ * System.nanoTime() on Java 1.5 and later is not implemented yet.
+ */
+public class TimeoutHandler extends TimerTask {
+
+    /** Pending callbacks shared with the SynapseCallbackReceiver that created this task. */
+    private Map callbackStore = null;
+
+    /**
+     * Creates a timeout task for the given callback store.
+     *
+     * @param callbacks store of pending callbacks; run() locks on this object while iterating,
+     *                  as required for maps created by Collections.synchronizedMap()
+     */
+    public TimeoutHandler(Map callbacks) {
+        this.callbackStore = callbacks;
+    }
+
+    /**
+     * Checks if the timeout has expired for each callback in the callback store. If expired,
+     * removes the callback. If specified, activates the fault handlers of the timed out message.
+     */
+    public void run() {
+
+        // check if the callback store contains at least one entry before proceeding, so that
+        // the current time is not read for nothing
+        if (callbackStore.isEmpty()) {
+            return;
+        }
+
+        long currentTime = currentTime();
+        List expired = new ArrayList();
+
+        // A synchronized map must be locked manually while it is iterated. Expired entries are
+        // removed through the iterator: removing them through the map itself makes the next
+        // call to next() throw a ConcurrentModificationException, and an exception thrown from
+        // run() terminates the Timer thread, after which no timeout is ever detected again.
+        synchronized (callbackStore) {
+            Iterator i = callbackStore.values().iterator();
+            while (i.hasNext()) {
+                AsyncCallback callback = (AsyncCallback) i.next();
+                if (callback.getTimeOutAction() != Constants.NONE
+                        && callback.getTimeOutOn() <= currentTime) {
+                    i.remove();
+                    expired.add(callback);
+                }
+            }
+        }
+
+        // activate the fault handlers only after the lock is released, so that other threads
+        // using the callback store are not blocked while fault handlers are running
+        for (Iterator e = expired.iterator(); e.hasNext();) {
+            AsyncCallback callback = (AsyncCallback) e.next();
+            if (callback.getTimeOutAction() == Constants.DISCARD_AND_FAULT) {
+                activateFaultHandlers(callback.getSynapseOutMsgCtx());
+            }
+        }
+    }
+
+    /**
+     * Marks the message as timed out and invokes the fault handlers popped from its fault stack.
+     *
+     * @param msgContext outgoing message context of the timed out callback
+     */
+    private void activateFaultHandlers(MessageContext msgContext) {
+
+        // add an error code to the message context, so that error sequences can identify the
+        // cause of error
+        msgContext.setProperty("error-code", Constants.TIME_OUT);
+
+        Stack faultStack = msgContext.getFaultStack();
+
+        // NOTE(review): size() shrinks with every pop(), so this loop visits only roughly the
+        // upper half of the stack. Kept as is; confirm whether all handlers or only the
+        // top-most one should be invoked.
+        for (int j = 0; j < faultStack.size(); j++) {
+            Object o = faultStack.pop();
+            if (o instanceof MediatorFaultHandler) {
+                ((MediatorFaultHandler) o).handleFault(msgContext);
+            }
+        }
+    }
+
+    /**
+     * Returns the current time.
+     *
+     * @return  System.currentTimeMillis() on Java 1.4
+     *          System.nanoTime() on Java 1.5 (todo: implement)
+     */
+    private long currentTime() {
+        return System.currentTimeMillis();
+    }
+}

Modified: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/utils/EndpointDefinition.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/utils/EndpointDefinition.java?view=diff&rev=529235&r1=529234&r2=529235
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/utils/EndpointDefinition.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/utils/EndpointDefinition.java
 Mon Apr 16 06:35:20 2007
@@ -52,6 +52,17 @@
     /** use SWA **/
     private boolean useSwa = false;
 
+    /**
+     * timeout duration for waiting for a response. if the user has set some 
timeout action and
+     * the timeout duration is not set, default is set to 30 seconds. note 
that if the user has
+     * not set any timeout configuration, default timeout action is set to 
NONE, which won't do
+     * anything for timeouts.
+    */
+    private long timeoutDuration = 30000;
+
+    /** action to perform when a timeout occurs (NONE | DISCARD | 
DISCARD_AND_FAULT) **/
+    private int timeoutAction = Constants.NONE;
+
     /** To decide to whether statistics should have collected or not */
     private int statisticsEnable = Constants.STATISTICS_UNSET;
 
@@ -188,6 +199,22 @@
 
     public void setUseSwa(boolean useSwa) {
         this.useSwa = useSwa;
+    }
+
+    public long getTimeoutDuration() {
+        return timeoutDuration;
+    }
+
+    public void setTimeoutDuration(long timeoutDuration) {
+        this.timeoutDuration = timeoutDuration;
+    }
+
+    public int getTimeoutAction() {
+        return timeoutAction;
+    }
+
+    public void setTimeoutAction(int timeoutAction) {
+        this.timeoutAction = timeoutAction;
     }
 
     /**



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to