Author: davsclaus Date: Tue Oct 16 13:47:09 2012 New Revision: 1398793 URL: http://svn.apache.org/viewvc?rev=1398793&view=rev Log: CAMEL-5707: NotifyBuilder is now thread safe.
Modified: camel/branches/camel-2.9.x/ (props changed) camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Merged /camel/trunk:r1398789 Merged /camel/branches/camel-2.10.x:r1398790 Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java?rev=1398793&r1=1398792&r2=1398793&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java Tue Oct 16 13:47:09 2012 @@ -22,6 +22,8 @@ import java.util.EventObject; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; @@ -77,7 +79,7 @@ public class NotifyBuilder { private int wereSentToIndex; // computed value whether all the predicates matched - private boolean matches; + private volatile boolean matches; /** * Creates a new builder. @@ -287,7 +289,7 @@ public class NotifyBuilder { public NotifyBuilder wereSentTo(final String endpointUri) { // insert in start of stack but after the previous wereSentTo stack.add(wereSentToIndex++, new EventPredicateSupport() { - private boolean sentTo; + private AtomicBoolean sentTo = new AtomicBoolean(); @Override public boolean isAbstract() { @@ -298,14 +300,14 @@ public class NotifyBuilder { @Override public boolean onExchangeCreated(Exchange exchange) { // reset when a new exchange is created - sentTo = false; + sentTo.set(false); return onExchange(exchange); } @Override public boolean onExchangeSent(Exchange exchange, Endpoint endpoint, long timeTaken) { if (EndpointHelper.matchEndpoint(context, endpoint.getEndpointUri(), endpointUri)) { - sentTo = true; + sentTo.set(true); } return onExchange(exchange); } @@ -313,7 +315,7 @@ public class NotifyBuilder { @Override public boolean onExchange(Exchange exchange) { // filter only when sentTo - return sentTo; + return sentTo.get(); } public boolean matches() { @@ -323,7 +325,7 @@ public class NotifyBuilder { @Override public void reset() { - sentTo = false; + sentTo.set(false); } @Override @@ -345,21 +347,21 @@ public class NotifyBuilder { */ public NotifyBuilder whenReceived(final int number) { stack.add(new EventPredicateSupport() { - private int current; + private AtomicInteger current = new AtomicInteger(); @Override public boolean onExchangeCreated(Exchange exchange) { - current++; + current.incrementAndGet(); return true; } public boolean matches() { - return current >= number; + return current.get() >= number; } @Override public void reset() { - current = 0; + current.set(0); } @Override @@ -384,27 +386,27 @@ public class NotifyBuilder { */ public NotifyBuilder whenDone(final int number) { stack.add(new EventPredicateSupport() { - private int current; + private final AtomicInteger current = new AtomicInteger(); @Override public boolean onExchangeCompleted(Exchange exchange) { - current++; + current.incrementAndGet(); return true; } @Override public boolean onExchangeFailed(Exchange exchange) { - current++; + current.incrementAndGet(); return true; } public boolean matches() { - return current >= number; + return current.get() >= number; } @Override public void reset() { - current = 0; + current.set(0); } @Override @@ -426,23 +428,23 @@ public class NotifyBuilder { */ public NotifyBuilder whenDoneByIndex(final int index) { stack.add(new EventPredicateSupport() { - private int current; + private AtomicInteger current = new AtomicInteger(); private String id; - private boolean done; + private AtomicBoolean done = new AtomicBoolean(); @Override public boolean onExchangeCreated(Exchange exchange) { - if (current == index) { + if (current.get() == index) { id = exchange.getExchangeId(); } - current++; + current.incrementAndGet(); return true; } @Override public boolean onExchangeCompleted(Exchange exchange) { if (exchange.getExchangeId().equals(id)) { - done = true; + done.set(false); } return true; } @@ -450,20 +452,20 @@ public class NotifyBuilder { @Override public boolean onExchangeFailed(Exchange exchange) { if (exchange.getExchangeId().equals(id)) { - done = true; + done.set(true); } return true; } public boolean matches() { - return done; + return done.get(); } @Override public void reset() { - current = 0; + current.set(0); id = null; - done = false; + done.set(false); } @Override @@ -488,21 +490,21 @@ public class NotifyBuilder { */ public NotifyBuilder whenCompleted(final int number) { stack.add(new EventPredicateSupport() { - private int current; + private AtomicInteger current = new AtomicInteger(); @Override public boolean onExchangeCompleted(Exchange exchange) { - current++; + current.incrementAndGet(); return true; } public boolean matches() { - return current >= number; + return current.get() >= number; } @Override public void reset() { - current = 0; + current.set(0); } @Override @@ -524,21 +526,21 @@ public class NotifyBuilder { */ public NotifyBuilder whenFailed(final int number) { stack.add(new EventPredicateSupport() { - private int current; + private AtomicInteger current = new AtomicInteger(); @Override public boolean onExchangeFailed(Exchange exchange) { - current++; + current.incrementAndGet(); return true; } public boolean matches() { - return current >= number; + return current.get() >= number; } @Override public void reset() { - current = 0; + current.set(0); } @Override @@ -559,27 +561,27 @@ public class NotifyBuilder { */ public NotifyBuilder whenExactlyDone(final int number) { stack.add(new EventPredicateSupport() { - private int current; + private AtomicInteger current = new AtomicInteger(); @Override public boolean onExchangeCompleted(Exchange exchange) { - current++; + current.incrementAndGet(); return true; } @Override public boolean onExchangeFailed(Exchange exchange) { - current++; + current.incrementAndGet(); return true; } public boolean matches() { - return current == number; + return current.get() == number; } @Override public void reset() { - current = 0; + current.set(0); } @Override @@ -601,21 +603,21 @@ public class NotifyBuilder { */ public NotifyBuilder whenExactlyCompleted(final int number) { stack.add(new EventPredicateSupport() { - private int current; + private AtomicInteger current = new AtomicInteger(); @Override public boolean onExchangeCompleted(Exchange exchange) { - current++; + current.incrementAndGet(); return true; } public boolean matches() { - return current == number; + return current.get() == number; } @Override public void reset() { - current = 0; + current.set(0); } @Override @@ -634,21 +636,21 @@ public class NotifyBuilder { */ public NotifyBuilder whenExactlyFailed(final int number) { stack.add(new EventPredicateSupport() { - private int current; + private AtomicInteger current = new AtomicInteger(); @Override public boolean onExchangeFailed(Exchange exchange) { - current++; + current.incrementAndGet(); return true; } public boolean matches() { - return current == number; + return current.get() == number; } @Override public void reset() { - current = 0; + current.set(0); } @Override @@ -681,39 +683,39 @@ public class NotifyBuilder { private NotifyBuilder doWhenAnyMatches(final Predicate predicate, final boolean received) { stack.add(new EventPredicateSupport() { - private boolean matches; + private final AtomicBoolean matches = new AtomicBoolean(); @Override public boolean onExchangeCompleted(Exchange exchange) { - if (!received && !matches) { - matches = predicate.matches(exchange); + if (!received && !matches.get()) { + matches.set(predicate.matches(exchange)); } return true; } @Override public boolean onExchangeFailed(Exchange exchange) { - if (!received && !matches) { - matches = predicate.matches(exchange); + if (!received && !matches.get()) { + matches.set(predicate.matches(exchange)); } return true; } @Override public boolean onExchangeCreated(Exchange exchange) { - if (received && !matches) { - matches = predicate.matches(exchange); + if (received && !matches.get()) { + matches.set(predicate.matches(exchange)); } return true; } public boolean matches() { - return matches; + return matches.get(); } @Override public void reset() { - matches = false; + matches.set(false); } @Override @@ -750,39 +752,39 @@ public class NotifyBuilder { private NotifyBuilder doWhenAllMatches(final Predicate predicate, final boolean received) { stack.add(new EventPredicateSupport() { - private boolean matches = true; + private final AtomicBoolean matches = new AtomicBoolean(true); @Override public boolean onExchangeCompleted(Exchange exchange) { - if (!received && matches) { - matches = predicate.matches(exchange); + if (!received && matches.get()) { + matches.set(predicate.matches(exchange)); } return true; } @Override public boolean onExchangeFailed(Exchange exchange) { - if (!received && matches) { - matches = predicate.matches(exchange); + if (!received && matches.get()) { + matches.set(predicate.matches(exchange)); } return true; } @Override public boolean onExchangeCreated(Exchange exchange) { - if (received && matches) { - matches = predicate.matches(exchange); + if (received && matches.get()) { + matches.set(predicate.matches(exchange)); } return true; } public boolean matches() { - return matches; + return matches.get(); } @Override public void reset() { - matches = true; + matches.set(true); } @Override @@ -932,7 +934,6 @@ public class NotifyBuilder { private NotifyBuilder doWhenNotSatisfied(final MockEndpoint mock, final boolean received) { stack.add(new EventPredicateSupport() { - private Producer producer; @Override @@ -1060,8 +1061,8 @@ public class NotifyBuilder { private NotifyBuilder doWhenBodies(final List bodies, final boolean received, final boolean exact) { stack.add(new EventPredicateSupport() { - private boolean matches; - private int current; + private volatile boolean matches; + private final AtomicInteger current = new AtomicInteger(); @Override public boolean onExchangeCreated(Exchange exchange) { @@ -1088,30 +1089,28 @@ public class NotifyBuilder { } private void matchBody(Exchange exchange) { - current++; - - if (current > bodies.size()) { + if (current.incrementAndGet() > bodies.size()) { // out of bounds return; } Object actual = exchange.getIn().getBody(); - Object expected = bodies.get(current - 1); + Object expected = bodies.get(current.get() - 1); matches = ObjectHelper.equal(expected, actual); } public boolean matches() { if (exact) { - return matches && current == bodies.size(); + return matches && current.get() == bodies.size(); } else { - return matches && current >= bodies.size(); + return matches && current.get() >= bodies.size(); } } @Override public void reset() { matches = false; - current = 0; + current.set(0); } @Override