Hi,
thanks for answering. I already went throught that pages. I will make my
question more specific.

Here is the code of my simple consumer:
"
public class CoherenceQueueConsumer extends DefaultConsumer implements
Runnable {
        
        private final CoherenceQueueEndpoint queueEndpoint;
        private Subscriber subscription;
        
        public CoherenceQueueConsumer(CoherenceQueueEndpoint endpoint, Processor
processor) {
                super(endpoint, processor);
                this.queueEndpoint = endpoint;
                log.info("CoherenceQueueConsumer succesfully created: " + this);
        }

        
        @Override
        protected void doStart() throws Exception {
                super.doStart();
                MessagingSession session = 
this.queueEndpoint.getMessagingSession();
                this.subscription =
session.subscribe(this.queueEndpoint.getQueueIdentifier());
                this.subscription.setAutoCommit(false);
                new Thread(this).start();
                log.info("Consumer ["+this+"] started");
        }
        
        
        @Override
        protected void doStop() throws Exception {
                this.subscription.unsubscribe();
                log.info("Consumer ["+this+"] unsubscribed.");
                super.doStop();
        }
        
        public void run() {
                this.consumeMessages();
        }


        private void consumeMessages() {
                
                log.info("Consumer ["+this+"] is consuming messages...");
                
                while(isRunAllowed()) {
                        
                        Object rawMessage = subscription.getMessage();
                        log.info("Received Message: [" +rawMessage+"]");
                        
                        Exchange ex = getEndpoint().createExchange();
                        ex.getIn().setBody(rawMessage);
                        
                        try {
                                getProcessor().process(ex);
                                this.subscription.commit();
                        } catch (Exception e) {
                                log.error("Exeception occurred consuming 
message", e);
                                this.subscription.rollback();
                        }
                }
                
        }
        
        
        @Override
        public String toString() {
                return "CoherenceQueueConsumer [queueEndpoint=" + queueEndpoint 
+ "]";
        }
        
        
}
"

As you can see, to start effectively receiving messages from the Oracle
Coherence Queue, I start a new thread that calls the blocking method
"subscription.getMessage()".

My question is: is this approach correct ? Am I allowed to "rawly" create a
new thread for each consumer or should I use the
org.apache.camel.util.concurrent.ExecutorServiceHelper ?

Thanks a lot.

p.s. I am a volounter too, so I also don't have so much time ;)

--
View this message in context: 
http://camel.465427.n5.nabble.com/Developing-Oracle-Coherence-3-7-Camel-Component-tp4618649p4634601.html
Sent from the Camel Development mailing list archive at Nabble.com.

Reply via email to