Repository: camel Updated Branches: refs/heads/camel-2.15.x 7cdb7c1f1 -> ac31039c9 refs/heads/master d1c7f6507 -> 17391a12e
CAMEL-9092 MQTT consumer receives duplicate messages after broker restart. With thanks to Tomohisa Igarashi. Code merged with modifications. This closes #601. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/17391a12 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/17391a12 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/17391a12 Branch: refs/heads/master Commit: 17391a12e9e7a3158058d4e885c6af65141a1338 Parents: d1c7f65 Author: Raul Kripalani <ra...@apache.org> Authored: Fri Aug 28 15:44:35 2015 +0100 Committer: Raul Kripalani <ra...@apache.org> Committed: Fri Aug 28 15:51:38 2015 +0100 ---------------------------------------------------------------------- .../camel/component/mqtt/MQTTEndpoint.java | 139 +++++++++++++++- .../component/mqtt/MQTTDuplicatesTest.java | 158 +++++++++++++++++++ .../src/test/resources/log4j.properties | 2 +- 3 files changed, 296 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/17391a12/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java index 92c8d17..89caedd 100644 --- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java +++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java @@ -39,6 +39,21 @@ import org.fusesource.mqtt.client.Listener; import org.fusesource.mqtt.client.Promise; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; +import org.fusesource.mqtt.client.Tracer; +import org.fusesource.mqtt.codec.CONNACK; +import org.fusesource.mqtt.codec.CONNECT; +import org.fusesource.mqtt.codec.DISCONNECT; +import org.fusesource.mqtt.codec.MQTTFrame; +import org.fusesource.mqtt.codec.PINGREQ; +import org.fusesource.mqtt.codec.PINGRESP; +import org.fusesource.mqtt.codec.PUBACK; +import org.fusesource.mqtt.codec.PUBCOMP; +import org.fusesource.mqtt.codec.PUBLISH; +import org.fusesource.mqtt.codec.PUBREC; +import org.fusesource.mqtt.codec.PUBREL; +import org.fusesource.mqtt.codec.SUBACK; +import org.fusesource.mqtt.codec.SUBSCRIBE; +import org.fusesource.mqtt.codec.UNSUBSCRIBE; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,12 +72,127 @@ public class MQTTEndpoint extends DefaultEndpoint { @UriPath @Metadata(required = "true") private String name; + @UriParam private final MQTTConfiguration configuration; - public MQTTEndpoint(String uri, MQTTComponent component, MQTTConfiguration properties) { + public MQTTEndpoint(final String uri, MQTTComponent component, MQTTConfiguration properties) { super(uri, component); this.configuration = properties; + if (LOG.isTraceEnabled()) { + configuration.setTracer(new Tracer() { + @Override + public void debug(String message, Object...args) { + LOG.trace("tracer.debug() " + this + ": uri=" + uri + ", message=" + String.format(message, args)); + } + + @Override + public void onSend(MQTTFrame frame) { + String decoded = null; + try { + switch (frame.messageType()) { + case PINGREQ.TYPE: + decoded = new PINGREQ().decode(frame).toString(); + break; + case PINGRESP.TYPE: + decoded = new PINGRESP().decode(frame).toString(); + break; + case CONNECT.TYPE: + decoded = new CONNECT().decode(frame).toString(); + break; + case DISCONNECT.TYPE: + decoded = new DISCONNECT().decode(frame).toString(); + break; + case SUBSCRIBE.TYPE: + decoded = new SUBSCRIBE().decode(frame).toString(); + break; + case UNSUBSCRIBE.TYPE: + decoded = new UNSUBSCRIBE().decode(frame).toString(); + break; + case PUBLISH.TYPE: + decoded = new PUBLISH().decode(frame).toString(); + break; + case PUBACK.TYPE: + decoded = new PUBACK().decode(frame).toString(); + break; + case PUBREC.TYPE: + decoded = new PUBREC().decode(frame).toString(); + break; + case PUBREL.TYPE: + decoded = new PUBREL().decode(frame).toString(); + break; + case PUBCOMP.TYPE: + decoded = new PUBCOMP().decode(frame).toString(); + break; + case CONNACK.TYPE: + decoded = new CONNACK().decode(frame).toString(); + break; + case SUBACK.TYPE: + decoded = new SUBACK().decode(frame).toString(); + break; + default: + decoded = frame.toString(); + } + } catch (Throwable e) { + decoded = frame.toString(); + } + LOG.trace("tracer.onSend() " + this + ": uri=" + uri + ", frame=" + decoded); + } + + @Override + public void onReceive(MQTTFrame frame) { + String decoded = null; + try { + switch (frame.messageType()) { + case PINGREQ.TYPE: + decoded = new PINGREQ().decode(frame).toString(); + break; + case PINGRESP.TYPE: + decoded = new PINGRESP().decode(frame).toString(); + break; + case CONNECT.TYPE: + decoded = new CONNECT().decode(frame).toString(); + break; + case DISCONNECT.TYPE: + decoded = new DISCONNECT().decode(frame).toString(); + break; + case SUBSCRIBE.TYPE: + decoded = new SUBSCRIBE().decode(frame).toString(); + break; + case UNSUBSCRIBE.TYPE: + decoded = new UNSUBSCRIBE().decode(frame).toString(); + break; + case PUBLISH.TYPE: + decoded = new PUBLISH().decode(frame).toString(); + break; + case PUBACK.TYPE: + decoded = new PUBACK().decode(frame).toString(); + break; + case PUBREC.TYPE: + decoded = new PUBREC().decode(frame).toString(); + break; + case PUBREL.TYPE: + decoded = new PUBREL().decode(frame).toString(); + break; + case PUBCOMP.TYPE: + decoded = new PUBCOMP().decode(frame).toString(); + break; + case CONNACK.TYPE: + decoded = new CONNACK().decode(frame).toString(); + break; + case SUBACK.TYPE: + decoded = new SUBACK().decode(frame).toString(); + break; + default: + decoded = frame.toString(); + } + } catch (Throwable e) { + decoded = frame.toString(); + } + LOG.trace("tracer.onReceive() " + this + ": uri=" + uri + ", frame=" + decoded); + } + }); + } } @Override @@ -109,7 +239,11 @@ public class MQTTEndpoint extends DefaultEndpoint { } public void onDisconnected() { - connected = false; + // no connected = false required here because the MQTT client should trigger its own reconnect; + // setting connected = false would make the publish() method to launch a new connection while the original + // one is still reconnecting, likely leading to duplicate messages as observed in CAMEL-9092; + // if retries are exhausted and it desists, we should get a callback on onFailure, and then we can set + // connected = false safely LOG.debug("MQTT Connection disconnected from {}", configuration.getHost()); } @@ -181,6 +315,7 @@ public class MQTTEndpoint extends DefaultEndpoint { } public void onFailure(Throwable value) { + LOG.debug("Failed to subscribe", value); promise.onFailure(value); connection.disconnect(null); connected = false; http://git-wip-us.apache.org/repos/asf/camel/blob/17391a12/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTDuplicatesTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTDuplicatesTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTDuplicatesTest.java new file mode 100644 index 0000000..c397a1f --- /dev/null +++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTDuplicatesTest.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mqtt; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.activemq.broker.BrokerService; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests duplicate delivery via mqtt consumer. + * + * @version + */ +public class MQTTDuplicatesTest extends MQTTBaseTest { + + private static final int MESSAGE_COUNT = 50; + private static final int WAIT_MILLIS = 100; + + @EndpointInject(uri = "mock:result") + protected MockEndpoint resultEndpoint; + + @Produce(uri = "direct:withClientID") + protected ProducerTemplate templateWithClientID; + + @Produce(uri = "direct:withoutClientID") + protected ProducerTemplate templateWithoutClientID; + + @Test + public void testMqttDuplicates() throws Exception { + for (int i = 0; i < MESSAGE_COUNT; i++) { + String body = System.currentTimeMillis() + ": Dummy! " + i; + templateWithClientID.asyncSendBody("direct:withClientID", body); + Thread.sleep(WAIT_MILLIS); + } + + assertNoDuplicates(); + } + + @Test + public void testMqttDuplicatesAfterBrokerRestartWithoutClientID() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + + LOG.info(">>>>>>>>>> Restarting broker..."); + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setAdvisorySupport(false); + brokerService.addConnector("mqtt://127.0.0.1:1883?trace=true"); + brokerService.start(); + brokerService.waitUntilStarted(); + LOG.info(">>>>>>>>>> Broker restarted"); + + for (int i = 0; i < MESSAGE_COUNT; i++) { + String body = System.currentTimeMillis() + ": Dummy-restart-without-clientID! " + i; + templateWithoutClientID.asyncSendBody("direct:withoutClientID", body); + Thread.sleep(WAIT_MILLIS); + } + + assertNoDuplicates(); + } + + @Test + public void testMqttDuplicatesAfterBrokerRestartWithClientID() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + + LOG.info(">>>>>>>>>> Restarting broker..."); + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setAdvisorySupport(false); + brokerService.addConnector("mqtt://127.0.0.1:1883?trace=true"); + brokerService.start(); + brokerService.waitUntilStarted(); + LOG.info(">>>>>>>>>> Broker restarted"); + + for (int i = 0; i < MESSAGE_COUNT; i++) { + String body = System.currentTimeMillis() + ": Dummy-restart-with-clientID! " + i; + templateWithClientID.asyncSendBody("direct:withClientID", body); + Thread.sleep(WAIT_MILLIS); + } + + assertNoDuplicates(); + } + + private void assertNoDuplicates() { + List<Exchange> exchanges = resultEndpoint.getExchanges(); + Assert.assertTrue("No message was delivered - something wrong happened", exchanges.size() > 0); + Set<String> values = new HashSet<String>(); + List<String> duplicates = new ArrayList<String>(); + for (Exchange e : exchanges) { + String body = e.getIn().getBody(String.class); + if (values.contains(body)) { + duplicates.add(body); + } + values.add(body); + } + Assert.assertTrue("Duplicate messages are detected: " + duplicates.toString(), duplicates.isEmpty()); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + + // -------------------- + // Without client ID: + // -------------------- + from("direct:withoutClientID") + .routeId("SenderWithoutClientID") + .log("$$$$$ Sending message: ${body}") + .to("mqtt:sender?publishTopicName=test/topic1&qualityOfService=ExactlyOnce"); + + from("mqtt:reader?subscribeTopicName=test/topic1&qualityOfService=ExactlyOnce") + .routeId("ReceiverWithoutClientID") + .log("$$$$$ Received message: ${body}") + .to("mock:result"); + + // -------------------- + // With client ID: + // -------------------- + from("direct:withClientID") + .routeId("SenderWithClientID") + .log("$$$$$ Sending message: ${body}") + .to("mqtt:sender?publishTopicName=test/topic2&clientId=sender&qualityOfService=ExactlyOnce"); + + from("mqtt:reader?subscribeTopicName=test/topic2&clientId=receiver&qualityOfService=ExactlyOnce") + .routeId("ReceiverWithClientID") + .log("$$$$$ Received message: ${body}") + .to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/17391a12/components/camel-mqtt/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/test/resources/log4j.properties b/components/camel-mqtt/src/test/resources/log4j.properties index b15a0cd..d7962ca 100644 --- a/components/camel-mqtt/src/test/resources/log4j.properties +++ b/components/camel-mqtt/src/test/resources/log4j.properties @@ -21,7 +21,7 @@ log4j.rootLogger=info, file #log4j.logger.twitter4j=DEBUG -#log4j.logger.org.apache.camel.component.mqtt=DEBUG +#log4j.logger.org.apache.camel.component.mqtt=TRACE #log4j.logger.org.apache.camel=DEBUG # CONSOLE appender not used by default