Author: ningjiang
Date: Thu Oct 23 21:04:22 2008
New Revision: 707537
URL: http://svn.apache.org/viewvc?rev=707537&view=rev
Log:
CAMEL-1020 Added an example of correlating requests and responses to the loan
broker
Modified:
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Client.java
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/LoanBroker.java
Modified:
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java?rev=707537&r1=707536&r2=707537&view=diff
==============================================================================
---
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
(original)
+++
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
Thu Oct 23 21:04:22 2008
@@ -17,20 +17,35 @@
package org.apache.camel.loanbroker.queue.version;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
//START SNIPPET: aggregation
-public class BankResponseAggregationStrategy implements AggregationStrategy {
+public class BankResponseAggregationStrategy implements AggregationStrategy {
private static final transient Log LOG =
LogFactory.getLog(BankResponseAggregationStrategy.class);
-
+ private boolean aggregatingOutMessage;
+
+ public BankResponseAggregationStrategy setAggregatingOutMessage(boolean
flag) {
+ aggregatingOutMessage = flag;
+ return this;
+ }
+
// Here we put the bank response together
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
LOG.debug("Get the exchange to aggregate, older: " + oldExchange + "
newer:" + newExchange);
-
- Double oldRate =
oldExchange.getIn().getHeader(Constants.PROPERTY_RATE, Double.class);
- Double newRate =
newExchange.getIn().getHeader(Constants.PROPERTY_RATE, Double.class);
+ Message oldMessage = null;
+ Message newMessage = null;
+ if (aggregatingOutMessage) {
+ oldMessage = oldExchange.getOut(false);
+ newMessage = newExchange.getOut(false);
+ } else {
+ oldMessage = oldExchange.getIn();
+ newMessage = newExchange.getIn();
+ }
+ Double oldRate = oldMessage.getHeader(Constants.PROPERTY_RATE,
Double.class);
+ Double newRate = newMessage.getHeader(Constants.PROPERTY_RATE,
Double.class);
Exchange result = null;
if (newRate >= oldRate) {
Modified:
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Client.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Client.java?rev=707537&r1=707536&r2=707537&view=diff
==============================================================================
---
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Client.java
(original)
+++
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Client.java
Thu Oct 23 21:04:22 2008
@@ -21,6 +21,7 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
@@ -47,9 +48,27 @@
for (int i = 0; i < 2; i++) {
template.sendBodyAndHeader("jms:queue:loanRequestQueue",
"Quote for the lowerst rate of loaning
bank",
- Constants.PROPERTY_SSN, "Client" + i);
+ Constants.PROPERTY_SSN, "Client-A" + i);
Thread.sleep(100);
}
+ // wait for the response
+ Thread.sleep(2000);
+
+ // send the request and get the response from the same queue
+ Exchange exchange =
template.send("jms:queue2:parallelLoanRequestQueue", new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.setPattern(ExchangePattern.InOut);
+ exchange.getIn().setBody("Quote for the lowerst rate of
loaning bank");
+ exchange.getIn().setHeader(Constants.PROPERTY_SSN, "Client-B");
+ }
+ });
+
+ String bank =
(String)exchange.getOut().getHeader(Constants.PROPERTY_BANK);
+ Double rate =
(Double)exchange.getOut().getHeader(Constants.PROPERTY_RATE);
+ String ssn =
(String)exchange.getOut().getHeader(Constants.PROPERTY_SSN);
+ System.out.println("Loan quotion for Client " + ssn + "."
+ + " The lowest rate bank is " + bank + ", the rate
is " + rate);
+
// Wait a while before stop the context
Thread.sleep(1000 * 5);
context.stop();
Modified:
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java?rev=707537&r1=707536&r2=707537&view=diff
==============================================================================
---
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
(original)
+++
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
Thu Oct 23 21:04:22 2008
@@ -93,5 +93,21 @@
.process(new Translator()).to("jms:queue:loanReplyQueue");
// END SNIPPET: dsl
+
+ // START SNIPPET: dsl-2
+ // CreditAgency will get the request from parallelLoanRequestQueue
+ from("jms:queue2:parallelLoanRequestQueue").process(new CreditAgency())
+ // Set the aggregation strategy for aggregating the out message
+ .multicast(new
BankResponseAggregationStrategy().setAggregatingOutMessage(true))
+ // Send out the request the below three different banks
parallelly
+ .setParallelProcessing(true).to("jms:queue2:bank1",
"jms:queue2:bank2", "jms:queue2:bank3");
+
+ // Each bank processor will process the message and put the response
message back
+ from("jms:queue2:bank1").process(new Bank("bank1"));
+ from("jms:queue2:bank2").process(new Bank("bank2"));
+ from("jms:queue2:bank3").process(new Bank("bank3"));
+
+
+ // END SNIPPET: dsl-2
}
}
Modified:
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/LoanBroker.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/LoanBroker.java?rev=707537&r1=707536&r2=707537&view=diff
==============================================================================
---
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/LoanBroker.java
(original)
+++
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/LoanBroker.java
Thu Oct 23 21:04:22 2008
@@ -92,9 +92,11 @@
// Router 2 to call the bank endpoints parallelly
from(Constants.PARALLEL_LOANBROKER_URI)
.process(new CreditScoreProcessor(Constants.CREDITAGENCY_ADDRESS))
- // Using the thread pool to send out message to the below
three different banks parallelly,
- // the parameter of true is for processing the output
processors parallelly
- .multicast(new BankResponseAggregationStrategy(), true)
+ // Using the thread pool to send out message to the below
three different banks parallelly
+ .multicast(new BankResponseAggregationStrategy())
+ // Camel will create a thread pool with the size of the
send to endpoints
+ // for sending the message parallelly
+ .setParallelProcessing(true)
.to(Constants.BANK1_URI, Constants.BANK2_URI,
Constants.BANK3_URI);
//END SNIPPET: dsl