CAMEL-9962: Add a field to the consumer indicating whether it is subscribed to the topic
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cd895e15
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cd895e15
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cd895e15

Branch: refs/heads/kube-lb
Commit: cd895e15336cad4ec42b0ad50e89e2794e2f038e
Parents: 016147d
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Thu May 12 13:54:36 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon May 16 09:59:33 2016 +0200

----------------------------------------------------------------------
 .../apache/camel/component/nats/NatsConsumer.java | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/cd895e15/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index d0abb36..8fc2eff 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -42,6 +42,7 @@ public class NatsConsumer extends DefaultConsumer {
     private ExecutorService executor;
     private Connection connection;
     private Subscription sid;
+    private boolean subscribed;
 
     public NatsConsumer(NatsEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -101,6 +102,14 @@ public class NatsConsumer extends DefaultConsumer {
         return connection;
     }
 
+    public boolean isSubscribed() {
+        return subscribed;
+    }
+
+    public void setSubscribed(boolean subscribed) {
+        this.subscribed = subscribed;
+    }
+
     class NatsConsumingTask implements Runnable {
 
         private final Connection connection;
@@ -133,6 +142,9 @@ public class NatsConsumer extends DefaultConsumer {
                     if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) {
                         sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
                     }
+                    if (sid.isValid()) {
+                        setSubscribed(true);
+                    }
                 } else {
                     sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), new MessageHandler() {
                         @Override
@@ -151,7 +163,10 @@ public class NatsConsumer extends DefaultConsumer {
                     });
                     if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) {
                         sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
-                    }
+                    }
+                    if (sid.isValid()) {
+                        setSubscribed(true);
+                    }
                 }
             } catch (Throwable e) {
                 getExceptionHandler().handleException("Error during processing", e);