nandorsoma commented on code in PR #6373:
URL: https://github.com/apache/nifi/pull/6373#discussion_r966172583


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java:
##########
@@ -17,52 +17,373 @@
 
 package org.apache.nifi.processors.mqtt;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.processors.mqtt.common.MqttClient;
-import org.apache.nifi.processors.mqtt.common.MqttException;
 import org.apache.nifi.processors.mqtt.common.MqttTestClient;
 import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
+import javax.json.Json;
+import javax.json.JsonArray;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import static 
org.apache.nifi.processors.mqtt.PublishMQTT.ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+import static 
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static 
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static 
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static 
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
 
-public class TestPublishMQTT extends TestPublishMqttCommon {
+public class TestPublishMQTT {
 
-    @Override
-    public void verifyPublishedMessage(byte[] payload, int qos, boolean 
retain) {
-        StandardMqttMessage lastPublishedMessage = 
mqttTestClient.getLastPublishedMessage();
-        String lastPublishedTopic = mqttTestClient.getLastPublishedTopic();
+    private static final String BROKER_URI = "tcp://localhost:1883";
+    private static final String TOPIC = "testTopic";
+    private static final String RETAIN = "false";
+
+    private TestRunner testRunner;
+    private MqttTestClient mqttTestClient;
+
+    @Test
+    public void testQoS0() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        assertProvenanceEvent();
+
+        verifyPublishedMessage(testMessage.getBytes(), 0, false);
+    }
+
+    @Test
+    public void testQoS1() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 1, false);
+    }
+
+    @Test
+    public void testQoS2NotCleanSession() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        // Publisher executes synchronously so the only time whether its Clean 
or Not matters is when the processor stops in the middle of the publishing
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, 
ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testQoS2() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testRetainQoS2() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, true);
+    }
+
+    @Test
+    public void testPublishRecordSet() throws InitializationException {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final JsonArray testInput = createTestJsonInput();
+
+        testRunner.enqueue(testInput.toString().getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS,
 3));
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2, 
false);
+        verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, 
false);
+        verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2, 
false);
+        assertNull(mqttTestClient.getLastPublished(), "TestClient's queue 
should be empty.");
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile successfulFlowFile = flowFiles.get(0);
+        final String publishFailedIndexAttributeName = 
testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        
assertFalse(successfulFlowFile.getAttributes().containsKey(publishFailedIndexAttributeName),
 "Failed attribute should not be present on the FlowFile");
+
+        // clean runner by removing records reader/writer
+        testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
+        testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);

Review Comment:
   Thanks for the idea! These cleanups are not needed at all, because every 
test creates a new runner. Nevertheless, I've added an @AfterEach method to 
clean up properly (just for the sake of safety), and there were 2 runners 
held in local variables, which I have now changed.



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java:
##########
@@ -17,105 +17,583 @@
 
 package org.apache.nifi.processors.mqtt;
 
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
 import org.apache.nifi.processors.mqtt.common.MqttClient;
 import org.apache.nifi.processors.mqtt.common.MqttTestClient;
 import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
 import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.security.util.SslContextFactory;
 import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
-import org.apache.nifi.security.util.TlsConfiguration;
 import org.apache.nifi.security.util.TlsException;
 import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import javax.net.ssl.SSLContext;
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static 
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class TestConsumeMQTT extends TestConsumeMqttCommon {
-    private static TlsConfiguration tlsConfiguration;
+public class TestConsumeMQTT {
 
-    public MqttTestClient mqttTestClient;
+    private static final int PUBLISH_WAIT_MS = 0;
+    private static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON";
+    private static final String BROKER_URI = "tcp://localhost:1883";
+    private static final String CLIENT_ID = "TestClient";
+    private static final String TOPIC_NAME = "testTopic";
+    private static final String INTERNAL_QUEUE_SIZE = "100";
 
-    public class UnitTestableConsumeMqtt extends ConsumeMQTT {
+    private static final String STRING_MESSAGE = "testMessage";
+    private static final String JSON_PAYLOAD = "{\"name\":\"Apache NiFi\"}";
 
-        public UnitTestableConsumeMqtt(){
-            super();
-        }
+    private static final int MOST_ONE = 0;
+    private static final int LEAST_ONE = 1;
+    private static final int EXACTLY_ONCE = 2;

Review Comment:
   Sure, changed!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to