|
Page Created :
CAMEL :
Loan Broker Example
Loan Broker Example has been created by willem jiang (Jun 06, 2008). Content: Loan Broker Example. This example shows how to use Camel to implement the EIP's loan broker example. The example has two versions: one is the queue version, which leverages message queues to combine the credit agency and bank loan quote processing and uses the InOnly exchange pattern; the other is the web service version, which shows how to integrate the credit agency and bank web services together and uses the InOut exchange pattern. Implementation with message queue (JMS): the queue version of the loan broker. The example should run if you type the run command (the command itself is missing from this notification). To stop the example, hit ctrl + c. Let's take a look at how these service modules are put together. // Put the message from loanRequestQueue to the creditRequestQueue from("test-jms:queue:loanRequestQueue").to("test-jms:queue:creditRequestQueue"); // Now we can let the CreditAgency process the request, then the message will be put into creditResponseQueue from("test-jms:queue:creditRequestQueue").process(new CreditAgency()).to("test-jms:queue:creditResponseQueue"); // Here we use the multicast pattern to send the message to three different bank queue from("test-jms:queue:creditResponseQueue").multicast().to("test-jms:queue:bank1", "test-jms:queue:bank2", "test-jms:queue:bank3"); // Each bank process will process the message and put the response message into the bankReplyQueue from("test-jms:queue:bank1").process(new Bank("bank1")).to("test-jms:queue:bankReplyQueue"); from("test-jms:queue:bank2").process(new Bank("bank2")).to("test-jms:queue:bankReplyQueue"); from("test-jms:queue:bank3").process(new Bank("bank3")).to("test-jms:queue:bankReplyQueue"); // Now we aggregating the response message by using the Constants.PROPERTY_SSN header // The aggregation will completed when the three bank responses are received from("test-jms:queue:bankReplyQueue") .aggregator(header(Constants.PROPERTY_SSN), new BankResponseAggregationStrategy()) 
.completedPredicate(header("aggregated").isEqualTo(3)) // Here we do some translation and put the message back to loanReplyQueue .process(new Translator()).to("test-jms:queue:loanReplyQueue"); The CreditAgency, Bank and Translator classes are all implementations of the Processor interface. We implement the business logic in the void process(Exchange exchange) method. CreditAgency: public class CreditAgency implements Processor { private static final transient Log LOG = LogFactory.getLog(CreditAgency.class); public void process(Exchange exchange) throws Exception { LOG.info("Receiving credit agency request"); String ssn = (String)exchange.getIn().getHeader(Constants.PROPERTY_SSN); int score = (int) (Math.random() * 600 + 300); int hlength = (int) (Math.random() * 19 + 1); exchange.getOut().setHeader(Constants.PROPERTY_SCORE, new Integer(score)); exchange.getOut().setHeader(Constants.PROPERTY_HISTORYLENGTH, new Integer(hlength)); exchange.getOut().setHeader(Constants.PROPERTY_SSN, ssn); } } Bank: public class Bank implements Processor { private static final transient Log LOG = LogFactory.getLog(Bank.class); private String bankName; public Bank(String name) { bankName = name; } public void process(Exchange exchange) throws Exception { LOG.info("Receiving bank request"); String ssn = (String)exchange.getIn().getHeader(Constants.PROPERTY_SSN); double rate = Math.random() * 10; LOG.info("The bank: " + bankName + " for client: " + ssn + " 's rate " + rate); exchange.getOut().setHeader(Constants.PROPERTY_RATE, new Double(rate)); exchange.getOut().setHeader(Constants.PROPERTY_BANK, bankName); exchange.getOut().setHeader(Constants.PROPERTY_SSN, ssn); // Sleep some time try { Thread.sleep((int) (Math.random() * 10) * 100); } catch (InterruptedException e) { // Discard } } } Translator: public class Translator implements Processor { public void process(Exchange exchange) throws Exception { String bank = (String)exchange.getIn().getHeader(Constants.PROPERTY_BANK); Double rate = 
(Double)exchange.getIn().getHeader(Constants.PROPERTY_RATE); String ssn = (String)exchange.getIn().getHeader(Constants.PROPERTY_SSN); exchange.getOut().setBody("Loan quotion for Client " + ssn + "The lowest rate of bank is " + bank + ", the rate is " + rate); } } You may have noticed that we set a custom aggregation strategy to find the lowest loan rate among the bank response messages. public class BankResponseAggregationStrategy implements AggregationStrategy { // Here we put the bank response together public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { Integer old = (Integer) oldExchange.getProperty("aggregated"); Double oldRate = (Double) oldExchange.getIn().getHeader(Constants.PROPERTY_RATE); Double newRate = (Double) newExchange.getIn().getHeader(Constants.PROPERTY_RATE); Exchange result = null; if (old == null) { old = 1; } if (newRate >= oldRate) { result = oldExchange; } else { result = newExchange; } // Set the property for the completeness condition result.setProperty("aggregated", old + 1); return result; } } We start the loan broker after starting the ActiveMQ broker and setting up the connection factory for the Camel JMS component. public static void main(String... args) throws Exception { CamelContext context = new DefaultCamelContext(); JmsBroker broker = new JmsBroker(); broker.start(); // Set up the ActiveMQ JMS Components ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // Note we can explicitly name the component context.addComponent("test-jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); context.addRoutes(new LoanBroker()); // Start the loan broker context.start(); Thread.sleep(5 * 60 * 1000); context.stop(); Thread.sleep(1000); broker.stop(); } /** * Lets configure the Camel routing rules using Java code... 
*/ public void configure() { // Put the message from loanRequestQueue to the creditRequestQueue from("test-jms:queue:loanRequestQueue").to("test-jms:queue:creditRequestQueue"); // Now we can let the CreditAgency process the request, then the message will be put into creditResponseQueue from("test-jms:queue:creditRequestQueue").process(new CreditAgency()).to("test-jms:queue:creditResponseQueue"); // Here we use the multicast pattern to send the message to three different bank queue from("test-jms:queue:creditResponseQueue").multicast().to("test-jms:queue:bank1", "test-jms:queue:bank2", "test-jms:queue:bank3"); // Each bank process will process the message and put the response message into the bankReplyQueue from("test-jms:queue:bank1").process(new Bank("bank1")).to("test-jms:queue:bankReplyQueue"); from("test-jms:queue:bank2").process(new Bank("bank2")).to("test-jms:queue:bankReplyQueue"); from("test-jms:queue:bank3").process(new Bank("bank3")).to("test-jms:queue:bankReplyQueue"); // Now we aggregating the response message by using the Constants.PROPERTY_SSN header // The aggregation will completed when the three bank responses are received from("test-jms:queue:bankReplyQueue") .aggregator(header(Constants.PROPERTY_SSN), new BankResponseAggregationStrategy()) .completedPredicate(header("aggregated").isEqualTo(3)) // Here we do some translation and put the message back to loanReplyQueue .process(new Translator()).to("test-jms:queue:loanReplyQueue"); } } Now we can send the requests from the client: // send out the request message for (int i = 0; i < 2; i++) { template.sendBodyAndHeader("test-jms:queue:loanRequestQueue", "Quote for the lowerst rate of loaning bank", Constants.PROPERTY_SSN, "Client" + i); } Here is how we pull the response messages: from("test-jms:queue:loanReplyQueue").process(new Processor() { public void process(Exchange exchange) throws Exception { // Print out the response message System.out.println(exchange.getIn().getBody()); } }); |
Unsubscribe or edit your notification preferences
