Loan Broker Example has been created by willem jiang (Jun 06, 2008).

Content:

Loan Broker Example

This example shows how to use Camel to implement the loan broker example from the Enterprise Integration Patterns (EIP) book.

The example comes in two versions. The queue version uses message queues to combine the credit agency and bank loan quote processing, and it uses the InOnly exchange pattern. The web service version shows how to integrate the credit agency and bank web services, and it uses the InOut exchange pattern.
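The difference between the two exchange patterns can be sketched without Camel. The following is a minimal plain-Java illustration, not Camel code: the class ExchangePatternSketch, its in-memory queue, and its method names are all hypothetical. With InOnly the sender hands the message over and expects no reply; with InOut the caller gets the reply back.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

class ExchangePatternSketch {

    // Hypothetical stand-in for a message queue; this is not a JMS queue.
    static final Queue<String> QUEUE = new ArrayDeque<>();

    // InOnly style: hand the message to the queue and return; no reply is expected.
    static void sendInOnly(String message) {
        QUEUE.add(message);
    }

    // InOut style: call the service and return its reply to the caller.
    static String requestInOut(Function<String, String> service, String message) {
        return service.apply(message);
    }

    public static void main(String[] args) {
        sendInOnly("loan request for Client0");
        System.out.println("queued messages: " + QUEUE.size());

        String reply = requestInOut(msg -> "quote for: " + msg, "loan request for Client1");
        System.out.println(reply);
    }
}
```

In the queue version, any reply has to travel back on a separate queue, which is why the routes in the next section end in a reply queue that the client reads from.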

Implementation with message queue (JMS)

The queue version of the loan broker is based on the Camel-JMS component. It shows how to use message queues to connect the different service modules (such as the credit agency and the banks).

The example should run if you type:
mvn exec:java -PQueue.LoanBroker
mvn exec:java -PQueue.Client

To stop the example, press Ctrl+C.

Let's take a look at how these service modules are put together.

// Put the message from loanRequestQueue to the creditRequestQueue
from("test-jms:queue:loanRequestQueue").to("test-jms:queue:creditRequestQueue");

// Now we can let the CreditAgency process the request, then the message will be put into creditResponseQueue
from("test-jms:queue:creditRequestQueue").process(new CreditAgency()).to("test-jms:queue:creditResponseQueue");

// Here we use the multicast pattern to send the message to the three bank queues
from("test-jms:queue:creditResponseQueue").multicast().to("test-jms:queue:bank1", "test-jms:queue:bank2", "test-jms:queue:bank3");

// Each bank process will process the message and put the response message into the bankReplyQueue
from("test-jms:queue:bank1").process(new Bank("bank1")).to("test-jms:queue:bankReplyQueue");
from("test-jms:queue:bank2").process(new Bank("bank2")).to("test-jms:queue:bankReplyQueue");
from("test-jms:queue:bank3").process(new Bank("bank3")).to("test-jms:queue:bankReplyQueue");

// Now we aggregate the response messages, correlated by the Constants.PROPERTY_SSN header
// The aggregation completes once the three bank responses have been received
from("test-jms:queue:bankReplyQueue")
    .aggregator(header(Constants.PROPERTY_SSN), new BankResponseAggregationStrategy())
    .completedPredicate(header("aggregated").isEqualTo(3))

// Here we do some translation and put the message back to loanReplyQueue
.process(new Translator()).to("test-jms:queue:loanReplyQueue");
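To make the message flow through these routes easier to follow, here is a sketch in plain Java that replaces the queues with direct method calls. It is an illustration under stated assumptions, not the example's code: the class LoanFlowSketch, the header names ("ssn", "score", "bank", "rate"), the fixed score, and the fixed rates are all hypothetical, chosen so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

class LoanFlowSketch {

    // Credit agency step: copies the headers and adds a fixed, illustrative score.
    static Map<String, Object> creditAgency(Map<String, Object> headers) {
        Map<String, Object> out = new HashMap<>(headers);
        out.put("score", 700);
        return out;
    }

    // Bank step: returns a processor that adds the bank name and a fixed rate.
    static UnaryOperator<Map<String, Object>> bank(String name, double rate) {
        return headers -> {
            Map<String, Object> out = new HashMap<>(headers);
            out.put("bank", name);
            out.put("rate", rate);
            return out;
        };
    }

    // Multicast step: every recipient gets the same message; all replies are collected.
    static List<Map<String, Object>> multicast(Map<String, Object> message,
                                               List<UnaryOperator<Map<String, Object>>> recipients) {
        List<Map<String, Object>> replies = new ArrayList<>();
        for (UnaryOperator<Map<String, Object>> recipient : recipients) {
            replies.add(recipient.apply(message));
        }
        return replies;
    }

    // Aggregation step: keep the reply with the lowest rate.
    static Map<String, Object> lowestRate(List<Map<String, Object>> replies) {
        Map<String, Object> best = replies.get(0);
        for (Map<String, Object> reply : replies) {
            double rate = (Double) reply.get("rate");
            double bestRate = (Double) best.get("rate");
            if (rate < bestRate) {
                best = reply;
            }
        }
        return best;
    }

    // Runs one request through credit agency, multicast to three banks, and aggregation.
    static Map<String, Object> handle(String ssn) {
        Map<String, Object> request = new HashMap<>();
        request.put("ssn", ssn);
        Map<String, Object> scored = creditAgency(request);
        List<Map<String, Object>> replies = multicast(scored, List.of(
                bank("bank1", 5.5), bank("bank2", 3.2), bank("bank3", 7.1)));
        return lowestRate(replies);
    }

    public static void main(String[] args) {
        Map<String, Object> best = handle("Client0");
        // prints: Client0: bank2 at 3.2
        System.out.println(best.get("ssn") + ": " + best.get("bank") + " at " + best.get("rate"));
    }
}
```

The real routes differ in one important way: each step runs behind its own queue, so the bank replies arrive independently and have to be correlated by the SSN header before the lowest rate can be chosen.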

CreditAgency, Bank and Translator are all implementations of the Processor interface. We implement the business logic in the void process(Exchange exchange) method.

CreditAgency

public class CreditAgency implements Processor {
    private static final transient Log LOG = LogFactory.getLog(CreditAgency.class);

    public void process(Exchange exchange) throws Exception {
        LOG.info("Receiving credit agency request");
        String ssn = (String)exchange.getIn().getHeader(Constants.PROPERTY_SSN);
        int score = (int) (Math.random() * 600 + 300);
        int hlength = (int) (Math.random() * 19 + 1);
        exchange.getOut().setHeader(Constants.PROPERTY_SCORE, new Integer(score));
        exchange.getOut().setHeader(Constants.PROPERTY_HISTORYLENGTH, new Integer(hlength));
        exchange.getOut().setHeader(Constants.PROPERTY_SSN, ssn);
    }

}

Bank

public class Bank implements Processor {
    private static final transient Log LOG = LogFactory.getLog(Bank.class);
    private String bankName;

    public Bank(String name) {
        bankName = name;
    }

    public void process(Exchange exchange) throws Exception {
        LOG.info("Receiving bank request");
        String ssn = (String)exchange.getIn().getHeader(Constants.PROPERTY_SSN);
        double rate = Math.random() * 10;
        LOG.info("The bank: " + bankName + " for client: " + ssn + " 's rate " + rate);
        exchange.getOut().setHeader(Constants.PROPERTY_RATE, new Double(rate));
        exchange.getOut().setHeader(Constants.PROPERTY_BANK, bankName);
        exchange.getOut().setHeader(Constants.PROPERTY_SSN, ssn);
        // Sleep some time
        try {
            Thread.sleep((int) (Math.random() * 10) * 100);
        } catch (InterruptedException e) {
            // Discard
        }
    }

}

Translator

public class Translator implements Processor {

    public void process(Exchange exchange) throws Exception {
        String bank = (String)exchange.getIn().getHeader(Constants.PROPERTY_BANK);
        Double rate = (Double)exchange.getIn().getHeader(Constants.PROPERTY_RATE);
        String ssn = (String)exchange.getIn().getHeader(Constants.PROPERTY_SSN);
        exchange.getOut().setBody("Loan quotation for Client " + ssn
                                  + ". The bank with the lowest rate is " + bank + ", the rate is " + rate);
    }

}

You may have noticed that we set a custom aggregation strategy to find the lowest loan rate among the bank response messages.

public class BankResponseAggregationStrategy implements AggregationStrategy {

    // Here we put the bank response together
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Integer old = (Integer) oldExchange.getProperty("aggregated");
        Double oldRate = (Double) oldExchange.getIn().getHeader(Constants.PROPERTY_RATE);
        Double newRate = (Double) newExchange.getIn().getHeader(Constants.PROPERTY_RATE);
        Exchange result = null;
        if (old == null) {
            old = 1;
        }
        if (newRate >= oldRate) {
            result = oldExchange;
        } else {
            result = newExchange;
        }
        // Set the property for the completeness condition
        result.setProperty("aggregated", old + 1);
        return result;

    }

}
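The interplay between the correlation key, the aggregation strategy, and the completion condition can be sketched in plain Java. This is a hedged illustration, not how Camel implements its aggregator: the class LowestRateAggregatorSketch and its Quote type are hypothetical, and the per-client state is kept in two maps here, whereas the strategy above keeps the count as a property on the exchange.

```java
import java.util.HashMap;
import java.util.Map;

class LowestRateAggregatorSketch {

    // One bank reply; a hypothetical stand-in for the headers on an exchange.
    static final class Quote {
        final String ssn;
        final String bank;
        final double rate;

        Quote(String ssn, String bank, double rate) {
            this.ssn = ssn;
            this.bank = bank;
            this.rate = rate;
        }
    }

    private final int expectedReplies;
    private final Map<String, Quote> bestSoFar = new HashMap<>();
    private final Map<String, Integer> received = new HashMap<>();

    LowestRateAggregatorSketch(int expectedReplies) {
        this.expectedReplies = expectedReplies;
    }

    // Adds one reply. Returns the winning quote once all replies for that
    // client have arrived, or null while the group is still incomplete.
    Quote add(Quote reply) {
        Quote best = bestSoFar.get(reply.ssn);
        // Same rule as the strategy above: on a tie the earlier reply wins.
        if (best == null || reply.rate < best.rate) {
            best = reply;
            bestSoFar.put(reply.ssn, best);
        }
        int count = received.merge(reply.ssn, 1, Integer::sum);
        if (count < expectedReplies) {
            return null;
        }
        // Group complete: forget its state and release the result.
        bestSoFar.remove(reply.ssn);
        received.remove(reply.ssn);
        return best;
    }

    public static void main(String[] args) {
        LowestRateAggregatorSketch aggregator = new LowestRateAggregatorSketch(3);
        // Replies from two clients arrive interleaved.
        aggregator.add(new Quote("Client0", "bank1", 6.0));
        aggregator.add(new Quote("Client1", "bank1", 2.5));
        aggregator.add(new Quote("Client0", "bank2", 4.0));
        aggregator.add(new Quote("Client1", "bank3", 9.0));
        Quote done = aggregator.add(new Quote("Client0", "bank3", 8.0));
        // prints: Client0 -> bank2 at 4.0
        System.out.println(done.ssn + " -> " + done.bank + " at " + done.rate);
    }
}
```

Grouping by SSN is what keeps replies for different clients from being mixed up when several requests are in flight at once.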

We start the loan broker after starting the ActiveMQ broker and setting up the connection factory for the Camel-JMS component.

public class LoanBroker extends RouteBuilder {

    public static void main(String... args) throws Exception {

        CamelContext context = new DefaultCamelContext();
        JmsBroker broker = new JmsBroker();
        broker.start();
        // Set up the ActiveMQ JMS Components
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

        // Note we can explicitly name the component
        context.addComponent("test-jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

        context.addRoutes(new LoanBroker());
        // Start the loan broker
        context.start();

        Thread.sleep(5 * 60 * 1000);
        context.stop();
        Thread.sleep(1000);
        broker.stop();

    }

    /**
     * Lets configure the Camel routing rules using Java code...
     */
    public void configure() {
        // Put the message from loanRequestQueue to the creditRequestQueue
        from("test-jms:queue:loanRequestQueue").to("test-jms:queue:creditRequestQueue");

        // Now we can let the CreditAgency process the request, then the message will be put into creditResponseQueue
        from("test-jms:queue:creditRequestQueue").process(new CreditAgency()).to("test-jms:queue:creditResponseQueue");

        // Here we use the multicast pattern to send the message to the three bank queues
        from("test-jms:queue:creditResponseQueue").multicast().to("test-jms:queue:bank1", "test-jms:queue:bank2", "test-jms:queue:bank3");

        // Each bank process will process the message and put the response message into the bankReplyQueue
        from("test-jms:queue:bank1").process(new Bank("bank1")).to("test-jms:queue:bankReplyQueue");
        from("test-jms:queue:bank2").process(new Bank("bank2")).to("test-jms:queue:bankReplyQueue");
        from("test-jms:queue:bank3").process(new Bank("bank3")).to("test-jms:queue:bankReplyQueue");

        // Now we aggregate the response messages, correlated by the Constants.PROPERTY_SSN header
        // The aggregation completes once the three bank responses have been received
        from("test-jms:queue:bankReplyQueue")
            .aggregator(header(Constants.PROPERTY_SSN), new BankResponseAggregationStrategy())
            .completedPredicate(header("aggregated").isEqualTo(3))

        // Here we do some translation and put the message back to loanReplyQueue
        .process(new Translator()).to("test-jms:queue:loanReplyQueue");

    }
}

Now we can send the requests from the client:

// send out the request message
for (int i = 0; i < 2; i++) {
    template.sendBodyAndHeader("test-jms:queue:loanRequestQueue",
                               "Quote for the lowest rate of loaning bank",
                               Constants.PROPERTY_SSN, "Client" + i);
}

Here is how we pull the response messages:

from("test-jms:queue:loanReplyQueue").process(new Processor() {

    public void process(Exchange exchange) throws Exception {
        // Print out the response message
        System.out.println(exchange.getIn().getBody());

    }

});
