Hello,
I tried to create as simple a component as possible that represents the
case I need to implement (I am using the provided ActiveMQ broker to post
messages that are consumed by my component).
When I launch a simple test that sends one message to the broker via
ProducerTemplate, the whole setup behaves strangely and the message does
not arrive where it should. Here is my code:

Spring XML:

<broker:broker id="cdiBroker" useJmx="false" persistent="false"
               brokerName="localhost">
        <broker:transportConnectors>
                <broker:transportConnector name="tcpCDI"
                                           uri="tcp://localhost:51612" />
        </broker:transportConnectors>
</broker:broker>

<bean id="cdi" class="org.apache.camel.component.jms.JmsComponent">
        <property name="connectionFactory">
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                        <property name="brokerURL"
                                  value="tcp://localhost:51612" />
                </bean>
        </property>
</bean>


<bean id="my" class="com.xxx.MyComponent">
        <property name="host" value="XXX"/>
        <property name="port" value="XXX"/>
        <property name="instanceId" value="XXX"/>
        <property name="username" value="XXX"/>
        <property name="password" value="XXX"/>
</bean>          

<camel:camelContext id="camelContext">
        <camel:route>
                <camel:from uri="cdi:queue:input" />
                <camel:log message="Recieved message in CDI part: ${body}"
                           loggingLevel="WARN" />
                <camel:to uri="bean:my" />
        </camel:route>
</camel:camelContext>

And the component:

public class MyComponent extends DefaultComponent {
    private static final Logger LOG =
            LoggerFactory.getLogger(MyComponent.class);
    private String host;
    private String port;
    private String instanceId;
    private String password;
    private String username;

    @Override
    public Endpoint createEndpoint(String uri) throws Exception {
        LOG.info("Creating new My endopint...");
        return this.createEndpoint(uri, null, new HashMap<String, Object>());
    }

    @Override
    protected Endpoint createEndpoint(String uri, String remaining,
            Map<String, Object> parameters) throws Exception {
        LOG.info("Creating new My endopint...");

        host = getAndRemoveParameter(parameters, "host", String.class);
        port = getAndRemoveParameter(parameters, "port", String.class);
        instanceId = getAndRemoveParameter(parameters, "instanceId",
                String.class);
        password = getAndRemoveParameter(parameters, "password",
                String.class);
        username = getAndRemoveParameter(parameters, "username",
                String.class);

        ConnectionFactory cf = new AMIQueueConnectionFactory(host, port,
                instanceId);
        UserCredentialsConnectionFactoryAdapter ucfa =
                new UserCredentialsConnectionFactoryAdapter();
        ucfa.setTargetConnectionFactory(cf);
        ucfa.setUsername(username);
        ucfa.setPassword(password);

        MyJmsQueueEndpoint endpoint = new MyJmsQueueEndpoint(uri, this,
                ucfa, instanceId);

        return endpoint;
    }
}

My endpoint looks like this:

public class MyJmsQueueEndpoint extends DefaultEndpoint {
    private static final Logger LOG =
            LoggerFactory.getLogger(MyJmsQueueEndpoint.class);
    private QueueConnection connection;
    private ConnectionFactory cf;
    private String instanceId;

    public MyJmsQueueEndpoint(String uri, MyComponent component,
            ConnectionFactory cf, String instanceId) {
        super(uri, component);
        this.cf = cf;
        this.instanceId = instanceId;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    @Override
    public MyConsumer createConsumer(Processor processor) throws Exception {
        return new MyConsumer(this, processor);
    }

    @Override
    public MyProducer createProducer() throws Exception {
        return new MyProducer(this);
    }

    @Override
    protected void doStart() throws Exception {
        LOG.info("Starting endpoint...");
        Connection c = createConnection();
        LOG.info("Connection to My JMS verified");
        c.close();
    }

    @Override
    protected void doStop() throws Exception {
        LOG.info("Stopping endpoint...");
        if (connection != null) {
            connection.close();
        }
    }

    public QueueConnection createConnection() throws Exception {
        if (connection == null) {
            this.connection = ((UserCredentialsConnectionFactoryAdapter) cf)
                    .createQueueConnection();
        }
        return connection;
    }

    public String getInstanceId() {
        return instanceId;
    }
}


And my producer looks like this:

public class MyProducer extends DefaultProducer {

    private static final Logger LOG =
            LoggerFactory.getLogger(MyProducer.class);

    public MyProducer(MyJmsQueueEndpoint endpoint) {
        super(endpoint);
    }

    @Override
    public void process(Exchange exchange) throws Exception {
        LOG.info("process() method in MyProducer class invoked...");

        MyJmsQueueEndpoint endpoint = (MyJmsQueueEndpoint) getEndpoint();
        QueueConnection connection = endpoint.createConnection();

        AMIQueueSession session = (AMIQueueSession)
                connection.createQueueSession(true, 0);
        AMIQueue destination = new AMIQueue(endpoint.getInstanceId() + "/"
                + "QUEUE.IN");
        AMIQueueSender queueSender = (AMIQueueSender)
                session.createSender(destination);
        AMITextMessage message = (AMITextMessage)
                session.createTextMessage();
        message.setText(exchange.getIn().getBody(String.class));
        message.setJMSType("JMSTYPE");

        try {
            queueSender.send(message, DeliveryMode.PERSISTENT, 0, 0);
            session.commit();
            LOG.info("Message sent to queue");
        } catch (JMSException e) {
            LOG.error("Message sending fail: " + e.getMessage());
            e.printStackTrace();
        } finally {
            queueSender.close();
            session.close();
        }
    }
}


When run, the test finishes by itself without any WARN entries in the logs,
except for the one from the log step in the route, which correctly logs the
message:

[                          main] DefaultComponent               DEBUG
Creating endpoint uri=[bean://my], path=[my], parameters=[{}]
...

[thread #0 - JmsConsumer[input]] route1                         WARN 
Recieved message in CDI part: XXX
[thread #0 - JmsConsumer[input]] SendProcessor                  DEBUG >>>>
Endpoint[bean://my] Exchange[JmsMessage: ActiveMQTextMessage {commandId = 5,
responseRequired = true, messageId =
ID:CHEPWMWVB316E6-3561-1322213255367-2:2:1:1:1, originalDestination = null,
originalTransactionId = null, producerId =
ID:CHEPWMWVB316E6-3561-1322213255367-2:2:1:1, destination = queue://input,
transactionId = null, expiration = 0, timestamp = 1322213267445, arrival =
0, brokerInTime = 1322213267445, brokerOutTime = 1322213267523,
correlationId = null, replyTo = null, persistent = true, type = null,
priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content = null, marshalledProperties =
org.apache.activemq.util.ByteSequence@a34b91, dataStructure = null,
redeliveryCounter = 0, size = 0, properties =
{breadcrumbId=ID-CHEPWMWVB316E6-3562-1322213257352-0-2}, readOnlyProperties
= true, readOnlyBody = true, droppable = false, text = <soapenv:Envelope
xmlns:soapenv="http://schem...nv:Envelope>}]
[thread #0 - JmsConsumer[input]] MyComponent                INFO  Creating
new My endopint...
[thread #0 - JmsConsumer[input]] MyComponent                INFO  Creating
new My endopint...
[thread #0 - JmsConsumer[input]] BeanProcessor                  DEBUG
Setting bean invocation result on the IN message: Endpoint[XXX]
[                          main] SpringCamelContext             INFO  Apache
Camel 2.8.2 (CamelContext:camelContext) is shutting down
...

What is strange is that the createEndpoint() method is invoked on
DefaultComponent although I override it in my own component. Also, none of
the start(), stop(), createConsumer() or createProducer() methods is
invoked. DEBUG logging is quite verbose, but I have no idea what is wrong;
the test just stops without any trace of a problem. When I debug with a
breakpoint on any Exception, it stops on an EOFException in DataInputStream,
in the thread assigned to my test broker on port 51612, but I have no clue
what is happening around it.
One more point: I also tried an even simpler setup without my own component,
with just one processor at the end of the pipeline that manually connects
where needed, and that works. So there are no port issues with the local
broker etc. Sending the message via ProducerTemplate directly to the
component does not help either...
There must be something wrong with the component setup, but I don't see
where...
Please help if you see the issue(s) :)
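
One possibility worth checking, offered as an assumption rather than a
confirmed diagnosis: the BeanProcessor log line ("Setting bean invocation
result on the IN message: Endpoint[XXX]") suggests that the "bean:my" URI
treats "my" as a plain bean and calls a method on it (here createEndpoint()),
instead of resolving it as a Camel component. A minimal sketch of the same
route addressing the component by its registry id as the URI scheme follows.
The path "out" is a hypothetical placeholder, and since createEndpoint()
above reads host, port, instanceId, username and password from the URI
parameters, those would probably have to be supplied on the URI as well.

```xml
<camel:camelContext id="camelContext">
        <camel:route>
                <camel:from uri="cdi:queue:input" />
                <camel:log message="Recieved message in CDI part: ${body}"
                           loggingLevel="WARN" />
                <!-- "my" is the Spring id of the MyComponent bean, used here
                     as the URI scheme; "out" is a hypothetical placeholder -->
                <camel:to uri="my:out" />
        </camel:route>
</camel:camelContext>
```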


