Author: ningjiang
Date: Wed Jul 23 22:15:27 2008
New Revision: 679273

URL: http://svn.apache.org/viewvc?rev=679273&view=rev
Log:
CAMEL-715 Fixed the ConcurrentModificationException in ThreadProcessor

Added:
    
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java
   (with props)
Modified:
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?rev=679273&r1=679272&r2=679273&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
 Wed Jul 23 22:15:27 2008
@@ -18,6 +18,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -103,7 +104,7 @@
         if (properties == null) {
             return null;
         }
-        return new HashMap<String, Object>(properties);
+        return new ConcurrentHashMap<String, Object>(properties);
     }
 
     private static Message safeCopy(Exchange exchange, Message message) {
@@ -155,8 +156,14 @@
             Class type = value.getClass();
             validateExchangePropertyIsExpectedType(property, type, value);
         }
-
-        getProperties().put(name, value);
+        if (value != null) {
+            // ConcurrentHashMap rejects null values, so avoid the NullPointerException
+            getProperties().put(name, value);
+        } else { // if the value is null, we just remove the key from the map
+            if (name !=  null) {
+                getProperties().remove(name);
+            }
+        }
     }
 
     private <T> void 
validateExchangePropertyIsExpectedType(ExchangeProperty<?> property, Class<T> 
type, Object value) {
@@ -174,7 +181,7 @@
 
     public Map<String, Object> getProperties() {
         if (properties == null) {
-            properties = new HashMap<String, Object>();
+            properties = new ConcurrentHashMap<String, Object>();
         }
         return properties;
     }

Added: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java?rev=679273&view=auto
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java
 (added)
+++ 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java
 Wed Jul 23 22:15:27 2008
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.TestCase;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+
+public class Camel715ThreadProcessorTest extends TestCase {
+    private static final int ITERS = 50000;
+
+    class SendingProcessor implements Processor {
+        int iterationNumber;
+        public SendingProcessor(int iter) {
+            iterationNumber = iter;
+        }
+
+        public void process(Exchange exchange) throws Exception {
+            Message in = exchange.getIn();
+            in.setBody("a");
+            // may set the property here
+            exchange.setProperty("iterationNumber", iterationNumber);
+        }
+
+    };
+
+    public void testThreadProcessor() {
+        try {
+            CamelContext context = new DefaultCamelContext();
+
+            final CountDownLatch latch = new CountDownLatch(ITERS);
+
+            context.addRoutes(new RouteBuilder() {
+
+                @Override
+                public void configure() throws Exception {
+                    from("direct:a").thread(4).process(new Processor() {
+
+                        public void process(Exchange ex) throws Exception {
+                            latch.countDown();
+                        }
+                    });
+                }
+
+            });
+
+            final ProducerTemplate<Exchange> template = 
context.createProducerTemplate();
+
+            final Endpoint e = context.getEndpoint("direct:a");
+            context.start();
+            Long startTime = System.nanoTime();
+
+            for (int i = 0; i < ITERS; i++) {
+                template.send(e, new SendingProcessor(i), new AsyncCallback() {
+                    public void done(boolean arg0) {
+                        // Do nothing here
+                    }
+                });
+            }
+
+            latch.await();
+
+            context.stop();
+        } catch (Exception ex) {
+            fail("Get the exception " + ex + "here");
+            System.exit(-1);
+        }
+    }
+
+}

Propchange: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to