[GitHub] nifi pull request: NIFI-856 Implements experimental ListenLumberja...

2016-04-22 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/290#issuecomment-21320
  
@trixpan thanks for the info.  My reviewing energy is running low for the 
night, but I will look to tackle this in the next few days.  Thanks again!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1747 add maven-war-plugin to create jar as...

2016-04-22 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/340#issuecomment-213666528
  
Hey @jdye64!  This certainly seems to make sense, but I am not sure I am a 
fan of the classifiers route.  Would a separate execution for the build work 
instead, such as:

``` xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <executions>
        <execution>
            <id>create-jar</id>
            <phase>compile</phase>
            <goals>
                <goal>jar</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```

This would be easier to rely on from a dependency standpoint in lieu of 
having to additionally specify a classifier in the POM descriptor.  




[GitHub] nifi pull request: NIFI-856 Implements experimental ListenLumberja...

2016-04-22 Thread trixpan
Github user trixpan commented on the pull request:

https://github.com/apache/nifi/pull/290#issuecomment-213663400
  
Yes.  Please note the code will work with Filebeat with minor modifications, 
but because the lumberjack v2 protocol is still in a state of flux 
(https://github.com/elastic/libbeat/issues/279) I intentionally left it out. 




[GitHub] nifi pull request: Nifi 1214

2016-04-22 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/321#issuecomment-213663078
  
Hey @ToivoAdams!  Sorry for the delays on reviewing.  It looks like this 
was originally done against what is now 0.x and incorporates many of the 
commits from what is now master.  Would it be possible to please get your 
commits/branch rebased against master?  Thanks!




[GitHub] nifi pull request: NIFI-856 Implements experimental ListenLumberja...

2016-04-22 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/290#issuecomment-213662697
  
@trixpan Hey, thanks for the contribution; sorry we've been a little slow 
getting around to it.  In terms of testing this out, is it just a matter of 
using logstash-forwarder (https://github.com/elastic/logstash-forwarder), which 
is built upon the lumberjack protocol 
(https://github.com/elastic/logstash-forwarder/blob/master/PROTOCOL.md)?  
Thanks again, and I will start digging in.




[NOTICE] jira lockdown

2016-04-22 Thread Sean Busbey
Hi folks!

Just a quick heads-up that the ASF JIRA is currently locked down to
counter a spam attack. Unfortunately, this lockdown suspends our
normal open policy that allows anyone with a JIRA account to create,
assign, and comment on issues.

If you are caught up in this, please drop me a note either on or off
list with your JIRA user name and I'll get you added to a formal JIRA
role so that you can interact with the NiFi project.

-Sean


[GitHub] nifi pull request: NIFI-1594: Add option to bulk using Index or Up...

2016-04-22 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/255#issuecomment-213661619
  
Hey @joaohf!  Looks like @mattyb149 has already started giving some input, 
but I saw that there are a few test failures concerning expression language, 
such as the one below (the full sample log is available at: 
https://s3.amazonaws.com/archive.travis-ci.org/jobs/114414795/log.txt).  
I haven't dug deep into the tests but wanted to call it to your attention to 
get a fix for later incorporation.

Thanks!

```

testPutElasticSearchOnTrigger(org.apache.nifi.processors.elasticsearch.TestPutElasticsearch)
  Time elapsed: 0.01 sec  <<< FAILURE!
java.lang.AssertionError: java.lang.IllegalStateException: Attempting to 
retrieve value of PropertyDescriptor[Index Operation] without first evaluating 
Expressions, even though the PropertyDescriptor indicates that the Expression 
Language is Supported. If you realize that this is the case and do not want 
this error to occur, it can be disabled by calling 
TestRunner.setValidateExpressionUsage(false)
```
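
For reference, a minimal sketch of the processor-side change this validator 
asks for: evaluate the expression language against the FlowFile before reading 
the property value.  The descriptor and class names below are assumptions based 
on the quoted log, not the PR's actual code; alternatively, a test that 
intentionally skips expression language can call 
runner.setValidateExpressionUsage(false) as the message suggests.

```java
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;

class IndexOperationExample {
    // Mirrors the descriptor named in the failure; illustrative only.
    static final PropertyDescriptor INDEX_OPERATION = new PropertyDescriptor.Builder()
            .name("Index Operation")
            .expressionLanguageSupported(true)
            .build();

    String resolveIndexOperation(final ProcessContext context, final FlowFile flowFile) {
        // Evaluating expression language first is exactly what the
        // TestRunner validation in the log above is checking for.
        return context.getProperty(INDEX_OPERATION)
                .evaluateAttributeExpressions(flowFile)
                .getValue();
    }
}
```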




[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

2016-04-22 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/238#issuecomment-213660402
  
Overall, this seems to function well with some minor adjustments needed.  Would 
like to hash out some of the items around managing state.  Thanks for tackling 
this; we are certainly overdue for such functionality and I know many will be 
pleased to see this coming out in the next release. 
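
As a point of reference for that state discussion, here is a minimal sketch of 
the cluster-scoped state pattern the @Stateful annotation describes, assuming a 
hypothetical state key; the PR's actual key names and surrounding logic may 
differ.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;

class ListingStateSketch {
    private static final String TIMESTAMP_KEY = "currentTimestamp"; // hypothetical key

    // Restore the newest-key timestamp persisted by the previous run, on any node.
    long restoreTimestamp(final StateManager stateManager) throws IOException {
        final StateMap state = stateManager.getState(Scope.CLUSTER);
        final String value = state.get(TIMESTAMP_KEY);
        return value == null ? 0L : Long.parseLong(value);
    }

    // Persist the newest timestamp seen so the next run lists only newer keys.
    void persistTimestamp(final StateManager stateManager, final long timestamp) throws IOException {
        final Map<String, String> newState = new HashMap<>();
        newState.put(TIMESTAMP_KEY, String.valueOf(timestamp));
        stateManager.setState(newState, Scope.CLUSTER);
    }
}
```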




[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

2016-04-22 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/238#discussion_r60822522
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
 ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"Amazon", "S3", "AWS", "list"})
+@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. 
For each object that is listed, creates a FlowFile that represents \"\n" +
+"+ \"the object so that it can be fetched in conjunction 
with FetchS3Object. This Processor is designed to run on Primary Node only 
\"\n" +
+"+ \"in a cluster. If the primary node changes, the new 
Primary Node will pick up where the previous node left off without duplicating 
\"\n" +
+"+ \"all of the data.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a 
listing of keys, the timestamp of the newest key is stored, "
++ "along with the keys that share that same timestamp. This allows 
the Processor to list only keys that have been added or modified after "
++ "this date the next time that the Processor is run. State is 
stored across the cluster so that this Processor can be run on Primary Node 
only and if a new Primary "
++ "Node is selected, the new node can pick up where the previous 
node left off, without duplicating the data.")
+@WritesAttributes({
+@WritesAttribute(attribute = "s3.bucket", description = "The name 
of the S3 bucket"),
+@WritesAttribute(attribute = "filename", description = "The name 
of the file"),
+@WritesAttribute(attribute = "s3.etag", description = "The ETag 
that can be used to see if the file has changed"),
+@WritesAttribute(attribute = "s3.lastModified", description = "The 
last modified time in milliseconds since epoch in UTC time"),
+@WritesAttribute(attribute = "s3.length", description = "The size 
of the object in bytes"),
+@WritesAttribute(attribute = "s3.storeClass", description = "The 
storage class of the object"),})
+@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
+public class ListS3 extends 

[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

2016-04-22 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/238#discussion_r60822467
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
 ---
@@ -46,7 +46,7 @@
 import com.amazonaws.services.s3.model.S3Object;
 
 @SupportsBatching
--- End diff --

This should also be @TriggerSerially given the state you have below and the 
inconsistencies that could arise in multithreaded scenarios.
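
For illustration, a minimal sketch (hypothetical processor) of the annotation 
in question: @TriggerSerially asks the framework never to invoke onTrigger 
concurrently, which is what makes mutable member state like this safe.

```java
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

@TriggerSerially
public class StatefulSketchProcessor extends AbstractProcessor {
    // Without serial triggering, concurrent onTrigger calls could interleave
    // reads and writes of this field and produce inconsistent results.
    private long lastRunTimestamp = 0L;

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session)
            throws ProcessException {
        lastRunTimestamp = System.currentTimeMillis();
    }
}
```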




[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

2016-04-22 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/238#discussion_r60822400
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
 ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"Amazon", "S3", "AWS", "list"})
+@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. 
For each object that is listed, creates a FlowFile that represents \"\n" +
+"+ \"the object so that it can be fetched in conjunction 
with FetchS3Object. This Processor is designed to run on Primary Node only 
\"\n" +
+"+ \"in a cluster. If the primary node changes, the new 
Primary Node will pick up where the previous node left off without duplicating 
\"\n" +
+"+ \"all of the data.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a 
listing of keys, the timestamp of the newest key is stored, "
++ "along with the keys that share that same timestamp. This allows 
the Processor to list only keys that have been added or modified after "
++ "this date the next time that the Processor is run. State is 
stored across the cluster so that this Processor can be run on Primary Node 
only and if a new Primary "
++ "Node is selected, the new node can pick up where the previous 
node left off, without duplicating the data.")
+@WritesAttributes({
+@WritesAttribute(attribute = "s3.bucket", description = "The name 
of the S3 bucket"),
+@WritesAttribute(attribute = "filename", description = "The name 
of the file"),
+@WritesAttribute(attribute = "s3.etag", description = "The ETag 
that can be used to see if the file has changed"),
+@WritesAttribute(attribute = "s3.lastModified", description = "The 
last modified time in milliseconds since epoch in UTC time"),
+@WritesAttribute(attribute = "s3.length", description = "The size 
of the object in bytes"),
+@WritesAttribute(attribute = "s3.storeClass", description = "The 
storage class of the object"),})
+@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
+public class ListS3 extends 

[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

2016-04-22 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/238#discussion_r60822342
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
 ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"Amazon", "S3", "AWS", "list"})
+@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. 
For each object that is listed, creates a FlowFile that represents \"\n" +
+"+ \"the object so that it can be fetched in conjunction 
with FetchS3Object. This Processor is designed to run on Primary Node only 
\"\n" +
+"+ \"in a cluster. If the primary node changes, the new 
Primary Node will pick up where the previous node left off without duplicating 
\"\n" +
+"+ \"all of the data.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a 
listing of keys, the timestamp of the newest key is stored, "
++ "along with the keys that share that same timestamp. This allows 
the Processor to list only keys that have been added or modified after "
++ "this date the next time that the Processor is run. State is 
stored across the cluster so that this Processor can be run on Primary Node 
only and if a new Primary "
++ "Node is selected, the new node can pick up where the previous 
node left off, without duplicating the data.")
+@WritesAttributes({
+@WritesAttribute(attribute = "s3.bucket", description = "The name 
of the S3 bucket"),
+@WritesAttribute(attribute = "filename", description = "The name 
of the file"),
+@WritesAttribute(attribute = "s3.etag", description = "The ETag 
that can be used to see if the file has changed"),
+@WritesAttribute(attribute = "s3.lastModified", description = "The 
last modified time in milliseconds since epoch in UTC time"),
+@WritesAttribute(attribute = "s3.length", description = "The size 
of the object in bytes"),
+@WritesAttribute(attribute = "s3.storeClass", description = "The 
storage class of the object"),})
+@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
+public class ListS3 extends 

Re: Enumerating processors, queues, etc.

2016-04-22 Thread Joe Witt
Ok cool.  From the 'gathering the raw materials of knowledge'
perspective: if the things you want internal access to are already
exposed via the REST API, then we're definitely safe on the threading
side, insofar as the things I was worried about when considering
internal access.

If we focus on the examples you mention in both 3 and 4, I believe
these would be fulfilled by interacting through the REST API.  For
item 3, while there may be some server-side backing configuration
values and status data, the client side will be the thing
rendering/displaying colors.  For item 4, which is about interacting
with the application (start/stop/etc.), these are things which would
need to occur through the REST API because that is the cluster-wide
coordination mechanism/layer.

As designed/intended today, server-side functions (controller
services or reporting tasks) could be used to establish server-side
knowledge of behavior, and the client would get access to this at
runtime and behave as it needs to given that knowledge.  If the
client is a visual UI then it could, for instance, render things
differently.  If the client is something automated invoking the REST
API endpoints, then it could do things like 'start' or 'stop' or
alter the flow.
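
As a concrete illustration of that client-side interaction, a small sketch that
polls processor status over the REST API; the endpoint path follows later NiFi
releases and, along with the host, port, and processor id, is an assumption
here rather than something from this thread.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ProcessorStatusPoller {
    public static void main(final String[] args) throws Exception {
        // Assumed endpoint shape; consult the REST API docs for your version.
        final URL url = new URL("http://localhost:8080/nifi-api/flow/processors/"
                + args[0] + "/status");
        final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line); // JSON status snapshot for the processor
            }
        }
    }
}
```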

The ideas you're presenting, which make for a more engaging user
experience, definitely make sense to me, and we should do more to
make them happen.  I'm just pointing out that they sound less like
Controller Service or Reporting Task type things and more like data
we should expose via the REST API.  This would allow clients, be they
services or browsers, to take whatever action they might want to.

Thanks
Joe

On Fri, Apr 22, 2016 at 10:31 PM, Joe Skora  wrote:
> Joe W,
>
> The use case that originally got me thinking about this was a processor
> highlighter that looks for processors above or below configured thresholds,
> possibly by instance or type.  I think that requires the ability to
>
>1. enumerate the processor and/or queue collections,
>2. query each processor or queue for stats like those shown in the UI,
>3. highlight the processor somehow, by changing color for instance, and
>4. (possibly) affect the processor by stopping / starting it.
>
> I realize this exposes system internals and threading concerns, but the REST
> API already provides the information externally.  Any controller service
> using this information must be designed to not have a negative impact on
> the system, but that's already true of any custom processor or controller
> service since they can overload or lock up the framework if they behave
> badly.
>
> Overall, I think the visibility into the data flows, hot/cold spots, larger
> than expected ingests, etc. provides value that would far outweigh concerns
> about any new risk this capability would create.
>
> Thanks,
> Joe S
>
> On Fri, Apr 22, 2016 at 2:25 PM, Joe Witt  wrote:
>
>> Yeah understood.  So let's dig into this more.  We need to avoid over
>> exposure of internal state which one might want to crawl through
>> because that introduces some multi-threaded challenges and could limit
>> our ability to evolve internals.  However, if we understand the
>> questions you'd like to be able to ask of certain things better
>> perhaps we can better expose those results.
>>
>> Can you try stating what you're looking for in a bit more specific
>> examples.  For instance you said "want to iterate over the processor
>> collections...to look for performance thresholds" - What sorts of
>> performance threshold questions?
>>
>> On Fri, Apr 22, 2016 at 2:20 PM, Joe Skora  wrote:
>> > Joe Witt - Not really, this kind of went sideways from where I was
>> > originally headed.
>> >
>> > I'm looking for a way for a controller service to iterate over the
>> > processor and queue collections (maybe others as well) to look for
>> > performance thresholds or other issues and then provide feedback somehow
>> or
>> > report externally.
>> >
>> > If it can be done through the REST API, seems like it should be possible
>> > from within the framework as well.
>> >
>> > On Fri, Apr 22, 2016 at 1:32 PM, Joe Witt  wrote:
>> >
>> >> Joe Skora - does Jeremy's JIRA cover your use case needs?
>> >>
>> >> On Fri, Apr 22, 2016 at 12:44 PM, Jeremy Dyer  wrote:
>> >> > Mark,
>> >> >
>> >> > ok that makes sense. I have created a jira for this improvement
>> >> > https://issues.apache.org/jira/browse/NIFI-1805
>> >> >
>> >> > On Fri, Apr 22, 2016 at 12:27 PM, Mark Payne 
>> >> wrote:
>> >> >
>> >> >> Jeremy,
>> >> >>
>> >> >> It should be relatively easy. In FlowController, we would have to
>> update
>> >> >> getGroupStatus() to set the values on ConnectionStatus
>> >> >> and of course update ConnectionStatus to have getters & setters for
>> the
>> >> >> new values. That should be about it, I think.
>> 

[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

2016-04-22 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/238#discussion_r60822275
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
 ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"Amazon", "S3", "AWS", "list"})
+@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. 
For each object that is listed, creates a FlowFile that represents \"\n" +
+"+ \"the object so that it can be fetched in conjunction 
with FetchS3Object. This Processor is designed to run on Primary Node only 
\"\n" +
+"+ \"in a cluster. If the primary node changes, the new 
Primary Node will pick up where the previous node left off without duplicating 
\"\n" +
+"+ \"all of the data.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a 
listing of keys, the timestamp of the newest key is stored, "
++ "along with the keys that share that same timestamp. This allows 
the Processor to list only keys that have been added or modified after "
++ "this date the next time that the Processor is run. State is 
stored across the cluster so that this Processor can be run on Primary Node 
only and if a new Primary "
++ "Node is selected, the new node can pick up where the previous 
node left off, without duplicating the data.")
+@WritesAttributes({
+@WritesAttribute(attribute = "s3.bucket", description = "The name 
of the S3 bucket"),
+@WritesAttribute(attribute = "filename", description = "The name 
of the file"),
+@WritesAttribute(attribute = "s3.etag", description = "The ETag 
that can be used to see if the file has changed"),
+@WritesAttribute(attribute = "s3.lastModified", description = "The 
last modified time in milliseconds since epoch in UTC time"),
+@WritesAttribute(attribute = "s3.length", description = "The size 
of the object in bytes"),
+@WritesAttribute(attribute = "s3.storeClass", description = "The 
storage class of the object"),})
+@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
+public class ListS3 extends 

[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

2016-04-22 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/238#discussion_r60822174
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
 ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"Amazon", "S3", "AWS", "list"})
+@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. 
For each object that is listed, creates a FlowFile that represents \"\n" +
--- End diff --

There are some extraneous \s and +s that were folded into your description, 
presumably from the IDE auto-escaping the quotes.




Re: Enumerating processors, queues, etc.

2016-04-22 Thread Joe Skora
Joe W,

The use case that originally got me thinking about this was a processor
highlighter that looks for processors above or below configured thresholds,
possibly by instance or type.  I think that requires the ability to

   1. enumerate the processor and/or queue collections,
   2. query each processor or queue for stats like those shown in the UI,
   3. highlight the processor somehow, by changing color for instance, and
   4. (possibly) affect the processor by stopping / starting it.

I realize this exposes system internals and threading concerns, but the REST
API already provides the information externally.  Any controller service
using this information must be designed to not have a negative impact on
the system, but that's already true of any custom processor or controller
service since they can overload or lock up the framework if they behave
badly.

Overall, I think the visibility into the data flows, hot/cold spots, larger
than expected ingests, etc. provides value that would far outweigh concerns
about any new risk this capability would create.

Thanks,
Joe S
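
For reference, a hedged sketch of how much of this is reachable today: a
ReportingTask already receives status snapshots via EventAccess and can walk
processors recursively, without any new framework exposure.  The class name,
threshold, and log message below are illustrative assumptions.

```java
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;

public class ThresholdHighlighterTask extends AbstractReportingTask {
    private static final long BYTES_READ_THRESHOLD = 1_000_000_000L; // illustrative

    @Override
    public void onTrigger(final ReportingContext context) {
        // Root-level snapshot of the whole flow, as exposed to reporting tasks.
        walk(context.getEventAccess().getControllerStatus());
    }

    private void walk(final ProcessGroupStatus group) {
        for (final ProcessorStatus processor : group.getProcessorStatus()) {
            if (processor.getBytesRead() > BYTES_READ_THRESHOLD) {
                getLogger().warn("Processor {} exceeded read threshold",
                        new Object[] {processor.getName()});
            }
        }
        for (final ProcessGroupStatus child : group.getProcessGroupStatus()) {
            walk(child); // recurse into nested process groups
        }
    }
}
```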

On Fri, Apr 22, 2016 at 2:25 PM, Joe Witt  wrote:

> Yeah understood.  So let's dig into this more.  We need to avoid over
> exposure of internal state which one might want to crawl through
> because that introduces some multi-threaded challenges and could limit
> our ability to evolve internals.  However, if we understand the
> questions you'd like to be able to ask of certain things better
> perhaps we can better expose those results.
>
> Can you try stating what you're looking for in a bit more specific
> examples.  For instance you said "want to iterate over the processor
> collections...to look for performance thresholds" - What sorts of
> performance threshold questions?
>
> On Fri, Apr 22, 2016 at 2:20 PM, Joe Skora  wrote:
> > Joe Witt - Not really, this kind of went sideways from where I was
> > originally headed.
> >
> > I'm looking for a way for a controller service to iterate over the
> > processor and queue collections (maybe others as well) to look for
> > performance thresholds or other issues and then provide feedback somehow
> or
> > report externally.
> >
> > If it can be done through the REST API, seems like it should be possible
> > from within the framework as well.
> >
> > On Fri, Apr 22, 2016 at 1:32 PM, Joe Witt  wrote:
> >
> >> Joe Skora - does Jeremy's JIRA cover your use case needs?
> >>
> >> On Fri, Apr 22, 2016 at 12:44 PM, Jeremy Dyer  wrote:
> >> > Mark,
> >> >
> >> > ok that makes sense. I have created a jira for this improvement
> >> > https://issues.apache.org/jira/browse/NIFI-1805
> >> >
> >> > On Fri, Apr 22, 2016 at 12:27 PM, Mark Payne 
> >> wrote:
> >> >
> >> >> Jeremy,
> >> >>
> >> >> It should be relatively easy. In FlowController, we would have to
> update
> >> >> getGroupStatus() to set the values on ConnectionStatus
> >> >> and of course update ConnectionStatus to have getters & setters for
> the
> >> >> new values. That should be about it, I think.
> >> >>
> >> >> -Mark
> >> >>
> >> >>
> >> >> > On Apr 22, 2016, at 12:17 PM, Jeremy Dyer 
> wrote:
> >> >> >
> >> >> > Mark,
> >> >> >
> >> >> > What would the process look like for doing that? Would that be
> >> something
> >> >> > trivial or require some reworking?
> >> >> >
> >> >> > On Fri, Apr 22, 2016 at 10:26 AM, Mark Payne  >
> >> >> wrote:
> >> >> >
> >> >> >> I definitely don't think we should be exposing the FlowController
> to
> >> a
> >> >> >> Reporting Task.
> >> >> >> However, I think exposing information about whether or not
> >> backpressure
> >> >> is
> >> >> >> being applied
> >> >> >> (or even is configured) is a very reasonable idea.
> >> >> >>
> >> >> >> -Mark
> >> >> >>
> >> >> >>
> >> >> >>> On Apr 22, 2016, at 10:22 AM, Jeremy Dyer 
> wrote:
> >> >> >>>
> >> >> >>> I could see the argument for not making that available. What
> about
> >> some
> >> >> >>> sort of reference that would allow the ReportingTask to to
> >> determine if
> >> >> >>> backpressure is being applied to a Connection? It currently seems
> >> you
> >> >> can
> >> >> >>> see the number of bytes and/or objects count queued in a
> connection
> >> but
> >> >> >>> don't have any reference to the values a user has setup for
> >> >> backpressure
> >> >> >> in
> >> >> >>> the UI. Is there a way to get those values in the scope of the
> >> >> >>> ReportingTask?
> >> >> >>>
> >> >> >>> On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende 
> >> >> wrote:
> >> >> >>>
> >> >>  I think the only way you could do it directly without the REST
> API
> >> is
> >> >> by
> >> >>  having access to the FlowController,
> >> >>  but that is purposely not exposed to extension points...
> actually
> >> >>  StandardFlowController is what implements the
> >> >>  EventAccess interface which ends up 

[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

2016-04-22 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/238#issuecomment-213652173
  
reviewing




[GitHub] nifi pull request: added volatile provenance repository docstring

2016-04-22 Thread randerzander
GitHub user randerzander opened a pull request:

https://github.com/apache/nifi/pull/378

added volatile provenance repository docstring



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/randerzander/nifi master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/378.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #378


commit d68956320af1d3c905bcdc7ea7e2fe45bf88b956
Author: Randy Gelhausen 
Date:   2016-04-22T21:41:29Z

added volatile provenance repository docstring






[GitHub] nifi pull request: Enhancement for NIFI-1045 : Add "backup suffix"...

2016-04-22 Thread PuspenduBanerjee
Github user PuspenduBanerjee closed the pull request at:

https://github.com/apache/nifi/pull/230




[GitHub] nifi pull request: NiFi-924: Camel integration

2016-04-22 Thread PuspenduBanerjee
Github user PuspenduBanerjee commented on the pull request:

https://github.com/apache/nifi/pull/219#issuecomment-213595825
  
@joewitt @olegz Thank you all, and sorry for the delay in replying; I was 
dead busy.




[GitHub] nifi pull request: NiFi-924: Camel integration

2016-04-22 Thread PuspenduBanerjee
Github user PuspenduBanerjee closed the pull request at:

https://github.com/apache/nifi/pull/219




[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

2016-04-22 Thread joewitt
Github user joewitt commented on a diff in the pull request:

https://github.com/apache/nifi/pull/366#discussion_r60797930
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java
 ---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessSession;
+import org.apache.nifi.util.MockSessionFactory;
+import org.apache.nifi.util.SharedSessionState;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class AbstractKafkaProcessorLifecycelTest {
+
+@Test
+public void validateBaseProperties() throws Exception {
+TestRunner runner = 
TestRunners.newTestRunner(DummyProcessor.class);
+runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
+runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
+runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
+
+try {
+runner.assertValid();
+fail();
+} catch (AssertionError e) {
+assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
+}
+
+runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
+try {
+runner.assertValid();
+fail();
+} catch (AssertionError e) {
+assertTrue(e.getMessage().contains("'bootstrap.servers' 
validated against 'foo' is invalid"));
+}
+runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
+
+runner.removeProperty(ConsumeKafka.TOPIC);
+try {
+runner.assertValid();
+fail();
+} catch (AssertionError e) {
+assertTrue(e.getMessage().contains("'topic' is invalid because 
topic is required"));
+}
+
+runner.setProperty(ConsumeKafka.TOPIC, "");
+try {
+runner.assertValid();
+fail();
+} catch (AssertionError e) {
+assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
+}
+
+runner.setProperty(ConsumeKafka.TOPIC, "  ");
   

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

2016-04-22 Thread joewitt
Github user joewitt commented on a diff in the pull request:

https://github.com/apache/nifi/pull/366#discussion_r60792649
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
 ---
@@ -78,152 +81,182 @@
 }
 
 /**
- *
- */
-void setProcessLog(ProcessorLog processLog) {
-this.processLog = processLog;
-}
-
-/**
- * Publishes messages to Kafka topic. It supports three publishing
- * mechanisms.
+ * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} 
to
+ * determine how many messages to Kafka will be sent from a provided
+ * {@link InputStream} (see {@link 
PublishingContext#getContentStream()}).
+ * It supports two publishing modes:
  * 
- * Sending the entire content stream as a single Kafka 
message.
- * Splitting the incoming content stream into chunks and sending
- * individual chunks as separate Kafka messages.
- * Splitting the incoming content stream into chunks and sending 
only
- * the chunks that have failed previously @see
- * {@link SplittableMessageContext#getFailedSegments()}.
+ * Sending all messages constructed from
+ * {@link StreamDemarcator#nextToken()} operation.
+ * Sending only unacknowledged messages constructed from
+ * {@link StreamDemarcator#nextToken()} operation.
  * 
+ * The unacknowledged messages are determined from the value of
+ * {@link PublishingContext#getLastAckedMessageIndex()}.
+ * 
  * This method assumes content stream affinity where it is expected 
that the
  * content stream that represents the same Kafka message(s) will 
remain the
  * same across possible retries. This is required specifically for 
cases
  * where delimiter is used and a single content stream may represent
- * multiple Kafka messages. The failed segment list will keep the 
index of
- * of each content stream segment that had failed to be sent to Kafka, 
so
- * upon retry only the failed segments are sent.
+ * multiple Kafka messages. The
+ * {@link PublishingContext#getLastAckedMessageIndex()} will provide 
the
+ * index of the last ACKed message, so upon retry only messages with 
the
+ * higher index are sent.
  *
- * @param messageContext
- *instance of {@link SplittableMessageContext} which hold
- *context information about the message to be sent
- * @param contentStream
- *instance of open {@link InputStream} carrying the 
content of
- *the message(s) to be send to Kafka
- * @param partitionKey
- *the value of the partition key. Only relevant is user 
wishes
- *to provide a custom partition key instead of relying on
- *variety of provided {@link Partitioner}(s)
- * @param maxBufferSize maximum message size
- * @return The set containing the failed segment indexes for messages 
that
- * failed to be sent to Kafka.
+ * @param publishingContext
+ *instance of {@link PublishingContext} which hold context
+ *information about the message(s) to be sent.
+ * @return The index of the last successful offset.
  */
-BitSet publish(SplittableMessageContext messageContext, InputStream 
contentStream, Integer partitionKey,
-int maxBufferSize) {
-List sendFutures = 
this.split(messageContext, contentStream, partitionKey, maxBufferSize);
-return this.publish(sendFutures);
+KafkaPublisherResult publish(PublishingContext publishingContext) {
+StreamDemarcator streamTokenizer = new 
StreamDemarcator(publishingContext.getContentStream(),
+publishingContext.getDelimiterBytes(), 
publishingContext.getMaxRequestSize());
+
+int prevLastAckedMessageIndex = 
publishingContext.getLastAckedMessageIndex();
+List resultFutures = new ArrayList<>();
+
+byte[] messageBytes;
+int tokenCounter = 0;
+for (; (messageBytes = streamTokenizer.nextToken()) != null; 
tokenCounter++) {
+if (prevLastAckedMessageIndex < tokenCounter) {
+Integer partitionId = publishingContext.getPartitionId();
+if (partitionId == null && publishingContext.getKeyBytes() 
!= null) {
+partitionId = 
this.getPartition(publishingContext.getKeyBytes(), 
publishingContext.getTopic());
+}
+ProducerRecord message =
+new 

[GitHub] nifi pull request: NIFI-1678

2016-04-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/323




[GitHub] nifi pull request: NIFI-1678

2016-04-22 Thread mcgilman
Github user mcgilman commented on the pull request:

https://github.com/apache/nifi/pull/323#issuecomment-213553667
  
Functionally this is much better than before.  Now, when running clustered and 
we start losing nodes, we are able to see the current cluster participation.  
Once we have lost quorum we are no longer able to update the Primary Node, but 
this behavior is expected.  Once quorum is re-established the Primary Node is 
updated accordingly.  As we move towards zero-master clustering, we will return 
appropriate response codes when there is no quorum. 

Looks good. Thanks!




Re: [DISCUSS] From Contributor to Committer

2016-04-22 Thread Joe Witt
Agreed.  Unless there is any objection I plan to assume lazy consensus
and remove the draft marking tomorrow or shortly thereafter.

On Fri, Apr 22, 2016 at 2:53 PM, Sean Busbey  wrote:
> FWIW, I don't read the current draft as requiring that someone
> strictly progress contributor -> committer -> PMC. I do read it as
> stating that the evaluation of project activity should normally be
> over some period of time for committers and a longer period of time
> for PMC status (e.g. maybe we miss someone who's behaving like a
> committer over a normal 3-month period, but we catch it at 6. At that
> point they've been around as long as a PMC member would, so we skip
> the committer step).
>
> Of course, I might be biased by the fact that I skipped directly to
> PMC status. :)
>
> On Wed, Apr 20, 2016 at 5:23 PM, Joe Witt  wrote:
>> Bryan,
>>
>> That is a good point and I do think we should avoid putting such a
>> rule/requirement in place.  However, practically speaking I would
>> imagine that is how it will play out most often.
>>
>> I've taken the comments of this thread largely based on Tony's
>> excellent start and created a draft wiki page for it here [1].  If
>> discussion remains on track then I'll assume lazy consensus and remove
>> the draft notice.
>>
>> [1] 
>> https://cwiki.apache.org/confluence/display/NIFI/Progression+from+user+to+Project+Management+Committee
>>
>> Thanks
>> Joe
>>
>> On Wed, Apr 20, 2016 at 12:05 PM, Bryan Bende  wrote:
>>> So are we saying as a community that a contributor has to first become a
>>> committer, and then only after continued consistent engagement be
>>> considered for PMC?
>>>
>>> I don't have any issue with that approach, although it is not exactly what
>>> I thought when we first created the two tiers.
>>>
>>> On Wed, Apr 20, 2016 at 11:10 AM, Joe Witt  wrote:
>>>
 Tony,

 There appears to be consensus around these thoughts.  Perhaps we
 should document this on a Wiki page?

 I think generally for committer status it would be good to see a
 number of these things for a period of time and then for PMC status to
 see those contributions continue and ideally expand for a longer
 duration.  Another few months?

 Thanks
 Joe

 On Wed, Apr 13, 2016 at 2:58 PM, Joe Witt  wrote:
 > Tony,
 >
 > I agree with the points you raise and the completeness of the
 > perspective you share. I do think we should add to that a focus on
 > licensing and legal aspects.
 >
 > - The individual has shown an effort to aid the community in producing
 > releases which are consistent with ASF licensing requirements and the
 > guidance followed in the Apache NiFi community to adhere to those
 > policies.  This understanding could be shown when introducing new
 > dependencies (including transitive) by ensuring that all licensing and
 > notice updates have occurred.  Another good example is flagging
 > potentially copyrighted or insufficiently cited items like Skora found
 > recently in the Kafka tests.  One of our most important jobs as a
 > community is to put out legal releases and that is certainly a team
 > effort!
 >
 > Thanks
 > Joe
 >
 > On Sun, Apr 10, 2016 at 10:56 PM, Sean Busbey  wrote:
 >> Thanks for starting this Tony!
 >>
 >> As a PMC member, I really try to focus on things that help the
 >> community where we tend to have limited bandwidth: reviews weigh
 >> heavily, as does helping out new folks on users@, and doing public
 >> talking/workshops.
 >>
 >> I also am inclined to vote in favor of folks who show the kind of
 >> project dedication that we expect from PMC members. While we still
 >> need to do a better job of describing those things, at the moment I'm
 >> thinking of things like voting on release candidates, watching out for
 >> our trademarks, and investing the time needed to handle our licensing
 >> responsibilities.
 >>
 >> On Sun, Apr 10, 2016 at 9:38 AM, Tony Kurc  wrote:
 >>> First off, I recommend reading this page to understand what the
 Apache
 >>> NiFi PMC draws from when making a decision
 >>>
 >>> http://community.apache.org/contributors/index.html
 >>>
 >>> I thought it would be helpful for me to walk through how I interpret
 that
 >>> guidance, and what that means for NiFi. For those that didn't read,
 there
 >>> are four aspects of contribution worth considering when evaluating someone for
 >>> committership: community, project, documentation and code. Really, the
 >>> committer decision comes down to: has this person built up enough
 merit in
 >>> the community that I have a high degree of confidence that I trust
 him/her
 >>> with write access to the code and 

[GitHub] nifi pull request: NIFI-1678

2016-04-22 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/323#discussion_r60782947
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.manager.impl;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.node.Node.Status;
+import org.apache.nifi.cluster.protocol.ConnectionRequest;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.reporting.Severity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WebClusterManagerCoordinator implements ClusterCoordinator {
+private static final Logger logger = 
LoggerFactory.getLogger(WebClusterManagerCoordinator.class);
+
+private final WebClusterManager clusterManager;
+
+public WebClusterManagerCoordinator(final WebClusterManager 
clusterManager) {
+this.clusterManager = clusterManager;
+}
+
+@Override
+public void requestNodeConnect(final NodeIdentifier nodeId) {
+final Node node = clusterManager.getRawNode(nodeId.getId());
+
+if (node == null) {
+final ConnectionRequest connectionRequest = new 
ConnectionRequest(nodeId);
+clusterManager.requestConnection(connectionRequest);
+} else {
+node.setStatus(Status.DISCONNECTED);
+clusterManager.requestReconnection(nodeId.getId(), 
"Anonymous");
+}
+}
+
+@Override
+public void finishNodeConnection(final NodeIdentifier nodeId) {
+final Node node = clusterManager.getRawNode(nodeId.getId());
+if (node == null) {
+logger.error("Attempting to Finish Node Connection but could 
not find Node with Identifier {}", nodeId);
+return;
+}
+
+node.setStatus(Status.CONNECTED);
+}
+
+@Override
+public void requestNodeDisconnect(final NodeIdentifier nodeId, final 
DisconnectionCode disconnectionCode, final String explanation) {
+try {
+clusterManager.requestDisconnection(nodeId, false, 
explanation);
+
+if (disconnectionCode == DisconnectionCode.LACK_OF_HEARTBEAT) {
+final Node node = 
clusterManager.getRawNode(nodeId.getId());
+if (node != null) {
+node.setHeartbeatDisconnection();
+}
+}
+} catch (final Exception e) {
+logger.error("Failed to request node {} disconnect from 
cluster due to {}", nodeId, e);
+logger.error("", e);
--- End diff --

The first line logs the toString() of the Exception, whereas the second line 
logs the stack trace. Personally, I find the log messages easier to read this 
way when someone copies & pastes them: people often don't grab the stack 
trace, but if the toString() of the Exception is part of the original message, 
the cause is still clear.
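
In sketch form, the SLF4J pattern in question (standalone example; the class name and values are illustrative, not from this PR):

``` java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingPatternExample {
    private static final Logger logger = LoggerFactory.getLogger(LoggingPatternExample.class);

    public static void main(final String[] args) {
        final String nodeId = "node-1"; // illustrative value
        final Exception e = new IllegalStateException("node unreachable"); // illustrative value

        // Both placeholders are consumed by arguments, so SLF4J renders e.toString()
        // into the message itself - the cause survives a copy & paste of this one line.
        logger.error("Failed to request node {} disconnect from cluster due to {}", nodeId, e);

        // Empty message with a Throwable argument: logs the full stack trace.
        logger.error("", e);
    }
}
```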




[GitHub] nifi pull request: NIFI-1678

2016-04-22 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/323#discussion_r60782492
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java
 ---
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import java.io.ByteArrayInputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryForever;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StopWatch;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses Apache Curator to monitor heartbeats from nodes
+ */
+public class CuratorHeartbeatMonitor implements HeartbeatMonitor {
+private static final Logger logger = 
LoggerFactory.getLogger(CuratorHeartbeatMonitor.class);
+private static final Unmarshaller unmarshaller;
+
+private final ClusterCoordinator clusterCoordinator;
+private final ZooKeeperClientConfig zkClientConfig;
+private final String heartbeatPath;
+private final int heartbeatIntervalMillis;
+
+private volatile CuratorFramework curatorClient;
+private volatile ScheduledFuture future;
+private volatile Map 
latestHeartbeatMessages;
+private volatile long latestHeartbeatTime;
+
+private final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat 
Monitor", true);
+
+static {
+try {
+final JAXBContext jaxbContext = 
JAXBContext.newInstance(HeartbeatMessage.class);
+unmarshaller = jaxbContext.createUnmarshaller();
+} catch (final Exception e) {
+throw new RuntimeException("Failed to create an Unmarshaller 
for unmarshalling Heartbeat Messages", e);
+}
+}
+
+public CuratorHeartbeatMonitor(final ClusterCoordinator 
clusterCoordinator, final Properties properties) {
+this.clusterCoordinator = clusterCoordinator;
+this.zkClientConfig = 
ZooKeeperClientConfig.createConfig(properties);
+this.heartbeatPath = 
zkClientConfig.resolvePath("cluster/heartbeats");
+
+final String heartbeatInterval = 
properties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL,
+NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL);
+
+this.heartbeatIntervalMillis = (int) 
FormatUtils.getTimeDuration(heartbeatInterval, TimeUnit.MILLISECONDS);
+}
+
+@Override
+public void start() {
+final RetryPolicy retryPolicy = new RetryForever(5000);
+curatorClient = 
CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(),
+  

[GitHub] nifi pull request: NIFI-1678

2016-04-22 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/323#discussion_r60782403
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java
 ---
[diff context identical to the CuratorHeartbeatMonitor.java excerpt quoted above; message truncated in the archive]

Re: Enumerating processors, queues, etc.

2016-04-22 Thread Joe Witt
Yeah understood.  So let's dig into this more.  We need to avoid
overexposure of internal state which one might want to crawl through,
because that introduces some multi-threaded challenges and could limit
our ability to evolve internals.  However, if we better understand the
questions you'd like to be able to ask of certain things, perhaps we
can better expose those results.

Can you try stating what you're looking for with a few more specific
examples?  For instance you said "want to iterate over the processor
collections...to look for performance thresholds" - What sorts of
performance threshold questions?

On Fri, Apr 22, 2016 at 2:20 PM, Joe Skora  wrote:
> Joe Witt - Not really, this kind of went sideways from where I was
> originally headed.
>
> I'm looking for a way for a controller service to iterate over the
> processor and queue collections (maybe others as well) to look for
> performance thresholds or other issues and then provide feedback somehow or
> report externally.
>
> If it can be done through the REST API, it seems like it should be possible
> from within the framework as well.
>
> On Fri, Apr 22, 2016 at 1:32 PM, Joe Witt  wrote:
>
>> Joe Skora - does Jeremy's JIRA cover your use case needs?
>>
>> On Fri, Apr 22, 2016 at 12:44 PM, Jeremy Dyer  wrote:
>> > Mark,
>> >
>> > ok that makes sense. I have created a jira for this improvement
>> > https://issues.apache.org/jira/browse/NIFI-1805
>> >
>> > On Fri, Apr 22, 2016 at 12:27 PM, Mark Payne 
>> wrote:
>> >
>> >> Jeremy,
>> >>
>> >> It should be relatively easy. In FlowController, we would have to update
>> >> getGroupStatus() to set the values on ConnectionStatus
>> >> and of course update ConnectionStatus to have getters & setters for the
>> >> new values. That should be about it, I think.
>> >>
>> >> -Mark
>> >>
>> >>
>> >> > On Apr 22, 2016, at 12:17 PM, Jeremy Dyer  wrote:
>> >> >
>> >> > Mark,
>> >> >
>> >> > What would the process look like for doing that? Would that be
>> something
>> >> > trivial or require some reworking?
>> >> >
>> >> > On Fri, Apr 22, 2016 at 10:26 AM, Mark Payne 
>> >> wrote:
>> >> >
>> >> >> I definitely don't think we should be exposing the FlowController to
>> a
>> >> >> Reporting Task.
>> >> >> However, I think exposing information about whether or not
>> backpressure
>> >> is
>> >> >> being applied
>> >> >> (or even is configured) is a very reasonable idea.
>> >> >>
>> >> >> -Mark
>> >> >>
>> >> >>
>> >> >>> On Apr 22, 2016, at 10:22 AM, Jeremy Dyer  wrote:
>> >> >>>
>> >> >>> I could see the argument for not making that available. What about
>> some
>> >> >>> sort of reference that would allow the ReportingTask to
>> determine if
>> >> >>> backpressure is being applied to a Connection? It currently seems
>> you
>> >> can
>> >> >>> see the number of bytes and/or objects count queued in a connection
>> but
>> >> >>> don't have any reference to the values a user has setup for
>> >> backpressure
>> >> >> in
>> >> >>> the UI. Is there a way to get those values in the scope of the
>> >> >>> ReportingTask?
>> >> >>>
>> >> >>> On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende 
>> >> wrote:
>> >> >>>
>> >>  I think the only way you could do it directly without the REST API
>> is
>> >> by
>> >>  having access to the FlowController,
>> >>  but that is purposely not exposed to extension points... actually
>> >>  StandardFlowController is what implements the
>> >>  EventAccess interface which ends up providing the path way to the
>> >> status
>> >>  objects.
>> >> 
>> >>  I would have to defer to Joe, Mark, and others about whether we
>> would
>> >> >> want
>> >>  to expose direct access to components
>> >>  through controller services, or some other extension point.
>> >> 
>> >>  On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer 
>> >> wrote:
>> >> 
>> >> > Bryan,
>> >> >
>> >> > The ReportingTask enumeration makes sense and was helpful for
>> >> something
>> >> > else I am working on as well.
>> >> >
>> >> > Like Joe however I'm looking for a way to not just get the *Status
>> >>  objects
>> >> > but rather start and stop processors. Is there a way to do that
>> from
>> >> >> the
>> >> > ReportContext scope? I imagine you could pull the Processor "Id"
>> from
>> >> >> the
>> >> > ProcessorStatus and then use the REST API but was looking for
>> >> something
>> >> > more direct than having to use the REST API
>> >> >
>> >> >
>> >> > On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende 
>> >> wrote:
>> >> >
>> >> >> Hi Joe,
>> >> >>
>> >> >> I'm not sure if a controller service can do this, but a
>> >> ReportingTask
>> >>  has
>> >> >> access to similar information.
>> >> >>
>> >> >> A ReportingTask gets access 

[GitHub] nifi pull request: NIFI-1678

2016-04-22 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/323#discussion_r60782213
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java
 ---
[diff context identical to the CuratorHeartbeatMonitor.java excerpt quoted above; message truncated in the archive]

[GitHub] nifi pull request: NIFI-1678

2016-04-22 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/323#discussion_r60782150
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java
 ---
[diff context identical to the CuratorHeartbeatMonitor.java excerpt quoted above; message truncated in the archive]

[GitHub] nifi pull request: NIFI-1678

2016-04-22 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/323#discussion_r60781637
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java
 ---
[diff context identical to the CuratorHeartbeatMonitor.java excerpt quoted above; message truncated in the archive]

Re: Enumerating processors, queues, etc.

2016-04-22 Thread Joe Skora
Joe Witt - Not really, this kind of went sideways from where I was
originally headed.

I'm looking for a way for a controller service to iterate over the
processor and queue collections (maybe others as well) to look for
performance thresholds or other issues and then provide feedback somehow or
report externally.

If it can be done through the REST API, it seems like it should be possible
from within the framework as well.
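
Roughly, I'm imagining being able to write something like the following (a
sketch only - the class name and threshold are made up, and it leans on the
ReportingTask route Bryan describes in the quoted thread below):

import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;

public class QueueDepthWatcher extends AbstractReportingTask {

    private static final int QUEUED_COUNT_THRESHOLD = 10000; // example threshold

    @Override
    public void onTrigger(final ReportingContext context) {
        // Root process group status, reachable via ReportingContext -> EventAccess
        final ProcessGroupStatus root = context.getEventAccess().getControllerStatus();
        walk(root);
    }

    private void walk(final ProcessGroupStatus group) {
        for (final ConnectionStatus conn : group.getConnectionStatus()) {
            if (conn.getQueuedCount() > QUEUED_COUNT_THRESHOLD) {
                getLogger().warn("Connection {} has {} FlowFiles queued",
                        new Object[] {conn.getName(), conn.getQueuedCount()});
            }
        }
        // Recurse into child process groups to cover the entire flow
        for (final ProcessGroupStatus child : group.getProcessGroupStatus()) {
            walk(child);
        }
    }
}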

On Fri, Apr 22, 2016 at 1:32 PM, Joe Witt  wrote:

> Joe Skora - does Jeremy's JIRA cover your use case needs?
>
> On Fri, Apr 22, 2016 at 12:44 PM, Jeremy Dyer  wrote:
> > Mark,
> >
> > ok that makes sense. I have created a jira for this improvement
> > https://issues.apache.org/jira/browse/NIFI-1805
> >
> > On Fri, Apr 22, 2016 at 12:27 PM, Mark Payne 
> wrote:
> >
> >> Jeremy,
> >>
> >> It should be relatively easy. In FlowController, we would have to update
> >> getGroupStatus() to set the values on ConnectionStatus
> >> and of course update ConnectionStatus to have getters & setters for the
> >> new values. That should be about it, I think.
> >>
> >> -Mark
> >>
> >>
> >> > On Apr 22, 2016, at 12:17 PM, Jeremy Dyer  wrote:
> >> >
> >> > Mark,
> >> >
> >> > What would the process look like for doing that? Would that be
> something
> >> > trivial or require some reworking?
> >> >
> >> > On Fri, Apr 22, 2016 at 10:26 AM, Mark Payne 
> >> wrote:
> >> >
> >> >> I definitely don't think we should be exposing the FlowController to
> a
> >> >> Reporting Task.
> >> >> However, I think exposing information about whether or not
> backpressure
> >> is
> >> >> being applied
> >> >> (or even is configured) is a very reasonable idea.
> >> >>
> >> >> -Mark
> >> >>
> >> >>
> >> >>> On Apr 22, 2016, at 10:22 AM, Jeremy Dyer  wrote:
> >> >>>
> >> >>> I could see the argument for not making that available. What about
> some
> >> >>> sort of reference that would allow the ReportingTask to
> determine if
> >> >>> backpressure is being applied to a Connection? It currently seems
> you
> >> can
> >> >>> see the number of bytes and/or objects count queued in a connection
> but
> >> >>> don't have any reference to the values a user has setup for
> >> backpressure
> >> >> in
> >> >>> the UI. Is there a way to get those values in the scope of the
> >> >>> ReportingTask?
> >> >>>
> >> >>> On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende 
> >> wrote:
> >> >>>
> >>  I think the only way you could do it directly without the REST API
> is
> >> by
> >>  having access to the FlowController,
> >>  but that is purposely not exposed to extension points... actually
> >>  StandardFlowController is what implements the
> >>  EventAccess interface which ends up providing the path way to the
> >> status
> >>  objects.
> >> 
> >>  I would have to defer to Joe, Mark, and others about whether we
> would
> >> >> want
> >>  to expose direct access to components
> >>  through controller services, or some other extension point.
> >> 
> >>  On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer 
> >> wrote:
> >> 
> >> > Bryan,
> >> >
> >> > The ReportingTask enumeration makes sense and was helpful for
> >> something
> >> > else I am working on as well.
> >> >
> >> > Like Joe however I'm looking for a way to not just get the *Status
> >>  objects
> >> > but rather start and stop processors. Is there a way to do that
> from
> >> >> the
> >> > ReportContext scope? I imagine you could pull the Processor "Id"
> from
> >> >> the
> >> > ProcessorStatus and then use the REST API but was looking for
> >> something
> >> > more direct than having to use the REST API
> >> >
> >> >
> >> > On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende 
> >> wrote:
> >> >
> >> >> Hi Joe,
> >> >>
> >> >> I'm not sure if a controller service can do this, but a
> >> ReportingTask
> >>  has
> >> >> access to similar information.
> >> >>
> >> >> A ReportingTask gets access to a ReportingContext, which can
> access
> >> >> EventAccess which can access ProcessGroupStatus.
> >> >>
> >> >> From ProcessGroupStatus you are at the root process group and can
> >> > enumerate
> >> >> the flow:
> >> >>
> >> >> private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
> >> >> private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
> >> >> private Collection<ProcessGroupStatus> processGroupStatus = new ArrayList<>();
> >> >> private Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus = new ArrayList<>();
> >> >> private Collection<PortStatus> inputPortStatus = new ArrayList<>();
> >> >> private Collection<PortStatus> outputPortStatus = new ArrayList<>();
> >> >>
> >> >> Not sure if that is what you were looking for.
> >> 

[GitHub] nifi pull request: NIFI-1678

2016-04-22 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/323#discussion_r60781488
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java
 ---
[diff context identical to the CuratorHeartbeatMonitor.java excerpt quoted above; message truncated in the archive]

[GitHub] nifi pull request: NIFI-1678

2016-04-22 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/323#discussion_r60781310
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+
+public class AdaptedNodeConnectionStatus {
+private NodeConnectionState state;
+private DisconnectionCode disconnectCode;
+private String disconnectReason;
+private Long connectionRequestTime;
+
+public AdaptedNodeConnectionStatus() {
+}
--- End diff --

Nope. Removing.




Re: Monitoring duplicate file entries with ExecuteSQL

2016-04-22 Thread Simon Ball
You need to quote the attribute in your SQL: '${filename}' to produce valid 
SQL. 
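
For example, the statement from your original message then becomes:

SELECT * FROM tbl WHERE '${filename}' = File_Name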

One alternative to this would be to use ListFile -> FetchFile together. With 
that pattern, ListFile maintains the state of which files have been processed 
for you within NiFi, meaning you wouldn't need the separate MySQL log, which 
could reduce your complexity if you don't need the SQL log for anything else.

Simon 


> On 22 Apr 2016, at 18:22, mfzeidan  wrote:
> 
> I am trying to use an ExecuteSQL processor to check if a file has already
> been processed.
> 
> Files are flowing into our NiFi and we have it set up to move files to
> another folder. Once the file is moved, we just log the history into MySQL. 
> 
> I want to have it check if the filename is already in the db, thus checking
> if we have already processed this file. 
> 
> SELECT * FROM tbl WHERE ${filename} = File_Name (this is the column header
> in our MySQL table.)
> 
> I have yet to get anything besides an error being generated saying my SQL
> statement is incorrect. Am I going about this the wrong way? Does anyone
> have a better way of checking this sort of thing? I have added in future
> file names (as the file names are predictable) but it still gives me an
> error.
> 
> Thanks
> 
> 
> 
> 


[GitHub] nifi pull request: NIFI-1805

2016-04-22 Thread jdye64
Github user jdye64 commented on the pull request:

https://github.com/apache/nifi/pull/377#issuecomment-213528810
  
It might make more sense to leave those sorts of operations to a 
ReportingTask, however. Does anyone have any thoughts around that?




[GitHub] nifi pull request: NIFI-1805

2016-04-22 Thread jdye64
Github user jdye64 commented on the pull request:

https://github.com/apache/nifi/pull/377#issuecomment-213528547
  
@aperepel that isn't a bad idea. It would take a little more discussion around 
how often we continue to post that bulletin if the threshold is constantly 
exceeded, what severity to use, etc.




[GitHub] nifi pull request: NIFI-1805

2016-04-22 Thread jdye64
GitHub user jdye64 opened a pull request:

https://github.com/apache/nifi/pull/377

NIFI-1805

Expose BackPressureObjectThreshold and BackPressureDataSizeThreshold to
ConnectionStatus

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jdye64/nifi NIFI-1805

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/377.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #377


commit 51505bd34f46ea55f0e807ab1d4e764977421aa6
Author: Jeremy Dyer 
Date:   2016-04-22T17:28:29Z

NIFI-1805

Expose BackPressureObjectThreshold and BackPressureDataSizeThreshold to
ConnectionStatus






[GitHub] nifi pull request: NIFI-1805

2016-04-22 Thread aperepel
Github user aperepel commented on the pull request:

https://github.com/apache/nifi/pull/377#issuecomment-213526177
  
I wonder if, upon crossing a threshold, we should raise a bulletin maybe?
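
In sketch form, posting one from a ReportingTask could look something like this (the helper name, category, and wording are illustrative):

``` java
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.Severity;

public final class BackPressureBulletins {

    private BackPressureBulletins() {
    }

    // Creates a framework-level bulletin and posts it to the bulletin repository;
    // intended to be called from a ReportingTask's onTrigger(context).
    public static void report(final ReportingContext context, final String connectionName) {
        final Bulletin bulletin = context.createBulletin("Back Pressure", Severity.WARNING,
                "Back pressure threshold exceeded on connection " + connectionName);
        context.getBulletinRepository().addBulletin(bulletin);
    }
}
```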




Re: Enumerating processors, queues, etc.

2016-04-22 Thread Joe Witt
Joe Skora - does Jeremy's JIRA cover your use case needs?

On Fri, Apr 22, 2016 at 12:44 PM, Jeremy Dyer  wrote:
> Mark,
>
> ok that makes sense. I have created a jira for this improvement
> https://issues.apache.org/jira/browse/NIFI-1805
>
> On Fri, Apr 22, 2016 at 12:27 PM, Mark Payne  wrote:
>
>> Jeremy,
>>
>> It should be relatively easy. In FlowController, we would have to update
>> getGroupStatus() to set the values on ConnectionStatus
>> and of course update ConnectionStatus to have getters & setters for the
>> new values. That should be about it, I think.
>>
>> -Mark
>>
>>
>> > On Apr 22, 2016, at 12:17 PM, Jeremy Dyer  wrote:
>> >
>> > Mark,
>> >
>> > What would the process look like for doing that? Would that be something
>> > trivial or require some reworking?
>> >
>> > On Fri, Apr 22, 2016 at 10:26 AM, Mark Payne 
>> wrote:
>> >
>> >> I definitely don't think we should be exposing the FlowController to a
>> >> Reporting Task.
>> >> However, I think exposing information about whether or not backpressure
>> is
>> >> being applied
>> >> (or even is configured) is a very reasonable idea.
>> >>
>> >> -Mark
>> >>
>> >>
>> >>> On Apr 22, 2016, at 10:22 AM, Jeremy Dyer  wrote:
>> >>>
>> >>> I could see the argument for not making that available. What about some
>> >>> sort of reference that would allow the ReportingTask to determine if
>> >>> backpressure is being applied to a Connection? It currently seems you
>> can
>> >>> see the number of bytes and/or objects count queued in a connection but
>> >>> don't have any reference to the values a user has setup for
>> backpressure
>> >> in
>> >>> the UI. Is there a way to get those values in the scope of the
>> >>> ReportingTask?
>> >>>
>> >>> On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende 
>> wrote:
>> >>>
>>  I think the only way you could do it directly without the REST API is
>> by
>>  having access to the FlowController,
>>  but that is purposely not exposed to extension points... actually
>>  StandardFlowController is what implements the
>>  EventAccess interface which ends up providing the path way to the
>> status
>>  objects.
>> 
>>  I would have to defer to Joe, Mark, and others about whether we would
>> >> want
>>  to expose direct access to components
>>  through controller services, or some other extension point.
>> 
>>  On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer 
>> wrote:
>> 
>> > Bryan,
>> >
>> > The ReportingTask enumeration makes sense and was helpful for
>> something
>> > else I am working on as well.
>> >
>> > Like Joe however I'm looking for a way to not just get the *Status
>>  objects
>> > but rather start and stop processors. Is there a way to do that from
>> >> the
>> > ReportContext scope? I imagine you could pull the Processor "Id" from
>> >> the
>> > ProcessorStatus and then use the REST API but was looking for
>> something
>> > more direct than having to use the REST API
>> >
>> >
>> > On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende 
>> wrote:
>> >
>> >> Hi Joe,
>> >>
>> >> I'm not sure if a controller service can do this, but a
>> ReportingTask
>>  has
>> >> access to similar information.
>> >>
>> >> A ReportingTask gets access to a ReportingContext, which can access
>> >> EventAccess which can access ProcessGroupStatus.
>> >>
>> >> From ProcessGroupStatus you are at the root process group and can
>> > enumerate
>> >> the flow:
>> >>
>> >> private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
>> >> private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
>> >> private Collection<ProcessGroupStatus> processGroupStatus = new ArrayList<>();
>> >> private Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus = new ArrayList<>();
>> >> private Collection<PortStatus> inputPortStatus = new ArrayList<>();
>> >> private Collection<PortStatus> outputPortStatus = new ArrayList<>();
>> >>
>> >> Not sure if that is what you were looking for.
>> >>
>> >> -Bryan
>> >>
>> >>
>> >> On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora 
>> wrote:
>> >>
>> >>> Is it possible and if so what is the best way for a controller
>>  service
>> > to
>> >>> get the collection of all processors or queues?
>> >>>
>> >>> The goal being to iterate over the collection of processors or
>> queues
>> > to
>> >>> gather information or make adjustments to the flow.
>> >>>
>> >>
>> >
>> 
>> >>
>> >>
>>
>>


Monitoring duplicate file entries with ExecuteSQL

2016-04-22 Thread mfzeidan
I am trying to use an ExecuteSQL processor to check if a file has already been
processed.

Files are flowing into our NiFi and we have it set up to move files to
another folder. Once the file is moved, we just log the history into MySQL. 

I want to have it check if the filename is already in the db, thus checking
if we have already processed this file. 

SELECT * FROM tbl WHERE ${filename} = File_Name (this is the column header in
our MySQL table.)

I have yet to get anything besides an error being generated saying my SQL
statement is incorrect. Am I going about this the wrong way? Does anyone
have a better way of checking this sort of thing? I have added in future
file names (as the file names are predictable) but it still gives me an
error.

Thanks





[GitHub] nifi pull request: NIFI-1678

2016-04-22 Thread mcgilman
Github user mcgilman commented on the pull request:

https://github.com/apache/nifi/pull/323#issuecomment-213514155
  
Reviewing...




Re: Enumerating processors, queues, etc.

2016-04-22 Thread Jeremy Dyer
Mark,

ok that makes sense. I have created a jira for this improvement
https://issues.apache.org/jira/browse/NIFI-1805

On Fri, Apr 22, 2016 at 12:27 PM, Mark Payne  wrote:

> Jeremy,
>
> It should be relatively easy. In FlowController, we would have to update
> getGroupStatus() to set the values on ConnectionStatus
> and of course update ConnectionStatus to have getters & setters for the
> new values. That should be about it, I think.
>
> -Mark
>
>
> > On Apr 22, 2016, at 12:17 PM, Jeremy Dyer  wrote:
> >
> > Mark,
> >
> > What would the process look like for doing that? Would that be something
> > trivial or require some reworking?
> >
> > On Fri, Apr 22, 2016 at 10:26 AM, Mark Payne 
> wrote:
> >
> >> I definitely don't think we should be exposing the FlowController to a
> >> Reporting Task.
> >> However, I think exposing information about whether or not backpressure
> is
> >> being applied
> >> (or even is configured) is a very reasonable idea.
> >>
> >> -Mark
> >>
> >>
> >>> On Apr 22, 2016, at 10:22 AM, Jeremy Dyer  wrote:
> >>>
> >>> I could see the argument for not making that available. What about some
> >>> sort of reference that would allow the ReportingTask to determine if
> >>> backpressure is being applied to a Connection? It currently seems you
> can
> >>> see the number of bytes and/or objects count queued in a connection but
> >>> don't have any reference to the values a user has setup for
> backpressure
> >> in
> >>> the UI. Is there a way to get those values in the scope of the
> >>> ReportingTask?
> >>>
> >>> On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende 
> wrote:
> >>>
>  I think the only way you could do it directly without the REST API is
> by
>  having access to the FlowController,
>  but that is purposely not exposed to extension points... actually
>  StandardFlowController is what implements the
>  EventAccess interface which ends up providing the path way to the
> status
>  objects.
> 
>  I would have to defer to Joe, Mark, and others about whether we would
> >> want
>  to expose direct access to components
>  through controller services, or some other extension point.
> 
>  On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer 
> wrote:
> 
> > Bryan,
> >
> > The ReportingTask enumeration makes sense and was helpful for
> something
> > else I am working on as well.
> >
> > Like Joe however I'm looking for a way to not just get the *Status
>  objects
> > but rather start and stop processors. Is there a way to do that from
> >> the
> > ReportContext scope? I imagine you could pull the Processor "Id" from
> >> the
> > ProcessorStatus and then use the REST API but was looking for
> something
> > more direct than having to use the REST API
> >
> >
> > On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende 
> wrote:
> >
> >> Hi Joe,
> >>
> >> I'm not sure if a controller service can do this, but a
> ReportingTask
>  has
> >> access to similar information.
> >>
> >> A ReportingTask gets access to a ReportingContext, which can access
> >> EventAccess which can access ProcessGroupStatus.
> >>
> >> From ProcessGroupStatus you are at the root process group and can
> > enumerate
> >> the flow:
> >>
> >> private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
> >> private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
> >> private Collection<ProcessGroupStatus> processGroupStatus = new ArrayList<>();
> >> private Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus = new ArrayList<>();
> >> private Collection<PortStatus> inputPortStatus = new ArrayList<>();
> >> private Collection<PortStatus> outputPortStatus = new ArrayList<>();
> >>
> >> Not sure if that is what you were looking for.
> >>
> >> -Bryan
> >>
> >>
> >> On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora 
> wrote:
> >>
> >>> Is it possible and if so what is the best way for a controller
>  service
> > to
> >>> get the collection of all processors or queues?
> >>>
> >>> The goal being to iterate over the collection of processors or
> queues
> > to
> >>> gather information or make adjustments to the flow.
> >>>
> >>
> >
> 
> >>
> >>
>
>


Re: Enumerating processors, queues, etc.

2016-04-22 Thread Mark Payne
Jeremy,

It should be relatively easy. In FlowController, we would have to update 
getGroupStatus() to set the values on ConnectionStatus
and of course update ConnectionStatus to have getters & setters for the new 
values. That should be about it, I think.

-Mark


> On Apr 22, 2016, at 12:17 PM, Jeremy Dyer  wrote:
> 
> Mark,
> 
> What would the process look like for doing that? Would that be something
> trivial or require some reworking?
> 
> On Fri, Apr 22, 2016 at 10:26 AM, Mark Payne  wrote:
> 
>> I definitely don't think we should be exposing the FlowController to a
>> Reporting Task.
>> However, I think exposing information about whether or not backpressure is
>> being applied
>> (or even is configured) is a very reasonable idea.
>> 
>> -Mark
>> 
>> 
>>> On Apr 22, 2016, at 10:22 AM, Jeremy Dyer  wrote:
>>> 
>>> I could see the argument for not making that available. What about some
>>> sort of reference that would allow the ReportingTask to determine if
>>> backpressure is being applied to a Connection? It currently seems you can
>>> see the number of bytes and/or object count queued in a connection but
>>> don't have any reference to the values a user has set up for backpressure
>> in
>>> the UI. Is there a way to get those values in the scope of the
>>> ReportingTask?
>>> 
>>> On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende  wrote:
>>> 
 I think the only way you could do it directly without the REST API is by
 having access to the FlowController,
 but that is purposely not exposed to extension points... actually
 StandardFlowController is what implements the
 EventAccess interface which ends up providing the path way to the status
 objects.
 
 I would have to defer to Joe, Mark, and others about whether we would
>> want
 to expose direct access to components
 through controller services, or some other extension point.
 
 On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer  wrote:
 
> Bryan,
> 
> The ReportingTask enumeration makes sense and was helpful for something
> else I am working on as well.
> 
> Like Joe however I'm looking for a way to not just get the *Status
 objects
> but rather start and stop processors. Is there a way to do that from
>> the
> ReportContext scope? I imagine you could pull the Processor "Id" from
>> the
> ProcessorStatus and then use the REST API but was looking for something
> more direct than having to use the REST API
> 
> 
> On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende  wrote:
> 
>> Hi Joe,
>> 
>> I'm not sure if a controller service can do this, but a ReportingTask
 has
>> access to similar information.
>> 
>> A ReportingTask gets access to a ReportingContext, which can access
>> EventAccess which can access ProcessGroupStatus.
>> 
>> From ProcessGroupStatus you are at the root process group and can
> enumerate
>> the flow:
>> 
>> private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
>> private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
>> private Collection<ProcessGroupStatus> processGroupStatus = new ArrayList<>();
>> private Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus = new ArrayList<>();
>> private Collection<PortStatus> inputPortStatus = new ArrayList<>();
>> private Collection<PortStatus> outputPortStatus = new ArrayList<>();
>> 
>> Not sure if that is what you were looking for.
>> 
>> -Bryan
>> 
>> 
>> On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora  wrote:
>> 
>>> Is it possible and if so what is the best way for a controller
 service
> to
>>> get the collection of all processors or queues?
>>> 
>>> The goal being to iterate over the collection of processors or queues
> to
>>> gather information or make adjustments to the flow.
>>> 
>> 
> 
 
>> 
>> 
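
As a rough illustration of the change Mark outlines, here is a sketch of hypothetical additions to ConnectionStatus, with getGroupStatus() in FlowController expected to populate the new fields. The names below are illustrative, not the actual API:

``` java
// Hypothetical sketch only: backpressure-threshold fields added to
// ConnectionStatus so a ReportingTask can compare them against the queued
// counts it already sees. Existing fields of the real class are elided.
public class ConnectionStatus {
    private long backPressureObjectThreshold;  // configured object-count threshold
    private long backPressureBytesThreshold;   // configured data-size threshold

    public long getBackPressureObjectThreshold() {
        return backPressureObjectThreshold;
    }

    public void setBackPressureObjectThreshold(final long threshold) {
        this.backPressureObjectThreshold = threshold;
    }

    public long getBackPressureBytesThreshold() {
        return backPressureBytesThreshold;
    }

    public void setBackPressureBytesThreshold(final long threshold) {
        this.backPressureBytesThreshold = threshold;
    }
}
```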



[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

2016-04-22 Thread joewitt
Github user joewitt commented on a diff in the pull request:

https://github.com/apache/nifi/pull/366#discussion_r60766260
  
--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/AbstractKafkaProcessor.java ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * Base class for {@link Processor}s that publish and consume messages from Kafka
+ *
+ * @see PutKafka
+ */
+abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessionFactoryProcessor {
+
+
+    private volatile boolean acceptTask = true;
+
+    private final AtomicInteger taskCounter = new AtomicInteger();
+
+
+    /**
+     * @see KafkaPublisher
+     */
+    volatile T kafkaResource;
+
+    /**
+     *
+     */
+    @Override
+    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+        final ProcessSession session = sessionFactory.createSession();
+        if (this.acceptTask) {
+            try {
+                this.taskCounter.incrementAndGet();
+                if (this.kafkaResource == null) {
+                    synchronized (this) {
+                        if (this.kafkaResource == null) {
+                            this.kafkaResource = this.buildKafkaResource(context, session);
+                        }
+                    }
+                }
+
+                this.rendezvousWithKafka(context, session);
+                session.commit();
+            } catch (Throwable e) {
+                this.acceptTask = false;
+                this.getLogger().error("{} failed to process due to {}; rolling back session", new Object[] { this, e });
+                session.rollback(true);
+            } finally {
+                this.resetProcessorIfNecessary();
+            }
+        } else {
+            context.yield();
+        }
+    }
+    /**
+     * Resets the processor to its initial state if necessary. That state is
+     * reached only when there are no active tasks, which can only happen once
+     * currently executing tasks have been given a chance to finish while no
+     * new tasks are accepted (see {@link #acceptTask}).
+     */
+    private boolean resetProcessorIfNecessary() {
+        boolean reset = this.taskCounter.decrementAndGet() == 0 && !this.acceptTask;
+        if (reset) {
+            this.close();
+            this.acceptTask = true;
+        }
+        return reset;
+    }
+
+    /**
+     * Will call {@link Closeable#close()} on the target resource, after which
+     * the target resource will be set to null
+     *
+     * @see KafkaPublisher
+     */
+    @OnStopped
+    public void close() {
--- End diff --

@olegz could we reduce the scope here?  The concern I have is that technically 
this is a check-then-modify scenario: it could be called both by internal 
processor threads executing resetProcessorIfNecessary and by the framework 
when it invokes onStopped.  Now, given the way the framework works, onStopped 
should only be called once there are no threads left, so technically this 
should be legit.  I just would not want someone to also call this from other 
places in the code if we can avoid it.  So recommend we 
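
A minimal, self-contained sketch of the kind of scope reduction being discussed, assuming the framework's guarantee that @OnStopped fires only after active onTrigger threads have drained. The class name is illustrative and this is not the committed fix:

``` java
import java.io.Closeable;
import java.io.IOException;

// Illustrative sketch: close() is package-private and synchronized so the
// check-then-modify of kafkaResource cannot race between worker threads
// (via resetProcessorIfNecessary) and the lifecycle hook.
abstract class GuardedKafkaResourceHolder<T extends Closeable> {

    volatile T kafkaResource;

    synchronized void close() {
        if (this.kafkaResource != null) {
            try {
                this.kafkaResource.close();
            } catch (IOException e) {
                // the real processor would log this via getLogger()
            } finally {
                this.kafkaResource = null;
            }
        }
    }
}
```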

Re: Enumerating processors, queues, etc.

2016-04-22 Thread Jeremy Dyer
Mark,

What would the process look like for doing that? Would that be something
trivial or require some reworking?

On Fri, Apr 22, 2016 at 10:26 AM, Mark Payne  wrote:

> I definitely don't think we should be exposing the FlowController to a
> Reporting Task.
> However, I think exposing information about whether or not backpressure is
> being applied
> (or even is configured) is a very reasonable idea.
>
> -Mark
>
>
> > On Apr 22, 2016, at 10:22 AM, Jeremy Dyer  wrote:
> >
> > I could see the argument for not making that available. What about some
> > sort of reference that would allow the ReportingTask to determine if
> > backpressure is being applied to a Connection? It currently seems you can
> > see the number of bytes and/or object count queued in a connection but
> > don't have any reference to the values a user has set up for backpressure
> in
> > the UI. Is there a way to get those values in the scope of the
> > ReportingTask?
> >
> > On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende  wrote:
> >
> >> I think the only way you could do it directly without the REST API is by
> >> having access to the FlowController,
> >> but that is purposely not exposed to extension points... actually
> >> StandardFlowController is what implements the
> >> EventAccess interface which ends up providing the path way to the status
> >> objects.
> >>
> >> I would have to defer to Joe, Mark, and others about whether we would
> want
> >> to expose direct access to components
> >> through controller services, or some other extension point.
> >>
> >> On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer  wrote:
> >>
> >>> Bryan,
> >>>
> >>> The ReportingTask enumeration makes sense and was helpful for something
> >>> else I am working on as well.
> >>>
> >>> Like Joe however I'm looking for a way to not just get the *Status
> >> objects
> >>> but rather start and stop processors. Is there a way to do that from
> the
> >>> ReportContext scope? I imagine you could pull the Processor "Id" from
> the
> >>> ProcessorStatus and then use the REST API but was looking for something
> >>> more direct than having to use the REST API
> >>>
> >>>
> >>> On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende  wrote:
> >>>
>  Hi Joe,
> 
>  I'm not sure if a controller service can do this, but a ReportingTask
> >> has
>  access to similar information.
> 
>  A ReportingTask gets access to a ReportingContext, which can access
>  EventAccess which can access ProcessGroupStatus.
> 
>  From ProcessGroupStatus you are at the root process group and can
> >>> enumerate
>  the flow:
> 
>  private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
>  private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
>  private Collection<ProcessGroupStatus> processGroupStatus = new ArrayList<>();
>  private Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus = new ArrayList<>();
>  private Collection<PortStatus> inputPortStatus = new ArrayList<>();
>  private Collection<PortStatus> outputPortStatus = new ArrayList<>();
> 
>  Not sure if that is what you were looking for.
> 
>  -Bryan
> 
> 
>  On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora  wrote:
> 
> > Is it possible and if so what is the best way for a controller
> >> service
> >>> to
> > get the collection of all processors or queues?
> >
> > The goal being to iterate over the collection of processors or queues
> >>> to
> > gather information or make adjustments to the flow.
> >
> 
> >>>
> >>
>
>


[GitHub] nifi-minifi pull request: MINIFI-27 adding debug statements to Boo...

2016-04-22 Thread JPercivall
GitHub user JPercivall opened a pull request:

https://github.com/apache/nifi-minifi/pull/16

MINIFI-27 adding debug statements to BootstrapCodec



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JPercivall/nifi-minifi MINIFI-27

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi-minifi/pull/16.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16


commit 9891fb3fd57149ab874342e122775b5b16653d87
Author: Joseph Percivall 
Date:   2016-04-22T14:56:45Z

MINIFI-27 adding debug statements to BootstrapCodec






Re: Enumerating processors, queues, etc.

2016-04-22 Thread Joe Skora
For an extensible system like NiFi, what is the rationale behind not
allowing the system introspection into the processor and queue collections?

On Fri, Apr 22, 2016 at 10:26 AM, Mark Payne  wrote:

> I definitely don't think we should be exposing the FlowController to a
> Reporting Task.
> However, I think exposing information about whether or not backpressure is
> being applied
> (or even is configured) is a very reasonable idea.
>
> -Mark
>
>
> > On Apr 22, 2016, at 10:22 AM, Jeremy Dyer  wrote:
> >
> > I could see the argument for not making that available. What about some
> > sort of reference that would allow the ReportingTask to determine if
> > backpressure is being applied to a Connection? It currently seems you can
> > see the number of bytes and/or object count queued in a connection but
> > don't have any reference to the values a user has set up for backpressure
> in
> > the UI. Is there a way to get those values in the scope of the
> > ReportingTask?
> >
> > On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende  wrote:
> >
> >> I think the only way you could do it directly without the REST API is by
> >> having access to the FlowController,
> >> but that is purposely not exposed to extension points... actually
> >> StandardFlowController is what implements the
> >> EventAccess interface which ends up providing the path way to the status
> >> objects.
> >>
> >> I would have to defer to Joe, Mark, and others about whether we would
> want
> >> to expose direct access to components
> >> through controller services, or some other extension point.
> >>
> >> On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer  wrote:
> >>
> >>> Bryan,
> >>>
> >>> The ReportingTask enumeration makes sense and was helpful for something
> >>> else I am working on as well.
> >>>
> >>> Like Joe however I'm looking for a way to not just get the *Status
> >> objects
> >>> but rather start and stop processors. Is there a way to do that from
> the
> >>> ReportContext scope? I imagine you could pull the Processor "Id" from
> the
> >>> ProcessorStatus and then use the REST API but was looking for something
> >>> more direct than having to use the REST API
> >>>
> >>>
> >>> On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende  wrote:
> >>>
>  Hi Joe,
> 
>  I'm not sure if a controller service can do this, but a ReportingTask
> >> has
>  access to similar information.
> 
>  A ReportingTask gets access to a ReportingContext, which can access
>  EventAccess which can access ProcessGroupStatus.
> 
>  From ProcessGroupStatus you are at the root process group and can
> >>> enumerate
>  the flow:
> 
>  private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
>  private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
>  private Collection<ProcessGroupStatus> processGroupStatus = new ArrayList<>();
>  private Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus = new ArrayList<>();
>  private Collection<PortStatus> inputPortStatus = new ArrayList<>();
>  private Collection<PortStatus> outputPortStatus = new ArrayList<>();
> 
>  Not sure if that is what you were looking for.
> 
>  -Bryan
> 
> 
>  On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora  wrote:
> 
> > Is it possible and if so what is the best way for a controller
> >> service
> >>> to
> > get the collection of all processors or queues?
> >
> > The goal being to iterate over the collection of processors or queues
> >>> to
> > gather information or make adjustments to the flow.
> >
> 
> >>>
> >>
>
>


Feature Proposal for Temporal Analytics using Stateful Processors

2016-04-22 Thread Joe Percivall
Hello Dev,

In the same vein as the discuss thread I initiated for adding Decimals to 
Expression Language (EL), I created a Feature Proposal for "Leverage Local 
State to Do Temporal Analytics"[1]. This again is a continuation of what I 
presented at the Apache NiFi MeetUp in Maryland[2]. The relatively recent 
additions of state to processors[3] and the ability for processors to expose 
key-value pairs to EL[4] allow for the creation of processors that enable 
users to do temporal analysis and data analytics using EL.

I look forward to any questions, comments, or concerns you may have.

[1] 
https://cwiki.apache.org/confluence/display/NIFI/Leverage+Local+State+to+Do+Temporal+Analytics
[2] http://www.meetup.com/ApacheNiFi/events/229158777/
[3] https://issues.apache.org/jira/browse/NIFI-259
[4] https://issues.apache.org/jira/browse/NIFI-1249


Joe
- - - - - - 
Joseph Percivall
linkedin.com/in/Percivall
e: joeperciv...@yahoo.com


Re: Enumerating processors, queues, etc.

2016-04-22 Thread Joe Skora
I'm fine with the REST API being a better answer or part of the answer.

A separate system that can monitor and manage an instance through the REST
API might be the right way to go, plus it separates the concerns of monitor
and monitored.

But to me, a controller service seems like a logical choice for a tool to
be configured with thresholds that will be used to evaluate processors and
queues and either alert or modify UI parameters when components exceed the
thresholds.  My original use case was the "Processor Stop Light Service"
that could change the background (or something similar) for controllers
based on their state.

On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende  wrote:

> I think the only way you could do it directly without the REST API is by
> having access to the FlowController,
> but that is purposely not exposed to extension points... actually
> StandardFlowController is what implements the
> EventAccess interface which ends up providing the path way to the status
> objects.
>
> I would have to defer to Joe, Mark, and others about whether we would want
> to expose direct access to components
> through controller services, or some other extension point.
>
> On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer  wrote:
>
> > Bryan,
> >
> > The ReportingTask enumeration makes sense and was helpful for something
> > else I am working on as well.
> >
> > Like Joe however I'm looking for a way to not just get the *Status
> objects
> > but rather start and stop processors. Is there a way to do that from the
> > ReportContext scope? I imagine you could pull the Processor "Id" from the
> > ProcessorStatus and then use the REST API but was looking for something
> > more direct than having to use the REST API
> >
> >
> > On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende  wrote:
> >
> > > Hi Joe,
> > >
> > > I'm not sure if a controller service can do this, but a ReportingTask
> has
> > > access to similar information.
> > >
> > > A ReportingTask gets access to a ReportingContext, which can access
> > > EventAccess which can access ProcessGroupStatus.
> > >
> > > From ProcessGroupStatus you are at the root process group and can
> > enumerate
> > > the flow:
> > >
> > > private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
> > > private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
> > > private Collection<ProcessGroupStatus> processGroupStatus = new ArrayList<>();
> > > private Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus = new ArrayList<>();
> > > private Collection<PortStatus> inputPortStatus = new ArrayList<>();
> > > private Collection<PortStatus> outputPortStatus = new ArrayList<>();
> > >
> > > Not sure if that is what you were looking for.
> > >
> > > -Bryan
> > >
> > >
> > > On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora  wrote:
> > >
> > > > Is it possible and if so what is the best way for a controller
> service
> > to
> > > > get the collection of all processors or queues?
> > > >
> > > > The goal being to iterate over the collection of processors or queues
> > to
> > > > gather information or make adjustments to the flow.
> > > >
> > >
> >
>


Re: Enumerating processors, queues, etc.

2016-04-22 Thread Mark Payne
I definitely don't think we should be exposing the FlowController to a 
Reporting Task.
However, I think exposing information about whether or not backpressure is 
being applied
(or even is configured) is a very reasonable idea.

-Mark


> On Apr 22, 2016, at 10:22 AM, Jeremy Dyer  wrote:
> 
> I could see the argument for not making that available. What about some
> sort of reference that would allow the ReportingTask to determine if
> backpressure is being applied to a Connection? It currently seems you can
> see the number of bytes and/or object count queued in a connection but
> don't have any reference to the values a user has set up for backpressure in
> the UI. Is there a way to get those values in the scope of the
> ReportingTask?
> 
> On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende  wrote:
> 
>> I think the only way you could do it directly without the REST API is by
>> having access to the FlowController,
>> but that is purposely not exposed to extension points... actually
>> StandardFlowController is what implements the
>> EventAccess interface which ends up providing the path way to the status
>> objects.
>> 
>> I would have to defer to Joe, Mark, and others about whether we would want
>> to expose direct access to components
>> through controller services, or some other extension point.
>> 
>> On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer  wrote:
>> 
>>> Bryan,
>>> 
>>> The ReportingTask enumeration makes sense and was helpful for something
>>> else I am working on as well.
>>> 
>>> Like Joe however I'm looking for a way to not just get the *Status
>> objects
>>> but rather start and stop processors. Is there a way to do that from the
>>> ReportContext scope? I imagine you could pull the Processor "Id" from the
>>> ProcessorStatus and then use the REST API but was looking for something
>>> more direct than having to use the REST API
>>> 
>>> 
>>> On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende  wrote:
>>> 
 Hi Joe,
 
 I'm not sure if a controller service can do this, but a ReportingTask
>> has
 access to similar information.
 
 A ReportingTask gets access to a ReportingContext, which can access
 EventAccess which can access ProcessGroupStatus.
 
 From ProcessGroupStatus you are at the root process group and can
>>> enumerate
 the flow:
 
 private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
 private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
 private Collection<ProcessGroupStatus> processGroupStatus = new ArrayList<>();
 private Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus = new ArrayList<>();
 private Collection<PortStatus> inputPortStatus = new ArrayList<>();
 private Collection<PortStatus> outputPortStatus = new ArrayList<>();
 
 Not sure if that is what you were looking for.
 
 -Bryan
 
 
 On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora  wrote:
 
> Is it possible and if so what is the best way for a controller
>> service
>>> to
> get the collection of all processors or queues?
> 
> The goal being to iterate over the collection of processors or queues
>>> to
> gather information or make adjustments to the flow.
> 
 
>>> 
>> 



Re: Enumerating processors, queues, etc.

2016-04-22 Thread Jeremy Dyer
I could see the argument for not making that available. What about some
sort of reference that would allow the ReportingTask to determine if
backpressure is being applied to a Connection? It currently seems you can
see the number of bytes and/or object count queued in a connection but
don't have any reference to the values a user has set up for backpressure in
the UI. Is there a way to get those values in the scope of the
ReportingTask?

On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende  wrote:

> I think the only way you could do it directly without the REST API is by
> having access to the FlowController,
> but that is purposely not exposed to extension points... actually
> StandardFlowController is what implements the
> EventAccess interface which ends up providing the path way to the status
> objects.
>
> I would have to defer to Joe, Mark, and others about whether we would want
> to expose direct access to components
> through controller services, or some other extension point.
>
> On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer  wrote:
>
> > Bryan,
> >
> > The ReportingTask enumeration makes sense and was helpful for something
> > else I am working on as well.
> >
> > Like Joe however I'm looking for a way to not just get the *Status
> objects
> > but rather start and stop processors. Is there a way to do that from the
> > ReportContext scope? I imagine you could pull the Processor "Id" from the
> > ProcessorStatus and then use the REST API but was looking for something
> > more direct than having to use the REST API
> >
> >
> > On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende  wrote:
> >
> > > Hi Joe,
> > >
> > > I'm not sure if a controller service can do this, but a ReportingTask
> has
> > > access to similar information.
> > >
> > > A ReportingTask gets access to a ReportingContext, which can access
> > > EventAccess which can access ProcessGroupStatus.
> > >
> > > From ProcessGroupStatus you are at the root process group and can
> > enumerate
> > > the flow:
> > >
> > > private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
> > > private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
> > > private Collection<ProcessGroupStatus> processGroupStatus = new ArrayList<>();
> > > private Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus = new ArrayList<>();
> > > private Collection<PortStatus> inputPortStatus = new ArrayList<>();
> > > private Collection<PortStatus> outputPortStatus = new ArrayList<>();
> > >
> > > Not sure if that is what you were looking for.
> > >
> > > -Bryan
> > >
> > >
> > > On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora  wrote:
> > >
> > > > Is it possible and if so what is the best way for a controller
> service
> > to
> > > > get the collection of all processors or queues?
> > > >
> > > > The goal being to iterate over the collection of processors or queues
> > to
> > > > gather information or make adjustments to the flow.
> > > >
> > >
> >
>


Re: Enumerating processors, queues, etc.

2016-04-22 Thread Bryan Bende
I think the only way you could do it directly without the REST API is by
having access to the FlowController,
but that is purposely not exposed to extension points... actually
StandardFlowController is what implements the
EventAccess interface which ends up providing the path way to the status
objects.

I would have to defer to Joe, Mark, and others about whether we would want
to expose direct access to components
through controller services, or some other extension point.

On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer  wrote:

> Bryan,
>
> The ReportingTask enumeration makes sense and was helpful for something
> else I am working on as well.
>
> Like Joe however I'm looking for a way to not just get the *Status objects
> but rather start and stop processors. Is there a way to do that from the
> ReportContext scope? I imagine you could pull the Processor "Id" from the
> ProcessorStatus and then use the REST API but was looking for something
> more direct than having to use the REST API
>
>
> On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende  wrote:
>
> > Hi Joe,
> >
> > I'm not sure if a controller service can do this, but a ReportingTask has
> > access to similar information.
> >
> > A ReportingTask gets access to a ReportingContext, which can access
> > EventAccess which can access ProcessGroupStatus.
> >
> > From ProcessGroupStatus you are at the root process group and can
> enumerate
> > the flow:
> >
> > private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
> > private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
> > private Collection<ProcessGroupStatus> processGroupStatus = new ArrayList<>();
> > private Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus = new ArrayList<>();
> > private Collection<PortStatus> inputPortStatus = new ArrayList<>();
> > private Collection<PortStatus> outputPortStatus = new ArrayList<>();
> >
> > Not sure if that is what you were looking for.
> >
> > -Bryan
> >
> >
> > On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora  wrote:
> >
> > > Is it possible and if so what is the best way for a controller service
> to
> > > get the collection of all processors or queues?
> > >
> > > The goal being to iterate over the collection of processors or queues
> to
> > > gather information or make adjustments to the flow.
> > >
> >
>


[GitHub] nifi-minifi pull request: MINIFI-17 Adding error handling of confi...

2016-04-22 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi-minifi/pull/15#discussion_r60741952
  
--- Diff: minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java ---
@@ -1379,7 +1430,7 @@ private void saveFile(final InputStream configInputStream, File configFile) {
         }
     }
 
-    private void performTransformation(InputStream configIs, String configDestinationPath) {
+    public void performTransformation(InputStream configIs, String configDestinationPath) {
--- End diff --

That is true, I will make it package private




[GitHub] nifi-minifi pull request: MINIFI-17 Adding error handling of confi...

2016-04-22 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi-minifi/pull/15#discussion_r60741623
  
--- Diff: minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java ---
@@ -1080,15 +,31 @@ public void start() throws IOException, InterruptedException {
 
                 final File reloadFile = getReloadFile(defaultLogger);
                 if (reloadFile.exists()) {
-                    defaultLogger.info("Currently reloading configuration.  Will not restart MiNiFi.");
+                    defaultLogger.info("Currently reloading configuration. Will wait to restart MiNiFi.");
                     Thread.sleep(5000L);
                     continue;
                 }
 
                 final boolean previouslyStarted = getNifiStarted();
                 if (!previouslyStarted) {
-                    defaultLogger.info("MiNiFi never started. Will not restart MiNiFi");
-                    return;
+                    final File swapConfigFile = getSwapFile(defaultLogger);
+                    if (swapConfigFile.exists()) {
+                        defaultLogger.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
+
+                        final String confDir = getBootstrapProperties().getProperty(CONF_DIR_KEY);
+                        ((MiNiFiConfigurationChangeListener) changeListener).performTransformation(new FileInputStream(swapConfigFile), confDir);
--- End diff --

I agree, changing




Re: Enumerating processors, queues, etc.

2016-04-22 Thread Jeremy Dyer
Bryan,

The ReportingTask enumeration makes sense and was helpful for something
else I am working on as well.

Like Joe however I'm looking for a way to not just get the *Status objects
but rather start and stop processors. Is there a way to do that from the
ReportContext scope? I imagine you could pull the Processor "Id" from the
ProcessorStatus and then use the REST API but was looking for something
more direct than having to use the REST API


On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende  wrote:

> Hi Joe,
>
> I'm not sure if a controller service can do this, but a ReportingTask has
> access to similar information.
>
> A ReportingTask gets access to a ReportingContext, which can access
> EventAccess which can access ProcessGroupStatus.
>
> From ProcessGroupStatus you are at the root process group and can enumerate
> the flow:
>
> private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
> private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
> private Collection<ProcessGroupStatus> processGroupStatus = new ArrayList<>();
> private Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus = new ArrayList<>();
> private Collection<PortStatus> inputPortStatus = new ArrayList<>();
> private Collection<PortStatus> outputPortStatus = new ArrayList<>();
>
> Not sure if that is what you were looking for.
>
> -Bryan
>
>
> On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora  wrote:
>
> > Is it possible and if so what is the best way for a controller service to
> > get the collection of all processors or queues?
> >
> > The goal being to iterate over the collection of processors or queues to
> > gather information or make adjustments to the flow.
> >
>
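
For the REST fallback Jeremy mentions, a hedged sketch of stopping a processor by the id taken from ProcessorStatus. The /processors/{id}/run-status endpoint and the bare revision object are assumptions; verify them against the REST API documentation for your NiFi version:

``` java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Assumed sketch only: issues a PUT to a run-status style endpoint to stop a
// processor. Endpoint path, payload shape, and revision handling should be
// checked against your NiFi version's REST docs.
public class ProcessorStopper {

    public static void stopProcessor(final String baseUrl, final String processorId,
                                     final long revisionVersion) throws Exception {
        final URL url = new URL(baseUrl + "/nifi-api/processors/" + processorId + "/run-status");
        final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);

        // revision must match the server's current revision for the component
        final String body = "{\"revision\":{\"version\":" + revisionVersion + "},\"state\":\"STOPPED\"}";
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }
        if (conn.getResponseCode() != 200) {
            throw new IllegalStateException("Unexpected response: " + conn.getResponseCode());
        }
    }
}
```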


[GitHub] nifi-minifi pull request: MINIFI-17 Adding error handling of confi...

2016-04-22 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi-minifi/pull/15#discussion_r60739792
  
--- Diff: minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java ---
@@ -1379,7 +1430,7 @@ private void saveFile(final InputStream configInputStream, File configFile) {
         }
     }
 
-    private void performTransformation(InputStream configIs, String configDestinationPath) {
+    public void performTransformation(InputStream configIs, String configDestinationPath) {
--- End diff --

Understand the need for increased visibility but think protected or package 
private would be sufficient.  




[GitHub] nifi-minifi pull request: MINIFI-17 Adding error handling of confi...

2016-04-22 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi-minifi/pull/15#discussion_r60739656
  
--- Diff: minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java ---
@@ -1080,15 +,31 @@ public void start() throws IOException, InterruptedException {
 
                 final File reloadFile = getReloadFile(defaultLogger);
                 if (reloadFile.exists()) {
-                    defaultLogger.info("Currently reloading configuration.  Will not restart MiNiFi.");
+                    defaultLogger.info("Currently reloading configuration. Will wait to restart MiNiFi.");
                     Thread.sleep(5000L);
                     continue;
                 }
 
                 final boolean previouslyStarted = getNifiStarted();
                 if (!previouslyStarted) {
-                    defaultLogger.info("MiNiFi never started. Will not restart MiNiFi");
-                    return;
+                    final File swapConfigFile = getSwapFile(defaultLogger);
+                    if (swapConfigFile.exists()) {
+                        defaultLogger.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
+
+                        final String confDir = getBootstrapProperties().getProperty(CONF_DIR_KEY);
+                        ((MiNiFiConfigurationChangeListener) changeListener).performTransformation(new FileInputStream(swapConfigFile), confDir);
--- End diff --

Would prefer to just declare the field of this class to be a 
MiNiFiConfigurationChangeListener instead of having to do the cast.
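
A tiny, self-contained sketch of that suggestion; the interface below is a stand-in for the real MiNiFiConfigurationChangeListener rather than its actual API:

``` java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

// Stand-in for the real MiNiFiConfigurationChangeListener; illustrative only.
interface ConfigChangeListenerSketch {
    void performTransformation(InputStream configIs, String configDestinationPath);
}

class RunMiNiFiSketch {
    // field declared with the concrete listener type, per the review comment,
    // so the call site below needs no cast
    private final ConfigChangeListenerSketch changeListener;

    RunMiNiFiSketch(final ConfigChangeListenerSketch changeListener) {
        this.changeListener = changeListener;
    }

    void revertToSwapConfig(final File swapConfigFile, final String confDir) throws IOException {
        try (InputStream swapIn = new FileInputStream(swapConfigFile)) {
            changeListener.performTransformation(swapIn, confDir);
        }
    }
}
```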




[GitHub] nifi-minifi pull request: MINIFI-17 Adding error handling of confi...

2016-04-22 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi-minifi/pull/15#discussion_r60738272
  
--- Diff: minifi-assembly/pom.xml ---
@@ -261,7 +261,7 @@ limitations under the License.
 
 ./lib
 
-8080
--- End diff --

I switched it because we're planning on getting rid of the UI anyway, so 
currently it is primarily for debugging purposes, and it's easier to debug when 
it doesn't share NiFi's default port. If it makes more sense to switch it back, 
I can; I would then also change it in the ConfigTransformer (it defaults to 
8081 during transformation).




Re: Enumerating processors, queues, etc.

2016-04-22 Thread Bryan Bende
Hi Joe,

I'm not sure if a controller service can do this, but a ReportingTask has
access to similar information.

A ReportingTask gets access to a ReportingContext, which can access
EventAccess which can access ProcessGroupStatus.

From ProcessGroupStatus you are at the root process group and can enumerate
the flow:

private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
private Collection<ProcessGroupStatus> processGroupStatus = new ArrayList<>();
private Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus = new ArrayList<>();
private Collection<PortStatus> inputPortStatus = new ArrayList<>();
private Collection<PortStatus> outputPortStatus = new ArrayList<>();

Not sure if that is what you were looking for.

-Bryan


On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora  wrote:

> Is it possible and if so what is the best way for a controller service to
> get the collection of all processors or queues?
>
> The goal being to iterate over the collection of processors or queues to
> gather information or make adjustments to the flow.
>
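
A small sketch of the enumeration Bryan describes, assuming the AbstractReportingTask base class and the status accessors shown above; the logging is just a stand-in for whatever the task would actually do:

``` java
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;

// Sketch: walk the status tree from the root process group, recursing into
// child groups, and report what is found.
public class EnumerateFlowReportingTask extends AbstractReportingTask {

    @Override
    public void onTrigger(final ReportingContext context) {
        final ProcessGroupStatus root = context.getEventAccess().getControllerStatus();
        walk(root);
    }

    private void walk(final ProcessGroupStatus group) {
        for (final ProcessorStatus proc : group.getProcessorStatus()) {
            getLogger().info("Processor {} ({})", new Object[] {proc.getName(), proc.getId()});
        }
        for (final ConnectionStatus conn : group.getConnectionStatus()) {
            getLogger().info("Connection {} queued: {} objects / {} bytes",
                    new Object[] {conn.getName(), conn.getQueuedCount(), conn.getQueuedBytes()});
        }
        for (final ProcessGroupStatus child : group.getProcessGroupStatus()) {
            walk(child);  // recurse into nested process groups
        }
    }
}
```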


[GitHub] nifi-minifi pull request: MINIFI-17 Adding error handling of confi...

2016-04-22 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi-minifi/pull/15#discussion_r60736740
  
--- Diff: minifi-assembly/pom.xml ---
@@ -261,7 +261,7 @@ limitations under the License.
 
 ./lib
 
-8080
--- End diff --

Should be converted back to the typical 8080 default.




Enumerating processors, queues, etc.

2016-04-22 Thread Joe Skora
Is it possible and if so what is the best way for a controller service to
get the collection of all processors or queues?

The goal being to iterate over the collection of processors or queues to
gather information or make adjustments to the flow.