This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new c804906 Upgrade Apache Pulsar c804906 is described below commit c80490699134378c49611f66275cba890779b05d Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri May 31 07:19:43 2019 +0200 Upgrade Apache Pulsar --- .../camel/component/pulsar/PulsarComponent.java | 14 +++----------- .../pulsar/configuration/PulsarConfiguration.java | 4 ++-- .../camel/component/pulsar/utils/PulsarUtils.java | 20 +++++++++++++++++--- .../camel/component/pulsar/PulsarTestSupport.java | 2 +- parent/pom.xml | 4 ++-- 5 files changed, 25 insertions(+), 19 deletions(-) diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java index e726203..e02de5c 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java @@ -30,19 +30,11 @@ import org.apache.pulsar.client.api.PulsarClient; @Component("pulsar") public class PulsarComponent extends DefaultComponent { - @Metadata + @Metadata(label = "advanced") private AutoConfiguration autoConfiguration; - @Metadata + @Metadata(label = "advanced") private PulsarClient pulsarClient; - public PulsarComponent() { - this(null); - } - - public PulsarComponent(CamelContext context) { - super(context); - } - @Override protected Endpoint createEndpoint(final String uri, final String path, final Map<String, Object> parameters) throws Exception { final PulsarConfiguration configuration = new PulsarConfiguration(); @@ -64,7 +56,7 @@ public class PulsarComponent extends DefaultComponent { } /** - * The pulsar autoconfiguration + * The pulsar auto configuration */ public void setAutoConfiguration(AutoConfiguration autoConfiguration) { this.autoConfiguration = autoConfiguration; diff --git 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java index 5e76604..8834164 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java @@ -25,9 +25,9 @@ import static org.apache.camel.component.pulsar.utils.consumers.SubscriptionType @UriParams public class PulsarConfiguration { - @UriParam(label = "consumer", defaultValue = "subscription") + @UriParam(label = "consumer", defaultValue = "subs") private String subscriptionName = "subs"; - @UriParam(label = "consumer", enums = "EXCLUSIVE, SHARED, FAILOVER", defaultValue = "EXCLUSIVE") + @UriParam(label = "consumer", defaultValue = "EXCLUSIVE") private SubscriptionType subscriptionType = EXCLUSIVE; @UriParam(label = "consumer", defaultValue = "1") private int numberOfConsumers = 1; diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarUtils.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarUtils.java index a4bd1b3..7698152 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarUtils.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarUtils.java @@ -18,11 +18,16 @@ package org.apache.camel.component.pulsar.utils; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; + import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.PulsarClientException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class PulsarUtils { - + + private static final Logger LOG = LoggerFactory.getLogger(PulsarUtils.class); + private PulsarUtils() { } @@ -30,8 
+35,17 @@ public final class PulsarUtils { while (!consumers.isEmpty()) { Consumer<byte[]> consumer = consumers.poll(); if (consumer != null) { - consumer.unsubscribe(); - consumer.close(); + try { + consumer.unsubscribe(); + consumer.close(); + } catch (Exception e) { + // ignore during stopping + if (e instanceof PulsarClientException.AlreadyClosedException) { + // ignore + } else { + LOG.debug("Error stopping consumer: " + consumer + " due to " + e.getMessage() + ". This exception is ignored", e); + } + } } } diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java index 1826a2c..ad84334 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java @@ -22,7 +22,7 @@ import org.testcontainers.containers.GenericContainer; public class PulsarTestSupport extends ContainerAwareTestSupport { - public static final String CONTAINER_IMAGE = "apachepulsar/pulsar:2.2.0"; + public static final String CONTAINER_IMAGE = "apachepulsar/pulsar:2.3.1"; public static final String CONTAINER_NAME = "pulsar"; public static final int BROKER_PORT = 6650; public static final int BROKER_HTTP_PORT = 8080; diff --git a/parent/pom.xml b/parent/pom.xml index f9ebf8a..9a16eaee 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -562,8 +562,8 @@ <protobuf-maven-plugin-version>0.5.1</protobuf-maven-plugin-version> <protonpack-version>1.8</protonpack-version> <pubnub-version>4.21.0</pubnub-version> - <pulsar-version>2.2.1</pulsar-version> - <pulsar-bundle-version>2.2.1_1</pulsar-bundle-version> + <pulsar-version>2.3.1</pulsar-version> + <pulsar-bundle-version>2.3.1_1</pulsar-bundle-version> <qpid-broker-version>7.1.3</qpid-broker-version> <qpid-proton-j-version>0.33.0</qpid-proton-j-version> 
<qpid-jms-client-version>0.42.0</qpid-jms-client-version>