Hi,

You need to turn the hashNumber into a String and put it after
"activemq" instead of using it directly.

I can reproduce the error by setting the component name with "123activemq", and the error is gone when I use "activemq123".

Willem

mdasari wrote:
Thanks for the reply. In summary it works now.
I actually tried this approach before and it didn't work then but now it
works due to minor change. It appears
org.apache.camel.impl.DefaultComponent.createEndpoint(...) doesn't like
numeric name, when I tried earlier I used numeric hash off the IP address.
It ran into following exception, and I assumed the URI schema has to follow
naming convention used in the http://camel.apache.org/uris.html


INFO   | jvm 1    | 2010/04/16 18:18:04 | java.net.URISyntaxException:
Illegal character in scheme name at index 0: 1466722955:topic:In.Servers
INFO   | jvm 1    | 2010/04/16 18:18:04 |       at
java.net.URI$Parser.fail(Unknown Source)
INFO   | jvm 1    | 2010/04/16 18:18:04 |       at
java.net.URI$Parser.checkChars(Unknown Source)
INFO   | jvm 1    | 2010/04/16 18:18:04 |       at
java.net.URI$Parser.checkChar(Unknown Source)
INFO   | jvm 1    | 2010/04/16 18:18:04 |       at
java.net.URI$Parser.parse(Unknown Source)
INFO   | jvm 1    | 2010/04/16 18:18:04 |       at java.net.URI.<init>(Unknown
Source)
INFO   | jvm 1    | 2010/04/16 18:18:04 |       at
org.apache.camel.impl.DefaultComponent.createEndpoint(DefaultComponent.java:63)
INFO   | jvm 1    | 2010/04/16 18:18:04 |       at
com.mycompany.infra.myrouter.MyRouterBean.getRoutes(MyRouterBean.java:73)
INFO   | jvm 1    | 2010/04/16 18:18:04 |       at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
INFO   | jvm 1    | 2010/04/16 18:18:04 |       at
sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
INFO   | jvm 1    | 2010/04/16 18:18:04 |       at
sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
INFO   | jvm 1    | 2010/04/16 18:18:04 |       at
java.lang.reflect.Method.invoke(Unknown Source)
INFO   | jvm 1    | 2010/04/16 18:18:04 |       at
org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:163)
INFO   | jvm 1    | 2010/04/16 18:18:04 |       at
org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:92)
INFO   | jvm 1    | 2010/04/16 18:18:04 |       at
org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:118)

Now by prefixing "activemq" to the hash it works.


Regarding your  ProducerCache and RecipientList comment, isn't that a bug?

thanks for the help.

cheers
- mdasari






willem.jiang wrote:
After digging the code, I found why your first "remote server" can receive the message.

There is a ProducerCache for the RecipientList and it only take the endpoint's uri as the key(in camel the uri is the key of endpoint), and all your remote endpoint are with the same uri.

For your case, you need to use CamelContext.addComponent to add the new ActiveMQComponent with differnent schema name like activemq + [hash of the IP address], then you create the endpoint with the name "activemq+[hash of the IP address]:topic:In.LocalServer".

Willem

mdasari wrote:
Thanks Willem for your comments. I posted a simpler version of my code.
My
actual code was adding a camel Component (ActiveMQComponent) by using
CamelContext.addComponent and I'm trying to retrieve the component by
calling getComponent. The key is hash of the IP address, basically like CamelContext::getOrCreateComponent. At the moment it seems to be creating
a
new component every time though. I can work around by my own cache or
some
other means.
But my real issue is usage of wrong IP address for delivery of the
messages,
it must be caching the first Endpoint there some where and using it again
and again.

I tried with version 1.6.2 there is no change in the behavior. I wonder
if
there is something wrong with my usage of Endpoint.
Are there any other ways of implementing remote broker RecipientList EIP?

cheers
- mdasari



willem.jiang wrote:
You created a new CamelContext and a ActiveMQComponent( for each new endpoint, that are expensive operation, maybe you can use a ActiveMQComponent cache with IPAddress as the index.

Can you try to use the latest released version of Camel 1.x (Camel 1.6.2) to see if the issue still there?

Willem

mdasari wrote:
Hi,

I needed to route a message from one ActiveMQ broker to one or more
remote
ActiveMQ brokers based on some runtime information.

Lets say I've broker "central" and remote brokers "remote1", "remote2",
...
"remoteN".
The number of remote brokers can change over time and I wouldn't want
to
configure any beans or any kind of configuration in the activemq.xml of
the
"central" broker.

The following what I tried, but it seems to be sending the message
always
to
the first "remote" server that I used. Couple of points:

0. I putup a POJO  bean with RecipientList EIP.
1. topic:Out.RemoteServers is the topic on the "central" broker
2. topic:In.LocalServer is the topic on any "remote" broker.
3. All applications that post messages to "central" broker add a
message
header with name "IPAddesses". The header value contain a list of IP
addresses corresponding to the "remote" brokers.
4. I'm using camel version 1.6.1.0

The code looks as follows (trimmed for clarity)

--------
        @Consume(uri = "topic:Out.RemoteServers")
        @RecipientList()
        public Collection<Endpoint> getRoutes(@Header(name = "IPAddresses")
String
ips, String body) {
                List<Endpoint> dests = new ArrayList<Endpoint>();
                try {
                        // do some validation on IP addresses header and barf.

                        // The header is delimited by commas.
                        //
                        StringTokenizer tokenizer = new StringTokenizer(ips, 
",");
                
                        CamelContext ctx = new DefaultCamelContext();
                        while (tokenizer.hasMoreElements()) {
                                String ip = tokenizer.nextToken();
                                ActiveMQComponent amq = new 
ActiveMQComponent(ctx);
                                
                                logger.info("Setting the broker URL to IP: " + 
ip);
                                
                                amq.setBrokerURL("tcp://" + ip + ":61616");

                                Endpoint ep = 
amq.createEndpoint("activemq:topic:In.LocalServer");
                                dests.add(ep);
                        }
                } catch (Exception e) {
                            // Let our monitoring take care of this..
                        logger.error("Unexpected exception while routing.", e);
                }
                return dests;
        }

----

Appreciate any help.

cheers
- mdasari






Reply via email to