This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 93a5823 NIFI-7890 - Added record support to ConsumeMQTT processor 93a5823 is described below commit 93a5823f8ab9b1e6b71fe418e8af3756f4c1c1c2 Author: Pierre Villard <pierre.villard...@gmail.com> AuthorDate: Wed Oct 7 19:05:47 2020 +0200 NIFI-7890 - Added record support to ConsumeMQTT processor This closes #4738. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml | 16 + .../apache/nifi/processors/mqtt/ConsumeMQTT.java | 347 ++++++++++++++++++++- .../mqtt/common/AbstractMQTTProcessor.java | 2 + .../mqtt/common/TestConsumeMqttCommon.java | 226 ++++++++++++++ 4 files changed, 578 insertions(+), 13 deletions(-) diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml index 4bfff22..260c46c 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml @@ -68,9 +68,25 @@ <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-services</artifactId> <version>1.13.0-SNAPSHOT</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-avro-record-utils</artifactId> + <version>1.13.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-schema-registry-service-api</artifactId> + <scope>test</scope> + </dependency> </dependencies> diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java index 5f93e65..83c4359 100644 
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java @@ -33,6 +33,7 @@ import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -45,13 +46,29 @@ import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor; import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.SchemaValidationException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; 
+import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -62,11 +79,13 @@ import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY; import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY; import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY; import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.RECORD_COUNT_KEY; import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1; @@ -79,6 +98,7 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL @CapabilityDescription("Subscribes to a topic and receives messages from an MQTT broker") @SeeAlso({PublishMQTT.class}) @WritesAttributes({ + @WritesAttribute(attribute=RECORD_COUNT_KEY, description="The number of records received"), @WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT broker that was the message source"), @WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT topic on which message was received"), @WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality of service for this message."), @@ -88,13 +108,25 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL @SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The 'Max Queue Size' specifies the maximum number of messages that can be hold in memory by NiFi by a single " + "instance 
of this processor. A high value for this property could represent a lot of data being stored in memory.") -public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { +public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { - public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker"; - public final static String TOPIC_ATTRIBUTE_KEY = "mqtt.topic"; - public final static String QOS_ATTRIBUTE_KEY = "mqtt.qos"; - public final static String IS_DUPLICATE_ATTRIBUTE_KEY = "mqtt.isDuplicate"; - public final static String IS_RETAINED_ATTRIBUTE_KEY = "mqtt.isRetained"; + public final static String RECORD_COUNT_KEY = "record.count"; + public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker"; + public final static String TOPIC_ATTRIBUTE_KEY = "mqtt.topic"; + public final static String QOS_ATTRIBUTE_KEY = "mqtt.qos"; + public final static String IS_DUPLICATE_ATTRIBUTE_KEY = "mqtt.isDuplicate"; + public final static String IS_RETAINED_ATTRIBUTE_KEY = "mqtt.isRetained"; + + public final static String TOPIC_FIELD_KEY = "_topic"; + public final static String QOS_FIELD_KEY = "_qos"; + public final static String IS_DUPLICATE_FIELD_KEY = "_isDuplicate"; + public final static String IS_RETAINED_FIELD_KEY = "_isRetained"; + + private final static String COUNTER_PARSE_FAILURES = "Parse Failures"; + private final static String COUNTER_RECORDS_RECEIVED = "Records Received"; + private final static String COUNTER_RECORDS_PROCESSED = "Records Processed"; + + private final static int MAX_MESSAGES_PER_FLOW_FILE = 10000; public static final PropertyDescriptor PROP_GROUPID = new PropertyDescriptor.Builder() .name("Group ID") @@ -131,6 +163,46 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); + public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + 
.description("The Record Reader to use for received messages") + .identifiesControllerService(RecordReaderFactory.class) + .required(false) + .build(); + + public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("The Record Writer to use in order to serialize the data before writing it to a FlowFile") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(false) + .build(); + + public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder() + .name("add-attributes-as-fields") + .displayName("Add attributes as fields") + .description("If using the Records reader/writer and if setting this property to true, default fields " + + "are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained.") + .required(true) + .defaultValue("true") + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() + .name("message-demarcator") + .displayName("Message Demarcator") + .required(false) + .addValidator(Validator.VALID) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .description("With this property, you have an option to output FlowFiles which contain multiple messages. " + + "This property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart " + + "multiple messages. This is an optional property; if not provided, and if not defining a " + + "Reader/Writer, each message received will result in a single FlowFile. 
To enter special " + + "character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.") + .build(); + private volatile int qos; private volatile String topicPrefix = ""; private volatile String topicFilter; @@ -143,6 +215,13 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback .description("The MQTT message output") .build(); + public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder() + .name("parse.failure") + .description("If a message cannot be parsed using the configured Record Reader, the contents of the " + + "message will be routed to this Relationship as its own individual FlowFile.") + .autoTerminateDefault(true) // to make sure flows are still valid after upgrades + .build(); + private static final List<PropertyDescriptor> descriptors; private static final Set<Relationship> relationships; @@ -152,10 +231,15 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback innerDescriptorsList.add(PROP_TOPIC_FILTER); innerDescriptorsList.add(PROP_QOS); innerDescriptorsList.add(PROP_MAX_QUEUE_SIZE); + innerDescriptorsList.add(RECORD_READER); + innerDescriptorsList.add(RECORD_WRITER); + innerDescriptorsList.add(ADD_ATTRIBUTES_AS_FIELDS); + innerDescriptorsList.add(MESSAGE_DEMARCATOR); descriptors = Collections.unmodifiableList(innerDescriptorsList); final Set<Relationship> innerRelationshipsSet = new HashSet<Relationship>(); innerRelationshipsSet.add(REL_MESSAGE); + innerRelationshipsSet.add(REL_PARSE_FAILURE); relationships = Collections.unmodifiableSet(innerRelationshipsSet); } @@ -200,6 +284,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback final boolean clientIDSet = context.getProperty(PROP_CLIENTID).isSet(); final boolean clientIDwithEL = context.getProperty(PROP_CLIENTID).isExpressionLanguagePresent(); final boolean groupIDSet = context.getProperty(PROP_GROUPID).isSet(); + if (!clientIDwithEL && clientIDSet && groupIDSet) { 
results.add(new ValidationResult.Builder() .subject("Client ID and Group ID").valid(false) @@ -209,6 +294,19 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback .build()); } + final boolean readerIsSet = context.getProperty(RECORD_READER).isSet(); + final boolean writerIsSet = context.getProperty(RECORD_WRITER).isSet(); + if((readerIsSet && !writerIsSet) || (!readerIsSet && writerIsSet)) { + results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false) + .explanation("Both Record Reader and Writer must be set when used").build()); + } + + final boolean demarcatorIsSet = context.getProperty(MESSAGE_DEMARCATOR).isSet(); + if(readerIsSet && demarcatorIsSet) { + results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false) + .explanation("You cannot use both a demarcator and a Reader/Writer").build()); + } + return results; } @@ -257,7 +355,13 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback if(mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory != null) { logger.info("Finishing processing leftover messages"); ProcessSession session = processSessionFactory.createSession(); - transferQueue(session); + if(context.getProperty(RECORD_READER).isSet()) { + transferQueueRecord(context, session); + } else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) { + transferQueueDemarcator(context, session); + } else { + transferQueue(session); + } } else { if (mqttQueue!= null && !mqttQueue.isEmpty()){ throw new ProcessException("Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the MQTT internal queue. 
Removing the processor now will " + @@ -279,10 +383,17 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback } if (mqttQueue.isEmpty()) { + context.yield(); return; } - transferQueue(session); + if(context.getProperty(RECORD_READER).isSet()) { + transferQueueRecord(context, session); + } else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) { + transferQueueDemarcator(context, session); + } else { + transferQueue(session); + } } private void initializeClient(ProcessContext context) { @@ -308,8 +419,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback private void transferQueue(ProcessSession session){ while (!mqttQueue.isEmpty()) { - FlowFile messageFlowfile = session.create(); final MQTTQueueMessage mqttMessage = mqttQueue.peek(); + FlowFile messageFlowfile = session.create(); Map<String, String> attrs = new HashMap<>(); attrs.put(BROKER_ATTRIBUTE_KEY, broker); @@ -323,18 +434,17 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() { @Override public void process(final OutputStream out) throws IOException { - out.write(mqttMessage.getPayload()); + out.write(mqttMessage.getPayload() == null ? 
new byte[0] : mqttMessage.getPayload()); } }); - String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString(); - session.getProvenanceReporter().receive(messageFlowfile, transitUri); + session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic())); session.transfer(messageFlowfile, REL_MESSAGE); session.commit(); if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) { logger.warn(new StringBuilder("FlowFile ") .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key())) - .append(" for Mqtt message ") + .append(" for MQTT message ") .append(mqttMessage) .append(" had already been removed from queue, possible duplication of flow files") .toString()); @@ -342,6 +452,217 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback } } + private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){ + final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8); + + FlowFile messageFlowfile = session.create(); + session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker); + + + messageFlowfile = session.append(messageFlowfile, out -> { + int i = 0; + while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) { + final MQTTQueueMessage mqttMessage = mqttQueue.poll(); + out.write(mqttMessage.getPayload() == null ? 
new byte[0] : mqttMessage.getPayload()); + out.write(demarcator); + session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false); + i++; + } + }); + + session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(topicPrefix, topicFilter)); + session.transfer(messageFlowfile, REL_MESSAGE); + session.commit(); + } + + private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) { + FlowFile messageFlowfile = session.create(); + + Map<String, String> attrs = new HashMap<>(); + attrs.put(BROKER_ATTRIBUTE_KEY, broker); + attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic()); + attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos())); + attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate())); + attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained())); + + messageFlowfile = session.putAllAttributes(messageFlowfile, attrs); + + messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(mqttMessage.getPayload()); + } + }); + + session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic())); + session.transfer(messageFlowfile, REL_PARSE_FAILURE); + session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false); + } + + private void transferQueueRecord(final ProcessContext context, final ProcessSession session){ + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + + FlowFile flowFile = session.create(); + session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker); + + final Map<String, String> attributes = new HashMap<>(); + final AtomicInteger recordCount = new AtomicInteger(); + + final List<MQTTQueueMessage> doneList = new 
ArrayList<MQTTQueueMessage>(); + + RecordSetWriter writer = null; + boolean isWriterInitialized = false; + int i = 0; + + try { + while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) { + final MQTTQueueMessage mqttMessage = mqttQueue.poll(); + if(mqttMessage == null) { + break; + } + + final byte[] recordBytes = mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload(); + + try (final InputStream in = new ByteArrayInputStream(recordBytes)) { + final RecordReader reader; + + try { + reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger); + } catch (final Exception e) { + logger.error("Failed to parse the message from the internal queue, sending to the parse failure relationship", e); + transferFailure(session, mqttMessage); + continue; + } + + try { + Record record; + while ((record = reader.nextRecord()) != null) { + + if(!isWriterInitialized) { + final RecordSchema recordSchema = record.getSchema(); + final OutputStream rawOut = session.write(flowFile); + + RecordSchema writeSchema; + try { + writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema); + if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) { + final List<RecordField> fields = new ArrayList<>(); + fields.addAll(writeSchema.getFields()); + + fields.add(new RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType())); + fields.add(new RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType())); + fields.add(new RecordField(IS_DUPLICATE_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType())); + fields.add(new RecordField(IS_RETAINED_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType())); + + writeSchema = new SimpleRecordSchema(fields); + } + } catch (final Exception e) { + logger.error("Failed to obtain Schema for FlowFile, sending to the parse failure relationship", e); + transferFailure(session, mqttMessage); + continue; + } + + writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile); + 
writer.beginRecordSet(); + } + + try { + if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) { + record.setValue(TOPIC_FIELD_KEY, mqttMessage.getTopic()); + record.setValue(QOS_FIELD_KEY, mqttMessage.getQos()); + record.setValue(IS_RETAINED_FIELD_KEY, mqttMessage.isRetained()); + record.setValue(IS_DUPLICATE_FIELD_KEY, mqttMessage.isDuplicate()); + } + writer.write(record); + isWriterInitialized = true; + doneList.add(mqttMessage); + } catch (final RuntimeException re) { + logger.error("Failed to write message using the configured Record Writer, sending to the parse failure relationship", re); + transferFailure(session, mqttMessage); + continue; + } + + session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false); + i++; + } + } catch (final IOException | MalformedRecordException | SchemaValidationException e) { + logger.error("Failed to write message, sending to the parse failure relationship", e); + transferFailure(session, mqttMessage); + continue; + } + } catch (Exception e) { + logger.error("Failed to write message, sending to the parse failure relationship", e); + transferFailure(session, mqttMessage); + continue; + } + } + + if(writer != null) { + final WriteResult writeResult = writer.finishRecordSet(); + attributes.put(RECORD_COUNT_KEY, String.valueOf(writeResult.getRecordCount())); + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + attributes.putAll(writeResult.getAttributes()); + recordCount.set(writeResult.getRecordCount()); + } + + } catch (final Exception e) { + context.yield(); + + // we try to add the messages back into the internal queue + int numberOfMessages = 0; + for(MQTTQueueMessage done : doneList) { + try { + mqttQueue.offer(done, 1, TimeUnit.SECONDS); + } catch (InterruptedException ex) { + numberOfMessages++; + if(getLogger().isDebugEnabled()) { + logger.debug("Could not add message back into the internal queue, this could lead to data loss", ex); + } + } + } + if(numberOfMessages > 0) { + logger.error("Could 
not add {} message(s) back into the internal queue, this could mean data loss", new Object[] {numberOfMessages}); + } + + throw new ProcessException("Could not process data received from the MQTT broker(s): " + broker, e); + } finally { + closeWriter(writer); + } + + if(recordCount.get() == 0) { + session.remove(flowFile); + return; + } + + session.putAllAttributes(flowFile, attributes); + session.getProvenanceReporter().receive(flowFile, getTransitUri(topicPrefix, topicFilter)); + session.transfer(flowFile, REL_MESSAGE); + session.commit(); + + final int count = recordCount.get(); + session.adjustCounter(COUNTER_RECORDS_PROCESSED, count, false); + getLogger().info("Successfully processed {} records for {}", new Object[] {count, flowFile}); + } + + private void closeWriter(final RecordSetWriter writer) { + try { + if (writer != null) { + writer.close(); + } + } catch (final Exception ioe) { + logger.warn("Failed to close Record Writer", ioe); + } + } + + private String getTransitUri(String... 
appends) { + StringBuilder stringBuilder = new StringBuilder(brokerUri); + for(String append : appends) { + stringBuilder.append(append); + } + return stringBuilder.toString(); + } + @Override public void connectionLost(Throwable cause) { logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause); diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java index d6d4040..3d34d94 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java @@ -62,6 +62,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces protected ComponentLog logger; protected IMqttClient mqttClient; protected volatile String broker; + protected volatile String brokerUri; protected volatile String clientID; protected MqttConnectOptions connOpts; protected MemoryPersistence persistence = new MemoryPersistence(); @@ -314,6 +315,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces protected void onScheduled(final ProcessContext context){ broker = context.getProperty(PROP_BROKER_URI).getValue(); + brokerUri = broker.endsWith("/") ? 
broker : broker + "/"; clientID = context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue(); if (clientID == null) { diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java index 5518193..6a5b5f1 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java @@ -20,11 +20,15 @@ package org.apache.nifi.processors.mqtt.common; import io.moquette.proto.messages.AbstractMessage; import io.moquette.proto.messages.PublishMessage; import io.moquette.server.Server; + +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processors.mqtt.ConsumeMQTT; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -53,6 +57,7 @@ import static org.junit.Assert.assertTrue; public abstract class TestConsumeMqttCommon { public int PUBLISH_WAIT_MS = 1000; + public static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON"; public Server MQTT_server; public TestRunner testRunner; @@ -409,6 +414,227 @@ public abstract class TestConsumeMqttCommon { flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); } + @Test + public void testConsumeRecordsWithAddedFields() throws Exception { + 
testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader"); + testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer"); + + final JsonTreeReader jsonReader = new JsonTreeReader(); + testRunner.addControllerService("record-reader", jsonReader); + testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema"); + testRunner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + testRunner.addControllerService("record-writer", jsonWriter); + testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); + testRunner.enableControllerService(jsonWriter); + + testRunner.assertValid(); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT, testRunner.getProcessContext()); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache NiFi\"}".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.MOST_ONE); + testMessage.setRetainFlag(false); + + PublishMessage badMessage = new PublishMessage(); + badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes())); + badMessage.setTopicName("testTopic"); + badMessage.setDupFlag(false); + badMessage.setQos(AbstractMessage.QOSType.MOST_ONE); + badMessage.setRetainFlag(false); + + internalPublish(testMessage); + internalPublish(badMessage); + internalPublish(testMessage); + + Thread.sleep(PUBLISH_WAIT_MS); + + testRunner.run(1, false, false); + + List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + assertTrue(flowFiles.size() == 1); + assertEquals("[{\"name\":\"Apache 
NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}," + + "{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]", + new String(flowFiles.get(0).toByteArray())); + + List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE); + assertTrue(badFlowFiles.size() == 1); + assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray())); + + // clean runner by removing records reader/writer + testRunner.removeProperty(ConsumeMQTT.RECORD_READER); + testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER); + } + + @Test + public void testConsumeDemarcator() throws Exception { + testRunner.setProperty(ConsumeMQTT.MESSAGE_DEMARCATOR, "\\n"); + testRunner.assertValid(); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT, testRunner.getProcessContext()); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache NiFi\"}".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.MOST_ONE); + testMessage.setRetainFlag(false); + + PublishMessage badMessage = new PublishMessage(); + badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes())); + badMessage.setTopicName("testTopic"); + badMessage.setDupFlag(false); + badMessage.setQos(AbstractMessage.QOSType.MOST_ONE); + badMessage.setRetainFlag(false); + + internalPublish(testMessage); + internalPublish(badMessage); + internalPublish(testMessage); + + Thread.sleep(PUBLISH_WAIT_MS); + Thread.sleep(PUBLISH_WAIT_MS); + + testRunner.run(1, false, false); + + List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + 
assertEquals(flowFiles.size(), 1); + assertEquals("{\"name\":\"Apache NiFi\"}\\n" + + THIS_IS_NOT_JSON + "\\n" + + "{\"name\":\"Apache NiFi\"}\\n", + new String(flowFiles.get(0).toByteArray())); + + List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE); + assertTrue(badFlowFiles.size() == 0); + + // clean runner by removing message demarcator + testRunner.removeProperty(ConsumeMQTT.MESSAGE_DEMARCATOR); + } + + @Test + public void testConsumeRecordsWithoutAddedFields() throws Exception { + testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader"); + testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer"); + testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false"); + + final JsonTreeReader jsonReader = new JsonTreeReader(); + testRunner.addControllerService("record-reader", jsonReader); + testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema"); + testRunner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + testRunner.addControllerService("record-writer", jsonWriter); + testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); + testRunner.enableControllerService(jsonWriter); + + testRunner.assertValid(); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT, testRunner.getProcessContext()); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache NiFi\"}".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.MOST_ONE); + testMessage.setRetainFlag(false); + + PublishMessage badMessage = new PublishMessage(); + 
badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes())); + badMessage.setTopicName("testTopic"); + badMessage.setDupFlag(false); + badMessage.setQos(AbstractMessage.QOSType.MOST_ONE); + badMessage.setRetainFlag(false); + + internalPublish(testMessage); + internalPublish(badMessage); + internalPublish(testMessage); + + Thread.sleep(PUBLISH_WAIT_MS); + + testRunner.run(1, false, false); + + List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + assertTrue(flowFiles.size() == 1); + assertEquals("[{\"name\":\"Apache NiFi\"},{\"name\":\"Apache NiFi\"}]", new String(flowFiles.get(0).toByteArray())); + + List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE); + assertTrue(badFlowFiles.size() == 1); + assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray())); + + // clean runner by removing records reader/writer + testRunner.removeProperty(ConsumeMQTT.RECORD_READER); + testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER); + } + + @Test + public void testConsumeRecordsOnlyBadData() throws Exception { + testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader"); + testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer"); + testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false"); + + final JsonTreeReader jsonReader = new JsonTreeReader(); + testRunner.addControllerService("record-reader", jsonReader); + testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema"); + testRunner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + testRunner.addControllerService("record-writer", jsonWriter); + testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); + testRunner.enableControllerService(jsonWriter); + + testRunner.assertValid(); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) 
testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT, testRunner.getProcessContext()); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + PublishMessage badMessage = new PublishMessage(); + badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes())); + badMessage.setTopicName("testTopic"); + badMessage.setDupFlag(false); + badMessage.setQos(AbstractMessage.QOSType.MOST_ONE); + badMessage.setRetainFlag(false); + + internalPublish(badMessage); + + Thread.sleep(PUBLISH_WAIT_MS); + + testRunner.run(1, false, false); + + List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE); + assertTrue(badFlowFiles.size() == 1); + assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray())); + + // clean runner by removing records reader/writer + testRunner.removeProperty(ConsumeMQTT.RECORD_READER); + testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER); + } + private static boolean isConnected(AbstractMQTTProcessor processor) throws NoSuchFieldException, IllegalAccessException { Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient"); f.setAccessible(true);