Author: ningjiang
Date: Thu Apr 26 04:49:51 2012
New Revision: 1330657

URL: http://svn.apache.org/viewvc?rev=1330657&view=rev
Log:
CAMEL-5170 Add support for delete event in camel-zookeeper

Added:
    
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/WatchedEventProvider.java
Modified:
    
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConfiguration.java
    
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
    
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java
    
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperMessage.java
    
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperUtils.java
    
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/AnyOfOperations.java
    
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/DataChangedOperation.java
    
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperation.java
    
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java
    
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java
    
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperationTest.java

Modified: 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConfiguration.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConfiguration.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
--- 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConfiguration.java
 (original)
+++ 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConfiguration.java
 Thu Apr 26 04:49:51 2012
@@ -42,6 +42,7 @@ public class ZooKeeperConfiguration impl
     private boolean listChildren;
     private boolean shouldCreate;
     private String createMode;
+    private boolean sendEmptyMessageOnDelete = true;
 
     public void addZookeeperServer(String server) {
         if (servers == null) {
@@ -158,7 +159,13 @@ public class ZooKeeperConfiguration impl
     public void setCreateMode(String createMode) {
         this.createMode = createMode;
     }
-    
-    
+
+    public boolean isSendEmptyMessageOnDelete() {
+        return sendEmptyMessageOnDelete;
+    }
+
+    public void setSendEmptyMessageOnDelete(boolean sendEmptyMessageOnDelete) {
+        this.sendEmptyMessageOnDelete = sendEmptyMessageOnDelete;
+    }
 
 }

Modified: 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
--- 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
 (original)
+++ 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
 Thu Apr 26 04:49:51 2012
@@ -19,8 +19,7 @@ package org.apache.camel.component.zooke
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import static java.lang.String.format;
-
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.zookeeper.operations.AnyOfOperations;
@@ -33,8 +32,10 @@ import org.apache.camel.component.zookee
 import org.apache.camel.component.zookeeper.operations.OperationResult;
 import org.apache.camel.component.zookeeper.operations.ZooKeeperOperation;
 import org.apache.camel.impl.DefaultConsumer;
+import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZooKeeper;
 
+
 /**
  * <code>ZooKeeperConsumer</code> uses various {@link ZooKeeperOperation} to
  * interact and consume data from a ZooKeeper cluster.
@@ -55,7 +56,7 @@ public class ZooKeeperConsumer extends D
     private ExecutorService executor;
 
     public ZooKeeperConsumer(ZooKeeperEndpoint endpoint, Processor processor) {
-        super(endpoint, processor);
+        super((Endpoint)endpoint, processor);
         this.connectionManager = endpoint.getConnectionManager();
         this.configuration = endpoint.getConfiguration();
     }
@@ -65,7 +66,7 @@ public class ZooKeeperConsumer extends D
         super.doStart();
         connection = connectionManager.getConnection();
         if (log.isDebugEnabled()) {
-            log.debug(format("Connected to Zookeeper cluster %s", 
configuration.getConnectString()));
+            log.debug(String.format("Connected to Zookeeper cluster %s", 
configuration.getConnectString()));
         }
 
         initializeConsumer();
@@ -80,7 +81,7 @@ public class ZooKeeperConsumer extends D
         shuttingDown = true;
         connection = connectionManager.getConnection();
         if (log.isTraceEnabled()) {
-            log.trace(format("Shutting down zookeeper consumer of '%s'", 
configuration.getPath()));
+            log.trace(String.format("Shutting down zookeeper consumer of 
'%s'", configuration.getPath()));
         }
         executor.shutdown();
         connectionManager.shutdown();
@@ -113,9 +114,9 @@ public class ZooKeeperConsumer extends D
         }
     }
 
-    private Exchange createExchange(String path, OperationResult result) {
+    private Exchange createExchange(String path, OperationResult result, 
WatchedEvent watchedEvent) {
         Exchange e = getEndpoint().createExchange();
-        ZooKeeperMessage in = new ZooKeeperMessage(path, 
result.getStatistics());
+        ZooKeeperMessage in = new ZooKeeperMessage(path, 
result.getStatistics(), watchedEvent);
         e.setIn(in);
         if (result.isOk()) {
             in.setBody(result.getResult());
@@ -129,13 +130,15 @@ public class ZooKeeperConsumer extends D
 
         private ZooKeeperOperation current;
 
+        private WatchedEvent watchedEvent;
+
         public void run() {
             while (isRunAllowed()) {
 
                 try {
                     current = operations.take();
                     if (log.isTraceEnabled()) {
-                        log.trace(format("Processing '%s' operation", 
current.getClass().getSimpleName()));
+                        log.trace(String.format("Processing '%s' operation", 
current.getClass().getSimpleName()));
                     }
                 } catch (InterruptedException e) {
                     continue;
@@ -143,8 +146,12 @@ public class ZooKeeperConsumer extends D
                 String node = current.getNode();
                 try {
                     OperationResult result = current.get();
+                    if (ZooKeeperUtils.hasWatchedEvent(current)) {
+                        watchedEvent = ZooKeeperUtils.getWatchedEvent(current);
+                    }
                     if (result != null && current.shouldProduceExchange()) {
-                        getProcessor().process(createExchange(node, result));
+                        getProcessor().process(createExchange(node, result, 
watchedEvent));
+                        watchedEvent = null;
                     }
                 } catch (Exception e) {
                     handleException(e);
@@ -177,7 +184,7 @@ public class ZooKeeperConsumer extends D
         operations.clear();
         operations.add(new AnyOfOperations(node, new 
ExistsOperation(connection, node), new ExistenceChangedOperation(connection, 
node)));
         operations.add(new GetDataOperation(connection, node));
-        operations.add(new DataChangedOperation(connection, node, false));
+        operations.add(new DataChangedOperation(connection, node, false, 
configuration.isSendEmptyMessageOnDelete()));
     }
 
     private void addBasicChildListingSequence(String node) {

Modified: 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
--- 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java
 Thu Apr 26 04:49:51 2012
@@ -181,5 +181,14 @@ public class ZooKeeperEndpoint extends D
         return this;
     }
 
+    @ManagedAttribute
+    public boolean isSendEmptyMessageOnDelete() {
+        return getConfiguration().isSendEmptyMessageOnDelete();
+    }
+
+    @ManagedAttribute
+    public void setSendEmptyMessageOnDelete(boolean sendEmptyMessageOnDelete) {
+        
getConfiguration().setSendEmptyMessageOnDelete(sendEmptyMessageOnDelete);
+    }
 
 }

Modified: 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperMessage.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperMessage.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
--- 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperMessage.java
 (original)
+++ 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperMessage.java
 Thu Apr 26 04:49:51 2012
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultMessage;
+import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.data.Stat;
 
 /**
@@ -43,14 +44,23 @@ public class ZooKeeperMessage extends De
 
     public static final String ZOOKEEPER_STATISTICS = 
"CamelZookeeperStatistics";
 
-    public ZooKeeperMessage(String node, Stat statistics) {
-        this(node, statistics, Collections.<String, Object>emptyMap());
+    public static final String ZOOKEEPER_EVENT_TYPE = 
"CamelZookeeperEventType";
+
+    public ZooKeeperMessage(String node, Stat statistics, WatchedEvent 
watchedEvent) {
+        this(node, statistics, Collections.<String, Object>emptyMap(), 
watchedEvent);
     }
 
     public ZooKeeperMessage(String node, Stat statistics, Map<String, Object> 
headers) {
+        this(node, statistics, headers, null);
+    }
+
+    public ZooKeeperMessage(String node, Stat statistics, Map<String, Object> 
headers, WatchedEvent watchedEvent) {
         setHeaders(headers);
         this.setHeader(ZOOKEEPER_NODE, node);
         this.setHeader(ZOOKEEPER_STATISTICS, statistics);
+        if (watchedEvent != null) {
+            this.setHeader(ZOOKEEPER_EVENT_TYPE, watchedEvent.getType());
+        }
     }
 
     public static Stat getStatistics(Message message) {

Modified: 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperUtils.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperUtils.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
--- 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperUtils.java
 (original)
+++ 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperUtils.java
 Thu Apr 26 04:49:51 2012
@@ -20,8 +20,11 @@ import java.util.List;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.component.zookeeper.operations.WatchedEventProvider;
+import org.apache.camel.component.zookeeper.operations.ZooKeeperOperation;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.ACL;
 
@@ -102,4 +105,16 @@ public final class ZooKeeperUtils {
         }
         return value;
     }
+
+    public static WatchedEvent getWatchedEvent(ZooKeeperOperation 
zooKeeperOperation) {
+        WatchedEvent watchedEvent = null;
+        if (zooKeeperOperation instanceof WatchedEventProvider) {
+            watchedEvent = 
((WatchedEventProvider)zooKeeperOperation).getWatchedEvent();
+        }
+        return watchedEvent;
+    }
+
+    public static boolean hasWatchedEvent(ZooKeeperOperation 
zooKeeperOperation) {
+        return getWatchedEvent(zooKeeperOperation) != null;
+    }
 }

Modified: 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/AnyOfOperations.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/AnyOfOperations.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
--- 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/AnyOfOperations.java
 (original)
+++ 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/AnyOfOperations.java
 Thu Apr 26 04:49:51 2012
@@ -20,6 +20,9 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.camel.component.zookeeper.ZooKeeperUtils;
+import org.apache.zookeeper.WatchedEvent;
+
 /**
  * <code>AnyOfOperations</code> is a composite operation of one or more sub
  * operation, executing each in turn until any one succeeds. If any execute
@@ -31,9 +34,10 @@ import java.util.concurrent.TimeoutExcep
  * if the test operation fails.
  */
 @SuppressWarnings("rawtypes")
-public class AnyOfOperations extends ZooKeeperOperation {
+public class AnyOfOperations extends ZooKeeperOperation implements 
WatchedEventProvider {
 
     private ZooKeeperOperation[] keeperOperations;
+    private ZooKeeperOperation operationProvidingResult;
 
     public AnyOfOperations(String node, ZooKeeperOperation... 
keeperOperations) {
         super(null, node, false);
@@ -46,6 +50,7 @@ public class AnyOfOperations extends Zoo
             try {
                 OperationResult result = op.get();
                 if (result.isOk()) {
+                    operationProvidingResult = op;
                     return result;
                 }
             } catch (Exception e) {
@@ -72,4 +77,9 @@ public class AnyOfOperations extends Zoo
         }
         return new AnyOfOperations(node, copy);
     }
+
+    @Override
+    public WatchedEvent getWatchedEvent() {
+        return ZooKeeperUtils.getWatchedEvent(operationProvidingResult);
+    }
 }

Modified: 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/DataChangedOperation.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/DataChangedOperation.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
--- 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/DataChangedOperation.java
 (original)
+++ 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/DataChangedOperation.java
 Thu Apr 26 04:49:51 2012
@@ -29,13 +29,19 @@ import org.apache.zookeeper.data.Stat;
 @SuppressWarnings("rawtypes")
 public class DataChangedOperation extends FutureEventDrivenOperation<byte[]> {
 
-    protected static final Class[] CONSTRUCTOR_ARGS = {ZooKeeper.class, 
String.class, boolean.class};
+    protected static final Class[] CONSTRUCTOR_ARGS = {ZooKeeper.class, 
String.class, boolean.class, boolean.class};
     
     private boolean getChangedData;
+    private boolean sendEmptyMessageOnDelete;
 
     public DataChangedOperation(ZooKeeper connection, String znode, boolean 
getChangedData) {
+        this(connection, znode, getChangedData, false);
+    }
+
+    public DataChangedOperation(ZooKeeper connection, String znode, boolean 
getChangedData, boolean sendEmptyMessageOnDelete) {
         super(connection, znode, EventType.NodeDataChanged, 
EventType.NodeDeleted);
         this.getChangedData = getChangedData;
+        this.sendEmptyMessageOnDelete = sendEmptyMessageOnDelete;
     }
 
     @Override
@@ -47,11 +53,19 @@ public class DataChangedOperation extend
     }
 
     public OperationResult<byte[]> getResult() {
-        return getChangedData ? new GetDataOperation(connection, 
getNode()).getResult() : null;
+        OperationResult<byte[]> answer;
+        if (EventType.NodeDeleted.equals(getWatchedEvent().getType()) && 
sendEmptyMessageOnDelete) {
+            answer = new OperationResult<byte[]>((byte[])null, null);
+        } else if (getChangedData) {
+            answer = new GetDataOperation(connection, getNode()).getResult();
+        } else {
+            answer = null;
+        }
+        return answer;
     }
 
     @Override
     public ZooKeeperOperation createCopy() throws Exception {
-        return getClass().getConstructor(CONSTRUCTOR_ARGS).newInstance(new 
Object[] {connection, node, getChangedData});
+        return getClass().getConstructor(CONSTRUCTOR_ARGS).newInstance(new 
Object[] {connection, node, getChangedData, sendEmptyMessageOnDelete});
     }
 }

Modified: 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperation.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperation.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
--- 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperation.java
 (original)
+++ 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperation.java
 Thu Apr 26 04:49:51 2012
@@ -33,7 +33,7 @@ import org.apache.zookeeper.ZooKeeper;
  * mechanism to await specific ZooKeeper events. Typically this is used to 
await
  * changes to a particular node before retrieving the change.
  */
-public abstract class FutureEventDrivenOperation<ResultType> extends 
ZooKeeperOperation<ResultType> implements Watcher {
+public abstract class FutureEventDrivenOperation<ResultType> extends 
ZooKeeperOperation<ResultType> implements Watcher, WatchedEventProvider {
 
     private EventType[] awaitedTypes;
 
@@ -96,7 +96,8 @@ public abstract class FutureEventDrivenO
      */
     protected abstract void installWatch();
 
-    public WatchedEvent getEvent() {
+    @Override
+    public WatchedEvent getWatchedEvent() {
         return event;
     }
 

Added: 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/WatchedEventProvider.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/WatchedEventProvider.java?rev=1330657&view=auto
==============================================================================
--- 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/WatchedEventProvider.java
 (added)
+++ 
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/WatchedEventProvider.java
 Thu Apr 26 04:49:51 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import org.apache.zookeeper.WatchedEvent;
+
+/**
+ * Exposes the ZooKeeper {@link WatchedEvent} held by the implementing object.
+ */
+public interface WatchedEventProvider {
+    WatchedEvent getWatchedEvent();
+}

Modified: 
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
--- 
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java
 (original)
+++ 
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java
 Thu Apr 26 04:49:51 2012
@@ -22,10 +22,12 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.Exchange;
 import org.apache.camel.InvalidPayloadException;
 import org.apache.camel.Message;
+import org.apache.camel.NoSuchHeaderException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.zookeeper.NaturalSortComparator.Order;
 import org.apache.camel.util.ExchangeHelper;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.junit.Test;
 
@@ -61,11 +63,18 @@ public class ConsumeChildrenTest extends
 
     }
 
-    private void validateExchangesContainListings(MockEndpoint mock, 
List<String>... expected) throws InvalidPayloadException {
+    private void validateExchangesContainListings(MockEndpoint mock, 
List<String>... expected) throws InvalidPayloadException, NoSuchHeaderException 
{
         int index = 0;
         for (Exchange received : mock.getReceivedExchanges()) {
+            Watcher.Event.EventType expectedEvent;
+            if (index == 0) {
+                expectedEvent = Watcher.Event.EventType.NodeCreated;
+            } else {
+                expectedEvent = Watcher.Event.EventType.NodeChildrenChanged;
+            }
             List<String> actual = ExchangeHelper.getMandatoryInBody(received, 
List.class);
             assertEquals(expected[index++], actual);
+            assertEquals(expectedEvent, 
ExchangeHelper.getMandatoryHeader(received,  
ZooKeeperMessage.ZOOKEEPER_EVENT_TYPE, Watcher.Event.EventType.class));
             validateChildrenCountChangesEachTime(mock);
         }
     }

Modified: 
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
--- 
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java
 (original)
+++ 
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java
 Thu Apr 26 04:49:51 2012
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Watcher.Event.EventType;
 import org.junit.Test;
 
 public class ConsumeDataTest extends ZooKeeperTestSupport {
@@ -36,17 +37,43 @@ public class ConsumeDataTest extends Zoo
 
     @Test
     public void shouldAwaitCreationAndGetDataNotification() throws Exception {
+        EventType[] expectedEventTypes = new EventType[] {
+            EventType.NodeCreated,
+            EventType.NodeDataChanged,
+            EventType.NodeDataChanged,
+            EventType.NodeDataChanged,
+            EventType.NodeDataChanged,
+            EventType.NodeDataChanged,
+            EventType.NodeDataChanged,
+            EventType.NodeDataChanged,
+            EventType.NodeDataChanged,
+            EventType.NodeDataChanged,
+            EventType.NodeDeleted
+        };
 
         MockEndpoint mock = getMockEndpoint("mock:zookeeper-data");
-        mock.expectedMinimumMessageCount(10);
+        mock.expectedMessageCount(expectedEventTypes.length);
 
         createCamelNode();
+
         updateNode(10);
 
+        delay(200);
+        client.delete("/camel");
+
         mock.await(5, TimeUnit.SECONDS);
         mock.assertIsSatisfied();
 
-        validateExchangesReceivedInOrderWithIncreasingVersion(mock);
+        int lastVersion = -1;
+        for (int i = 0; i < mock.getExchanges().size(); i++) {
+            assertEquals(expectedEventTypes[i], 
mock.getExchanges().get(i).getIn().getHeader(ZooKeeperMessage.ZOOKEEPER_EVENT_TYPE));
+            if (!EventType.NodeDeleted.equals(expectedEventTypes[i])) {
+                // As a delete event does not carry statistics, ignore it in 
the version check.
+                int version = 
ZooKeeperMessage.getStatistics(mock.getExchanges().get(i).getIn()).getVersion();
+                assertTrue("Version did not increase", lastVersion < version);
+                lastVersion = version;
+            }
+        }
     }
 
     @Test

Modified: 
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperationTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperationTest.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
--- 
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperationTest.java
 (original)
+++ 
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperationTest.java
 Thu Apr 26 04:49:51 2012
@@ -46,7 +46,7 @@ public class FutureEventDrivenOperationT
         fireEventIn(future, event, 100);
         assertEquals(data, future.get().getResult());
         assertEquals(statistics, future.get().getStatistics());
-        assertEquals(event, future.getEvent());
+        assertEquals(event, future.getWatchedEvent());
     }
 
     private void fireEventIn(final FutureEventDrivenOperation<String> future, 
final WatchedEvent event, final int millisecondsTillFire) {


Reply via email to