[ 
https://issues.apache.org/jira/browse/CAMEL-13014?focusedWorklogId=176869&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-176869
 ]

ASF GitHub Bot logged work on CAMEL-13014:
------------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Dec/18 07:28
            Start Date: 19/Dec/18 07:28
    Worklog Time Spent: 10m 
      Work Description: oscerd closed pull request #2678: CAMEL-13014 fix 
stealing link for clientId (looped error)
URL: https://github.com/apache/camel/pull/2678
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
 
b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
index 5b70097f46c..12849656131 100644
--- 
a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
+++ 
b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
@@ -228,6 +228,7 @@ protected void doStart() throws Exception {
         super.doStart();
 
         createConnection();
+        connect();
     }
 
     protected void createConnection() {
diff --git 
a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
 
b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
index 24e0ad4c250..dcc2d215b9b 100644
--- 
a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
+++ 
b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
@@ -63,7 +63,7 @@ public void testWildcardSubscribeTopicsConfiguration() throws 
Exception {
 
     @Test
     public void testExactlyOnceQualityOfServiceConfiguration() throws 
Exception {
-        Endpoint endpoint = 
context.getEndpoint("mqtt:todo?qualityOfService=exactlyOnce");
+        Endpoint endpoint = 
context.getEndpoint("mqtt:todo?qualityOfService=exactlyOnce&host=" + 
MQTTTestSupport.getHostForMQTTEndpoint());
         assertTrue("Endpoint not a MQTTEndpoint: " + endpoint, endpoint 
instanceof MQTTEndpoint);
         MQTTEndpoint mqttEndpoint = (MQTTEndpoint)endpoint;
 
@@ -72,7 +72,7 @@ public void testExactlyOnceQualityOfServiceConfiguration() 
throws Exception {
 
     @Test
     public void testAtLeastOnceQualityOfServiceConfiguration() throws 
Exception {
-        Endpoint endpoint = 
context.getEndpoint("mqtt:todo?qualityOfService=AtLeastOnce");
+        Endpoint endpoint = 
context.getEndpoint("mqtt:todo?qualityOfService=AtLeastOnce&host=" + 
MQTTTestSupport.getHostForMQTTEndpoint());
         assertTrue("Endpoint not a MQTTEndpoint: " + endpoint, endpoint 
instanceof MQTTEndpoint);
         MQTTEndpoint mqttEndpoint = (MQTTEndpoint)endpoint;
 
@@ -81,7 +81,7 @@ public void testAtLeastOnceQualityOfServiceConfiguration() 
throws Exception {
 
     @Test
     public void testAtMostOnceQualityOfServiceConfiguration() throws Exception 
{
-        Endpoint endpoint = 
context.getEndpoint("mqtt:todo?qualityOfService=AtMostOnce");
+        Endpoint endpoint = 
context.getEndpoint("mqtt:todo?qualityOfService=AtMostOnce&host=" + 
MQTTTestSupport.getHostForMQTTEndpoint());
         assertTrue("Endpoint not a MQTTEndpoint: " + endpoint, endpoint 
instanceof MQTTEndpoint);
         MQTTEndpoint mqttEndpoint = (MQTTEndpoint)endpoint;
 
diff --git 
a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTLoopProducerTest.java
 
b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTLoopProducerTest.java
new file mode 100644
index 00000000000..12662696cbb
--- /dev/null
+++ 
b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTLoopProducerTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mqtt;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.Test;
+
+public class MQTTLoopProducerTest extends MQTTBaseTest {
+    @Test
+    public void testProduce() throws Exception {
+        MQTT mqtt = new MQTT();
+        mqtt.setHost(MQTTTestSupport.getHostForMQTTEndpoint());
+        final BlockingConnection subscribeConnection = 
mqtt.blockingConnection();
+        subscribeConnection.connect();
+        Topic topic = new Topic(TEST_TOPIC, QoS.AT_MOST_ONCE);
+        Topic[] topics = {topic};
+        subscribeConnection.subscribe(topics);
+        final CountDownLatch latch = new CountDownLatch(numberOfMessages * 2); 
//2 publishers
+
+        Thread thread = new Thread(new Runnable() {
+            public void run() {
+                for (int i = 0; i < numberOfMessages * 2; i++) {
+                    try {
+                        Message message = subscribeConnection.receive();
+                        message.ack();
+                        latch.countDown();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        break;
+                    }
+                }
+            }
+        });
+        thread.start();
+
+        Producer producer = context.getEndpoint("direct:foo").createProducer();
+        for (int i = 0; i < numberOfMessages; i++) {
+            Exchange exchange = producer.createExchange();
+            exchange.getIn().setBody("test message " + i);
+            producer.process(exchange);
+        }
+        latch.await(10, TimeUnit.SECONDS);
+        assertTrue("Messages not consumed = " + latch.getCount(), 
latch.getCount() == 0);
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:foo")
+                .setHeader(MQTTConfiguration.MQTT_PUBLISH_TOPIC, 
constant(TEST_TOPIC))
+                .to("mqtt:boo1?host=" + 
MQTTTestSupport.getHostForMQTTEndpoint() + "&qualityOfService=AtMostOnce")
+                .setHeader(MQTTConfiguration.MQTT_PUBLISH_TOPIC, 
constant(TEST_TOPIC))
+                .to("mqtt:boo2?host=" + 
MQTTTestSupport.getHostForMQTTEndpoint() + "&qualityOfService=AtMostOnce");
+            }
+        };
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 176869)
    Time Spent: 20m  (was: 10m)

> camel mqtt crash using high volume traffic
> ------------------------------------------
>
>                 Key: CAMEL-13014
>                 URL: https://issues.apache.org/jira/browse/CAMEL-13014
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-mqtt
>    Affects Versions: 2.23.0
>            Reporter: Fabrizio Spataro
>            Assignee: Fabrizio Spataro
>            Priority: Major
>             Fix For: 2.21.4, 2.22.3, 3.0.0, 2.24.0, 2.23.1
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Into current version we cannot use 2 (or more) sequential camel-mqtt producer.
> With high volume traffic use case, we have this looped error:  
> {quote}2018-12-18 16:27:12,609 [.0.1:36172@1101] WARN RegionBroker - Stealing 
> link for clientId 7f000001e92a5c1911c9 From Connection Transport Connection 
> to: tcp://127.0.0.1:36176
> 2018-12-18 16:27:12,609 [.0.1:36170@1101] WARN RegionBroker - Stealing link 
> for clientId 7f000001e92a5c1911c9 From Connection Transport Connection to: 
> tcp://127.0.0.1:36172
> 2018-12-18 16:27:12,614 [.0.1:36180@1101] WARN RegionBroker - Stealing link 
> for clientId 7f000001e92a5c1911c9 From Connection Transport Connection to: 
> tcp://127.0.0.1:36170
> 2018-12-18 16:27:12,639 [.0.1:36182@1101] WARN RegionBroker - Stealing link 
> for clientId 7f000001e92a5c1911c9 From Connection Transport Connection to: 
> tcp://127.0.0.1:36180
> 2018-12-18 16:27:12,639 [.0.1:36184@1101] WARN RegionBroker - Stealing link 
> for clientId 7f000001e92a5c1911c9 From Connection Transport Connection to: 
> tcp://127.0.0.1:36182
> 2018-12-18 16:27:12,662 [.0.1:36186@1101] WARN RegionBroker - Stealing link 
> for clientId 7f000001e92a5c1911c9 From Connection Transport Connection to: 
> tcp://127.0.0.1:36184
> 2018-12-18 16:27:12,662 [.0.1:36190@1101] WARN RegionBroker - Stealing link 
> for clientId 7f000001e92a5c1911c9 From Connection Transport Connection to: 
> tcp://127.0.0.1:36186
> 2018-12-18 16:27:12,666 [.0.1:36178@1101] WARN RegionBroker - Stealing link 
> for clientId 7f000001e92a5c1911c9 From Connection Transport Connection to: 
> tcp://127.0.0.1:36190
> 2018-12-18 16:27:12,668 [.0.1:36188@1101] WARN RegionBroker - Stealing link 
> for clientId 7f000001e92a5c1911c9 From Connection Transport Connection to: 
> tcp://127.0.0.1:36192
> 2018-12-18 16:27:12,668 [.0.1:36192@1101] WARN RegionBroker - Stealing link 
> for clientId 7f000001e92a5c1911c9 From Connection Transport Connection to: 
> tcp://127.0.0.1:36178
> 2018-12-18 16:27:12,669 [.0.1:36196@1101] WARN RegionBroker - Stealing link 
> for clientId 7f000001e92a5c1911c9 From Connection Transport Connection to: 
> tcp://127.0.0.1:36194
> {quote}
>  You can reproduce it using junit class 
> *org.apache.camel.component.mqtt.MQTTLoopProducerTest*
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to