Repository: camel
Updated Branches:
  refs/heads/camel-2.13.x 9b790e5b0 -> 0b32b2a29
  refs/heads/camel-2.14.x 9d7440cf9 -> e8ed23455


CAMEL-7662 MQTTProducerTest fails once assertions are enabled

The assertion expects MQTT publish/disconnect calls to be submitted as an async 
task, i.e. enqueued in a dispatch queue.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6f20e539
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6f20e539
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6f20e539

Branch: refs/heads/camel-2.14.x
Commit: 6f20e53974056e9e0298191b2d245d9e0d4c6a74
Parents: 9d7440c
Author: Tomohisa Igarashi <tm.igara...@gmail.com>
Authored: Sun Nov 16 16:55:45 2014 +0900
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Mon Nov 17 12:21:59 2014 +0800

----------------------------------------------------------------------
 components/camel-mqtt/pom.xml                   |  2 --
 .../camel/component/mqtt/MQTTEndpoint.java      | 30 +++++++++++++-------
 2 files changed, 20 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6f20e539/components/camel-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/pom.xml b/components/camel-mqtt/pom.xml
index cb84a60..119c91f 100644
--- a/components/camel-mqtt/pom.xml
+++ b/components/camel-mqtt/pom.xml
@@ -80,8 +80,6 @@
                 <artifactId>maven-surefire-plugin</artifactId>
                 <configuration>
                     <forkMode>perTest</forkMode>
-                    <!--CAMEL-7662 disabling the assertion this time-->
-                    <enableAssertions>false</enableAssertions>
                 </configuration>
             </plugin>
              <plugin>

http://git-wip-us.apache.org/repos/asf/camel/blob/6f20e539/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
index 07014ad..664116f 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
@@ -27,6 +27,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.hawtdispatch.Task;
 import org.fusesource.mqtt.client.Callback;
 import org.fusesource.mqtt.client.CallbackConnection;
 import org.fusesource.mqtt.client.Listener;
@@ -114,15 +115,19 @@ public class MQTTEndpoint extends DefaultEndpoint {
     protected void doStop() throws Exception {
         if (connection != null) {
             final Promise<Void> promise = new Promise<Void>();
-            connection.disconnect(new Callback<Void>() {
-                public void onSuccess(Void value) {
-                    promise.onSuccess(value);
-                }
+            connection.getDispatchQueue().execute(new Task() {
+                @Override
+                public void run() {
+                    connection.disconnect(new Callback<Void>() {
+                        public void onSuccess(Void value) {
+                            promise.onSuccess(value);
+                        }
 
-                public void onFailure(Throwable value) {
-                    promise.onFailure(value);
-                }
-            });
+                        public void onFailure(Throwable value) {
+                            promise.onFailure(value);
+                        }
+                    });
+            }});
             promise.await(configuration.getDisconnectWaitInSeconds(), 
TimeUnit.SECONDS);
         }
         super.doStop();
@@ -169,8 +174,13 @@ public class MQTTEndpoint extends DefaultEndpoint {
         return connected;
     }
  
-    void publish(String topic, byte[] payload, QoS qoS, boolean retain, 
Callback<Void> callback) throws Exception {
-        connection.publish(topic, payload, qoS, retain, callback);
+    void publish(final String topic, final byte[] payload, final QoS qoS, 
final boolean retain, final Callback<Void> callback) throws Exception {
+        connection.getDispatchQueue().execute(new Task() {
+            @Override
+            public void run() {
+                connection.publish(topic, payload, qoS, retain, callback);
+            }
+        });
     }
 
     void addConsumer(MQTTConsumer consumer) {

Reply via email to