[GitHub] nifi pull request #3078: NIFI-4805 Allow Delayed Transfer
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3078#discussion_r226029162 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PenalizeFlowFile.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; --- End diff -- ```suggestion ``` ---
[GitHub] nifi pull request #3078: NIFI-4805 Allow Delayed Transfer
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3078#discussion_r226028831 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PenalizeFlowFile.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; --- End diff -- ```suggestion ``` ---
[GitHub] nifi pull request #3078: NIFI-4805 Allow Delayed Transfer
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3078#discussion_r226029040 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PenalizeFlowFile.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; --- End diff -- ```suggestion ``` ---
[GitHub] nifi pull request #3078: NIFI-4805 Allow Delayed Transfer
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3078#discussion_r226028907 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PenalizeFlowFile.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; --- End diff -- ```suggestion ``` ---
[GitHub] nifi pull request #3078: NIFI-4805 Allow Delayed Transfer
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3078#discussion_r226035394 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PenalizeFlowFile.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"penalty", "penalize", "flowfile"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Penalizes a FlowFile.") --- End diff -- ```suggestion @CapabilityDescription("This processor provides capability to penalize flow files. " + "Every flow file will be penalized as per 'Penalty Duration' property of the processor.") ``` ---
[GitHub] nifi pull request #3078: NIFI-4805 Allow Delayed Transfer
Github user patricker commented on a diff in the pull request: https://github.com/apache/nifi/pull/3078#discussion_r226012968 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PenalizeFlowFile.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"penalty", "penalize", "flowfile"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Penalizes a FlowFile.") +@WritesAttributes({ +@WritesAttribute(attribute = "penalization.count.{processor uuid}", description = "How many times this processor has penalized this FlowFile.") +}) + +public class PenalizeFlowFile extends AbstractProcessor { +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("Successfully penalized FlowFile").build(); + +private List properties; +private Set relationships; + +@Override +protected void init(final ProcessorInitializationContext context) { +final Set relationships = new HashSet<>(); +relationships.add(REL_SUCCESS); +this.relationships = Collections.unmodifiableSet(relationships); +} +@Override +public Set getRelationships() { +return relationships; +} + +@Override +public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { +FlowFile flowFile = session.get(); +if (flowFile == null) { +return; +} + +// Track how many times a FlowFile passes through this processor to better support the Retry use case +final String retryAttrName = "penalization.count." + this.getIdentifier(); +final String initialCount = flowFile.getAttribute(retryAttrName); +long cnt = 0; +if(initialCount != null) { +cnt = Long.parseLong(initialCount); +} + +cnt++; + +flowFile = session.putAttribute(flowFile, retryAttrName, Long.toString(cnt)); --- End diff -- I agree. I've updated the PR. All of those other features related to retry should be handled under NIFI-3792. It sounds like there is a processor setup, he's planning to PR it soon. ---
[GitHub] nifi pull request #3078: NIFI-4805 Allow Delayed Transfer
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/3078#discussion_r226004572 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PenalizeFlowFile.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"penalty", "penalize", "flowfile"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Penalizes a FlowFile.") +@WritesAttributes({ +@WritesAttribute(attribute = "penalization.count.{processor uuid}", description = "How many times this processor has penalized this FlowFile.") +}) + +public class PenalizeFlowFile extends AbstractProcessor { +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("Successfully penalized FlowFile").build(); + +private List properties; +private Set relationships; + +@Override +protected void init(final ProcessorInitializationContext context) { +final Set relationships = new HashSet<>(); +relationships.add(REL_SUCCESS); +this.relationships = Collections.unmodifiableSet(relationships); +} +@Override +public Set getRelationships() { +return relationships; +} + +@Override +public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { +FlowFile flowFile = session.get(); +if (flowFile == null) { +return; +} + +// Track how many times a FlowFile passes through this processor to better support the Retry use case +final String retryAttrName = "penalization.count." + this.getIdentifier(); +final String initialCount = flowFile.getAttribute(retryAttrName); +long cnt = 0; +if(initialCount != null) { +cnt = Long.parseLong(initialCount); +} + +cnt++; + +flowFile = session.putAttribute(flowFile, retryAttrName, Long.toString(cnt)); --- End diff -- as discussed under JIRA, if this processor goes for penalizing only (without re-try functionality), then I'd remove lines 77-86 at all. retry capabilities should be implemented then in separate processor. ---
[GitHub] nifi pull request #3078: NIFI-4805 Allow Delayed Transfer
GitHub user patricker opened a pull request: https://github.com/apache/nifi/pull/3078 NIFI-4805 Allow Delayed Transfer Went looking for this processor, found a work around, and found that others had already discussed building one throughout 2018 (see ticket/email thread from today, October 15th). Happy to make changes, feedback welcome. ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/patricker/nifi NIFI-4805 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3078.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3078 commit badfe8651651e5cefe1d2447458ff43be7ba0cdc Author: patricker Date: 2018-10-16T02:55:20Z NIFI-4805 Allow Delayed Transfer ---