On Wed, Jun 6, 2012 at 4:12 PM, jhart <james.h...@nokia.com> wrote:

> *When I run this route heavily multithreaded camel route I am getting a
> NullPointerException thrown in the CaseInsensitiveMap when it is copying
> the
> exchanges for the multicast.  I am using version 2.9.2, though
> 2.10-SNAPSHOT
> has the same issue.  I am doing this to reduce the latency and increase
> throughput in this system.
>
> Here is the fragment of the DSL which builds the route.
> *
> .....
> processDefinition.split(genericTmcMappingProcessor)
>    .parallelProcessing().timeout(timeout).threads(10, 20)
>    .process(preDestinationProcessor).multicast()
>       .parallelProcessing().timeout(timeout).threads(10,20)
>    .to(destinationURIs);
>
>
This is really wrong with multicast parallel + threads. Then you have 2x
thread pools.
And then yet another multicast inside the existing multicast.

If you want to configure the thread pools of the multicast, then read the
docs
http://camel.apache.org/multicast

Use the executorServiceRef to refer to a custom thread pool / profile where
you can configure the thread pool sizes.
See details at: http://camel.apache.org/threading-model.html


*The CaseInsensitiveMap assembleKey method throws a NullPointerException on
> the key.toString() as key is passed in null.*
>
>    private static String assembleKey(Object key) {
> *        String s = key.toString().toLowerCase();
> *        if (s.startsWith("camel")) {
>            // use intern String for headers which is Camel* headers
>            // this reduces memory allocations needed for those common
> headers
>            s = s.intern();
>        }
>        return s;
>    }
>
>
> *The details of the issue is here in the Map.Entry anonymous class, where
> the originalKeys.get(s) returns a null even though the originalKeys HashMap
> actually has the data in it.  I know this because I can inspect it in the
> debugger and it will return the proper value to me in the debugger.
> *
>                Map.Entry<String, Object> view = new Map.Entry<String,
> Object>() {
>                    public String getKey() {
>                        String s = entry.getKey();
>                        // use the original key so we can preserve it
> *                        return originalKeys.get(s);
> *                    }
>
> *I was able to make it more reliable by this ridiculous hack below, which
> retries the originalKeys.get(s) up to 100 times, though this is not an
> acceptable solution but an excercise to try to understand if this was a
> multithreading issue. *
>
>                    public synchronized String getKey()
>                    {
>                        String s = entry.getKey();
>                        String key = originalKeys.get(s);
>
>                        int count = 100;
>                        while(count > 0 && key == null)
>                        {
>                           try
>                           {
>                              Thread.sleep(25);
>                              key  = originalKeys.get(s);
>                           }
>                           catch(InterruptedException e)
>                           {
>                              e.printStackTrace();
>                           }
>                           finally
>                           {
>                              count--;
>                           }
>                        }
>
>                        if(key == null)
>                        {
>                           System.out.println("get failed");
>                        }
>
>                        return key;
>                    }
>
> *Here is the call stack when I set the breakpoint for
> NullPointerExceptions.*
>
> Daemon Thread [Camel (camel-1) thread #18 - Threads] (Suspended (exception
> NullPointerException))
>        CaseInsensitiveMap.assembleKey(Object) line: 142
>        CaseInsensitiveMap.putAll(Map<String,?>) line: 97
>        GenericFileMessage<T>(MessageSupport).copyFrom(Message) line: 141
>        GenericFileMessage<T>(DefaultMessage).copyFrom(Message) line: 52
>        ExchangeHelper.copyResults(Exchange, Exchange) line: 249
>        Splitter(MulticastProcessor).doDone(Exchange, Exchange,
> AsyncCallback,
> boolean, boolean) line: 749
>        Splitter(MulticastProcessor).process(Exchange, AsyncCallback) line:
> 238
>        Splitter.process(Exchange, AsyncCallback) line: 97
>        AsyncProcessorHelper.process(AsyncProcessor, Exchange,
> AsyncCallback) line:
> 73
>
>  InstrumentationProcessor(DelegateAsyncProcessor).processNext(Exchange,
> AsyncCallback) line:
> ..........
>
> *I believe this to be an issue with the CaseInsensitiveMap not being
> threadsafe.  Shouldn't a threadsafe version be implemented using
> ConcurrentHashmaps and syncronization?
>
> I have not been able to get a simple case together to show this issue, I
> will work on that in the next few days.
> *
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Possible-threading-issue-tp5714069.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cib...@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Reply via email to