[nifi] branch master updated: NIFI-6856 - Make client ID a non-required field for the MQTTConsume and MQTTProduce processors. Generates a random ID if not set.

2020-02-26 Thread pvillard
This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
 new 23b04ae  NIFI-6856 - Make client ID a non-required field for the 
MQTTConsume and MQTTProduce processors. Generates a random ID if not set.
23b04ae is described below

commit 23b04ae96863fc3b0fd3b268bcb7d155edf17f26
Author: Justin Miller 
AuthorDate: Fri Nov 8 11:34:57 2019 -0600

NIFI-6856 - Make client ID a non-required field for the MQTTConsume and 
MQTTProduce processors. Generates a
random ID if not set.

Also add group ID field to ConsumeMQTT processor. Allows consumer to join 
consumer group at $share//

add expression language support for the MQTT client ID

Setting client id in publish test fails because it is not a flowfile 
attribute.
Remove client id and autogenerate it when testing.

Since the evaluation is done in onScheduled, there is no flow file 
available and we're not using the attributes to make the expression language 
evaluation. You can change the scope to use the Variable Registry.

Co-Authored-By: Pierre Villard 
Signed-off-by: Pierre Villard 

This closes #3879.
---
 .../apache/nifi/processors/mqtt/ConsumeMQTT.java   | 22 +-
 .../mqtt/common/AbstractMQTTProcessor.java | 13 ++---
 .../nifi/processors/mqtt/TestPublishMQTT.java  |  1 -
 3 files changed, 31 insertions(+), 5 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index f0cba72..94d5397 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -90,6 +90,13 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
 public final static String IS_DUPLICATE_ATTRIBUTE_KEY =  
"mqtt.isDuplicate";
 public final static String IS_RETAINED_ATTRIBUTE_KEY =  "mqtt.isRetained";
 
