[GitHub] nifi pull request: NIFI-856 Implements experimental ListenLumberja...
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/290#issuecomment-21320 @trixpan thanks for the info. My reviewing energy is running low for the night but will look to tackle this in the next few days. Thanks again! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1747 add maven-war-plugin to create jar as...
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/340#issuecomment-213666528 Hey @jdye64! This certainly seems to make sense, but I am not sure I am a fan of the classifiers route. Would specifying a separate execution for the build work? Something such as:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-jar-plugin</artifactId>
  <executions>
    <execution>
      <id>create-jar</id>
      <phase>compile</phase>
      <goals>
        <goal>jar</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```

This would be easier to rely on from a dependency standpoint, in lieu of having to additionally specify a classifier in the pom descriptor.
[GitHub] nifi pull request: NIFI-856 Implements experimental ListenLumberja...
Github user trixpan commented on the pull request: https://github.com/apache/nifi/pull/290#issuecomment-213663400 Yes. Please note the code will work with minor modifications with filebeat, but because the lumberjack v2 protocol is still in a state of flux (https://github.com/elastic/libbeat/issues/279) I intentionally left it out.
[GitHub] nifi pull request: Nifi 1214
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/321#issuecomment-213663078 Hey @ToivoAdams! Sorry for the delays on reviewing. It looks like this was originally done on what is now 0.x and has many of the commits from what is now master incorporated. Would it be possible to please get your commits/branch rebased against master? Thanks!
[GitHub] nifi pull request: NIFI-856 Implements experimental ListenLumberja...
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/290#issuecomment-213662697 @trixpan Hey, thanks for the contribution, sorry we've been a little slow getting around to it. In terms of testing this out, is it just a matter of using logstash forwarder (https://github.com/elastic/logstash-forwarder) which is built upon the lumberjack protocol (https://github.com/elastic/logstash-forwarder/blob/master/PROTOCOL.md)? Thanks again and will start digging in.
[NOTICE] jira lockdown
Hi folks! Just a quick heads-up that the ASF JIRA is currently locked down to counter a spam attack. Unfortunately, this lockdown prevents our normal open policy that allows anyone with a JIRA account to create, assign, and comment on issues. If you are caught up in this, please drop me a note either on or off list with your JIRA user name and I'll get you added to a formal JIRA role so that you can interact with the NiFi project. -Sean
[GitHub] nifi pull request: NIFI-1594: Add option to bulk using Index or Up...
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/255#issuecomment-213661619 Hey @joaohf! Looks like @mattyb149 has already started giving some input, but saw that there are a few test failures concerning expression language, such as the one below (full sample log is available at: https://s3.amazonaws.com/archive.travis-ci.org/jobs/114414795/log.txt). Haven't dug deep into the tests but wanted to call it to your attention to get a fix for later incorporation. Thanks! ``` testPutElasticSearchOnTrigger(org.apache.nifi.processors.elasticsearch.TestPutElasticsearch) Time elapsed: 0.01 sec <<< FAILURE! java.lang.AssertionError: java.lang.IllegalStateException: Attempting to retrieve value of PropertyDescriptor[Index Operation] without first evaluating Expressions, even though the PropertyDescriptor indicates that the Expression Language is Supported. If you realize that this is the case and do not want this error to occur, it can be disabled by calling TestRunner.setValidateExpressionUsage(false) ```
[GitHub] nifi pull request: NIFI-840: Create ListS3 processor
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/238#issuecomment-213660402 Overall, seems to function well with some minor adjustments needed. Would like to hash out some of the items with managing state. Thanks for tackling this, we are certainly overdue for such functionality and I know many will be pleased to see this coming out in the next release.
[GitHub] nifi pull request: NIFI-840: Create ListS3 processor
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/238#discussion_r60822522 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java --- @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@TriggerSerially +@TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"Amazon", "S3", "AWS", "list"}) +@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. For each object that is listed, creates a FlowFile that represents \"\n" + +"+ \"the object so that it can be fetched in conjunction with FetchS3Object. 
This Processor is designed to run on Primary Node only \"\n" + +"+ \"in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating \"\n" + +"+ \"all of the data.") +@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of keys, the timestamp of the newest key is stored, " ++ "along with the keys that share that same timestamp. This allows the Processor to list only keys that have been added or modified after " ++ "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary " ++ "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.") +@WritesAttributes({ +@WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"), +@WritesAttribute(attribute = "filename", description = "The name of the file"), +@WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"), +@WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"), +@WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"), +@WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),}) +@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class}) +public class ListS3 extends
[GitHub] nifi pull request: NIFI-840: Create ListS3 processor
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/238#discussion_r60822467 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java --- @@ -46,7 +46,7 @@ import com.amazonaws.services.s3.model.S3Object; @SupportsBatching --- End diff -- Should also be a @TriggerSerially given the state you have below and the inconsistencies that could be generated in multithreaded scenarios.
[GitHub] nifi pull request: NIFI-840: Create ListS3 processor
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/238#discussion_r60822400 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
[GitHub] nifi pull request: NIFI-840: Create ListS3 processor
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/238#discussion_r60822342 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
Re: Enumerating processors, queues, etc.
Ok cool. From the 'gathering of the raw materials of knowledge' perspective - if the things you want internal access to are already exposed via the REST API, then we're definitely safe on the threading side insofar as the things I was worried about when considering internal access. If we focus on the examples you mention in both 3 and 4, I believe these are examples which would be fulfilled by interacting through the REST API. For item 3, while there may be some server side backing configuration values and status data, the client side will be the thing rendering/displaying colors. For item 4, which is about interacting with the application (start/stop/etc...), these are things which would need to occur through the REST API because that is the cluster-wide coordination mechanism/layer. As it is designed/intended today, server side functions (controller services or reporting tasks) could be used to set/establish server side knowledge of behavior, and the client would get access to this at runtime and behave as it needs given this knowledge. If the client is a visual UI then it could, for instance, render things differently. If the client is something automated invoking the REST API endpoints then it could do things like 'start' or 'stop' or alter the flow. The ideas you're presenting, which make for a more engaging user experience, definitely make sense to me and we definitely should do more to make these happen. I'm just pointing out that they sound less like Controller Service or Reporting Task type things and more like data we should expose via the REST API. This would allow clients, be they services or browsers, to take whatever action they might want to. Thanks Joe On Fri, Apr 22, 2016 at 10:31 PM, Joe Skora wrote: > Joe W, > > The use case that originally got me thinking about this was a processor > highlighter that looks for processors above or below configured thresholds, > possibly by instance or type. I think that requires the ability to > >1.
enumerate the processor and/or queue collections, >2. query each processor or queue for stats like those shown in the UI, >3. highlight the processor somehow, by changing color for instance, and >4. (possibly) affect the processor by stopping / starting it. > > I realize this exposes system internals and threading concerns, but the REST > API already provides the information externally. Any controller service > using this information must be designed to not have a negative impact on > the system, but that's already true of any custom processor or controller > service since they can overload or lock up the framework if they behave > badly. > > Overall, I think the visibility into the data flows, hot/cold spots, larger > than expected ingests, etc. provides value that would far outweigh concerns > about any new risk this capability would create. > > Thanks, > Joe S > > On Fri, Apr 22, 2016 at 2:25 PM, Joe Witt wrote: >> Yeah understood. So let's dig into this more. We need to avoid over >> exposure of internal state which one might want to crawl through >> because that introduces some multi-threaded challenges and could limit >> our ability to evolve internals. However, if we understand the >> questions you'd like to be able to ask of certain things better >> perhaps we can better expose those results. >> >> Can you try stating what you're looking for in a bit more specific >> examples. For instance you said "want to iterate over the processor >> collections...to look for performance thresholds" - What sorts of >> performance threshold questions? >> >> On Fri, Apr 22, 2016 at 2:20 PM, Joe Skora wrote: >> > Joe Witt - Not really, this kind of went sideways from where I was >> > originally headed. >> > >> > I'm looking for a way for a controller service to iterate over the >> > processor and queue collections (maybe others as well) to look for >> > performance thresholds or other issues and then provide feedback somehow >> or >> > report externally.
>> > >> > If it can be done through the REST API, seems like it should be possible >> > from within the framework as well. >> > >> > On Fri, Apr 22, 2016 at 1:32 PM, Joe Witt wrote: >> > >> >> Joe Skora - does Jeremy's JIRA cover your use case needs? >> >> >> >> On Fri, Apr 22, 2016 at 12:44 PM, Jeremy Dyer wrote: >> >> > Mark, >> >> > >> >> > ok that makes sense. I have created a jira for this improvement >> >> > https://issues.apache.org/jira/browse/NIFI-1805 >> >> > >> >> > On Fri, Apr 22, 2016 at 12:27 PM, Mark Payne >> >> wrote: >> >> > >> >> >> Jeremy, >> >> >> >> >> >> It should be relatively easy. In FlowController, we would have to >> update >> >> >> getGroupStatus() to set the values on ConnectionStatus >> >> >> and of course update ConnectionStatus to have getters & setters for >> the >> >> >> new values. That should be about it, I think. >>
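Mark's suggested change in the message above, exposing new values on ConnectionStatus through getters and setters so that FlowController.getGroupStatus() can populate them, is a plain bean-style addition. A minimal sketch follows; the field names here are hypothetical illustrations, not the actual ConnectionStatus members:

```java
// Minimal sketch of the bean-style change Mark describes: new values on a
// status class exposed via getters and setters. Field names are hypothetical,
// not the real NiFi ConnectionStatus fields.
class ConnectionStatusSketch {
    private long maxQueuedBytes;  // hypothetical new value
    private int maxQueuedCount;   // hypothetical new value

    public long getMaxQueuedBytes() { return maxQueuedBytes; }
    public void setMaxQueuedBytes(long maxQueuedBytes) { this.maxQueuedBytes = maxQueuedBytes; }

    public int getMaxQueuedCount() { return maxQueuedCount; }
    public void setMaxQueuedCount(int maxQueuedCount) { this.maxQueuedCount = maxQueuedCount; }
}
```

As Mark notes, the framework side would then set these values when building group status, and clients (UI or REST consumers) would read them.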
[GitHub] nifi pull request: NIFI-840: Create ListS3 processor
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/238#discussion_r60822275 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
[GitHub] nifi pull request: NIFI-840: Create ListS3 processor
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/238#discussion_r60822174 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java --- @CapabilityDescription("Retrieves a listing of objects from an S3 bucket.
For each object that is listed, creates a FlowFile that represents \"\n" + --- End diff -- There are some extraneous \s and +s that were folded into your description, presumably from the IDE auto-escaping. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
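The stray escapes the reviewer points out typically appear when prose is pasted into an IDE, which auto-escapes the quotes and inserts string-concatenation operators. A minimal, hypothetical sketch of what the cleaned-up description constant could look like (the class name and exact wording here are illustrative, not the actual ListS3 source):

```java
public class ListS3DescriptionExample {
    // One plain concatenated constant: no escaped quotes ('\"') or stray '+'
    // fragments leak into the rendered @CapabilityDescription text.
    static final String CAPABILITY_DESCRIPTION =
            "Retrieves a listing of objects from an S3 bucket. "
            + "For each object that is listed, a FlowFile is created representing "
            + "the object so that it can be fetched later.";

    public static void main(String[] args) {
        // The rendered text should contain no backslashes left over from escaping.
        System.out.println(CAPABILITY_DESCRIPTION.contains("\\")); // prints false
    }
}
```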
Re: Enumerating processors, queues, etc.
Joe W, The use case that originally got me thinking about this was a processor highlighter that looks for processors above or below configured thresholds, possibly by instance or type. I think that requires the ability to 1. enumerate the processor and/or queue collections, 2. query each processor or queue for stats like those shown in the UI, 3. highlight the processor somehow, by changing color for instance, and 4. (possibly) affect the processor by stopping / starting it. I realize this exposes system internals and threading concerns, but the REST API already provides the information externally. Any controller service using this information must be designed to not have a negative impact on the system, but that's already true of any custom processor or controller service since they can overload or lock up the framework if they behave badly. Overall, I think the visibility into the data flows, hot/cold spots, larger than expected ingests, etc. provides value that would far outweigh concerns about any new risk this capability would create. Thanks, Joe S On Fri, Apr 22, 2016 at 2:25 PM, Joe Witt wrote: > Yeah understood. So let's dig into this more. We need to avoid over > exposure of internal state which one might want to crawl through > because that introduces some multi-threaded challenges and could limit > our ability to evolve internals. However, if we understand the > questions you'd like to be able to ask of certain things better > perhaps we can better expose those results. > > Can you try stating what you're looking for in a bit more specific > examples. For instance you said "want to iterate over the processor > collections...to look for performance thresholds" - What sorts of > performance threshold questions? > > On Fri, Apr 22, 2016 at 2:20 PM, Joe Skora wrote: > > Joe Witt - Not really, this kind of went sideways from where I was > > originally headed. 
> > > > I'm looking for a way for a controller service to iterate over the > > processor and queue collections (maybe others as well) to look for > > performance thresholds or other issues and then provide feedback somehow > or > > report externally. > > > > If it can be done through the REST API, seems like it should be possible > > from within the framework as well. > > > > On Fri, Apr 22, 2016 at 1:32 PM, Joe Witt wrote: > > > >> Joe Skora - does Jeremy's JIRA cover your use case needs? > >> > >> On Fri, Apr 22, 2016 at 12:44 PM, Jeremy Dyer wrote: > >> > Mark, > >> > > >> > ok that makes sense. I have created a jira for this improvement > >> > https://issues.apache.org/jira/browse/NIFI-1805 > >> > > >> > On Fri, Apr 22, 2016 at 12:27 PM, Mark Payne > >> wrote: > >> > > >> >> Jeremy, > >> >> > >> >> It should be relatively easy. In FlowController, we would have to > update > >> >> getGroupStatus() to set the values on ConnectionStatus > >> >> and of course update ConnectionStatus to have getters & setters for > the > >> >> new values. That should be about it, I think. > >> >> > >> >> -Mark > >> >> > >> >> > >> >> > On Apr 22, 2016, at 12:17 PM, Jeremy Dyer > wrote: > >> >> > > >> >> > Mark, > >> >> > > >> >> > What would the process look like for doing that? Would that be > >> something > >> >> > trivial or require some reworking? > >> >> > > >> >> > On Fri, Apr 22, 2016 at 10:26 AM, Mark Payne > > >> >> wrote: > >> >> > > >> >> >> I definitely don't think we should be exposing the FlowController > to > >> a > >> >> >> Reporting Task. > >> >> >> However, I think exposing information about whether or not > >> backpressure > >> >> is > >> >> >> being applied > >> >> >> (or even is configured) is a very reasonable idea. > >> >> >> > >> >> >> -Mark > >> >> >> > >> >> >> > >> >> >>> On Apr 22, 2016, at 10:22 AM, Jeremy Dyer > wrote: > >> >> >>> > >> >> >>> I could see the argument for not making that available. 
What > about > >> some > >> >>> sort of reference that would allow the ReportingTask to > >> determine if > >> >>> backpressure is being applied to a Connection? It currently seems > >> you > >> >> can > >> >>> see the number of bytes and/or objects count queued in a > connection > >> but > >> >>> don't have any reference to the values a user has set up for > >> >> backpressure > >> >> in > >> >>> the UI. Is there a way to get those values in the scope of the > >> >>> ReportingTask? > >> >>> > >> >>> On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende > >> >> wrote: > >> >>> > >> >> I think the only way you could do it directly without the REST > API > >> is > >> >> by > >> >> having access to the FlowController, > but that is purposely not exposed to extension points... > actually > >> >> StandardFlowController is what implements the > >> >> EventAccess interface which ends up
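A rough sketch of what the change discussed above (NIFI-1805: getters and setters on ConnectionStatus, populated in getGroupStatus()) might let a ReportingTask do. All names here are illustrative stand-ins, not the actual NiFi API:

```java
// Illustrative stand-in for the proposed ConnectionStatus fields; not the
// real org.apache.nifi.controller.status.ConnectionStatus class.
public class ConnectionStatusSketch {
    private long queuedCount;
    private long backPressureObjectThreshold;

    public long getQueuedCount() { return queuedCount; }
    public void setQueuedCount(long count) { this.queuedCount = count; }

    public long getBackPressureObjectThreshold() { return backPressureObjectThreshold; }
    public void setBackPressureObjectThreshold(long threshold) { this.backPressureObjectThreshold = threshold; }

    // A ReportingTask could then infer whether back pressure is engaged by
    // comparing the observed queue depth against the configured threshold.
    public boolean isBackPressureEngaged() {
        return backPressureObjectThreshold > 0 && queuedCount >= backPressureObjectThreshold;
    }
}
```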
[GitHub] nifi pull request: NIFI-840: Create ListS3 processor
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/238#issuecomment-213652173 reviewing
[GitHub] nifi pull request: added voltatile provenance repository docstring
GitHub user randerzander opened a pull request: https://github.com/apache/nifi/pull/378 added voltatile provenance repository docstring You can merge this pull request into a Git repository by running: $ git pull https://github.com/randerzander/nifi master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/378.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #378 commit d68956320af1d3c905bcdc7ea7e2fe45bf88b956 Author: Randy Gelhausen Date: 2016-04-22T21:41:29Z added voltatile provenance repository docstring
[GitHub] nifi pull request: Enhancement for NIFI-1045 : Add "backup suffix"...
Github user PuspenduBanerjee closed the pull request at: https://github.com/apache/nifi/pull/230
[GitHub] nifi pull request: NiFi-924: Camel integration
Github user PuspenduBanerjee commented on the pull request: https://github.com/apache/nifi/pull/219#issuecomment-213595825 @joewitt @olegz Thank you all and sorry for the delay in reply, was dead busy.
[GitHub] nifi pull request: NiFi-924: Camel integration
Github user PuspenduBanerjee closed the pull request at: https://github.com/apache/nifi/pull/219
[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...
Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/366#discussion_r60797930 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java --- @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.lang.reflect.Field; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessSession; +import org.apache.nifi.util.MockSessionFactory; +import org.apache.nifi.util.SharedSessionState; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +public class AbstractKafkaProcessorLifecycelTest { + +@Test +public void validateBaseProperties() throws Exception { +TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class); +runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, ""); +runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo"); +runner.setProperty(ConsumeKafka.CLIENT_ID, "foo"); + +try { +runner.assertValid(); +fail(); +} catch (AssertionError e) { +assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); +} + +runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo"); +try { +runner.assertValid(); +fail(); +} catch (AssertionError e) { +assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid")); +} +runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234"); + +runner.removeProperty(ConsumeKafka.TOPIC); +try { +runner.assertValid(); +fail(); +} catch (AssertionError e) { +assertTrue(e.getMessage().contains("'topic' is invalid because topic is required")); +} + +runner.setProperty(ConsumeKafka.TOPIC, ""); +try { +runner.assertValid(); +fail(); +} catch (AssertionError e) { +assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); +} + +runner.setProperty(ConsumeKafka.TOPIC, " ");
[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...
Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/366#discussion_r60792649 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java --- @@ -78,152 +81,182 @@ } /** - * - */ -void setProcessLog(ProcessorLog processLog) { -this.processLog = processLog; -} - -/** - * Publishes messages to Kafka topic. It supports three publishing - * mechanisms. + * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to + * determine how many messages to Kafka will be sent from a provided + * {@link InputStream} (see {@link PublishingContext#getContentStream()}). + * It supports two publishing modes: * - * Sending the entire content stream as a single Kafka message. - * Splitting the incoming content stream into chunks and sending - * individual chunks as separate Kafka messages. - * Splitting the incoming content stream into chunks and sending only - * the chunks that have failed previously @see - * {@link SplittableMessageContext#getFailedSegments()}. + * Sending all messages constructed from + * {@link StreamDemarcator#nextToken()} operation. + * Sending only unacknowledged messages constructed from + * {@link StreamDemarcator#nextToken()} operation. * + * The unacknowledged messages are determined from the value of + * {@link PublishingContext#getLastAckedMessageIndex()}. + * * This method assumes content stream affinity where it is expected that the * content stream that represents the same Kafka message(s) will remain the * same across possible retries. This is required specifically for cases * where delimiter is used and a single content stream may represent - * multiple Kafka messages. The failed segment list will keep the index of - * of each content stream segment that had failed to be sent to Kafka, so - * upon retry only the failed segments are sent. + * multiple Kafka messages. 
The + * {@link PublishingContext#getLastAckedMessageIndex()} will provide the + * index of the last ACKed message, so upon retry only messages with the + * higher index are sent. * - * @param messageContext - *instance of {@link SplittableMessageContext} which hold - *context information about the message to be sent - * @param contentStream - *instance of open {@link InputStream} carrying the content of - *the message(s) to be send to Kafka - * @param partitionKey - *the value of the partition key. Only relevant is user wishes - *to provide a custom partition key instead of relying on - *variety of provided {@link Partitioner}(s) - * @param maxBufferSize maximum message size - * @return The set containing the failed segment indexes for messages that - * failed to be sent to Kafka. + * @param publishingContext + *instance of {@link PublishingContext} which hold context + *information about the message(s) to be sent. + * @return The index of the last successful offset. */ -BitSet publish(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey, -int maxBufferSize) { -ListsendFutures = this.split(messageContext, contentStream, partitionKey, maxBufferSize); -return this.publish(sendFutures); +KafkaPublisherResult publish(PublishingContext publishingContext) { +StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(), +publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize()); + +int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex(); +List resultFutures = new ArrayList<>(); + +byte[] messageBytes; +int tokenCounter = 0; +for (; (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) { +if (prevLastAckedMessageIndex < tokenCounter) { +Integer partitionId = publishingContext.getPartitionId(); +if (partitionId == null && publishingContext.getKeyBytes() != null) { +partitionId = this.getPartition(publishingContext.getKeyBytes(), 
publishingContext.getTopic()); +} +ProducerRecord message = +new
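The retry contract described in the Javadoc above (replay the same content stream, but resend only messages whose index exceeds the last acknowledged one) reduces to a small sketch. This is illustrative only; it mimics the `prevLastAckedMessageIndex < tokenCounter` guard in the diff, not the real KafkaPublisher:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LastAckedRetrySketch {
    // Each string stands in for one demarcated Kafka message from the stream.
    static List<String> messagesToSend(List<String> tokens, int lastAckedMessageIndex) {
        List<String> toSend = new ArrayList<>();
        for (int i = 0; i < tokens.size(); i++) {
            // Same guard as the diff: skip tokens already ACKed on a prior attempt.
            if (lastAckedMessageIndex < i) {
                toSend.add(tokens.get(i));
            }
        }
        return toSend;
    }

    public static void main(String[] args) {
        // First attempt: nothing ACKed yet (index -1), so all three are sent.
        System.out.println(messagesToSend(Arrays.asList("a", "b", "c"), -1)); // prints [a, b, c]
        // Retry after message 0 was ACKed: only the later messages are resent.
        System.out.println(messagesToSend(Arrays.asList("a", "b", "c"), 0));  // prints [b, c]
    }
}
```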
[GitHub] nifi pull request: NIFI-1678
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/323
[GitHub] nifi pull request: NIFI-1678
Github user mcgilman commented on the pull request: https://github.com/apache/nifi/pull/323#issuecomment-213553667 Functionally this is much better than before. Now when running clustered and we start losing nodes we are able to see the current cluster participation. However, once we have lost the quorum we are no longer able to update the Primary Node. This behavior is expected, however. Once the quorum is re-established the Primary is updated accordingly. As we move towards zero master clustering, when there is no quorum we will return appropriate response codes. Looks good. Thanks!
Re: [DISCUSS] From Contributor to Committer
Agreed. Unless there is any objection I plan to assume lazy consensus and remove the draft marking tomorrow or shortly thereafter. On Fri, Apr 22, 2016 at 2:53 PM, Sean Busbey wrote: > FWIW, I don't read the current draft as requiring that someone > strictly progress contributor -> committer -> PMC. I do read it as > stating that the evaluation of project activity should normally be > over some period of time for committers and a longer period of time > for PMC status (e.g. maybe we miss someone who's behaving like a > committer over a normal 3 month period, but we catch it at 6. At that > point they've been around as long as a PMC so we skip the committer > step). > > Of course, I might be biased by the fact that I skipped directly to > PMC status. :) > > On Wed, Apr 20, 2016 at 5:23 PM, Joe Witt wrote: >> Bryan, >> >> That is a good point and I do think we should avoid putting such a >> rule/requirement in place. However, practically speaking I would >> imagine that is how it will play out most often. >> >> I've taken the comments of this thread largely based on Tony's >> excellent start and created a draft wiki page for it here [1]. If >> discussion remains on track then I'll assume lazy consensus and remove >> the draft notice. >> >> [1] >> https://cwiki.apache.org/confluence/display/NIFI/Progression+from+user+to+Project+Management+Committee >> >> Thanks >> Joe >> >> On Wed, Apr 20, 2016 at 12:05 PM, Bryan Bende wrote: >>> So are we saying as a community that a contributor has to first become a >>> committer, and then only after continued consistent engagement could then >>> be considered for PMC? >>> >>> I don't have any issue with that approach, although it is not exactly what >>> I thought when we first created the two tiers. >>> >>> On Wed, Apr 20, 2016 at 11:10 AM, Joe Witt wrote: Tony, There appears to be consensus around these thoughts. Perhaps we should document this on a Wiki page? 
I think generally for committer status it would be good to see a number of these things for a period of time and then for PMC status to see those contributions continue and ideally expand for a longer duration. Another few months? Thanks Joe On Wed, Apr 13, 2016 at 2:58 PM, Joe Witt wrote: > Tony, > > I agree with the points you raise and the completeness of the > perspective you share. I do think we should add to that a focus on > licensing and legal aspects. > > - The individual has shown an effort to aid the community in producing > release which are consistent with ASF licensing requirements and the > guidance followed in the Apache NiFi community to adhere to those > policies. This understanding could be shown when introducing new > dependencies (including transitive) by ensuring that all licensing and > notice updates have occurred. Another good example is flagging > potentially copyrighted or insufficiently cited items like Skora found > recently in the Kafka tests. One of our most important jobs as a > community is to put out legal releases and that is certainly a team > effort! > > Thanks > Joe > > On Sun, Apr 10, 2016 at 10:56 PM, Sean Busbey wrote: >> Thanks for starting this Tony! >> >> As a PMC member, I really try to focus on things that help the >> community where we tend to have limited bandwidth: reviews weigh >> heavily, as does helping out new folks on users@, and doing public >> talking/workshops. >> >> I also am inclined to vote in favor of folks who show the kind of >> project dedication that we expect from PMC members. While we still >> need to do a better job of describing those things, at the moment I'm >> thinking of things like voting on release candidates, watching out for >> our trademarks, and investing the time needed to handle our licensing >> responsibilities. 
>> >> On Sun, Apr 10, 2016 at 9:38 AM, Tony Kurc wrote: >>> First off, I recommend this reading this page to understand what the Apache >>> NiFi PMC draws from when making a decision >>> >>> http://community.apache.org/contributors/index.html >>> >>> I thought it would be helpful for me to walk through how I interpret that >>> guidance, and what that means for NiFi. For those that didn't read, there >>> are four aspects of contribution that are worth considering someone for >>> committership: community, project, documentation and code. Really, the >>> committer decision comes down to: has this person built up enough merit in >>> the community that I have a high degree of confidence that I trust him/her >>> with write access to the code and
[GitHub] nifi pull request: NIFI-1678
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/323#discussion_r60782947 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.cluster.manager.impl; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.node.DisconnectionCode; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.node.Node; +import org.apache.nifi.cluster.node.Node.Status; +import org.apache.nifi.cluster.protocol.ConnectionRequest; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.reporting.Severity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WebClusterManagerCoordinator implements ClusterCoordinator { +private static final Logger logger = LoggerFactory.getLogger(WebClusterManagerCoordinator.class); + +private final WebClusterManager clusterManager; + +public WebClusterManagerCoordinator(final WebClusterManager clusterManager) { +this.clusterManager = clusterManager; +} + +@Override +public void requestNodeConnect(final NodeIdentifier nodeId) { +final Node node = clusterManager.getRawNode(nodeId.getId()); + +if (node == null) { +final ConnectionRequest connectionRequest = new ConnectionRequest(nodeId); +clusterManager.requestConnection(connectionRequest); +} else { +node.setStatus(Status.DISCONNECTED); +clusterManager.requestReconnection(nodeId.getId(), "Anonymous"); +} +} + +@Override +public void finishNodeConnection(final NodeIdentifier nodeId) { +final Node node = clusterManager.getRawNode(nodeId.getId()); +if (node == null) { +logger.error("Attempting to Finish Node Connection but could not find Node with Identifier {}", nodeId); +return; +} + +node.setStatus(Status.CONNECTED); +} + +@Override +public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) { +try { +clusterManager.requestDisconnection(nodeId, false, explanation); 
+ +if (disconnectionCode == DisconnectionCode.LACK_OF_HEARTBEAT) { +final Node node = clusterManager.getRawNode(nodeId.getId()); +if (node != null) { +node.setHeartbeatDisconnection(); +} +} +} catch (final Exception e) { +logger.error("Failed to request node {} disconnect from cluster due to {}", nodeId, e); +logger.error("", e); --- End diff -- The first line logs a toString() of the Exception, whereas the second line logs the stack trace. Personally, I find the log messages easier to read this way when someone copies & pastes it, as they often do not look to grab a stack trace but if the toString() of the Exception is part of the original message, it makes it more clear.
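The two-line pattern Mark describes can be illustrated with a plain-Java stand-in (hypothetical; the real code uses the SLF4J logger, whose message line ends up carrying the exception's toString(), as the review comment notes):

```java
public class TwoLineLogExample {
    // Stand-in for logger.error("Failed to request node {} disconnect from
    // cluster due to {}", nodeId, e): the exception's toString() becomes part
    // of the message line, as the review comment describes.
    static String firstLine(String nodeId, Exception e) {
        return "Failed to request node " + nodeId + " disconnect from cluster due to " + e;
    }

    public static void main(String[] args) {
        Exception e = new IllegalStateException("node not reachable");
        // Line 1: still readable when copied & pasted without the stack trace.
        System.out.println(firstLine("node-1", e));
        // Line 2 (logger.error("", e) in the diff): the full stack trace.
        e.printStackTrace();
    }
}
```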
[GitHub] nifi pull request: NIFI-1678
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/323#discussion_r60782492 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java --- @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.cluster.coordination.heartbeat; + +import java.io.ByteArrayInputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.Unmarshaller; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryForever; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.node.DisconnectionCode; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; +import org.apache.nifi.controller.cluster.ZooKeeperClientConfig; +import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.StopWatch; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Uses Apache Curator to monitor heartbeats from nodes + */ +public class CuratorHeartbeatMonitor implements HeartbeatMonitor { +private static final Logger logger = LoggerFactory.getLogger(CuratorHeartbeatMonitor.class); +private static final Unmarshaller unmarshaller; + +private final ClusterCoordinator clusterCoordinator; +private final ZooKeeperClientConfig zkClientConfig; +private final String heartbeatPath; +private final int heartbeatIntervalMillis; + +private volatile CuratorFramework curatorClient; +private volatile ScheduledFuture future; +private volatile 
MaplatestHeartbeatMessages; +private volatile long latestHeartbeatTime; + +private final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat Monitor", true); + +static { +try { +final JAXBContext jaxbContext = JAXBContext.newInstance(HeartbeatMessage.class); +unmarshaller = jaxbContext.createUnmarshaller(); +} catch (final Exception e) { +throw new RuntimeException("Failed to create an Unmarshaller for unmarshalling Heartbeat Messages", e); +} +} + +public CuratorHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final Properties properties) { +this.clusterCoordinator = clusterCoordinator; +this.zkClientConfig = ZooKeeperClientConfig.createConfig(properties); +this.heartbeatPath = zkClientConfig.resolvePath("cluster/heartbeats"); + +final String heartbeatInterval = properties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, +NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL); + +this.heartbeatIntervalMillis = (int) FormatUtils.getTimeDuration(heartbeatInterval, TimeUnit.MILLISECONDS); +} + +@Override +public void start() { +final RetryPolicy retryPolicy = new RetryForever(5000); +curatorClient = CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(), +
[GitHub] nifi pull request: NIFI-1678
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/323#discussion_r60782403

--- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java --- @@ -0,0 +1,376 @@

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.cluster.coordination.heartbeat;

import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryForever;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Uses Apache Curator to monitor heartbeats from nodes
 */
public class CuratorHeartbeatMonitor implements HeartbeatMonitor {
    private static final Logger logger = LoggerFactory.getLogger(CuratorHeartbeatMonitor.class);
    private static final Unmarshaller unmarshaller;

    private final ClusterCoordinator clusterCoordinator;
    private final ZooKeeperClientConfig zkClientConfig;
    private final String heartbeatPath;
    private final int heartbeatIntervalMillis;

    private volatile CuratorFramework curatorClient;
    private volatile ScheduledFuture future;
    private volatile Map latestHeartbeatMessages;
    private volatile long latestHeartbeatTime;

    private final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat Monitor", true);

    static {
        try {
            final JAXBContext jaxbContext = JAXBContext.newInstance(HeartbeatMessage.class);
            unmarshaller = jaxbContext.createUnmarshaller();
        } catch (final Exception e) {
            throw new RuntimeException("Failed to create an Unmarshaller for unmarshalling Heartbeat Messages", e);
        }
    }

    public CuratorHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final Properties properties) {
        this.clusterCoordinator = clusterCoordinator;
        this.zkClientConfig = ZooKeeperClientConfig.createConfig(properties);
        this.heartbeatPath = zkClientConfig.resolvePath("cluster/heartbeats");

        final String heartbeatInterval = properties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL,
            NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL);

        this.heartbeatIntervalMillis = (int) FormatUtils.getTimeDuration(heartbeatInterval, TimeUnit.MILLISECONDS);
    }

    @Override
    public void start() {
        final RetryPolicy retryPolicy = new RetryForever(5000);
        curatorClient = CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(),
```
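The monitor in this diff schedules a task that records the latest heartbeat per node and compares timestamps against the configured heartbeat interval. A minimal, self-contained sketch of that staleness check using plain JDK classes — `HeartbeatTracker` and the 8x-interval threshold are illustrative assumptions, not NiFi's actual implementation:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Sketch of a heartbeat-staleness check. Names are illustrative only.
class HeartbeatTracker {
    private final Map<String, Long> latestHeartbeats = new ConcurrentHashMap<>();
    private final long heartbeatIntervalMillis;

    HeartbeatTracker(final long heartbeatIntervalMillis) {
        this.heartbeatIntervalMillis = heartbeatIntervalMillis;
    }

    void recordHeartbeat(final String nodeId, final long timestampMillis) {
        latestHeartbeats.put(nodeId, timestampMillis);
    }

    // A node is flagged stale when no heartbeat has arrived within some
    // multiple of the interval (8x is an assumed, not authoritative, factor).
    Set<String> staleNodes(final long nowMillis) {
        final long threshold = nowMillis - 8 * heartbeatIntervalMillis;
        return latestHeartbeats.entrySet().stream()
            .filter(entry -> entry.getValue() < threshold)
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
    }
}
```

A scheduled executor (the role the `FlowEngine` plays in the diff) would invoke `staleNodes` periodically and hand the result to the cluster coordinator.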
Re: Enumerating processors, queues, etc.
Yeah, understood. So let's dig into this more. We need to avoid over-exposure of internal state that one might want to crawl through, because that introduces some multi-threaded challenges and could limit our ability to evolve internals. However, if we better understand the questions you'd like to be able to ask of certain things, perhaps we can better expose those results. Can you try stating what you're looking for with a bit more specific examples? For instance, you said "want to iterate over the processor collections...to look for performance thresholds" - what sorts of performance threshold questions?

On Fri, Apr 22, 2016 at 2:20 PM, Joe Skora wrote:
> Joe Witt - Not really, this kind of went sideways from where I was
> originally headed.
>
> I'm looking for a way for a controller service to iterate over the
> processor and queue collections (maybe others as well) to look for
> performance thresholds or other issues and then provide feedback somehow or
> report externally.
>
> If it can be done through the REST API, seems like it should be possible
> from within the framework as well.
[GitHub] nifi pull request: NIFI-1678
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/323#discussion_r60782213

--- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java ---
[GitHub] nifi pull request: NIFI-1678
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/323#discussion_r60782150

--- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java ---
[GitHub] nifi pull request: NIFI-1678
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/323#discussion_r60781637

--- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java ---
Re: Enumerating processors, queues, etc.
Joe Witt - Not really, this kind of went sideways from where I was
originally headed.

I'm looking for a way for a controller service to iterate over the
processor and queue collections (maybe others as well) to look for
performance thresholds or other issues and then provide feedback somehow or
report externally.

If it can be done through the REST API, seems like it should be possible
from within the framework as well.

On Fri, Apr 22, 2016 at 1:32 PM, Joe Witt wrote:
> Joe Skora - does Jeremy's JIRA cover your use case needs?
>
> On Fri, Apr 22, 2016 at 12:44 PM, Jeremy Dyer wrote:
>> Mark,
>>
>> ok that makes sense. I have created a jira for this improvement
>> https://issues.apache.org/jira/browse/NIFI-1805
>>
>> On Fri, Apr 22, 2016 at 12:27 PM, Mark Payne wrote:
>>> Jeremy,
>>>
>>> It should be relatively easy. In FlowController, we would have to update
>>> getGroupStatus() to set the values on ConnectionStatus,
>>> and of course update ConnectionStatus to have getters & setters for the
>>> new values. That should be about it, I think.
>>>
>>> -Mark
>>>
>>>> On Apr 22, 2016, at 12:17 PM, Jeremy Dyer wrote:
>>>>
>>>> Mark,
>>>>
>>>> What would the process look like for doing that? Would that be something
>>>> trivial or require some reworking?
>>>>
>>>> On Fri, Apr 22, 2016 at 10:26 AM, Mark Payne wrote:
>>>>> I definitely don't think we should be exposing the FlowController to a
>>>>> Reporting Task.
>>>>> However, I think exposing information about whether or not backpressure
>>>>> is being applied (or even is configured) is a very reasonable idea.
>>>>>
>>>>> -Mark
>>>>>
>>>>>> On Apr 22, 2016, at 10:22 AM, Jeremy Dyer wrote:
>>>>>>
>>>>>> I could see the argument for not making that available. What about some
>>>>>> sort of reference that would allow the ReportingTask to determine if
>>>>>> backpressure is being applied to a Connection? It currently seems you
>>>>>> can see the number of bytes and/or object count queued in a connection,
>>>>>> but don't have any reference to the values a user has set up for
>>>>>> backpressure in the UI. Is there a way to get those values in the scope
>>>>>> of the ReportingTask?
>>>>>>
>>>>>> On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende wrote:
>>>>>>> I think the only way you could do it directly without the REST API is
>>>>>>> by having access to the FlowController,
>>>>>>> but that is purposely not exposed to extension points... actually
>>>>>>> StandardFlowController is what implements the
>>>>>>> EventAccess interface, which ends up providing the pathway to the
>>>>>>> status objects.
>>>>>>>
>>>>>>> I would have to defer to Joe, Mark, and others about whether we would
>>>>>>> want to expose direct access to components
>>>>>>> through controller services, or some other extension point.
>>>>>>>
>>>>>>> On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer wrote:
>>>>>>>> Bryan,
>>>>>>>>
>>>>>>>> The ReportingTask enumeration makes sense and was helpful for
>>>>>>>> something else I am working on as well.
>>>>>>>>
>>>>>>>> Like Joe, however, I'm looking for a way to not just get the *Status
>>>>>>>> objects but rather start and stop processors. Is there a way to do
>>>>>>>> that from the ReportingContext scope? I imagine you could pull the
>>>>>>>> Processor "Id" from the ProcessorStatus and then use the REST API, but
>>>>>>>> was looking for something more direct than having to use the REST API.
>>>>>>>>
>>>>>>>> On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende wrote:
>>>>>>>>> Hi Joe,
>>>>>>>>>
>>>>>>>>> I'm not sure if a controller service can do this, but a
>>>>>>>>> ReportingTask has access to similar information.
>>>>>>>>>
>>>>>>>>> A ReportingTask gets access to a ReportingContext, which can access
>>>>>>>>> EventAccess, which can access ProcessGroupStatus.
>>>>>>>>>
>>>>>>>>> From ProcessGroupStatus you are at the root process group and can
>>>>>>>>> enumerate the flow:
>>>>>>>>>
>>>>>>>>> private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
>>>>>>>>> private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
>>>>>>>>> private Collection<ProcessGroupStatus> processGroupStatus = new ArrayList<>();
>>>>>>>>> private Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus = new ArrayList<>();
>>>>>>>>> private Collection<PortStatus> inputPortStatus = new ArrayList<>();
>>>>>>>>> private Collection<PortStatus> outputPortStatus = new ArrayList<>();
>>>>>>>>>
>>>>>>>>> Not sure if that is what you were looking for.
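Bryan's enumeration above (ReportingContext gives EventAccess, EventAccess gives the root ProcessGroupStatus, then recurse into child groups) can be sketched with simplified stand-in types. The classes below are placeholders for illustration, not NiFi's actual `org.apache.nifi.controller.status` API:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Stand-in for ProcessorStatus: just a name and one illustrative metric.
class ProcessorStatusSketch {
    final String name;
    final long bytesQueued;

    ProcessorStatusSketch(final String name, final long bytesQueued) {
        this.name = name;
        this.bytesQueued = bytesQueued;
    }
}

// Stand-in for ProcessGroupStatus: processors plus nested child groups.
class ProcessGroupStatusSketch {
    final Collection<ProcessorStatusSketch> processorStatus = new ArrayList<>();
    final Collection<ProcessGroupStatusSketch> processGroupStatus = new ArrayList<>();
}

class FlowWalker {
    // Depth-first walk from the root group, collecting every processor status --
    // the enumeration pattern a ReportingTask would apply for threshold checks.
    static List<ProcessorStatusSketch> collectProcessors(final ProcessGroupStatusSketch group) {
        final List<ProcessorStatusSketch> result = new ArrayList<>(group.processorStatus);
        for (final ProcessGroupStatusSketch child : group.processGroupStatus) {
            result.addAll(collectProcessors(child));
        }
        return result;
    }
}
```

A threshold check would then filter the collected list, e.g. keeping entries whose `bytesQueued` exceeds some limit.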
[GitHub] nifi pull request: NIFI-1678
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/323#discussion_r60781488

--- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java ---
[GitHub] nifi pull request: NIFI-1678
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/323#discussion_r60781310

--- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java --- @@ -0,0 +1,63 @@

```java
/* Apache License 2.0 header (identical to the one quoted above) */

package org.apache.nifi.cluster.protocol.jaxb.message;

import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;

public class AdaptedNodeConnectionStatus {
    private NodeConnectionState state;
    private DisconnectionCode disconnectCode;
    private String disconnectReason;
    private Long connectionRequestTime;

    public AdaptedNodeConnectionStatus() {
    }
```

--- End diff --

Nope. Removing.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Re: Monitoring duplicate file entries with ExecuteSQL
You need to quote the attribute reference in your SQL, i.e. '$(unknown)', so that the substituted value is treated as a string literal and produces valid SQL. One alternative would be to use ListFile -> FetchFile together. With that pattern, ListFile maintains the state of which files have been processed for you within NiFi, meaning you wouldn't need the separate MySQL log, which could reduce your complexity if you don't need the SQL log for anything else. Simon > On 22 Apr 2016, at 18:22, mfzeidan wrote: > > I am trying to use a ExecutesSQL process to check if a file has already been > processed. > > Files are flowing into our nifi and we have it set up to move files to > another folder. Once the file is moved, we just log the history into MYSQL. > > I want to have it check if the filename is already in the db, thus checking > if we have already processed this file. > > SELECT * FROM tbl WHERE $(unknown) = File_Name(this is the column header in > our MYSQL.) > > I have yet to get anything besides an error being generated saying my sql > statement is incorrect. Am I going about this the wrong way? Does anyone > have a better way of checking this sort of thing? I have added in future > file names(as the file names are predictable) but it still will give me an > error. > > Thanks > > > > -- > View this message in context: > http://apache-nifi-developer-list.39713.n7.nabble.com/Monitoring-duplicate-file-entries-with-ExecuteSQL-tp9490.html > Sent from the Apache NiFi Developer List mailing list archive at Nabble.com. >
[GitHub] nifi pull request: NIFI-1805
Github user jdye64 commented on the pull request: https://github.com/apache/nifi/pull/377#issuecomment-213528810 It might make more sense to leave those sort of operations to a ReportingTask however? Does anyone have any thoughts around that?
[GitHub] nifi pull request: NIFI-1805
Github user jdye64 commented on the pull request: https://github.com/apache/nifi/pull/377#issuecomment-213528547 @aperepel that isn't a bad idea. That takes a little more discussion around how often we continue to post that bulletin if the threshold is constantly exceeded, what severity, etc.
[GitHub] nifi pull request: NIFI-1805
GitHub user jdye64 opened a pull request: https://github.com/apache/nifi/pull/377 NIFI-1805 Expose BackPressureObjectThreshold and BackPressureDataSizeThreshold to ConnectionStatus You can merge this pull request into a Git repository by running: $ git pull https://github.com/jdye64/nifi NIFI-1805 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/377.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #377 commit 51505bd34f46ea55f0e807ab1d4e764977421aa6 Author: Jeremy Dyer Date: 2016-04-22T17:28:29Z NIFI-1805 Expose BackPressureObjectThreshold and BackPressureDataSizeThreshold to ConnectionStatus
[GitHub] nifi pull request: NIFI-1805
Github user aperepel commented on the pull request: https://github.com/apache/nifi/pull/377#issuecomment-213526177 I wonder if, upon crossing a threshold, we should raise a bulletin maybe?
Re: Enumerating processors, queues, etc.
Joe Skora - does Jeremy's JIRA cover your use case needs? On Fri, Apr 22, 2016 at 12:44 PM, Jeremy Dyerwrote: > Mark, > > ok that makes sense. I have created a jira for this improvement > https://issues.apache.org/jira/browse/NIFI-1805 > > On Fri, Apr 22, 2016 at 12:27 PM, Mark Payne wrote: > >> Jeremy, >> >> It should be relatively easy. In FlowController, we would have to update >> getGroupStatus() to set the values on ConnectionStatus >> and of course update ConnectionStatus to have getters & setters for the >> new values. That should be about it, I think. >> >> -Mark >> >> >> > On Apr 22, 2016, at 12:17 PM, Jeremy Dyer wrote: >> > >> > Mark, >> > >> > What would the process look like for doing that? Would that be something >> > trivial or require some reworking? >> > >> > On Fri, Apr 22, 2016 at 10:26 AM, Mark Payne >> wrote: >> > >> >> I definitely don't think we should be exposing the FlowController to a >> >> Reporting Task. >> >> However, I think exposing information about whether or not backpressure >> is >> >> being applied >> >> (or even is configured) is a very reasonable idea. >> >> >> >> -Mark >> >> >> >> >> >>> On Apr 22, 2016, at 10:22 AM, Jeremy Dyer wrote: >> >>> >> >>> I could see the argument for not making that available. What about some >> >>> sort of reference that would allow the ReportingTask to to determine if >> >>> backpressure is being applied to a Connection? It currently seems you >> can >> >>> see the number of bytes and/or objects count queued in a connection but >> >>> don't have any reference to the values a user has setup for >> backpressure >> >> in >> >>> the UI. Is there a way to get those values in the scope of the >> >>> ReportingTask? >> >>> >> >>> On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende >> wrote: >> >>> >> I think the only way you could do it directly without the REST API is >> by >> having access to the FlowController, >> but that is purposely not exposed to extension points... 
actually >> StandardFlowController is what implements the >> EventAccess interface which ends up providing the path way to the >> status >> objects. >> >> I would have to defer to Joe, Mark, and others about whether we would >> >> want >> to expose direct access to components >> through controller services, or some other extension point. >> >> On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer >> wrote: >> >> > Bryan, >> > >> > The ReportingTask enumeration makes sense and was helpful for >> something >> > else I am working on as well. >> > >> > Like Joe however I'm looking for a way to not just get the *Status >> objects >> > but rather start and stop processors. Is there a way to do that from >> >> the >> > ReportContext scope? I imagine you could pull the Processor "Id" from >> >> the >> > ProcessorStatus and then use the REST API but was looking for >> something >> > more direct than having to use the REST API >> > >> > >> > On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende >> wrote: >> > >> >> Hi Joe, >> >> >> >> I'm not sure if a controller service can do this, but a >> ReportingTask >> has >> >> access to similar information. >> >> >> >> A ReportingTask gets access to a ReportingContext, which can access >> >> EventAccess which can access ProcessGroupStatus. >> >> >> >> From ProcessGroupStatus you are at the root process group and can >> > enumerate >> >> the flow: >> >> >> >> private Collection connectionStatus = new >> > ArrayList<>(); >> >> private Collection processorStatus = new >> ArrayList<>(); >> >> private Collection processGroupStatus = new >> >> ArrayList<>(); >> >> private Collection >> remoteProcessGroupStatus >> >> = >> > new >> >> ArrayList<>(); >> >> private Collection inputPortStatus = new ArrayList<>(); >> >> private Collection outputPortStatus = new ArrayList<>(); >> >> >> >> Not sure if that is what you were looking for. 
>> >> >> >> -Bryan >> >> >> >> >> >> On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora >> wrote: >> >> >> >>> Is it possible and if so what is the best way for a controller >> service >> > to >> >>> get the collection of all processors or queues? >> >>> >> >>> The goal being to iterate over the collection of processors or >> queues >> > to >> >>> gather information or make adjustments to the flow. >> >>> >> >> >> > >> >> >> >> >> >> >>
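The enumeration route Bryan describes above (a ReportingTask gets a ReportingContext, which exposes EventAccess, which yields the root ProcessGroupStatus, whose collections can then be walked recursively) can be sketched as a small standalone program. The stub classes below are hypothetical stand-ins for NiFi's real status objects, used only so the recursion pattern is runnable in isolation:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for NiFi's ProcessGroupStatus and
// ProcessorStatus; not the real org.apache.nifi classes.
public class FlowEnumerationSketch {

    static class ProcessorStatusStub {
        final String name;
        ProcessorStatusStub(String name) { this.name = name; }
    }

    static class ProcessGroupStatusStub {
        final List<ProcessorStatusStub> processors = new ArrayList<>();
        final List<ProcessGroupStatusStub> childGroups = new ArrayList<>();
    }

    // Walk the group tree from the root, collecting every processor name,
    // mirroring how a ReportingTask could recurse from the root status object.
    static void collectProcessors(ProcessGroupStatusStub group, List<String> out) {
        for (ProcessorStatusStub p : group.processors) {
            out.add(p.name);
        }
        for (ProcessGroupStatusStub child : group.childGroups) {
            collectProcessors(child, out);   // recurse into nested groups
        }
    }

    public static void main(String[] args) {
        ProcessGroupStatusStub root = new ProcessGroupStatusStub();
        root.processors.add(new ProcessorStatusStub("GetFile"));

        ProcessGroupStatusStub child = new ProcessGroupStatusStub();
        child.processors.add(new ProcessorStatusStub("PutHDFS"));
        root.childGroups.add(child);

        List<String> names = new ArrayList<>();
        collectProcessors(root, names);
        System.out.println(names);  // [GetFile, PutHDFS]
    }
}
```

In a real ReportingTask the root status object would come from the ReportingContext's EventAccess rather than being constructed by hand, as Bryan outlines.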
Monitoring duplicate file entries with ExecuteSQL
I am trying to use an ExecuteSQL processor to check if a file has already been processed. Files are flowing into our NiFi and we have it set up to move files to another folder. Once the file is moved, we just log the history into MySQL. I want to have it check if the filename is already in the db, thus checking if we have already processed this file. SELECT * FROM tbl WHERE $(unknown) = File_Name (this is the column header in our MySQL.) I have yet to get anything besides an error being generated saying my SQL statement is incorrect. Am I going about this the wrong way? Does anyone have a better way of checking this sort of thing? I have added in future file names (as the file names are predictable) but it still gives me an error. Thanks -- View this message in context: http://apache-nifi-developer-list.39713.n7.nabble.com/Monitoring-duplicate-file-entries-with-ExecuteSQL-tp9490.html Sent from the Apache NiFi Developer List mailing list archive at Nabble.com.
[GitHub] nifi pull request: NIFI-1678
Github user mcgilman commented on the pull request: https://github.com/apache/nifi/pull/323#issuecomment-213514155 Reviewing...
Re: Enumerating processors, queues, etc.
Mark, ok that makes sense. I have created a jira for this improvement https://issues.apache.org/jira/browse/NIFI-1805 On Fri, Apr 22, 2016 at 12:27 PM, Mark Paynewrote: > Jeremy, > > It should be relatively easy. In FlowController, we would have to update > getGroupStatus() to set the values on ConnectionStatus > and of course update ConnectionStatus to have getters & setters for the > new values. That should be about it, I think. > > -Mark > > > > On Apr 22, 2016, at 12:17 PM, Jeremy Dyer wrote: > > > > Mark, > > > > What would the process look like for doing that? Would that be something > > trivial or require some reworking? > > > > On Fri, Apr 22, 2016 at 10:26 AM, Mark Payne > wrote: > > > >> I definitely don't think we should be exposing the FlowController to a > >> Reporting Task. > >> However, I think exposing information about whether or not backpressure > is > >> being applied > >> (or even is configured) is a very reasonable idea. > >> > >> -Mark > >> > >> > >>> On Apr 22, 2016, at 10:22 AM, Jeremy Dyer wrote: > >>> > >>> I could see the argument for not making that available. What about some > >>> sort of reference that would allow the ReportingTask to to determine if > >>> backpressure is being applied to a Connection? It currently seems you > can > >>> see the number of bytes and/or objects count queued in a connection but > >>> don't have any reference to the values a user has setup for > backpressure > >> in > >>> the UI. Is there a way to get those values in the scope of the > >>> ReportingTask? > >>> > >>> On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende > wrote: > >>> > I think the only way you could do it directly without the REST API is > by > having access to the FlowController, > but that is purposely not exposed to extension points... actually > StandardFlowController is what implements the > EventAccess interface which ends up providing the path way to the > status > objects. 
> > I would have to defer to Joe, Mark, and others about whether we would > >> want > to expose direct access to components > through controller services, or some other extension point. > > On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer > wrote: > > > Bryan, > > > > The ReportingTask enumeration makes sense and was helpful for > something > > else I am working on as well. > > > > Like Joe however I'm looking for a way to not just get the *Status > objects > > but rather start and stop processors. Is there a way to do that from > >> the > > ReportContext scope? I imagine you could pull the Processor "Id" from > >> the > > ProcessorStatus and then use the REST API but was looking for > something > > more direct than having to use the REST API > > > > > > On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende > wrote: > > > >> Hi Joe, > >> > >> I'm not sure if a controller service can do this, but a > ReportingTask > has > >> access to similar information. > >> > >> A ReportingTask gets access to a ReportingContext, which can access > >> EventAccess which can access ProcessGroupStatus. > >> > >> From ProcessGroupStatus you are at the root process group and can > > enumerate > >> the flow: > >> > >> private Collection connectionStatus = new > > ArrayList<>(); > >> private Collection processorStatus = new > ArrayList<>(); > >> private Collection processGroupStatus = new > >> ArrayList<>(); > >> private Collection > remoteProcessGroupStatus > >> = > > new > >> ArrayList<>(); > >> private Collection inputPortStatus = new ArrayList<>(); > >> private Collection outputPortStatus = new ArrayList<>(); > >> > >> Not sure if that is what you were looking for. > >> > >> -Bryan > >> > >> > >> On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora > wrote: > >> > >>> Is it possible and if so what is the best way for a controller > service > > to > >>> get the collection of all processors or queues? 
> >>> > >>> The goal being to iterate over the collection of processors or > queues > > to > >>> gather information or make adjustments to the flow. > >>> > >> > > > > >> > >> > >
Re: Enumerating processors, queues, etc.
Jeremy, It should be relatively easy. In FlowController, we would have to update getGroupStatus() to set the values on ConnectionStatus and of course update ConnectionStatus to have getters & setters for the new values. That should be about it, I think. -Mark > On Apr 22, 2016, at 12:17 PM, Jeremy Dyerwrote: > > Mark, > > What would the process look like for doing that? Would that be something > trivial or require some reworking? > > On Fri, Apr 22, 2016 at 10:26 AM, Mark Payne wrote: > >> I definitely don't think we should be exposing the FlowController to a >> Reporting Task. >> However, I think exposing information about whether or not backpressure is >> being applied >> (or even is configured) is a very reasonable idea. >> >> -Mark >> >> >>> On Apr 22, 2016, at 10:22 AM, Jeremy Dyer wrote: >>> >>> I could see the argument for not making that available. What about some >>> sort of reference that would allow the ReportingTask to to determine if >>> backpressure is being applied to a Connection? It currently seems you can >>> see the number of bytes and/or objects count queued in a connection but >>> don't have any reference to the values a user has setup for backpressure >> in >>> the UI. Is there a way to get those values in the scope of the >>> ReportingTask? >>> >>> On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende wrote: >>> I think the only way you could do it directly without the REST API is by having access to the FlowController, but that is purposely not exposed to extension points... actually StandardFlowController is what implements the EventAccess interface which ends up providing the path way to the status objects. I would have to defer to Joe, Mark, and others about whether we would >> want to expose direct access to components through controller services, or some other extension point. On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer wrote: > Bryan, > > The ReportingTask enumeration makes sense and was helpful for something > else I am working on as well. 
> > Like Joe however I'm looking for a way to not just get the *Status objects > but rather start and stop processors. Is there a way to do that from >> the > ReportContext scope? I imagine you could pull the Processor "Id" from >> the > ProcessorStatus and then use the REST API but was looking for something > more direct than having to use the REST API > > > On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende wrote: > >> Hi Joe, >> >> I'm not sure if a controller service can do this, but a ReportingTask has >> access to similar information. >> >> A ReportingTask gets access to a ReportingContext, which can access >> EventAccess which can access ProcessGroupStatus. >> >> From ProcessGroupStatus you are at the root process group and can > enumerate >> the flow: >> >> private Collection connectionStatus = new > ArrayList<>(); >> private Collection processorStatus = new ArrayList<>(); >> private Collection processGroupStatus = new >> ArrayList<>(); >> private Collection remoteProcessGroupStatus >> = > new >> ArrayList<>(); >> private Collection inputPortStatus = new ArrayList<>(); >> private Collection outputPortStatus = new ArrayList<>(); >> >> Not sure if that is what you were looking for. >> >> -Bryan >> >> >> On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora wrote: >> >>> Is it possible and if so what is the best way for a controller service > to >>> get the collection of all processors or queues? >>> >>> The goal being to iterate over the collection of processors or queues > to >>> gather information or make adjustments to the flow. >>> >> > >> >>
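Mark's suggested change above (new fields on ConnectionStatus with getters and setters, populated when FlowController.getGroupStatus() builds the status) might look roughly like the simplified sketch below. The class is a hypothetical stand-in, with field names taken from the NIFI-1805 PR title; the real ConnectionStatus carries many more fields:

```java
// Hypothetical, stripped-down sketch of a ConnectionStatus-style DTO with the
// two back pressure settings exposed (not the real org.apache.nifi class).
public class ConnectionStatusSketch {
    private long backPressureObjectThreshold;
    private String backPressureDataSizeThreshold;  // e.g. "1 GB", kept as text
    private long queuedCount;

    public long getBackPressureObjectThreshold() { return backPressureObjectThreshold; }
    public void setBackPressureObjectThreshold(long t) { this.backPressureObjectThreshold = t; }

    public String getBackPressureDataSizeThreshold() { return backPressureDataSizeThreshold; }
    public void setBackPressureDataSizeThreshold(String t) { this.backPressureDataSizeThreshold = t; }

    public long getQueuedCount() { return queuedCount; }
    public void setQueuedCount(long c) { this.queuedCount = c; }

    // With both the queue depth and the configured threshold exposed, a
    // ReportingTask can infer whether object back pressure is engaged.
    public boolean isObjectBackPressureEngaged() {
        return backPressureObjectThreshold > 0 && queuedCount >= backPressureObjectThreshold;
    }

    public static void main(String[] args) {
        ConnectionStatusSketch status = new ConnectionStatusSketch();
        status.setBackPressureObjectThreshold(10_000);
        status.setQueuedCount(12_000);
        System.out.println(status.isObjectBackPressureEngaged());  // true
    }
}
```

This is exactly the kind of derived check Jeremy asks about in the thread: the ReportingTask never touches the FlowController, it only reads values copied onto the status object.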
[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...
Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/366#discussion_r60766260 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/AbstractKafkaProcessor.java --- @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * Base class for {@link Processor}s to publish and consume messages from Kafka
+ *
+ * @see PutKafka
+ */
+abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessionFactoryProcessor {
+
+    private volatile boolean acceptTask = true;
+
+    private final AtomicInteger taskCounter = new AtomicInteger();
+
+    /**
+     * @see KafkaPublisher
+     */
+    volatile T kafkaResource;
+
+    @Override
+    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+        final ProcessSession session = sessionFactory.createSession();
+        if (this.acceptTask) {
+            try {
+                this.taskCounter.incrementAndGet();
+                if (this.kafkaResource == null) {
+                    synchronized (this) {
+                        if (this.kafkaResource == null) {
+                            this.kafkaResource = this.buildKafkaResource(context, session);
+                        }
+                    }
+                }
+
+                this.rendezvousWithKafka(context, session);
+                session.commit();
+            } catch (Throwable e) {
+                this.acceptTask = false;
+                this.getLogger().error("{} failed to process due to {}; rolling back session", new Object[] { this, e });
+                session.rollback(true);
+            } finally {
+                this.resetProcessorIfNecessary();
+            }
+        } else {
+            context.yield();
+        }
+    }
+
+    /**
+     * Resets the processor to initial state if necessary. The necessary state
+     * means there are no active tasks, which could only happen if currently
+     * executing tasks are given a chance to finish while no new tasks are
+     * accepted (see {@link #acceptTask})
+     */
+    private boolean resetProcessorIfNecessary() {
+        boolean reset = this.taskCounter.decrementAndGet() == 0 && !this.acceptTask;
+        if (reset) {
+            this.close();
+            this.acceptTask = true;
+        }
+        return reset;
+    }
+
+    /**
+     * Will call {@link Closeable#close()} on the target resource after which
+     * the target resource will be set to null
+     *
+     * @see KafkaPublisher
+     */
+    @OnStopped
+    public void close() {

--- End diff --

@olegz could we reduce the scope here? The concern I have is that technically this is a check-then-modify scenario, and it could be called both by internal processor threads executing resetProcessorIfNecessary and whenever the framework is calling onStopped. Now, by the way the framework works, onStopped should only be called once there are no threads left, so technically this should be legit. I just would not want someone to also call this from other places in the code if we can avoid it. So recommend we
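The lifecycle pattern in the diff above (lazy double-checked initialization of a shared resource, a counter of in-flight tasks, and a reset that only the last finishing task performs once new work is no longer accepted) can be distilled into a small standalone demo. The class and method names here are hypothetical; this is not the real processor code:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, standalone distillation of the accept/count/reset pattern
// from AbstractKafkaProcessor (not the real NiFi class).
public class LazyResourceSketch {
    private volatile boolean acceptTask = true;
    private volatile Object resource;                 // stands in for kafkaResource
    private final AtomicInteger taskCounter = new AtomicInteger();

    // Returns true if the task ran, false if it was turned away (the
    // equivalent of context.yield() in the processor).
    public boolean trigger(boolean fail) {
        if (!acceptTask) {
            return false;
        }
        taskCounter.incrementAndGet();
        try {
            if (resource == null) {                   // first check, no lock
                synchronized (this) {
                    if (resource == null) {           // second check, under lock
                        resource = new Object();      // lazy one-time build
                    }
                }
            }
            if (fail) {
                acceptTask = false;                   // stop taking new work on error
            }
            return true;
        } finally {
            resetIfNecessary();
        }
    }

    private void resetIfNecessary() {
        // Only the last in-flight task (counter back at 0) performs the reset,
        // so the resource is never closed out from under a running task.
        if (taskCounter.decrementAndGet() == 0 && !acceptTask) {
            resource = null;
            acceptTask = true;
        }
    }

    public static void main(String[] args) {
        LazyResourceSketch p = new LazyResourceSketch();
        System.out.println(p.trigger(false)); // true: normal task
        System.out.println(p.trigger(true));  // true: task ran, then flagged failure
        System.out.println(p.trigger(false)); // true: the reset re-enabled intake
    }
}
```

The single-threaded demo resets immediately because the failing task is also the last in-flight one; with concurrent tasks, later calls would be turned away until the counter drains to zero, which is the check-then-modify window joewitt's comment is probing.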
Re: Enumerating processors, queues, etc.
Mark, What would the process look like for doing that? Would that be something trivial or require some reworking? On Fri, Apr 22, 2016 at 10:26 AM, Mark Paynewrote: > I definitely don't think we should be exposing the FlowController to a > Reporting Task. > However, I think exposing information about whether or not backpressure is > being applied > (or even is configured) is a very reasonable idea. > > -Mark > > > > On Apr 22, 2016, at 10:22 AM, Jeremy Dyer wrote: > > > > I could see the argument for not making that available. What about some > > sort of reference that would allow the ReportingTask to to determine if > > backpressure is being applied to a Connection? It currently seems you can > > see the number of bytes and/or objects count queued in a connection but > > don't have any reference to the values a user has setup for backpressure > in > > the UI. Is there a way to get those values in the scope of the > > ReportingTask? > > > > On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende wrote: > > > >> I think the only way you could do it directly without the REST API is by > >> having access to the FlowController, > >> but that is purposely not exposed to extension points... actually > >> StandardFlowController is what implements the > >> EventAccess interface which ends up providing the path way to the status > >> objects. > >> > >> I would have to defer to Joe, Mark, and others about whether we would > want > >> to expose direct access to components > >> through controller services, or some other extension point. > >> > >> On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer wrote: > >> > >>> Bryan, > >>> > >>> The ReportingTask enumeration makes sense and was helpful for something > >>> else I am working on as well. > >>> > >>> Like Joe however I'm looking for a way to not just get the *Status > >> objects > >>> but rather start and stop processors. Is there a way to do that from > the > >>> ReportContext scope? 
I imagine you could pull the Processor "Id" from > the > >>> ProcessorStatus and then use the REST API but was looking for something > >>> more direct than having to use the REST API > >>> > >>> > >>> On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende wrote: > >>> > Hi Joe, > > I'm not sure if a controller service can do this, but a ReportingTask > >> has > access to similar information. > > A ReportingTask gets access to a ReportingContext, which can access > EventAccess which can access ProcessGroupStatus. > > From ProcessGroupStatus you are at the root process group and can > >>> enumerate > the flow: > > private Collection connectionStatus = new > >>> ArrayList<>(); > private Collection processorStatus = new > >> ArrayList<>(); > private Collection processGroupStatus = new > ArrayList<>(); > private Collection remoteProcessGroupStatus > = > >>> new > ArrayList<>(); > private Collection inputPortStatus = new ArrayList<>(); > private Collection outputPortStatus = new ArrayList<>(); > > Not sure if that is what you were looking for. > > -Bryan > > > On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora wrote: > > > Is it possible and if so what is the best way for a controller > >> service > >>> to > > get the collection of all processors or queues? > > > > The goal being to iterate over the collection of processors or queues > >>> to > > gather information or make adjustments to the flow. > > > > >>> > >> > >
[GitHub] nifi-minifi pull request: MINIFI-27 adding debug statements to Boo...
GitHub user JPercivall opened a pull request: https://github.com/apache/nifi-minifi/pull/16 MINIFI-27 adding debug statements to BootstrapCodec You can merge this pull request into a Git repository by running: $ git pull https://github.com/JPercivall/nifi-minifi MINIFI-27 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi/pull/16.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16 commit 9891fb3fd57149ab874342e122775b5b16653d87 Author: Joseph Percivall Date: 2016-04-22T14:56:45Z MINIFI-27 adding debug statements to BootstrapCodec
Re: Enumerating processors, queues, etc.
For an extensible system like NiFi, what is the rationale behind not allowing the system introspection into the processor and queue collections? On Fri, Apr 22, 2016 at 10:26 AM, Mark Payne wrote: > I definitely don't think we should be exposing the FlowController to a > Reporting Task. > However, I think exposing information about whether or not backpressure is > being applied > (or even is configured) is a very reasonable idea. > > -Mark > > > > On Apr 22, 2016, at 10:22 AM, Jeremy Dyer wrote: > > > > I could see the argument for not making that available. What about some > > sort of reference that would allow the ReportingTask to to determine if > > backpressure is being applied to a Connection? It currently seems you can > > see the number of bytes and/or objects count queued in a connection but > > don't have any reference to the values a user has setup for backpressure > in > > the UI. Is there a way to get those values in the scope of the > > ReportingTask? > > > > On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende wrote: > > > >> I think the only way you could do it directly without the REST API is by > >> having access to the FlowController, > >> but that is purposely not exposed to extension points... actually > >> StandardFlowController is what implements the > >> EventAccess interface which ends up providing the path way to the status > >> objects. > >> > >> I would have to defer to Joe, Mark, and others about whether we would > want > >> to expose direct access to components > >> through controller services, or some other extension point. > >> > >> On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer wrote: > >> > >>> Bryan, > >>> > >>> The ReportingTask enumeration makes sense and was helpful for something > >>> else I am working on as well. > >>> > >>> Like Joe however I'm looking for a way to not just get the *Status > >> objects > >>> but rather start and stop processors. Is there a way to do that from > the > >>> ReportContext scope? 
I imagine you could pull the Processor "Id" from > the > >>> ProcessorStatus and then use the REST API but was looking for something > >>> more direct than having to use the REST API > >>> > >>> > >>> On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende wrote: > >>> > Hi Joe, > > I'm not sure if a controller service can do this, but a ReportingTask > >> has > access to similar information. > > A ReportingTask gets access to a ReportingContext, which can access > EventAccess which can access ProcessGroupStatus. > > From ProcessGroupStatus you are at the root process group and can > >>> enumerate > the flow: > > private Collection connectionStatus = new > >>> ArrayList<>(); > private Collection processorStatus = new > >> ArrayList<>(); > private Collection processGroupStatus = new > ArrayList<>(); > private Collection remoteProcessGroupStatus > = > >>> new > ArrayList<>(); > private Collection inputPortStatus = new ArrayList<>(); > private Collection outputPortStatus = new ArrayList<>(); > > Not sure if that is what you were looking for. > > -Bryan > > > On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora wrote: > > > Is it possible and if so what is the best way for a controller > >> service > >>> to > > get the collection of all processors or queues? > > > > The goal being to iterate over the collection of processors or queues > >>> to > > gather information or make adjustments to the flow. > > > > >>> > >> > >
Feature Proposal for Temporal Analytics using Stateful Processors
Hello Dev, In the same vein as the discuss thread I initiated for adding Decimals to Expression Language (EL), I created a Feature Proposal for "Leverage Local State to Do Temporal Analytics"[1]. This again is a continuation of what I presented at the Apache NiFi MeetUp in Maryland[2]. The relatively recent additions of state to processors[3] and the ability for processors to expose key value pairs to EL[4] allows for the creation of processors which enable Users to do temporal analysis and data analytics using EL. I look forward to any questions, comments, or concerns you may have. [1] https://cwiki.apache.org/confluence/display/NIFI/Leverage+Local+State+to+Do+Temporal+Analytics [2] http://www.meetup.com/ApacheNiFi/events/229158777/ [3] https://issues.apache.org/jira/browse/NIFI-259 [4] https://issues.apache.org/jira/browse/NIFI-1249 Joe - - - - - - Joseph Percivall linkedin.com/in/Percivall e: joeperciv...@yahoo.com
Re: Enumerating processors, queues, etc.
I'm fine with the REST API being a better answer or part of the answer. A separate system that can monitor and manage an instance through the REST API might be the right way to go, plus it separates the concerns of monitor and monitored. But to me, a controller service seems like a logical choice for a tool to be configured with thresholds that will be used to evaluate processors and queues and either alert or modify UI parameters when components exceed the thresholds. My original use case was the "Processor Stop Light Service" that could change the background (or something similar) for controllers based on their state. On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende wrote: > I think the only way you could do it directly without the REST API is by > having access to the FlowController, > but that is purposely not exposed to extension points... actually > StandardFlowController is what implements the > EventAccess interface which ends up providing the path way to the status > objects. > > I would have to defer to Joe, Mark, and others about whether we would want > to expose direct access to components > through controller services, or some other extension point. > > On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer wrote: > > > Bryan, > > > > The ReportingTask enumeration makes sense and was helpful for something > > else I am working on as well. > > > > Like Joe however I'm looking for a way to not just get the *Status > objects > > but rather start and stop processors. Is there a way to do that from the > > ReportContext scope? I imagine you could pull the Processor "Id" from the > > ProcessorStatus and then use the REST API but was looking for something > > more direct than having to use the REST API > > > > > > On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende > wrote: > > > > > Hi Joe, > > > > > > I'm not sure if a controller service can do this, but a ReportingTask > has > > > access to similar information. 
> > > > > > A ReportingTask gets access to a ReportingContext, which can access > > > EventAccess which can access ProcessGroupStatus. > > > > > > From ProcessGroupStatus you are at the root process group and can > > enumerate > > > the flow: > > > > > > private Collection connectionStatus = new > > ArrayList<>(); > > > private Collection processorStatus = new > ArrayList<>(); > > > private Collection processGroupStatus = new > > > ArrayList<>(); > > > private Collection remoteProcessGroupStatus = > > new > > > ArrayList<>(); > > > private Collection inputPortStatus = new ArrayList<>(); > > > private Collection outputPortStatus = new ArrayList<>(); > > > > > > Not sure if that is what you were looking for. > > > > > > -Bryan > > > > > > > > > On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora wrote: > > > > > > > Is it possible and if so what is the best way for a controller > service > > to > > > > get the collection of all processors or queues? > > > > > > > > The goal being to iterate over the collection of processors or queues > > to > > > > gather information or make adjustments to the flow. > > > > > > > > > >
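The "Processor Stop Light Service" idea above can be sketched as a simple threshold evaluation. Everything below is hypothetical illustration, not NiFi API: the `QueueSnapshot` type and the threshold values are made up; a real service would read the queue numbers from NiFi's status objects and the thresholds from its configured properties.

```java
// Hypothetical sketch of the "Processor Stop Light Service": map a queue's
// current depth to a traffic-light color using user-configured thresholds.
// QueueSnapshot and the threshold constants are stand-ins for illustration.
public class StopLightSketch {
    enum Light { GREEN, YELLOW, RED }

    record QueueSnapshot(long queuedCount, long queuedBytes) {}

    // Thresholds a user might configure on such a service (made-up values).
    static final long YELLOW_COUNT = 1_000;
    static final long RED_COUNT = 10_000;

    static Light evaluate(QueueSnapshot q) {
        if (q.queuedCount() >= RED_COUNT) return Light.RED;
        if (q.queuedCount() >= YELLOW_COUNT) return Light.YELLOW;
        return Light.GREEN;
    }

    public static void main(String[] args) {
        System.out.println(evaluate(new QueueSnapshot(50, 1024)));        // GREEN
        System.out.println(evaluate(new QueueSnapshot(5_000, 1 << 20)));  // YELLOW
        System.out.println(evaluate(new QueueSnapshot(20_000, 1 << 30))); // RED
    }
}
```

The UI-coloring half of the idea (changing a processor's background) would need support on the front end; the sketch only covers the evaluation side.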
Re: Enumerating processors, queues, etc.
I definitely don't think we should be exposing the FlowController to a ReportingTask. However, I think exposing information about whether or not backpressure is being applied (or even is configured) is a very reasonable idea.

-Mark

> On Apr 22, 2016, at 10:22 AM, Jeremy Dyer wrote:
>
> I could see the argument for not making that available. What about some sort of reference that would allow the ReportingTask to determine if backpressure is being applied to a Connection? It currently seems you can see the number of bytes and/or object count queued in a connection, but don't have any reference to the values a user has set up for backpressure in the UI. Is there a way to get those values in the scope of the ReportingTask?
Re: Enumerating processors, queues, etc.
I could see the argument for not making that available. What about some sort of reference that would allow the ReportingTask to determine if backpressure is being applied to a Connection? It currently seems you can see the number of bytes and/or object count queued in a connection, but don't have any reference to the values a user has set up for backpressure in the UI. Is there a way to get those values in the scope of the ReportingTask?

On Fri, Apr 22, 2016 at 10:03 AM, Bryan Bende wrote:
> I think the only way you could do it directly without the REST API is by having access to the FlowController, but that is purposely not exposed to extension points... actually StandardFlowController is what implements the EventAccess interface, which ends up providing the pathway to the status objects.
>
> I would have to defer to Joe, Mark, and others about whether we would want to expose direct access to components through controller services, or some other extension point.
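The gap Jeremy describes can be made concrete with a self-contained sketch: deciding whether backpressure applies needs both the queued totals (which the status objects expose) and the configured limits (which, per this thread, they did not). The `ConnectionView` type and its fields below are hypothetical stand-ins, not the NiFi API.

```java
// Hypothetical sketch: deciding whether a connection is under backpressure,
// given both its current queue depth and the limits a user configured in the
// UI. None of these types are real NiFi classes; at the time of this thread
// the configured limits were not exposed to ReportingTasks.
public class BackpressureSketch {
    record ConnectionView(long queuedCount, long queuedBytes,
                          long objectThreshold, long byteThreshold) {}

    // Backpressure applies when either configured threshold is reached
    // (a threshold of 0 means "no limit" in this sketch).
    static boolean isBackpressureApplied(ConnectionView c) {
        boolean overCount = c.objectThreshold() > 0 && c.queuedCount() >= c.objectThreshold();
        boolean overBytes = c.byteThreshold() > 0 && c.queuedBytes() >= c.byteThreshold();
        return overCount || overBytes;
    }

    public static void main(String[] args) {
        System.out.println(isBackpressureApplied(new ConnectionView(500, 1024, 10_000, 0)));    // false
        System.out.println(isBackpressureApplied(new ConnectionView(12_000, 1024, 10_000, 0))); // true
    }
}
```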
Re: Enumerating processors, queues, etc.
I think the only way you could do it directly without the REST API is by having access to the FlowController, but that is purposely not exposed to extension points... actually StandardFlowController is what implements the EventAccess interface, which ends up providing the pathway to the status objects.

I would have to defer to Joe, Mark, and others about whether we would want to expose direct access to components through controller services, or some other extension point.

On Fri, Apr 22, 2016 at 9:46 AM, Jeremy Dyer wrote:
> Bryan,
>
> The ReportingTask enumeration makes sense and was helpful for something else I am working on as well.
>
> Like Joe, however, I'm looking for a way to not just get the *Status objects but rather start and stop processors. Is there a way to do that from the ReportingContext scope? I imagine you could pull the Processor "Id" from the ProcessorStatus and then use the REST API, but was looking for something more direct than having to use the REST API.
[GitHub] nifi-minifi pull request: MINIFI-17 Adding error handling of confi...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/15#discussion_r60741952 --- Diff: minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java --- @@ -1379,7 +1430,7 @@ private void saveFile(final InputStream configInputStream, File configFile) { } } -private void performTransformation(InputStream configIs, String configDestinationPath) { +public void performTransformation(InputStream configIs, String configDestinationPath) { --- End diff -- That is true, I will make it package private --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi pull request: MINIFI-17 Adding error handling of confi...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/15#discussion_r60741623 --- Diff: minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java --- @@ -1080,15 +,31 @@ public void start() throws IOException, InterruptedException { final File reloadFile = getReloadFile(defaultLogger); if (reloadFile.exists()) { -defaultLogger.info("Currently reloading configuration. Will not restart MiNiFi."); +defaultLogger.info("Currently reloading configuration. Will wait to restart MiNiFi."); Thread.sleep(5000L); continue; } final boolean previouslyStarted = getNifiStarted(); if (!previouslyStarted) { -defaultLogger.info("MiNiFi never started. Will not restart MiNiFi"); -return; +final File swapConfigFile = getSwapFile(defaultLogger); +if(swapConfigFile.exists()){ +defaultLogger.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration."); + +final String confDir = getBootstrapProperties().getProperty(CONF_DIR_KEY); +((MiNiFiConfigurationChangeListener) changeListener).performTransformation(new FileInputStream(swapConfigFile), confDir); --- End diff -- I agree, changing
Re: Enumerating processors, queues, etc.
Bryan, The ReportingTask enumeration makes sense and was helpful for something else I am working on as well. Like Joe, however, I'm looking for a way to not just get the *Status objects but rather start and stop processors. Is there a way to do that from the ReportingContext scope? I imagine you could pull the Processor "Id" from the ProcessorStatus and then use the REST API, but was looking for something more direct than having to use the REST API.

On Fri, Apr 22, 2016 at 9:23 AM, Bryan Bende wrote:
> Hi Joe,
>
> I'm not sure if a controller service can do this, but a ReportingTask has access to similar information.
>
> A ReportingTask gets access to a ReportingContext, which can access EventAccess, which can access ProcessGroupStatus.
>
> From ProcessGroupStatus you are at the root process group and can enumerate the flow:
>
> private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
> private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
> private Collection<ProcessGroupStatus> processGroupStatus = new ArrayList<>();
> private Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus = new ArrayList<>();
> private Collection<PortStatus> inputPortStatus = new ArrayList<>();
> private Collection<PortStatus> outputPortStatus = new ArrayList<>();
>
> Not sure if that is what you were looking for.
>
> -Bryan
[GitHub] nifi-minifi pull request: MINIFI-17 Adding error handling of confi...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/15#discussion_r60739792 --- Diff: minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java --- @@ -1379,7 +1430,7 @@ private void saveFile(final InputStream configInputStream, File configFile) { } } -private void performTransformation(InputStream configIs, String configDestinationPath) { +public void performTransformation(InputStream configIs, String configDestinationPath) { --- End diff -- Understand the need for increased visibility but think protected or package private would be sufficient.
[GitHub] nifi-minifi pull request: MINIFI-17 Adding error handling of confi...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/15#discussion_r60739656 --- Diff: minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java --- @@ -1080,15 +,31 @@ public void start() throws IOException, InterruptedException { final File reloadFile = getReloadFile(defaultLogger); if (reloadFile.exists()) { -defaultLogger.info("Currently reloading configuration. Will not restart MiNiFi."); +defaultLogger.info("Currently reloading configuration. Will wait to restart MiNiFi."); Thread.sleep(5000L); continue; } final boolean previouslyStarted = getNifiStarted(); if (!previouslyStarted) { -defaultLogger.info("MiNiFi never started. Will not restart MiNiFi"); -return; +final File swapConfigFile = getSwapFile(defaultLogger); +if(swapConfigFile.exists()){ +defaultLogger.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration."); + +final String confDir = getBootstrapProperties().getProperty(CONF_DIR_KEY); +((MiNiFiConfigurationChangeListener) changeListener).performTransformation(new FileInputStream(swapConfigFile), confDir); --- End diff -- Would prefer to just declare the field of this class to just be a MiNiFiConfigurationChangeListener instead of having to do the cast.
[GitHub] nifi-minifi pull request: MINIFI-17 Adding error handling of confi...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/15#discussion_r60738272 --- Diff: minifi-assembly/pom.xml --- @@ -261,7 +261,7 @@ limitations under the License. ./lib -8080 --- End diff -- I switched it because we're planning on getting rid of the UI anyway, so currently it is primarily for debugging purposes, and it's easier to debug when its default port doesn't conflict with NiFi's. If it makes more sense to switch it back, I can; then I would also change it in the ConfigTransformer (which defaults to 8081 during transformation).
Re: Enumerating processors, queues, etc.
Hi Joe, I'm not sure if a controller service can do this, but a ReportingTask has access to similar information. A ReportingTask gets access to a ReportingContext, which can access EventAccess, which can access ProcessGroupStatus. From ProcessGroupStatus you are at the root process group and can enumerate the flow:

private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
private Collection<ProcessGroupStatus> processGroupStatus = new ArrayList<>();
private Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus = new ArrayList<>();
private Collection<PortStatus> inputPortStatus = new ArrayList<>();
private Collection<PortStatus> outputPortStatus = new ArrayList<>();

Not sure if that is what you were looking for.

-Bryan

On Fri, Apr 22, 2016 at 8:25 AM, Joe Skora wrote:
> Is it possible, and if so, what is the best way for a controller service to get the collection of all processors or queues?
>
> The goal is to iterate over the collection of processors or queues to gather information or make adjustments to the flow.
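The fields Bryan lists suggest a straightforward recursion from the root group. Below is a self-contained sketch using simplified stand-ins for the `org.apache.nifi.controller.status` classes (the real ones carry many more fields), so it compiles on its own:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for org.apache.nifi.controller.status.* so this
// sketch is self-contained; only the fields needed for the walk are modeled.
public class EnumerateFlowSketch {
    record ProcessorStatus(String id, String name) {}

    static class ProcessGroupStatus {
        final String name;
        final List<ProcessorStatus> processorStatus = new ArrayList<>();
        final List<ProcessGroupStatus> processGroupStatus = new ArrayList<>();
        ProcessGroupStatus(String name) { this.name = name; }
    }

    // Walk the group tree from the root and collect every processor,
    // descending into nested process groups.
    static List<ProcessorStatus> collectProcessors(ProcessGroupStatus group) {
        List<ProcessorStatus> all = new ArrayList<>(group.processorStatus);
        for (ProcessGroupStatus child : group.processGroupStatus) {
            all.addAll(collectProcessors(child));
        }
        return all;
    }

    public static void main(String[] args) {
        ProcessGroupStatus root = new ProcessGroupStatus("root");
        root.processorStatus.add(new ProcessorStatus("p1", "GetFile"));
        ProcessGroupStatus child = new ProcessGroupStatus("child");
        child.processorStatus.add(new ProcessorStatus("p2", "PutFile"));
        root.processGroupStatus.add(child);
        System.out.println(collectProcessors(root).size()); // 2
    }
}
```

In an actual ReportingTask the root status would come from the ReportingContext's EventAccess rather than being constructed by hand, per the path Bryan describes above.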
[GitHub] nifi-minifi pull request: MINIFI-17 Adding error handling of confi...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/15#discussion_r60736740 --- Diff: minifi-assembly/pom.xml --- @@ -261,7 +261,7 @@ limitations under the License. ./lib -8080 --- End diff -- Should be converted back to the typical 8080 default.
Enumerating processors, queues, etc.
Is it possible, and if so, what is the best way for a controller service to get the collection of all processors or queues? The goal is to iterate over the collection of processors or queues to gather information or make adjustments to the flow.