Author: ruwan
Date: Thu Jan 10 22:52:15 2008
New Revision: 611068
URL: http://svn.apache.org/viewvc?rev=611068&view=rev
Log:
Fixing the timeout-based completion of the aggregate mediator
Modified:
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
Modified:
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java?rev=611068&r1=611067&r2=611068&view=diff
==============================================================================
---
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
(original)
+++
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
Thu Jan 10 22:52:15 2008
@@ -27,11 +27,12 @@
import java.util.List;
import java.util.ArrayList;
+import java.util.TimerTask;
/**
* This holds the Aggregate properties and the list of messages which
participate in the aggregation
*/
-public class Aggregate {
+public class Aggregate extends TimerTask {
/**
*
@@ -68,10 +69,12 @@
*/
private String corelation = null;
+ private AggregateMediator mediator = null;
+
/**
*
*/
- private List messages = new ArrayList();
+ private List<MessageContext> messages = new ArrayList<MessageContext>();
/**
* This is the constructor of the Aggregate which will set the timeout
depending on the
@@ -81,8 +84,9 @@
* @param timeout -
* @param min -
* @param max -
+ * @param mediator -
*/
- public Aggregate(String corelation, long timeout, int min, int max) {
+ public Aggregate(String corelation, long timeout, int min, int max,
AggregateMediator mediator) {
this.corelation = corelation;
if (timeout > 0) {
this.timeout = System.currentTimeMillis() + expireTime;
@@ -93,6 +97,7 @@
if (max > 0) {
this.maxCount = max;
}
+ this.mediator = mediator;
}
/**
@@ -175,7 +180,7 @@
return messages;
}
- public void setMessages(List messages) {
+ public void setMessages(List<MessageContext> messages) {
this.messages = messages;
}
@@ -187,4 +192,7 @@
this.expireTime = expireTime;
}
+ public void run() {
+ mediator.completeAggregate(this);
+ }
}
Modified:
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java?rev=611068&r1=611067&r2=611068&view=diff
==============================================================================
---
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
(original)
+++
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
Thu Jan 10 22:52:15 2008
@@ -116,7 +116,7 @@
/**
* This will hold the map of active aggregates at any given time
*/
- private Map activeAggregates = new HashMap();
+ private Map<String, Aggregate> activeAggregates = new HashMap<String,
Aggregate>();
/**
* This will hold the expired aggregates at any given time, these will be
cleaned by a timer
@@ -162,7 +162,7 @@
}
}
-// todo: revisit this
+// todo: revisit this
// if (!isTimerSet) {
// synCtx.getConfiguration().getSynapseTimer()
// .schedule(new AggregateCollector(this), 5000);
@@ -186,7 +186,8 @@
} else {
aggregate = new
Aggregate(this.corelateExpression.toString(),
this.completeTimeout, this.minMessagesToComplete,
- this.maxMessagesToComplete);
+ this.maxMessagesToComplete, this);
+
synCtx.getConfiguration().getSynapseTimer().schedule(aggregate,
completeTimeout);
activeAggregates.put(this.corelateExpression.toString(),
aggregate);
}
@@ -224,7 +225,8 @@
} else {
aggregate = new Aggregate(corelation,
this.completeTimeout,
- this.minMessagesToComplete,
this.maxMessagesToComplete);
+ this.minMessagesToComplete,
this.maxMessagesToComplete, this);
+
synCtx.getConfiguration().getSynapseTimer().schedule(aggregate,
completeTimeout);
activeAggregates.put(corelation, aggregate);
}
@@ -311,12 +313,12 @@
if ((this.corelateExpression != null && !this.corelateExpression
.toString().equals(aggregate.getCorelation())) ||
- this.corelateExpression == null) {
+ this.corelateExpression == null) {
-// aggregate.setExpireTime(
-// System.currentTimeMillis() +
this.invlidateToDestroyTime);
+// aggregate.setExpireTime(
+// System.currentTimeMillis() +
this.invlidateToDestroyTime);
expiredAggregates.put(aggregate.getCorelation(),
- new Long(System.currentTimeMillis() +
this.invlidateToDestroyTime));
+ new Long(System.currentTimeMillis() +
this.invlidateToDestroyTime));
if (this.onCompleteSequence != null) {
this.onCompleteSequence.mediate(newSynCtx);
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]