Author: ningjiang
Date: Thu Apr 26 04:49:51 2012
New Revision: 1330657
URL: http://svn.apache.org/viewvc?rev=1330657&view=rev
Log:
CAMEL-5170 Add support for delete event in camel-zookeeper
Added:
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/WatchedEventProvider.java
Modified:
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConfiguration.java
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperMessage.java
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperUtils.java
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/AnyOfOperations.java
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/DataChangedOperation.java
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperation.java
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperationTest.java
Modified:
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConfiguration.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConfiguration.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
---
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConfiguration.java
(original)
+++
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConfiguration.java
Thu Apr 26 04:49:51 2012
@@ -42,6 +42,7 @@ public class ZooKeeperConfiguration impl
private boolean listChildren;
private boolean shouldCreate;
private String createMode;
+ private boolean sendEmptyMessageOnDelete = true;
public void addZookeeperServer(String server) {
if (servers == null) {
@@ -158,7 +159,13 @@ public class ZooKeeperConfiguration impl
public void setCreateMode(String createMode) {
this.createMode = createMode;
}
-
-
+
+ public boolean isSendEmptyMessageOnDelete() {
+ return sendEmptyMessageOnDelete;
+ }
+
+ public void setSendEmptyMessageOnDelete(boolean sendEmptyMessageOnDelete) {
+ this.sendEmptyMessageOnDelete = sendEmptyMessageOnDelete;
+ }
}
Modified:
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
---
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
(original)
+++
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
Thu Apr 26 04:49:51 2012
@@ -19,8 +19,7 @@ package org.apache.camel.component.zooke
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
-import static java.lang.String.format;
-
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.zookeeper.operations.AnyOfOperations;
@@ -33,8 +32,10 @@ import org.apache.camel.component.zookee
import org.apache.camel.component.zookeeper.operations.OperationResult;
import org.apache.camel.component.zookeeper.operations.ZooKeeperOperation;
import org.apache.camel.impl.DefaultConsumer;
+import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
+
/**
* <code>ZooKeeperConsumer</code> uses various {@link ZooKeeperOperation} to
* interact and consume data from a ZooKeeper cluster.
@@ -55,7 +56,7 @@ public class ZooKeeperConsumer extends D
private ExecutorService executor;
public ZooKeeperConsumer(ZooKeeperEndpoint endpoint, Processor processor) {
- super(endpoint, processor);
+ super((Endpoint)endpoint, processor);
this.connectionManager = endpoint.getConnectionManager();
this.configuration = endpoint.getConfiguration();
}
@@ -65,7 +66,7 @@ public class ZooKeeperConsumer extends D
super.doStart();
connection = connectionManager.getConnection();
if (log.isDebugEnabled()) {
- log.debug(format("Connected to Zookeeper cluster %s",
configuration.getConnectString()));
+ log.debug(String.format("Connected to Zookeeper cluster %s",
configuration.getConnectString()));
}
initializeConsumer();
@@ -80,7 +81,7 @@ public class ZooKeeperConsumer extends D
shuttingDown = true;
connection = connectionManager.getConnection();
if (log.isTraceEnabled()) {
- log.trace(format("Shutting down zookeeper consumer of '%s'",
configuration.getPath()));
+ log.trace(String.format("Shutting down zookeeper consumer of '%s'", configuration.getPath()));
}
executor.shutdown();
connectionManager.shutdown();
@@ -113,9 +114,9 @@ public class ZooKeeperConsumer extends D
}
}
- private Exchange createExchange(String path, OperationResult result) {
+ private Exchange createExchange(String path, OperationResult result,
WatchedEvent watchedEvent) {
Exchange e = getEndpoint().createExchange();
- ZooKeeperMessage in = new ZooKeeperMessage(path,
result.getStatistics());
+ ZooKeeperMessage in = new ZooKeeperMessage(path,
result.getStatistics(), watchedEvent);
e.setIn(in);
if (result.isOk()) {
in.setBody(result.getResult());
@@ -129,13 +130,15 @@ public class ZooKeeperConsumer extends D
private ZooKeeperOperation current;
+ private WatchedEvent watchedEvent;
+
public void run() {
while (isRunAllowed()) {
try {
current = operations.take();
if (log.isTraceEnabled()) {
- log.trace(format("Processing '%s' operation",
current.getClass().getSimpleName()));
+ log.trace(String.format("Processing '%s' operation",
current.getClass().getSimpleName()));
}
} catch (InterruptedException e) {
continue;
@@ -143,8 +146,12 @@ public class ZooKeeperConsumer extends D
String node = current.getNode();
try {
OperationResult result = current.get();
+ if (ZooKeeperUtils.hasWatchedEvent(current)) {
+ watchedEvent = ZooKeeperUtils.getWatchedEvent(current);
+ }
if (result != null && current.shouldProduceExchange()) {
- getProcessor().process(createExchange(node, result));
+ getProcessor().process(createExchange(node, result,
watchedEvent));
+ watchedEvent = null;
}
} catch (Exception e) {
handleException(e);
@@ -177,7 +184,7 @@ public class ZooKeeperConsumer extends D
operations.clear();
operations.add(new AnyOfOperations(node, new
ExistsOperation(connection, node), new ExistenceChangedOperation(connection,
node)));
operations.add(new GetDataOperation(connection, node));
- operations.add(new DataChangedOperation(connection, node, false));
+ operations.add(new DataChangedOperation(connection, node, false,
configuration.isSendEmptyMessageOnDelete()));
}
private void addBasicChildListingSequence(String node) {
Modified:
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
---
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java
(original)
+++
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java
Thu Apr 26 04:49:51 2012
@@ -181,5 +181,14 @@ public class ZooKeeperEndpoint extends D
return this;
}
+ @ManagedAttribute
+ public boolean isSendEmptyMessageOnDelete() {
+ return getConfiguration().isSendEmptyMessageOnDelete();
+ }
+
+ @ManagedAttribute
+ public void setSendEmptyMessageOnDelete(boolean sendEmptyMessageOnDelete) {
+ getConfiguration().setSendEmptyMessageOnDelete(sendEmptyMessageOnDelete);
+ }
}
Modified:
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperMessage.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperMessage.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
---
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperMessage.java
(original)
+++
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperMessage.java
Thu Apr 26 04:49:51 2012
@@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultMessage;
+import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Stat;
/**
@@ -43,14 +44,23 @@ public class ZooKeeperMessage extends De
public static final String ZOOKEEPER_STATISTICS =
"CamelZookeeperStatistics";
- public ZooKeeperMessage(String node, Stat statistics) {
- this(node, statistics, Collections.<String, Object>emptyMap());
+ public static final String ZOOKEEPER_EVENT_TYPE =
"CamelZookeeperEventType";
+
+ public ZooKeeperMessage(String node, Stat statistics, WatchedEvent
watchedEvent) {
+ this(node, statistics, Collections.<String, Object>emptyMap(),
watchedEvent);
}
public ZooKeeperMessage(String node, Stat statistics, Map<String, Object>
headers) {
+ this(node, statistics, headers, null);
+ }
+
+ public ZooKeeperMessage(String node, Stat statistics, Map<String, Object>
headers, WatchedEvent watchedEvent) {
setHeaders(headers);
this.setHeader(ZOOKEEPER_NODE, node);
this.setHeader(ZOOKEEPER_STATISTICS, statistics);
+ if (watchedEvent != null) {
+ this.setHeader(ZOOKEEPER_EVENT_TYPE, watchedEvent.getType());
+ }
}
public static Stat getStatistics(Message message) {
Modified:
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperUtils.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperUtils.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
---
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperUtils.java
(original)
+++
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperUtils.java
Thu Apr 26 04:49:51 2012
@@ -20,8 +20,11 @@ import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
+import org.apache.camel.component.zookeeper.operations.WatchedEventProvider;
+import org.apache.camel.component.zookeeper.operations.ZooKeeperOperation;
import org.apache.camel.util.ExchangeHelper;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.ACL;
@@ -102,4 +105,16 @@ public final class ZooKeeperUtils {
}
return value;
}
+
+ public static WatchedEvent getWatchedEvent(ZooKeeperOperation
zooKeeperOperation) {
+ WatchedEvent watchedEvent = null;
+ if (zooKeeperOperation instanceof WatchedEventProvider) {
+ watchedEvent =
((WatchedEventProvider)zooKeeperOperation).getWatchedEvent();
+ }
+ return watchedEvent;
+ }
+
+ public static boolean hasWatchedEvent(ZooKeeperOperation
zooKeeperOperation) {
+ return getWatchedEvent(zooKeeperOperation) != null;
+ }
}
Modified:
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/AnyOfOperations.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/AnyOfOperations.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
---
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/AnyOfOperations.java
(original)
+++
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/AnyOfOperations.java
Thu Apr 26 04:49:51 2012
@@ -20,6 +20,9 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.camel.component.zookeeper.ZooKeeperUtils;
+import org.apache.zookeeper.WatchedEvent;
+
/**
* <code>AnyOfOperations</code> is a composite operation of one or more sub
* operation, executing each in turn until any one succeeds. If any execute
@@ -31,9 +34,10 @@ import java.util.concurrent.TimeoutExcep
* if the test operation fails.
*/
@SuppressWarnings("rawtypes")
-public class AnyOfOperations extends ZooKeeperOperation {
+public class AnyOfOperations extends ZooKeeperOperation implements
WatchedEventProvider {
private ZooKeeperOperation[] keeperOperations;
+ private ZooKeeperOperation operationProvidingResult;
public AnyOfOperations(String node, ZooKeeperOperation...
keeperOperations) {
super(null, node, false);
@@ -46,6 +50,7 @@ public class AnyOfOperations extends Zoo
try {
OperationResult result = op.get();
if (result.isOk()) {
+ operationProvidingResult = op;
return result;
}
} catch (Exception e) {
@@ -72,4 +77,9 @@ public class AnyOfOperations extends Zoo
}
return new AnyOfOperations(node, copy);
}
+
+ @Override
+ public WatchedEvent getWatchedEvent() {
+ return ZooKeeperUtils.getWatchedEvent(operationProvidingResult);
+ }
}
Modified:
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/DataChangedOperation.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/DataChangedOperation.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
---
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/DataChangedOperation.java
(original)
+++
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/DataChangedOperation.java
Thu Apr 26 04:49:51 2012
@@ -29,13 +29,19 @@ import org.apache.zookeeper.data.Stat;
@SuppressWarnings("rawtypes")
public class DataChangedOperation extends FutureEventDrivenOperation<byte[]> {
- protected static final Class[] CONSTRUCTOR_ARGS = {ZooKeeper.class,
String.class, boolean.class};
+ protected static final Class[] CONSTRUCTOR_ARGS = {ZooKeeper.class,
String.class, boolean.class, boolean.class};
private boolean getChangedData;
+ private boolean sendEmptyMessageOnDelete;
public DataChangedOperation(ZooKeeper connection, String znode, boolean
getChangedData) {
+ this(connection, znode, getChangedData, false);
+ }
+
+ public DataChangedOperation(ZooKeeper connection, String znode, boolean
getChangedData, boolean sendEmptyMessageOnDelete) {
super(connection, znode, EventType.NodeDataChanged,
EventType.NodeDeleted);
this.getChangedData = getChangedData;
+ this.sendEmptyMessageOnDelete = sendEmptyMessageOnDelete;
}
@Override
@@ -47,11 +53,19 @@ public class DataChangedOperation extend
}
public OperationResult<byte[]> getResult() {
- return getChangedData ? new GetDataOperation(connection,
getNode()).getResult() : null;
+ OperationResult<byte[]> answer;
+ if (EventType.NodeDeleted.equals(getWatchedEvent().getType()) &&
sendEmptyMessageOnDelete) {
+ answer = new OperationResult<byte[]>((byte[])null, null);
+ } else if (getChangedData) {
+ answer = new GetDataOperation(connection, getNode()).getResult();
+ } else {
+ answer = null;
+ }
+ return answer;
}
@Override
public ZooKeeperOperation createCopy() throws Exception {
- return getClass().getConstructor(CONSTRUCTOR_ARGS).newInstance(new
Object[] {connection, node, getChangedData});
+ return getClass().getConstructor(CONSTRUCTOR_ARGS).newInstance(new
Object[] {connection, node, getChangedData, sendEmptyMessageOnDelete});
}
}
Modified:
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperation.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperation.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
---
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperation.java
(original)
+++
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperation.java
Thu Apr 26 04:49:51 2012
@@ -33,7 +33,7 @@ import org.apache.zookeeper.ZooKeeper;
* mechanism to await specific ZooKeeper events. Typically this is used to
await
* changes to a particular node before retrieving the change.
*/
-public abstract class FutureEventDrivenOperation<ResultType> extends
ZooKeeperOperation<ResultType> implements Watcher {
+public abstract class FutureEventDrivenOperation<ResultType> extends
ZooKeeperOperation<ResultType> implements Watcher, WatchedEventProvider {
private EventType[] awaitedTypes;
@@ -96,7 +96,8 @@ public abstract class FutureEventDrivenO
*/
protected abstract void installWatch();
- public WatchedEvent getEvent() {
+ @Override
+ public WatchedEvent getWatchedEvent() {
return event;
}
Added:
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/WatchedEventProvider.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/WatchedEventProvider.java?rev=1330657&view=auto
==============================================================================
---
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/WatchedEventProvider.java
(added)
+++
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/WatchedEventProvider.java
Thu Apr 26 04:49:51 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import org.apache.zookeeper.WatchedEvent;
+
+/**
+ *
+ */
+public interface WatchedEventProvider {
+ WatchedEvent getWatchedEvent();
+}
Modified:
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
---
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java
(original)
+++
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java
Thu Apr 26 04:49:51 2012
@@ -22,10 +22,12 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
import org.apache.camel.InvalidPayloadException;
import org.apache.camel.Message;
+import org.apache.camel.NoSuchHeaderException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.zookeeper.NaturalSortComparator.Order;
import org.apache.camel.util.ExchangeHelper;
+import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
@@ -61,11 +63,18 @@ public class ConsumeChildrenTest extends
}
- private void validateExchangesContainListings(MockEndpoint mock,
List<String>... expected) throws InvalidPayloadException {
+ private void validateExchangesContainListings(MockEndpoint mock, List<String>... expected) throws InvalidPayloadException, NoSuchHeaderException {
int index = 0;
for (Exchange received : mock.getReceivedExchanges()) {
+ Watcher.Event.EventType expectedEvent;
+ if (index == 0) {
+ expectedEvent = Watcher.Event.EventType.NodeCreated;
+ } else {
+ expectedEvent = Watcher.Event.EventType.NodeChildrenChanged;
+ }
List<String> actual = ExchangeHelper.getMandatoryInBody(received,
List.class);
assertEquals(expected[index++], actual);
+ assertEquals(expectedEvent,
ExchangeHelper.getMandatoryHeader(received,
ZooKeeperMessage.ZOOKEEPER_EVENT_TYPE, Watcher.Event.EventType.class));
validateChildrenCountChangesEachTime(mock);
}
}
Modified:
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
---
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java
(original)
+++
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java
Thu Apr 26 04:49:51 2012
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Watcher.Event.EventType;
import org.junit.Test;
public class ConsumeDataTest extends ZooKeeperTestSupport {
@@ -36,17 +37,43 @@ public class ConsumeDataTest extends Zoo
@Test
public void shouldAwaitCreationAndGetDataNotification() throws Exception {
+ EventType[] expectedEventTypes = new EventType[] {
+ EventType.NodeCreated,
+ EventType.NodeDataChanged,
+ EventType.NodeDataChanged,
+ EventType.NodeDataChanged,
+ EventType.NodeDataChanged,
+ EventType.NodeDataChanged,
+ EventType.NodeDataChanged,
+ EventType.NodeDataChanged,
+ EventType.NodeDataChanged,
+ EventType.NodeDataChanged,
+ EventType.NodeDeleted
+ };
MockEndpoint mock = getMockEndpoint("mock:zookeeper-data");
- mock.expectedMinimumMessageCount(10);
+ mock.expectedMessageCount(expectedEventTypes.length);
createCamelNode();
+
updateNode(10);
+ delay(200);
+ client.delete("/camel");
+
mock.await(5, TimeUnit.SECONDS);
mock.assertIsSatisfied();
- validateExchangesReceivedInOrderWithIncreasingVersion(mock);
+ int lastVersion = -1;
+ for (int i = 0; i < mock.getExchanges().size(); i++) {
+ assertEquals(expectedEventTypes[i],
mock.getExchanges().get(i).getIn().getHeader(ZooKeeperMessage.ZOOKEEPER_EVENT_TYPE));
+ if (!EventType.NodeDeleted.equals(expectedEventTypes[i])) {
+ // As a delete event does not carry statistics, ignore it in the version check.
+ int version =
ZooKeeperMessage.getStatistics(mock.getExchanges().get(i).getIn()).getVersion();
+ assertTrue("Version did not increase", lastVersion < version);
+ lastVersion = version;
+ }
+ }
}
@Test
Modified:
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperationTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperationTest.java?rev=1330657&r1=1330656&r2=1330657&view=diff
==============================================================================
---
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperationTest.java
(original)
+++
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/FutureEventDrivenOperationTest.java
Thu Apr 26 04:49:51 2012
@@ -46,7 +46,7 @@ public class FutureEventDrivenOperationT
fireEventIn(future, event, 100);
assertEquals(data, future.get().getResult());
assertEquals(statistics, future.get().getStatistics());
- assertEquals(event, future.getEvent());
+ assertEquals(event, future.getWatchedEvent());
}
private void fireEventIn(final FutureEventDrivenOperation<String> future,
final WatchedEvent event, final int millisecondsTillFire) {