This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b9f2071ed NIFI-11270 Refactoring of the overly Paho-specific MQTT 
interface
2b9f2071ed is described below

commit 2b9f2071ed111f461236900277976f91bc1ac029
Author: Nandor Soma Abonyi <abonyis...@gmail.com>
AuthorDate: Sun Mar 12 22:58:55 2023 +0100

    NIFI-11270 Refactoring of the overly Paho-specific MQTT interface
    
    This closes #7032.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../apache/nifi/processors/mqtt/ConsumeMQTT.java   | 21 +----
 .../apache/nifi/processors/mqtt/PublishMQTT.java   | 23 +-----
 .../mqtt/adapters/HiveMqV5ClientAdapter.java       | 15 +---
 .../mqtt/adapters/PahoMqttClientAdapter.java       | 89 +++++++++++++++-------
 .../nifi/processors/mqtt/common/MqttClient.java    | 10 +--
 ...llback.java => ReceivedMqttMessageHandler.java} | 12 ++-
 .../processors/mqtt/common/MqttTestClient.java     | 13 +---
 7 files changed, 84 insertions(+), 99 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index f0d529a233..02982600f7 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -43,7 +43,6 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
-import org.apache.nifi.processors.mqtt.common.MqttCallback;
 import org.apache.nifi.processors.mqtt.common.MqttException;
 import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
 import org.apache.nifi.serialization.MalformedRecordException;
@@ -104,7 +103,7 @@ import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL
             "on the topic.")})
 @SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"The 'Max Queue Size' specifies the maximum number of messages that can be hold 
in memory by NiFi by a single "
         + "instance of this processor. A high value for this property could 
