Author: davsclaus
Date: Tue Oct 16 13:43:31 2012
New Revision: 1398789

URL: http://svn.apache.org/viewvc?rev=1398789&view=rev
Log:
CAMEL-5707: NotifyBuilder is now thread safe.

Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java?rev=1398789&r1=1398788&r2=1398789&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java
 Tue Oct 16 13:43:31 2012
@@ -22,6 +22,8 @@ import java.util.EventObject;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
@@ -77,7 +79,7 @@ public class NotifyBuilder {
     private int wereSentToIndex;
 
     // computed value whether all the predicates matched
-    private boolean matches;
+    private volatile boolean matches;
 
     /**
      * Creates a new builder.
@@ -287,7 +289,7 @@ public class NotifyBuilder {
     public NotifyBuilder wereSentTo(final String endpointUri) {
         // insert in start of stack but after the previous wereSentTo
         stack.add(wereSentToIndex++, new EventPredicateSupport() {
-            private boolean sentTo;
+            private AtomicBoolean sentTo = new AtomicBoolean();
 
             @Override
             public boolean isAbstract() {
@@ -298,14 +300,14 @@ public class NotifyBuilder {
             @Override
             public boolean onExchangeCreated(Exchange exchange) {
                 // reset when a new exchange is created
-                sentTo = false;
+                sentTo.set(false);
                 return onExchange(exchange);
             }
 
             @Override
             public boolean onExchangeSent(Exchange exchange, Endpoint 
endpoint, long timeTaken) {
                 if (EndpointHelper.matchEndpoint(context, 
endpoint.getEndpointUri(), endpointUri)) {
-                    sentTo = true;
+                    sentTo.set(true);
                 }
                 return onExchange(exchange);
             }
@@ -313,7 +315,7 @@ public class NotifyBuilder {
             @Override
             public boolean onExchange(Exchange exchange) {
                 // filter only when sentTo
-                return sentTo;
+                return sentTo.get();
             }
 
             public boolean matches() {
@@ -323,7 +325,7 @@ public class NotifyBuilder {
 
             @Override
             public void reset() {
-                sentTo = false;
+                sentTo.set(false);
             }
 
             @Override
@@ -345,21 +347,21 @@ public class NotifyBuilder {
      */
     public NotifyBuilder whenReceived(final int number) {
         stack.add(new EventPredicateSupport() {
-            private int current;
+            private AtomicInteger current = new AtomicInteger();
 
             @Override
             public boolean onExchangeCreated(Exchange exchange) {
-                current++;
+                current.incrementAndGet();
                 return true;
             }
 
             public boolean matches() {
-                return current >= number;
+                return current.get() >= number;
             }
 
             @Override
             public void reset() {
-                current = 0;
+                current.set(0);
             }
 
             @Override
@@ -384,27 +386,27 @@ public class NotifyBuilder {
      */
     public NotifyBuilder whenDone(final int number) {
         stack.add(new EventPredicateSupport() {
-            private int current;
+            private final AtomicInteger current = new AtomicInteger();
 
             @Override
             public boolean onExchangeCompleted(Exchange exchange) {
-                current++;
+                current.incrementAndGet();
                 return true;
             }
 
             @Override
             public boolean onExchangeFailed(Exchange exchange) {
-                current++;
+                current.incrementAndGet();
                 return true;
             }
 
             public boolean matches() {
-                return current >= number;
+                return current.get() >= number;
             }
 
             @Override
             public void reset() {
-                current = 0;
+                current.set(0);
             }
 
             @Override
@@ -426,23 +428,23 @@ public class NotifyBuilder {
      */
     public NotifyBuilder whenDoneByIndex(final int index) {
         stack.add(new EventPredicateSupport() {
-            private int current;
+            private AtomicInteger current = new AtomicInteger();
             private String id;
-            private boolean done;
+            private AtomicBoolean done = new AtomicBoolean();
 
             @Override
             public boolean onExchangeCreated(Exchange exchange) {
-                if (current == index) {
+                if (current.get() == index) {
                     id = exchange.getExchangeId();
                 }
-                current++;
+                current.incrementAndGet();
                 return true;
             }
 
             @Override
             public boolean onExchangeCompleted(Exchange exchange) {
                 if (exchange.getExchangeId().equals(id)) {
-                    done = true;
+                    done.set(false);
                 }
                 return true;
             }
@@ -450,20 +452,20 @@ public class NotifyBuilder {
             @Override
             public boolean onExchangeFailed(Exchange exchange) {
                 if (exchange.getExchangeId().equals(id)) {
-                    done = true;
+                    done.set(true);
                 }
                 return true;
             }
 
             public boolean matches() {
-                return done;
+                return done.get();
             }
 
             @Override
             public void reset() {
-                current = 0;
+                current.set(0);
                 id = null;
-                done = false;
+                done.set(false);
             }
 
             @Override
@@ -488,21 +490,21 @@ public class NotifyBuilder {
      */
     public NotifyBuilder whenCompleted(final int number) {
         stack.add(new EventPredicateSupport() {
-            private int current;
+            private AtomicInteger current = new AtomicInteger();
 
             @Override
             public boolean onExchangeCompleted(Exchange exchange) {
-                current++;
+                current.incrementAndGet();
                 return true;
             }
 
             public boolean matches() {
-                return current >= number;
+                return current.get() >= number;
             }
 
             @Override
             public void reset() {
-                current = 0;
+                current.set(0);
             }
 
             @Override
@@ -524,21 +526,21 @@ public class NotifyBuilder {
      */
     public NotifyBuilder whenFailed(final int number) {
         stack.add(new EventPredicateSupport() {
-            private int current;
+            private AtomicInteger current = new AtomicInteger();
 
             @Override
             public boolean onExchangeFailed(Exchange exchange) {
-                current++;
+                current.incrementAndGet();
                 return true;
             }
 
             public boolean matches() {
-                return current >= number;
+                return current.get() >= number;
             }
 
             @Override
             public void reset() {
-                current = 0;
+                current.set(0);
             }
 
             @Override
@@ -559,27 +561,27 @@ public class NotifyBuilder {
      */
     public NotifyBuilder whenExactlyDone(final int number) {
         stack.add(new EventPredicateSupport() {
-            private int current;
+            private AtomicInteger current = new AtomicInteger();
 
             @Override
             public boolean onExchangeCompleted(Exchange exchange) {
-                current++;
+                current.incrementAndGet();
                 return true;
             }
 
             @Override
             public boolean onExchangeFailed(Exchange exchange) {
-                current++;
+                current.incrementAndGet();
                 return true;
             }
 
             public boolean matches() {
-                return current == number;
+                return current.get() == number;
             }
 
             @Override
             public void reset() {
-                current = 0;
+                current.set(0);
             }
 
             @Override
@@ -601,21 +603,21 @@ public class NotifyBuilder {
      */
     public NotifyBuilder whenExactlyCompleted(final int number) {
         stack.add(new EventPredicateSupport() {
-            private int current;
+            private AtomicInteger current = new AtomicInteger();
 
             @Override
             public boolean onExchangeCompleted(Exchange exchange) {
-                current++;
+                current.incrementAndGet();
                 return true;
             }
 
             public boolean matches() {
-                return current == number;
+                return current.get() == number;
             }
 
             @Override
             public void reset() {
-                current = 0;
+                current.set(0);
             }
 
             @Override
@@ -634,21 +636,21 @@ public class NotifyBuilder {
      */
     public NotifyBuilder whenExactlyFailed(final int number) {
         stack.add(new EventPredicateSupport() {
-            private int current;
+            private AtomicInteger current = new AtomicInteger();
 
             @Override
             public boolean onExchangeFailed(Exchange exchange) {
-                current++;
+                current.incrementAndGet();
                 return true;
             }
 
             public boolean matches() {
-                return current == number;
+                return current.get() == number;
             }
 
             @Override
             public void reset() {
-                current = 0;
+                current.set(0);
             }
 
             @Override
@@ -681,39 +683,39 @@ public class NotifyBuilder {
 
     private NotifyBuilder doWhenAnyMatches(final Predicate predicate, final 
boolean received) {
         stack.add(new EventPredicateSupport() {
-            private boolean matches;
+            private final AtomicBoolean matches = new AtomicBoolean();
 
             @Override
             public boolean onExchangeCompleted(Exchange exchange) {
-                if (!received && !matches) {
-                    matches = predicate.matches(exchange);
+                if (!received && !matches.get()) {
+                    matches.set(predicate.matches(exchange));
                 }
                 return true;
             }
 
             @Override
             public boolean onExchangeFailed(Exchange exchange) {
-                if (!received && !matches) {
-                    matches = predicate.matches(exchange);
+                if (!received && !matches.get()) {
+                    matches.set(predicate.matches(exchange));
                 }
                 return true;
             }
 
             @Override
             public boolean onExchangeCreated(Exchange exchange) {
-                if (received && !matches) {
-                    matches = predicate.matches(exchange);
+                if (received && !matches.get()) {
+                    matches.set(predicate.matches(exchange));
                 }
                 return true;
             }
 
             public boolean matches() {
-                return matches;
+                return matches.get();
             }
 
             @Override
             public void reset() {
-                matches = false;
+                matches.set(false);
             }
 
             @Override
@@ -750,39 +752,39 @@ public class NotifyBuilder {
 
     private NotifyBuilder doWhenAllMatches(final Predicate predicate, final 
boolean received) {
         stack.add(new EventPredicateSupport() {
-            private boolean matches = true;
+            private final AtomicBoolean matches = new AtomicBoolean(true);
 
             @Override
             public boolean onExchangeCompleted(Exchange exchange) {
-                if (!received && matches) {
-                    matches = predicate.matches(exchange);
+                if (!received && matches.get()) {
+                    matches.set(predicate.matches(exchange));
                 }
                 return true;
             }
 
             @Override
             public boolean onExchangeFailed(Exchange exchange) {
-                if (!received && matches) {
-                    matches = predicate.matches(exchange);
+                if (!received && matches.get()) {
+                    matches.set(predicate.matches(exchange));
                 }
                 return true;
             }
 
             @Override
             public boolean onExchangeCreated(Exchange exchange) {
-                if (received && matches) {
-                    matches = predicate.matches(exchange);
+                if (received && matches.get()) {
+                    matches.set(predicate.matches(exchange));
                 }
                 return true;
             }
 
             public boolean matches() {
-                return matches;
+                return matches.get();
             }
 
             @Override
             public void reset() {
-                matches = true;
+                matches.set(true);
             }
 
             @Override
@@ -932,7 +934,6 @@ public class NotifyBuilder {
 
     private NotifyBuilder doWhenNotSatisfied(final MockEndpoint mock, final 
boolean received) {
         stack.add(new EventPredicateSupport() {
-
             private Producer producer;
 
             @Override
@@ -1060,8 +1061,8 @@ public class NotifyBuilder {
 
     private NotifyBuilder doWhenBodies(final List<?> bodies, final boolean 
received, final boolean exact) {
         stack.add(new EventPredicateSupport() {
-            private boolean matches;
-            private int current;
+            private volatile boolean matches;
+            private final AtomicInteger current = new AtomicInteger();
 
             @Override
             public boolean onExchangeCreated(Exchange exchange) {
@@ -1088,30 +1089,28 @@ public class NotifyBuilder {
             }
 
             private void matchBody(Exchange exchange) {
-                current++;
-
-                if (current > bodies.size()) {
+                if (current.incrementAndGet() > bodies.size()) {
                     // out of bounds
                     return;
                 }
 
                 Object actual = exchange.getIn().getBody();
-                Object expected = bodies.get(current - 1);
+                Object expected = bodies.get(current.get() - 1);
                 matches = ObjectHelper.equal(expected, actual);
             }
 
             public boolean matches() {
                 if (exact) {
-                    return matches && current == bodies.size();
+                    return matches && current.get() == bodies.size();
                 } else {
-                    return matches && current >= bodies.size();
+                    return matches && current.get() >= bodies.size();
                 }
             }
 
             @Override
             public void reset() {
                 matches = false;
-                current = 0;
+                current.set(0);
             }
 
             @Override


Reply via email to