This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 93a5823  NIFI-7890 - Added record support to ConsumeMQTT processor
93a5823 is described below

commit 93a5823f8ab9b1e6b71fe418e8af3756f4c1c1c2
Author: Pierre Villard <pierre.villard...@gmail.com>
AuthorDate: Wed Oct 7 19:05:47 2020 +0200

    NIFI-7890 - Added record support to ConsumeMQTT processor
    
    This closes #4738.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml  |  16 +
 .../apache/nifi/processors/mqtt/ConsumeMQTT.java   | 347 ++++++++++++++++++++-
 .../mqtt/common/AbstractMQTTProcessor.java         |   2 +
 .../mqtt/common/TestConsumeMqttCommon.java         | 226 ++++++++++++++
 4 files changed, 578 insertions(+), 13 deletions(-)

diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
index 4bfff22..260c46c 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
@@ -68,9 +68,25 @@
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
             <version>1.13.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index 5f93e65..83c4359 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -33,6 +33,7 @@ import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -45,13 +46,29 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
 import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SchemaValidationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -62,11 +79,13 @@ import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
 import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
 import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
 import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.RECORD_COUNT_KEY;
 import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
@@ -79,6 +98,7 @@ import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL
 @CapabilityDescription("Subscribes to a topic and receives messages from an 
MQTT broker")
 @SeeAlso({PublishMQTT.class})
 @WritesAttributes({
+    @WritesAttribute(attribute=RECORD_COUNT_KEY, description="The number of 
records received"),
     @WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT broker 
that was the message source"),
     @WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT topic on 
which message was received"),
     @WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality of 
service for this message."),
@@ -88,13 +108,25 @@ import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL
 @SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"The 'Max Queue Size' specifies the maximum number of messages that can be held 
in memory by NiFi by a single "
         + "instance of this processor. A high value for this property could 
represent a lot of data being stored in memory.")
 
-public class ConsumeMQTT extends AbstractMQTTProcessor  implements 
MqttCallback {
+public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback 
{
 
-    public final static String BROKER_ATTRIBUTE_KEY =  "mqtt.broker";
-    public final static String TOPIC_ATTRIBUTE_KEY =  "mqtt.topic";
-    public final static String QOS_ATTRIBUTE_KEY =  "mqtt.qos";
-    public final static String IS_DUPLICATE_ATTRIBUTE_KEY =  
"mqtt.isDuplicate";
-    public final static String IS_RETAINED_ATTRIBUTE_KEY =  "mqtt.isRetained";
+    public final static String RECORD_COUNT_KEY = "record.count";
+    public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker";
+    public final static String TOPIC_ATTRIBUTE_KEY = "mqtt.topic";
+    public final static String QOS_ATTRIBUTE_KEY = "mqtt.qos";
+    public final static String IS_DUPLICATE_ATTRIBUTE_KEY = "mqtt.isDuplicate";
+    public final static String IS_RETAINED_ATTRIBUTE_KEY = "mqtt.isRetained";
+
+    public final static String TOPIC_FIELD_KEY = "_topic";
+    public final static String QOS_FIELD_KEY = "_qos";
+    public final static String IS_DUPLICATE_FIELD_KEY = "_isDuplicate";
+    public final static String IS_RETAINED_FIELD_KEY = "_isRetained";
+
+    private final static String COUNTER_PARSE_FAILURES = "Parse Failures";
+    private final static String COUNTER_RECORDS_RECEIVED = "Records Received";
+    private final static String COUNTER_RECORDS_PROCESSED = "Records 
Processed";
+
+    private final static int MAX_MESSAGES_PER_FLOW_FILE = 10000;
 
     public static final PropertyDescriptor PROP_GROUPID = new 
PropertyDescriptor.Builder()
             .name("Group ID")
@@ -131,6 +163,46 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for received messages")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use in order to serialize the 
data before writing it to a FlowFile")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new 
PropertyDescriptor.Builder()
+            .name("add-attributes-as-fields")
+            .displayName("Add attributes as fields")
+            .description("If using the Records reader/writer and if setting 
this property to true, default fields "
+                    + "are going to be added in each record: _topic, _qos, 
_isDuplicate, _isRetained.")
+            .required(true)
+            .defaultValue("true")
+            .allowableValues("true", "false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new 
PropertyDescriptor.Builder()
+            .name("message-demarcator")
+            .displayName("Message Demarcator")
+            .required(false)
+            .addValidator(Validator.VALID)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .description("With this property, you have an option to output 
FlowFiles which contain multiple messages. "
+                    + "This property allows you to provide a string 
(interpreted as UTF-8) to use for demarcating apart "
+                    + "multiple messages. This is an optional property; if 
not provided, and if not defining a "
+                    + "Reader/Writer, each message received will result in a 
single FlowFile. To enter special "
+                    + "characters such as 'new line' use CTRL+Enter or 
Shift+Enter depending on the OS.")
+            .build();
+
     private volatile int qos;
     private volatile String topicPrefix = "";
     private volatile String topicFilter;
@@ -143,6 +215,13 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
             .description("The MQTT message output")
             .build();
 
+    public static final Relationship REL_PARSE_FAILURE = new 
Relationship.Builder()
+            .name("parse.failure")
+            .description("If a message cannot be parsed using the configured 
Record Reader, the contents of the "
+                + "message will be routed to this Relationship as its own 
individual FlowFile.")
+            .autoTerminateDefault(true) // to make sure flows are still valid 
after upgrades
+            .build();
+
     private static final List<PropertyDescriptor> descriptors;
     private static final Set<Relationship> relationships;
 
@@ -152,10 +231,15 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
         innerDescriptorsList.add(PROP_TOPIC_FILTER);
         innerDescriptorsList.add(PROP_QOS);
         innerDescriptorsList.add(PROP_MAX_QUEUE_SIZE);
+        innerDescriptorsList.add(RECORD_READER);
+        innerDescriptorsList.add(RECORD_WRITER);
+        innerDescriptorsList.add(ADD_ATTRIBUTES_AS_FIELDS);
+        innerDescriptorsList.add(MESSAGE_DEMARCATOR);
         descriptors = Collections.unmodifiableList(innerDescriptorsList);
 
         final Set<Relationship> innerRelationshipsSet = new 
HashSet<Relationship>();
         innerRelationshipsSet.add(REL_MESSAGE);
+        innerRelationshipsSet.add(REL_PARSE_FAILURE);
         relationships = Collections.unmodifiableSet(innerRelationshipsSet);
     }
 
@@ -200,6 +284,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
         final boolean clientIDSet = context.getProperty(PROP_CLIENTID).isSet();
         final boolean clientIDwithEL = 
context.getProperty(PROP_CLIENTID).isExpressionLanguagePresent();
         final boolean groupIDSet = context.getProperty(PROP_GROUPID).isSet();
+
         if (!clientIDwithEL && clientIDSet && groupIDSet) {
             results.add(new ValidationResult.Builder()
                     .subject("Client ID and Group ID").valid(false)
@@ -209,6 +294,19 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
                     .build());
         }
 
+        final boolean readerIsSet = context.getProperty(RECORD_READER).isSet();
+        final boolean writerIsSet = context.getProperty(RECORD_WRITER).isSet();
+        if((readerIsSet && !writerIsSet) || (!readerIsSet && writerIsSet)) {
+            results.add(new ValidationResult.Builder().subject("Reader and 
Writer").valid(false)
+                    .explanation("Both Record Reader and Writer must be set 
when used").build());
+        }
+
+        final boolean demarcatorIsSet = 
context.getProperty(MESSAGE_DEMARCATOR).isSet();
+        if(readerIsSet && demarcatorIsSet) {
+            results.add(new ValidationResult.Builder().subject("Reader and 
Writer").valid(false)
+                    .explanation("You cannot use both a demarcator and a 
Reader/Writer").build());
+        }
+
         return results;
     }
 
@@ -257,7 +355,13 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
         if(mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory 
!= null) {
             logger.info("Finishing processing leftover messages");
             ProcessSession session = processSessionFactory.createSession();
-            transferQueue(session);
+            if(context.getProperty(RECORD_READER).isSet()) {
+                transferQueueRecord(context, session);
+            } else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
+                transferQueueDemarcator(context, session);
+            } else {
+                transferQueue(session);
+            }
         } else {
             if (mqttQueue!= null && !mqttQueue.isEmpty()){
                 throw new ProcessException("Stopping the processor but there 
is no ProcessSessionFactory stored and there are messages in the MQTT internal 
queue. Removing the processor now will " +
@@ -279,10 +383,17 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
         }
 
         if (mqttQueue.isEmpty()) {
+            context.yield();
             return;
         }
 
-        transferQueue(session);
+        if(context.getProperty(RECORD_READER).isSet()) {
+            transferQueueRecord(context, session);
+        } else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
+            transferQueueDemarcator(context, session);
+        } else {
+            transferQueue(session);
+        }
     }
 
     private void initializeClient(ProcessContext context) {
@@ -308,8 +419,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
 
     private void transferQueue(ProcessSession session){
         while (!mqttQueue.isEmpty()) {
-            FlowFile messageFlowfile = session.create();
             final MQTTQueueMessage mqttMessage = mqttQueue.peek();
+            FlowFile messageFlowfile = session.create();
 
             Map<String, String> attrs = new HashMap<>();
             attrs.put(BROKER_ATTRIBUTE_KEY, broker);
@@ -323,18 +434,17 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
             messageFlowfile = session.write(messageFlowfile, new 
OutputStreamCallback() {
                 @Override
                 public void process(final OutputStream out) throws IOException 
{
-                    out.write(mqttMessage.getPayload());
+                    out.write(mqttMessage.getPayload() == null ? new byte[0] : 
mqttMessage.getPayload());
                 }
             });
 
-            String transitUri = new 
StringBuilder(broker).append(mqttMessage.getTopic()).toString();
-            session.getProvenanceReporter().receive(messageFlowfile, 
transitUri);
+            session.getProvenanceReporter().receive(messageFlowfile, 
getTransitUri(mqttMessage.getTopic()));
             session.transfer(messageFlowfile, REL_MESSAGE);
             session.commit();
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         
.append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, 
possible duplication of flow files")
                         .toString());
@@ -342,6 +452,217 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final 
ProcessSession session){
+        final byte[] demarcator = 
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            int i = 0;
+            while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload() == null ? new byte[0] : 
mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
+                i++;
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, 
getTransitUri(topicPrefix, topicFilter));
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final 
MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, 
String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, 
String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new 
OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, 
getTransitUri(mqttMessage.getTopic()));
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final 
ProcessSession session){
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new 
ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+        int i = 0;
+
+        try {
+            while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                if(mqttMessage == null) {
+                    break;
+                }
+
+                final byte[] recordBytes = mqttMessage.getPayload() == null ? 
new byte[0] : mqttMessage.getPayload();
+
+                try (final InputStream in = new 
ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, 
in, recordBytes.length, logger);
+                    } catch (final Exception e) {
+                        logger.error("Failed to parse the message from the 
internal queue, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+
+                    try {
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+
+                            if(!isWriterInitialized) {
+                                final RecordSchema recordSchema = 
record.getSchema();
+                                final OutputStream rawOut = 
session.write(flowFile);
+
+                                RecordSchema writeSchema;
+                                try {
+                                    writeSchema = 
writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                                    
if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                        final List<RecordField> fields = new 
ArrayList<>();
+                                        fields.addAll(writeSchema.getFields());
+
+                                        fields.add(new 
RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
+                                        fields.add(new 
RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
+                                        fields.add(new 
RecordField(IS_DUPLICATE_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+                                        fields.add(new 
RecordField(IS_RETAINED_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+
+                                        writeSchema = new 
SimpleRecordSchema(fields);
+                                    }
+                                } catch (final Exception e) {
+                                    logger.error("Failed to obtain Schema for 
FlowFile, sending to the parse failure relationship", e);
+                                    transferFailure(session, mqttMessage);
+                                    continue;
+                                }
+
+                                writer = writerFactory.createWriter(logger, 
writeSchema, rawOut, flowFile);
+                                writer.beginRecordSet();
+                            }
+
+                            try {
+                                
if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                    record.setValue(TOPIC_FIELD_KEY, 
mqttMessage.getTopic());
+                                    record.setValue(QOS_FIELD_KEY, 
mqttMessage.getQos());
+                                    record.setValue(IS_RETAINED_FIELD_KEY, 
mqttMessage.isRetained());
+                                    record.setValue(IS_DUPLICATE_FIELD_KEY, 
mqttMessage.isDuplicate());
+                                }
+                                writer.write(record);
+                                isWriterInitialized = true;
+                                doneList.add(mqttMessage);
+                            } catch (final RuntimeException re) {
+                                logger.error("Failed to write message using 
the configured Record Writer, sending to the parse failure relationship", re);
+                                transferFailure(session, mqttMessage);
+                                continue;
+                            }
+
+                            session.adjustCounter(COUNTER_RECORDS_RECEIVED, 
1L, false);
+                            i++;
+                        }
+                    } catch (final IOException | MalformedRecordException | 
SchemaValidationException e) {
+                        logger.error("Failed to write message, sending to the 
parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to write message, sending to the 
parse failure relationship", e);
+                    transferFailure(session, mqttMessage);
+                    continue;
+                }
+            }
+
+            if(writer != null) {
+                final WriteResult writeResult = writer.finishRecordSet();
+                attributes.put(RECORD_COUNT_KEY, 
String.valueOf(writeResult.getRecordCount()));
+                attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
+                attributes.putAll(writeResult.getAttributes());
+                recordCount.set(writeResult.getRecordCount());
+            }
+
+        } catch (final Exception e) {
+            context.yield();
+
+            // we try to add the messages back into the internal queue
+            int numberOfMessages = 0;
+            for(MQTTQueueMessage done : doneList) {
+                try {
+                    mqttQueue.offer(done, 1, TimeUnit.SECONDS);
+                } catch (InterruptedException ex) {
+                    numberOfMessages++;
+                    if(getLogger().isDebugEnabled()) {
+                        logger.debug("Could not add message back into the 
internal queue, this could lead to data loss", ex);
+                    }
+                }
+            }
+            if(numberOfMessages > 0) {
+                logger.error("Could not add {} message(s) back into the 
internal queue, this could mean data loss", new Object[] {numberOfMessages});
+            }
+
+            throw new ProcessException("Could not process data received from 
the MQTT broker(s): " + broker, e);
+        } finally {
+            closeWriter(writer);
+        }
+
+        if(recordCount.get() == 0) {
+            session.remove(flowFile);
+            return;
+        }
+
+        session.putAllAttributes(flowFile, attributes);
+        session.getProvenanceReporter().receive(flowFile, 
getTransitUri(topicPrefix, topicFilter));
+        session.transfer(flowFile, REL_MESSAGE);
+        session.commit();
+
+        final int count = recordCount.get();
+        session.adjustCounter(COUNTER_RECORDS_PROCESSED, count, false);
+        getLogger().info("Successfully processed {} records for {}", new 
Object[] {count, flowFile});
+    }
+
+    private void closeWriter(final RecordSetWriter writer) {
+        try {
+            if (writer != null) {
+                writer.close();
+            }
+        } catch (final Exception ioe) {
+            logger.warn("Failed to close Record Writer", ioe);
+        }
+    }
+
+    private String getTransitUri(String... appends) {
+        StringBuilder stringBuilder = new StringBuilder(brokerUri);
+        for(String append : appends) {
+            stringBuilder.append(append);
+        }
+        return stringBuilder.toString();
+    }
+
     @Override
     public void connectionLost(Throwable cause) {
         logger.error("Connection to {} lost due to: {}", new Object[]{broker, 
cause.getMessage()}, cause);
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
index d6d4040..3d34d94 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
@@ -62,6 +62,7 @@ public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProces
     protected ComponentLog logger;
     protected IMqttClient mqttClient;
     protected volatile String broker;
+    protected volatile String brokerUri;
     protected volatile String clientID;
     protected MqttConnectOptions connOpts;
     protected MemoryPersistence persistence = new MemoryPersistence();
@@ -314,6 +315,7 @@ public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProces
 
     protected void onScheduled(final ProcessContext context){
         broker = context.getProperty(PROP_BROKER_URI).getValue();
+        brokerUri = broker.endsWith("/") ? broker : broker + "/";
         clientID = 
context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
 
         if (clientID == null) {
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
index 5518193..6a5b5f1 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
@@ -20,11 +20,15 @@ package org.apache.nifi.processors.mqtt.common;
 import io.moquette.proto.messages.AbstractMessage;
 import io.moquette.proto.messages.PublishMessage;
 import io.moquette.server.Server;
+
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processors.mqtt.ConsumeMQTT;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -53,6 +57,7 @@ import static org.junit.Assert.assertTrue;
 public abstract class TestConsumeMqttCommon {
 
     public int PUBLISH_WAIT_MS = 1000;
+    public static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON";
 
     public Server MQTT_server;
     public TestRunner testRunner;
@@ -409,6 +414,227 @@ public abstract class TestConsumeMqttCommon {
         flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
     }
 
+    @Test
+    public void testConsumeRecordsWithAddedFields() throws Exception {
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        testRunner.addControllerService("record-reader", jsonReader);
+        testRunner.setProperty(jsonReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
+        testRunner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        testRunner.addControllerService("record-writer", jsonWriter);
+        testRunner.setProperty(jsonWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
+        testRunner.enableControllerService(jsonWriter);
+
+        testRunner.assertValid();
+
+        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        PublishMessage testMessage = new PublishMessage();
+        testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache 
NiFi\"}".getBytes()));
+        testMessage.setTopicName("testTopic");
+        testMessage.setDupFlag(false);
+        testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
+        testMessage.setRetainFlag(false);
+
+        PublishMessage badMessage = new PublishMessage();
+        badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
+        badMessage.setTopicName("testTopic");
+        badMessage.setDupFlag(false);
+        badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
+        badMessage.setRetainFlag(false);
+
+        internalPublish(testMessage);
+        internalPublish(badMessage);
+        internalPublish(testMessage);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        assertTrue(flowFiles.size() == 1);
+        assertEquals("[{\"name\":\"Apache 
NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false},"
+                + "{\"name\":\"Apache 
NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]",
+                new String(flowFiles.get(0).toByteArray()));
+
+        List<MockFlowFile> badFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
+        assertTrue(badFlowFiles.size() == 1);
+        assertEquals(THIS_IS_NOT_JSON, new 
String(badFlowFiles.get(0).toByteArray()));
+
+        // clean runner by removing records reader/writer
+        testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
+        testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
+    }
+
+    @Test
+    public void testConsumeDemarcator() throws Exception {
+        testRunner.setProperty(ConsumeMQTT.MESSAGE_DEMARCATOR, "\\n");
+        testRunner.assertValid();
+
+        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        PublishMessage testMessage = new PublishMessage();
+        testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache 
NiFi\"}".getBytes()));
+        testMessage.setTopicName("testTopic");
+        testMessage.setDupFlag(false);
+        testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
+        testMessage.setRetainFlag(false);
+
+        PublishMessage badMessage = new PublishMessage();
+        badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
+        badMessage.setTopicName("testTopic");
+        badMessage.setDupFlag(false);
+        badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
+        badMessage.setRetainFlag(false);
+
+        internalPublish(testMessage);
+        internalPublish(badMessage);
+        internalPublish(testMessage);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        assertEquals(flowFiles.size(), 1);
+        assertEquals("{\"name\":\"Apache NiFi\"}\\n"
+                + THIS_IS_NOT_JSON + "\\n"
+                + "{\"name\":\"Apache NiFi\"}\\n",
+                new String(flowFiles.get(0).toByteArray()));
+
+        List<MockFlowFile> badFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
+        assertTrue(badFlowFiles.size() == 0);
+
+        // clean runner by removing message demarcator
+        testRunner.removeProperty(ConsumeMQTT.MESSAGE_DEMARCATOR);
+    }
+
+    @Test
+    public void testConsumeRecordsWithoutAddedFields() throws Exception {
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
+        testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        testRunner.addControllerService("record-reader", jsonReader);
+        testRunner.setProperty(jsonReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
+        testRunner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        testRunner.addControllerService("record-writer", jsonWriter);
+        testRunner.setProperty(jsonWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
+        testRunner.enableControllerService(jsonWriter);
+
+        testRunner.assertValid();
+
+        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        PublishMessage testMessage = new PublishMessage();
+        testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache 
NiFi\"}".getBytes()));
+        testMessage.setTopicName("testTopic");
+        testMessage.setDupFlag(false);
+        testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
+        testMessage.setRetainFlag(false);
+
+        PublishMessage badMessage = new PublishMessage();
+        badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
+        badMessage.setTopicName("testTopic");
+        badMessage.setDupFlag(false);
+        badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
+        badMessage.setRetainFlag(false);
+
+        internalPublish(testMessage);
+        internalPublish(badMessage);
+        internalPublish(testMessage);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        assertTrue(flowFiles.size() == 1);
+        assertEquals("[{\"name\":\"Apache NiFi\"},{\"name\":\"Apache 
NiFi\"}]", new String(flowFiles.get(0).toByteArray()));
+
+        List<MockFlowFile> badFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
+        assertTrue(badFlowFiles.size() == 1);
+        assertEquals(THIS_IS_NOT_JSON, new 
String(badFlowFiles.get(0).toByteArray()));
+
+        // clean runner by removing records reader/writer
+        testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
+        testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
+    }
+
+    @Test
+    public void testConsumeRecordsOnlyBadData() throws Exception {
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
+        testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        testRunner.addControllerService("record-reader", jsonReader);
+        testRunner.setProperty(jsonReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
+        testRunner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        testRunner.addControllerService("record-writer", jsonWriter);
+        testRunner.setProperty(jsonWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
+        testRunner.enableControllerService(jsonWriter);
+
+        testRunner.assertValid();
+
+        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        PublishMessage badMessage = new PublishMessage();
+        badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
+        badMessage.setTopicName("testTopic");
+        badMessage.setDupFlag(false);
+        badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
+        badMessage.setRetainFlag(false);
+
+        internalPublish(badMessage);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        List<MockFlowFile> badFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
+        assertTrue(badFlowFiles.size() == 1);
+        assertEquals(THIS_IS_NOT_JSON, new 
String(badFlowFiles.get(0).toByteArray()));
+
+        // clean runner by removing records reader/writer
+        testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
+        testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
+    }
+
     private static boolean isConnected(AbstractMQTTProcessor processor) throws 
NoSuchFieldException, IllegalAccessException {
         Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient");
         f.setAccessible(true);

Reply via email to