Repository: camel Updated Branches: refs/heads/master 06048c832 -> cb5617e0d
CAMEL-10489: Camel-Nats: Add Flush option with timeout Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cb5617e0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cb5617e0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cb5617e0 Branch: refs/heads/master Commit: cb5617e0d94288b8597ff23cbe69ddd72d86c999 Parents: 06048c8 Author: Andrea Cosentino <anco...@gmail.com> Authored: Thu Nov 17 10:59:11 2016 +0100 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Thu Nov 17 10:59:11 2016 +0100 ---------------------------------------------------------------------- .../src/main/docs/nats-component.adoc | 4 ++- .../camel/component/nats/NatsConfiguration.java | 26 ++++++++++++++++++++ .../camel/component/nats/NatsConsumer.java | 6 +++-- .../camel/component/nats/NatsProducer.java | 4 +++ .../camel/component/nats/NatsConsumerTest.java | 4 +-- 5 files changed, 39 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/cb5617e0/components/camel-nats/src/main/docs/nats-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/main/docs/nats-component.adoc b/components/camel-nats/src/main/docs/nats-component.adoc index 2c0839c..3965ef7 100644 --- a/components/camel-nats/src/main/docs/nats-component.adoc +++ b/components/camel-nats/src/main/docs/nats-component.adoc @@ -44,13 +44,15 @@ The Nats component has no options. // endpoint options: START -The Nats component supports 21 endpoint options which are listed below: +The Nats component supports 23 endpoint options which are listed below: {% raw %} [width="100%",cols="2,1,1m,1m,5",options="header"] |======================================================================= | Name | Group | Default | Java Type | Description | servers | common | | String | *Required* URLs to one or more NAT servers. Use comma to separate URLs when specifying multiple servers. +| flushConnection | common | false | boolean | Define if we want to flush connection or not +| flushTimeout | common | 1000 | int | Set the flush timeout | maxReconnectAttempts | common | 3 | int | Max reconnection attempts | noRandomizeServers | common | false | boolean | Whether or not randomizing the order of servers for the connection attempts | pedantic | common | false | boolean | Whether or not running in pedantic mode (this affects performace) http://git-wip-us.apache.org/repos/asf/camel/blob/cb5617e0/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java index 641b24b..dab069e 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java @@ -57,6 +57,10 @@ public class NatsConfiguration { private String maxMessages; @UriParam(label = "consumer", defaultValue = "10") private int poolSize = 10; + @UriParam(label = "common", defaultValue = "false") + private boolean flushConnection; + @UriParam(label = "common", defaultValue = "1000") + private int flushTimeout = 1000; @UriParam(label = "security") private boolean secure; @UriParam(label = "security") @@ -219,6 +223,28 @@ public class NatsConfiguration { this.poolSize = poolSize; } + public boolean isFlushConnection() { + return flushConnection; + } + + /** + * Define if we want to flush connection or not + */ + public void setFlushConnection(boolean flushConnection) { + this.flushConnection = flushConnection; + } + + public int getFlushTimeout() { + return flushTimeout; + } + + /** + * Set the flush timeout + */ + public void setFlushTimeout(int flushTimeout) { + this.flushTimeout = flushTimeout; + } + /** * Set secure option indicating TLS is required */ http://git-wip-us.apache.org/repos/asf/camel/blob/cb5617e0/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java index 37e5171..95bc0e3 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java @@ -73,8 +73,10 @@ public class NatsConsumer extends DefaultConsumer { protected void doStop() throws Exception { super.doStop(); - LOG.debug("Flushing Messages before stopping"); - connection.flush(); + if (getEndpoint().getNatsConfiguration().isFlushConnection()) { + LOG.debug("Flushing Messages before stopping"); + connection.flush(getEndpoint().getNatsConfiguration().getFlushTimeout()); + } try { sid.unsubscribe(); http://git-wip-us.apache.org/repos/asf/camel/blob/cb5617e0/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java index 0efafe2..1be13e3 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java @@ -78,6 +78,10 @@ public class NatsProducer extends DefaultProducer { LOG.debug("Closing Nats Connection"); if (connection != null && !connection.isClosed()) { + if (getEndpoint().getNatsConfiguration().isFlushConnection()) { + LOG.debug("Flushing Nats Connection"); + connection.flush(getEndpoint().getNatsConfiguration().getFlushTimeout()); + } connection.close(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/cb5617e0/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java index ca63048..24d4877 100644 --- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java @@ -45,8 +45,8 @@ public class NatsConsumerTest extends CamelTestSupport { return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:send").to("nats://localhost:4222?topic=test"); - from("nats://localhost:4222?topic=test").to(mockResultEndpoint); + from("direct:send").to("nats://localhost:4222?topic=test&flushConnection=true"); + from("nats://localhost:4222?topic=test&flushConnection=true").to(mockResultEndpoint); } }; }