Author: ningjiang
Date: Thu Oct 23 21:04:22 2008
New Revision: 707537

URL: http://svn.apache.org/viewvc?rev=707537&view=rev
Log:
CAMEL-1020 Added an example of correlating request and response for the loan 
broker

Modified:
    
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
    
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Client.java
    
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
    
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/LoanBroker.java

Modified: 
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java?rev=707537&r1=707536&r2=707537&view=diff
==============================================================================
--- 
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
 (original)
+++ 
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
 Thu Oct 23 21:04:22 2008
@@ -17,20 +17,35 @@
 package org.apache.camel.loanbroker.queue.version;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 //START SNIPPET: aggregation
-public class BankResponseAggregationStrategy implements AggregationStrategy {
+public class BankResponseAggregationStrategy implements AggregationStrategy {  
  
     private static final transient Log LOG = 
LogFactory.getLog(BankResponseAggregationStrategy.class);
-
+    private boolean aggregatingOutMessage;
+    
+    public BankResponseAggregationStrategy setAggregatingOutMessage(boolean 
flag) {
+        aggregatingOutMessage = flag;
+        return this;
+    }
+    
     // Here we put the bank response together
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
         LOG.debug("Get the exchange to aggregate, older: " + oldExchange + " 
newer:" + newExchange);
-
-        Double oldRate = 
oldExchange.getIn().getHeader(Constants.PROPERTY_RATE, Double.class);
-        Double newRate = 
newExchange.getIn().getHeader(Constants.PROPERTY_RATE, Double.class);
+        Message oldMessage = null;
+        Message newMessage = null;
+        if (aggregatingOutMessage) {
+            oldMessage = oldExchange.getOut(false);
+            newMessage = newExchange.getOut(false);
+        } else {
+            oldMessage = oldExchange.getIn();
+            newMessage = newExchange.getIn();
+        }
+        Double oldRate = oldMessage.getHeader(Constants.PROPERTY_RATE, 
Double.class);
+        Double newRate = newMessage.getHeader(Constants.PROPERTY_RATE, 
Double.class);
         Exchange result = null;
 
         if (newRate >= oldRate) {

Modified: 
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Client.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Client.java?rev=707537&r1=707536&r2=707537&view=diff
==============================================================================
--- 
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Client.java
 (original)
+++ 
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/Client.java
 Thu Oct 23 21:04:22 2008
@@ -21,6 +21,7 @@
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
@@ -47,9 +48,27 @@
         for (int i = 0; i < 2; i++) {
             template.sendBodyAndHeader("jms:queue:loanRequestQueue",
                                        "Quote for the lowerst rate of loaning 
bank",
-                                       Constants.PROPERTY_SSN, "Client" + i);
+                                       Constants.PROPERTY_SSN, "Client-A" + i);
             Thread.sleep(100);
         }
+        // wait for the response
+        Thread.sleep(2000);
+        
+        // send the request and get the response from the same queue       
+        Exchange exchange = 
template.send("jms:queue2:parallelLoanRequestQueue", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.setPattern(ExchangePattern.InOut);
+                exchange.getIn().setBody("Quote for the lowerst rate of 
loaning bank");
+                exchange.getIn().setHeader(Constants.PROPERTY_SSN, "Client-B");
+            }
+        });
+        
+        String bank = 
(String)exchange.getOut().getHeader(Constants.PROPERTY_BANK);
+        Double rate = 
(Double)exchange.getOut().getHeader(Constants.PROPERTY_RATE);
+        String ssn = 
(String)exchange.getOut().getHeader(Constants.PROPERTY_SSN);
+        System.out.println("Loan quotion for Client " + ssn + "."
+                           + " The lowest rate bank is " + bank + ", the rate 
is " + rate);
+        
         // Wait a while before stop the context
         Thread.sleep(1000 * 5);
         context.stop();

Modified: 
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java?rev=707537&r1=707536&r2=707537&view=diff
==============================================================================
--- 
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
 (original)
+++ 
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
 Thu Oct 23 21:04:22 2008
@@ -93,5 +93,21 @@
             .process(new Translator()).to("jms:queue:loanReplyQueue");
 
     // END SNIPPET: dsl
+        
+    // START SNIPPET: dsl-2
+        // CreditAgency will get the request from parallelLoanRequestQueue
+        from("jms:queue2:parallelLoanRequestQueue").process(new CreditAgency())
+            // Set the aggregation strategy for aggregating the out message    
        
+            .multicast(new 
BankResponseAggregationStrategy().setAggregatingOutMessage(true))
+                // Send out the request to the below three different banks 
in parallel
+                .setParallelProcessing(true).to("jms:queue2:bank1", 
"jms:queue2:bank2", "jms:queue2:bank3");
+        
+        // Each bank processor will process the message and put the response 
message back
+        from("jms:queue2:bank1").process(new Bank("bank1"));
+        from("jms:queue2:bank2").process(new Bank("bank2"));
+        from("jms:queue2:bank3").process(new Bank("bank3"));
+        
+
+    // END SNIPPET: dsl-2
     }
 }

Modified: 
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/LoanBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/LoanBroker.java?rev=707537&r1=707536&r2=707537&view=diff
==============================================================================
--- 
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/LoanBroker.java
 (original)
+++ 
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/LoanBroker.java
 Thu Oct 23 21:04:22 2008
@@ -92,9 +92,11 @@
         // Router 2 to call the bank endpoints parallelly
         from(Constants.PARALLEL_LOANBROKER_URI)
             .process(new CreditScoreProcessor(Constants.CREDITAGENCY_ADDRESS))
-                // Using the thread pool to send out message to the below 
three different banks parallelly,
-                // the parameter of true is for processing the output 
processors parallelly
-                .multicast(new BankResponseAggregationStrategy(), true)
+                // Using the thread pool to send out message to the below 
three different banks parallelly                
+                .multicast(new BankResponseAggregationStrategy())
+                    // Camel will create a thread pool sized to the number of 
target endpoints
+                    // for sending the messages in parallel
+                    .setParallelProcessing(true)
                     .to(Constants.BANK1_URI, Constants.BANK2_URI, 
Constants.BANK3_URI);
 
     //END SNIPPET: dsl


Reply via email to