represent a lot of data being stored in memory.")
-public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback 
{
+public class ConsumeMQTT extends AbstractMQTTProcessor {
 
     public final static String RECORD_COUNT_KEY = "record.count";
     public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker";
@@ -383,9 +382,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor 
implements MqttCallback {
         // non-null but not connected, so we need to handle each case and only 
create a new client when it is null
         try {
             mqttClient = createMqttClient();
-            mqttClient.setCallback(this);
             mqttClient.connect();
-            mqttClient.subscribe(topicPrefix + topicFilter, qos);
+            mqttClient.subscribe(topicPrefix + topicFilter, qos, 
this::handleReceivedMessage);
         } catch (Exception e) {
             logger.error("Connection failed to {}. Yielding processor", 
clientProperties.getRawBrokerUris(), e);
             mqttClient = null; // prevent stucked processor when subscribe 
fails
@@ -614,13 +612,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor 
implements MqttCallback {
         return stringBuilder.toString();
     }
 
-    @Override
-    public void connectionLost(Throwable cause) {
-        logger.error("Connection to {} lost", 
clientProperties.getRawBrokerUris(), cause);
-    }
-
-    @Override
-    public void messageArrived(ReceivedMqttMessage message) {
+    private void handleReceivedMessage(ReceivedMqttMessage message) {
         if (logger.isDebugEnabled()) {
             byte[] payload = message.getPayload();
             final String text = new String(payload, StandardCharsets.UTF_8);
@@ -639,11 +631,4 @@ public class ConsumeMQTT extends AbstractMQTTProcessor 
implements MqttCallback {
             throw new MqttException("Failed to process message arrived from 
topic " + message.getTopic());
         }
     }
-
-    @Override
-    public void deliveryComplete(String token) {
-        // Unlikely situation. Api uses the same callback for publisher and 
consumer as well.
-        // That's why we have this log message here to indicate something 
really messy thing happened.
-        logger.error("Received MQTT 'delivery complete' message to subscriber. 
Token: [{}]", token);
-    }
 }
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
index ffb9633549..13b18abb86 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
@@ -38,8 +38,6 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
-import org.apache.nifi.processors.mqtt.common.MqttCallback;
-import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
 import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
@@ -74,7 +72,7 @@ import static java.util.Optional.ofNullable;
 @CapabilityDescription("Publishes a message to an MQTT topic")
 @SeeAlso({ConsumeMQTT.class})
 @SystemResourceConsideration(resource = SystemResource.MEMORY)
-public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback 
{
+public class PublishMQTT extends AbstractMQTTProcessor {
 
     public static final PropertyDescriptor PROP_TOPIC = new 
PropertyDescriptor.Builder()
             .name("Topic")
@@ -289,7 +287,6 @@ public class PublishMQTT extends AbstractMQTTProcessor 
implements MqttCallback {
         // non-null but not connected, so we need to handle each case and only 
create a new client when it is null
         try {
             mqttClient = createMqttClient();
-            mqttClient.setCallback(this);
             mqttClient.connect();
         } catch (Exception e) {
             logger.error("Connection failed to {}. Yielding processor", 
clientProperties.getRawBrokerUris(), e);
@@ -297,24 +294,6 @@ public class PublishMQTT extends AbstractMQTTProcessor 
implements MqttCallback {
         }
     }
 
-    @Override
-    public void connectionLost(Throwable cause) {
-        logger.error("Connection to {} lost", 
clientProperties.getRawBrokerUris(), cause);
-    }
-
-    @Override
-    public void messageArrived(ReceivedMqttMessage message) {
-        // Unlikely situation. Api uses the same callback for publisher and 
consumer as well.
-        // That's why we have this log message here to indicate something 
really messy thing happened.
-        logger.error("Message arrived to a PublishMQTT processor { topic:'" + 
message.getTopic() + "; payload:" + Arrays.toString(message.getPayload()) + 
"}");
-    }
-
-    @Override
-    public void deliveryComplete(String token) {
-        // Client.publish waits for message to be delivered so this token will 
always have a null message and is useless in this application.
-        logger.trace("Received 'delivery complete' message from broker. Token: 
[{}]", token);
-    }
-
     interface ProcessStrategy {
         void process(ProcessContext context, FlowFile flowfile, InputStream 
in, String topic, AtomicInteger processedRecords, Long previousProcessFailedAt) 
throws IOException;
         String getFailureTemplateMessage();
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java
index b44c18070a..4295f07502 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java
@@ -24,12 +24,12 @@ import 
com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
 import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
 import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processors.mqtt.common.MqttCallback;
 import org.apache.nifi.processors.mqtt.common.MqttClient;
 import org.apache.nifi.processors.mqtt.common.MqttClientProperties;
 import org.apache.nifi.processors.mqtt.common.MqttException;
 import org.apache.nifi.processors.mqtt.common.MqttProtocolScheme;
 import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
+import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessageHandler;
 import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
 import org.apache.nifi.security.util.KeyStoreUtils;
 import org.apache.nifi.security.util.TlsException;
@@ -50,8 +50,6 @@ public class HiveMqV5ClientAdapter implements MqttClient {
     private final MqttClientProperties clientProperties;
     private final ComponentLog logger;
 
-    private MqttCallback callback;
-
     public HiveMqV5ClientAdapter(URI brokerUri, MqttClientProperties 
clientProperties, ComponentLog logger) throws TlsException {
         this.mqtt5BlockingClient = createClient(brokerUri, clientProperties, 
logger);
         this.clientProperties = clientProperties;
@@ -124,9 +122,7 @@ public class HiveMqV5ClientAdapter implements MqttClient {
     }
 
     @Override
-    public void subscribe(String topicFilter, int qos) {
-        Objects.requireNonNull(callback, "callback should be set");
-
+    public void subscribe(String topicFilter, int qos, 
ReceivedMqttMessageHandler handler) {
         logger.debug("Subscribing to {} with QoS: {}", topicFilter, qos);
 
         CompletableFuture<Mqtt5SubAck> futureAck = 
mqtt5BlockingClient.toAsync().subscribeWith()
@@ -138,7 +134,7 @@ public class HiveMqV5ClientAdapter implements MqttClient {
                             mqtt5Publish.getQos().getCode(),
                             mqtt5Publish.isRetain(),
                             mqtt5Publish.getTopic().toString());
-                    callback.messageArrived(receivedMessage);
+                    handler.handleReceivedMessage(receivedMessage);
                 })
                 .send();
 
@@ -152,11 +148,6 @@ public class HiveMqV5ClientAdapter implements MqttClient {
         }
     }
 
-    @Override
-    public void setCallback(MqttCallback callback) {
-        this.callback = callback;
-    }
-
     private static Mqtt5BlockingClient createClient(URI brokerUri, 
MqttClientProperties clientProperties, ComponentLog logger) throws TlsException 
{
         logger.debug("Creating Mqtt v5 client");
 
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java
index 15b49fc208..fd723601a7 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java
@@ -17,20 +17,22 @@
 package org.apache.nifi.processors.mqtt.adapters;
 
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processors.mqtt.common.MqttCallback;
-import org.apache.nifi.processors.mqtt.common.MqttClient;
 import org.apache.nifi.processors.mqtt.common.MqttClientProperties;
+import org.apache.nifi.processors.mqtt.common.MqttClient;
 import org.apache.nifi.processors.mqtt.common.MqttException;
 import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
+import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessageHandler;
 import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
 import org.apache.nifi.security.util.TlsConfiguration;
 import org.eclipse.paho.client.mqttv3.IMqttClient;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
 import java.net.URI;
+import java.util.Arrays;
 import java.util.Properties;
 
 public class PahoMqttClientAdapter implements MqttClient {
@@ -45,6 +47,7 @@ public class PahoMqttClientAdapter implements MqttClient {
         this.client = createClient(brokerUri, clientProperties, logger);
         this.clientProperties = clientProperties;
         this.logger = logger;
+        client.setCallback(new DefaultMqttCallback());
     }
 
     @Override
@@ -121,9 +124,11 @@ public class PahoMqttClientAdapter implements MqttClient {
     }
 
     @Override
-    public void subscribe(String topicFilter, int qos) {
+    public void subscribe(String topicFilter, int qos, 
ReceivedMqttMessageHandler handler) {
         logger.debug("Subscribing to {} with QoS: {}", topicFilter, qos);
 
+        client.setCallback(new ConsumerMqttCallback(handler));
+
         try {
             client.subscribe(topicFilter, qos);
         } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
@@ -131,28 +136,6 @@ public class PahoMqttClientAdapter implements MqttClient {
         }
     }
 
-    @Override
-    public void setCallback(MqttCallback callback) {
-        client.setCallback(new org.eclipse.paho.client.mqttv3.MqttCallback() {
-            @Override
-            public void connectionLost(Throwable cause) {
-                callback.connectionLost(cause);
-            }
-
-            @Override
-            public void messageArrived(String topic, MqttMessage message) {
-                logger.debug("Message arrived with id: {}", message.getId());
-                final ReceivedMqttMessage receivedMessage = new 
ReceivedMqttMessage(message.getPayload(), message.getQos(), 
message.isRetained(), topic);
-                callback.messageArrived(receivedMessage);
-            }
-
-            @Override
-            public void deliveryComplete(IMqttDeliveryToken token) {
-                callback.deliveryComplete(token.toString());
-            }
-        });
-    }
-
     public static Properties transformSSLContextService(TlsConfiguration 
tlsConfiguration) {
         final Properties properties = new Properties();
         if (tlsConfiguration.getProtocol() != null) {
@@ -176,7 +159,7 @@ public class PahoMqttClientAdapter implements MqttClient {
         if (tlsConfiguration.getTruststoreType() != null) {
             properties.setProperty("com.ibm.ssl.trustStoreType", 
tlsConfiguration.getTruststoreType().getType());
         }
-        return  properties;
+        return properties;
     }
 
     private static org.eclipse.paho.client.mqttv3.MqttClient createClient(URI 
brokerUri, MqttClientProperties clientProperties, ComponentLog logger) {
@@ -189,4 +172,58 @@ public class PahoMqttClientAdapter implements MqttClient {
         }
     }
 
+    /**
+     * Paho API uses the same callback for the publisher and consumer as well.
+     * Because of that, DefaultMqttCallback sets some reasonable default logs
+     * to make it easier to track misconfiguration errors.
+     * <p>
+     * In case of subscribing clients messageArrived needs to be overridden.
+     */
+    private class DefaultMqttCallback implements MqttCallback {
+
+        @Override
+        public void connectionLost(Throwable cause) {
+            logger.error("Connection to [{}] lost", 
clientProperties.getRawBrokerUris(), cause);
+        }
+
+        @Override
+        public void messageArrived(String topic, MqttMessage message) {
+            // Unlikely situation. The Paho api uses the same callback for 
publisher and consumer as well. That's why
+            // we have this log message here to indicate that something messy 
happened because we don't expect to
+            // receive messages before the client has subscribed and the 
callback has been changed to ConsumerMqttCallback.
+            logger.error("MQTT message arrived [topic:{}; payload:{}]", topic, 
Arrays.toString(message.getPayload()));
+        }
+
+        @Override
+        public void deliveryComplete(IMqttDeliveryToken token) {
+            logger.trace("Received 'delivery complete' message from broker. 
Token: [{}]", token);
+        }
+    }
+
+    /**
+     * Subscriber specific implementation of MqttCallback
+     */
+    private class ConsumerMqttCallback extends DefaultMqttCallback {
+
+        private final ReceivedMqttMessageHandler handler;
+
+        private ConsumerMqttCallback(ReceivedMqttMessageHandler handler) {
+            this.handler = handler;
+        }
+
+        @Override
+        public void messageArrived(String topic, MqttMessage message) {
+            logger.debug("Message arrived. Id: [{}]", message.getId());
+            final ReceivedMqttMessage receivedMessage = new 
ReceivedMqttMessage(message.getPayload(), message.getQos(), 
message.isRetained(), topic);
+            handler.handleReceivedMessage(receivedMessage);
+        }
+
+        @Override
+        public void deliveryComplete(IMqttDeliveryToken token) {
+            // Unlikely situation. The Paho api uses the same callback for 
publisher and consumer as well. That's why
+            // we have this log message here to indicate that something messy 
happened because we don't expect to
+            // receive 'delivery complete' messages while the client is 
subscribed.
+            logger.error("Received MQTT 'delivery complete' message to a 
subscribed client. Token: [{}]", token);
+        }
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java
index f21d3e9242..2b5c949531 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java
@@ -57,13 +57,7 @@ public interface MqttClient {
      *            published at a lower quality of service will be received at 
the published
      *            QoS. Messages published at a higher quality of service will 
be received using
      *            the QoS specified on the subscribe.
+     * @param handler that further processes the message received by the client
      */
-    void subscribe(String topicFilter, int qos);
-
-    /**
-     * Sets a callback listener to use for events that happen asynchronously.
-     *
-     * @param callback for matching events
-     */
-    void setCallback(MqttCallback callback);
+    void subscribe(String topicFilter, int qos, ReceivedMqttMessageHandler 
handler);
 }
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttCallback.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/ReceivedMqttMessageHandler.java
similarity index 80%
rename from 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttCallback.java
rename to 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/ReceivedMqttMessageHandler.java
index a890616f5c..6a5c75b119 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttCallback.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/ReceivedMqttMessageHandler.java
@@ -16,8 +16,12 @@
  */
 package org.apache.nifi.processors.mqtt.common;
 
-public interface MqttCallback {
-    void connectionLost(Throwable cause);
-    void messageArrived(ReceivedMqttMessage message);
-    void deliveryComplete(String token);
+public interface ReceivedMqttMessageHandler {
+
+    /**
+     * Handler to process received MQTT message
+     *
+     * @param message to process
+     */
+    void handleReceivedMessage(ReceivedMqttMessage message);
 }
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
index dcb87b612c..263f3c55d3 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
@@ -29,14 +29,13 @@ public class MqttTestClient implements MqttClient {
 
     public AtomicBoolean connected = new AtomicBoolean(false);
 
-    public MqttCallback mqttCallback;
     public ConnectType type;
 
     public enum ConnectType {Publisher, Subscriber}
 
     public String subscribedTopic;
     public int subscribedQos;
-
+    public ReceivedMqttMessageHandler receivedMqttMessageHandler;
     public MqttTestClient(ConnectType type) {
         this.type = type;
     }
@@ -68,20 +67,16 @@ public class MqttTestClient implements MqttClient {
                 publishedMessages.add(Pair.of(topic, message));
                 break;
             case Subscriber:
-                mqttCallback.messageArrived(new 
ReceivedMqttMessage(message.getPayload(), message.getQos(), 
message.isRetained(), topic));
+                receivedMqttMessageHandler.handleReceivedMessage(new 
ReceivedMqttMessage(message.getPayload(), message.getQos(), 
message.isRetained(), topic));
                 break;
         }
     }
 
     @Override
-    public void subscribe(String topicFilter, int qos) {
+    public void subscribe(String topicFilter, int qos, 
ReceivedMqttMessageHandler handler) {
         subscribedTopic = topicFilter;
         subscribedQos = qos;
-    }
-
-    @Override
-    public void setCallback(MqttCallback callback) {
-        this.mqttCallback = callback;
+        receivedMqttMessageHandler = handler;
     }
 
     public Pair<String, StandardMqttMessage> getLastPublished() {

Reply via email to