Author: ruwan
Date: Mon Oct 29 19:34:10 2007
New Revision: 589942

URL: http://svn.apache.org/viewvc?rev=589942&view=rev
Log:
Adding the Aggregate Mediator - :)

Added:
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
   (with props)
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
   (with props)
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
   (with props)
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
   (with props)
Modified:
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java

Added: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java?rev=589942&view=auto
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
 (added)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
 Mon Oct 29 19:34:10 2007
@@ -0,0 +1,167 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.config.xml;
+
+import org.apache.synapse.Mediator;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.mediators.eip.aggregator.AggregateMediator;
+import org.apache.synapse.mediators.builtin.DropMediator;
+import org.apache.synapse.mediators.base.SequenceMediator;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.xpath.AXIOMXPath;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jaxen.JaxenException;
+
+import javax.xml.namespace.QName;
+
+/**
+ * <aggregate>
+ *  <corelateOn expression="XPATH-expression"/>
+ *  <completeCondition timeout="time-in-seconds">
+ *   <messageCount min="int-min" max="int-max"/>
+ *  </completeCondition>
+ *  <onComplete expression="XPATH-expression" sequence="sequence-ref">
+ *   (mediator +)?
+ *  </onComplete>
+ *  <invalidate sequence="sequence-ref" timeout="time-in-seconds">
+ *   (mediator +)?
+ *  </invalidate>
+ * </aggregate>
+ */
+public class AggregateMediatorFactory extends AbstractMediatorFactory {
+
+    private static final Log log = 
LogFactory.getLog(AggregateMediatorFactory.class);
+
+    private static final QName AGGREGATE_Q = new 
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "aggregate");
+    private static final QName CORELATE_ON_Q = new 
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "corelateOn");
+    private static final QName COMPLETE_CONDITION_Q
+            = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, 
"completeCondition");
+    private static final QName MESSAGE_COUNT_Q
+            = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "messageCount");
+    private static final QName ON_COMPLETE_Q = new 
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "onComplete");
+    private static final QName INVALIDATE_Q = new 
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "invalidate");
+
+    private static final QName TIME_TO_LIVE_Q = new 
QName(XMLConfigConstants.NULL_NAMESPACE, "timeToLive");
+    private static final QName EXPRESSION_Q = new 
QName(XMLConfigConstants.NULL_NAMESPACE, "expression");
+    private static final QName TIMEOUT_Q = new 
QName(XMLConfigConstants.NULL_NAMESPACE, "timeout");
+    private static final QName MIN_Q = new 
QName(XMLConfigConstants.NULL_NAMESPACE, "min");
+    private static final QName MAX_Q = new 
QName(XMLConfigConstants.NULL_NAMESPACE, "max");
+    private static final QName TYPE_Q = new 
QName(XMLConfigConstants.NULL_NAMESPACE, "type");
+    private static final QName SEQUENCE_Q = new 
QName(XMLConfigConstants.NULL_NAMESPACE, "sequence");
+
+    public Mediator createMediator(OMElement elem) {
+
+        AggregateMediator mediator = new AggregateMediator();
+        processTraceState(mediator, elem);
+        // todo: need to fix
+        OMAttribute timeToLive = elem.getAttribute(TIME_TO_LIVE_Q);
+        if (timeToLive != null) {
+            
mediator.setTimeToInvalidate(Long.parseLong(timeToLive.getAttributeValue()) * 
1000);
+        }
+
+        OMElement corelateOn = elem.getFirstChildWithName(CORELATE_ON_Q);
+        if (corelateOn != null) {
+            OMAttribute corelateExpr = corelateOn.getAttribute(EXPRESSION_Q);
+            if (corelateExpr != null) {
+                try {
+                    AXIOMXPath xp = new 
AXIOMXPath(corelateExpr.getAttributeValue());
+                    OMElementUtils.addNameSpaces(xp, corelateOn, log);
+                    mediator.setCorelateExpression(xp);
+                } catch (JaxenException e) {
+                    handleException("Unable to load the corelate XPATH 
expression", e);
+                }
+            }
+        }
+
+        OMElement completeCond = 
elem.getFirstChildWithName(COMPLETE_CONDITION_Q);
+        if (completeCond != null) {
+            OMAttribute completeTimeout = completeCond.getAttribute(TIMEOUT_Q);
+            if (completeTimeout != null) {
+                mediator.setCompleteTimeout(
+                        Long.parseLong(completeTimeout.getAttributeValue()) * 
1000);
+            }
+
+            OMElement messageCount = 
completeCond.getFirstChildWithName(MESSAGE_COUNT_Q);
+            if (messageCount != null) {
+                OMAttribute min = messageCount.getAttribute(MIN_Q);
+                if (min != null) {
+                    
mediator.setMinMessagesToComplete(Integer.parseInt(min.getAttributeValue()));
+                }
+
+                OMAttribute max = messageCount.getAttribute(MAX_Q);
+                if (max != null) {
+                    
mediator.setMaxMessagesToComplete(Integer.parseInt(max.getAttributeValue()));
+                }
+            }
+        }
+
+        OMElement invalidate = elem.getFirstChildWithName(INVALIDATE_Q);
+        if (invalidate != null) {
+            OMAttribute sequenceRef = invalidate.getAttribute(SEQUENCE_Q);
+            if (sequenceRef != null) {
+                
mediator.setInvalidMsgSequenceRef(sequenceRef.getAttributeValue());
+            } else if (invalidate.getFirstElement() != null) {
+                mediator.setInvalidMsgSequence(
+                        (new 
SequenceMediatorFactory()).createAnonymousSequence(invalidate));
+            }
+
+            OMAttribute timeout = invalidate.getAttribute(TIMEOUT_Q);
+            if (timeout != null) {
+                
mediator.setInvlidateToDestroyTime(Long.parseLong(timeout.getAttributeValue()));
+            } else {
+                mediator.setInvlidateToDestroyTime(300);
+            }
+        }
+
+        OMElement onComplete = elem.getFirstChildWithName(ON_COMPLETE_Q);
+        if (onComplete != null) {
+
+            OMAttribute aggregateExpr = onComplete.getAttribute(EXPRESSION_Q);
+            if (aggregateExpr != null) {
+                try {
+                    AXIOMXPath xp = new 
AXIOMXPath(aggregateExpr.getAttributeValue());
+                    OMElementUtils.addNameSpaces(xp, onComplete, log);
+                    mediator.setAggregationExpression(xp);
+                } catch (JaxenException e) {
+                    handleException("Unable to load the aggregating XPATH", e);
+                }
+            }
+
+            OMAttribute onCompleteSequence = 
onComplete.getAttribute(SEQUENCE_Q);
+            if (onCompleteSequence != null) {
+                
mediator.setOnCompleteSequenceRef(onCompleteSequence.getAttributeValue());
+            } else if (onComplete.getFirstElement() != null) {
+                mediator.setOnCompleteSequence(
+                        (new 
SequenceMediatorFactory()).createAnonymousSequence(onComplete));
+            } else {
+                SequenceMediator sequence = new SequenceMediator();
+                sequence.addChild(new DropMediator());
+                mediator.setOnCompleteSequence(sequence);
+            }
+        }
+        return mediator;
+    }
+
+    public QName getTagQName() {
+        return AGGREGATE_Q;
+    }
+}

