CAMEL-9803: Camel-NATS: Switch to Jnats client as Java_nats is deprecated
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/66f0fe84 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/66f0fe84 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/66f0fe84 Branch: refs/heads/master Commit: 66f0fe84c5a4dece014660ce4ebaf3e01fac94ec Parents: 431ee2b Author: Andrea Cosentino <anco...@gmail.com> Authored: Sun Apr 3 12:02:38 2016 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Sun Apr 3 12:19:54 2016 +0200 ---------------------------------------------------------------------- components/camel-nats/pom.xml | 6 +- .../camel/component/nats/NatsConfiguration.java | 12 +-- .../camel/component/nats/NatsConsumer.java | 78 ++++++++++++++------ .../camel/component/nats/NatsProducer.java | 12 ++- .../component/nats/NatsPropertiesConstants.java | 16 ++-- .../component/nats/NatsConsumerLoadTest.java | 14 ++-- .../nats/NatsConsumerMaxMessagesQueueTest.java | 2 +- .../nats/NatsConsumerMaxMessagesTest.java | 2 +- .../camel/component/nats/NatsConsumerTest.java | 2 +- .../camel/component/nats/NatsProducerTest.java | 1 + parent/pom.xml | 3 +- .../features/src/main/resources/features.xml | 2 +- 12 files changed, 96 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-nats/pom.xml b/components/camel-nats/pom.xml index 6ca8079..c2fd1dc 100644 --- a/components/camel-nats/pom.xml +++ b/components/camel-nats/pom.xml @@ -34,9 +34,9 @@ <artifactId>camel-core</artifactId> </dependency> <dependency> - <groupId>com.github.tyagihas</groupId> - <artifactId>java_nats</artifactId> - <version>${java-nats-version}</version> + <groupId>io.nats</groupId> + <artifactId>jnats</artifactId> + <version>${jnats-version}</version> </dependency> <!-- testing --> <dependency> http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java index 260d1a7..1618eb9 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java @@ -218,12 +218,12 @@ public class NatsConfiguration { return props; } - public Properties createSubProperties() { - Properties props = new Properties(); - addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_QUEUE, getQueueName()); - addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_MAX_MESSAGES, getMaxMessages()); - return props; - } +// public Properties createSubProperties() { +// Properties props = new Properties(); +// addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_QUEUE, getQueueName()); +// addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_MAX_MESSAGES, getMaxMessages()); +// return props; +// } private String splitServers() { StringBuilder servers = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java index 9c8a29d..8be0aea 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java @@ -19,15 +19,21 @@ package org.apache.camel.component.nats; import java.io.IOException; import java.util.Properties; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; -import org.nats.Connection; -import org.nats.MsgHandler; +import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.nats.client.Connection; +import io.nats.client.ConnectionFactory; +import io.nats.client.Message; +import io.nats.client.MessageHandler; +import io.nats.client.Subscription; + public class NatsConsumer extends DefaultConsumer { private static final Logger LOG = LoggerFactory.getLogger(NatsConsumer.class); @@ -35,7 +41,7 @@ public class NatsConsumer extends DefaultConsumer { private final Processor processor; private ExecutorService executor; private Connection connection; - private int sid; + private Subscription sid; public NatsConsumer(NatsEndpoint endpoint, Processor processor) { super(endpoint, processor); @@ -67,7 +73,7 @@ public class NatsConsumer extends DefaultConsumer { connection.flush(); try { - connection.unsubscribe(sid); + sid.unsubscribe(); } catch (Exception e) { getExceptionHandler().handleException("Error during unsubscribing", e); } @@ -83,14 +89,15 @@ public class NatsConsumer extends DefaultConsumer { executor = null; LOG.debug("Closing Nats Connection"); - if (connection.isConnected()) { + if (!connection.isClosed()) { connection.close(); } } - private Connection getConnection() throws IOException, InterruptedException { + private Connection getConnection() throws IOException, InterruptedException, TimeoutException { Properties prop = getEndpoint().getNatsConfiguration().createProperties(); - connection = Connection.connect(prop); + ConnectionFactory factory = new ConnectionFactory(prop); + connection = factory.createConnection(); return connection; } @@ -107,23 +114,50 @@ public class NatsConsumer extends DefaultConsumer { @Override public void run() { try { - sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), configuration.createSubProperties(), new MsgHandler() { - public void execute(String msg) { - LOG.debug("Received Message: {}", msg); - Exchange exchange = getEndpoint().createExchange(); - exchange.getIn().setBody(msg); - exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis()); - exchange.getIn().setHeader(NatsConstants.NATS_SUBSCRIPTION_ID, sid); - try { - processor.process(exchange); - } catch (Exception e) { - getExceptionHandler().handleException("Error during processing", exchange, e); + if (ObjectHelper.isNotEmpty(configuration.getQueueName())) { + sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), getEndpoint().getNatsConfiguration().getQueueName(), new MessageHandler() { + + @Override + public void onMessage(Message msg) { + LOG.debug("Received Message: {}", msg); + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setBody(msg); + exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis()); + exchange.getIn().setHeader(NatsConstants.NATS_SUBSCRIPTION_ID, sid); + try { + processor.process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error during processing", exchange, e); + } } + }); + if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) { + sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages())); } - }); - } catch (Throwable e) { - getExceptionHandler().handleException("Error during processing", e); - } + } else { + sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), new MessageHandler() { + + @Override + public void onMessage(Message msg) { + LOG.debug("Received Message: {}", msg); + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setBody(msg); + exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis()); + exchange.getIn().setHeader(NatsConstants.NATS_SUBSCRIPTION_ID, sid); + try { + processor.process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error during processing", exchange, e); + } + } + }); + if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) { + sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages())); + } + } + } catch (Throwable e) { + getExceptionHandler().handleException("Error during processing", e); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java index 89b2b23..2e92f44 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java @@ -18,13 +18,16 @@ package org.apache.camel.component.nats; import java.io.IOException; import java.util.Properties; +import java.util.concurrent.TimeoutException; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; -import org.nats.Connection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.nats.client.Connection; +import io.nats.client.ConnectionFactory; + public class NatsProducer extends DefaultProducer { private static final Logger LOG = LoggerFactory.getLogger(NatsProducer.class); @@ -64,14 +67,15 @@ public class NatsProducer extends DefaultProducer { LOG.debug("Stopping Nats Producer"); LOG.debug("Closing Nats Connection"); - if (connection != null && connection.isConnected()) { + if (connection != null && !connection.isClosed()) { connection.close(); } } - private Connection getConnection() throws IOException, InterruptedException { + private Connection getConnection() throws TimeoutException, IOException { Properties prop = getEndpoint().getNatsConfiguration().createProperties(); - connection = Connection.connect(prop); + ConnectionFactory factory = new ConnectionFactory(prop); + connection = factory.createConnection(); return connection; } http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java index 8c09ce8..2e09361 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java @@ -18,14 +18,14 @@ package org.apache.camel.component.nats; public interface NatsPropertiesConstants { String NATS_PROPERTY_URI = "uri"; - String NATS_PROPERTY_VERBOSE = "verbose"; - String NATS_PROPERTY_PEDANTIC = "pedantic"; - String NATS_PROPERTY_RECONNECT = "reconnect"; - String NATS_PROPERTY_SSL = "ssl"; - String NATS_PROPERTY_MAX_RECONNECT_ATTEMPTS = "max_reconnect_attempts"; - String NATS_PROPERTY_RECONNECT_TIME_WAIT = "reconnect_time_wait"; - String NATS_PROPERTY_PING_INTERVAL = "ping_interval"; - String NATS_PROPERTY_DONT_RANDOMIZE_SERVERS = "dont_randomize_servers"; + String NATS_PROPERTY_VERBOSE = "io.nats.client.verbose"; + String NATS_PROPERTY_PEDANTIC = "io.nats.client.pedantic"; + String NATS_PROPERTY_RECONNECT = "io.nats.client.reconnect.allowed"; + String NATS_PROPERTY_SSL = "io.nats.client.secure"; + String NATS_PROPERTY_MAX_RECONNECT_ATTEMPTS = "io.nats.client.reconnect.max"; + String NATS_PROPERTY_RECONNECT_TIME_WAIT = "io.nats.client.reconnect.wait"; + String NATS_PROPERTY_PING_INTERVAL = "io.nats.client.pinginterval"; + String NATS_PROPERTY_DONT_RANDOMIZE_SERVERS = "io.nats.client.norandomize"; String NATS_PROPERTY_QUEUE = "queue"; String NATS_PROPERTY_MAX_MESSAGES = "max"; } http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java index 67d1e7c..87d0c3e 100644 --- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java @@ -18,6 +18,7 @@ package org.apache.camel.component.nats; import java.io.IOException; import java.util.Properties; +import java.util.concurrent.TimeoutException; import org.apache.camel.EndpointInject; import org.apache.camel.builder.RouteBuilder; @@ -25,7 +26,9 @@ import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Ignore; import org.junit.Test; -import org.nats.Connection; + +import io.nats.client.Connection; +import io.nats.client.ConnectionFactory; @Ignore("Require a running Nats server") public class NatsConsumerLoadTest extends CamelTestSupport { @@ -34,11 +37,11 @@ public class NatsConsumerLoadTest extends CamelTestSupport { protected MockEndpoint mockResultEndpoint; @Test - public void testLoadConsumer() throws InterruptedException, IOException { + public void testLoadConsumer() throws InterruptedException, IOException, TimeoutException { mockResultEndpoint.setExpectedMessageCount(10000); - - Connection connection = Connection.connect(new Properties()); - + ConnectionFactory cf = new ConnectionFactory("nats://localhost:4222"); + Connection connection = cf.createConnection(); + for (int i = 0; i < 10000; i++) { connection.publish("test", ("test" + i).getBytes()); } @@ -51,6 +54,7 @@ public class NatsConsumerLoadTest extends CamelTestSupport { return new RouteBuilder() { @Override public void configure() throws Exception { + from("direct:send").to("nats://localhost:4222?topic=test"); from("nats://localhost:4222?topic=test").to(mockResultEndpoint); } }; http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java index c637cef..b69a6b7 100644 --- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java @@ -33,7 +33,7 @@ public class NatsConsumerMaxMessagesQueueTest extends CamelTestSupport { @Test public void testMaxConsumer() throws InterruptedException, IOException { - mockResultEndpoint.expectedBodiesReceivedInAnyOrder("test", "test1"); + mockResultEndpoint.expectedBodiesReceivedInAnyOrder("{Subject=test;Reply=null;Payload=<test>}", "{Subject=test;Reply=null;Payload=<test1>}"); mockResultEndpoint.setExpectedMessageCount(2); template.sendBody("direct:send", "test"); http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java index 6e7482e..5ee94d9 100644 --- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java @@ -33,7 +33,7 @@ public class NatsConsumerMaxMessagesTest extends CamelTestSupport { @Test public void testMaxConsumer() throws InterruptedException, IOException { - mockResultEndpoint.expectedBodiesReceived("test", "test1", "test2", "test3", "test4"); + mockResultEndpoint.expectedBodiesReceived("{Subject=test;Reply=null;Payload=<test>}", "{Subject=test;Reply=null;Payload=<test1>}", "{Subject=test;Reply=null;Payload=<test2>}", "{Subject=test;Reply=null;Payload=<test3>}", "{Subject=test;Reply=null;Payload=<test4>}"); mockResultEndpoint.setExpectedMessageCount(5); template.sendBody("direct:send", "test"); template.sendBody("direct:send", "test1"); http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java index c689ade..ca63048 100644 --- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java @@ -34,7 +34,7 @@ public class NatsConsumerTest extends CamelTestSupport { @Test public void testConsumer() throws InterruptedException, IOException { mockResultEndpoint.expectedMessageCount(1); - mockResultEndpoint.expectedBodiesReceived("test"); + mockResultEndpoint.expectedBodiesReceived("{Subject=test;Reply=null;Payload=<test>}"); template.requestBody("direct:send", "test"); mockResultEndpoint.assertIsSatisfied(); http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java index 8f10b4c..4a22551 100644 --- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java @@ -26,6 +26,7 @@ public class NatsProducerTest extends CamelTestSupport { @Test public void sendTest() throws Exception { + template.sendBody("direct:send", "pippo"); } http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index b6dde19..386a760 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -267,8 +267,7 @@ <java-apns-bundle-version>1.0.0.Beta6_1</java-apns-bundle-version> <java-apns-version>1.0.0.Beta6</java-apns-version> <java-ewah-version>0.7.9</java-ewah-version> - <java-nats-version>0.5.2</java-nats-version> - <java-nats-bundle-version>0.5.2_1</java-nats-bundle-version> + <jnats-version>0.4.0</jnats-version> <javacc-maven-plugin-version>2.6</javacc-maven-plugin-version> <javacrumbs-version>0.22</javacrumbs-version> <javassist-bundle-version>3.12.1.GA_3</javassist-bundle-version> http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/platforms/karaf/features/src/main/resources/features.xml ---------------------------------------------------------------------- diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml index dd084e9..e5c0ea9 100644 --- a/platforms/karaf/features/src/main/resources/features.xml +++ b/platforms/karaf/features/src/main/resources/features.xml @@ -1152,7 +1152,7 @@ </feature> <feature name='camel-nats' version='${project.version}' resolver='(obr)' start-level='50'> <feature version='${project.version}'>camel-core</feature> - <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.java_nats/${java-nats-bundle-version}</bundle> + <bundle dependency='true'>wrap:mvn:io.nats/jnats/${jnats-version}</bundle> <bundle>mvn:org.apache.camel/camel-nats/${project.version}</bundle> </feature> <feature name='camel-netty' version='${project.version}' resolver='(obr)' start-level='50'>