[GitHub] nifi issue #3200: NIFI-5826 WIP Fix back-slash escaping at Lexers
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/3200 Reviewing... ---
[GitHub] nifi pull request #3183: NIFI-5826 Fix to escaped backslash
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3183#discussion_r238642945 --- Diff: nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathUtils.java --- @@ -39,4 +39,52 @@ public static String getFirstStringValue(final RecordPathSegment segment, final return stringValue; } + +/** + * This method handles backslash sequences after ANTLR parser converts all backslash into double ones + * with exception for \t, \r and \n. See + * org/apache/nifi/record/path/RecordPathParser.g --- End diff -- @ijokarumawak , I understand what you are saying; that was my first idea as well. The problem is that it would cause a regression in working functionality, such as EL. If we change the Lexer in all three cases, we will have a backward-compatibility problem. As an example: define GFF with attribute "a1" having this value (with an actual new line): ``` "this is new line and this is just a backslash \n" ``` (with or without quotes). Next, UpdateAttribute with: `a1: "${a:replaceAll('\\n','_')}"` and `a2: "${a:replaceAll('\n','_')}"` (note single vs. double backslash). In both cases the only character that will be replaced is the actual new line, resulting in: `a1=a2="this is new line_and this is just a backslash \n"` If we change the Lexer for EL, this behavior changes, and it would instead result in: `a1 = "this is new line` (actual new line) `and this is just a backslash _"` and `a2 = "this is new line_and this is just a backslash \n"` Of course, we could do it the "right" way for RecordPath regex-based functions, but then users would have a different regex experience in RecordPath and EL, which I think should be avoided. Regarding the Java method "unescapeBackslash": as the name suggests, this method is meant to handle string values containing backslashes. 
I do agree that for the test cases we have, some parts of the code are dead, but since this is a public method in a utility class, it can have generic functionality to support a wider variety of use cases. Would appreciate your feedback! ---
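The EL example above comes down to plain Java regex semantics. The following standalone snippet (my illustration using `String.replaceAll` directly, independent of NiFi's Lexers) shows why a pattern containing a real newline character matches only the newline, while an escaped backslash matches the literal two-character `\n` sequence:

```java
public class BackslashRegexDemo {
    // Attribute value: an actual newline, followed later by a literal backslash-n.
    static final String VALUE = "line1\nline2 and a literal \\n";

    // Regex string containing a real newline character: matches only the newline.
    static String replaceNewline() {
        return VALUE.replaceAll("\n", "_");
    }

    // Regex string \\n (escaped backslash + n): matches the literal two-char sequence \n.
    static String replaceLiteralBackslashN() {
        return VALUE.replaceAll("\\\\n", "_");
    }

    public static void main(String[] args) {
        System.out.println(replaceNewline());            // line1_line2 and a literal \n
        System.out.println(replaceLiteralBackslashN());  // line1<newline>line2 and a literal _
    }
}
```

This is the same asymmetry the comment describes: whichever escaping policy the Lexer applies, the string that finally reaches `Pattern`/`replaceAll` decides what gets matched.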
[GitHub] nifi pull request #3183: NIFI-5826 Fix to escaped backslash
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3183#discussion_r237529258 --- Diff: nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathUtils.java --- @@ -39,4 +39,52 @@ public static String getFirstStringValue(final RecordPathSegment segment, final return stringValue; } + +/** + * This method handles backslash sequences after ANTLR parser converts all backslash into double ones + * with exception for \t, \r and \n. See + * org/apache/nifi/record/path/RecordPathParser.g --- End diff -- Yeah, that was supposed to be "Lexer" (I'll update the PR later, in case I need to make more changes). Regarding your question on the Lexer: when I started working on this issue, I also first looked into the Lexer, trying to understand all these backslash escapes. I then took a look at the Expression Language Lexer and found the same code there: https://github.com/apache/nifi/blob/master/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionLexer.g#L235 Then I dug into the actual Java code and understood the reasons. ANTLR parses the code coming from a textbox. All special backslash sequences (\r, \n, \t) must be supported and escaped to be able to capture actual new-line and tab characters. But if you support backslash sequences, you also need to escape "\" itself, so there is another escape for backslash on line 152. And once a single backslash is escaped as a special character, to avoid confusion with \r, \n, \t, and \\, the remaining characters are escaped with a double backslash plus the character (line 155), to escape the actual two-character sequences "\t", "\r", etc. Changing the Lexer would change the default behavior for backslash sequences not only for regex functions, but for all record-based input. 
That would create a backward-compatibility issue, and that is the main reason I decided not to change the Lexer. Also, as I mentioned above, the current "escape" policy is consistent with EL, so I preferred to keep that consistency and just fix the bug of not unescaping for regular expressions. Let me know if that makes sense, or if you have better ideas. ---
[GitHub] nifi pull request #3183: NIFI-5826 Fix to escaped backslash
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3183#discussion_r236843474 --- Diff: nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathUtils.java --- @@ -39,4 +39,44 @@ public static String getFirstStringValue(final RecordPathSegment segment, final return stringValue; } + +public static String unescapeBackslash(String value) { +if (value == null || value.isEmpty()) { +return value; +} +// need to escape characters after backslashes +final StringBuilder sb = new StringBuilder(); +boolean lastCharIsBackslash = false; +for (int i = 0; i < value.length(); i++) { +final char c = value.charAt(i); + --- End diff -- @ottobackwards I've added Javadocs with reference to antlr parser and explaining the reasons for this transformation. ---
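The diff above shows only the opening of `unescapeBackslash`. A minimal self-contained sketch of such an unescape routine is shown below; this is my reconstruction for illustration, not the code merged into NiFi, so the exact handling of edge cases (trailing backslash, non-special escapes) is an assumption:

```java
public final class UnescapeSketch {

    // Collapses doubled backslashes (as produced by the lexer) back into single ones,
    // so that e.g. the three characters \\s become the two-character regex \s.
    // Other backslash-prefixed sequences are passed through unchanged.
    public static String unescapeBackslash(final String value) {
        if (value == null || value.isEmpty()) {
            return value;
        }
        final StringBuilder sb = new StringBuilder();
        boolean lastCharIsBackslash = false;
        for (int i = 0; i < value.length(); i++) {
            final char c = value.charAt(i);
            if (lastCharIsBackslash) {
                if (c == '\\') {
                    sb.append('\\');           // \\ collapses to a single backslash
                } else {
                    sb.append('\\').append(c); // keep other sequences (e.g. \t) intact
                }
                lastCharIsBackslash = false;
            } else if (c == '\\') {
                lastCharIsBackslash = true;    // defer until we see the next char
            } else {
                sb.append(c);
            }
        }
        if (lastCharIsBackslash) {
            sb.append('\\');                   // preserve a trailing unpaired backslash
        }
        return sb.toString();
    }
}
```

For example, the record-path literal `'\\s'` (which reaches this method as the characters `\`, `\`, `s`) comes out as the two-character regex `\s`, ready for `Pattern.compile`.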
[GitHub] nifi pull request #3183: NIFI-5826 Fix to escaped backslash
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3183#discussion_r236829982 --- Diff: nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java --- @@ -1008,12 +1008,16 @@ public void testReplaceRegex() { final List<RecordField> fields = new ArrayList<>(); fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); --- End diff -- Added tests to cover all of the special chars. Also added backslash escape sequences to test exceptions. ---
[GitHub] nifi pull request #3183: NIFI-5826 Fix to escaped backslash
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3183#discussion_r236789857 --- Diff: nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathUtils.java --- @@ -39,4 +39,44 @@ public static String getFirstStringValue(final RecordPathSegment segment, final return stringValue; } + +public static String unescapeBackslash(String value) { +if (value == null || value.isEmpty()) { +return value; +} +// need to escape characters after backslashes +final StringBuilder sb = new StringBuilder(); +boolean lastCharIsBackslash = false; +for (int i = 0; i < value.length(); i++) { +final char c = value.charAt(i); + --- End diff -- Added to the PR description: EL References: StringLiteralEvaluator. Special characters (backslash sequences) will be handled for any string literal value, and will affect the way regex and escaped chars must be defined. Same logic has been added to RecordPathUtils.unescapeBackslash. ---
[GitHub] nifi pull request #3183: NIFI-5826 Fix to escaped backslash
GitHub user bdesert opened a pull request: https://github.com/apache/nifi/pull/3183 NIFI-5826 Fix to escaped backslash Fixing the escaped backslash by unescaping it in Java just before compiling the regex. - For consistency with the expression language regex-based evaluators, all record path operators will follow the same escape sequence for regex. A single backslash should be defined as a double backslash. Examples: - replaceRegex(/col1, '\\s', "_") - will replace all whitespace (spaces, tabs) with an underscore. - replaceRegex(/col1, '\\.', ",") - will replace a period with a comma. - replaceRegex(/col1, '', "/") - will replace a backslash with a forward slash. - Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [x] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? 
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdesert/nifi NIFI-5826_RegexPath Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3183.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3183 commit 4e0eed56f76556b323f96640d4d3126d35750096 Author: Ed Date: 2018-11-27T17:19:20Z NIFI-5826 Fix to escaped backslash Fixing the escaped backslash by unescaping it in Java just before compiling the regex. For consistency with expression language regex-based evaluators, all record path operators will follow the same escape sequence for regex. A single backslash should be defined as a double backslash. Examples: - replaceRegex(/col1, '\\s', "_") - will replace all whitespace (spaces, tabs) with an underscore. - replaceRegex(/col1, '\\.', ",") - will replace a period with a comma. - replaceRegex(/col1, '', "/") - will replace a backslash with a forward slash. ---
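The escape convention from the PR description can be checked at the plain Java regex level. This sketch (my illustration using `String.replaceAll` directly, not NiFi's record path engine) shows what the regex engine receives once the doubled backslashes are unescaped:

```java
public class ReplaceRegexDemo {
    // The record-path literal '\\s' reaches the regex engine as the two characters \s,
    // which the engine interprets as the whitespace character class.
    static String replaceWhitespace(String input) {
        return input.replaceAll("\\s", "_");
    }

    // The literal '\\.' reaches the engine as \., an escaped literal period.
    static String replacePeriod(String input) {
        return input.replaceAll("\\.", ",");
    }

    public static void main(String[] args) {
        System.out.println(replaceWhitespace("a b\tc")); // a_b_c
        System.out.println(replacePeriod("1.5"));        // 1,5
    }
}
```

Without the unescaping fix, the pattern would still contain doubled backslashes when compiled, so `\\s` would match a literal backslash followed by `s` instead of whitespace.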
[GitHub] nifi issue #1953: NIFI-4130 Add lookup controller service in TransformXML to...
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/1953 +1 LGTM. Tested on local env: with XSLT as a file (regression), with the lookup service, and with cache size 0 and >0; all works as expected. Ready for merge. @mattyb149 , could you please give a final look? ---
[GitHub] nifi pull request #1953: NIFI-4130 Add lookup controller service in Transfor...
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/1953#discussion_r233016908 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java --- @@ -82,12 +94,32 @@ public static final PropertyDescriptor XSLT_FILE_NAME = new PropertyDescriptor.Builder() .name("XSLT file name") -.description("Provides the name (including full path) of the XSLT file to apply to the flowfile XML content.") -.required(true) +.description("Provides the name (including full path) of the XSLT file to apply to the flowfile XML content." ++ "One of the XSLT file name and XSLT controller properties must be defined.") +.required(false) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .build(); +public static final PropertyDescriptor XSLT_CONTROLLER = new PropertyDescriptor.Builder() +.name("xslt-controller") +.displayName("XSLT controller") --- End diff -- "XSLT Lookup" Would be more readable. Description: "Lookup controller used to store..." And: XSLT_CONTROLLER_KEY: "XSLT Lookup Key" (description looks fine) ---
[GitHub] nifi pull request #1953: NIFI-4130 Add lookup controller service in Transfor...
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/1953#discussion_r233019686 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java --- @@ -82,12 +94,32 @@ public static final PropertyDescriptor XSLT_FILE_NAME = new PropertyDescriptor.Builder() .name("XSLT file name") -.description("Provides the name (including full path) of the XSLT file to apply to the flowfile XML content.") -.required(true) +.description("Provides the name (including full path) of the XSLT file to apply to the flowfile XML content." ++ "One of the XSLT file name and XSLT controller properties must be defined.") +.required(false) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .build(); +public static final PropertyDescriptor XSLT_CONTROLLER = new PropertyDescriptor.Builder() +.name("xslt-controller") +.displayName("XSLT controller") +.description("Controller used to store XSLT definitions. One of the XSLT file name and " ++ "XSLT controller properties must be defined.") +.required(false) +.identifiesControllerService(StringLookupService.class) +.build(); + +public static final PropertyDescriptor XSLT_CONTROLLER_KEY = new PropertyDescriptor.Builder() +.name("xslt-controller-key") +.displayName("XSLT controller key") +.description("Key used to retrieve the XSLT definition from the XSLT controller. This property must be set when using " ++ "the XSLT controller property.") +.required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) --- End diff -- since this property supports EL, shouldn't it be NON_EMPTY_**EL**_VALIDATOR? ---
[GitHub] nifi issue #1953: NIFI-4130 Add lookup controller service in TransformXML to...
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/1953 Reviewing... ---
[GitHub] nifi pull request #3164: NIFI-5810 Add UserName EL support to JMS processors
GitHub user bdesert opened a pull request: https://github.com/apache/nifi/pull/3164 NIFI-5810 Add UserName EL support to JMS processors Adding EL support to the "User Name" property of ConsumeJMS and PublishJMS Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdesert/nifi NIFI-5810_JMS_EL Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3164.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3164 commit 5013dfff04bff154f8e1fc150e096a3ab6098a49 Author: Ed B Date: 2018-11-10T00:27:10Z NIFI-5810 Add UserName EL support to JMS processors Adding EL support to the "User Name" property of ConsumeJMS and PublishJMS ---
[GitHub] nifi pull request #3117: NIFI-5770 Fix Memory Leak in ExecuteScript on Jytho...
GitHub user bdesert opened a pull request: https://github.com/apache/nifi/pull/3117 NIFI-5770 Fix Memory Leak in ExecuteScript on Jython Moved module appending (aka classpath in Python) into the init stage instead of running it on every onTrigger. Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdesert/nifi NIFI-5770_ExecuteScript Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3117.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3117 commit e6837c81e84b2d7fa29b020c8192b4a2a9783e18 Author: Ed B Date: 2018-10-31T13:10:27Z NIFI-5770 Fix Memory Leak in ExecuteScript on Jython Moved module appending (aka classpath in python) into init stage instead of running each time onTrigger. ---
[GitHub] nifi pull request #3098: NIFI-5728 XML Writer to populate record tag name pr...
GitHub user bdesert opened a pull request: https://github.com/apache/nifi/pull/3098 NIFI-5728 XML Writer to populate record tag name properly All the changes are tested against - "Use Schema Text" (regression) - "AvroSchemaRegistry" Controller Service - "Hortonworks Schema Registry" Controller Service (docker). Also added a Test Case to handle the scenario when the schema name in the xxxRegistry Controller Service doesn't match the "name" field of the root record in the schema text. --- Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [x] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? 
### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdesert/nifi NIFI-5728_XMLWriter Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3098.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3098 commit 268ae74deeaf0228fb07cf50c3dd343ff9fb7e9f Author: Ed B Date: 2018-10-21T01:42:21Z NIFI-5728 XML Writer to populate record tag name properly ---
[GitHub] nifi issue #3078: NIFI-4805 Allow Delayed Transfer
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/3078 @joewitt , @patricker While working on implementations, I faced a problem with penalization on failure. Some processors penalize the FlowFile on failure, some do so only on rollback, and some give you a choice whether to penalize on failure. Use Case: FetchFile -> transform -> PutHiveStreaming. On PutHiveStreaming failure -> 1) wait 1 min + retry, 2) create a table (PutHiveQL will make you overwrite content, so you cannot do that sequentially). There are more use cases; for now I do it like this: ![image](https://user-images.githubusercontent.com/19496093/47114041-bdd24900-d228-11e8-9458-ce9f171ec9f9.png) ---
[GitHub] nifi issue #3078: NIFI-4805 Allow Delayed Transfer
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/3078 +1 LGTM Pulled the changes, tested on local env, all looks good to go. @markap14 , @alopresto , any additional comments? ---
[GitHub] nifi pull request #3078: NIFI-4805 Allow Delayed Transfer
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3078#discussion_r226029162 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PenalizeFlowFile.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.standard; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; --- End diff -- ```suggestion ``` ---
[GitHub] nifi pull request #3078: NIFI-4805 Allow Delayed Transfer
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3078#discussion_r226028831 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PenalizeFlowFile.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; --- End diff -- ```suggestion ``` ---
[GitHub] nifi pull request #3078: NIFI-4805 Allow Delayed Transfer
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3078#discussion_r226029040 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PenalizeFlowFile.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; --- End diff -- ```suggestion ``` ---
[GitHub] nifi pull request #3078: NIFI-4805 Allow Delayed Transfer
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3078#discussion_r226028907 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PenalizeFlowFile.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; --- End diff -- ```suggestion ``` ---
[GitHub] nifi pull request #3078: NIFI-4805 Allow Delayed Transfer
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3078#discussion_r226035394 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PenalizeFlowFile.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.standard; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"penalty", "penalize", "flowfile"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Penalizes a FlowFile.") --- End diff -- ```suggestion @CapabilityDescription("This processor provides capability to penalize flow files. " + "Every flow file will be penalized as per 'Penalty Duration' property of the processor.") ``` ---
[GitHub] nifi issue #3078: NIFI-4805 Allow Delayed Transfer
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/3078 @patricker , the code looks good. I'll test it locally later today and provide my feedback. ---
[GitHub] nifi pull request #3078: NIFI-4805 Allow Delayed Transfer
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3078#discussion_r226004572 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PenalizeFlowFile.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.standard; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"penalty", "penalize", "flowfile"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Penalizes a FlowFile.") +@WritesAttributes({ +@WritesAttribute(attribute = "penalization.count.{processor uuid}", description = "How many times this processor has penalized this FlowFile.") +}) + +public class PenalizeFlowFile extends AbstractProcessor { +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("Successfully penalized FlowFile").build(); + +private List properties; +private Set relationships; + +@Override +protected void init(final ProcessorInitializationContext context) { +final Set relationships = new HashSet<>(); +relationships.add(REL_SUCCESS); +this.relationships = Collections.unmodifiableSet(relationships); +} +@Override +public Set getRelationships() { 
+return relationships; +} + +@Override +public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { +FlowFile flowFile = session.get(); +if (flowFile == null) { +return; +} + +// Track how many times a FlowFile passes through this processor to better support the Retry use case +final String retryAttrName = "penalization.count." + this.getIdentifier(); +final String initialCount = flowFile.getAttribute(retryAttrName); +long cnt = 0; +if(initialCount != null) { +cnt = Long.parseLong(initialCount); +} + +cnt++; + +flowFile = session.putAttribute(flowFile, retryAttrName, Long.toString(cnt)); --- End diff -- As discussed in the JIRA, if this processor is meant for penalizing only (without retry functionality), then I'd remove lines 77-86 altogether. Retry capabilities should then be implemented in a separate processor. ---
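The counting logic under review (lines 77-86 of the diff) boils down to a read-increment-write of a per-processor attribute. As a standalone sketch with a plain Java map standing in for FlowFile attributes (class and method names here are illustrative, not the NiFi API), it looks like:

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the "penalization.count.{processor uuid}" bookkeeping
// from the diff, using a plain Map in place of FlowFile attributes.
// All names here are illustrative; this is not the NiFi API.
public class PenaltyCounter {

    // Reads the current count (defaulting to 0), increments it, and returns
    // an updated attribute map, mirroring session.putAttribute(...) semantics.
    static Map<String, String> incrementPenaltyCount(Map<String, String> attributes, String processorId) {
        final String key = "penalization.count." + processorId;
        final long next = Long.parseLong(attributes.getOrDefault(key, "0")) + 1;
        final Map<String, String> updated = new HashMap<>(attributes);
        updated.put(key, Long.toString(next));
        return updated;
    }
}
```

If the processor is reduced to penalizing only, as the comment suggests, this bookkeeping would move wholesale into a dedicated retry processor.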
[GitHub] nifi issue #3078: NIFI-4805 Allow Delayed Transfer
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/3078 @patricker, this PR doesn't implement all of the features originally requested in NIFI-4805 and discussed throughout the thread. Please take a look at the comments from Andy, Mark, and Martin. Missing features: gradual penalization on retry, and EL support for defining the penalization period. ---
[GitHub] nifi issue #2954: NIFI-5514: Fixed bugs in MergeRecord around minimum thresh...
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/2954 +1 LGTM. Test case updated. Live test on local env (up to date) succeeded. Works as expected. Travis is failing for JP only (US and FR are OK). Can be merged. ---
[GitHub] nifi pull request #2954: NIFI-5514: Fixed bugs in MergeRecord around minimum...
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2954#discussion_r219683471 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java --- @@ -304,13 +336,25 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory session.commit(); } +// If there is no more data queued up, complete any bin that meets our minimum threshold +int completedBins = 0; +final QueueSize queueSize = session.getQueueSize(); --- End diff -- @markap14 The test for MinRecords is successful, but the actual flow doesn't work as expected and still doesn't respect the Min Records setting. Here is my setup: ![image](https://user-images.githubusercontent.com/19496093/45922181-efa6f880-be92-11e8-92b1-4f6daf756463.png) ![image](https://user-images.githubusercontent.com/19496093/45922183-02b9c880-be93-11e8-9565-fe8ef8908245.png) The problem is the different behavior of MockProcessSession and StandardProcessSession: MockProcessSession.getQueueSize() will return 0 after session.get(...), while StandardProcessSession.getQueueSize() will return the same number as before session.get(...), regardless of whether flow files have been polled or not. As a result, this condition will block bins from actually being emitted when the minimum requirements are reached. I would recommend changing this condition to: if (flowFiles.size() != 0) {...} In parallel, we probably need to open a JIRA for the inconsistency between MockProcessSession and StandardProcessSession. ---
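The behavioral difference described in that comment can be demonstrated outside NiFi with a plain queue. This is an illustrative model only (a Deque standing in for the connection queue, not the actual ProcessSession classes): a guard based on the pre-get snapshot behaves like StandardProcessSession and never observes the drain, while checking what was actually polled, as the recommended `if (flowFiles.size() != 0)` condition does, is deterministic in both sessions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Illustrative model of the getQueueSize() discrepancy; not the NiFi API.
public class QueueSizeCheck {

    // Stands in for session.get(maxResults): drains up to 'max' FlowFiles.
    static List<String> poll(Queue<String> queue, int max) {
        final List<String> polled = new ArrayList<>();
        while (polled.size() < max && !queue.isEmpty()) {
            polled.add(queue.poll());
        }
        return polled;
    }

    public static void main(String[] args) {
        final Queue<String> queue = new ArrayDeque<>(List.of("ff1", "ff2"));
        final int sizeBeforeGet = queue.size();         // what StandardProcessSession reports
        final List<String> flowFiles = poll(queue, 10); // session.get(...)

        // The pre-get snapshot is still 2 even though the queue is now empty,
        // so a "queueSize == 0" guard never fires under StandardProcessSession,
        // while "!flowFiles.isEmpty()" reflects what this trigger actually pulled.
        System.out.println("snapshot=" + sizeBeforeGet
                + " remaining=" + queue.size()
                + " pulled=" + flowFiles.size());
    }
}
```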
[GitHub] nifi issue #2954: NIFI-5514: Fixed bugs in MergeRecord around minimum thresh...
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/2954 Reviewing... ---
[GitHub] nifi issue #3005: NIFI-5598: Allow JMS Processors to lookup Connection Facto...
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/3005 +1. Ready for merge. Tested JNDI lookup with Tibco Context Factory (with authentication). Successfully pulled records from my queue. ---
[GitHub] nifi issue #3005: NIFI-5598: Allow JMS Processors to lookup Connection Facto...
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/3005 I'll test it and let you know the results (I have a JNDI server with authentication for JMS queues). ---
[GitHub] nifi pull request #3005: NIFI-5598: Allow JMS Processors to lookup Connectio...
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3005#discussion_r218387623 --- Diff: nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java --- @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.jms.cf; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import javax.jms.ConnectionFactory; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Arrays; +import java.util.Hashtable; +import java.util.List; + +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; + +@Tags({"jms", "jndi", "messaging", "integration", "queue", "topic", "publish", "subscribe"}) +@CapabilityDescription("Provides a service to lookup an existing JMS ConnectionFactory using the Java Naming and Directory Interface (JNDI).") +@DynamicProperty( +description = "In order to perform a JNDI Lookup, an Initial Context must be established. When this is done, an Environment can be established for the context. 
Any dynamic/user-defined property" + +" that is added to this Controller Service will be added as an Environment configuration/variable to this Context.", +name = "The name of a JNDI Initial Context environment variable.", +value = "The value of the JNDI Initial Context Environment variable.", +expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY) +@SeeAlso(classNames = {"org.apache.nifi.jms.processors.ConsumeJMS", "org.apache.nifi.jms.processors.PublishJMS", "org.apache.nifi.jms.cf.JMSConnectionFactoryProvider"}) +public class JndiJmsConnectionFactoryProvider extends AbstractControllerService implements JMSConnectionFactoryProviderDefinition{ + +static final PropertyDescriptor INITIAL_NAMING_FACTORY_CLASS = new Builder() +.name("java.naming.factory.initial") +.displayName("Initial Naming Factory Class") +.description("The fully qualified class name of the Java Initial Naming Factory (java.naming.factory.initial).") +.addValidator(NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.required(true) +.build(); +static final PropertyDescriptor NAMING_PROVIDER_URL = new Builder() +.name("java.naming.provider.url") +.displayName("Naming Provider URL") +.description("The URL of the JNDI Naming Provider to use") +.required(true) +.addValidator(NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); +static final PropertyDescriptor CONNECTION_FACTORY_NAME = new Builder() +.name("connection.factory.name") +.displayName("Connection Factory Name") +.description("The name o
[GitHub] nifi pull request #3005: NIFI-5598: Allow JMS Processors to lookup Connectio...
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3005#discussion_r218386884 --- Diff: nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java --- @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.jms.cf; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import javax.jms.ConnectionFactory; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Arrays; +import java.util.Hashtable; +import java.util.List; + +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; + +@Tags({"jms", "jndi", "messaging", "integration", "queue", "topic", "publish", "subscribe"}) +@CapabilityDescription("Provides a service to lookup an existing JMS ConnectionFactory using the Java Naming and Directory Interface (JNDI).") +@DynamicProperty( +description = "In order to perform a JNDI Lookup, an Initial Context must be established. When this is done, an Environment can be established for the context. 
Any dynamic/user-defined property" + +" that is added to this Controller Service will be added as an Environment configuration/variable to this Context.", +name = "The name of a JNDI Initial Context environment variable.", +value = "The value of the JNDI Initial Context Environment variable.", +expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY) +@SeeAlso(classNames = {"org.apache.nifi.jms.processors.ConsumeJMS", "org.apache.nifi.jms.processors.PublishJMS", "org.apache.nifi.jms.cf.JMSConnectionFactoryProvider"}) +public class JndiJmsConnectionFactoryProvider extends AbstractControllerService implements JMSConnectionFactoryProviderDefinition{ + +static final PropertyDescriptor INITIAL_NAMING_FACTORY_CLASS = new Builder() +.name("java.naming.factory.initial") +.displayName("Initial Naming Factory Class") +.description("The fully qualified class name of the Java Initial Naming Factory (java.naming.factory.initial).") +.addValidator(NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.required(true) +.build(); +static final PropertyDescriptor NAMING_PROVIDER_URL = new Builder() +.name("java.naming.provider.url") +.displayName("Naming Provider URL") +.description("The URL of the JNDI Naming Provider to use") +.required(true) +.addValidator(NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); +static final PropertyDescriptor CONNECTION_FACTORY_NAME = new Builder() +.name("connection.factory.name") +.displayName("Connection Factory Name") +.description("The name o
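The dynamic-property mechanism described in the quoted @DynamicProperty annotation amounts to folding user-supplied entries into the JNDI environment before the InitialContext is created. A minimal standalone sketch (class and method names are illustrative, not the controller service's actual code):

```java
import java.util.Hashtable;
import java.util.Map;
import javax.naming.Context;

// Sketch of building the JNDI environment used for the ConnectionFactory lookup.
// Illustrative only; the real controller service reads these values from
// its PropertyDescriptors and dynamic properties.
public class JndiEnvSketch {

    static Hashtable<String, Object> buildEnvironment(String initialFactoryClass,
                                                      String providerUrl,
                                                      Map<String, String> dynamicProperties) {
        final Hashtable<String, Object> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, initialFactoryClass); // java.naming.factory.initial
        env.put(Context.PROVIDER_URL, providerUrl);                    // java.naming.provider.url
        env.putAll(dynamicProperties); // each dynamic/user-defined property becomes an environment entry
        return env;
    }
}
```

The lookup itself would then be along the lines of `(ConnectionFactory) new InitialContext(env).lookup(connectionFactoryName)`, with credentials (for example `java.naming.security.principal` and `java.naming.security.credentials`) supplied as dynamic properties.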
[GitHub] nifi issue #3008: NIFI-5492_EXEC Adding UDF to EL
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/3008 @joewitt , @alopresto , @mattyb149 Since we had discussions on this topic before, I would like to get your opinion. I've addressed all the issues related to security (separate class loader), API changes (there are none), and scope (a separate interface implementation is enforced). If that's not enough to feel safe about this, do not hesitate to trash this PR, or give me some feedback on how it can be improved. Thank you! ---
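The "separate class loader" isolation mentioned in that comment can be sketched generically: UDF jars are loaded through their own URLClassLoader whose parent is the platform loader, so user code sees the JDK but not the host application's internals. This is an illustrative pattern only, not the PR's actual implementation:

```java
import java.net.URL;
import java.net.URLClassLoader;

// Generic isolation sketch (not the PR's code): UDF classes get a dedicated
// loader parented to the platform loader, so they can resolve JDK classes but
// not framework classes loaded by the application class loader.
public class UdfIsolationSketch {

    static ClassLoader udfClassLoader(URL[] udfJars) {
        // Named URLClassLoader with an explicit parent (Java 9+).
        return new URLClassLoader("udf-loader", udfJars, ClassLoader.getPlatformClassLoader());
    }
}
```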
[GitHub] nifi pull request #3008: NIFI-5492_EXEC Adding UDF to EL
GitHub user bdesert opened a pull request: https://github.com/apache/nifi/pull/3008 NIFI-5492_EXEC Adding UDF to EL **This PR adds new functionality to expression language - user-defined functions.** Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [x] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdesert/nifi NIFI-5492_EXEC Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3008.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3008 commit 9cdb0f7b9fa963cfa29100d7b9ae76645388404b Author: Ed B Date: 2018-09-16T14:31:27Z NIFI-5492_EXEC Adding UDF to EL This PR adds new functionality to expression language - user-defined functions. ---
[GitHub] nifi issue #2639: NIFI-4906 Add GetHDFSFileInfo
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/2639 @bbende , thank you for your comments! I've made the changes to address all your concerns. Please review once you have time. ---
[GitHub] nifi pull request #2639: NIFI-4906 Add GetHDFSFileInfo
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2639#discussion_r195907971 --- Diff: nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java --- @@ -0,0 +1,803 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import 
org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Groupping; + +@TriggerSerially +@TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_ALLOWED) +@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"}) +@CapabilityDescription("Retrieves a listing of files and directories from HDFS. " ++ "This processor creates a FlowFile(s) that represents the HDFS file/dir with relevant information. " ++ "Main purpose of this processor to provide functionality similar to HDFS Client, i.e. count, du, ls, test, etc. " ++ "Unlike ListHDFS, this processor is stateless, supports incoming connections and provides information on a dir level. " +) +@WritesAttributes({ +@WritesAttribute(attribute="hdfs.objectName", description="The name of the file/dir found on HDFS."), +@WritesAttribute(attribute="hdfs.path", description="The path is set to the absolute path of the object's parent directory on HDFS. " ++ "For example, if an object is a directory 'foo', under directory '/bar' then 'hdfs.objectName' will have value 'foo', and 'hdfs.path' will be '/bar'"), +@WritesAttribute(attribute="hdfs.type", description="The type of an object. Possible values: directory, file, link"), +@WritesAttribute(attribute="hdfs.owner", description="T
[GitHub] nifi pull request #2639: NIFI-4906 Add GetHDFSFileInfo
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2639#discussion_r195906907 --- Diff: nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java --- @@ -0,0 +1,803 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import 
org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Groupping; + +@TriggerSerially +@TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_ALLOWED) +@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"}) +@CapabilityDescription("Retrieves a listing of files and directories from HDFS. " ++ "This processor creates a FlowFile(s) that represents the HDFS file/dir with relevant information. " ++ "Main purpose of this processor to provide functionality similar to HDFS Client, i.e. count, du, ls, test, etc. " ++ "Unlike ListHDFS, this processor is stateless, supports incoming connections and provides information on a dir level. " +) +@WritesAttributes({ +@WritesAttribute(attribute="hdfs.objectName", description="The name of the file/dir found on HDFS."), +@WritesAttribute(attribute="hdfs.path", description="The path is set to the absolute path of the object's parent directory on HDFS. " ++ "For example, if an object is a directory 'foo', under directory '/bar' then 'hdfs.objectName' will have value 'foo', and 'hdfs.path' will be '/bar'"), +@WritesAttribute(attribute="hdfs.type", description="The type of an object. Possible values: directory, file, link"), +@WritesAttribute(attribute="hdfs.owner", description="T
[GitHub] nifi pull request #2639: NIFI-4906 Add GetHDFSFileInfo
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2639#discussion_r195906819 --- Diff: nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java --- @@ -0,0 +1,803 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import 
org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Groupping; + +@TriggerSerially +@TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_ALLOWED) +@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"}) +@CapabilityDescription("Retrieves a listing of files and directories from HDFS. " ++ "This processor creates a FlowFile(s) that represents the HDFS file/dir with relevant information. " ++ "Main purpose of this processor to provide functionality similar to HDFS Client, i.e. count, du, ls, test, etc. " ++ "Unlike ListHDFS, this processor is stateless, supports incoming connections and provides information on a dir level. " +) +@WritesAttributes({ +@WritesAttribute(attribute="hdfs.objectName", description="The name of the file/dir found on HDFS."), +@WritesAttribute(attribute="hdfs.path", description="The path is set to the absolute path of the object's parent directory on HDFS. " ++ "For example, if an object is a directory 'foo', under directory '/bar' then 'hdfs.objectName' will have value 'foo', and 'hdfs.path' will be '/bar'"), +@WritesAttribute(attribute="hdfs.type", description="The type of an object. Possible values: directory, file, link"), +@WritesAttribute(attribute="hdfs.owner", description="T
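The hdfs.objectName / hdfs.path split described in the @WritesAttribute documentation above (object 'foo' under '/bar' yields objectName 'foo' and path '/bar') can be illustrated with plain java.nio paths. This is only an illustrative sketch of the parent/name split; the processor itself works with org.apache.hadoop.fs.Path, and the class and method names here are hypothetical:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class HdfsPathAttrs {
    // Splits an absolute path into the two attribute values described above:
    // index 0 mirrors hdfs.objectName, index 1 mirrors hdfs.path.
    public static String[] nameAndParent(String absolutePath) {
        Path p = Paths.get(absolutePath);
        return new String[] { p.getFileName().toString(), p.getParent().toString() };
    }

    public static void main(String[] args) {
        String[] attrs = nameAndParent("/bar/foo");
        System.out.println("hdfs.objectName=" + attrs[0] + ", hdfs.path=" + attrs[1]);
    }
}
```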
[GitHub] nifi pull request #2639: NIFI-4906 Add GetHDFSFileInfo
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2639#discussion_r195906462 --- Diff: nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java --- @@ -0,0 +1,803 @@ (license header and imports as quoted above) +@TriggerSerially +@TriggerWhenEmpty --- End diff -- @bbende , removed TriggerWhenEmpty and TriggerSerially. They were copied from a stateful processor and are not needed in this case. ---
[GitHub] nifi pull request #2639: NIFI-4906 Add GetHDFSFileInfo
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2639#discussion_r195905944 --- Diff: nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java --- @@ -0,0 +1,803 @@ (license header, imports and processor annotations as quoted above)
[GitHub] nifi issue #2711: NIFI-1705 - Adding AttributesToCSV processor
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/2711 @joetrite , no more comments from my side, thanks for addressing the code review comments! @MikeThomsen , LGTM. Could you please take a second look at it now? I believe it's ready, but there are build issues that seem unrelated to this PR. ---
[GitHub] nifi pull request #2711: NIFI-1705 - Adding AttributesToCSV processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2711#discussion_r190992414 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java --- @@ -64,17 +62,18 @@ "If the attribute value contains a comma, newline or double quote, then the attribute value will be " + "escaped with double quotes. Any double quote characters in the attribute value are escaped with " + "another double quote.") -@WritesAttribute(attribute = "CSVAttributes", description = "CSV representation of Attributes") +@WritesAttribute(attribute = "CSVData", description = "CSV representation of Attributes") public class AttributesToCSV extends AbstractProcessor { -private static final String OUTPUT_ATTRIBUTE_NAME = "CSVAttributes"; +private static final String DATA_ATTRIBUTE_NAME = "CSVData"; +private static final String SCHEMA_ATTRIBUTE_NAME = "CSVSchema"; --- End diff -- This attribute should be declared @ WritesAttribute() ---
[GitHub] nifi issue #2711: NIFI-1705 - Adding AttributesToCSV processor
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/2711 @joetrite , @MikeThomsen , So, I tested the processor and it works OK, but I have a question. In JSON this is easy: the structure includes both attribute names and attribute values. In CSV, only the values appear in the output. Don't you think it would be useful to add a header, or an attribute with an Avro-like generated schema? I agree that explicitly specified attributes will appear in the provided order, but if a regex is used the user won't be able to relate a value to a name. The same applies to core attributes, since we add only those that exist and are not empty. If we decide to add an Avro-like schema, we will have a problem: attribute names could be non-Avro-safe. ---
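The Avro-safety concern above can be made concrete: Avro record field names must match `[A-Za-z_][A-Za-z0-9_]*`, so arbitrary FlowFile attribute names (for example `my-attr.1`) could not be used directly as schema field names. A minimal, hypothetical helper, not part of this PR, might sanitize them like this:

```java
public class AvroSafeNames {
    // Hypothetical helper (not in the PR): maps an arbitrary attribute name
    // to one matching the Avro field-name pattern [A-Za-z_][A-Za-z0-9_]*.
    public static String toAvroSafe(String attributeName) {
        // Replace every character outside the allowed set with an underscore.
        String safe = attributeName.replaceAll("[^A-Za-z0-9_]", "_");
        // Field names may not start with a digit (or be empty).
        if (safe.isEmpty() || Character.isDigit(safe.charAt(0))) {
            safe = "_" + safe;
        }
        return safe;
    }

    public static void main(String[] args) {
        System.out.println(toAvroSafe("my-attr.1")); // my_attr_1
    }
}
```

Note the mapping is lossy: distinct attribute names can collide after sanitizing, which is part of why adding an Avro-like schema attribute is not straightforward.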
[GitHub] nifi pull request #2711: NIFI-1705 - Adding AttributesToCSV processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2711#discussion_r190750155 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.standard; + +import org.apache.commons.text.StringEscapeUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Map; +import java.util.Set; +import java.util.HashSet; +import java.util.List; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.Collections; +import java.util.Arrays; +import java.util.ArrayList; + + + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"csv", "attributes", "flowfile"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Generates a CSV representation of the input FlowFile Attributes. 
The resulting CSV " + +"can be written to either a newly generated attribute named 'CSVAttributes' or written to the FlowFile as content. " + +"If the attribute value contains a comma, newline or double quote, then the attribute value will be " + +"escaped with double quotes. Any double quote characters in the attribute value are escaped with " + +"another double quote.") +@WritesAttribute(attribute = "CSVAttributes", description = "CSV representation of Attributes") +public class AttributesToCSV extends AbstractProcessor { +private static final String OUTPUT_ATTRIBUTE_NAME = "CSVAttributes"; +private static final String OUTPUT_SEPARATOR = ","; +private static final String OUTPUT_MIME_TYPE = "text/csv"; +private static final String SPLIT_REGEX = OUTPUT_SEPARATOR + "(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"; + +static final AllowableValue OUTPUT_OVERWRITE_CONTENT = new AllowableValue("flowfile-content", "flowfile-content", "The resulting CSV string will be placed into the content of the flowfile." + +"Existing flowfile context will be overwritten. 'CSVAttributes' will not be written to at all (neither null nor empty string)."); +static final AllowableValue OUTPUT_NEW_ATTRIBUTE= new AllowableValue("flowfile-attribute", "flowfile-attribute", "The resulting CSV string will be placed into a new flowfile" + +" attribute named 'CS
[GitHub] nifi pull request #2695: NIFI-5044 SelectHiveQL accept only one statement
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2695#discussion_r190653106 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java --- @@ -198,6 +200,51 @@ public void testWithSqlException() throws SQLException { runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1); } +@Test +public void invokeOnTriggerExceptionInPreQieriesNoIncomingFlows() +throws InitializationException, ClassNotFoundException, SQLException, IOException { + +doOnTrigger(QUERY_WITHOUT_EL, false, CSV, +"select 'no exception' from persons; select exception from persons", +null); + +runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1); --- End diff -- >In general the behavior should remain the same whenever possible Currently, if there is an error in the SQL query, it will go to the "failure" relationship (even if there are no incoming connections) ![image](https://user-images.githubusercontent.com/19496093/40499024-ae42cbec-5f4e-11e8-9ff0-348dd6793b2a.png) ![image](https://user-images.githubusercontent.com/19496093/40498559-68c064c2-5f4d-11e8-9a18-88fcb082b18e.png) So, I am following the current error handling strategy. I just wasn't accurate about: > since we weren't issuing a flow file on failure before We do, in fact, issue a FlowFile on failure (establishing a connection is handled differently, and is not impacted by this change). @mattyb149 , any word on this? ---
[GitHub] nifi pull request #2711: NIFI-1705 - Adding AttributesToCSV processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2711#discussion_r190298869 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; +import java.util.HashSet; +import java.util.Map; +import java.util.Collections; +import java.util.stream.Collectors; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"csv", "attributes", "flowfile"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Generates a CSV representation of the input FlowFile Attributes. The resulting CSV " + +"can be written to either a newly generated attribute named 'CSVAttributes' or written to the FlowFile as content. 
" + +"If the attribute value contains a comma, newline or double quote, then the attribute value will be " + +"escaped with double quotes. Any double quote characters in the attribute value are escaped with " + +"another double quote. If the attribute value does not contain a comma, newline or double quote, then the " + +"attribute value is returned unchanged.") +@WritesAttribute(attribute = "CSVAttributes", description = "CSV representation of Attributes") +public class AttributesToCSV extends AbstractProcessor { + +private static final String OUTPUT_NEW_ATTRIBUTE = "flowfile-attribute"; +private static final String OUTPUT_OVERWRITE_CONTENT = "flowfile-content"; +private static final String OUTPUT_ATTRIBUTE_NAME = "CSVAttributes"; +private static final String OUTPUT_SEPARATOR = ","; +private static final String OUTPUT_MIME_TYPE = "text/csv"; + + +public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder() +.name("attribute-list") +.displayName("Attribute List") +.description("Comma separated list of attributes to be included in the resulting CSV. If this value " + +"is left empty then all existing Attributes will be included. This list of attributes is " + +"case sensitive and does not support attribute names that contain commas. If an attribute
[GitHub] nifi pull request #2711: NIFI-1705 - Adding AttributesToCSV processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2711#discussion_r190301498 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java (same quoted context as above)
[GitHub] nifi pull request #2711: NIFI-1705 - Adding AttributesToCSV processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2711#discussion_r190301930 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java (same quoted context as above)
[GitHub] nifi pull request #2711: NIFI-1705 - Adding AttributesToCSV processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2711#discussion_r190305079 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java (same quoted context as above)
[GitHub] nifi pull request #2711: NIFI-1705 - Adding AttributesToCSV processor
Github user bdesert commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2711#discussion_r190297210

--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java ---
(same diff context as in the first comment above; repeated excerpt omitted)
[GitHub] nifi pull request #2711: NIFI-1705 - Adding AttributesToCSV processor
Github user bdesert commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2711#discussion_r190296854

--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java ---
(same diff context as in the first comment above; repeated excerpt omitted)

--- End diff --

In case a destination is 'content', CSVAttributes won't be written at all (neither null nor empty string). We could mention that in description.

---
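The quoting rule stated in the CapabilityDescription above - quote a value only when it contains a comma, newline or double quote, and double any embedded quotes - can be sketched as a standalone method. This is a hypothetical sketch, not the processor's actual code; the class and method names are made up:

```java
// Hypothetical sketch of the CSV escaping rule described in the CapabilityDescription.
public class CsvEscapeSketch {

    public static String escapeCsv(String value) {
        // Quote only when the value contains a comma, newline or double quote...
        if (value.contains(",") || value.contains("\n") || value.contains("\"")) {
            // ...and escape embedded double quotes by doubling them.
            return "\"" + value.replace("\"", "\"\"") + "\"";
        }
        return value; // otherwise the value is returned unchanged
    }

    public static void main(String[] args) {
        System.out.println(escapeCsv("plain"));      // prints: plain
        System.out.println(escapeCsv("a,b"));        // prints: "a,b"
        System.out.println(escapeCsv("say \"hi\"")); // prints: "say ""hi"""
    }
}
```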
[GitHub] nifi pull request #2711: NIFI-1705 - Adding AttributesToCSV processor
Github user bdesert commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2711#discussion_r190303622

--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java ---
(same diff context as in the first comment above; repeated excerpt omitted)
[GitHub] nifi pull request #2711: NIFI-1705 - Adding AttributesToCSV processor
Github user bdesert commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2711#discussion_r190300030

--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java ---
(same diff context as in the first comment above; repeated excerpt omitted)
[GitHub] nifi pull request #2711: NIFI-1705 - Adding AttributesToCSV processor
Github user bdesert commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2711#discussion_r190297408

--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java ---
(same diff context as in the first comment above; repeated excerpt omitted)
[GitHub] nifi pull request #2711: NIFI-1705 - Adding AttributesToCSV processor
Github user bdesert commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2711#discussion_r190296970

--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java ---
(same diff context as in the first comment above; repeated excerpt omitted)
[GitHub] nifi issue #2711: NIFI-1705 - Adding AttributesToCSV processor
Github user bdesert commented on the issue:
https://github.com/apache/nifi/pull/2711

@joetrite , while AttributesToJSON has regex support for attributes, the new AttributesToCSV doesn't have it - is there a reason for not keeping them consistent?

---
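For comparison, the regex support in AttributesToJSON amounts to selecting only the attributes whose names match a pattern. A minimal sketch of that idea (hypothetical names, not the NiFi implementation):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch: select only the attributes whose keys fully match a regex.
public class AttributeRegexSketch {

    public static Map<String, String> select(Map<String, String> attributes, String regex) {
        Pattern pattern = Pattern.compile(regex);
        Map<String, String> selected = new LinkedHashMap<>(); // keep insertion order
        attributes.forEach((name, value) -> {
            if (pattern.matcher(name).matches()) {
                selected.put(name, value);
            }
        });
        return selected;
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new LinkedHashMap<>();
        attrs.put("filename", "a.txt");
        attrs.put("path", "/tmp");
        attrs.put("file.size", "10");
        System.out.println(select(attrs, "file.*").keySet()); // prints: [filename, file.size]
    }
}
```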
[GitHub] nifi issue #2711: NIFI-1705 - Adding AttributesToCSV processor
Github user bdesert commented on the issue:
https://github.com/apache/nifi/pull/2711

@joetrite , I would like to mention it under the current PR - there was a discussion about having ["record-aware" output](https://github.com/apache/nifi/pull/1589#issuecomment-383639761). This will need to be addressed, but I believe it is not part of this PR, as there are use cases where having a schema would limit the capabilities of this processor. E.g. when the list of attributes is empty, it is expected to get all flow file attributes on output, but that list is unknown ahead of time, so a schema cannot be pre-defined. @mattyb149 , as an alternative, we can file an enhancement Jira for AttributeToRecord to address schema-specific extracts.

---
[GitHub] nifi pull request #2734: NIFI-5230: Fixed NPE in InvokeScriptedProcessor on ...
Github user bdesert commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2734#discussion_r190286397

--- Diff: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java ---
@@ -237,7 +237,7 @@ public void setup() {

     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {

--- End diff --

@ottobackwards , regarding Set vs List: validation results are supposed to be a Collection, so technically either can be used. But consistency argues for a Set - if you check the super class AbstractConfigurableComponent, its customValidate returns `Collections.emptySet()`. So I would say we need to refactor the ArrayList into a Set, but that carries more potential regression impact. I'd recommend keeping it as is for this PR/Jira, and opening another ticket for the cosmetic improvement of changing the ArrayList to some Set implementation during initialization. I hope that makes sense.

---
[GitHub] nifi pull request #2734: NIFI-5230: Fixed NPE in InvokeScriptedProcessor on ...
Github user bdesert commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2734#discussion_r190283975

--- Diff: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java ---
@@ -237,7 +237,7 @@ public void setup() {

     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {

--- End diff --

@ottobackwards , when a processor is invalid (customValidate returned validation errors), we keep all the validationResults. The 'customValidate' method just checks whether there are already errors - no need to validate again, it just returns whatever is stored. When any property is modified, those validation results may no longer be accurate, so we have to reset them. Once reset (in onPropertyModified), the next call to customValidate will run a full validation and record any new errors found. So at no point do we drop validation results without reason. Am I missing anything?

---
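The cache-and-invalidate pattern described above - return the stored results until a property change resets them - can be illustrated in isolation. This is a hypothetical sketch, not the actual InvokeScriptedProcessor code:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch of caching validation results until a property modification.
public class ValidationCacheSketch {

    private final List<String> validationResults = new ArrayList<>();
    private boolean validated = false;
    public boolean scriptBroken = false; // stand-in for whatever full validation checks

    public Collection<String> customValidate() {
        if (validated) {
            return validationResults; // already validated: return the stored results
        }
        validationResults.clear();
        if (scriptBroken) { // placeholder for the real, expensive validation work
            validationResults.add("script failed to compile");
        }
        validated = true;
        return validationResults;
    }

    public void onPropertyModified() {
        validated = false; // cached results may no longer be accurate
    }
}
```

A property change is the only thing that discards the cache, so repeated customValidate calls stay cheap while nothing has changed.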
[GitHub] nifi pull request #2718: NIFI-5213: Allow AvroReader to process files w embe...
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2718#discussion_r189422180 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java --- @@ -17,33 +17,61 @@ package org.apache.nifi.avro; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.io.SequenceInputStream; import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; -import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.commons.io.input.TeeInputStream; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.record.RecordSchema; public class AvroReaderWithExplicitSchema extends AvroRecordReader { private final InputStream in; private final RecordSchema recordSchema; private final DatumReader datumReader; -private final BinaryDecoder decoder; +private BinaryDecoder decoder; private GenericRecord genericRecord; +private DataFileStream dataFileStream; -public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException, SchemaNotFoundException { +public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException { this.in = in; this.recordSchema = recordSchema; -datumReader = new GenericDatumReader(avroSchema); -decoder = DecoderFactory.get().binaryDecoder(in, null); +datumReader = new GenericDatumReader<>(avroSchema); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
+TeeInputStream teeInputStream = new TeeInputStream(in, baos); +// Try to parse as a DataFileStream, if it works, glue the streams back together and delegate calls to the DataFileStream +try { +dataFileStream = new DataFileStream<>(teeInputStream, new GenericDatumReader<>()); +} catch (IOException ioe) { +// Carry on, hopefully a raw Avro file +// Need to be able to re-read the bytes read so far, and the InputStream passed in doesn't support reset. Use the TeeInputStream in +// conjunction with SequenceInputStream to glue the two streams back together for future reading +ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); +SequenceInputStream sis = new SequenceInputStream(bais, in); +decoder = DecoderFactory.get().binaryDecoder(sis, null); +} +if (dataFileStream != null) { +// Verify the schemas are the same +Schema embeddedSchema = dataFileStream.getSchema(); +if (!embeddedSchema.equals(avroSchema)) { +throw new IOException("Explicit schema does not match embedded schema"); --- End diff -- Would it be better to throw SchemaValidationException - makes more sense than IOException? ---
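The stream "re-glue" trick in this diff (tee every probed byte into a buffer; if the `DataFileStream` probe fails, stitch the buffered prefix and the untouched remainder back together with `SequenceInputStream`) can be sketched with JDK classes only. `TeeStream` below is a minimal stand-in for commons-io's `TeeInputStream`, and all names are illustrative, not NiFi's actual code:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;

class ReglueDemo {
    // Minimal stdlib stand-in for commons-io's TeeInputStream:
    // every byte read is also copied into a side buffer.
    // (Only the single-byte read() is teed in this sketch.)
    static class TeeStream extends FilterInputStream {
        private final ByteArrayOutputStream copy;

        TeeStream(InputStream in, ByteArrayOutputStream copy) {
            super(in);
            this.copy = copy;
        }

        @Override
        public int read() throws IOException {
            int b = super.read();
            if (b != -1) {
                copy.write(b);
            }
            return b;
        }
    }

    static String demo() throws IOException {
        InputStream in = new ByteArrayInputStream("Obj-avro-payload".getBytes("UTF-8"));
        ByteArrayOutputStream seen = new ByteArrayOutputStream();
        TeeStream tee = new TeeStream(in, seen);

        // Probe a few bytes, the way DataFileStream would while sniffing the
        // container header, then pretend the probe failed (raw Avro, no header).
        tee.read();
        tee.read();
        tee.read();

        // Re-glue: buffered prefix + untouched remainder == the original stream,
        // ready to be handed to a plain binaryDecoder.
        InputStream reglued = new SequenceInputStream(
                new ByteArrayInputStream(seen.toByteArray()), in);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int b;
        while ((b = reglued.read()) != -1) {
            out.write(b);
        }
        return out.toString("UTF-8"); // full original payload, nothing lost
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```

This works because the original `InputStream` does not support `reset()`; the tee buffer substitutes for the missing rewind.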
[GitHub] nifi pull request #2695: NIFI-5044 SelectHiveQL accept only one statement
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2695#discussion_r189075604 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java --- @@ -311,7 +340,15 @@ private void onTrigger(final ProcessContext context, final ProcessSession sessio try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes()); final Statement st = (flowbased ? con.prepareStatement(selectQuery) : con.createStatement()) ) { - +Pair<String,SQLException> failure = executeConfigStatements(con, preQueries); +if (failure != null) { +// In case of failure, assigning config query to "selectQuery" var will allow to avoid major changes in error handling (catch block), --- End diff -- @pvillard31 , I think if I just rename this var to "hqlStatement", that will remove the confusion and make reusing it for pre- and post-queries natural. Going to push updates for this. ---
[GitHub] nifi pull request #2695: NIFI-5044 SelectHiveQL accept only one statement
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2695#discussion_r189008316 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java --- @@ -113,6 +126,17 @@ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); +public static final PropertyDescriptor HIVEQL_POST_QUERY = new PropertyDescriptor.Builder() +.name("hive-post-query") +.displayName("HiveQL Post-Query") +.description("HiveQL post-query to execute. Semicolon-delimited list of queries. " ++ "Example: 'set tez.queue.name=default; set hive.exec.orc.split.strategy=HYBRID; set hive.exec.reducers.bytes.per.reducer=258998272'. " ++ "Note, the results/outputs of these queries will be suppressed if successful executed.") +.required(false) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) --- End diff -- Well, it would be interesting to have a parser here, but then it should be global and also include the main query (the "select" statement). I believe it can be done, but it is out of scope for this change. We can open another improvement request in Jira for a new HiveQL validator for all Hive-related processors. ---
[GitHub] nifi pull request #2695: NIFI-5044 SelectHiveQL accept only one statement
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2695#discussion_r189005203 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java --- @@ -113,6 +126,17 @@ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); +public static final PropertyDescriptor HIVEQL_POST_QUERY = new PropertyDescriptor.Builder() +.name("hive-post-query") +.displayName("HiveQL Post-Query") +.description("HiveQL post-query to execute. Semicolon-delimited list of queries. " ++ "Example: 'set tez.queue.name=default; set hive.exec.orc.split.strategy=HYBRID; set hive.exec.reducers.bytes.per.reducer=258998272'. " --- End diff -- In my opinion there shouldn't be "post" queries at all, as after a "select" it doesn't make sense to run anything. But based on the discussion there is demand for post-queries, which is why we're implementing them. I'll be happy to get any suggestions on what could be a good example of a post-query :) ---
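The semicolon-delimited pre/post-query handling discussed in this thread could be sketched roughly as below. `executeConfigStatements` mirrors the name in the diff, but the signature, the `Map.Entry` failure pair, and the splitting logic are illustrative assumptions, not NiFi's actual implementation:

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Map;

class ConfigStatementsDemo {

    // Split "set a=b; set c=d; ..." into individual statements,
    // trimming whitespace and skipping empty fragments.
    static String[] split(String queries) {
        return Arrays.stream(queries.split(";"))
                .map(String::trim)
                .filter(q -> !q.isEmpty())
                .toArray(String[]::new);
    }

    // Run each config statement in order; return the failing statement and its
    // exception, or null if all succeed. Results of successful config statements
    // are intentionally discarded ("suppressed"), per the property description.
    static Map.Entry<String, SQLException> executeConfigStatements(Connection con, String queries) {
        for (String q : split(queries)) {
            try (Statement st = con.createStatement()) {
                st.execute(q); // output ignored on success
            } catch (SQLException e) {
                return new SimpleEntry<>(q, e);
            }
        }
        return null;
    }
}
```

A non-null return can then be funneled into the same catch-block error handling that the main SELECT uses, which is what the variable-renaming discussion above is about.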
[GitHub] nifi issue #2679: NIFI-5141: Updated regex for doubles to allow for numbers ...
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/2679 +1. No more comments from my side. ---
[GitHub] nifi issue #2701: NIFI-5194: Ensure that even if calling KafkaConsumer.resum...
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/2701 +1 LGTM. For committers: this is related to Kafka 0.9 only. Other implementations (0.10, 0.11, and 1.0) don't use the consumer's "resume" API explicitly. ---
[GitHub] nifi pull request #2695: NIFI-5044 SelectHiveQL accept only one statement
GitHub user bdesert opened a pull request: https://github.com/apache/nifi/pull/2695 NIFI-5044 SelectHiveQL accept only one statement SelectHiveQL supports only a single SELECT statement. This change adds support for pre- and post-select statements. It will be useful for configuration queries, e.g. "set tez.queue.name=default", and others. Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [X] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [X] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [X] Has your PR been rebased against the latest commit within the target branch (typically master)? - [X] Is your initial contribution a single, squashed commit? ### For code changes: - [X] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [X] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [X] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdesert/nifi NIFI-5044_SelectHiveQL_ Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2695.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2695 commit 5e4c2b00405418fc4673e851740129e94f59caab Author: Ed B <eberezitsky@...> Date: 2018-05-13T05:24:09Z NIFI-5044 SelectHiveQL accept only one statement SelectHiveQL support only single SELECT statement. This change adds support for pre- and post- select statements. It will be useful for configuration queries, i.e. "set tez.queue.name=default", and others. ---
[GitHub] nifi pull request #2679: NIFI-5141: Updated regex for doubles to allow for n...
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2679#discussion_r186609421 --- Diff: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java --- @@ -60,19 +60,19 @@ private static final String Infinity = "(Infinity)"; private static final String NotANumber = "(NaN)"; -private static final String Base10Digits = "\\d+"; -private static final String Base10Decimal = "\\." + Base10Digits; -private static final String OptionalBase10Decimal = Base10Decimal + "?"; +private static final String Base10Digits = "\\d+"; +private static final String Base10Decimal = "\\." + Base10Digits; +private static final String OptionalBase10Decimal = "(" + Base10Decimal + ")?"; -private static final String Base10Exponent = "[eE]" + OptionalSign + Base10Digits; +private static final String Base10Exponent = "[eE]" + OptionalSign + Base10Digits; private static final String OptionalBase10Exponent = "(" + Base10Exponent + ")?"; private static final String doubleRegex = OptionalSign + "(" + Infinity + "|" + NotANumber + "|"+ -"(" + Base10Digits + Base10Decimal + ")" + "|" + +"(" + Base10Digits + OptionalBase10Decimal + ")" + "|" + "(" + Base10Digits + OptionalBase10Decimal + Base10Exponent + ")" + "|" + --- End diff -- I think it'd be nice to have also: "(" + Base10Digits + "\\." + ")" + "|" + ---
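The effect of making the decimal part optional can be shown with a rough sketch of the regex under discussion. `OptionalSign` is not shown in the diff, so its definition here is an assumption, and the Infinity/NaN alternatives are omitted for brevity:

```java
import java.util.regex.Pattern;

class DoubleRegexDemo {
    // Reconstructed fragments; OptionalSign is an assumed definition.
    static final String OptionalSign          = "[\\-\\+]?";
    static final String Base10Digits          = "\\d+";
    static final String Base10Decimal         = "\\." + Base10Digits;
    static final String OptionalBase10Decimal = "(" + Base10Decimal + ")?";
    static final String Base10Exponent        = "[eE]" + OptionalSign + Base10Digits;

    // Simplified doubleRegex: digits with optional decimal, optionally
    // followed by an exponent (Infinity/NaN branches left out).
    static final String doubleRegex = OptionalSign + "("
            + "(" + Base10Digits + OptionalBase10Decimal + ")" + "|"
            + "(" + Base10Digits + OptionalBase10Decimal + Base10Exponent + ")"
            + ")";

    static boolean matches(String s) {
        return Pattern.matches(doubleRegex, s);
    }

    public static void main(String[] args) {
        // With the optional decimal part, plain integers now match too.
        if (!matches("123"))     throw new AssertionError();
        if (!matches("1.5"))     throw new AssertionError();
        if (!matches("-2.5e10")) throw new AssertionError();
        // The trailing-dot form suggested in the comment ("1.") still
        // does not match without the extra alternative.
        if (matches("1."))       throw new AssertionError();
        System.out.println("ok");
    }
}
```

This illustrates why the review suggests adding a `Base10Digits + "\\."` alternative: without it, values like `1.` are rejected even though `Double.parseDouble` accepts them.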
[GitHub] nifi pull request #2639: NIFI-4906 Add GetHDFSFileInfo
GitHub user bdesert opened a pull request: https://github.com/apache/nifi/pull/2639 NIFI-4906 Add GetHDFSFileInfo NIFI-4906: Added support to scan HDFS to get files and directories information without pulling the files. --- Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [x] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdesert/nifi NIFI-4906-Add-GetHdfsFileInfo Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2639.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2639 commit 8f371524ca9ee09af288e0883e1e3f3861034af9 Author: Ed <edward.berezitsky@...> Date: 2018-04-17T14:26:50Z NIFI-4906 Add GetHDFSFileInfo ---
[GitHub] nifi pull request #2584: NIFI-4388: Modules Not Honored
GitHub user bdesert opened a pull request: https://github.com/apache/nifi/pull/2584 NIFI-4388: Modules Not Honored Modules aren't honored. The bug is not reproducible in Jython (it handles modules with every script reload). But Groovy loads JARs and dirs with classes only on setup. Cannot provide test cases for Groovy, because it requires custom JARs to be provided as part of the package. - Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? 
### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdesert/nifi NIFI-4388_ISP_Modules_Reload Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2584.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2584 commit 0280f59a173e82df926acc9c6f50ef3b5302c9fa Author: Ed <edward.berezitsky@...> Date: 2018-03-27T04:22:12Z NIFI-4388: Modules Not Honored Modules aren't honored. The bug is not reproducible in Jython (it handles modules with every script reload). But Groovy loads JARs and dirs with classes only on setup. Cannot provide test cases for Groovy, because it requires custom JARs to be provided as part of the package. ---
[GitHub] nifi issue #2537: fix printing indefinite log errors
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/2537 @mattyb149 , can you please review the changes? ---
[GitHub] nifi pull request #2537: fix printing indefinite log errors
GitHub user bdesert opened a pull request: https://github.com/apache/nifi/pull/2537 fix printing indefinite log errors Added fix to avoid printing indefinite errors to logs in customValidate after first failed validation until any property is modified. - Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdesert/nifi NIFI-4968-ISP-fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2537.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2537 commit e105705da30ac72551433ddf2027d90a590ae73a Author: Ed <edward.berezitsky@...> Date: 2018-03-13T05:49:04Z fix printing indefinite log errors After first failure in customValidate, stop printing logs until any property is changed ---
[GitHub] nifi issue #2527: FetchHBaseRow - log level and displayName
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/2527 @pvillard31 , @bbende In case we want a bulletin to be generated, we can change the component's "Bulletin Level" to "DEBUG". But then a bulletin will be generated even if the rowkey is found. I would rather remove lines 276-278, to have a clean debug-level bulletin for "not found" only. Your thoughts? ---
[GitHub] nifi issue #2527: FetchHBaseRow - log level and displayName
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/2527 Oh, I see now. So it was about my first commit, when I added "displayName" and changed the "name" as per the standard. But after Pierre's comment I removed those fixes. Thanks for taking a look anyway! ---
[GitHub] nifi issue #2527: FetchHBaseRow - log level and displayName
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/2527 @MikeThomsen , I think since it is already in 1.5 and people have started using it, implementations may use the REST APIs to work with these processors. And since we are changing the "name" property (which cannot be controlled by the user), it can create a problem for those existing scripts. Also, the name is part of flow.xml.gz, so it will also impact existing implementations and cause regressions. So I had to agree with Pierre's comment about regression. ---
[GitHub] nifi pull request #2527: FetchHBaseRow - log level and displayName
GitHub user bdesert opened a pull request: https://github.com/apache/nifi/pull/2527 FetchHBaseRow - log level and displayName update log level for "not found" to DEBUG instead of ERROR, and added displayName to all property descriptors. - Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdesert/nifi NIFI-4953-FetchHBaseRaw-logs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2527.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2527 commit 827759a7fc32bb421e8e0011a2815201f60e1086 Author: Ed <edward.berezitsky@...> Date: 2018-03-09T17:06:34Z FetchHBaseRow - log level and displayName update log level for "not found" to DEBUG instead of ERROR, and added displayName to all property descriptors ---
[GitHub] nifi issue #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/2478 @bbende , Bryan, I committed the changes. Tested on a cluster, works as expected. When you have time, please review. ---
[GitHub] nifi issue #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/2478 @bbende , thank you! Both comments make sense. Will commit these changes soon. ---
[GitHub] nifi issue #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/2478 @bbende , I addressed all the comments. Thank you and let me know if you see more issues or have some recommendations/suggestions. ---
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r170700319 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. " ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), +@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), +@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), +@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. " ++ "Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcesso
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r170697126 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. " ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), +@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), +@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), +@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. " ++ "Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcesso
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169749383 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. \n" ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), +@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), +@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), +@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. " ++ "Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractPr
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169732184 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) --- End diff -- I was 
thinking about that before. I couldn't really decide, so I took a look at DeleteHBaseRow and FetchHBaseRow and decided to keep it consistent. ---
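The rowkey-range semantics described in the quoted @CapabilityDescription (start/end rowkey, optional Reversed order, row limit) can be illustrated outside NiFi with a sorted map standing in for an HBase table. This is a hypothetical sketch of the scan semantics only, not the processor's implementation; the class and method names are ours:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class RowRangeScanSketch {

    // HBase keeps rows sorted by rowkey; a TreeMap gives the same ordering.
    // Start row is inclusive and end row is exclusive, as in an HBase Scan.
    static Map<String, String> scan(NavigableMap<String, String> table,
                                    String startRow, String endRow,
                                    boolean reversed, int limit) {
        NavigableMap<String, String> range = table.subMap(startRow, true, endRow, false);
        if (reversed) {
            range = range.descendingMap(); // mimics the "Reversed" property
        }
        Map<String, String> out = new LinkedHashMap<>(); // preserves scan order
        for (Map.Entry<String, String> e : range.entrySet()) {
            if (out.size() >= limit) {
                break; // mimics the row-limit property
            }
            out.put(e.getKey(), e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> table = new TreeMap<>();
        table.put("row1", "a");
        table.put("row2", "b");
        table.put("row3", "c");
        table.put("row4", "d");
        System.out.println(scan(table, "row2", "row4", false, 10).keySet()); // [row2, row3]
        System.out.println(scan(table, "row2", "row4", true, 1).keySet());   // [row3]
    }
}
```

The sketch shows why start/end rows, reversal, and a limit compose cleanly: the limit is applied after the range is (optionally) reversed, so a reversed, limited scan returns rows from the end of the range.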
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169680760 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java --- @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestScanHBase { + +private ScanHBase proc; +private MockHBaseClientService hBaseClientService; +private TestRunner runner; + +@Before +public void setup() throws InitializationException { +proc = new ScanHBase(); +runner = TestRunners.newTestRunner(proc); + +hBaseClientService = new MockHBaseClientService(); +runner.addControllerService("hbaseClient", hBaseClientService); +runner.enableControllerService(hBaseClientService); +runner.setProperty(ScanHBase.HBASE_CLIENT_SERVICE, "hbaseClient"); +} + +@Test +public void testColumnsValidation() { +runner.setProperty(ScanHBase.TABLE_NAME, "table1"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1,cf2:cq1,cf3"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1 cf2,cf3"); +runner.assertNotValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:,cf2,cf3"); +runner.assertNotValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,"); +runner.assertNotValid(); +} + +@Test +public void testNoIncomingFlowFile() { +runner.setProperty(ScanHBase.TABLE_NAME, "table1"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); + +runner.run(); +runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); 
+runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0); +runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0); + +Assert.assertEquals(0, hBaseClientService.getNumScans()); +} + +@Test +public void testInvalidTableName() { +runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); + +runner.enqueue("trigger flow file"); +runner.run(); + +runner.assertTransferCount(ScanHBase.REL_FAILURE, 1); +runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0); +runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0); + +Assert.assertEquals(0, hBaseClientService.getNumScans()); +} + +@Test +public void testResultsNotFound() { +runner.setProperty(ScanHBase.TABLE_NAME, "table1"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setPr
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169677516 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java --- @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestScanHBase { + +private ScanHBase proc; +private MockHBaseClientService hBaseClientService; +private TestRunner runner; + +@Before +public void setup() throws InitializationException { +proc = new ScanHBase(); +runner = TestRunners.newTestRunner(proc); + +hBaseClientService = new MockHBaseClientService(); +runner.addControllerService("hbaseClient", hBaseClientService); +runner.enableControllerService(hBaseClientService); +runner.setProperty(ScanHBase.HBASE_CLIENT_SERVICE, "hbaseClient"); +} + +@Test +public void testColumnsValidation() { +runner.setProperty(ScanHBase.TABLE_NAME, "table1"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1,cf2:cq1,cf3"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1 cf2,cf3"); +runner.assertNotValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:,cf2,cf3"); +runner.assertNotValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,"); +runner.assertNotValid(); +} + +@Test +public void testNoIncomingFlowFile() { +runner.setProperty(ScanHBase.TABLE_NAME, "table1"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); + +runner.run(); +runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); 
+runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0); +runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0); + +Assert.assertEquals(0, hBaseClientService.getNumScans()); +} + +@Test +public void testInvalidTableName() { +runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}"); --- End diff -- Not setting a value for "hbase.table" is intentional. This test covers failure handling when the expression is invalid (cannot be evaluated). You can see that a FlowFile is expected at REL_FAILURE without any scans being performed. If I've simply misunderstood what you meant, please let me know. ---
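The valid and invalid column specs exercised in testColumnsValidation above follow one rule: comma-separated entries, each either a family or family:qualifier, with no blanks, empty qualifiers, or trailing commas. As a standalone sketch (the pattern and class names are ours, not the processor's), a single regex covers all the quoted test cases:

```java
import java.util.regex.Pattern;

public class ColumnsSpecCheck {

    // Each entry is "family" or "family:qualifier"; entries are comma-separated.
    static final Pattern COLUMNS = Pattern.compile("\\w+(:\\w+)?(,\\w+(:\\w+)?)*");

    static boolean isValid(String spec) {
        return COLUMNS.matcher(spec).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("cf1:cq1,cf2:cq2,cf3:cq3")); // true
        System.out.println(isValid("cf1,cf2:cq1,cf3"));         // true
        System.out.println(isValid("cf1 cf2,cf3"));             // false: blank inside an entry
        System.out.println(isValid("cf1:,cf2,cf3"));            // false: empty qualifier
        System.out.println(isValid("cf1:cq1,"));                // false: trailing comma
    }
}
```

Each branch of the pattern maps to one of the assertValid/assertNotValid cases quoted in the test, which makes the validator's intent easy to audit.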
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169176478 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. \n" ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), +@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), +@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), +@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. " ++ "Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractPr
[GitHub] nifi issue #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on the issue: https://github.com/apache/nifi/pull/2478 @MikeThomsen , are you available to re-review this PR? I have addressed your comment regarding the branch, and the rest (except for labels, which can be added later in bulk for all the HBase-related processors). ---
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
GitHub user bdesert opened a pull request: https://github.com/apache/nifi/pull/2478 NIFI-4833 Add scanHBase Processor

Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdesert/nifi NIFI-4833-Add-ScanHBase-processor Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2478.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2478 commit 39bd6fb5d02eb7dca63830967823a4bb48c5712c Author: Ed <edward.berezitsky@...> Date: 2018-02-17T21:26:04Z Add ScanHBase Processor New processor for scanning HBase records based on various parameters like range of rowkeys and range of timestamps. Supports result limit and reverse scan. commit d2f5410be14a77f64e7ca5593e6c908620a8da58 Author: Ed <edward.berezitsky@...> Date: 2018-02-17T21:27:18Z Adds Atlas Support for ScanHBase processor Adds Atlas Support for ScanHBase processor ---
[GitHub] nifi pull request #2446: NIFI-4833 Add ScanHBase processor
Github user bdesert closed the pull request at: https://github.com/apache/nifi/pull/2446 ---
[GitHub] nifi pull request #2446: NIFI-4833 Add ScanHBase processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2446#discussion_r165449601 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. \n" ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), +@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), +@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), +@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcessor { + //enhan
[GitHub] nifi pull request #2446: NIFI-4833 Add ScanHBase processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2446#discussion_r165449486 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. \n" ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), +@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), +@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), +@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcessor { + //enhan
[GitHub] nifi pull request #2446: NIFI-4833 Add ScanHBase processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2446#discussion_r165448494 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] nifi pull request #2446: NIFI-4833 Add ScanHBase processor
Github user bdesert commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2446#discussion_r165447440

--- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.hbase.io.JsonFullRowSerializer;
+import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
+import org.apache.nifi.hbase.io.RowSerializer;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hbase", "scan", "fetch", "get"})
+@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from an HBase table by specifying a range of rowkey values (start and/or end), "
+        + "by time range, by filter expression, or any combination of them. \n"
+        + "The order of records can be controlled by the Reversed property. "
+        + "The number of rows retrieved by the processor can be limited.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"),
+    @WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."),
+    @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"),
+    @WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of a given flow file"),
+    @WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in the given HBase table with the provided conditions. Could be null (not present) if transferred to FAILURE")
+})
+public class ScanHBase extends AbstractProcessor {
+    //enhan
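The CapabilityDescription above says the processor can fetch rows by a rowkey range (start and/or end), reverse the result order, and cap the number of rows returned. As a rough illustration of those scan semantics only, here is a plain-Java sketch over a TreeMap; the class name `ScanSketch` and its method are hypothetical and this is not the NiFi or HBase client API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

// Simplified model of the scan behavior described above: collect row keys in
// [startRow, endRow), optionally in reversed order, stopping at a row limit.
public class ScanSketch {
    public static List<String> scan(NavigableMap<String, String> table,
                                    String startRow, String endRow,
                                    boolean reversed, int limit) {
        // Start row is inclusive, end row is exclusive, mirroring HBase scan bounds.
        NavigableMap<String, String> range = table.subMap(startRow, true, endRow, false);
        if (reversed) {
            range = range.descendingMap();
        }
        List<String> rows = new ArrayList<>();
        for (String key : range.keySet()) {
            if (rows.size() >= limit) {
                break;   // honor the row-count cap
            }
            rows.add(key);
        }
        return rows;
    }
}
```

A real scan would also apply the time range and filter expression the description mentions; those are omitted here to keep the range/reversed/limit interplay visible.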
[GitHub] nifi pull request #2446: NIFI-4833 Add ScanHBase processor
GitHub user bdesert opened a pull request:

https://github.com/apache/nifi/pull/2446

NIFI-4833 Add ScanHBase processor

### Description:
Add new processor ScanHBase and a test package.

Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [v] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [v] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [v] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [v] Is your initial contribution a single, squashed commit?

### For code changes:
- [v] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [v] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [v] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bdesert/nifi master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2446.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2446

commit 7a9dc565f43284a8535de052a812a252c6950613
Author: Ed <edward.berezitsky@...>
Date: 2018-01-31T21:20:35Z

    NIFI-4833 Add ScanHBase processor

---