Re: Nifi to Titan... How?

2016-05-19 Thread Joe Witt
Pat

It looks like Titan can be backed by Apache Cassandra or Apache HBase.
NiFi can deliver to both of those.  Would that take care of what
you're looking to do?

Thanks
Joe

On Thu, May 19, 2016 at 11:16 PM, Pat Trainor  wrote:
> Thanks to everyone making this now open sourced tool awesome.
>
> I can't find anything that directly links the 2.
>
> I want to use Nifi to coordinate page scraping, parsing, and finally throw
> Titan data for graphs.
>
> Do I have to use Kafka or Spark for this?
>
> I'm looking at the output mechanisms (Processors), and I'm not a JSON
> expert, but I can write anything needed in Java...
>
> I kind of like the elegance of Titan on Cassandra, and am reticent to add
> more animals to my little ark!
>
> Just looking for pointers to the tech that would fit neatly...
>
> Thanks
> in advance for your insights
> !
>
> pat 
> ( ͡° ͜ʖ ͡°)


Nifi to Titan... How?

2016-05-19 Thread Pat Trainor
Thanks to everyone making this now open sourced tool awesome.

​I can't find anything that directly links the 2.

I want to use ​Nifi to coordinate page scraping, parsing, and finally throw
Titan data for graphs.

​Do I have to use Kafka or Spark for this?

I'm looking at the output mechanisms (Processors), and I'm not a JSON
expert, but I can write anything needed in Java...

I kind of like the elegance of Titan on Cassandra, and am reticent to add
more animals to my little ark!​

​Just looking for pointers to the tech that would fit neatly...​

Thanks
​ in advance for your insights​
!

pat 
( ͡° ͜ʖ ͡°)


[GitHub] nifi pull request: Nifi 539c

2016-05-19 Thread mosermw
Github user mosermw commented on the pull request:

https://github.com/apache/nifi/pull/453#issuecomment-220499039
  
@eorgad your interest and willingness to get involved with the NiFi 
community is certainly appreciated.  In this case, I don't think the Travis 
build failed on the code in this PR.  If you run "mvn clean install 
-Pcontrib-check" in your branch successfully, then that should give you 
confidence that your PR will be fine.

I do think there is value to a PutSCP processor over using ExecuteProcess.  
I personally wouldn't want to remember the specifics of the scp command in 
order to configure ExecuteProcess properly, so PutSCP would be a boon right 
there.

I think special care is needed to design this processor to the strengths 
and limitations of the scp protocol.  For instance the PutSFTP processor will 
first sftp.put() to a temporary file, then sftp.rename() that temp file to the 
intended permanent name (this is to avoid the possibility that automated 
processes on the remote host pick up an incomplete file in the middle of 
transfer).  PutSFTP also has options to sftp.setMtime(), sftp.chmod(), and 
sftp.chown().  I'm not sure how much of this interaction is supported in the 
scp protocol.

For processors like ListSCP or GetSCP, which probably would need to rely on 
getting a directory listing on the remote host, I'm having a difficult time 
determining if the scp protocol can do this.  I can see a FetchSCP, though, 
when it's given a specific filename on the remote host to transfer into NiFi.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: Is it possible to deploy NiFi to an existing app server like WildFly?

2016-05-19 Thread Matt Gilman
Thanks for the question and your interest in Apache NiFi!

Currently, NiFi runs an embedded web server. There are a number of reasons for 
this but most importantly is ease of deployment. By embedding the web server we 
can expose the relevant settings and ensure proper bootstrapping for our 
extension points. NiFi offers a rich extension model (most notably Processors) 
and ensures class path isolation that would be more difficult without 
controlling the web server. Additionally, NiFi is comprised of a number of web 
applications as we offer a number of UI extension points which are 
automatically discovered and deployed to our embedded web server. There is no 
current plans to support deploying NiFi to an external web server.

Let me know if you have any other questions!

Matt

Sent from my iPhone

> On May 19, 2016, at 7:49 PM, Phu-Thien Tran  wrote:
> 
> We have a requirement to run NiFi in an approved app server. Is it
> possible to do that? If not, is there a plan to support that? Thanks.


Is it possible to deploy NiFi to an existing app server like WildFly?

2016-05-19 Thread Phu-Thien Tran
We have a requirement to run NiFi in an approved app server. Is it
possible to do that? If not, is there a plan to support that? Thanks.


[GitHub] nifi pull request: NIFI-1884 Defining API for Users, Groups, and P...

2016-05-19 Thread mcgilman
Github user mcgilman commented on the pull request:

https://github.com/apache/nifi/pull/452#issuecomment-220469545
  
@alopresto Great comment about the UserService. The reason we cannot do 
that directly is the extension point that is getting discovered is an 
Authorizer. However, this did make me think of possibly changing the 
MutableAuthorizer into an abstract class which implements (and marks final) the 
authorize() method. Then the MutableAuthorizer would simply handle 
User/Group/Policy persistence. Maybe the name changes too... something like 
AbstractPolicyBasedAuthorizer. The NiFi internal implementation would look like

`class FileAuthorizer extends AbstractPolicyBasedAuthorizer`

We'll hash out some of the details and update the PR accordingly. May be a 
good place to handle duplicate detection and whatnot.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: Nifi 539c

2016-05-19 Thread eorgad
Github user eorgad commented on the pull request:

https://github.com/apache/nifi/pull/453#issuecomment-220467272
  
Taking a dipper look at this, the the ScpTo.java code uses the following:
...
 // exec 'scp -t rfile' remotely
  String command="scp " + (ptimestamp ? "-p" :"") +" -t "+rfile;
  Channel channel=session.openChannel("exec");
  ((ChannelExec)channel).setCommand(command);

  // get I/O streams for remote scp
  OutputStream out=channel.getOutputStream();
  InputStream in=channel.getInputStream();
  ...
  So implementing this would probably be the equivalent of invoking an 
ExecuteProcess 
  and providing it with the parameters for scp command. So the same 
question @mosermw  
  raised where is the value in those processors if I would end up with the 
same implementation 
  as ExecuteProcess. In both cases using the native binaries for scp.
  
  Also as I am new to contributing to Nifi looks like the verification 
checks failed, when I look 
  at the build it succeeded. Am I doing something wrong procedurally or did 
it time out before the 
  build completed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1884 Defining API for Users, Groups, and P...

2016-05-19 Thread alopresto
Github user alopresto commented on the pull request:

https://github.com/apache/nifi/pull/452#issuecomment-220464739
  