Propchange: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
------------------------------------------------------------------------------
    svn:executable = *

Added: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java?rev=589942&view=auto
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
 (added)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
 Mon Oct 29 19:34:10 2007
@@ -0,0 +1,89 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.config.xml;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMAttribute;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.mediators.eip.aggregator.AggregateMediator;
+import org.apache.synapse.mediators.ext.ClassMediator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * <aggregate>
+ *  <corelateOn expression="XPATH-expression"/>
+ *  <completeCondition timeout="time-in-seconds">
+ *   <messageCount min="int-min" max="int-max"/>
+ *  </completeCondition>
+ *  <onComplete expression="XPATH-expression" sequence="sequence-ref">
+ *   (mediator +)?
+ *  </onComplete>
+ *  <invalidate sequence="sequence-ref" timeout="time-in-seconds">
+ *   (mediator +)?
+ *  </invalidate>
+ * </aggregate>
+ */
+public class AggregateMediatorSerializer extends AbstractMediatorSerializer {
+
+    private static final Log log = 
LogFactory.getLog(AggregateMediatorSerializer.class);
+
+    public OMElement serializeMediator(OMElement parent, Mediator m) {
+
+        if (!(m instanceof AggregateMediator)) {
+            handleException("Unsupported mediator passed in for serialization 
: " + m.getType());
+        }
+        AggregateMediator mediator = (AggregateMediator) m;
+        OMElement aggregator = fac.createOMElement("aggregate", synNS);
+        saveTracingState(aggregator, mediator);
+
+        if (mediator.getCorelateExpression() != null) {
+            OMElement corelateOn = fac.createOMElement("corelateOn", synNS);
+            corelateOn.addAttribute("expression", 
mediator.getCorelateExpression().toString(), nullNS);
+            super.serializeNamespaces(corelateOn, 
mediator.getCorelateExpression());
+            aggregator.addChild(corelateOn);
+        }
+
+        OMElement completeCond = fac.createOMElement("completeCondition", 
synNS);
+        if (mediator.getCompleteTimeout() != 0) {
+            completeCond.addAttribute("timeout", "" + 
mediator.getCompleteTimeout(), nullNS);
+        }
+        OMElement messageCount = fac.createOMElement("messageCount", synNS);
+        if (mediator.getMinMessagesToComplete() != 0) {
+            messageCount.addAttribute("min", "" + 
mediator.getMinMessagesToComplete(), nullNS);
+        }
+        if (mediator.getMaxMessagesToComplete() != 0) {
+            messageCount.addAttribute("max", "" + 
mediator.getMaxMessagesToComplete(), nullNS);
+        }
+        completeCond.addChild(messageCount);
+        aggregator.addChild(completeCond);
+
+        OMElement aggregatorElem = fac.createOMElement("aggregator", synNS);
+//        aggregatorElem.addAttribute("type", 
mediator.getAggregator().getClass().getName(), nullNS);
+//        aggregatorElem.addAttribute("expression", mediator.get)
+
+        return aggregator;
+    }
+
+    public String getMediatorClassName() {
+        return AggregateMediator.class.getName();
+    }
+}

