Author: ningjiang
Date: Fri Jun 27 05:52:56 2008
New Revision: 672261
URL: http://svn.apache.org/viewvc?rev=672261&view=rev
Log:
Work around CAMEL-642 by using a direct endpoint instead of a JMS endpoint
Modified:
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Bank.java
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Client.java
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
Modified:
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Bank.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Bank.java?rev=672261&r1=672260&r2=672261&view=diff
==============================================================================
---
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Bank.java
(original)
+++
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Bank.java
Fri Jun 27 05:52:56 2008
@@ -32,7 +32,7 @@
}
public void process(Exchange exchange) throws Exception {
- LOG.info("Receiving bank request");
+ LOG.debug("Receiving bank request");
String ssn =
(String)exchange.getIn().getHeader(Constants.PROPERTY_SSN);
double rate = Math.random() * 10;
LOG.info("The bank: " + bankName + " for client: " + ssn + " 's rate "
+ rate);
Modified:
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java?rev=672261&r1=672260&r2=672261&view=diff
==============================================================================
---
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
(original)
+++
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
Fri Jun 27 05:52:56 2008
@@ -18,12 +18,15 @@
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
//START SNIPPET: aggregation
public class BankResponseAggregationStrategy implements AggregationStrategy {
-
+ private static final transient Log LOG =
LogFactory.getLog(BankResponseAggregationStrategy.class);
// Here we put the bank response together
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ LOG.debug("Get the exchange to aggregate, older: " + oldExchange + "
newer:" + newExchange);
Integer old = (Integer) oldExchange.getProperty("aggregated");
Double oldRate = (Double)
oldExchange.getIn().getHeader(Constants.PROPERTY_RATE);
Double newRate = (Double)
newExchange.getIn().getHeader(Constants.PROPERTY_RATE);
@@ -36,6 +39,7 @@
} else {
result = newExchange;
}
+ LOG.debug("Get the lower rate exchange " + result);
// Set the property for the completeness condition
result.setProperty("aggregated", old + 1);
return result;
Modified:
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Client.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Client.java?rev=672261&r1=672260&r2=672261&view=diff
==============================================================================
---
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Client.java
(original)
+++
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Client.java
Fri Jun 27 05:52:56 2008
@@ -48,11 +48,12 @@
template.sendBodyAndHeader("test-jms:queue:loanRequestQueue",
"Quote for the lowerst rate of loaning
bank",
Constants.PROPERTY_SSN, "Client" + i);
+ Thread.sleep(100);
}
// END SNIPPET: sending
// Start the loan broker
- Thread.sleep(1000 * 60);
+ Thread.sleep(1000 * 5);
context.stop();
}
Modified:
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java?rev=672261&r1=672260&r2=672261&view=diff
==============================================================================
---
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
(original)
+++
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
Fri Jun 27 05:52:56 2008
@@ -21,11 +21,16 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.spring.Main;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
@@ -55,6 +60,7 @@
context.addRoutes(new LoanBroker());
// Start the loan broker
context.start();
+ System.out.println("Server is ready");
Thread.sleep(5 * 60 * 1000);
context.stop();
@@ -79,13 +85,13 @@
from("test-jms:queue:creditResponseQueue").multicast().to("test-jms:queue:bank1",
"test-jms:queue:bank2", "test-jms:queue:bank3");
// Each bank processor will process the message and put the response
message into the bankReplyQueue
- from("test-jms:queue:bank1").process(new
Bank("bank1")).to("test-jms:queue:bankReplyQueue");
- from("test-jms:queue:bank2").process(new
Bank("bank2")).to("test-jms:queue:bankReplyQueue");
- from("test-jms:queue:bank3").process(new
Bank("bank3")).to("test-jms:queue:bankReplyQueue");
+ from("test-jms:queue:bank1").process(new
Bank("bank1")).to("direct:queue:bankReplyQueue");
+ from("test-jms:queue:bank2").process(new
Bank("bank2")).to("direct:queue:bankReplyQueue");
+ from("test-jms:queue:bank3").process(new
Bank("bank3")).to("direct:queue:bankReplyQueue");
// Now we aggregating the response message by using the
Constants.PROPERTY_SSN header
// The aggregation will completed when all the three bank responses
are received
- from("test-jms:queue:bankReplyQueue")
+ from("direct:queue:bankReplyQueue")
.aggregator(header(Constants.PROPERTY_SSN), new
BankResponseAggregationStrategy())
.completedPredicate(header("aggregated").isEqualTo(3))