@mcgilman thanks for the response above. It feels a little to me like the 
authorizer has gotten conflated with user and group management tasks. If the 
authorizer' responsibility is strictly to manage and enforce policy, then we 
probably want a `UserService` to manage user retrieval, modification, etc. 
Maybe this is injected into the authorizer to allow various identity provider 
implementations (LDAP, Kerberos, etc.) to work with various authorizers 
(Ranger, NiFi internal, etc.) I'm not going to hold up this step with the 
understanding there are still more design decisions to be made. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1884 Defining API for Users, Groups, and P...

2016-05-19 Thread mcgilman
Github user mcgilman commented on the pull request:

https://github.com/apache/nifi/pull/452#issuecomment-220457062
  
With the builders included, looks good to me.

@jtstorck @alopresto Thoughts?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1884 Defining API for Users, Groups, and P...

2016-05-19 Thread mcgilman
Github user mcgilman commented on a diff in the pull request:

https://github.com/apache/nifi/pull/452#discussion_r63956868
  
--- Diff: 
nifi-api/src/main/java/org/apache/nifi/authorization/AccessPolicy.java ---
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.authorization;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Defines a policy for a set of entities to perform a set of actions on a 
given resource.
+ */
+public class AccessPolicy {
+
+private final String identifier;
+
+private final Resource resource;
+
+private final Set entities;
+
+private final Set actions;
+
+private AccessPolicy(final AccessPolicyBuilder builder) {
+this.identifier = builder.identifier;
+this.resource = builder.resource;
+
+Set entities = new HashSet<>();
+if (builder.entities != null) {
+entities.addAll(builder.entities);
+}
+this.entities = Collections.unmodifiableSet(entities);
+
+Set actions = new HashSet<>();
+if (builder.actions != null) {
+actions.addAll(builder.actions);
+}
+this.actions = Collections.unmodifiableSet(actions);
+
+if (this.identifier == null || this.identifier.trim().isEmpty()) {
+throw new IllegalArgumentException("Identifier can not be null 
or empty");
+}
+
+if (this.resource == null) {
+throw new IllegalArgumentException("Resource can not be null");
+}
+
+if (this.entities == null || this.entities.isEmpty()) {
+throw new IllegalArgumentException("Entities can not be null 
or empty");
+}
+
+if (this.actions == null || this.actions.isEmpty()) {
+throw new IllegalArgumentException("Actions can not be null or 
empty");
+}
+}
+
+/**
+ * @return the identifier for this policy
+ */
+public String getIdentifier() {
+return identifier;
+}
+
+/**
+ * @return the resource for this policy
+ */
+public Resource getResource() {
+return resource;
+}
+
+/**
+ * @return an unmodifiable set of entity ids for this policy
+ */
+public Set getEntities() {
+return entities;
+}
+
+/**
+ * @return an unmodifiable set of actions for this policy
+ */
+public Set getActions() {
+return actions;
+}
+
+@Override
+public boolean equals(Object obj) {
+if (obj == null) {
+return false;
+}
+if (getClass() != obj.getClass()) {
+return false;
+}
+
+final AccessPolicy other = (AccessPolicy) obj;
+return Objects.equals(this.identifier, other.identifier);
+}
+
+@Override
+public int hashCode() {
+return Objects.hashCode(this.identifier);
+}
+
+@Override
+public String toString() {
+return String.format("identifier[%s], resource[%s], entityId[%s], 
action[%s]",
--- End diff --

I think there is a trailing ", " in the String.format. This applies the 
toString() in User, Group, and AccessPolicy.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1884 Defining API for Users, Groups, and P...

2016-05-19 Thread mcgilman
Github user mcgilman commented on a diff in the pull request:

https://github.com/apache/nifi/pull/452#discussion_r63955889
  
--- Diff: 
nifi-api/src/main/java/org/apache/nifi/authorization/AccessPolicy.java ---
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.authorization;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Defines a policy for a set of entities to perform a set of actions on a 
given resource.
+ */
+public class AccessPolicy {
+
+private final String identifier;
+
+private final Resource resource;
+
+private final Set entities;
+
+private final Set actions;
+
+private AccessPolicy(final AccessPolicyBuilder builder) {
+this.identifier = builder.identifier;
+this.resource = builder.resource;
+
+Set entities = new HashSet<>();
+if (builder.entities != null) {
--- End diff --

Pretty sure this is guaranteed non-null based on the Builder. This comment 
applies to all the Set's in User, Group, and AccessPolicy.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1498: putFile - Flowfiles routed to failur...

2016-05-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/445


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1498: putFile - Flowfiles routed to failur...

2016-05-19 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/445#issuecomment-220438461
  
Thanks for tackling this.  Will merge.  Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1898 fixed @OnSchedule methods to accept P...

2016-05-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/455


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1898 fixed @OnSchedule methods to accept P...

2016-05-19 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/455#issuecomment-220436789
  
Was able to build and verify that the Flume processors work as anticipated 
with this change.  Thanks! Will merge in.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread JPercivall
Github user JPercivall commented on the pull request:

https://github.com/apache/nifi/pull/392#issuecomment-220432007
  
@olegz pushed out a commit addressing the comments (sans the SSL Context 
Service property mapping, due to my comment above)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/392#discussion_r63941204
  
--- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
 ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
+
+public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProcessor {
+
+protected ProcessorLog logger;
+protected IMqttClient mqttClient;
+protected final Object mqttClientConnectLock = new Object();
+protected volatile String broker;
+protected volatile String clientID;
+protected MqttConnectOptions connOpts;
+protected MemoryPersistence persistence = new MemoryPersistence();
+
+public ProcessSessionFactory processSessionFactory;
+
+public static final Validator QOS_VALIDATOR = new Validator() {
+
+@Override
+public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+Integer inputInt = Integer.parseInt(input);
+if (inputInt < 0 || inputInt > 2) {
+return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must 
be an integer between 0 and 2").build();
+}
+return new 
ValidationResult.Builder().subject(subject).valid(true).build();
+}
+};
+
+public static final Validator BROKER_VALIDATOR = new Validator() {
+
+@Override
+public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+try{
+URI brokerURI = new URI(input);
+if (!"".equals(brokerURI.getPath())) {
+ret

Removal of ProcessorLog

2016-05-19 Thread Aldrin Piri
Hey folks,

As a heads up, PR #403 [1]/NIFI-1811 is to be merged in imminently.  This
is part of cleaning up some of our code debt as we move toward 1.0 and
removing of the ProcessorLog which was deprecated/supplanted by
ComponentLog.

For contributors, if possible, it would be desirable to make use of the
ComponentLog proactively in any of your 0.x endeavors where *new* logging
is being incorporated.  This is completely API compatible and the super
interface of the ProcessorLog (which had been relegated to nothing more
than a marker interface to maintain API compatibility when its methods were
extracted).  I just now noticed that this interface is not tagged as
deprecated and will make that adjustment on the 0.x branch to help with
this process and give a little guidance on this direction.

For committers, there is a bit of an effort required to adjust any
additions to the 1.0 branch to ensure any *new* usage of the ProcessorLog
in 0.x contributions are migrated to ComponentLog.

Thanks!

[1] https://github.com/apache/nifi/pull/403


[GitHub] nifi pull request: NIFI-1811 Removed ProcessorLog and updated depe...

2016-05-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/403


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1811 Removed ProcessorLog and updated depe...

2016-05-19 Thread pvillard31
Github user pvillard31 commented on the pull request:

https://github.com/apache/nifi/pull/403#issuecomment-220421867
  
No problem @apiri. I agree: the sooner the better. Let me know if you need 
something from my side.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1811 Removed ProcessorLog and updated depe...

2016-05-19 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/403#issuecomment-220420085
  
@pvillard31 Thanks for tackling this.  I was working on some API type items 
and wanted to ensure this got addressed before too much effort went into that 
and the conflicts worsened.  I was able to rebase and  fold the associated 
changes into the new commits since your original PR.  This will be merged in 
shortly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/392#discussion_r63919624
  
--- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.io.InputStream;
+import java.io.IOException;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"publish", "MQTT", "IOT"})
+@CapabilityDescription("Publishes a message to an MQTT topic")
+@SeeAlso({ConsumeMQTT.class})
+public class PublishMQTT extends AbstractMQTTProcessor {
+
+public static final PropertyDescriptor PROP_TOPIC = new 
PropertyDescriptor.Builder()
+.name("Topic")
+.description("The topic to publish the message to.")
+.expressionLanguageSupported(true)
+.required(true)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
 true))
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PROP_QOS = new 
PropertyDescriptor.Builder()
+.name("Quality of Service(QoS)")
+.description("The Quality of Service(QoS) to send the message 
with. Accepts three values '0', '1' and '2'; '0' for 'at most once', '1' for 
'at least once', '2' for 'exactly once'. " +
+"Expression language is allowed in order to support 
publishing messages with different QoS but the end value of the property must 
be either '0', '1' or '2'. ")
+.required(true)
+.expressionLanguageSupported(true)
+.addValidator(QOS_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PROP_RETAIN = new 
PropertyDescriptor.Builder()
+.name("Retain Message")
+.description("Whether or not the retain flag should be set on 
the MQTT message.")
+.required(true)
+.expressionLanguageSupported(true)
+.addValidator(RETAIN_VALIDATOR)
+.build();
+
+public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
+.name("success")
+.descript

[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/392#discussion_r63920230
  
--- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.io.InputStream;
+import java.io.IOException;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"publish", "MQTT", "IOT"})
+@CapabilityDescription("Publishes a message to an MQTT topic")
+@SeeAlso({ConsumeMQTT.class})
+public class PublishMQTT extends AbstractMQTTProcessor {
+
+public static final PropertyDescriptor PROP_TOPIC = new 
PropertyDescriptor.Builder()
+.name("Topic")
+.description("The topic to publish the message to.")
+.expressionLanguageSupported(true)
+.required(true)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
 true))
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PROP_QOS = new 
PropertyDescriptor.Builder()
+.name("Quality of Service(QoS)")
+.description("The Quality of Service(QoS) to send the message 
with. Accepts three values '0', '1' and '2'; '0' for 'at most once', '1' for 
'at least once', '2' for 'exactly once'. " +
+"Expression language is allowed in order to support 
publishing messages with different QoS but the end value of the property must 
be either '0', '1' or '2'. ")
+.required(true)
+.expressionLanguageSupported(true)
+.addValidator(QOS_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PROP_RETAIN = new 
PropertyDescriptor.Builder()
+.name("Retain Message")
+.description("Whether or not the retain flag should be set on 
the MQTT message.")
+.required(true)
+.expressionLanguageSupported(true)
+.addValidator(RETAIN_VALIDATOR)
+.build();
+
+public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
+.name("success")
+.descript

[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread olegz
Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/392#discussion_r63919932
  
--- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.io.InputStream;
+import java.io.IOException;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"publish", "MQTT", "IOT"})
+@CapabilityDescription("Publishes a message to an MQTT topic")
+@SeeAlso({ConsumeMQTT.class})
+public class PublishMQTT extends AbstractMQTTProcessor {
+
+public static final PropertyDescriptor PROP_TOPIC = new 
PropertyDescriptor.Builder()
+.name("Topic")
+.description("The topic to publish the message to.")
+.expressionLanguageSupported(true)
+.required(true)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
 true))
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PROP_QOS = new 
PropertyDescriptor.Builder()
+.name("Quality of Service(QoS)")
+.description("The Quality of Service(QoS) to send the message 
with. Accepts three values '0', '1' and '2'; '0' for 'at most once', '1' for 
'at least once', '2' for 'exactly once'. " +
+"Expression language is allowed in order to support 
publishing messages with different QoS but the end value of the property must 
be either '0', '1' or '2'. ")
+.required(true)
+.expressionLanguageSupported(true)
+.addValidator(QOS_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PROP_RETAIN = new 
PropertyDescriptor.Builder()
+.name("Retain Message")
+.description("Whether or not the retain flag should be set on 
the MQTT message.")
+.required(true)
+.expressionLanguageSupported(true)
+.addValidator(RETAIN_VALIDATOR)
+.build();
+
+public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
+.name("success")
+.description("

[GitHub] nifi-minifi-cpp pull request: Minifi 6

2016-05-19 Thread benqiu2016
GitHub user benqiu2016 opened a pull request:

https://github.com/apache/nifi-minifi-cpp/pull/2

Minifi 6

MINIFI-6: First drop of working C++ MiNiFi

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apache/nifi-minifi-cpp MINIFI-6

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi-minifi-cpp/pull/2.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread olegz
Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/392#discussion_r63919082
  
--- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
 ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
+
+public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProcessor {
+
+protected ProcessorLog logger;
+protected IMqttClient mqttClient;
+protected final Object mqttClientConnectLock = new Object();
+protected volatile String broker;
+protected volatile String clientID;
+protected MqttConnectOptions connOpts;
+protected MemoryPersistence persistence = new MemoryPersistence();
+
+public ProcessSessionFactory processSessionFactory;
+
+public static final Validator QOS_VALIDATOR = new Validator() {
+
+@Override
+public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+Integer inputInt = Integer.parseInt(input);
+if (inputInt < 0 || inputInt > 2) {
+return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must 
be an integer between 0 and 2").build();
+}
+return new 
ValidationResult.Builder().subject(subject).valid(true).build();
+}
+};
+
+public static final Validator BROKER_VALIDATOR = new Validator() {
+
+@Override
+public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+try{
+URI brokerURI = new URI(input);
+if (!"".equals(brokerURI.getPath())) {
+return n

[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread olegz
Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/392#discussion_r63918814
  
--- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.io.InputStream;
+import java.io.IOException;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"publish", "MQTT", "IOT"})
+@CapabilityDescription("Publishes a message to an MQTT topic")
+@SeeAlso({ConsumeMQTT.class})
+public class PublishMQTT extends AbstractMQTTProcessor {
+
+public static final PropertyDescriptor PROP_TOPIC = new 
PropertyDescriptor.Builder()
+.name("Topic")
+.description("The topic to publish the message to.")
+.expressionLanguageSupported(true)
+.required(true)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
 true))
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PROP_QOS = new 
PropertyDescriptor.Builder()
+.name("Quality of Service(QoS)")
+.description("The Quality of Service(QoS) to send the message 
with. Accepts three values '0', '1' and '2'; '0' for 'at most once', '1' for 
'at least once', '2' for 'exactly once'. " +
+"Expression language is allowed in order to support 
publishing messages with different QoS but the end value of the property must 
be either '0', '1' or '2'. ")
+.required(true)
+.expressionLanguageSupported(true)
+.addValidator(QOS_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PROP_RETAIN = new 
PropertyDescriptor.Builder()
+.name("Retain Message")
+.description("Whether or not the retain flag should be set on 
the MQTT message.")
+.required(true)
+.expressionLanguageSupported(true)
+.addValidator(RETAIN_VALIDATOR)
+.build();
+
+public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
+.name("success")
+.description("

[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/392#discussion_r63918531
  
--- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
 ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
+
+public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProcessor {
+
+protected ProcessorLog logger;
+protected IMqttClient mqttClient;
+protected final Object mqttClientConnectLock = new Object();
+protected volatile String broker;
+protected volatile String clientID;
+protected MqttConnectOptions connOpts;
+protected MemoryPersistence persistence = new MemoryPersistence();
+
+public ProcessSessionFactory processSessionFactory;
+
+public static final Validator QOS_VALIDATOR = new Validator() {
+
+@Override
+public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+Integer inputInt = Integer.parseInt(input);
+if (inputInt < 0 || inputInt > 2) {
+return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must 
be an integer between 0 and 2").build();
+}
+return new 
ValidationResult.Builder().subject(subject).valid(true).build();
+}
+};
+
+public static final Validator BROKER_VALIDATOR = new Validator() {
+
+@Override
+public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+try{
+URI brokerURI = new URI(input);
+if (!"".equals(brokerURI.getPath())) {
+ret

[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/392#discussion_r63918463
  
--- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.io.InputStream;
+import java.io.IOException;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"publish", "MQTT", "IOT"})
+@CapabilityDescription("Publishes a message to an MQTT topic")
+@SeeAlso({ConsumeMQTT.class})
+public class PublishMQTT extends AbstractMQTTProcessor {
+
+public static final PropertyDescriptor PROP_TOPIC = new 
PropertyDescriptor.Builder()
+.name("Topic")
+.description("The topic to publish the message to.")
+.expressionLanguageSupported(true)
+.required(true)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
 true))
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PROP_QOS = new 
PropertyDescriptor.Builder()
+.name("Quality of Service(QoS)")
+.description("The Quality of Service(QoS) to send the message 
with. Accepts three values '0', '1' and '2'; '0' for 'at most once', '1' for 
'at least once', '2' for 'exactly once'. " +
+"Expression language is allowed in order to support 
publishing messages with different QoS but the end value of the property must 
be either '0', '1' or '2'. ")
+.required(true)
+.expressionLanguageSupported(true)
+.addValidator(QOS_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PROP_RETAIN = new 
PropertyDescriptor.Builder()
+.name("Retain Message")
+.description("Whether or not the retain flag should be set on 
the MQTT message.")
+.required(true)
+.expressionLanguageSupported(true)
+.addValidator(RETAIN_VALIDATOR)
+.build();
+
+public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
+.name("success")
+.descript

[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread olegz
Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/392#discussion_r63917779
  
--- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.io.InputStream;
+import java.io.IOException;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"publish", "MQTT", "IOT"})
+@CapabilityDescription("Publishes a message to an MQTT topic")
+@SeeAlso({ConsumeMQTT.class})
+public class PublishMQTT extends AbstractMQTTProcessor {
+
+public static final PropertyDescriptor PROP_TOPIC = new 
PropertyDescriptor.Builder()
+.name("Topic")
+.description("The topic to publish the message to.")
+.expressionLanguageSupported(true)
+.required(true)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
 true))
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PROP_QOS = new 
PropertyDescriptor.Builder()
+.name("Quality of Service(QoS)")
+.description("The Quality of Service(QoS) to send the message 
with. Accepts three values '0', '1' and '2'; '0' for 'at most once', '1' for 
'at least once', '2' for 'exactly once'. " +
+"Expression language is allowed in order to support 
publishing messages with different QoS but the end value of the property must 
be either '0', '1' or '2'. ")
+.required(true)
+.expressionLanguageSupported(true)
+.addValidator(QOS_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PROP_RETAIN = new 
PropertyDescriptor.Builder()
+.name("Retain Message")
+.description("Whether or not the retain flag should be set on 
the MQTT message.")
+.required(true)
+.expressionLanguageSupported(true)
+.addValidator(RETAIN_VALIDATOR)
+.build();
+
+public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
+.name("success")
+.description("

[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread olegz
Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/392#discussion_r63908853
  
--- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.io.InputStream;
+import java.io.IOException;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"publish", "MQTT", "IOT"})
+@CapabilityDescription("Publishes a message to an MQTT topic")
+@SeeAlso({ConsumeMQTT.class})
+public class PublishMQTT extends AbstractMQTTProcessor {
+
+public static final PropertyDescriptor PROP_TOPIC = new 
PropertyDescriptor.Builder()
+.name("Topic")
+.description("The topic to publish the message to.")
+.expressionLanguageSupported(true)
+.required(true)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
 true))
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PROP_QOS = new 
PropertyDescriptor.Builder()
+.name("Quality of Service(QoS)")
+.description("The Quality of Service(QoS) to send the message 
with. Accepts three values '0', '1' and '2'; '0' for 'at most once', '1' for 
'at least once', '2' for 'exactly once'. " +
+"Expression language is allowed in order to support 
publishing messages with different QoS but the end value of the property must 
be either '0', '1' or '2'. ")
+.required(true)
+.expressionLanguageSupported(true)
+.addValidator(QOS_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PROP_RETAIN = new 
PropertyDescriptor.Builder()
+.name("Retain Message")
+.description("Whether or not the retain flag should be set on 
the MQTT message.")
+.required(true)
+.expressionLanguageSupported(true)
+.addValidator(RETAIN_VALIDATOR)
+.build();
+
+public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
+.name("success")
+.description("

[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread olegz
Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/392#discussion_r63907479
  
--- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
 ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
+
+public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProcessor {
+
+protected ProcessorLog logger;
+protected IMqttClient mqttClient;
+protected final Object mqttClientConnectLock = new Object();
+protected volatile String broker;
+protected volatile String clientID;
+protected MqttConnectOptions connOpts;
+protected MemoryPersistence persistence = new MemoryPersistence();
+
+public ProcessSessionFactory processSessionFactory;
+
+public static final Validator QOS_VALIDATOR = new Validator() {
+
+@Override
+public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+Integer inputInt = Integer.parseInt(input);
+if (inputInt < 0 || inputInt > 2) {
+return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must 
be an integer between 0 and 2").build();
+}
+return new 
ValidationResult.Builder().subject(subject).valid(true).build();
+}
+};
+
+public static final Validator BROKER_VALIDATOR = new Validator() {
+
+@Override
+public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+try{
+URI brokerURI = new URI(input);
+if (!"".equals(brokerURI.getPath())) {
+return n

[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/392#discussion_r63907025
  
--- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
 ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
+
+public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProcessor {
+
+protected ProcessorLog logger;
+protected IMqttClient mqttClient;
+protected final Object mqttClientConnectLock = new Object();
+protected volatile String broker;
+protected volatile String clientID;
+protected MqttConnectOptions connOpts;
+protected MemoryPersistence persistence = new MemoryPersistence();
+
+public ProcessSessionFactory processSessionFactory;
+
+public static final Validator QOS_VALIDATOR = new Validator() {
+
+@Override
+public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+Integer inputInt = Integer.parseInt(input);
+if (inputInt < 0 || inputInt > 2) {
+return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must 
be an integer between 0 and 2").build();
+}
+return new 
ValidationResult.Builder().subject(subject).valid(true).build();
+}
+};
+
+public static final Validator BROKER_VALIDATOR = new Validator() {
+
+@Override
+public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+try{
+URI brokerURI = new URI(input);
+if (!"".equals(brokerURI.getPath())) {
+ret

[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread olegz
Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/392#discussion_r63906843
  
--- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
 ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
+
+public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProcessor {
+
+protected ProcessorLog logger;
+protected IMqttClient mqttClient;
+protected final Object mqttClientConnectLock = new Object();
+protected volatile String broker;
+protected volatile String clientID;
+protected MqttConnectOptions connOpts;
+protected MemoryPersistence persistence = new MemoryPersistence();
+
+public ProcessSessionFactory processSessionFactory;
+
+public static final Validator QOS_VALIDATOR = new Validator() {
+
+@Override
+public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+Integer inputInt = Integer.parseInt(input);
+if (inputInt < 0 || inputInt > 2) {
+return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must 
be an integer between 0 and 2").build();
+}
+return new 
ValidationResult.Builder().subject(subject).valid(true).build();
+}
+};
+
+public static final Validator BROKER_VALIDATOR = new Validator() {
+
+@Override
+public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+try{
+URI brokerURI = new URI(input);
+if (!"".equals(brokerURI.getPath())) {
+return n

[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/392#discussion_r63903305
  
--- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
 ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
+import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
+
+
+@Tags({"subscribe", "MQTT", "IOT", "consume", "listen"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially // we want to have a consistent mapping between clientID 
and MQTT connection
+@CapabilityDescription("Subscribes to a topic and receives messages from 
an MQTT broker")
+@SeeAlso({PublishMQTT.class})
+@WritesAttributes({
+@WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT 
broker that was the message source"),
+@WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT 
topic on which message was received"),
+@WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality 
of service for this message."),
+@WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY, 
description="Whether or not this message might be a duplicate of one which has 
already been received."),
+@WritesAttribute(attribute=IS_RETAINED_ATTRIBUT

[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/392#discussion_r63902702
  
--- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
 ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
+import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
+
+
+@Tags({"subscribe", "MQTT", "IOT", "consume", "listen"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially // we want to have a consistent mapping between clientID 
and MQTT connection
+@CapabilityDescription("Subscribes to a topic and receives messages from 
an MQTT broker")
+@SeeAlso({PublishMQTT.class})
+@WritesAttributes({
+@WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT 
broker that was the message source"),
+@WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT 
topic on which message was received"),
+@WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality 
of service for this message."),
+@WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY, 
description="Whether or not this message might be a duplicate of one which has 
already been received."),
+@WritesAttribute(attribute=IS_RETAINED_ATTRIBUT

[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread olegz
Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/392#discussion_r63902327
  
--- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
 ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
+import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
+
+
+@Tags({"subscribe", "MQTT", "IOT", "consume", "listen"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially // we want to have a consistent mapping between clientID 
and MQTT connection
+@CapabilityDescription("Subscribes to a topic and receives messages from 
an MQTT broker")
+@SeeAlso({PublishMQTT.class})
+@WritesAttributes({
+@WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT 
broker that was the message source"),
+@WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT 
topic on which message was received"),
+@WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality 
of service for this message."),
+@WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY, 
description="Whether or not this message might be a duplicate of one which has 
already been received."),
+@WritesAttribute(attribute=IS_RETAINED_ATTRIBUTE_KEY

[GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors

2016-05-19 Thread olegz
Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/392#discussion_r63900979
  
--- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
 ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
+import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
+
+
+@Tags({"subscribe", "MQTT", "IOT", "consume", "listen"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially // we want to have a consistent mapping between clientID 
and MQTT connection
+@CapabilityDescription("Subscribes to a topic and receives messages from 
an MQTT broker")
+@SeeAlso({PublishMQTT.class})
+@WritesAttributes({
+@WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT 
broker that was the message source"),
+@WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT 
topic on which message was received"),
+@WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality 
of service for this message."),
+@WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY, 
description="Whether or not this message might be a duplicate of one which has 
already been received."),
+@WritesAttribute(attribute=IS_RETAINED_ATTRIBUTE_KEY

Re: Hadoop Sequence File Processor changes the key.

2016-05-19 Thread Bryan Bende
Arturo,

That makes sense.

I created this JIRA to capture the issue and conversation:
https://issues.apache.org/jira/browse/NIFI-1902

Feel free to comment on it if I missed anything.

-Bryan


On Thu, May 19, 2016 at 10:27 AM, Arturo Michel <
arturo.mic...@leotech.com.sg> wrote:

> HI Bryan,
>
> Thanks for the response.
>
> I would actually advocate to implement both changes, but the ".sf" suffix
> in the filename has a workaround.
>
> The ".sf" suffix can be changed by changing the filename using the
> UpdateAttribute processor after creation.
>
> As for the key, there is no way to manipulate it after the file has been
> created (this is expected). The key value, however, should be independent
> of filename attribute.
>
> Your proposed solutions seems the best way of achieving it.
>
>
> Best Regards.
> Arturo
>
> 
> From: Bryan Bende 
> Sent: 18 May 2016 02:51
> To: dev@nifi.apache.org
> Subject: Re: Hadoop Sequence File Processor changes the key.
>
> Hi Arturo,
>
> Sorry for the delayed response, and thanks for pointing this out.
>
> I don't have that much experience using sequence files, but assuming the
> ".sf" suffix has no meaning besides aesthetics, then it seems like there
> could be two possible solutions...
>
> One would be to not force the ".sf" suffix to be added to filename, and if
> someone wants that suffix then they can set the filename using
> UpdateAttribute.
>
> The other option would be to not use filename as the key... we could have
> another property like "Key Attribute" and the value would be the name of
> the attribute to use as the key. This way you can still set filename to end
> in ".sf" and the key can be something else.
>
> I lean towards the second approach, what do you think?
>
> -Bryan
>
>
> On Fri, May 13, 2016 at 4:49 AM, Arturo Michel <
> arturo.mic...@leotech.com.sg
> > wrote:
>
> > I am using the createHadoopSequenceFile processor to create a sequence
> > file from incoming data to time stamp the data, using the current time as
> > the key and the data as the value of the sequence file.
> >
> >
> > I change the file name attribute (momentarily) to ${now()} as to get a
> > sequence file where the key is the time and the content is the data.
> > However the processor adds the .sf suffix which makes it all the way to
> the
> > key.
> >
> >
> > I end up with the following structure [40668712567.sf | [data bytes]]
> >
> >
> > I understand that the file is written as filename.sf but shouldn't the
> key
> > omit the .sf suffix and only be the file name?
> >
> >
> > Looking at Processor code in
> >
> >
> > <
> >
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
> > >
> >
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
> >
> >
> > 155 final String fileName =
> > flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
> > 156 flowFile = session.putAttribute(flowFile,
> > CoreAttributes.FILENAME.key(), fileName);
> > 157try {
> > 158flowFile = sequenceFileWriter.writeSequenceFile(flowFile,
> > session, getConfiguration(), compressionType);
> > 159session.transfer(flowFile, RELATIONSHIP_SUCCESS);
> > 160getLogger().info("Transferred flowfile {} to {}", new
> > Object[]{flowFile, RELATIONSHIP_SUCCESS});
> > 161} catch (ProcessException e) {
> > 162getLogger().error("Failed to create Sequence File.
> > Transferring {} to 'failure'", new Object[]{flowFile}, e);
> > 163session.transfer(flowFile, RELATIONSHIP_FAILURE);
> > 164}
> >
> >
> >
> > The file name is changed before passing the flow file to the writer. The
> > default sequence writer (and I think also the others) use the file name
> as
> > received to write the key.
> >
> >
> >
> >
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java
> >
> >
> > 117 String key = flowFile.getAttribute(CoreAttributes.FILENAME.key());
> >
> > 118 writer.append(new Text(key), inStreamWritable);
> >
> >
> > If there is a better way of accomplishing this?
> >
> >
> >
> > Best Regards.
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > This email is intended only for the individual or entity to which it is
> > addressed and may contain information that is private, restricted,
> > confidential or secret and exempt from disclosure under applicable law.
> > If the reader of this disclaimer is not the intended recipient, you are
> > hereby notified that any dissemination, distribution or copying of this
> > document is strictly prohibited. If you received this in error, please
> > notify the sender and delete it immediately after reading this
> 

Re: Hadoop Sequence File Processor changes the key.

2016-05-19 Thread Arturo Michel
HI Bryan,

Thanks for the response.

I would actually advocate to implement both changes, but the ".sf" suffix in 
the filename has a workaround.

The ".sf" suffix can be changed by changing the filename using the 
UpdateAttribute processor after creation.

As for the key, there is no way to manipulate it after the file has been 
created (this is expected). The key value, however, should be independent of 
filename attribute.

Your proposed solutions seems the best way of achieving it.


Best Regards.
Arturo


From: Bryan Bende 
Sent: 18 May 2016 02:51
To: dev@nifi.apache.org
Subject: Re: Hadoop Sequence File Processor changes the key.

Hi Arturo,

Sorry for the delayed response, and thanks for pointing this out.

I don't have that much experience using sequence files, but assuming the
".sf" suffix has no meaning besides aesthetics, then it seems like there
could be two possible solutions...

One would be to not force the ".sf" suffix to be added to filename, and if
someone wants that suffix then they can set the filename using
UpdateAttribute.

The other option would be to not use filename as the key... we could have
another property like "Key Attribute" and the value would be the name of
the attribute to use as the key. This way you can still set filename to end
in ".sf" and the key can be something else.

I lean towards the second approach, what do you think?

-Bryan


On Fri, May 13, 2016 at 4:49 AM, Arturo Michel  wrote:

> I am using the createHadoopSequenceFile processor to create a sequence
> file from incoming data to time stamp the data, using the current time as
> the key and the data as the value of the sequence file.
>
>
> I change the file name attribute (momentarily) to ${now()} as to get a
> sequence file where the key is the time and the content is the data.
> However the processor adds the .sf suffix which makes it all the way to the
> key.
>
>
> I end up with the following structure [40668712567.sf | [data bytes]]
>
>
> I understand that the file is written as filename.sf but shouldn't the key
> omit the .sf suffix and only be the file name?
>
>
> Looking at Processor code in
>
>
> <
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
> >
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
>
>
> 155 final String fileName =
> flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
> 156 flowFile = session.putAttribute(flowFile,
> CoreAttributes.FILENAME.key(), fileName);
> 157try {
> 158flowFile = sequenceFileWriter.writeSequenceFile(flowFile,
> session, getConfiguration(), compressionType);
> 159session.transfer(flowFile, RELATIONSHIP_SUCCESS);
> 160getLogger().info("Transferred flowfile {} to {}", new
> Object[]{flowFile, RELATIONSHIP_SUCCESS});
> 161} catch (ProcessException e) {
> 162getLogger().error("Failed to create Sequence File.
> Transferring {} to 'failure'", new Object[]{flowFile}, e);
> 163session.transfer(flowFile, RELATIONSHIP_FAILURE);
> 164}
>
>
>
> The file name is changed before passing the flow file to the writer. The
> default sequence writer (and I think also the others) use the file name as
> received to write the key.
>
>
>
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java
>
>
> 117 String key = flowFile.getAttribute(CoreAttributes.FILENAME.key());
>
> 118 writer.append(new Text(key), inStreamWritable);
>
>
> If there is a better way of accomplishing this?
>
>
>
> Best Regards.
>
>
>
>
>
>
>
>
>
>
> This email is intended only for the individual or entity to which it is
> addressed and may contain information that is private, restricted,
> confidential or secret and exempt from disclosure under applicable law.
> If the reader of this disclaimer is not the intended recipient, you are
> hereby notified that any dissemination, distribution or copying of this
> document is strictly prohibited. If you received this in error, please
> notify the sender and delete it immediately after reading this disclaimer.
> Thank you.
>
>
>
>



This email is intended only for the individual or entity to which it is 
addressed and may contain information that is private, restricted, confidential 
or secret and exempt from disclosure under applicable law.
If the reader of this disclaimer is not the intended recipient, you are hereby 
notified that any dissemination, distribution or copying of this document is 
strictly prohibited. If you received this in error, please notify the sender 
and delete it immediately after reading this disclaimer.
Thank you.





[GitHub] nifi pull request: NIFI-1898 fixed @OnSchedule methods to accept P...

2016-05-19 Thread olegz
GitHub user olegz opened a pull request:

https://github.com/apache/nifi/pull/455

NIFI-1898 fixed @OnSchedule methods to accept ProcessContext



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/olegz/nifi NIFI-1898

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/455.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #455


commit fda6d338dfe1c18b537d30fcd406676293b1a3cc
Author: Oleg Zhurakousky 
Date:   2016-05-19T14:10:21Z

NIFI-1898 fixed @OnSchedule methods to accept ProcessContext




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1745: Refactor how revisions are handled a...

2016-05-19 Thread mcgilman
Github user mcgilman commented on the pull request:

https://github.com/apache/nifi/pull/454#issuecomment-220330762
  
@markap14 When canceling revision claims for Connectable components in the 
verifyXxx() methods, DAO access will not return null so you do not need to 
check for it there. A ResourceNotFoundException will be thrown. Is that 
acceptable given it's usage?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: ListenSMTP processor

2016-05-19 Thread Simon Elliston Ball
Yes, exactly. The challenge would be what to do with the mime boundary header 
(include with content, or dump and rebuild with a merge).

Simon

> On 19 May 2016, at 12:45, Andre  wrote:
> 
> Simon,
> 
> Are you suggesting attributes similar to UnpackContent?
> 
> If yes, seems like a great approach.
> 
> Cheers
> On 19 May 2016 14:50, "Simon Elliston Ball" 
> wrote:
> 
> Fantastic idea!
> 
> Would SplitEmail not make sense to divide by the mime boundary? If you add
> fragment indices in the way other Split processors do, it would be easy to
> recombine an email after processing splits. To be honest, I'm not sure what
> the use case for doing so would be, but it feels consistent with the Split,
> Process, Merge pattern you see elsewhere in NiFi.
> 
> Simon
> 
>> On 19 May 2016, at 03:11, Joe Witt  wrote:
>> 
>> Andre
>> 
>> I like the idea.  I'd suggest having 'ListenSMTP' go ahead and create
>> a good set of FlowFile attributes for things like
>> to/from/cc/subject/number of attachments/time/etc... that make sense
>> for a given e-mail.  The body of the flowfile would be the entire
>> message which i believe would include the attachments themselves which
>> is fair game.  If you did need/want to split out the attachments in
>> your flow then I'd say the 'ParseEmail' idea is good but perhaps call
>> it 'SplitEmail' or 'ExtractEmailAttachment' or something like that.
>> 
>> Thanks
>> Joe
>> 
>>> On Wed, May 18, 2016 at 7:43 PM, Andre F de Miranda 
> wrote:
>>> All,
>>> 
>>> I have been considering writing a "ListenSMTP" processor and was
> wondering
>>> *what is the best way of dealing with multiple attachments*.
>>> 
>>> Looking in here
>>> 
> https://mail-archives.apache.org/mod_mbox/nifi-users/201602.mbox/%3ccaljk9a5ulcitnfo0dlsvd5d-jkcsqm+rqjxuruzwgrdbqad...@mail.gmail.com%3E
>>> 
>>> 
>>> I can read Joe suggesting not using attributes to store large volumes of
>>> data, so far so good, however, as far as I understand a flowfile can only
>>> contain one "content".
>>> 
>>> Currently the way I envision this would be modular that taps into the
>>> pattern set by ListenSyslog / ParseSyslog:
>>> 
>>> ListenSMTP - A processor that only provides an SMTP interface
>>> 
>>> ParseEmail - A processor that reads the flowfile holding the email body
> and
>>> split it into 1 or more flowfiles containing the attached mime objects.
>>> 
>>> The advantage here is that people can use FetchFile or to create a
> GetIMAP
>>> processor to parse messages.
>>> 
>>> Would anyone have a different view on how to achieve this?
>>> 
>>> I thank you in advance



[GitHub] nifi pull request: NIFI-1745: Refactor how revisions are handled a...

2016-05-19 Thread mcgilman
Github user mcgilman commented on a diff in the pull request:

https://github.com/apache/nifi/pull/454#discussion_r63874050
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
 ---
@@ -423,7 +440,7 @@ public boolean requestWriteLock(final Revision 
proposedRevision) throws ExpiredR
 throw ise;
 }
 
-if (stamp.getClientId() == null || 
stamp.getClientId().equals(proposedRevision.getClientId())) {
+if (stamp.getUser() == null || 
stamp.getUser().equals(user)) {
--- End diff --

I think we need to be checking both the user and the client id here. To 
protect against the case when the same user has multiple clients open (like 
multiple browser windows). One client shouldn't be able to assume the 
transaction from another client. Should also have a unit test for this scenario.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1884 Defining API for Users, Groups, and P...

2016-05-19 Thread mcgilman
Github user mcgilman commented on the pull request:

https://github.com/apache/nifi/pull/452#issuecomment-220307255
  
@alopresto Thanks for the very thorough review! This API is designed 
strictly for handling the persistence of access policies which would also 
include Users and Groups. These objects are designed as simple data objects or 
POJOs. While the MutableAuthorizer can load these records, it must be told when 
to save. It wouldn't know a given User/Group/Policy has been modified. This is 
part of the motivation to have the objects be immutable. The intent is very 
clear to an implementor of MutableAuthorizer that the objects won't be modified 
outside of their knowing. I believe that @bbende recommendation of introducing 
a Builder for creating new versions of a given User/Group/Policy is a great way 
to handle the cloning.

The end goal here was to design the API such that the implementation only 
needed to be concerned with persistence.

**uniqueness constraint** - How we address this would ultimately be based 
on whether we support reloading the Users/Groups/Policies while the application 
is running. We decided against this as the motivation for the MutableAuthorizer 
API was to support an Authorizer that was managed by NiFi. With this approach 
we would be enforcing uniqueness at startup and when new records were added 
outside of the MutableAuthorizer.

**ID mutability** - The identifier of a record is not mutable. The identity 
of a user and name of a group are (using the clone/builder approach described). 
This is to more easily support name chanes or typos without having to re-create 
all the policies for that entity as well.

**locks & merge conflicts** - Thread locking is handled by the web tier 
using the same mechanism as the other Resources. For 1.0.0 we are introducing 
fine grain locking through a RevisionManager. Obtaining a write lock for a 
given record would require a client to include a Revision. The RevisionManager 
manager would validate the Revision and lock the Revision to prevent other 
clients from modifying the same record.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: ListenSMTP processor

2016-05-19 Thread Andre
Simon,

Are you suggesting attributes similar to UnpackContent?

If yes, seems like a great approach.

Cheers
On 19 May 2016 14:50, "Simon Elliston Ball" 
wrote:

Fantastic idea!

Would SplitEmail not make sense to divide by the mime boundary? If you add
fragment indices in the way other Split processors do, it would be easy to
recombine an email after processing splits. To be honest, I'm not sure what
the use case for doing so would be, but it feels consistent with the Split,
Process, Merge pattern you see elsewhere in NiFi.

Simon

> On 19 May 2016, at 03:11, Joe Witt  wrote:
>
> Andre
>
> I like the idea.  I'd suggest having 'ListenSMTP' go ahead and create
> a good set of FlowFile attributes for things like
> to/from/cc/subject/number of attachments/time/etc... that make sense
> for a given e-mail.  The body of the flowfile would be the entire
> message which i believe would include the attachments themselves which
> is fair game.  If you did need/want to split out the attachments in
> your flow then I'd say the 'ParseEmail' idea is good but perhaps call
> it 'SplitEmail' or 'ExtractEmailAttachment' or something like that.
>
> Thanks
> Joe
>
>> On Wed, May 18, 2016 at 7:43 PM, Andre F de Miranda 
wrote:
>> All,
>>
>> I have been considering writing a "ListenSMTP" processor and was
wondering
>> *what is the best way of dealing with multiple attachments*.
>>
>> Looking in here
>>
https://mail-archives.apache.org/mod_mbox/nifi-users/201602.mbox/%3ccaljk9a5ulcitnfo0dlsvd5d-jkcsqm+rqjxuruzwgrdbqad...@mail.gmail.com%3E
>>
>>
>> I can read Joe suggesting not using attributes to store large volumes of
>> data, so far so good, however, as far as I understand a flowfile can only
>> contain one "content".
>>
>> Currently the way I envision this would be modular that taps into the
>> pattern set by ListenSyslog / ParseSyslog:
>>
>> ListenSMTP - A processor that only provides an SMTP interface
>>
>> ParseEmail - A processor that reads the flowfile holding the email body
and
>> split it into 1 or more flowfiles containing the attached mime objects.
>>
>> The advantage here is that people can use FetchFile or to create a
GetIMAP
>> processor to parse messages.
>>
>> Would anyone have a different view on how to achieve this?
>>
>> I thank you in advance


Re: ListenSMTP processor

2016-05-19 Thread Andre
Joe,

That's exactly the idea.

I envision to, from, cc, connecting host (src_ip of the last hop), subject,
time and possibly an option to iterate over the headers,  adding
discretionary key value pairs for things like spamassassin scores, etc.

I pkan to keep things simple so I don't intend to add things like SPF,
DKIM, etc but keen to consider.

Happy to call it ExtractMailAttachment. I considered this type of more
explicit name previously but settled for parse just because syslog adopted
parse as well(although ListenSyslog is also capable of parsing).

Will raise a JIRA to track.

Cheers
On 19 May 2016 12:12, "Joe Witt"  wrote:

> Andre
>
> I like the idea.  I'd suggest having 'ListenSMTP' go ahead and create
> a good set of FlowFile attributes for things like
> to/from/cc/subject/number of attachments/time/etc... that make sense
> for a given e-mail.  The body of the flowfile would be the entire
> message which i believe would include the attachments themselves which
> is fair game.  If you did need/want to split out the attachments in
> your flow then I'd say the 'ParseEmail' idea is good but perhaps call
> it 'SplitEmail' or 'ExtractEmailAttachment' or something like that.
>
> Thanks
> Joe
>
> On Wed, May 18, 2016 at 7:43 PM, Andre F de Miranda 
> wrote:
> > All,
> >
> > I have been considering writing a "ListenSMTP" processor and was
> wondering
> > *what is the best way of dealing with multiple attachments*.
> >
> > Looking in here
> >
> https://mail-archives.apache.org/mod_mbox/nifi-users/201602.mbox/%3ccaljk9a5ulcitnfo0dlsvd5d-jkcsqm+rqjxuruzwgrdbqad...@mail.gmail.com%3E
> >
> >
> > I can read Joe suggesting not using attributes to store large volumes of
> > data, so far so good, however, as far as I understand a flowfile can only
> > contain one "content".
> >
> > Currently the way I envision this would be modular that taps into the
> > pattern set by ListenSyslog / ParseSyslog:
> >
> > ListenSMTP - A processor that only provides an SMTP interface
> >
> > ParseEmail - A processor that reads the flowfile holding the email body
> and
> > split it into 1 or more flowfiles containing the attached mime objects.
> >
> > The advantage here is that people can use FetchFile or to create a
> GetIMAP
> > processor to parse messages.
> >
> > Would anyone have a different view on how to achieve this?
> >
> > I thank you in advance
>


[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIF...

2016-05-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/366


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---