+public static final PropertyDescriptor PROP_GROUPID = new 
PropertyDescriptor.Builder()
+.name("Group ID")
+.description("MQTT consumer group ID to use. If group ID not set, 
client will connect as individual consumer.")
+.required(false)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
 public static final PropertyDescriptor PROP_TOPIC_FILTER = new 
PropertyDescriptor.Builder()
 .name("Topic Filter")
 .description("The MQTT topic filter to designate the topics to 
subscribe to.")
@@ -121,6 +128,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
 private volatile long maxQueueSize;
 
 private volatile int qos;
+private volatile String topicPrefix = "";
 private volatile String topicFilter;
 private final AtomicBoolean scheduled = new AtomicBoolean(false);
 
@@ -136,6 +144,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
 
 static{
 final List innerDescriptorsList = 
getAbstractPropertyDescriptors();
+innerDescriptorsList.add(PROP_GROUPID);
 innerDescriptorsList.add(PROP_TOPIC_FILTER);
 innerDescriptorsList.add(PROP_QOS);
 innerDescriptorsList.add(PROP_MAX_QUEUE_SIZE);
@@ -184,6 +193,12 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
 .build());
 }
 
+final boolean clientIDSet = context.getProperty(PROP_CLIENTID).isSet();
+final boolean groupIDSet = context.getProperty(PROP_GROUPID).isSet();
+if (clientIDSet && groupIDSet) {
+results.add(new ValidationResult.Builder().subject("Client ID and 
Group ID").valid(false).explanation("if client ID is not unique, multiple nodes 
cannot join the consumer group").build());
+}
+
 return results;
 }
 
@@ -208,6 +223,11 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
 qos = context.getProperty(PROP_QOS).asInteger();
 maxQueueSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asLong();
 topicFilter = context.getProperty(PROP_TOPIC_FILTER).getValue();
+
+if (context.getProperty(PROP_GROUPID).isSet()) {
+topicPrefix = "$share/" + 
context.getProperty(PROP_GROUPID).getValue() + "/";
+}
+
 scheduled.set(true);
 }
 
@@ -266,7 +286,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
 if (!mqttClient.isConnected()) {
 

[nifi] branch master updated: NIFI-7205 NIFI-7206

2020-02-26 Thread pvillard
This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
 new 4cd63c9  NIFI-7205 NIFI-7206
4cd63c9 is described below

commit 4cd63c99e8f412650caa2f3b775a9647f6414a98
Author: Joe Witt 
AuthorDate: Wed Feb 26 11:55:21 2020 -0800

NIFI-7205 NIFI-7206

Signed-off-by: Pierre Villard 

This closes #4093.
---
 .../state/providers/zookeeper/TestZooKeeperStateProvider.java  | 10 +-
 .../standard/relp/handler/TestRELPSocketChannelHandler.java|  4 ++--
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
index 091b13c..cac0cf9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
@@ -45,7 +45,7 @@ public class TestZooKeeperStateProvider extends 
AbstractTestStateProvider {
 private static final Map defaultProperties = 
new HashMap<>();
 
 static {
-defaultProperties.put(ZooKeeperStateProvider.SESSION_TIMEOUT, "3 
secs");
+defaultProperties.put(ZooKeeperStateProvider.SESSION_TIMEOUT, "15 
secs");
 defaultProperties.put(ZooKeeperStateProvider.ROOT_NODE, 
"/nifi/team1/testing");
 defaultProperties.put(ZooKeeperStateProvider.ACCESS_CONTROL, 
ZooKeeperStateProvider.OPEN_TO_WORLD.getValue());
 }
@@ -131,7 +131,7 @@ public class TestZooKeeperStateProvider extends 
AbstractTestStateProvider {
 return provider;
 }
 
-@Test(timeout = 2)
+@Test(timeout = 3)
 public void testStateTooLargeExceptionThrownOnSetState() throws 
InterruptedException {
 final Map state = new HashMap<>();
 final StringBuilder sb = new StringBuilder();
@@ -157,7 +157,7 @@ public class TestZooKeeperStateProvider extends 
AbstractTestStateProvider {
 // If we attempt to interact with the server too quickly, we 
will get a
 // ZooKeeper ConnectionLoss Exception, which the provider 
wraps in an IOException.
 // We will wait 1 second in this case and try again. The test 
will timeout if this
-// does not succeeed within 20 seconds.
+// does not succeeed within 30 seconds.
 Thread.sleep(1000L);
 } catch (final Exception e) {
 e.printStackTrace();
@@ -166,7 +166,7 @@ public class TestZooKeeperStateProvider extends 
AbstractTestStateProvider {
 }
 }
 
-@Test(timeout = 2)
+@Test(timeout = 3)
 public void testStateTooLargeExceptionThrownOnReplace() throws 
IOException, InterruptedException {
 final Map state = new HashMap<>();
 final StringBuilder sb = new StringBuilder();
@@ -192,7 +192,7 @@ public class TestZooKeeperStateProvider extends 
AbstractTestStateProvider {
 // If we attempt to interact with the server too quickly, we 
will get a
 // ZooKeeper ConnectionLoss Exception, which the provider 
wraps in an IOException.
 // We will wait 1 second in this case and try again. The test 
will timeout if this
-// does not succeeed within 20 seconds.
+// does not succeeed within 30 seconds.
 Thread.sleep(1000L);
 }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
index 4ac1fb7..5bfaca7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
@@ -153,8 +153,8 @@ public class TestRELPSocketChannelHandler {
 }
 }
 
-// wait up to 10 seconds to verify the responses
-long timeout = 1;
+// wait up to 25 seconds to verify the responses
+long timeout = 25000;
 long 

[nifi] branch master updated: NIFI-7139 Add record.error.message on failure of a record reader or writer

2020-02-26 Thread mattyb149
This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
 new 0bb8ce7  NIFI-7139 Add record.error.message on failure of a record 
reader or writer
0bb8ce7 is described below

commit 0bb8ce7438d9855dcca6bf89e3a672d1f9477593
Author: Shawn Weeks 
AuthorDate: Thu Feb 13 08:39:49 2020 -0600

NIFI-7139 Add record.error.message on failure of a record reader or writer

Handle scenario where message might be null.

Update to test case that was failing because adding attributes modified a 
flow file even if you don't change the contents.

Fixed Style Issues and Updated WritesAttributes.

Added Test Case for Error Message

Signed-off-by: Matthew Burgess 

This closes #4052
---
 .../nifi/processors/standard/AbstractRecordProcessor.java |  9 +
 .../org/apache/nifi/processors/standard/ConvertRecord.java|  3 ++-
 .../org/apache/nifi/processors/standard/UpdateRecord.java |  6 +-
 .../apache/nifi/processors/standard/TestConvertRecord.java| 11 ++-
 4 files changed, 22 insertions(+), 7 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
index 8ccea5a..1ea70e2 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
@@ -174,6 +174,15 @@ public abstract class AbstractRecordProcessor extends 
AbstractProcessor {
 });
 } catch (final Exception e) {
 getLogger().error("Failed to process {}; will route to failure", 
new Object[] {flowFile, e});
+// Since we are wrapping the exceptions above there should always 
be a cause
+// but it's possible it might not have a message. This handles 
that by logging
+// the name of the class thrown.
+Throwable c = e.getCause();
+if (c != null) {
+session.putAttribute(flowFile, "record.error.message", 
(c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : 
c.getClass().getCanonicalName() + " Thrown");
+} else {
+session.putAttribute(flowFile, "record.error.message", 
e.getClass().getCanonicalName() + " Thrown");
+}
 session.transfer(flowFile, REL_FAILURE);
 return;
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
index 1be1794..a1e6f99 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
@@ -41,7 +41,8 @@ import java.util.List;
 @Tags({"convert", "record", "generic", "schema", "json", "csv", "avro", "log", 
"logs", "freeform", "text"})
 @WritesAttributes({
 @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
-@WritesAttribute(attribute = "record.count", description = "The number of 
records in the FlowFile")
+@WritesAttribute(attribute = "record.count", description = "The number of 
records in the FlowFile"),
+@WritesAttribute(attribute = "record.error.message", description = "This 
attribute provides on failure the error message encountered by the Reader or 
Writer.")
 })
 @CapabilityDescription("Converts records from one data format to another using 
configured Record Reader and Record Write Controller Services. "
 + "The Reader and Writer must be configured with \"matching\" schemas. By 
this, we mean the schemas must have the same field names. The types of the 
fields "
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
index 8ee5f43..65abdc9 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
+++