CAMEL-9962: Add a field in the consumer to indicate whether it is subscribed
to the topic


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cd895e15
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cd895e15
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cd895e15

Branch: refs/heads/kube-lb
Commit: cd895e15336cad4ec42b0ad50e89e2794e2f038e
Parents: 016147d
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Thu May 12 13:54:36 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon May 16 09:59:33 2016 +0200

----------------------------------------------------------------------
 .../apache/camel/component/nats/NatsConsumer.java  | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/cd895e15/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index d0abb36..8fc2eff 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -42,6 +42,7 @@ public class NatsConsumer extends DefaultConsumer {
     private ExecutorService executor;
     private Connection connection;
     private Subscription sid;
+    private boolean subscribed;
 
     public NatsConsumer(NatsEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -101,6 +102,14 @@ public class NatsConsumer extends DefaultConsumer {
         return connection;
     }
 
+    public boolean isSubscribed() {
+        return subscribed;
+    }
+
+    public void setSubscribed(boolean subscribed) {
+        this.subscribed = subscribed;
+    }
+
     class NatsConsumingTask implements Runnable {
 
         private final Connection connection;
@@ -133,6 +142,9 @@ public class NatsConsumer extends DefaultConsumer {
                     if 
(ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages()))
 {
                         
sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
                     }
+                    if (sid.isValid()) {
+                        setSubscribed(true);
+                    }
                 } else {
                     sid = 
connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), new 
MessageHandler() {
                         @Override
@@ -151,7 +163,10 @@ public class NatsConsumer extends DefaultConsumer {
                     });
                     if 
(ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages()))
 {
                         
sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
-                    }    
+                    }
+                    if (sid.isValid()) {
+                        setSubscribed(true);
+                    }
                 }
             } catch (Throwable e) {
                 getExceptionHandler().handleException("Error during 
processing", e);

Reply via email to