[GitHub] nifi-minifi pull request #50: MINIFI-107 - Process group support
Github user asfgit closed the pull request at: https://github.com/apache/nifi-minifi/pull/50 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi pull request #50: MINIFI-107 - Process group support
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/50#discussion_r86903300

    --- Diff: minifi-docs/src/main/markdown/System_Admin_Guide.md ---
    @@ -391,6 +391,39 @@ Within the Processor Configuration section, there is the `Properties` subsection
     State File: ./conf/state/tail-file
     Initial Start Position: Beginning of File
    +## Process Groups
    +
    +Process groups can be nested from the top level. They can contain other process groups as well and can be used to logically group related operations.
    +
    +*Property* | *Description*
    +--- | -
    +name| The name of what this process group will do.
    +id | The id of this process group. This needs to be set to a unique filesystem-friendly value (regex: [A-Za-z0-9_-]+)
    +Processors | The processors contained in this Process Group. (Defined above)
    +Remote Processing Groups| The remote processing groups contained in this Process Group. (Defined below)
    +Connections | The connections contained in this Process Group. (Defined below)
    +Input Ports | The input ports contained in this Process Group. (Defined below)
    +Output Ports| The output ports contained in this Process Group. (Defined below)
    +Process Groups | The child Process Groups contained in this Process Group.
    +
    +## Input Ports
    --- End diff --

Similar to what we had with PGs, during validation we should check if the root PG has input/output ports (and fail if so).
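A check along the lines suggested in this comment might look like the sketch below. It is only an illustration under stated assumptions: `PortedGroup` and `RootPortValidation` are hypothetical stand-ins, not classes from the MiNiFi schema module, and the issue messages are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a process group schema; not the MiNiFi class.
final class PortedGroup {
    final String name;
    final List<String> inputPortIds = new ArrayList<>();
    final List<String> outputPortIds = new ArrayList<>();

    PortedGroup(String name) {
        this.name = name;
    }
}

final class RootPortValidation {
    // Returns one validation issue for each kind of port found on the root group.
    // An empty list means the root group passes this check.
    static List<String> validateRootHasNoPorts(PortedGroup root) {
        List<String> issues = new ArrayList<>();
        if (!root.inputPortIds.isEmpty()) {
            issues.add("Root process group must not have input ports, found: " + root.inputPortIds);
        }
        if (!root.outputPortIds.isEmpty()) {
            issues.add("Root process group must not have output ports, found: " + root.outputPortIds);
        }
        return issues;
    }
}
```

The sketch only inspects the root group; nested groups are free to declare ports, which is where they are meaningful.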
[GitHub] nifi-minifi pull request #50: MINIFI-107 - Process group support
Github user brosander commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/50#discussion_r86796647 --- Diff: minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.yml --- @@ -0,0 +1,276 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the \"License\"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an \"AS IS\" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +MiNiFi Config Version: 2 +Flow Controller: + name: ProcessGroupsAndRemoteProcessGroups + comment: '' +Core Properties: + flow controller graceful shutdown period: 10 sec + flow service write delay interval: 500 ms + administrative yield duration: 30 sec + bored yield duration: 10 millis + max concurrent threads: 1 +FlowFile Repository: + partitions: 256 + checkpoint interval: 2 mins + always sync: false + Swap: +threshold: 2 +in period: 5 sec +in threads: 1 +out period: 5 sec +out threads: 4 +Content Repository: + content claim max appendable size: 10 MB + content claim max flow files: 100 + always sync: false +Provenance Repository: + provenance rollover time: 1 min +Component Status Repository: + buffer size: 1440 + snapshot frequency: 1 min +Security Properties: + keystore: '' + keystore type: '' + keystore password: '' + key password: '' + truststore: '' + truststore type: '' + truststore password: '' + ssl protocol: '' + Sensitive Props: +key: +algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL +provider: BC +Processors: +- id: 207748d1-0158-1000-- + name: GenerateFlowFile + class: org.apache.nifi.processors.standard.GenerateFlowFile + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: +Batch Size: '1' +Data Format: Binary +File Size: 1 b +Unique FlowFiles: 'false' +- id: 2079e8bd-0158-1000-- + name: LogAttribute + class: org.apache.nifi.processors.standard.LogAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: + - success + Properties: +Attributes to Ignore: +Attributes to Log: +Log Level: info +Log Payload: 'false' +Log prefix: +- id: 2077ab1e-0158-1000-- + name: UpdateAttribute + class: 
org.apache.nifi.processors.attributes.UpdateAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: +Delete Attributes Expression: +top: top +Process Groups: +- id: 207888b1-0158-1000-- + name: middle + Processors: + - id: 2078f34e-0158-1000-- +name: UpdateAttribute +class: org.apache.nifi.processors.attributes.UpdateAttribute +max concurrent tasks: 1 +scheduling strategy: TIMER_DRIVEN +scheduling period: 0 sec +penalization period: 30 sec +yield period: 1 sec +run duration nanos: 0 +auto-terminated relationships list: [] +Properties: + Delete Attributes Expression: + middle: middle + Process Groups: + - id: 20794cd4-0158-1000-- +name: bottom +Processors: +- id: 207a89ba-0158-1000-- + name: UpdateAttribute + class: org.apache.nifi.processors.attributes.UpdateAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1
[GitHub] nifi-minifi pull request #50: MINIFI-107 - Process group support
Github user brosander commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/50#discussion_r86797662

    --- Diff: minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java ---
    @@ -31,41 +33,33 @@ import java.util.stream.Collectors;
     import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMPONENT_STATUS_REPO_KEY;
    -import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONNECTIONS_KEY;
     import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONTENT_REPO_KEY;
     import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CORE_PROPS_KEY;
     import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOWFILE_REPO_KEY;
     import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY;
    -import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROCESSORS_KEY;
     import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPORTING_KEY;
     import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPO_KEY;
    -import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY;
     import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SECURITY_PROPS_KEY;
    -/**
    - *
    - */
     public class ConfigSchema extends BaseSchema implements WritableSchema, ConvertableSchema {
    -public static final String FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS = "Found the following duplicate processor ids: ";
    -public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS = "Found the following duplicate connection ids: ";
    -public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES = "Found the following duplicate remote processing group names: ";
    -public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS = "Found the following duplicate remote input port ids: ";
    -public static final String FOUND_THE_FOLLOWING_DUPLICATE_IDS = "Found the following ids that occur both in Processors and Remote Input Ports: ";
     public static final int CONFIG_VERSION = 2;
     public static final String VERSION = "MiNiFi Config Version";
    +public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS = "Found the following duplicate remote input port ids: ";
    +public static final String FOUND_THE_FOLLOWING_DUPLICATE_IDS = "Found the following ids that occur both in Processors and Remote Input Ports: ";
    --- End diff --

Updating wording.
[GitHub] nifi-minifi pull request #50: MINIFI-107 - Process group support
Github user brosander commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/50#discussion_r86786792

    --- Diff: minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java ---
    @@ -50,17 +48,18 @@ public RemoteProcessingGroupSchema(Map map) {
     name = getRequiredKeyAsType(map, NAME_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY);
    -url = getRequiredKeyAsType(map, URL_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY);
    -inputPorts = convertListToType(getRequiredKeyAsType(map, INPUT_PORTS_KEY, List.class, REMOTE_PROCESSING_GROUPS_KEY), "input port", RemoteInputPortSchema.class, INPUT_PORTS_KEY);
    +String wrapperName = "RemoteProcessingGroup(name: {name})".replace("{name}", StringUtil.isNullOrEmpty(name) ? "unknown" : name);
    --- End diff --

Changing to StringBuilder.
[GitHub] nifi-minifi pull request #50: MINIFI-107 - Process group support
Github user brosander commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/50#discussion_r86786059

    --- Diff: minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java ---
    @@ -101,47 +89,60 @@ public ConfigSchema(Map map, List validationIssues) {
     addIssuesIfNotNull(contentRepositoryProperties);
     addIssuesIfNotNull(componentStatusRepositoryProperties);
     addIssuesIfNotNull(securityProperties);
    +addIssuesIfNotNull(processGroupSchema);
     addIssuesIfNotNull(provenanceReportingProperties);
     addIssuesIfNotNull(provenanceRepositorySchema);
    -addIssuesIfNotNull(processors);
    -addIssuesIfNotNull(connections);
    -addIssuesIfNotNull(remoteProcessingGroups);
    -Set processorIds = new HashSet<>();
    -List processorIdList = processors.stream().map(ProcessorSchema::getId).collect(Collectors.toList());
    -processorIds.addAll(processorIdList);
    +List allProcessGroups = getAllProcessGroups(processGroupSchema);
    +List allConnectionSchemas = allProcessGroups.stream().flatMap(p -> p.getConnections().stream()).collect(Collectors.toList());
    +List allRemoteProcessingGroups = allProcessGroups.stream().flatMap(p -> p.getRemoteProcessingGroups().stream()).collect(Collectors.toList());
    -checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, processorIdList);
    -checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, connections.stream().map(ConnectionSchema::getId).collect(Collectors.toList()));
    -checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES,
    - remoteProcessingGroups.stream().map(RemoteProcessingGroupSchema::getName).collect(Collectors.toList()));
    -
    -Set remoteInputPortIds = new HashSet<>();
    -List remoteInputPortIdList = remoteProcessingGroups.stream().filter(r -> r.getInputPorts() != null)
    +List allProcessorIds = allProcessGroups.stream().flatMap(p -> p.getProcessors().stream()).map(ProcessorSchema::getId).collect(Collectors.toList());
    +List allConnectionIds = allConnectionSchemas.stream().map(ConnectionSchema::getId).collect(Collectors.toList());
    +List allRemoteProcessingGroupNames = allRemoteProcessingGroups.stream().map(RemoteProcessingGroupSchema::getName).collect(Collectors.toList());
    +List allRemoteInputPortIds = allRemoteProcessingGroups.stream().filter(r -> r.getInputPorts() != null)
     .flatMap(r -> r.getInputPorts().stream()).map(RemoteInputPortSchema::getId).collect(Collectors.toList());
    -checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS, remoteInputPortIdList);
    -remoteInputPortIds.addAll(remoteInputPortIdList);
    +List allInputPortIds = allProcessGroups.stream().flatMap(p -> p.getInputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
    +List allOutputPortIds = allProcessGroups.stream().flatMap(p -> p.getOutputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
    +
    +checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, allProcessorIds);
    +checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, allConnectionIds);
    +checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES, allRemoteProcessingGroupNames);
    +checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS, allRemoteInputPortIds);
    --- End diff --

Adding checks for input as well as output (that check is only for remote input).
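For illustration, duplicate checks for input and output port ids could follow the same call shape as the `checkForDuplicates(this::addValidationIssue, message, ids)` calls in the diff. The helper below is a self-contained sketch, not the project's implementation: the method body is an assumption about how such a helper might work, and any message text passed to it for input or output ports would be new wording, since the diff shows no such constants.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

final class DuplicateCheck {
    // Reports every id that occurs more than once through the issue consumer.
    // Nothing is reported when all ids are unique.
    static void checkForDuplicates(Consumer<String> issueConsumer, String message, List<String> ids) {
        Set<String> seen = new HashSet<>();
        // LinkedHashSet keeps duplicates in the order they were first repeated.
        Set<String> duplicates = new LinkedHashSet<>();
        for (String id : ids) {
            if (!seen.add(id)) {
                duplicates.add(id);
            }
        }
        if (!duplicates.isEmpty()) {
            issueConsumer.accept(message + String.join(", ", duplicates));
        }
    }
}
```

Calling it once with `allInputPortIds` and once with `allOutputPortIds` would cover both port types that the review thread asks about.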
[GitHub] nifi-minifi pull request #50: MINIFI-107 - Process group support
Github user brosander commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/50#discussion_r86783902

    --- Diff: minifi-docs/src/main/markdown/System_Admin_Guide.md ---
    @@ -391,6 +391,39 @@ Within the Processor Configuration section, there is the `Properties` subsection
     State File: ./conf/state/tail-file
     Initial Start Position: Beginning of File
    +## Process Groups
    +
    +Process groups can be nested from the top level. They can contain other process groups as well and can be used to logically group related operations.
    +
    +*Property* | *Description*
    +--- | -
    +name| The name of what this process group will do.
    +id | The id of this process group. This needs to be set to a unique filesystem-friendly value (regex: [A-Za-z0-9_-]+)
    +Processors | The processors contained in this Process Group. (Defined above)
    +Remote Processing Groups| The remote processing groups contained in this Process Group. (Defined below)
    +Connections | The connections contained in this Process Group. (Defined below)
    +Input Ports | The input ports contained in this Process Group. (Defined below)
    +Output Ports| The output ports contained in this Process Group. (Defined below)
    +Process Groups | The child Process Groups contained in this Process Group.
    +
    +## Input Ports
    +
    +These ports provide input to the Process Group they reside on.
    +
    +*Property* | *Description*
    + | -
    +name | The name of what this input port will do.
    +id | The id of this input port. This needs to be set to a unique filesystem-friendly value (regex: [A-Za-z0-9_-]+)
    +
    +## Input Ports
    --- End diff --

Copy pasta, fixing.
[GitHub] nifi-minifi pull request #50: MINIFI-107 - Process group support
Github user brosander commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/50#discussion_r86783661

    --- Diff: minifi-docs/src/main/markdown/System_Admin_Guide.md ---
    @@ -391,6 +391,39 @@ Within the Processor Configuration section, there is the `Properties` subsection
     State File: ./conf/state/tail-file
     Initial Start Position: Beginning of File
    +## Process Groups
    +
    +Process groups can be nested from the top level. They can contain other process groups as well and can be used to logically group related operations.
    +
    +*Property* | *Description*
    +--- | -
    +name| The name of what this process group will do.
    +id | The id of this process group. This needs to be set to a unique filesystem-friendly value (regex: [A-Za-z0-9_-]+)
    +Processors | The processors contained in this Process Group. (Defined above)
    +Remote Processing Groups| The remote processing groups contained in this Process Group. (Defined below)
    +Connections | The connections contained in this Process Group. (Defined below)
    +Input Ports | The input ports contained in this Process Group. (Defined below)
    +Output Ports| The output ports contained in this Process Group. (Defined below)
    +Process Groups | The child Process Groups contained in this Process Group.
    +
    +## Input Ports
    --- End diff --

Good point, adding clarifying text. Also wrote up improvement [MINIFI-133](https://issues.apache.org/jira/browse/MINIFI-133).
[GitHub] nifi-minifi pull request #50: MINIFI-107 - Process group support
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/50#discussion_r86612327 --- Diff: minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.yml --- @@ -0,0 +1,276 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the \"License\"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an \"AS IS\" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +MiNiFi Config Version: 2 +Flow Controller: + name: ProcessGroupsAndRemoteProcessGroups + comment: '' +Core Properties: + flow controller graceful shutdown period: 10 sec + flow service write delay interval: 500 ms + administrative yield duration: 30 sec + bored yield duration: 10 millis + max concurrent threads: 1 +FlowFile Repository: + partitions: 256 + checkpoint interval: 2 mins + always sync: false + Swap: +threshold: 2 +in period: 5 sec +in threads: 1 +out period: 5 sec +out threads: 4 +Content Repository: + content claim max appendable size: 10 MB + content claim max flow files: 100 + always sync: false +Provenance Repository: + provenance rollover time: 1 min +Component Status Repository: + buffer size: 1440 + snapshot frequency: 1 min +Security Properties: + keystore: '' + keystore type: '' + keystore password: '' + key password: '' + truststore: '' + truststore type: '' + truststore password: '' + ssl protocol: '' + Sensitive Props: +key: +algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL +provider: BC +Processors: +- id: 207748d1-0158-1000-- + name: GenerateFlowFile + class: org.apache.nifi.processors.standard.GenerateFlowFile + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: +Batch Size: '1' +Data Format: Binary +File Size: 1 b +Unique FlowFiles: 'false' +- id: 2079e8bd-0158-1000-- + name: LogAttribute + class: org.apache.nifi.processors.standard.LogAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: + - success + Properties: +Attributes to Ignore: +Attributes to Log: +Log Level: info +Log Payload: 'false' +Log prefix: +- id: 2077ab1e-0158-1000-- + name: UpdateAttribute + class: 
org.apache.nifi.processors.attributes.UpdateAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: +Delete Attributes Expression: +top: top +Process Groups: +- id: 207888b1-0158-1000-- + name: middle + Processors: + - id: 2078f34e-0158-1000-- +name: UpdateAttribute +class: org.apache.nifi.processors.attributes.UpdateAttribute +max concurrent tasks: 1 +scheduling strategy: TIMER_DRIVEN +scheduling period: 0 sec +penalization period: 30 sec +yield period: 1 sec +run duration nanos: 0 +auto-terminated relationships list: [] +Properties: + Delete Attributes Expression: + middle: middle + Process Groups: + - id: 20794cd4-0158-1000-- +name: bottom +Processors: +- id: 207a89ba-0158-1000-- + name: UpdateAttribute + class: org.apache.nifi.processors.attributes.UpdateAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period:
[GitHub] nifi-minifi pull request #50: MINIFI-107 - Process group support
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/50#discussion_r86615541

    --- Diff: minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java ---
    @@ -31,41 +33,33 @@ import java.util.stream.Collectors;
     import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMPONENT_STATUS_REPO_KEY;
    -import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONNECTIONS_KEY;
     import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONTENT_REPO_KEY;
     import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CORE_PROPS_KEY;
     import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOWFILE_REPO_KEY;
     import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY;
    -import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROCESSORS_KEY;
     import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPORTING_KEY;
     import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPO_KEY;
    -import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY;
     import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SECURITY_PROPS_KEY;
    -/**
    - *
    - */
     public class ConfigSchema extends BaseSchema implements WritableSchema, ConvertableSchema {
    -public static final String FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS = "Found the following duplicate processor ids: ";
    -public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS = "Found the following duplicate connection ids: ";
    -public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES = "Found the following duplicate remote processing group names: ";
    -public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS = "Found the following duplicate remote input port ids: ";
    -public static final String FOUND_THE_FOLLOWING_DUPLICATE_IDS = "Found the following ids that occur both in Processors and Remote Input Ports: ";
     public static final int CONFIG_VERSION = 2;
     public static final String VERSION = "MiNiFi Config Version";
    +public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS = "Found the following duplicate remote input port ids: ";
    +public static final String FOUND_THE_FOLLOWING_DUPLICATE_IDS = "Found the following ids that occur both in Processors and Remote Input Ports: ";
    --- End diff --

Where this message is used now covers all overlapping ids. This variable should be updated.
[GitHub] nifi-minifi pull request #50: MINIFI-107 - Process group support
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/50#discussion_r86620366

    --- Diff: minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java ---
    @@ -50,17 +48,18 @@ public RemoteProcessingGroupSchema(Map map) {
     name = getRequiredKeyAsType(map, NAME_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY);
    -url = getRequiredKeyAsType(map, URL_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY);
    -inputPorts = convertListToType(getRequiredKeyAsType(map, INPUT_PORTS_KEY, List.class, REMOTE_PROCESSING_GROUPS_KEY), "input port", RemoteInputPortSchema.class, INPUT_PORTS_KEY);
    +String wrapperName = "RemoteProcessingGroup(name: {name})".replace("{name}", StringUtil.isNullOrEmpty(name) ? "unknown" : name);
    --- End diff --

This may be a basic question but why use ".replace" (which compiles a pattern) instead of just doing string concatenation?
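The remark about pattern compilation matches the Java 8 implementation of `String.replace(CharSequence, CharSequence)`, which goes through a literal `Pattern`. A version without the placeholder substitution might look like the sketch below. It is an illustration only: `WrapperNames` and its method are hypothetical, and the inline null-or-empty test stands in for the `StringUtil.isNullOrEmpty` call shown in the diff.

```java
final class WrapperNames {
    // Builds the same text as the placeholder template in the diff,
    // "RemoteProcessingGroup(name: {name})", without calling String.replace.
    static String remoteProcessingGroupWrapperName(String name) {
        // Stand-in for StringUtil.isNullOrEmpty(name) from the diff.
        String shown = (name == null || name.isEmpty()) ? "unknown" : name;
        return new StringBuilder("RemoteProcessingGroup(name: ")
                .append(shown)
                .append(")")
                .toString();
    }
}
```

Plain `+` concatenation would produce the same string; the `StringBuilder` form is used here because that is the change the author said they were making in reply.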
[GitHub] nifi-minifi pull request #50: MINIFI-107 - Process group support
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/50#discussion_r86617837

    --- Diff: minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java ---
    @@ -101,47 +89,60 @@ public ConfigSchema(Map map, List validationIssues) {
     addIssuesIfNotNull(contentRepositoryProperties);
     addIssuesIfNotNull(componentStatusRepositoryProperties);
     addIssuesIfNotNull(securityProperties);
    +addIssuesIfNotNull(processGroupSchema);
     addIssuesIfNotNull(provenanceReportingProperties);
     addIssuesIfNotNull(provenanceRepositorySchema);
    -addIssuesIfNotNull(processors);
    -addIssuesIfNotNull(connections);
    -addIssuesIfNotNull(remoteProcessingGroups);
    -Set processorIds = new HashSet<>();
    -List processorIdList = processors.stream().map(ProcessorSchema::getId).collect(Collectors.toList());
    -processorIds.addAll(processorIdList);
    +List allProcessGroups = getAllProcessGroups(processGroupSchema);
    +List allConnectionSchemas = allProcessGroups.stream().flatMap(p -> p.getConnections().stream()).collect(Collectors.toList());
    +List allRemoteProcessingGroups = allProcessGroups.stream().flatMap(p -> p.getRemoteProcessingGroups().stream()).collect(Collectors.toList());
    -checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, processorIdList);
    -checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, connections.stream().map(ConnectionSchema::getId).collect(Collectors.toList()));
    -checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES,
    - remoteProcessingGroups.stream().map(RemoteProcessingGroupSchema::getName).collect(Collectors.toList()));
    -
    -Set remoteInputPortIds = new HashSet<>();
    -List remoteInputPortIdList = remoteProcessingGroups.stream().filter(r -> r.getInputPorts() != null)
    +List allProcessorIds = allProcessGroups.stream().flatMap(p -> p.getProcessors().stream()).map(ProcessorSchema::getId).collect(Collectors.toList());
    +List allConnectionIds = allConnectionSchemas.stream().map(ConnectionSchema::getId).collect(Collectors.toList());
    +List allRemoteProcessingGroupNames = allRemoteProcessingGroups.stream().map(RemoteProcessingGroupSchema::getName).collect(Collectors.toList());
    +List allRemoteInputPortIds = allRemoteProcessingGroups.stream().filter(r -> r.getInputPorts() != null)
     .flatMap(r -> r.getInputPorts().stream()).map(RemoteInputPortSchema::getId).collect(Collectors.toList());
    -checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS, remoteInputPortIdList);
    -remoteInputPortIds.addAll(remoteInputPortIdList);
    +List allInputPortIds = allProcessGroups.stream().flatMap(p -> p.getInputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
    +List allOutputPortIds = allProcessGroups.stream().flatMap(p -> p.getOutputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
    +
    +checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, allProcessorIds);
    +checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, allConnectionIds);
    +checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES, allRemoteProcessingGroupNames);
    +checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS, allRemoteInputPortIds);
    --- End diff --

There doesn't appear to be a "checkForDuplicates" for output ports. Is that intended?
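Every id list in this diff is derived from `getAllProcessGroups(processGroupSchema)`, whose body is not shown in the quoted hunk. One plausible shape for it is a depth-first walk over the nested groups, sketched below. This is an assumption for illustration: `GroupNode` and `GroupFlattener` are hypothetical types, and the real method may traverse in a different order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a process group schema with nested child groups.
final class GroupNode {
    final String id;
    final List<GroupNode> children = new ArrayList<>();

    GroupNode(String id) {
        this.id = id;
    }
}

final class GroupFlattener {
    // Returns the given group followed by all of its descendants,
    // depth-first and in declaration order.
    static List<GroupNode> getAllProcessGroups(GroupNode root) {
        List<GroupNode> result = new ArrayList<>();
        collect(root, result);
        return result;
    }

    private static void collect(GroupNode group, List<GroupNode> result) {
        result.add(group);
        for (GroupNode child : group.children) {
            collect(child, result);
        }
    }
}
```

With the groups flattened this way, a per-type duplicate check needs only one stream over the list, which is how the `allInputPortIds` and `allOutputPortIds` lines in the diff are written.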
[GitHub] nifi-minifi pull request #50: MINIFI-107 - Process group support
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/50#discussion_r86610341

    --- Diff: minifi-docs/src/main/markdown/System_Admin_Guide.md ---
    @@ -391,6 +391,39 @@ Within the Processor Configuration section, there is the `Properties` subsection
     State File: ./conf/state/tail-file
     Initial Start Position: Beginning of File
    +## Process Groups
    +
    +Process groups can be nested from the top level. They can contain other process groups as well and can be used to logically group related operations.
    +
    +*Property* | *Description*
    +--- | -
    +name| The name of what this process group will do.
    +id | The id of this process group. This needs to be set to a unique filesystem-friendly value (regex: [A-Za-z0-9_-]+)
    +Processors | The processors contained in this Process Group. (Defined above)
    +Remote Processing Groups| The remote processing groups contained in this Process Group. (Defined below)
    +Connections | The connections contained in this Process Group. (Defined below)
    +Input Ports | The input ports contained in this Process Group. (Defined below)
    +Output Ports| The output ports contained in this Process Group. (Defined below)
    +Process Groups | The child Process Groups contained in this Process Group.
    +
    +## Input Ports
    +
    +These ports provide input to the Process Group they reside on.
    +
    +*Property* | *Description*
    + | -
    +name | The name of what this input port will do.
    +id | The id of this input port. This needs to be set to a unique filesystem-friendly value (regex: [A-Za-z0-9_-]+)
    +
    +## Input Ports
    --- End diff --

I believe this should be "Output Ports".
[GitHub] nifi-minifi pull request #50: MINIFI-107 - Process group support
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/50#discussion_r86619586

    --- Diff: minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java ---
    @@ -50,21 +49,18 @@ private String queuePrioritizerClass;
         public ConnectionSchema(Map map) {
    -        super(map, CONNECTIONS_KEY);
    +        super(map, "Connection(id: {id}, name: {name})");
    +        String wrapperName = getWrapperName();
             // In case of older version, these may not be available until after construction, validated in getValidationIssues()
    -        sourceId = getOptionalKeyAsType(map, SOURCE_ID_KEY, String.class, CONNECTIONS_KEY, "");
    -        destinationId = getOptionalKeyAsType(map, DESTINATION_ID_KEY, String.class, CONNECTIONS_KEY, "");
    -
    -        sourceRelationshipNames = getOptionalKeyAsType(map, SOURCE_RELATIONSHIP_NAMES_KEY, List.class, CONNECTIONS_KEY, new ArrayList<>());
    -        if (sourceRelationshipNames.isEmpty()) {
    -            addValidationIssue("Expected at least one value in " + SOURCE_RELATIONSHIP_NAMES_KEY + " for " + CONNECTIONS_KEY + " " + getName());
    -        }
    -
    -        maxWorkQueueSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_SIZE_KEY, Number.class, CONNECTIONS_KEY, DEFAULT_MAX_WORK_QUEUE_SIZE);
    -        maxWorkQueueDataSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_DATA_SIZE_KEY, String.class, CONNECTIONS_KEY, DEFAULT_MAX_QUEUE_DATA_SIZE);
    -        flowfileExpiration = getOptionalKeyAsType(map, FLOWFILE_EXPIRATION__KEY, String.class, CONNECTIONS_KEY, DEFAULT_FLOWFILE_EXPIRATION);
    -        queuePrioritizerClass = getOptionalKeyAsType(map, QUEUE_PRIORITIZER_CLASS_KEY, String.class, CONNECTIONS_KEY, "");
    +        sourceId = getOptionalKeyAsType(map, SOURCE_ID_KEY, String.class, wrapperName, "");
    +        destinationId = getOptionalKeyAsType(map, DESTINATION_ID_KEY, String.class, wrapperName, "");
    +
    +        sourceRelationshipNames = getOptionalKeyAsType(map, SOURCE_RELATIONSHIP_NAMES_KEY, List.class, wrapperName, new ArrayList<>());
    --- End diff --

    Could you add a comment explaining that this could be empty if the source is a port?
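One way to make the port case explicit is to run the old empty-list check only when the source is not a port. The sketch below assumes a hypothetical `sourceIsPort` flag and a hypothetical `SourceRelationshipCheck` helper; the quoted hunk does not show how, or whether, the PR performs this check after the change.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of the MiNiFi code base. */
final class SourceRelationshipCheck {
    private SourceRelationshipCheck() {
    }

    /**
     * Returns the validation issues for a connection's source relationship names.
     * The sourceIsPort parameter is an assumption made for this sketch.
     */
    static List<String> validate(String connectionName, List<String> sourceRelationshipNames, boolean sourceIsPort) {
        List<String> issues = new ArrayList<>();
        // Ports expose no relationships, so an empty list is only an error for other source types.
        if (!sourceIsPort && sourceRelationshipNames.isEmpty()) {
            issues.add("Expected at least one value in source relationship names for connection " + connectionName);
        }
        return issues;
    }
}
```

With this shape, a connection whose source is a port and whose relationship list is empty produces no issue, while the same empty list on a processor-sourced connection is still reported.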
[GitHub] nifi-minifi pull request #50: MINIFI-107 - Process group support
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/50#discussion_r86619023

    --- Diff: minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java ---
    @@ -101,47 +89,60 @@ public ConfigSchema(Map map, List validationIssues) {
             addIssuesIfNotNull(contentRepositoryProperties);
             addIssuesIfNotNull(componentStatusRepositoryProperties);
             addIssuesIfNotNull(securityProperties);
    +        addIssuesIfNotNull(processGroupSchema);
             addIssuesIfNotNull(provenanceReportingProperties);
             addIssuesIfNotNull(provenanceRepositorySchema);
    -        addIssuesIfNotNull(processors);
    -        addIssuesIfNotNull(connections);
    -        addIssuesIfNotNull(remoteProcessingGroups);
    -        Set processorIds = new HashSet<>();
    -        List processorIdList = processors.stream().map(ProcessorSchema::getId).collect(Collectors.toList());
    -        processorIds.addAll(processorIdList);
    +        List allProcessGroups = getAllProcessGroups(processGroupSchema);
    +        List allConnectionSchemas = allProcessGroups.stream().flatMap(p -> p.getConnections().stream()).collect(Collectors.toList());
    +        List allRemoteProcessingGroups = allProcessGroups.stream().flatMap(p -> p.getRemoteProcessingGroups().stream()).collect(Collectors.toList());
    -        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, processorIdList);
    -        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, connections.stream().map(ConnectionSchema::getId).collect(Collectors.toList()));
    -        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES,
    -                remoteProcessingGroups.stream().map(RemoteProcessingGroupSchema::getName).collect(Collectors.toList()));
    -
    -        Set remoteInputPortIds = new HashSet<>();
    -        List remoteInputPortIdList = remoteProcessingGroups.stream().filter(r -> r.getInputPorts() != null)
    +        List allProcessorIds = allProcessGroups.stream().flatMap(p -> p.getProcessors().stream()).map(ProcessorSchema::getId).collect(Collectors.toList());
    +        List allConnectionIds = allConnectionSchemas.stream().map(ConnectionSchema::getId).collect(Collectors.toList());
    +        List allRemoteProcessingGroupNames = allRemoteProcessingGroups.stream().map(RemoteProcessingGroupSchema::getName).collect(Collectors.toList());
    +        List allRemoteInputPortIds = allRemoteProcessingGroups.stream().filter(r -> r.getInputPorts() != null)
                     .flatMap(r -> r.getInputPorts().stream()).map(RemoteInputPortSchema::getId).collect(Collectors.toList());
    -        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS, remoteInputPortIdList);
    -        remoteInputPortIds.addAll(remoteInputPortIdList);
    +        List allInputPortIds = allProcessGroups.stream().flatMap(p -> p.getInputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
    +        List allOutputPortIds = allProcessGroups.stream().flatMap(p -> p.getOutputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
    +
    +        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, allProcessorIds);
    +        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, allConnectionIds);
    +        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES, allRemoteProcessingGroupNames);
    +        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS, allRemoteInputPortIds);
    -        Set duplicateIds = new HashSet<>(processorIds);
    -        duplicateIds.retainAll(remoteInputPortIds);
    -        if (duplicateIds.size() > 0) {
    -            addValidationIssue(FOUND_THE_FOLLOWING_DUPLICATE_IDS + duplicateIds.stream().sorted().collect(Collectors.joining(", ")));
    +        OverlapResults overlapResults = findOverlap(new HashSet<>(allProcessorIds), new HashSet<>(allRemoteInputPortIds), new HashSet<>(allInputPortIds), new HashSet<>(allOutputPortIds));
    +        if (overlapResults.duplicates.size() > 0) {
    +            addValidationIssue(FOUND_THE_FOLLOWING_DUPLICATE_IDS + overlapResults.duplicates.stream().sorted().collect(Collectors.joining(", ")));
             }
    -        Set connectableIds = new HashSet<>(processorIds);
    -        connectableIds.addAll(remoteInputPortIds);
    -        connections.forEach(c -> {
    +        allConnectionSchemas.forEach(c -> {
    --- End diff --

    This block checks if the connections go to any component in any process
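The cross-type duplicate check in the diff calls `findOverlap` and reads `overlapResults.duplicates`, but the quoted hunk does not show how either is implemented. As a rough sketch of the idea only, the hypothetical helper below collects every id that appears in more than one of the given sets. The names `IdOverlapSketch` and `findDuplicates` are assumptions made for this example.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper for illustration; not the PR's findOverlap implementation. */
final class IdOverlapSketch {
    private IdOverlapSketch() {
    }

    /** Returns, in sorted order, every id that occurs in more than one of the given sets. */
    @SafeVarargs
    static Set<String> findDuplicates(Set<String>... idSets) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new TreeSet<>();
        for (Set<String> ids : idSets) {
            for (String id : ids) {
                // add() returns false when the id was already seen in an earlier set.
                if (!seen.add(id)) {
                    duplicates.add(id);
                }
            }
        }
        return duplicates;
    }
}
```

Because each argument is a set, an id repeated within one component type is not detected here; in the diff that case is handled separately by the per-type `checkForDuplicates` calls.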
[GitHub] nifi-minifi pull request #50: MINIFI-107 - Process group support
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/50#discussion_r86611212

    --- Diff: minifi-docs/src/main/markdown/System_Admin_Guide.md ---
    @@ -391,6 +391,39 @@ Within the Processor Configuration section, there is the `Properties` subsection
     State File: ./conf/state/tail-file
     Initial Start Position: Beginning of File
    +## Process Groups
    +
    +Process groups can be nested from the top level. They can contain other process groups as well and can be used to logically group related operations.
    +
    +*Property* | *Description*
    +--- | -
    +name| The name of what this process group will do.
    +id | The id of this process group. This needs to be set to a unique filesystem-friendly value (regex: [A-Za-z0-9_-]+)
    +Processors | The processors contained in this Process Group. (Defined above)
    +Remote Processing Groups| The remote processing groups contained in this Process Group. (Defined below)
    +Connections | The connections contained in this Process Group. (Defined below)
    +Input Ports | The input ports contained in this Process Group. (Defined below)
    +Output Ports| The output ports contained in this Process Group. (Defined below)
    +Process Groups | The child Process Groups contained in this Process Group.
    +
    +## Input Ports
    --- End diff --

    Since this PR only enables internal Input/Output ports, that limitation needs to be documented. That said, adding functionality for external I/O ports would probably be easy as a follow-up ticket (additional properties in the config.yml, validation, etc.)
[GitHub] nifi-minifi pull request #50: MINIFI-107 - Process group support
GitHub user brosander opened a pull request:

    https://github.com/apache/nifi-minifi/pull/50

    MINIFI-107 - Process group support

    Thank you for submitting a contribution to Apache NiFi - MiNiFi.

    In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

    ### For all changes:
    - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
    - [x] Does your PR title start with MINIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    - [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
    - [x] Is your initial contribution a single, squashed commit?

    ### For code changes:
    - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi-minifi folder?
    - [x] Have you written or updated unit tests to verify your changes?
    - [x] - N/A - If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [x] - N/A - If applicable, have you updated the LICENSE file, including the main LICENSE file under minifi-assembly?
    - [x] - N/A - If applicable, have you updated the NOTICE file, including the main NOTICE file found under minifi-assembly?

    ### For documentation related changes:
    - [x] Have you ensured that format looks appropriate for the output in which it is rendered?

    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/brosander/nifi-minifi MINIFI-107-av

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi-minifi/pull/50.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #50

commit 9c72f74b754b4a7519bd9cc205cf4fcb55cd5479
Author: Bryan Rosander
Date: 2016-11-01T16:54:17Z

    MINIFI-107 - Process group support