Author: ruwan
Date: Mon Dec  3 22:21:41 2007
New Revision: 600800

URL: http://svn.apache.org/viewvc?rev=600800&view=rev
Log:
Improved the caching code and the mediator

Modified:
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java

Modified: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java?rev=600800&r1=600799&r2=600800&view=diff
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
 Mon Dec  3 22:21:41 2007
@@ -32,9 +32,10 @@
 import org.apache.synapse.mediators.base.SequenceMediator;
 import org.apache.synapse.util.FixedByteArrayOutputStream;
 import org.apache.synapse.util.MessageHelper;
-import org.wso2.caching.Cache;
+import org.wso2.caching.CacheManager;
 import org.wso2.caching.CachedObject;
 import org.wso2.caching.CachingConstants;
+import org.wso2.caching.CachingException;
 import org.wso2.caching.digest.DigestGenerator;
 
 import javax.xml.soap.MessageFactory;
@@ -47,12 +48,17 @@
 import java.io.IOException;
 
 /**
+ * Cache Mediator will cache the response messages indexed using the hash 
value of the
+ * request message, and subsequent messages with the same request (request 
hash will be
+ * generated and checked for equality) within the cache expiration period 
will be served
+ * from the stored responses in the cache.
  *
+ * @see org.apache.synapse.Mediator
  */
 public class CacheMediator extends AbstractMediator {
 
     private String id = null;
-    private String scope = CachingConstants.SCOPE_PER_HOST;
+    private String scope = CachingConstants.SCOPE_PER_HOST;// global
     private boolean collector = false;
     private DigestGenerator digestGenerator = 
CachingConstants.DEFAULT_XML_IDENTIFIER;
     private int inMemoryCacheSize = CachingConstants.DEFAULT_CACHE_SIZE;
@@ -63,8 +69,8 @@
     private SequenceMediator onCacheHitSequence = null;
     private String onCacheHitRef = null;
     private int maxMessageSize = 0;
-    private String cacheObjKey = CachingConstants.CACHE_OBJECT; // default 
per-host
-    private static final String CACHE_OBJ_PREFIX = "synapse.cache_obj_";
+    private String cacheManagerKey = CachingConstants.CACHE_MANAGER; // 
default per-host
+    private static final String CACHE_MANAGER_PREFIX = 
"synapse.cache_manager_";
 
     public boolean mediate(MessageContext synCtx) {
 
@@ -107,42 +113,42 @@
 
         if (traceOrDebugOn) {
             traceOrDebug(traceOn,
-                "Looking up cache at scope : " + scope + " with ID : " + 
cacheObjKey);
+                "Looking up cache at scope : " + scope + " with ID : " + 
cacheManagerKey);
         }
 
         // look up cache
-        Object prop = cfgCtx.getPropertyNonReplicable(cacheObjKey);
-        Cache cache;
-        if (prop != null && prop instanceof Cache) {
-            cache = (Cache) prop;
-
+        Object prop = cfgCtx.getPropertyNonReplicable(cacheManagerKey);
+        CacheManager cacheManager;
+        if (prop != null && prop instanceof CacheManager) {
+            cacheManager = (CacheManager) prop;
         } else {
             synchronized (cfgCtx) {
                 // check again after taking the lock to make sure no one else 
did it before us
-                prop = cfgCtx.getPropertyNonReplicable(cacheObjKey);
-                if (prop != null && prop instanceof Cache) {
-                    cache = (Cache) prop;
+                prop = cfgCtx.getPropertyNonReplicable(cacheManagerKey);
+                if (prop != null && prop instanceof CacheManager) {
+                    cacheManager = (CacheManager) prop;
 
                 } else {
                     if (traceOrDebugOn) {
                         traceOrDebug(traceOn, "Creating/recreating the cache 
object");
                     }
-                    cache = new Cache();
-                    cfgCtx.setProperty(cacheObjKey, cache);
+                    cacheManager = new CacheManager();
+                    cfgCtx.setProperty(cacheManagerKey, cacheManager);
                 }
             }
         }
 
         boolean result = true;
-        if (synCtx.isResponse()) {
-            processResponseMessage(synCtx, cfgCtx, traceOrDebugOn, traceOn, 
cache);
-
-        } else {
-            result = processRequestMessage(synCtx, cfgCtx, traceOrDebugOn, 
traceOn, cache, fbaos);
-        }
-
         try {
-            Replicator.replicate(cfgCtx);
+            
+            if (synCtx.isResponse()) {
+                processResponseMessage(synCtx, cfgCtx, traceOrDebugOn, 
traceOn, cacheManager);
+
+            } else {
+                result = processRequestMessage(
+                    synCtx, cfgCtx, traceOrDebugOn, traceOn, cacheManager);
+            }
+            
         } catch (ClusteringFault clusteringFault) {
             if (traceOrDebugOn) {
                 traceOrDebug(traceOn, "Unable to replicate Cache mediator 
state among the cluster");
@@ -152,6 +158,7 @@
         if (traceOrDebugOn) {
             traceOrDebug(traceOn, "End : Cache mediator");
         }
+        
         return result;
     }
 
@@ -163,10 +170,11 @@
      * @param traceOn        is tracing on?
      * @param synCtx         the current message (response)
      * @param cfgCtx         the abstract context in which the cache will be 
kept
-     * @param cache          the cache
+     * @param cacheManager   the cache manager
+     * @throws ClusteringFault if there is an error in replicating the cfgCtx
      */
     private void processResponseMessage(MessageContext synCtx, 
ConfigurationContext cfgCtx,
-        boolean traceOrDebugOn, boolean traceOn, Cache cache) {
+        boolean traceOrDebugOn, boolean traceOn, CacheManager cacheManager) 
throws ClusteringFault {
 
         if (!collector) {
             handleException("Response messages cannot be handled in a non 
collector cache", synCtx);
@@ -177,18 +185,16 @@
         if (requestHash != null) {
             if (traceOrDebugOn) {
                 traceOrDebug(traceOn, "Storing the response message into the 
cache at scope : " +
-                    scope + " with ID : " + cacheObjKey + " for request hash : 
" + requestHash);
+                    scope + " with ID : " + cacheManagerKey + " for request 
hash : " + requestHash);
             }
 
-            Object obj = cache.getResponseForKey(requestHash, cfgCtx);
+            CachedObject cachedObj = 
cacheManager.getResponseForKey(requestHash, cfgCtx);
+            if (cachedObj != null) {
 
-            if (obj != null && obj instanceof CachedObject) {
-
-                CachedObject cachedObj = (CachedObject) obj;
                 if (traceOrDebugOn) {
                     traceOrDebug(traceOn, "Storing the response for the 
message with ID : " +
                         synCtx.getMessageID() + " with request hash ID : " +
-                        cachedObj.getRequestHash() + " in the cache : " + 
cacheObjKey);
+                        cachedObj.getRequestHash() + " in the cache : " + 
cacheManagerKey);
                 }
 
                 ByteArrayOutputStream outStream = new ByteArrayOutputStream();
@@ -207,9 +213,10 @@
                 // cachedObj.setResponseHash(cache.getGenerator().getDigest(
                 //     ((Axis2MessageContext) 
synCtx).getAxis2MessageContext()));
 
-                cachedObj.setExpireTime(System.currentTimeMillis() + 
cachedObj.getTimeout());
+                cachedObj.setExpireTimeMillis(System.currentTimeMillis() + 
cachedObj.getTimeout());
 
-                cfgCtx.setProperty(cacheObjKey, cache);
+                cfgCtx.setProperty(cacheManagerKey, cacheManager);
+                Replicator.replicate(cfgCtx);
 
             } else {
                 auditWarn("A response message without a valid mapping to the " 
+
@@ -231,31 +238,35 @@
      * @param cfgCtx         the AbstractContext in which the cache will be 
kept
      * @param traceOrDebugOn is tracing or debug logging on?
      * @param traceOn        is tracing on?
-     * @param cache          the cache
-     * @param fbaos          the serialized request envelope
+     * @param cacheManager   the cache manager
      * @return should this mediator terminate further processing?
+     * @throws ClusteringFault if there is an error in replicating the cfgCtx
      */
     private boolean processRequestMessage(MessageContext synCtx, 
ConfigurationContext cfgCtx,
-        boolean traceOrDebugOn, boolean traceOn, Cache cache, 
FixedByteArrayOutputStream fbaos) {
+        boolean traceOrDebugOn, boolean traceOn, CacheManager cacheManager) 
throws ClusteringFault {
 
         if (collector) {
             handleException("Request messages cannot be handled in a collector 
cache", synCtx);
         }
 
-        String requestHash = digestGenerator
-            .getDigest(((Axis2MessageContext) 
synCtx).getAxis2MessageContext());
-        synCtx.setProperty(CachingConstants.REQUEST_HASH_KEY, requestHash);
+        String requestHash = null;
+        try {  
+            requestHash = digestGenerator.getDigest(((Axis2MessageContext) 
synCtx).getAxis2MessageContext());
+            synCtx.setProperty(CachingConstants.REQUEST_HASH_KEY, requestHash);
+        } catch (CachingException e) {
+            handleException("Error in calculating the hash value of the 
request", e, synCtx);
+        }
 
         if (traceOrDebugOn) {
             traceOrDebug(traceOn, "Generated request hash : " + requestHash);
         }
 
-        if (cache.containsKey(requestHash) &&
-            cache.getResponseForKey(requestHash, cfgCtx) instanceof 
CachedObject) {
+        if (cacheManager.containsKey(requestHash) &&
+            cacheManager.getResponseForKey(requestHash, cfgCtx) != null) {
 
             // get the response from the cache and attach to the context and 
change the
             // direction of the message
-            CachedObject cachedObj = (CachedObject) 
cache.getResponseForKey(requestHash, cfgCtx);
+            CachedObject cachedObj = 
cacheManager.getResponseForKey(requestHash, cfgCtx);
 
             if (!cachedObj.isExpired() && cachedObj.getResponseEnvelope() != 
null) {
 
@@ -273,15 +284,17 @@
                     org.apache.axiom.soap.SOAPEnvelope omSOAPEnv =
                         
SAAJUtil.toOMSOAPEnvelope(smsg.getSOAPPart().getDocumentElement());
 
+                    // todo: if there is a WSA messageID in the response, does 
that need to be unique on each and every response?
+
                     synCtx.setEnvelope(omSOAPEnv);
                 } catch (AxisFault axisFault) {
-                    handleException("Error setting response envelope from 
cache : " + cacheObjKey,
+                    handleException("Error setting response envelope from 
cache : " + cacheManagerKey,
                         synCtx);
                 } catch (IOException ioe) {
-                    handleException("Error setting response envelope from 
cache : " + cacheObjKey,
+                    handleException("Error setting response envelope from 
cache : " + cacheManagerKey,
                         ioe, synCtx);
                 } catch (SOAPException soape) {
-                    handleException("Error setting response envelope from 
cache : " + cacheObjKey,
+                    handleException("Error setting response envelope from 
cache : " + cacheManagerKey,
                         soape, synCtx);
                 }
 
@@ -306,7 +319,7 @@
 
                     if (traceOrDebugOn) {
                         traceOrDebug(traceOn, "Request message " + 
synCtx.getMessageID() +
-                            " has served from the cache : " + cacheObjKey);
+                            " was served from the cache : " + cacheManagerKey);
                     }
                     // send the response back if there is no onCacheHit 
specified
                     synCtx.setTo(null);
@@ -317,29 +330,31 @@
 
             } else {
                 // cache exists, but has expired...
-                cachedObj.clearCache();
+                cachedObj.expire();
                 if (traceOrDebugOn) {
                     traceOrDebug(traceOn,
                         "Existing cached response has expired. Reset cache 
element");
                 }
 
-                cfgCtx.setProperty(cacheObjKey, cache);
+                cfgCtx.setProperty(cacheManagerKey, cacheManager);
+                Replicator.replicate(cfgCtx);
             }
 
         } else {
 
+            // todo: find a proper way of achieving the cache size check
             // if not found in cache, check if we can cache this request
-            if (cache.getCacheKeys().size() == inMemoryCacheSize) {
-                cache.removeExpiredResponses(cfgCtx);
-                if (cache.getCacheKeys().size() == inMemoryCacheSize) {
+            if (cacheManager.getCacheKeys().size() == inMemoryCacheSize) {
+                cacheManager.removeExpiredResponses(cfgCtx);
+                if (cacheManager.getCacheKeys().size() == inMemoryCacheSize) {
                     if (traceOrDebugOn) {
                         traceOrDebug(traceOn, "In-memory cache is full. Unable 
to cache");
                     }
                 } else {
-                    storeRequestToCache(synCtx, cfgCtx, requestHash, cache, 
fbaos);
+                    storeRequestToCache(cfgCtx, requestHash, cacheManager);
                 }
             } else {
-                storeRequestToCache(synCtx, cfgCtx, requestHash, cache, fbaos);
+                storeRequestToCache(cfgCtx, requestHash, cacheManager);
             }
         }
         return true;
@@ -348,34 +363,23 @@
     /**
      * Store request message to the cache
      *
-     * @param synCtx      the request message
-     * @param cfgCtx      the Abstract context in which the cache will be kept
-     * @param requestHash the request hash that has already been computed
-     * @param cache       the cache
-     * @param fbaos       the serialized request envelope
+     * @param cfgCtx        - the Abstract context in which the cache will be 
kept
+     * @param requestHash   - the request hash that has already been computed
+     * @param cacheManager  - the cache
+     * @throws ClusteringFault if there is an error in replicating the cfgCtx
      */
-    private void storeRequestToCache(MessageContext synCtx, 
ConfigurationContext cfgCtx,
-        String requestHash, Cache cache, FixedByteArrayOutputStream fbaos) {
+    private void storeRequestToCache(ConfigurationContext cfgCtx,
+        String requestHash, CacheManager cacheManager) throws ClusteringFault {
         
         CachedObject cachedObj = new CachedObject();
-        if (fbaos != null) {
-            cachedObj.setRequestEnvelope(fbaos.toByteArray());
-        } else {
-            // this else block can be commented out for the perf improvements,
-            // because we are not using this for the moment
-            ByteArrayOutputStream requestStream = new ByteArrayOutputStream();
-            try {
-                
MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()).serialize(requestStream);
-                cachedObj.setRequestEnvelope(requestStream.toByteArray());
-            } catch (XMLStreamException e) {
-                handleException("Unable to store the request in to the cache", 
e, synCtx);
-            }
-        }
         cachedObj.setRequestHash(requestHash);
+        // this does not set the expire time but just sets the timeout, and the 
expire time will
+        // be set when the response is available
         cachedObj.setTimeout(timeout);
-        cache.addResponseWithKey(requestHash, cachedObj, cfgCtx);
+        cacheManager.addResponseWithKey(requestHash, cachedObj, cfgCtx);
 
-        cfgCtx.setProperty(cacheObjKey, cache);
+        cfgCtx.setProperty(cacheManagerKey, cacheManager);
+        Replicator.replicate(cfgCtx);
     }
 
     public String getId() {
@@ -393,7 +397,7 @@
     public void setScope(String scope) {
         this.scope = scope;
         if (CachingConstants.SCOPE_PER_MEDIATOR.equals(scope)) {
-            cacheObjKey = CACHE_OBJ_PREFIX + id;
+            cacheManagerKey = CACHE_MANAGER_PREFIX + id;
         }
     }
 
@@ -429,6 +433,7 @@
         this.diskCacheSize = diskCacheSize;
     }
 
+    // change the variable to timeout milliseconds
     public long getTimeout() {
         return timeout / 1000;
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to