Hello, I tried to create the simplest possible component that represents the case I need to implement (I am using the provided ActiveMQ broker to post messages that are consumed by my component). When I launch a simple test that sends one message to the broker via ProducerTemplate, the whole setup behaves strangely and the message does not arrive where it should. Here is my code:
Spring XML:

<broker:broker id="cdiBroker" useJmx="false" persistent="false" brokerName="localhost">
  <broker:transportConnectors>
    <broker:transportConnector name="tcpCDI" uri="tcp://localhost:51612" />
  </broker:transportConnectors>
</broker:broker>

<bean id="cdi" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory">
    <bean class="org.apache.activemq.ActiveMQConnectionFactory">
      <property name="brokerURL" value="tcp://localhost:51612" />
    </bean>
  </property>
</bean>

<bean id="my" class="com.xxx.MyComponent">
  <property name="host" value="XXX"/>
  <property name="port" value="XXX"/>
  <property name="instanceId" value="XXX"/>
  <property name="username" value="XXX"/>
  <property name="password" value="XXX"/>
</bean>

<camel:camelContext id="camelContext">
  <camel:route>
    <camel:from uri="cdi:queue:input" />
    <camel:log message="Recieved message in CDI part: ${body}" loggingLevel="WARN" />
    <camel:to uri="bean:my" />
  </camel:route>
</camel:camelContext>

And for the component:

public class MyComponent extends DefaultComponent {

    private static final Logger LOG = LoggerFactory.getLogger(MyComponent.class);

    private String host;
    private String port;
    private String instanceId;
    private String password;
    private String username;

    @Override
    public Endpoint createEndpoint(String uri) throws Exception {
        LOG.info("Creating new My endopint...");
        return this.createEndpoint(uri, null, new HashMap<String, Object>());
    }

    @Override
    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
        LOG.info("Creating new My endopint...");
        host = getAndRemoveParameter(parameters, "host", String.class);
        port = getAndRemoveParameter(parameters, "port", String.class);
        instanceId = getAndRemoveParameter(parameters, "instanceId", String.class);
        password = getAndRemoveParameter(parameters, "password", String.class);
        username = getAndRemoveParameter(parameters, "username", String.class);

        ConnectionFactory cf = new AMIQueueConnectionFactory(host, port, instanceId);
        UserCredentialsConnectionFactoryAdapter ucfa = new UserCredentialsConnectionFactoryAdapter();
        ucfa.setTargetConnectionFactory(cf);
        ucfa.setUsername(username);
        ucfa.setPassword(password);

        MyJmsQueueEndpoint endpoint = new MyJmsQueueEndpoint(uri, this, ucfa, instanceId);
        return endpoint;
    }
}

My endpoint looks like this:

public class MyJmsQueueEndpoint extends DefaultEndpoint {

    private static final Logger LOG = LoggerFactory.getLogger(MyJmsQueueEndpoint.class);

    private QueueConnection connection;
    private ConnectionFactory cf;
    private String instanceId;

    public MyJmsQueueEndpoint(String uri, MyComponent component, ConnectionFactory cf, String instanceId) {
        super(uri, component);
        this.cf = cf;
        this.instanceId = instanceId;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    @Override
    public MyConsumer createConsumer(Processor processor) throws Exception {
        return new MyConsumer(this, processor);
    }

    @Override
    public MyProducer createProducer() throws Exception {
        return new MyProducer(this);
    }

    @Override
    protected void doStart() throws Exception {
        LOG.info("Starting endpoint...");
        Connection c = createConnection();
        LOG.info("Connection to My JMS verified");
        c.close();
    }

    @Override
    protected void doStop() throws Exception {
        LOG.info("Stopping endpoint...");
        if (connection != null) {
            connection.close();
        }
    }

    public QueueConnection createConnection() throws Exception {
        if (connection == null) {
            this.connection = ((UserCredentialsConnectionFactoryAdapter) cf).createQueueConnection();
        }
        return connection;
    }

    public String getInstanceId() {
        return instanceId;
    }
}

And my producer looks like this:

public class MyProducer extends DefaultProducer {

    private static final Logger LOG = LoggerFactory.getLogger(MyProducer.class);

    public MyProducer(MyJmsQueueEndpoint endpoint) {
        super(endpoint);
    }

    @Override
    public void process(Exchange exchange) throws Exception {
        LOG.info("process() method in MyProducer class invoked...");
        QueueConnection connection = ((MyJmsQueueEndpoint) getEndpoint()).createConnection();
        AMIQueueSession session = (AMIQueueSession) connection.createQueueSession(true, 0);
        AMIQueue destination = new AMIQueue(((MyJmsQueueEndpoint) getEndpoint()).getInstanceId() + "/" + "QUEUE.IN");
        AMIQueueSender queueSender = (AMIQueueSender) session.createSender(destination);
        AMITextMessage message = (AMITextMessage) session.createTextMessage();
        message.setText(exchange.getIn().getBody(String.class));
        message.setJMSType("JMSTYPE");
        try {
            queueSender.send(message, DeliveryMode.PERSISTENT, 0, 0);
            session.commit();
            LOG.info("Message sent to queue");
        } catch (JMSException e) {
            LOG.error("Message sending fail: " + e.getMessage());
            e.printStackTrace();
        } finally {
            queueSender.close();
            session.close();
        }
    }
}

When run, the test finishes by itself without any WARN in the logs, except for the log step in the route, which correctly logs the message:

[ main] DefaultComponent DEBUG Creating endpoint uri=[bean://my], path=[my], parameters=[{}]
...
[thread #0 - JmsConsumer[input]] route1 WARN Recieved message in CDI part: XXX
[thread #0 - JmsConsumer[input]] SendProcessor DEBUG >>>> Endpoint[bean://my] Exchange[JmsMessage: ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:CHEPWMWVB316E6-3561-1322213255367-2:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:CHEPWMWVB316E6-3561-1322213255367-2:2:1:1, destination = queue://input, transactionId = null, expiration = 0, timestamp = 1322213267445, arrival = 0, brokerInTime = 1322213267445, brokerOutTime = 1322213267523, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@a34b91, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {breadcrumbId=ID-CHEPWMWVB316E6-3562-1322213257352-0-2}, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = <soapenv:Envelope xmlns:soapenv="http://schem...nv:Envelope>}]
[thread #0 - JmsConsumer[input]] MyComponent INFO Creating new My endopint...
[thread #0 - JmsConsumer[input]] MyComponent INFO Creating new My endopint...
[thread #0 - JmsConsumer[input]] BeanProcessor DEBUG Setting bean invocation result on the IN message: Endpoint[XXX]
[ main] SpringCamelContext INFO Apache Camel 2.8.2 (CamelContext:camelContext) is shutting down
...

What is strange is that the createEndpoint() method is invoked on DefaultComponent although I override it in my own component. Also, the start(), stop(), createConsumer() and createProducer() methods are not invoked. Logging at DEBUG is quite verbose, but I have no idea what is wrong; it just stops without any trace of a problem. When debugging with a breakpoint set on any exception, it stops on an EOFException in DataInputStream in the thread assigned to my test broker on port 51612, but I have no clue what is happening around it.

One more point: I also tried an even simpler setup without my own component, just one processor at the end of the pipeline that manually connects where needed, and this works. So there are no port issues with the local broker etc. Sending the message via ProducerTemplate directly to the component does not help either.

There must be something wrong with the component setup, but I don't see where. Please help if you see the issue(s) :)
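
One thing that may be relevant to the log above: Camel chooses the component from the URI scheme, so bean:my is handled by the Bean component, which invokes a method on the "my" bean instead of using it as a Component. That would be consistent with the BeanProcessor line setting Endpoint[XXX] as the invocation result. A minimal sketch of the route addressing the component by its own scheme follows; it assumes the Spring bean id "my" is resolved as the scheme, and the path "queue.in" is a hypothetical placeholder, not something from the original setup:

```xml
<camel:camelContext id="camelContext">
  <camel:route>
    <camel:from uri="cdi:queue:input" />
    <camel:log message="Recieved message in CDI part: ${body}" loggingLevel="WARN" />
    <!-- "my" is the scheme (the Spring bean id); "queue.in" is a hypothetical path -->
    <camel:to uri="my:queue.in" />
  </camel:route>
</camel:camelContext>
```

With a URI like this, createEndpoint(uri, remaining, parameters) only sees what the URI carries. As the component is written, getAndRemoveParameter would then likely overwrite the host, port and credential properties set on the bean with null, so those values would have to come from URI parameters or be read from the bean properties instead.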