Propchange: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java?rev=589942&r1=589941&r2=589942&view=diff
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
 Mon Oct 29 19:34:10 2007
@@ -65,6 +65,7 @@
         POJOCommandMediatorFactory.class,
         CloneMediatorFactory.class,
         IterateMediatorFactory.class,
+        AggregateMediatorFactory.class,
         DBReportMediatorFactory.class,
         DBLookupMediatorFactory.class,
         CacheMediatorFactory.class

Modified: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java?rev=589942&r1=589941&r2=589942&view=diff
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java
 Mon Oct 29 19:34:10 2007
@@ -53,6 +53,7 @@
         POJOCommandMediatorSerializer.class,
         CloneMediatorSerializer.class,
         IterateMediatorSerializer.class,
+        AggregateMediatorSerializer.class,
         DBLookupMediatorSerializer.class,
         DBReportMediatorSerializer.class,
         CacheMediatorSerializer.class

Added: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java?rev=589942&view=auto
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
 (added)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
 Mon Oct 29 19:34:10 2007
@@ -0,0 +1,190 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.mediators.eip.aggregator;
+
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.mediators.eip.EIPConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * This holds the Aggregate properties and the list of messages which participate in the
+ * aggregation
+ */
+public class Aggregate {
+
+    /**
+     * Logger of this class
+     */
+    private static final Log log = LogFactory.getLog(Aggregate.class);
+
+    /**
+     * Logger obtained for the Synapse trace logger name
+     */
+    private static final Log trace = LogFactory.getLog(SynapseConstants.TRACE_LOGGER);
+
+    /**
+     * Point in time (milliseconds, comparable with System.currentTimeMillis()) after which
+     * the aggregate is considered timed out, or 0 if there is no timeout
+     */
+    private long timeout = 0;
+
+    /**
+     * Expire time of the aggregate; only stored and returned by this class
+     */
+    private long expireTime = 0;
+
+    /**
+     * Minimum number of messages required for completion, or -1 if not configured
+     */
+    private int minCount = -1;
+
+    /**
+     * Maximum number of messages accepted by this aggregate, or -1 if not limited
+     */
+    private int maxCount = -1;
+
+    /**
+     * Corelation name shared by the messages of this aggregate
+     */
+    private String corelation = null;
+
+    /**
+     * Messages collected so far
+     */
+    private List messages = new ArrayList();
+
+    /**
+     * This is the constructor of the Aggregate which will set the timeout deadline
+     * depending on the timeout for the aggregate
+     *
+     * @param corelation - String representing the corelation name of the messages in the
+     *                   aggregate
+     * @param timeout - time in milliseconds, counted from now, after which the aggregate
+     *                times out; values which are not positive mean no timeout
+     * @param min - minimum number of messages to complete, ignored unless positive
+     * @param max - maximum number of messages accepted, ignored unless positive
+     */
+    public Aggregate(String corelation, long timeout, int min, int max) {
+        this.corelation = corelation;
+        if (timeout > 0) {
+            // the deadline is relative to the creation time of the aggregate; this used to
+            // add expireTime, which is always 0 at this point
+            this.timeout = System.currentTimeMillis() + timeout;
+        }
+        if (min > 0) {
+            this.minCount = min;
+        }
+        if (max > 0) {
+            this.maxCount = max;
+        }
+    }
+
+    /**
+     * Adds the message to the aggregate unless the maximum message count is reached
+     *
+     * @param synCtx - message to be added
+     * @return true if the message was added and false if not
+     */
+    public boolean addMessage(MessageContext synCtx) {
+        if (this.maxCount <= 0 || this.messages.size() < this.maxCount) {
+            this.messages.add(synCtx);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Checks the completeness of the aggregate. The message sequence property of the first
+     * message is consulted first; if that does not yield completion and a minimum count is
+     * set, the aggregate is complete once the minimum count is reached or the timeout
+     * deadline has passed.
+     *
+     * @return boolean stating the completeness of the corelation
+     */
+    public boolean isComplete() {
+
+        boolean completed = false;
+        if (!messages.isEmpty()) {
+
+            Object first = messages.get(0);
+            if (first instanceof MessageContext) {
+
+                Object prop = ((MessageContext) first).getProperty(EIPConstants.MESSAGE_SEQUENCE);
+                if (prop instanceof String) {
+
+                    // presumably the second token holds the total number of messages in the
+                    // sequence - verify against the producer of this property
+                    String[] msgSequence
+                            = prop.toString().split(EIPConstants.MESSAGE_SEQUENCE_DELEMITER);
+                    if (msgSequence.length > 1) {
+                        try {
+                            completed = messages.size() >= Integer.parseInt(msgSequence[1]);
+                        } catch (NumberFormatException e) {
+                            // a malformed property must not break the mediation, fall back
+                            // to the minimum count / timeout based check below
+                            log.warn("Ignoring the malformed message sequence property : "
+                                    + prop, e);
+                        }
+                    }
+                }
+            }
+        }
+
+        if (!completed && this.minCount > 0) {
+            // a timeout of 0 means that no timeout is set and must not count as elapsed
+            boolean timedOut = this.timeout > 0 && this.timeout < System.currentTimeMillis();
+            completed = this.messages.size() >= this.minCount || timedOut;
+        }
+
+        return completed;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * @param timeout - the timeout deadline, stored as it is without adding the current time
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    public int getMinCount() {
+        return minCount;
+    }
+
+    public void setMinCount(int minCount) {
+        this.minCount = minCount;
+    }
+
+    public int getMaxCount() {
+        return maxCount;
+    }
+
+    public void setMaxCount(int maxCount) {
+        this.maxCount = maxCount;
+    }
+
+    public String getCorelation() {
+        return corelation;
+    }
+
+    public void setCorelation(String corelation) {
+        this.corelation = corelation;
+    }
+
+    /**
+     * @return the live list of collected messages, not a copy
+     */
+    public List getMessages() {
+        return messages;
+    }
+
+    public void setMessages(List messages) {
+        this.messages = messages;
+    }
+
+    public long getExpireTime() {
+        return expireTime;
+    }
+
+    public void setExpireTime(long expireTime) {
+        this.expireTime = expireTime;
+    }
+
+}

Propchange: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
------------------------------------------------------------------------------
    svn:executable = *

Added: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java?rev=589942&view=auto
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
 (added)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
 Mon Oct 29 19:34:10 2007
@@ -0,0 +1,453 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.mediators.eip.aggregator;
+
+import org.apache.axiom.om.xpath.AXIOMXPath;
+import org.apache.axiom.soap.SOAP11Constants;
+import org.apache.axiom.soap.SOAP12Constants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.mediators.AbstractMediator;
+import org.apache.synapse.mediators.eip.EIPUtils;
+import org.apache.synapse.mediators.eip.EIPConstants;
+import org.apache.synapse.mediators.base.SequenceMediator;
+import org.jaxen.JaxenException;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * This mediator aggregates the corelated messages flowing in to it and builds a single
+ * message out of them
+ */
+public class AggregateMediator extends AbstractMediator {
+
+    private static final Log log = LogFactory.getLog(AggregateMediator.class);
+
+    private static final Log trace = 
LogFactory.getLog(SynapseConstants.TRACE_LOGGER);
+
+    /**
+     * This holds the maximum lifetime of an aggregate. As described by the author, an
+     * aggregate which is not completed within its lifetime is invalidated: it is taken off
+     * the activeAggregates map, put in to the expiredAggregates map and the invalidate
+     * sequence is called to mediate the messages of the expired aggregate, if there are any.
+     * NOTE(review): the code acting on this value is not visible here - confirm.
+     */
+    private long timeToInvalidate = 0;
+
+    /**
+     * Messages coming in to the aggregator are examined for the existence of a node
+     * described by this XPATH. If the XPATH does not apply, the corelation property of the
+     * message is tried, and if that is missing too the message is passed through.
+     */
+    private AXIOMXPath corelateExpression = null;
+
+    /**
+     * This is used in the complete condition to complete the aggregation after waiting for
+     * the specified timeout, so that the messages gathered in the aggregate so far, if
+     * there are any, are sent after aggregation
+     */
+    private long completeTimeout = 0;
+
+    /**
+     * Minimum number of messages required to evaluate the complete condition to true,
+     * unless the aggregate has timed out with the provided timeout, if there is one.
+     * Defaults to -1.
+     */
+    private int minMessagesToComplete = -1;
+
+    /**
+     * Maximum number of messages that can be contained in a particular aggregation.
+     * Defaults to -1.
+     */
+    private int maxMessagesToComplete = -1;
+
+    /**
+     * XPATH expression used for the aggregation. The constructor initializes it to the
+     * first child of the SOAP 1.1 or SOAP 1.2 Body.
+     * NOTE(review): how the expression is applied to build the aggregated message is not
+     * visible here - confirm.
+     */
+    private AXIOMXPath aggregationExpression = null;
+
+    /**
+     * Holds a String reference to the Named Sequence which will be called to mediate the
+     * invalid messages coming in to the aggregator
+     */
+    private String invalidMsgSequenceRef = null;
+
+    /**
+     * Sequence which will be called to mediate the invalid messages coming in to the
+     * aggregator
+     */
+    private SequenceMediator invalidMsgSequence = null;
+
+    /**
+     * This will be used to destroy the aggregates which were kept in the expiredAggregates
+     * map
+     */
+    private long invlidateToDestroyTime = 0;
+
+    /**
+     * This holds the name of the referenced sequence for completed aggregates; presumably
+     * the sequence called once an aggregate is complete - confirm against the usage
+     */
+    private String onCompleteSequenceRef = null;
+
+    /**
+     * In-lined counterpart of onCompleteSequenceRef; presumably the sequence called once an
+     * aggregate is complete - confirm against the usage
+     */
+    private SequenceMediator onCompleteSequence = null;
+
+    /**
+     * This will hold the map of active aggregates at any given time
+     */
+    private Map activeAggregates = new HashMap();
+
+    /**
+     * This will hold the expired aggregates at any given time. As described by the author,
+     * these are to be cleaned by a timer task from time to time in order to prevent
+     * uncontrolled growth.
+     */
+    private Map expiredAggregates = new HashMap();
+
+    private boolean isTimerSet = false; // only referenced from commented out code in the visible part of mediate
+
+    /**
+     * Creates the mediator with the default aggregation expression, which selects the first
+     * child element of the SOAP 1.1 or SOAP 1.2 Body.
+     */
+    public AggregateMediator() {
+        try {
+            aggregationExpression = new AXIOMXPath(
+                "s11:Body/child::*[position()=1] | s12:Body/child::*[position()=1]");
+            aggregationExpression.addNamespace(
+                "s11", SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI);
+            aggregationExpression.addNamespace(
+                "s12", SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI);
+        } catch (JaxenException e) {
+            // report the failure regardless of the log level; this used to be done only when
+            // debug logging was enabled, silently leaving the expression unset otherwise.
+            // NOTE(review): there is no message context at construction time, hence null is
+            // passed as before - confirm that handleException tolerates it
+            handleException("Unable to set the default " +
+                "aggregationExpression for the aggregation", e, null);
+        }
+    }
+
+    /**
+     * This is the mediate method implementation of the AggregateMediator. And 
this will aggregate
+     * the messages going through this mediator according to the corelation 
criteria and the
+     * aggregation algorith specified to it
+     *
+     * @param synCtx - MessageContext to be mediated and aggregated
+     * @return boolean true if the complete condition for the particular 
aggregate is validated
+     *         false if not
+     */
+    public boolean mediate(MessageContext synCtx) {
+        // tracing and debuggin related mediation initiation
+        boolean traceOn = isTraceOn(synCtx);
+        boolean traceOrDebugOn = isTraceOrDebugOn(traceOn);
+
+        if (traceOrDebugOn) {
+            traceOrDebug(traceOn, "Start : Aggregate mediator");
+
+            if (traceOn && trace.isTraceEnabled()) {
+                trace.trace("Message : " + synCtx.getEnvelope());
+            }
+        }
+
+//        todo: revisit this         
+//        if (!isTimerSet) {
+//            synCtx.getConfiguration().getSynapseTimer()
+//                    .schedule(new AggregateCollector(this), 5000);
+//        }
+
+        try {
+            Aggregate aggregate = null;
+
+            // if the corelate aggregationExpression is provided and there is 
a coresponding
+            // element in the message corelate the messages on that
+            if (this.corelateExpression != null
+                    && this.corelateExpression.evaluate(synCtx.getEnvelope()) 
!= null) {
+
+                if 
(activeAggregates.containsKey(this.corelateExpression.toString())) {
+                    Object o = 
activeAggregates.get(this.corelateExpression.toString());
+                    if (o instanceof Aggregate) {
+                        aggregate = (Aggregate) o;
+                    } else {
+                        handleException("Undefined aggregate type.", synCtx);
+                    }
+                } else {
+                    aggregate = new 
Aggregate(this.corelateExpression.toString(),
+                            this.completeTimeout, this.minMessagesToComplete,
+                            this.maxMessagesToComplete);
+                    activeAggregates.put(this.corelateExpression.toString(), 
aggregate);
+                }
+
+            // if the corelattion can not be found using the 
aggregationExpression try to find the
+            // corelation on the default criteria which is through the 
aggregate corelation
+            // property of the message
+            } else if (synCtx.getProperty(EIPConstants.AGGREGATE_CORELATION) 
!= null) {
+
+                String corelation = synCtx.getProperty(
+                    EIPConstants.AGGREGATE_CORELATION) instanceof String ? 
synCtx.getProperty(
+                    EIPConstants.AGGREGATE_CORELATION).toString() : null;
+
+                // check whether the message corelation name is in the expired 
aggregates
+                if (expiredAggregates.containsKey(corelation)) {
+
+                    if (traceOrDebugOn) {
+                        traceOrDebug(traceOn, "Message with the corelation "
+                                + corelation + " expired. Invalidating the 
message.");
+                    }
+
+                    invalidate(synCtx, traceOrDebugOn, traceOn);
+                    return false;
+                }
+
+                if (corelation != null) {
+
+                    if (activeAggregates.containsKey(corelation)) {
+
+                        Object o = activeAggregates.get(corelation);
+                        if (o instanceof Aggregate) {
+                            aggregate = (Aggregate) o;
+                        } else {
+                            handleException("Undefined aggregate type.", 
synCtx);
+                        }
+
+                    } else {
+                        aggregate = new Aggregate(corelation, 
this.completeTimeout,
+                                this.minMessagesToComplete, 
this.maxMessagesToComplete);
+                        activeAggregates.put(corelation, aggregate);
+                    }
+
+                } else {
+                    if (traceOrDebugOn) {
+                        traceOrDebug(traceOn,
+                            "Error in getting corelation details. Skip the 
aggregator.");
+                    }
+                    return true;
+                }
+            } else {
+                if (traceOrDebugOn) {
+                    traceOrDebug(traceOn,
+                        "Unable to find the aggregation corelation. Skip the 
aggregation");
+                }
+                return true;
+            }
+
+            // if there is an aggregate continue on aggregation
+            if (aggregate != null) {
+
+                // add the message to the aggregate and if the maximum count 
of the aggregate is
+                // exceeded invalidate the message
+                if (!aggregate.addMessage(synCtx)) {
+                    if (traceOrDebugOn) {
+                        traceOrDebug(traceOn, "Can not exceed aggregate " +
+                                "max message count. Invalidating message");
+                    }
+                    invalidate(synCtx, traceOrDebugOn, traceOn);
+                    return false;
+                }
+
+                // check the completeness of the aggregate and is completed 
aggregate the messages
+                // if not completed return false and block the message 
sequence till it completes
+                if (aggregate.isComplete()) {
+                    return completeAggregate(aggregate);
+                }
+
+            // if the aggregation corelation can not be found then continue 
the message on the
+            // normal path by returning true
+            } else {
+                if (traceOrDebugOn) {
+                    traceOrDebug(traceOn, "Unable to find the aggregate. Skip 
the aggregation");
+                }
+                return true;
+            }
+
+        } catch (JaxenException e) {
+            handleException("Unable to execute the XPATH over the message", e, 
synCtx);
+        }
+
+        // finalize tracing and debugging
+        if (traceOrDebugOn) {
+            traceOrDebug(traceOn, "End : Aggregate mediator");
+        }
+
+        return false;
+    }
+
+    /**
+     * Hands a message that could not be aggregated to the "invalid message"
+     * handling. A sequence found in the configuration under
+     * <code>invalidMsgSequenceRef</code> is tried first, then the in-lined
+     * <code>invalidMsgSequence</code>. When neither is available the message is
+     * not mediated any further and only a trace/debug entry is written.
+     *
+     * @param synCtx         the message to be invalidated
+     * @param traceOrDebugOn whether a trace/debug entry should be written
+     * @param traceOn        passed through to <code>traceOrDebug</code>
+     */
+    private void invalidate(MessageContext synCtx, boolean traceOrDebugOn, boolean traceOn) {
+
+        // first choice: the sequence registered under the configured key
+        if (this.invalidMsgSequenceRef != null
+                && synCtx.getConfiguration().getSequence(invalidMsgSequenceRef) != null) {
+            synCtx.getConfiguration().getSequence(invalidMsgSequenceRef).mediate(synCtx);
+            return;
+        }
+
+        // second choice: the sequence set directly on this mediator
+        if (this.invalidMsgSequence != null) {
+            invalidMsgSequence.mediate(synCtx);
+            return;
+        }
+
+        // nothing configured: only report it
+        if (traceOrDebugOn) {
+            traceOrDebug(traceOn,
+                    "No invalid message sequence defined. Dropping the message");
+        }
+    }
+
+    public boolean completeAggregate(Aggregate aggregate) {
+
+            MessageContext newSynCtx = getAggregatedMessage(aggregate);
+            activeAggregates.remove(aggregate.getCorelation());
+
+            if ((this.corelateExpression != null && !this.corelateExpression
+                    .toString().equals(aggregate.getCorelation())) ||
+                    this.corelateExpression == null) {
+
+//                            aggregate.setExpireTime(
+//                                    System.currentTimeMillis() + 
this.invlidateToDestroyTime);
+                expiredAggregates.put(aggregate.getCorelation(),
+                        new Long(System.currentTimeMillis() + 
this.invlidateToDestroyTime));
+
+                if (this.onCompleteSequence != null) {
+                    this.onCompleteSequence.mediate(newSynCtx);
+                } else if (this.onCompleteSequenceRef != null
+                        && newSynCtx.getSequence(this.onCompleteSequenceRef) 
!= null) {
+                    
newSynCtx.getSequence(this.onCompleteSequenceRef).mediate(newSynCtx);
+                } else {
+                    handleException("Unable to find the sequence for the 
mediation " +
+                            "of the aggregated message", newSynCtx);
+                }
+                return false;
+            } else {
+                return true;
+            }
+    }
+
+    /**
+     * Builds the aggregated message for the given aggregate. The first
+     * <code>MessageContext</code> found among the aggregate's messages is used as
+     * the result, and the envelope of every further message is passed, together
+     * with the aggregation expression, to <code>EIPUtils.enrichEnvelope</code>
+     * to enrich the result's envelope. Entries that are not message contexts
+     * are ignored.
+     *
+     * @param aggregate the aggregate holding the collected messages
+     * @return the first collected message context, enriched with the others, or
+     *         <code>null</code> if the aggregate holds no message context
+     */
+    public MessageContext getAggregatedMessage(Aggregate aggregate) {
+        MessageContext merged = null;
+        for (Iterator messages = aggregate.getMessages().iterator(); messages.hasNext();) {
+            Object candidate = messages.next();
+            if (!(candidate instanceof MessageContext)) {
+                continue;
+            }
+
+            MessageContext current = (MessageContext) candidate;
+            if (merged == null) {
+                // the first message becomes the base of the aggregated message
+                merged = current;
+                continue;
+            }
+
+            try {
+                EIPUtils.enrichEnvelope(
+                        merged.getEnvelope(), current.getEnvelope(), this.aggregationExpression);
+            } catch (JaxenException e) {
+                handleException("Unable to get the aggreagated message", e, current);
+            }
+        }
+        return merged;
+    }
+
+    /**
+     * Returns the XPath expression that is evaluated against the message
+     * envelope to decide whether messages are corelated by expression.
+     *
+     * @return the corelate expression, possibly <code>null</code>
+     */
+    public AXIOMXPath getCorelateExpression() {
+        return this.corelateExpression;
+    }
+
+    /**
+     * Sets the XPath expression that is evaluated against the message envelope
+     * to decide whether messages are corelated by expression.
+     *
+     * @param corelateExpression the corelate expression to use
+     */
+    public void setCorelateExpression(AXIOMXPath corelateExpression) {
+        this.corelateExpression = corelateExpression;
+    }
+
+    /**
+     * Returns the key under which the sequence for invalidated messages is
+     * looked up in the configuration.
+     *
+     * @return the key of the invalid message sequence, possibly <code>null</code>
+     */
+    public String getInvalidMsgSequenceRef() {
+        return this.invalidMsgSequenceRef;
+    }
+
+    /**
+     * Sets the key under which the sequence for invalidated messages is looked
+     * up in the configuration.
+     *
+     * @param invalidMsgSequenceRef the key of the invalid message sequence
+     */
+    public void setInvalidMsgSequenceRef(String invalidMsgSequenceRef) {
+        this.invalidMsgSequenceRef = invalidMsgSequenceRef;
+    }
+
+    /**
+     * Returns the in-lined sequence used for invalidated messages when no
+     * referenced sequence can be found.
+     *
+     * @return the invalid message sequence, possibly <code>null</code>
+     */
+    public SequenceMediator getInvalidMsgSequence() {
+        return this.invalidMsgSequence;
+    }
+
+    /**
+     * Sets the in-lined sequence used for invalidated messages when no
+     * referenced sequence can be found.
+     *
+     * @param invalidMsgSequence the invalid message sequence
+     */
+    public void setInvalidMsgSequence(SequenceMediator invalidMsgSequence) {
+        this.invalidMsgSequence = invalidMsgSequence;
+    }
+
+    /**
+     * Returns the configured <code>timeToInvalidate</code> value.
+     * NOTE(review): where this value is applied is not visible in this part of
+     * the class - confirm before relying on its meaning or unit.
+     *
+     * @return the time to invalidate
+     */
+    public long getTimeToInvalidate() {
+        return this.timeToInvalidate;
+    }
+
+    /**
+     * Sets the <code>timeToInvalidate</code> value.
+     * NOTE(review): where this value is applied is not visible in this part of
+     * the class - confirm before relying on its meaning or unit.
+     *
+     * @param timeToInvalidate the time to invalidate
+     */
+    public void setTimeToInvalidate(long timeToInvalidate) {
+        this.timeToInvalidate = timeToInvalidate;
+    }
+
+    /**
+     * Returns the completion timeout that is handed to every newly created
+     * aggregate.
+     *
+     * @return the completion timeout
+     */
+    public long getCompleteTimeout() {
+        return this.completeTimeout;
+    }
+
+    /**
+     * Sets the completion timeout that is handed to every newly created
+     * aggregate.
+     *
+     * @param completeTimeout the completion timeout
+     */
+    public void setCompleteTimeout(long completeTimeout) {
+        this.completeTimeout = completeTimeout;
+    }
+
+    /**
+     * Returns the minimum message count that is handed to every newly created
+     * aggregate.
+     *
+     * @return the minimum number of messages to complete
+     */
+    public int getMinMessagesToComplete() {
+        return this.minMessagesToComplete;
+    }
+
+    /**
+     * Sets the minimum message count that is handed to every newly created
+     * aggregate.
+     *
+     * @param minMessagesToComplete the minimum number of messages to complete
+     */
+    public void setMinMessagesToComplete(int minMessagesToComplete) {
+        this.minMessagesToComplete = minMessagesToComplete;
+    }
+
+    /**
+     * Returns the maximum message count that is handed to every newly created
+     * aggregate.
+     *
+     * @return the maximum number of messages to complete
+     */
+    public int getMaxMessagesToComplete() {
+        return this.maxMessagesToComplete;
+    }
+
+    /**
+     * Sets the maximum message count that is handed to every newly created
+     * aggregate.
+     *
+     * @param maxMessagesToComplete the maximum number of messages to complete
+     */
+    public void setMaxMessagesToComplete(int maxMessagesToComplete) {
+        this.maxMessagesToComplete = maxMessagesToComplete;
+    }
+
+    /**
+     * Returns the XPath expression that is passed to
+     * <code>EIPUtils.enrichEnvelope</code> when the collected messages are
+     * combined into the aggregated message.
+     *
+     * @return the aggregation expression, possibly <code>null</code>
+     */
+    public AXIOMXPath getAggregationExpression() {
+        return this.aggregationExpression;
+    }
+
+    /**
+     * Sets the XPath expression that is passed to
+     * <code>EIPUtils.enrichEnvelope</code> when the collected messages are
+     * combined into the aggregated message.
+     *
+     * @param aggregationExpression the aggregation expression
+     */
+    public void setAggregationExpression(AXIOMXPath aggregationExpression) {
+        this.aggregationExpression = aggregationExpression;
+    }
+
+    /**
+     * Returns the offset, in milliseconds, that is added to the current time
+     * when the corelation of a completed aggregate is recorded as expired.
+     *
+     * @return the invalidate-to-destroy time in milliseconds
+     */
+    public long getInvlidateToDestroyTime() {
+        return this.invlidateToDestroyTime;
+    }
+
+    /**
+     * Sets the offset, in milliseconds, that is added to the current time when
+     * the corelation of a completed aggregate is recorded as expired.
+     *
+     * @param invlidateToDestroyTime the invalidate-to-destroy time in milliseconds
+     */
+    public void setInvlidateToDestroyTime(long invlidateToDestroyTime) {
+        this.invlidateToDestroyTime = invlidateToDestroyTime;
+    }
+
+    /**
+     * Returns the key of the sequence that mediates the aggregated message
+     * when no in-lined on-complete sequence is set.
+     *
+     * @return the key of the on-complete sequence, possibly <code>null</code>
+     */
+    public String getOnCompleteSequenceRef() {
+        return this.onCompleteSequenceRef;
+    }
+
+    /**
+     * Sets the key of the sequence that mediates the aggregated message when
+     * no in-lined on-complete sequence is set.
+     *
+     * @param onCompleteSequenceRef the key of the on-complete sequence
+     */
+    public void setOnCompleteSequenceRef(String onCompleteSequenceRef) {
+        this.onCompleteSequenceRef = onCompleteSequenceRef;
+    }
+
+    /**
+     * Returns the in-lined sequence that mediates the aggregated message. When
+     * set, it is used in preference to the referenced on-complete sequence.
+     *
+     * @return the on-complete sequence, possibly <code>null</code>
+     */
+    public SequenceMediator getOnCompleteSequence() {
+        return this.onCompleteSequence;
+    }
+
+    /**
+     * Sets the in-lined sequence that mediates the aggregated message. When
+     * set, it is used in preference to the referenced on-complete sequence.
+     *
+     * @param onCompleteSequence the on-complete sequence
+     */
+    public void setOnCompleteSequence(SequenceMediator onCompleteSequence) {
+        this.onCompleteSequence = onCompleteSequence;
+    }
+
+    /**
+     * Returns the map of expired aggregates. Entries are keyed by the
+     * aggregate's corelation and hold a <code>Long</code> time stamp computed
+     * when the aggregate was completed. The map itself is returned, not a copy.
+     *
+     * @return the map of expired aggregates
+     */
+    public Map getExpiredAggregates() {
+        return this.expiredAggregates;
+    }
+
+    /**
+     * Returns the map of aggregates that are still collecting messages, keyed
+     * by corelation. The map itself is returned, not a copy.
+     *
+     * @return the map of active aggregates
+     */
+    public Map getActiveAggregates() {
+        return this.activeAggregates;
+    }
+}

Propchange: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
------------------------------------------------------------------------------
    svn:executable = *



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to