This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
     new 140ac52e9d  NIFI-12068: This closes #7737. Added the ability to annotate components with @UseCase and @MultiProcessorUseCase annotations; updated several processors to make use of them.
140ac52e9d is described below

commit 140ac52e9d6434426ec679c5fa7258e0f1fba3c2
Author:     Mark Payne <marka...@hotmail.com>
AuthorDate: Fri Sep 15 12:04:10 2023 -0400

    NIFI-12068: This closes #7737. Added the ability to annotate components with @UseCase and @MultiProcessorUseCase annotations; updated several processors to make use of them.

    Signed-off-by: Joseph Witt <joew...@apache.org>
---
 .../documentation/MultiProcessorUseCase.java        |  77 +++++
 .../documentation/MultiProcessorUseCases.java       |  36 +++
 .../documentation/ProcessorConfiguration.java       |  57 ++++
 .../nifi/annotation/documentation/UseCase.java      |  89 ++++++
 .../nifi/annotation/documentation/UseCases.java     |  36 +++
 .../documentation/AbstractDocumentationWriter.java  |  30 +-
 .../documentation/xml/XmlDocumentationWriter.java   |  59 ++++
 .../apache/nifi/extension/manifest/Extension.java   |  26 ++
 .../extension/manifest/MultiProcessorUseCase.java   |  75 +++++
 .../extension/manifest/ProcessorConfiguration.java  |  43 +++
 .../apache/nifi/extension/manifest/UseCase.java     |  84 ++++++
 .../nifi/processors/aws/s3/FetchS3Object.java       |  88 +++++-
 .../html/HtmlDocumentationWriter.java               | 136 +++++++++
 .../nifi/processors/standard/CompressContent.java   |  37 +++
 .../nifi/processors/standard/ConvertRecord.java     |   6 +
 .../nifi/processors/standard/MergeContent.java      | 309 ++++++++++++---------
 .../nifi/processors/standard/PartitionRecord.java   |  35 ++-
 .../processors/standard/PutDatabaseRecord.java      |   2 +
 .../nifi/processors/standard/QueryRecord.java       |  43 ++-
 .../nifi/processors/standard/ReplaceText.java       |  65 ++++-
 .../nifi/processors/standard/RouteOnAttribute.java  |  81 +++++-
 .../apache/nifi/processors/standard/RouteText.java  |  34 ++-
 .../nifi/processors/standard/UpdateRecord.java      |  62 ++++-
 23 files changed, 1361 insertions(+), 149 deletions(-)

diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/MultiProcessorUseCase.java b/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/MultiProcessorUseCase.java
new file mode 100644
index 0000000000..562647ea6a
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/MultiProcessorUseCase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.nifi.annotation.documentation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Repeatable; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * <p> + * An annotation that can be used for Processors in order to explain a specific use case that can be + * accomplished using this Processor in conjunction with at least one other Processor. + * For Processors that are able to be used for multiple use cases, the component + * may be annotated with multiple MultiProcessorUseCase annotations. + * </p> + * <p> + * Note that this annotation differs from {@link UseCase} in that UseCase should describe a use case that is + * accomplished using only the extension that is annotated. In contrast, MultiProcessorUseCase documents a use case + * that is accomplished by using both the Processor that is annotated as well as other Processors. + * </p> + */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +@Repeatable(MultiProcessorUseCases.class) +public @interface MultiProcessorUseCase { + + /** + * A simple 1 (at most 2) sentence description of the use case. This should not include any extraneous details, such + * as caveats, examples, etc. Those can be provided using the {@link #notes()} method. + * + * @return a simple description of the use case + */ + String description(); + + /** + * Most of the time, 1-2 sentences is sufficient to describe a use case. Those 1-2 sentence should then be returned + * by the {@link #description()}. In the event that the description is not sufficient, details may be provided to + * further explain, by providing examples, caveats, etc. + * + * @return any important notes that pertain to the use case + */ + String notes() default ""; + + /** + * An optional array of keywords that can be associated with the use case. + * @return keywords associated with the use case + */ + String[] keywords() default {}; + + /** + * An array of {@link ProcessorConfiguration}s that are necessary in order to accomplish the task described in this use case. + * @return an array of processor configurations + */ + ProcessorConfiguration[] configurations(); + +} diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/MultiProcessorUseCases.java b/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/MultiProcessorUseCases.java new file mode 100644 index 0000000000..a7ae09bd38 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/MultiProcessorUseCases.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
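For orientation, a processor adopting this new annotation might look like the following sketch. The processor, its property names, and the configuration text are invented for illustration and are not part of this patch; real adopters such as FetchS3Object and CompressContent appear later in the diff.

import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

// Hypothetical processor, shown only to illustrate the shape of the annotation.
@MultiProcessorUseCase(
    description = "Fetch example data and route it based on an attribute.",
    keywords = {"fetch", "route"},
    configurations = {
        @ProcessorConfiguration(
            processorClass = FetchExampleData.class,
            configuration = "Set \"Filename\" to `${filename}` and connect 'success' to RouteOnAttribute."
        ),
        @ProcessorConfiguration(
            // Processors that live in another bundle can be referenced by name instead of by class.
            processorClassName = "org.apache.nifi.processors.standard.RouteOnAttribute",
            configuration = "Add one dynamic property per routing decision, e.g. `large` = `${fileSize:gt(1000000)}`."
        )
    }
)
public class FetchExampleData extends AbstractProcessor {

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // Fetch logic omitted; only the documentation annotations are relevant to this sketch.
    }
}

Referencing a processor by class name rather than by class avoids a compile-time dependency on another bundle, which appears to be why FetchS3Object later in this patch refers to EvaluateJsonPath via processorClassName while referring to GetSQS, in the same bundle, via processorClass.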
+ */ + +package org.apache.nifi.annotation.documentation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * An enclosing annotation that can be used in order to use the {@link MultiProcessorUseCase} annotation in a repeated manner. + */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface MultiProcessorUseCases { + MultiProcessorUseCase[] value(); +} diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/ProcessorConfiguration.java b/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/ProcessorConfiguration.java new file mode 100644 index 0000000000..ec2ab1fbcd --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/ProcessorConfiguration.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.annotation.documentation; + + +import org.apache.nifi.processor.Processor; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * An annotation that can be used in conjunction with {@link MultiProcessorUseCase} in order to specify the different + * components that are involved in a given use case. + */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface ProcessorConfiguration { + + /** + * Returns the class of the Processor that is to be used in the use case, if it is provided. Either the + * Processor Class or the Processor Class Name must be provided. + * + * @return the Processor's class, or <code>Processor</code> if the processor's classname is specified instead + */ + Class<? extends Processor> processorClass() default Processor.class; + + /** + * @return the fully qualified classname of the component + */ + String processorClassName() default ""; + + /** + * @return an explanation of how the Processor should be configured. + */ + String configuration(); +} diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/UseCase.java b/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/UseCase.java new file mode 100644 index 0000000000..478ba1e34e --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/UseCase.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
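The contract above implies a simple resolution rule: an explicit processorClassName takes precedence, and otherwise the name of the referenced processorClass is used. The documentation writers later in this patch apply exactly that rule; the following is a minimal standalone sketch of it, with the helper name invented for illustration.

import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
import org.apache.nifi.processor.Processor;

// Illustrative helper only: resolve the effective class name of a referenced processor,
// mirroring the logic in XmlDocumentationWriter and HtmlDocumentationWriter below.
final class ProcessorConfigurationNames {

    private ProcessorConfigurationNames() {
    }

    static String resolveClassName(final ProcessorConfiguration config) {
        final String explicitName = config.processorClassName();
        if (!explicitName.isEmpty()) {
            return explicitName;
        }

        final Class<? extends Processor> processorClass = config.processorClass();
        return processorClass.getName();
    }
}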
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.annotation.documentation; + +import org.apache.nifi.annotation.behavior.InputRequirement; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Repeatable; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * <p> + * An annotation that can be used for extension points in order to explain a specific use case that can be + * accomplished using this extension. For components that are able to be used for multiple use cases, the component + * may be annotated with multiple UseCase annotations. + * </p> + * <p> + * Note that this annotation differs from {@link CapabilityDescription} in that CapabilityDescription should describe the + * general purpose of the extension point. UseCase, on the other hand, documents one very specific use case that + * can be accomplished. Some extension points may use only a single UseCase annotation while others may accomplish + * many use cases. + * </p> + */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +@Repeatable(UseCases.class) +public @interface UseCase { + + /** + * A simple 1 (at most 2) sentence description of the use case. This should not include any extraneous details, such + * as caveats, examples, etc. Those can be provided using the {@link #notes()} method. + * + * @return a simple description of the use case + */ + String description(); + + /** + * Most of the time, 1-2 sentences is sufficient to describe a use case. Those 1-2 sentence should then be returned + * by the {@link #description()}. In the event that the description is not sufficient, details may be provided to + * further explain, by providing examples, caveats, etc. + * + * @return any important notes that pertain to the use case + */ + String notes() default ""; + + /** + * Most Processors specify an InputRequirement of either {@link InputRequirement.Requirement#INPUT_REQUIRED INPUT_REQUIRED} + * or {@link InputRequirement.Requirement#INPUT_FORBIDDEN}. However, some Processors use {@link InputRequirement.Requirement#INPUT_ALLOWED} + * because some use cases require input while others do not. The inputRequirement here is only relevant for Processors that use + * an InputRequirement of {@link InputRequirement.Requirement#INPUT_ALLOWED} and can indicate whether or not the Processor should have + * input (aka incoming Connections) for this particular use case. + * + * @return the {@link InputRequirement} that corresponds to this use case. + */ + InputRequirement.Requirement inputRequirement() default InputRequirement.Requirement.INPUT_ALLOWED; + + /** + * An optional array of keywords that can be associated with the use case. 
+ * @return keywords associated with the use case + */ + String[] keywords() default {}; + + /** + * A description of how to configure the extension for this particular use case. + * @return a description of how to configure the extension for this particular use case. + */ + String configuration() default ""; +} diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/UseCases.java b/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/UseCases.java new file mode 100644 index 0000000000..f64c8e5d00 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/UseCases.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.annotation.documentation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * An enclosing annotation that can be used in order to use the {@link UseCase} annotation in a repeated manner. 
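To make the repeatable behavior concrete, the following small sketch (all names invented, not part of this commit) shows that multiple @UseCase annotations on one type are collected into the @UseCases container, while getAnnotationsByType still returns them individually. This is the same lookup the documentation writers in this patch rely on.

import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.documentation.UseCases;

public class UseCaseExample {

    // Hypothetical annotated type carrying two repeated @UseCase annotations.
    @UseCase(description = "Convert CSV data to JSON.",
             configuration = "Use a CSVReader and a JsonRecordSetWriter.")
    @UseCase(description = "Convert JSON data to Avro.",
             configuration = "Use a JsonTreeReader and an AvroRecordSetWriter.")
    static class ExampleComponent {
    }

    public static void main(final String[] args) {
        // getAnnotationsByType unwraps the implicit @UseCases container and returns both annotations.
        final UseCase[] useCases = ExampleComponent.class.getAnnotationsByType(UseCase.class);
        for (final UseCase useCase : useCases) {
            System.out.println(useCase.description());
        }

        // The container annotation is what the compiler actually places on the class.
        final UseCases container = ExampleComponent.class.getAnnotation(UseCases.class);
        System.out.println("container present: " + (container != null));
    }
}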
+ */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface UseCases { + UseCase[] value(); +} diff --git a/nifi-api/src/main/java/org/apache/nifi/documentation/AbstractDocumentationWriter.java b/nifi-api/src/main/java/org/apache/nifi/documentation/AbstractDocumentationWriter.java index f2b311b1a5..2b951e360d 100644 --- a/nifi-api/src/main/java/org/apache/nifi/documentation/AbstractDocumentationWriter.java +++ b/nifi-api/src/main/java/org/apache/nifi/documentation/AbstractDocumentationWriter.java @@ -38,8 +38,10 @@ import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.annotation.configuration.DefaultSettings; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.DeprecationNotice; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ControllerService; @@ -167,6 +169,8 @@ public abstract class AbstractDocumentationWriter implements ExtensionDocumentat writeRestrictedInfo(component.getClass().getAnnotation(Restricted.class)); writeInputRequirementInfo(getInputRequirement(component)); writeSystemResourceConsiderationInfo(getSystemResourceConsiderations(component)); + writeUseCases(getUseCases(component)); + writeMultiProcessorUseCases(getMultiProcessorUseCases(component)); writeSeeAlso(component.getClass().getAnnotation(SeeAlso.class)); writeDefaultSchedule(component.getClass().getAnnotation(DefaultSchedule.class)); } @@ -251,13 +255,31 @@ public abstract class AbstractDocumentationWriter implements ExtensionDocumentat private List<SystemResourceConsideration> getSystemResourceConsiderations(final ConfigurableComponent component) { SystemResourceConsideration[] systemResourceConsiderations = component.getClass().getAnnotationsByType(SystemResourceConsideration.class); - if (systemResourceConsiderations == null) { + if (systemResourceConsiderations.length == 0) { return Collections.emptyList(); } return Arrays.asList(systemResourceConsiderations); } + private List<UseCase> getUseCases(final ConfigurableComponent component) { + UseCase[] useCases = component.getClass().getAnnotationsByType(UseCase.class); + if (useCases.length == 0) { + return Collections.emptyList(); + } + + return Arrays.asList(useCases); + } + + private List<MultiProcessorUseCase> getMultiProcessorUseCases(final ConfigurableComponent component) { + MultiProcessorUseCase[] useCases = component.getClass().getAnnotationsByType(MultiProcessorUseCase.class); + if (useCases.length == 0) { + return Collections.emptyList(); + } + + return Arrays.asList(useCases); + } + protected ExtensionType getExtensionType(final ConfigurableComponent component) { if (component instanceof Processor) { return ExtensionType.PROCESSOR; @@ -268,7 +290,7 @@ public abstract class AbstractDocumentationWriter implements ExtensionDocumentat if (component instanceof ReportingTask) { return ExtensionType.REPORTING_TASK; } - if (component instanceof ReportingTask) { + if (component instanceof FlowAnalysisRule) { return ExtensionType.FLOW_ANALYSIS_RULE; } if (component instanceof ParameterProvider) { @@ -305,6 +327,10 @@ public abstract class AbstractDocumentationWriter implements 
ExtensionDocumentat protected abstract void writeSeeAlso(SeeAlso seeAlso) throws IOException; + protected abstract void writeUseCases(List<UseCase> useCases) throws IOException; + + protected abstract void writeMultiProcessorUseCases(List<MultiProcessorUseCase> useCases) throws IOException; + protected abstract void writeDefaultSchedule(DefaultSchedule defaultSchedule) throws IOException; // Processor-specific methods diff --git a/nifi-api/src/main/java/org/apache/nifi/documentation/xml/XmlDocumentationWriter.java b/nifi-api/src/main/java/org/apache/nifi/documentation/xml/XmlDocumentationWriter.java index e374e45f07..f2ba07d105 100644 --- a/nifi-api/src/main/java/org/apache/nifi/documentation/xml/XmlDocumentationWriter.java +++ b/nifi-api/src/main/java/org/apache/nifi/documentation/xml/XmlDocumentationWriter.java @@ -35,7 +35,10 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.annotation.configuration.DefaultSettings; import org.apache.nifi.annotation.documentation.DeprecationNotice; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.UseCase; +import org.apache.nifi.annotation.documentation.ProcessorConfiguration; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDependency; @@ -371,6 +374,62 @@ public class XmlDocumentationWriter extends AbstractDocumentationWriter { writeTextArray("seeAlso", "see", toSee); } + @Override + protected void writeUseCases(final List<UseCase> useCases) throws IOException { + if (useCases.isEmpty()) { + return; + } + + writeArray("useCases", useCases, this::writeUseCase); + } + + private void writeUseCase(final UseCase useCase) throws IOException { + writeStartElement("useCase"); + + writeTextElement("description", useCase.description()); + writeTextElement("notes", useCase.notes()); + writeTextArray("keywords", "keyword", Arrays.asList(useCase.keywords())); + writeTextElement("inputRequirement", useCase.inputRequirement().name()); + writeTextElement("configuration", useCase.configuration()); + + writeEndElement(); + } + + protected void writeMultiProcessorUseCases(final List<MultiProcessorUseCase> multiProcessorUseCases) throws IOException { + if (multiProcessorUseCases.isEmpty()) { + return; + } + + writeArray("multiProcessorUseCases", multiProcessorUseCases, this::writeMultiProcessorUseCase); + } + + private void writeMultiProcessorUseCase(final MultiProcessorUseCase useCase) throws IOException { + writeStartElement("multiProcessorUseCase"); + + writeTextElement("description", useCase.description()); + writeTextElement("notes", useCase.notes()); + writeTextArray("keywords", "keyword", Arrays.asList(useCase.keywords())); + + writeArray("processorConfigurations", Arrays.asList(useCase.configurations()), this::writeUseCaseComponent); + + writeEndElement(); + } + + private void writeUseCaseComponent(final ProcessorConfiguration processorConfig) throws IOException { + writeStartElement("processorConfiguration"); + + String processorClassName = processorConfig.processorClassName(); + if (processorClassName.isEmpty()) { + processorClassName = processorConfig.processorClass().getName(); + } + + writeTextElement("processorClassName", processorClassName); + writeTextElement("configuration", processorConfig.configuration()); + + writeEndElement(); 
+ } + + @Override protected void writeRelationships(final Set<Relationship> relationships) throws IOException { if (relationships == null || relationships.isEmpty()) { diff --git a/nifi-manifest/nifi-extension-manifest-model/src/main/java/org/apache/nifi/extension/manifest/Extension.java b/nifi-manifest/nifi-extension-manifest-model/src/main/java/org/apache/nifi/extension/manifest/Extension.java index cdb83cbff5..681760afee 100644 --- a/nifi-manifest/nifi-extension-manifest-model/src/main/java/org/apache/nifi/extension/manifest/Extension.java +++ b/nifi-manifest/nifi-extension-manifest-model/src/main/java/org/apache/nifi/extension/manifest/Extension.java @@ -101,6 +101,14 @@ public class Extension { private boolean primaryNodeOnly; private boolean sideEffectFree; + @XmlElementWrapper + @XmlElement(name = "useCase") + private List<UseCase> useCases; + + @XmlElementWrapper + @XmlElement(name = "multiProcessorUseCase") + private List<MultiProcessorUseCase> multiProcessorUseCases; + @ApiModelProperty(value = "The name of the extension") public String getName() { return name; @@ -334,6 +342,24 @@ public class Extension { this.sideEffectFree = sideEffectFree; } + @ApiModelProperty(value = "Zero or more documented use cases for how the extension may be used") + public List<UseCase> getUseCases() { + return useCases; + } + + public void setUseCases(final List<UseCase> useCases) { + this.useCases = useCases; + } + + @ApiModelProperty(value = "Zero or more documented use cases for how the processor may be used in conjunction with other processors") + public List<MultiProcessorUseCase> getMultiProcessorUseCases() { + return multiProcessorUseCases; + } + + public void setMultiProcessorUseCases(final List<MultiProcessorUseCase> multiProcessorUseCases) { + this.multiProcessorUseCases = multiProcessorUseCases; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/nifi-manifest/nifi-extension-manifest-model/src/main/java/org/apache/nifi/extension/manifest/MultiProcessorUseCase.java b/nifi-manifest/nifi-extension-manifest-model/src/main/java/org/apache/nifi/extension/manifest/MultiProcessorUseCase.java new file mode 100644 index 0000000000..b530182a34 --- /dev/null +++ b/nifi-manifest/nifi-extension-manifest-model/src/main/java/org/apache/nifi/extension/manifest/MultiProcessorUseCase.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.extension.manifest; + +import io.swagger.annotations.ApiModel; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlElementWrapper; +import java.util.List; + +@ApiModel +@XmlAccessorType(XmlAccessType.FIELD) +public class MultiProcessorUseCase { + private String description; + + private String notes; + + @XmlElementWrapper + @XmlElement(name = "keyword") + private List<String> keywords; + + @XmlElementWrapper + @XmlElement(name = "processorConfiguration") + private List<ProcessorConfiguration> processorConfigurations; + + + public String getDescription() { + return description; + } + + public void setDescription(final String description) { + this.description = description; + } + + public String getNotes() { + return notes; + } + + public void setNotes(final String notes) { + this.notes = notes; + } + + public List<String> getKeywords() { + return keywords; + } + + public void setKeywords(final List<String> keywords) { + this.keywords = keywords; + } + + public List<ProcessorConfiguration> getProcessorConfigurations() { + return processorConfigurations; + } + + public void setProcessorConfigurations(final List<ProcessorConfiguration> processorConfigurations) { + this.processorConfigurations = processorConfigurations; + } +} diff --git a/nifi-manifest/nifi-extension-manifest-model/src/main/java/org/apache/nifi/extension/manifest/ProcessorConfiguration.java b/nifi-manifest/nifi-extension-manifest-model/src/main/java/org/apache/nifi/extension/manifest/ProcessorConfiguration.java new file mode 100644 index 0000000000..0f9b118b3e --- /dev/null +++ b/nifi-manifest/nifi-extension-manifest-model/src/main/java/org/apache/nifi/extension/manifest/ProcessorConfiguration.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.extension.manifest; + +import io.swagger.annotations.ApiModel; + +@ApiModel +public class ProcessorConfiguration { + private String processorClassName; + private String configuration; + + + public String getProcessorClassName() { + return processorClassName; + } + + public void setProcessorClassName(final String processorClassName) { + this.processorClassName = processorClassName; + } + + public String getConfiguration() { + return configuration; + } + + public void setConfiguration(final String configuration) { + this.configuration = configuration; + } +} diff --git a/nifi-manifest/nifi-extension-manifest-model/src/main/java/org/apache/nifi/extension/manifest/UseCase.java b/nifi-manifest/nifi-extension-manifest-model/src/main/java/org/apache/nifi/extension/manifest/UseCase.java new file mode 100644 index 0000000000..e218210b7d --- /dev/null +++ b/nifi-manifest/nifi-extension-manifest-model/src/main/java/org/apache/nifi/extension/manifest/UseCase.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.extension.manifest; + +import io.swagger.annotations.ApiModel; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlElementWrapper; +import java.util.List; + +@ApiModel +@XmlAccessorType(XmlAccessType.FIELD) +public class UseCase { + + private String description; + + private String notes; + + private InputRequirement inputRequirement; + + @XmlElementWrapper + @XmlElement(name = "keyword") + private List<String> keywords; + + String configuration; + + + public String getDescription() { + return description; + } + + public void setDescription(final String description) { + this.description = description; + } + + public String getNotes() { + return notes; + } + + public void setNotes(final String notes) { + this.notes = notes; + } + + public InputRequirement getInputRequirement() { + return inputRequirement; + } + + public void setInputRequirement(final InputRequirement inputRequirement) { + this.inputRequirement = inputRequirement; + } + + public List<String> getKeywords() { + return keywords; + } + + public void setKeywords(final List<String> keywords) { + this.keywords = keywords; + } + + public String getConfiguration() { + return configuration; + } + + public void setConfiguration(final String configuration) { + this.configuration = configuration; + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index cac2a6a3f7..69e61fb51a 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -30,8 +30,11 @@ import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; +import org.apache.nifi.annotation.documentation.ProcessorConfiguration; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ConfigVerificationResult.Outcome; @@ -47,6 +50,7 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.sqs.GetSQS; import java.io.IOException; import java.net.URLDecoder; @@ -84,6 +88,88 @@ import java.util.concurrent.TimeUnit; @WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"), @WritesAttribute(attribute = "s3.version", description = "The version of the S3 object"), @WritesAttribute(attribute = "s3.encryptionStrategy", description = "The name of the encryption strategy that was used to store the S3 object (if it is encrypted)"),}) +@UseCase( + description = 
"Fetch a specific file from S3", + configuration = """ + The "Bucket" property should be set to the name of the S3 bucket that contains the file. Typically this is defined as an attribute on an incoming FlowFile, \ + so this property is set to `${s3.bucket}`. + The "Object Key" property denotes the fully qualified filename of the file to fetch. Typically, the FlowFile's `filename` attribute is used, so this property is \ + set to `${filename}`. + The "Region" property must be set to denote the S3 region that the Bucket resides in. If the flow being built is to be reused elsewhere, it's a good idea to parameterize \ + this property by setting it to something like `#{S3_REGION}`. + + The "AWS Credentials Provider service" property should specify an instance of the AWSCredentialsProviderControllerService in order to provide credentials for accessing the file. + """ +) +@MultiProcessorUseCase( + description = "Retrieve all files in an S3 bucket", + keywords = {"s3", "state", "retrieve", "fetch", "all", "stream"}, + configurations = { + @ProcessorConfiguration( + processorClass = ListS3.class, + configuration = """ + The "Bucket" property should be set to the name of the S3 bucket that files reside in. If the flow being built is to be reused elsewhere, it's a good idea to parameterize \ + this property by setting it to something like `#{S3_SOURCE_BUCKET}`. + The "Region" property must be set to denote the S3 region that the Bucket resides in. If the flow being built is to be reused elsewhere, it's a good idea to parameterize \ + this property by setting it to something like `#{S3_SOURCE_REGION}`. + + The "AWS Credentials Provider service" property should specify an instance of the AWSCredentialsProviderControllerService in order to provide credentials for accessing the bucket. + + The 'success' Relationship of this Processor is then connected to FetchS3Object. + """ + ), + @ProcessorConfiguration( + processorClass = FetchS3Object.class, + configuration = """ + "Bucket" = "${s3.bucket}" + "Object Key" = "${filename}" + + The "AWS Credentials Provider service" property should specify an instance of the AWSCredentialsProviderControllerService in order to provide credentials for accessing the bucket. + + The "Region" property must be set to the same value as the "Region" property of the ListS3 Processor. + """ + ) + } +) +@MultiProcessorUseCase( + description = "Retrieve new files as they arrive in an S3 bucket", + notes = "This method of retrieving files from S3 is more efficient than using ListS3 and more cost effective. It is the pattern recommended by AWS. " + + "However, it does require that the S3 bucket be configured to place notifications on an SQS queue when new files arrive. For more information, see " + + "https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-notification-config-to-bucket.html", + configurations = { + @ProcessorConfiguration( + processorClass = GetSQS.class, + configuration = """ + The "Queue URL" must be set to the appropriate URL for the SQS queue. It is recommended that this property be parameterized, using a value such as `#{SQS_QUEUE_URL}`. + The "Region" property must be set to denote the SQS region that the queue resides in. It's a good idea to parameterize this property by setting it to something like `#{SQS_REGION}`. + + The "AWS Credentials Provider service" property should specify an instance of the AWSCredentialsProviderControllerService in order to provide credentials for accessing the bucket. 
+ + The 'success' relationship is connected to EvaluateJsonPath. + """ + ), + @ProcessorConfiguration( + processorClassName = "org.apache.nifi.processors.standard.EvaluateJsonPath", + configuration = """ + "Destination" = "flowfile-attribute" + "s3.bucket" = "$.Records[0].s3.bucket.name" + "filename" = "$.Records[0].s3.object.key" + + The 'success' relationship is connected to FetchS3Object. + """ + ), + @ProcessorConfiguration( + processorClass = FetchS3Object.class, + configuration = """ + "Bucket" = "${s3.bucket}" + "Object Key" = "${filename}" + + The "Region" property must be set to the same value as the "Region" property of the GetSQS Processor. + The "AWS Credentials Provider service" property should specify an instance of the AWSCredentialsProviderControllerService in order to provide credentials for accessing the bucket. + """ + ) + } +) public class FetchS3Object extends AbstractS3Processor { public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder() @@ -382,4 +468,4 @@ public class FetchS3Object extends AbstractS3Processor { attributes.put(CoreAttributes.FILENAME.key(), filePathName); } } -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/html/HtmlDocumentationWriter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/html/HtmlDocumentationWriter.java index dcd33f3ad0..0f97aef4a6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/html/HtmlDocumentationWriter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/html/HtmlDocumentationWriter.java @@ -27,8 +27,11 @@ import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties; import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.DeprecationNotice; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; +import org.apache.nifi.annotation.documentation.ProcessorConfiguration; import org.apache.nifi.bundle.Bundle; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.AllowableValue; @@ -58,6 +61,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; /** @@ -166,6 +171,8 @@ public class HtmlDocumentationWriter implements DocumentationWriter { writeStatefulInfo(configurableComponent, xmlStreamWriter); writeRestrictedInfo(configurableComponent, xmlStreamWriter); writeInputRequirementInfo(configurableComponent, xmlStreamWriter); + writeUseCases(configurableComponent, xmlStreamWriter); + writeMultiComponentUseCases(configurableComponent, xmlStreamWriter); writeSystemResourceConsiderationInfo(configurableComponent, xmlStreamWriter); writeSeeAlso(configurableComponent, xmlStreamWriter); xmlStreamWriter.writeEndElement(); @@ -448,6 +455,135 @@ public class HtmlDocumentationWriter implements DocumentationWriter { return description; } + protected void writeUseCases(final 
ConfigurableComponent component, final XMLStreamWriter xmlStreamWriter) throws XMLStreamException { + final UseCase[] useCases = component.getClass().getAnnotationsByType(UseCase.class); + if (useCases.length == 0) { + return; + } + + writeSimpleElement(xmlStreamWriter, "h2", "Example Use Cases:"); + + for (final UseCase useCase : useCases) { + writeSimpleElement(xmlStreamWriter, "h3", "Use Case:"); + writeSimpleElement(xmlStreamWriter, "p", useCase.description()); + + final String notes = useCase.notes(); + if (!StringUtils.isEmpty(notes)) { + writeSimpleElement(xmlStreamWriter, "h4", "Notes:"); + + final String[] splits = notes.split("\\n"); + for (final String split : splits) { + writeSimpleElement(xmlStreamWriter, "p", split); + } + } + + final String[] keywords = useCase.keywords(); + if (keywords.length > 0) { + writeSimpleElement(xmlStreamWriter, "h4", "Keywords:"); + xmlStreamWriter.writeCharacters(String.join(", ", keywords)); + } + + final Requirement inputRequirement = useCase.inputRequirement(); + if (inputRequirement != Requirement.INPUT_ALLOWED) { + writeSimpleElement(xmlStreamWriter, "h4", "Input Requirement:"); + xmlStreamWriter.writeCharacters(inputRequirement.toString()); + } + + final String configuration = useCase.configuration(); + writeUseCaseConfiguration(configuration, xmlStreamWriter); + + writeSimpleElement(xmlStreamWriter, "br", null); + } + } + + protected void writeMultiComponentUseCases(final ConfigurableComponent component, final XMLStreamWriter xmlStreamWriter) throws XMLStreamException { + final MultiProcessorUseCase[] useCases = component.getClass().getAnnotationsByType(MultiProcessorUseCase.class); + if (useCases.length == 0) { + return; + } + + writeSimpleElement(xmlStreamWriter, "h2", "Example Use Cases Involving Other Components:"); + + for (final MultiProcessorUseCase useCase : useCases) { + writeSimpleElement(xmlStreamWriter, "h3", "Use Case:"); + writeSimpleElement(xmlStreamWriter, "p", useCase.description()); + + final String notes = useCase.notes(); + if (!StringUtils.isEmpty(notes)) { + writeSimpleElement(xmlStreamWriter, "h4", "Notes:"); + + final String[] splits = notes.split("\\n"); + for (final String split : splits) { + writeSimpleElement(xmlStreamWriter, "p", split); + } + } + + final String[] keywords = useCase.keywords(); + if (keywords.length > 0) { + writeSimpleElement(xmlStreamWriter, "h4", "Keywords:"); + xmlStreamWriter.writeCharacters(String.join(", ", keywords)); + } + + writeSimpleElement(xmlStreamWriter, "h4", "Components involved:"); + final ProcessorConfiguration[] processorConfigurations = useCase.configurations(); + for (final ProcessorConfiguration processorConfiguration : processorConfigurations) { + writeSimpleElement(xmlStreamWriter, "strong", "Component Type: "); + + final String extensionClassName; + if (processorConfiguration.processorClassName().isEmpty()) { + extensionClassName = processorConfiguration.processorClass().getName(); + } else { + extensionClassName = processorConfiguration.processorClassName(); + } + + writeSimpleElement(xmlStreamWriter, "span", extensionClassName); + + final String configuration = processorConfiguration.configuration(); + writeUseCaseConfiguration(configuration, xmlStreamWriter); + + writeSimpleElement(xmlStreamWriter, "br", null); + } + + + writeSimpleElement(xmlStreamWriter, "br", null); + } + } + + private void writeUseCaseConfiguration(final String configuration, final XMLStreamWriter xmlStreamWriter) throws XMLStreamException { + if (StringUtils.isEmpty(configuration)) { + return; 
+ } + + writeSimpleElement(xmlStreamWriter, "h4", "Configuration:"); + + final String[] splits = configuration.split("\\n"); + for (final String split : splits) { + xmlStreamWriter.writeStartElement("p"); + + final Matcher matcher = Pattern.compile("`(.*?)`").matcher(split); + int startIndex = 0; + while (matcher.find()) { + final int start = matcher.start(); + if (start > 0) { + xmlStreamWriter.writeCharacters(split.substring(startIndex, start)); + } + + writeSimpleElement(xmlStreamWriter, "code", matcher.group(1)); + + startIndex = matcher.end(); + } + if (split.length() > startIndex) { + if (startIndex == 0) { + xmlStreamWriter.writeCharacters(split); + } else { + xmlStreamWriter.writeCharacters(split.substring(startIndex)); + } + } + + xmlStreamWriter.writeEndElement(); + } + } + /** * Writes the PropertyDescriptors out as a table. * diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java index c56ad8f7fc..2c78fc008f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java @@ -38,7 +38,10 @@ import org.apache.nifi.annotation.behavior.SystemResource; import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; +import org.apache.nifi.annotation.documentation.ProcessorConfiguration; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -98,6 +101,40 @@ import java.util.zip.InflaterInputStream; + "property is set to decompress and the file is successfully decompressed, this attribute is removed, as the MIME Type is no longer known.") @SystemResourceConsideration(resource = SystemResource.CPU) @SystemResourceConsideration(resource = SystemResource.MEMORY) +@UseCase( + description = "Compress the contents of a FlowFile", + configuration = """ + "Mode" = "compress" + "Compression Format" should be set to whichever compression algorithm should be used.""" +) +@UseCase( + description = "Decompress the contents of a FlowFile", + configuration = """ + "Mode" = "decompress" + "Compression Format" should be set to whichever compression algorithm was used to compress the data previously.""" +) +@MultiProcessorUseCase( + description = "Check whether or not a FlowFile is compressed and if so, decompress it.", + notes = "If IdentifyMimeType determines that the content is not compressed, CompressContent will pass the FlowFile " + + "along to the 'success' relationship without attempting to decompress it.", + keywords = {"auto", "detect", "mime type", "compress", "decompress", "gzip", "bzip2"}, + configurations = { + @ProcessorConfiguration( + processorClass = IdentifyMimeType.class, + configuration = """ + Default property values are sufficient. + Connect the 'success' relationship to CompressContent. 
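The configuration text carried by these annotations may contain backtick-delimited spans (as in the FetchS3Object examples above), and writeUseCaseConfiguration renders each such span as a <code> element. The following standalone sketch shows the same tokenization with plain string output instead of an XMLStreamWriter; the class and method names are invented for illustration.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone sketch of the backtick handling in writeUseCaseConfiguration:
// text between backticks becomes a <code> element, everything else stays plain text.
public class InlineCodeDemo {

    private static final Pattern CODE_PATTERN = Pattern.compile("`(.*?)`");

    static String toHtml(final String line) {
        final StringBuilder html = new StringBuilder();
        final Matcher matcher = CODE_PATTERN.matcher(line);

        int startIndex = 0;
        while (matcher.find()) {
            html.append(line, startIndex, matcher.start());
            html.append("<code>").append(matcher.group(1)).append("</code>");
            startIndex = matcher.end();
        }
        html.append(line.substring(startIndex));

        return html.toString();
    }

    public static void main(final String[] args) {
        // Prints: The "Bucket" property is typically set to <code>${s3.bucket}</code>.
        System.out.println(toHtml("The \"Bucket\" property is typically set to `${s3.bucket}`."));
    }
}

Unlike this sketch, the real writer emits the surrounding text through the XMLStreamWriter, which also takes care of escaping any markup-significant characters.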
+ """ + ), + @ProcessorConfiguration( + processorClass = CompressContent.class, + configuration = """ + "Mode" = "decompress" + "Compression Format" = "use mime.type attribute" + """ + ) + } +) public class CompressContent extends AbstractProcessor { public static final String COMPRESSION_FORMAT_ATTRIBUTE = "use mime.type attribute"; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java index 23ff6d81c3..d04a36fe4c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java @@ -25,6 +25,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; @@ -48,6 +49,11 @@ import java.util.List; + "the output schema can have a field named \"balance\" with a type of string, double, or float. If any field is present in the input that is not present in the output, " + "the field will be left out of the output. If any field is specified in the output schema but is not present in the input data/schema, then the field will not be " + "present in the output or will have a null value, depending on the writer.") +@UseCase(description = "Convert data from one record-oriented format to another", + configuration = """ + The Record Reader should be configured according to the incoming data format. 
+ The Record Writer should be configured according to the desired output format.""" +) public class ConvertRecord extends AbstractRecordProcessor { @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index 6f40029fbd..dfa09a42ed 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -41,6 +41,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; @@ -101,14 +102,15 @@ import java.util.zip.ZipEntry; import java.util.zip.ZipException; import java.util.zip.ZipOutputStream; + @SideEffectFree @TriggerWhenEmpty @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"merge", "content", "correlation", "tar", "zip", "stream", "concatenation", "archive", "flowfile-stream", "flowfile-stream-v3"}) @CapabilityDescription("Merges a Group of FlowFiles together based on a user-defined strategy and packages them into a single FlowFile. " - + "It is recommended that the Processor be configured with only a single incoming connection, as Group of FlowFiles will not be " - + "created from FlowFiles in different connections. This processor updates the mime.type attribute as appropriate. " - + "NOTE: this processor should NOT be configured with Cron Driven for the Scheduling Strategy.") + + "It is recommended that the Processor be configured with only a single incoming connection, as Group of FlowFiles will not be " + + "created from FlowFiles in different connections. This processor updates the mime.type attribute as appropriate. " + + "NOTE: this processor should NOT be configured with Cron Driven for the Scheduling Strategy.") @ReadsAttributes({ @ReadsAttribute(attribute = "fragment.identifier", description = "Applicable only if the <Merge Strategy> property is set to Defragment. " + "All FlowFiles with the same value for this attribute will be bundled together."), @@ -146,8 +148,55 @@ import java.util.zip.ZipOutputStream; }) @SeeAlso({SegmentContent.class, MergeRecord.class}) @SystemResourceConsideration(resource = SystemResource.MEMORY, description = "While content is not stored in memory, the FlowFiles' attributes are. " + - "The configuration of MergeContent (maximum bin size, maximum group size, maximum bin age, max number of entries) will influence how much " + - "memory is used. If merging together many small FlowFiles, a two-stage approach may be necessary in order to avoid excessive use of memory.") + "The configuration of MergeContent (maximum bin size, maximum group size, maximum bin age, max number of entries) will influence how much " + + "memory is used. 
If merging together many small FlowFiles, a two-stage approach may be necessary in order to avoid excessive use of memory.") +@UseCase( + description = "Concatenate FlowFiles with textual content together in order to create fewer, larger FlowFiles.", + keywords = {"concatenate", "bundle", "aggregate", "bin", "merge", "combine", "smash"}, + configuration = """ + "Merge Strategy" = "Bin Packing Algorithm" + "Merge Format" = "Binary Concatenation" + "Delimiter Strategy" = "Text" + "Demarcator" = "\\n" (a newline can be inserted by pressing Shift + Enter) + "Minimum Number of Entries" = "1" + "Maximum Number of Entries" = "500000000" + "Minimum Group Size" = the minimum amount of data to write to an output FlowFile. A reasonable value might be "128 MB" + "Maximum Group Size" = the maximum amount of data to write to an output FlowFile. A reasonable value might be "256 MB" + "Max Bin Age" = the maximum amount of time to wait for incoming data before timing out and transferring the FlowFile along even though it is smaller \ + than the Max Bin Age. A reasonable value might be "5 mins" + """ +) +@UseCase( + description = "Concatenate FlowFiles with binary content together in order to create fewer, larger FlowFiles.", + notes = "Not all binary data can be concatenated together. Whether or not this configuration is valid depends on the type of your data.", + keywords = {"concatenate", "bundle", "aggregate", "bin", "merge", "combine", "smash"}, + configuration = """ + "Merge Strategy" = "Bin Packing Algorithm" + "Merge Format" = "Binary Concatenation" + "Delimiter Strategy" = "Text" + "Minimum Number of Entries" = "1" + "Maximum Number of Entries" = "500000000" + "Minimum Group Size" = the minimum amount of data to write to an output FlowFile. A reasonable value might be "128 MB" + "Maximum Group Size" = the maximum amount of data to write to an output FlowFile. A reasonable value might be "256 MB" + "Max Bin Age" = the maximum amount of time to wait for incoming data before timing out and transferring the FlowFile along even though it is smaller \ + than the Max Bin Age. A reasonable value might be "5 mins" + """ +) +@UseCase( + description = "Reassemble a FlowFile that was previously split apart into smaller FlowFiles by a processor such as SplitText, UnpackContext, SplitRecord, etc.", + keywords = {"reassemble", "repack", "merge", "recombine"}, + configuration = """ + "Merge Strategy" = "Defragment" + "Merge Format" = the value of Merge Format depends on the desired output format. If the file was previously zipped together and was split apart by UnpackContent, + a Merge Format of "ZIP" makes sense. If it was previously a .tar file, a Merge Format of "TAR" makes sense. If the data is textual, "Binary Concatenation" can be + used to combine the text into a single document. + "Delimiter Strategy" = "Text" + "Max Bin Age" = the maximum amount of time to wait for incoming data before timing out and transferring the fragments to 'failure'. A reasonable value might be "5 mins" + + For textual data, "Demarcator" should be set to a newline (\\n), set by pressing Shift+Enter in the UI. For binary data, "Demarcator" should be left blank. 
+ """ +) + public class MergeContent extends BinFiles { // preferred attributes @@ -157,35 +206,35 @@ public class MergeContent extends BinFiles { public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key(); public static final AllowableValue METADATA_STRATEGY_USE_FIRST = new AllowableValue("Use First Metadata", "Use First Metadata", - "For any input format that supports metadata (Avro, e.g.), the metadata for the first FlowFile in the bin will be set on the output FlowFile."); + "For any input format that supports metadata (Avro, e.g.), the metadata for the first FlowFile in the bin will be set on the output FlowFile."); public static final AllowableValue METADATA_STRATEGY_ALL_COMMON = new AllowableValue("Keep Only Common Metadata", "Keep Only Common Metadata", - "For any input format that supports metadata (Avro, e.g.), any FlowFile whose metadata values match those of the first FlowFile, any additional metadata " - + "will be dropped but the FlowFile will be merged. Any FlowFile whose metadata values do not match those of the first FlowFile in the bin will not be merged."); + "For any input format that supports metadata (Avro, e.g.), any FlowFile whose metadata values match those of the first FlowFile, any additional metadata " + + "will be dropped but the FlowFile will be merged. Any FlowFile whose metadata values do not match those of the first FlowFile in the bin will not be merged."); public static final AllowableValue METADATA_STRATEGY_IGNORE = new AllowableValue("Ignore Metadata", "Ignore Metadata", - "Ignores (does not transfer, compare, etc.) any metadata from a FlowFile whose content supports embedded metadata."); + "Ignores (does not transfer, compare, etc.) any metadata from a FlowFile whose content supports embedded metadata."); public static final AllowableValue METADATA_STRATEGY_DO_NOT_MERGE = new AllowableValue("Do Not Merge Uncommon Metadata", "Do Not Merge Uncommon Metadata", - "For any input format that supports metadata (Avro, e.g.), any FlowFile whose metadata values do not match those of the first FlowFile in the bin will not be merged."); + "For any input format that supports metadata (Avro, e.g.), any FlowFile whose metadata values do not match those of the first FlowFile in the bin will not be merged."); public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue( - "Bin-Packing Algorithm", - "Bin-Packing Algorithm", - "Generates 'bins' of FlowFiles and fills each bin as full as possible. FlowFiles are placed into a bin based on their size and optionally " + "Bin-Packing Algorithm", + "Bin-Packing Algorithm", + "Generates 'bins' of FlowFiles and fills each bin as full as possible. FlowFiles are placed into a bin based on their size and optionally " + "their attributes (if the <Correlation Attribute> property is set)"); public static final AllowableValue MERGE_STRATEGY_DEFRAGMENT = new AllowableValue( - "Defragment", - "Defragment", - "Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must " + "Defragment", + "Defragment", + "Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must " + "have the attributes <fragment.identifier>, <fragment.count>, and <fragment.index>. All FlowFiles with the same value for \"fragment.identifier\" " + "will be grouped together. 
All FlowFiles in this group must have the same value for the \"fragment.count\" attribute. All FlowFiles " + "in this group must have a unique value for the \"fragment.index\" attribute between 0 and the value of the \"fragment.count\" attribute."); public static final AllowableValue DELIMITER_STRATEGY_FILENAME = new AllowableValue( - "Filename", "Filename", "The values of Header, Footer, and Demarcator will be retrieved from the contents of a file"); + "Filename", "Filename", "The values of Header, Footer, and Demarcator will be retrieved from the contents of a file"); public static final AllowableValue DELIMITER_STRATEGY_TEXT = new AllowableValue( - "Text", "Text", "The values of Header, Footer, and Demarcator will be specified as property values"); + "Text", "Text", "The values of Header, Footer, and Demarcator will be specified as property values"); public static final AllowableValue DELIMITER_STRATEGY_NONE = new AllowableValue( "Do Not Use Delimiters", "Do Not Use Delimiters", "No Header, Footer, or Demarcator will be used"); @@ -198,38 +247,38 @@ public class MergeContent extends BinFiles { public static final String MERGE_FORMAT_AVRO_VALUE = "Avro"; public static final AllowableValue MERGE_FORMAT_TAR = new AllowableValue( - MERGE_FORMAT_TAR_VALUE, - MERGE_FORMAT_TAR_VALUE, - "A bin of FlowFiles will be combined into a single TAR file. The FlowFiles' <path> attribute will be used to create a directory in the " + MERGE_FORMAT_TAR_VALUE, + MERGE_FORMAT_TAR_VALUE, + "A bin of FlowFiles will be combined into a single TAR file. The FlowFiles' <path> attribute will be used to create a directory in the " + "TAR file if the <Keep Paths> property is set to true; otherwise, all FlowFiles will be added at the root of the TAR file. " + "If a FlowFile has an attribute named <tar.permissions> that is 3 characters, each between 0-7, that attribute will be used " + "as the TAR entry's 'mode'."); public static final AllowableValue MERGE_FORMAT_ZIP = new AllowableValue( - MERGE_FORMAT_ZIP_VALUE, - MERGE_FORMAT_ZIP_VALUE, - "A bin of FlowFiles will be combined into a single ZIP file. The FlowFiles' <path> attribute will be used to create a directory in the " + MERGE_FORMAT_ZIP_VALUE, + MERGE_FORMAT_ZIP_VALUE, + "A bin of FlowFiles will be combined into a single ZIP file. The FlowFiles' <path> attribute will be used to create a directory in the " + "ZIP file if the <Keep Paths> property is set to true; otherwise, all FlowFiles will be added at the root of the ZIP file. 
" + "The <Compression Level> property indicates the ZIP compression to use."); public static final AllowableValue MERGE_FORMAT_FLOWFILE_STREAM_V3 = new AllowableValue( - MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE, - MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE, - "A bin of FlowFiles will be combined into a single Version 3 FlowFile Stream"); + MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE, + MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE, + "A bin of FlowFiles will be combined into a single Version 3 FlowFile Stream"); public static final AllowableValue MERGE_FORMAT_FLOWFILE_STREAM_V2 = new AllowableValue( - MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE, - MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE, - "A bin of FlowFiles will be combined into a single Version 2 FlowFile Stream"); + MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE, + MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE, + "A bin of FlowFiles will be combined into a single Version 2 FlowFile Stream"); public static final AllowableValue MERGE_FORMAT_FLOWFILE_TAR_V1 = new AllowableValue( - MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE, - MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE, - "A bin of FlowFiles will be combined into a single Version 1 FlowFile Package"); + MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE, + MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE, + "A bin of FlowFiles will be combined into a single Version 1 FlowFile Package"); public static final AllowableValue MERGE_FORMAT_CONCAT = new AllowableValue( - MERGE_FORMAT_CONCAT_VALUE, - MERGE_FORMAT_CONCAT_VALUE, - "The contents of all FlowFiles will be concatenated together into a single FlowFile"); + MERGE_FORMAT_CONCAT_VALUE, + MERGE_FORMAT_CONCAT_VALUE, + "The contents of all FlowFiles will be concatenated together into a single FlowFile"); public static final AllowableValue MERGE_FORMAT_AVRO = new AllowableValue( - MERGE_FORMAT_AVRO_VALUE, - MERGE_FORMAT_AVRO_VALUE, - "The Avro contents of all FlowFiles will be concatenated together into a single FlowFile"); + MERGE_FORMAT_AVRO_VALUE, + MERGE_FORMAT_AVRO_VALUE, + "The Avro contents of all FlowFiles will be concatenated together into a single FlowFile"); public static final String TAR_PERMISSIONS_ATTRIBUTE = "tar.permissions"; @@ -239,21 +288,21 @@ public class MergeContent extends BinFiles { public static final String REASON_FOR_MERGING = "merge.reason"; public static final PropertyDescriptor MERGE_STRATEGY = new PropertyDescriptor.Builder() - .name("Merge Strategy") - .description("Specifies the algorithm used to merge content. The 'Defragment' algorithm combines fragments that are associated by " - + "attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily " - + "chosen FlowFiles") - .required(true) - .allowableValues(MERGE_STRATEGY_BIN_PACK, MERGE_STRATEGY_DEFRAGMENT) - .defaultValue(MERGE_STRATEGY_BIN_PACK.getValue()) - .build(); + .name("Merge Strategy") + .description("Specifies the algorithm used to merge content. The 'Defragment' algorithm combines fragments that are associated by " + + "attributes back into a single cohesive FlowFile. 
The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily " + + "chosen FlowFiles") + .required(true) + .allowableValues(MERGE_STRATEGY_BIN_PACK, MERGE_STRATEGY_DEFRAGMENT) + .defaultValue(MERGE_STRATEGY_BIN_PACK.getValue()) + .build(); public static final PropertyDescriptor MERGE_FORMAT = new PropertyDescriptor.Builder() - .required(true) - .name("Merge Format") - .description("Determines the format that will be used to merge the content.") - .allowableValues(MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP, MERGE_FORMAT_FLOWFILE_STREAM_V3, MERGE_FORMAT_FLOWFILE_STREAM_V2, MERGE_FORMAT_FLOWFILE_TAR_V1, MERGE_FORMAT_CONCAT, MERGE_FORMAT_AVRO) - .defaultValue(MERGE_FORMAT_CONCAT.getValue()) - .build(); + .required(true) + .name("Merge Format") + .description("Determines the format that will be used to merge the content.") + .allowableValues(MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP, MERGE_FORMAT_FLOWFILE_STREAM_V3, MERGE_FORMAT_FLOWFILE_STREAM_V2, MERGE_FORMAT_FLOWFILE_TAR_V1, MERGE_FORMAT_CONCAT, MERGE_FORMAT_AVRO) + .defaultValue(MERGE_FORMAT_CONCAT.getValue()) + .build(); public static final PropertyDescriptor METADATA_STRATEGY = new PropertyDescriptor.Builder() .required(true) @@ -270,85 +319,85 @@ public class MergeContent extends BinFiles { .build(); public static final PropertyDescriptor CORRELATION_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() - .name("Correlation Attribute Name") - .description("If specified, like FlowFiles will be binned together, where 'like FlowFiles' means FlowFiles that have the same value for " - + "this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR) - .defaultValue(null) - .dependsOn(MERGE_STRATEGY, MERGE_STRATEGY_BIN_PACK) - .build(); + .name("Correlation Attribute Name") + .description("If specified, like FlowFiles will be binned together, where 'like FlowFiles' means FlowFiles that have the same value for " + + "this Attribute. 
If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR) + .defaultValue(null) + .dependsOn(MERGE_STRATEGY, MERGE_STRATEGY_BIN_PACK) + .build(); public static final PropertyDescriptor DELIMITER_STRATEGY = new PropertyDescriptor.Builder() - .required(true) - .name("Delimiter Strategy") - .description("Determines if Header, Footer, and Demarcator should point to files containing the respective content, or if " - + "the values of the properties should be used as the content.") - .allowableValues(DELIMITER_STRATEGY_NONE, DELIMITER_STRATEGY_FILENAME, DELIMITER_STRATEGY_TEXT) - .defaultValue(DELIMITER_STRATEGY_NONE.getValue()) - .dependsOn(MERGE_FORMAT, MERGE_FORMAT_CONCAT_VALUE) - .build(); + .required(true) + .name("Delimiter Strategy") + .description("Determines if Header, Footer, and Demarcator should point to files containing the respective content, or if " + + "the values of the properties should be used as the content.") + .allowableValues(DELIMITER_STRATEGY_NONE, DELIMITER_STRATEGY_FILENAME, DELIMITER_STRATEGY_TEXT) + .defaultValue(DELIMITER_STRATEGY_NONE.getValue()) + .dependsOn(MERGE_FORMAT, MERGE_FORMAT_CONCAT_VALUE) + .build(); public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder() - .name("Header File") - .displayName("Header") - .description("Filename or text specifying the header to use. If not specified, no header is supplied.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .dependsOn(DELIMITER_STRATEGY, DELIMITER_STRATEGY_FILENAME, DELIMITER_STRATEGY_TEXT) - .dependsOn(MERGE_FORMAT, MERGE_FORMAT_CONCAT) - .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT) - .build(); + .name("Header File") + .displayName("Header") + .description("Filename or text specifying the header to use. If not specified, no header is supplied.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dependsOn(DELIMITER_STRATEGY, DELIMITER_STRATEGY_FILENAME, DELIMITER_STRATEGY_TEXT) + .dependsOn(MERGE_FORMAT, MERGE_FORMAT_CONCAT) + .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT) + .build(); public static final PropertyDescriptor FOOTER = new PropertyDescriptor.Builder() - .name("Footer File") - .displayName("Footer") - .description("Filename or text specifying the footer to use. If not specified, no footer is supplied.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .dependsOn(DELIMITER_STRATEGY, DELIMITER_STRATEGY_FILENAME, DELIMITER_STRATEGY_TEXT) - .dependsOn(MERGE_FORMAT, MERGE_FORMAT_CONCAT) - .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT) - .build(); + .name("Footer File") + .displayName("Footer") + .description("Filename or text specifying the footer to use. 
If not specified, no footer is supplied.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dependsOn(DELIMITER_STRATEGY, DELIMITER_STRATEGY_FILENAME, DELIMITER_STRATEGY_TEXT) + .dependsOn(MERGE_FORMAT, MERGE_FORMAT_CONCAT) + .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT) + .build(); public static final PropertyDescriptor DEMARCATOR = new PropertyDescriptor.Builder() - .name("Demarcator File") - .displayName("Demarcator") - .description("Filename or text specifying the demarcator to use. If not specified, no demarcator is supplied.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .dependsOn(DELIMITER_STRATEGY, DELIMITER_STRATEGY_FILENAME, DELIMITER_STRATEGY_TEXT) - .dependsOn(MERGE_FORMAT, MERGE_FORMAT_CONCAT) - .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT) - .build(); + .name("Demarcator File") + .displayName("Demarcator") + .description("Filename or text specifying the demarcator to use. If not specified, no demarcator is supplied.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dependsOn(DELIMITER_STRATEGY, DELIMITER_STRATEGY_FILENAME, DELIMITER_STRATEGY_TEXT) + .dependsOn(MERGE_FORMAT, MERGE_FORMAT_CONCAT) + .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT) + .build(); public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder() - .name("Compression Level") - .description("Specifies the compression level to use when using the Zip Merge Format; if not using the Zip Merge Format, this value is " - + "ignored") - .required(true) - .allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9") - .defaultValue("1") - .dependsOn(MERGE_FORMAT, MERGE_FORMAT_ZIP) - .build(); + .name("Compression Level") + .description("Specifies the compression level to use when using the Zip Merge Format; if not using the Zip Merge Format, this value is " + + "ignored") + .required(true) + .allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9") + .defaultValue("1") + .dependsOn(MERGE_FORMAT, MERGE_FORMAT_ZIP) + .build(); public static final PropertyDescriptor KEEP_PATH = new PropertyDescriptor.Builder() - .name("Keep Path") - .description("If using the Zip or Tar Merge Format, specifies whether or not the FlowFiles' paths should be included in their entry names.") - .required(true) - .allowableValues("true", "false") - .defaultValue("false") - .dependsOn(MERGE_FORMAT, MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP) - .build(); + .name("Keep Path") + .description("If using the Zip or Tar Merge Format, specifies whether or not the FlowFiles' paths should be included in their entry names.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .dependsOn(MERGE_FORMAT, MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP) + .build(); public static final PropertyDescriptor TAR_MODIFIED_TIME = new PropertyDescriptor.Builder() - .name("Tar Modified Time") - .description("If using the Tar Merge Format, specifies if the Tar entry should store the modified timestamp either by expression " - + "(e.g. 
${file.lastModifiedTime} or static value, both of which must match the ISO8601 format 'yyyy-MM-dd'T'HH:mm:ssZ'.") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue("${file.lastModifiedTime}") - .dependsOn(MERGE_FORMAT, MERGE_FORMAT_TAR) - .build(); + .name("Tar Modified Time") + .description("If using the Tar Merge Format, specifies if the Tar entry should store the modified timestamp either by expression " + + "(e.g. ${file.lastModifiedTime} or static value, both of which must match the ISO8601 format 'yyyy-MM-dd'T'HH:mm:ssZ'.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("${file.lastModifiedTime}") + .dependsOn(MERGE_FORMAT, MERGE_FORMAT_TAR) + .build(); public static final Relationship REL_MERGED = new Relationship.Builder().name("merged").description("The FlowFile containing the merged content").build(); @@ -429,7 +478,7 @@ public class MergeContent extends BinFiles { @Override protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session) { final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME) - .evaluateAttributeExpressions(flowFile).getValue(); + .evaluateAttributeExpressions(flowFile).getValue(); String groupId = correlationAttributeName == null ? null : flowFile.getAttribute(correlationAttributeName); // when MERGE_STRATEGY is Defragment and correlationAttributeName is null then bin by fragment.identifier @@ -559,7 +608,7 @@ public class MergeContent extends BinFiles { decidedFragmentCount = fragmentCount; } else if (!decidedFragmentCount.equals(fragmentCount)) { return "Cannot Defragment " + flowFile + " because it is grouped with another FlowFile, and the two have differing values for the " - + FRAGMENT_COUNT_ATTRIBUTE + " attribute: " + decidedFragmentCount + " and " + fragmentCount; + + FRAGMENT_COUNT_ATTRIBUTE + " attribute: " + decidedFragmentCount + " and " + fragmentCount; } } } @@ -602,7 +651,7 @@ public class MergeContent extends BinFiles { session.remove(flowFile); } catch (final Exception e) { getLogger().error("Failed to remove merged FlowFile from the session after merge failure during \"" - + context.getProperty(MERGE_FORMAT).getValue() + "\" merge.", e); + + context.getProperty(MERGE_FORMAT).getValue() + "\" merge.", e); } } @@ -684,7 +733,7 @@ public class MergeContent extends BinFiles { } private byte[] getDelimiterFileContent(final ProcessContext context, final List<FlowFile> flowFiles, final PropertyDescriptor descriptor) - throws IOException { + throws IOException { byte[] property = null; if (flowFiles != null && flowFiles.size() > 0) { final FlowFile flowFile = flowFiles.get(0); @@ -766,7 +815,7 @@ public class MergeContent extends BinFiles { @Override public void process(final OutputStream rawOut) throws IOException { try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut); - final TarArchiveOutputStream out = new TarArchiveOutputStream(bufferedOut)) { + final TarArchiveOutputStream out = new TarArchiveOutputStream(bufferedOut)) { out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU); // if any one of the FlowFiles is larger than the default maximum tar entry size, then we set bigNumberMode to handle it @@ -923,7 +972,7 @@ public class MergeContent extends BinFiles { @Override public void process(final OutputStream 
rawOut) throws IOException { try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut); - final ZipOutputStream out = new ZipOutputStream(bufferedOut)) { + final ZipOutputStream out = new ZipOutputStream(bufferedOut)) { out.setLevel(compressionLevel); for (final FlowFile flowFile : contents) { final String path = keepPath ? getPath(flowFile) : ""; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java index ba6b9f8af4..a7f31165d8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java @@ -26,6 +26,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -93,7 +94,37 @@ import java.util.stream.Stream; }) @Tags({"record", "partition", "recordpath", "rpath", "segment", "split", "group", "bin", "organize"}) @SeeAlso({ConvertRecord.class, SplitRecord.class, UpdateRecord.class, QueryRecord.class}) - +@UseCase( + description = "Separate records into separate FlowFiles so that all of the records in a FlowFile have the same value for a given field or set of fields.", + keywords = {"separate", "split", "partition", "break apart", "colocate", "segregate", "record", "field", "recordpath"}, + configuration = """ + Choose a RecordReader that is appropriate based on the format of the incoming data. + Choose a RecordWriter that writes the data in the desired output format. + + Add a single additional property. The name of the property should describe the type of data that is being used to partition the data. \ + The property's value should be a RecordPath that specifies which output FlowFile the Record belongs to. + + For example, if we want to separate records based on their `transactionType` field, we could add a new property named `transactionType`. \ + The value of the property might be `/transaction/type`. An input FlowFile will then be separated into as few FlowFiles as possible such that each \ + output FlowFile has the same value for the `transactionType` field. + """ +) +@UseCase( + description = "Separate records based on whether or not they adhere to a specific criteria", + keywords = {"separate", "split", "partition", "break apart", "segregate", "record", "field", "recordpath", "criteria"}, + configuration = """ + Choose a RecordReader that is appropriate based on the format of the incoming data. + Choose a RecordWriter that writes the data in the desired output format. + + Add a single additional property. The name of the property should describe the criteria. \ + The property's value should be a RecordPath that returns `true` if the Record meets the criteria or `false otherwise. 
+ + For example, if we want to separate records based on whether or not they have a transaction total of more than $1,000 we could add a new property named \ + `largeTransaction` with a value of `/transaction/total > 1000`. This will create two FlowFiles. In the first, all records will have a total over `1000`. \ + In the second, all records will have a transaction less than or equal to 1000. Each FlowFile will have an attribute named `largeTransaction` with a value \ + of `true` or `false`. + """ +) public class PartitionRecord extends AbstractProcessor { private final RecordPathCache recordPathCache = new RecordPathCache(25); @@ -421,4 +452,4 @@ public class PartitionRecord extends AbstractProcessor { return "RecordMapValue[" + values + "]"; } } -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index 73ec4e83ee..90109f302d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -25,6 +25,7 @@ import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; @@ -109,6 +110,7 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGIST + "will be used to determine the type of statement (INSERT, UPDATE, DELETE, SQL, etc.) 
to generate and execute.") @WritesAttribute(attribute = PutDatabaseRecord.PUT_DATABASE_RECORD_ERROR, description = "If an error occurs during processing, the flow file will be routed to failure or retry, and this attribute " + "will be populated with the cause of the error.") +@UseCase(description = "Insert records into a database") public class PutDatabaseRecord extends AbstractProcessor { public static final String UPDATE_TYPE = "UPDATE"; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java index 8be1276e8f..93e39f6cca 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java @@ -35,6 +35,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; @@ -113,6 +114,46 @@ import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_SCALE; @WritesAttribute(attribute = "record.count", description = "The number of records selected by the query"), @WritesAttribute(attribute = QueryRecord.ROUTE_ATTRIBUTE_KEY, description = "The relation to which the FlowFile was routed") }) +@UseCase( + description = "Filter out records based on the values of the records' fields", + keywords = {"filter out", "remove", "drop", "strip out", "record field", "sql"}, + configuration = """ + "Record Reader" should be set to a Record Reader that is appropriate for your data. + "Record Writer" should be set to a Record Writer that writes out data in the desired format. + + One additional property should be added. + The name of the property should be a short description of the data to keep. + Its value is a SQL statement that selects all columns from a table named `FLOW_FILE` for relevant rows. + The WHERE clause selects the data to keep. I.e., it is the exact opposite of what we want to remove. + It is recommended to always quote column names using double-quotes in order to avoid conflicts with SQL keywords. + For example, to remove records where either the name is George OR the age is less than 18, we would add a \ + property named "adults not george" with a value that selects records where the name is not George AND the age is greater than or equal to 18. \ + So the value would be `SELECT * FROM FLOWFILE WHERE "name" <> 'George' AND "age" >= 18` + + Adding this property now gives us a new Relationship whose name is the same as the property name. So, the "adults not george" Relationship \ + should be connected to the next Processor in our flow. + """ +) +@UseCase( + description = "Keep only specific records", + keywords = {"keep", "filter", "retain", "select", "include", "record", "sql"}, + configuration = """ + "Record Reader" should be set to a Record Reader that is appropriate for your data. 
+ "Record Writer" should be set to a Record Writer that writes out data in the desired format. + + One additional property should be added. + The name of the property should be a short description of the data to keep. + Its value is a SQL statement that selects all columns from a table named `FLOW_FILE` for relevant rows. + The WHERE clause selects the data to keep. + It is recommended to always quote column names using double-quotes in order to avoid conflicts with SQL keywords. + For example, to keep only records where the person is an adult (aged 18 or older), add a property named "adults" \ + with a value that is a SQL statement that selects records where the age is at least 18. \ + So the value would be `SELECT * FROM FLOWFILE WHERE "age" >= 18` + + Adding this property now gives us a new Relationship whose name is the same as the property name. So, the "adults" Relationship \ + should be connected to the next Processor in our flow. + """ +) public class QueryRecord extends AbstractProcessor { public static final String ROUTE_ATTRIBUTE_KEY = "QueryRecord.Route"; @@ -559,4 +600,4 @@ public class QueryRecord extends AbstractProcessor { } } -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java index 7051a4026f..dc16a2b205 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java @@ -27,6 +27,7 @@ import org.apache.nifi.annotation.behavior.SystemResource; import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException; import org.apache.nifi.attribute.expression.language.exception.IllegalAttributeException; @@ -79,6 +80,68 @@ import java.util.regex.Pattern; @CapabilityDescription("Updates the content of a FlowFile by searching for some textual value in the FlowFile content (via Regular Expression/regex, or literal value) and replacing the " + "section of the content that matches with some alternate value. It can also be used to append or prepend text to the contents of a FlowFile.") @SystemResourceConsideration(resource = SystemResource.MEMORY) +@UseCase( + description = "Append text to the end of every line in a FlowFile", + keywords = {"raw text", "append", "line"}, + configuration = """ + "Evaluation Mode" = "Line-by-Line" + "Replacement Strategy" = "Append" + + "Replacement Value" is set to whatever text should be appended to the line. + For example, to insert the text `<fin>` at the end of every line, we would set "Replacement Value" to `<fin>`. + We can also use Expression Language. 
So to insert the filename at the end of every line, we set "Replacement Value" to `${filename}` + """ +) +@UseCase( + description = "Prepend text to the beginning of every line in a FlowFile", + keywords = {"raw text", "prepend", "line"}, + configuration = """ + "Evaluation Mode" = "Line-by-Line" + "Replacement Strategy" = "Prepend" + + "Replacement Value" is set to whatever text should be prepended to the line. + For example, to insert the text `<start>` at the beginning of every line, we would set "Replacement Value" to `<start>`. + We can also use Expression Language. So to insert the filename at the beginning of every line, we set "Replacement Value" to `${filename}` + """ +) +@UseCase( + description = "Replace every occurrence of a literal string in the FlowFile with a different value", + keywords = {"replace", "string", "text", "literal"}, + configuration = """ + "Evaluation Mode" = "Line-by-Line" + "Replacement Strategy" = "Literal Replace" + "Search Value" is set to whatever text is in the FlowFile that needs to be replaced. + "Replacement Value" is set to the text that should replace the current text. + + For example, to replace the word "spider" with "arachnid" we set "Search Value" to `spider` and set "Replacement Value" to `arachnid`. + """ +) +@UseCase( + description = "Transform every occurrence of a literal string in a FlowFile", + keywords = {"replace", "transform", "raw text"}, + configuration = """ + "Evaluation Mode" = "Line-by-Line" + "Replacement Strategy" = "Regex Replace" + "Search Value" is set to a regular expression that matches the text that should be transformed in a capturing group. + "Replacement Value" is set to a NiFi Expression Language expression that references `$1` (in quotes to escape the reference name). + + For example, if we wanted to lowercase any occurrence of WOLF, TIGER, or LION, we would use a "Search Value" of `(WOLF|TIGER|LION)` \ + and a "Replacement Value" of `${'$1':toLower()}`. + If we want to replace any identifier with a hash of that identifier, we might use a "Search Value" of `identifier: (.*)` and a \ + "Replacement Value" of `identifier: ${'$1':hash('sha256')}` + """ +) +@UseCase( + description = "Completely replace the contents of a FlowFile to a specific text", + keywords = {"replace", "raw text"}, + configuration = """ + "Evaluation Mode" = "Entire text" + "Replacement Strategy" = "Always Replace" + + "Replacement Value" is set to the new text that should be written to the FlowFile. \ + This text might include NiFi Expression Language to reference one or more attributes. 
+ """ +) public class ReplaceText extends AbstractProcessor { private static Pattern REPLACEMENT_NORMALIZATION_PATTERN = Pattern.compile("(\\$\\D)"); @@ -831,4 +894,4 @@ public class ReplaceText extends AbstractProcessor { } } } -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java index 1f6792b89d..f36e5f85c7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java @@ -16,15 +16,6 @@ */ package org.apache.nifi.processors.standard; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import org.apache.nifi.annotation.behavior.DefaultRunDuration; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.DynamicRelationship; @@ -36,6 +27,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; @@ -51,6 +43,16 @@ import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + /** * <p> * This processor routes a FlowFile based on its flow file attributes by using the Attribute Expression Language. The Expression Language is used by adding Optional Properties to the processor. The @@ -72,6 +74,65 @@ import org.apache.nifi.processor.util.StandardValidators; @WritesAttributes({ @WritesAttribute(attribute = RouteOnAttribute.ROUTE_ATTRIBUTE_KEY, description = "The relation to which the FlowFile was routed") }) +@UseCase( + description = "Route data to one or more relationships based on its attributes.", + keywords = {"attributes", "routing", "expression language"}, + configuration = """ + Set the "Routing Strategy" property to "Route to Property name". + For each route that a FlowFile might be routed to, add a new property. The name of the property should describe the route. + The value of the property is an Attribute Expression Language expression that will be used to determine whether or not a given FlowFile will be routed to the \ + associated relationship. 
+ + For example, we might route data based on its file extension using the following properties: + - "Routing Strategy" = "Route to Property Name" + - "jpg" = "${filename:endsWith('.jpg')}" + - "png" = "${filename:endsWith('.png')}" + - "pdf" = "${filename:endsWith('.pdf')}" + + The Processor will now have 3 relationships: `jpg`, `png`, and `pdf`. Each of these should be connected to the appropriate downstream processor. + """ +) +@UseCase( + description = "Keep data only if its attributes meet some criteria, such as its filename ending with .txt.", + keywords = {"keep", "filter", "remove", "delete", "expression language"}, + configuration = """ + Add a new property for each condition that must be satisfied in order to keep the data. + If the data should be kept in the case that any of the provided conditions is met, set the "Routing Strategy" property to "Route to 'matched' if any matches". + If all conditions must be met in order to keep the data, set the "Routing Strategy" property to "Route to 'matched' if all match". + + For example, to keep files whose filename ends with .txt and that have a file size of at least 1000 bytes, we will use the following properties: + - "ends_with_txt" = "${filename:endsWith('.txt')}" + - "large_enough" = "${fileSize:ge(1000)}" + - "Routing Strategy" = "Route to 'matched' if all match" + + Auto-terminate the 'unmatched' relationship. + Connect the 'matched' relationship to the next processor in the flow. + """ +) +@UseCase( + description = "Discard or drop a file based on attributes, such as filename.", + keywords = {"discard", "drop", "filter", "remove", "delete", "expression language"}, + configuration = """ + Add a new property for each condition that must be satisfied in order to drop the data. + If the data should be dropped in the case that any of the provided conditions is met, set the "Routing Strategy" property to "Route to 'matched' if any matches". + If all conditions must be met in order to drop the data, set the "Routing Strategy" property to "Route to 'matched' if all match". + + Here are a couple of examples for configuring the properties: + Example 1 Use Case: Data should be dropped if its "uuid" attribute has an 'a' in it or ends with '0'. + Here, we will use the following properties: + - "has_a" = "${uuid:contains('a')}" + - "ends_with_0" = "${uuid:endsWith('0')}" + - "Routing Strategy" = "Route to 'matched' if any matches" + Example 2 Use Case: Data should be dropped if its 'uuid' attribute has an 'a' AND it ends with a '1'. + Here, we will use the following properties: + - "has_a" = "${uuid:contains('a')}" + - "ends_with_1" = "${uuid:endsWith('1')}" + - "Routing Strategy" = "Route to 'matched' if all match" + + Auto-terminate the 'matched' relationship. + Connect the 'unmatched' relationship to the next processor in the flow.
+ """ +) public class RouteOnAttribute extends AbstractProcessor { public static final String ROUTE_ATTRIBUTE_KEY = "RouteOnAttribute.Route"; @@ -277,4 +338,4 @@ public class RouteOnAttribute extends AbstractProcessor { session.transfer(flowFile, firstRelationship); } } -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java index 93a5265c28..1effec6f6f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java @@ -29,6 +29,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; @@ -82,6 +83,37 @@ import java.util.regex.Pattern; @WritesAttribute(attribute = "RouteText.Group", description = "The value captured by all capturing groups in the 'Grouping Regular Expression' property. " + "If this property is not set or contains no capturing groups, this attribute will not be added.") }) +@UseCase( + description = "Drop blank or empty lines from the FlowFile's content.", + keywords = {"filter", "drop", "empty", "blank", "remove", "delete", "strip out", "lines", "text"}, + configuration = """ + "Routing Strategy" = "Route to each matching Property Name" + "Matching Strategy" = "Matches Regular Expression" + "Empty Line" = "^$" + + Auto-terminate the "Empty Line" relationship. + Connect the "unmatched" relationship to the next processor in your flow. + """ +) +@UseCase( + description = "Remove specific lines of text from a file, such as those containing a specific word or having a line length over some threshold.", + keywords = {"filter", "drop", "empty", "blank", "remove", "delete", "strip out", "lines", "text", "expression language"}, + configuration = """ + "Routing Strategy" = "Route to each matching Property Name" + "Matching Strategy" = "Satisfies Expression" + + An additional property should be added named "Filter Out." The value should be a NiFi Expression Language Expression that can refer to two variables \ + (in addition to FlowFile attributes): `line`, which is the line of text being evaluated; and `lineNo`, which is the line number in the file (starting with 1). \ + The Expression should return `true` for any line that should be dropped. + + For example, to remove any line that starts with a # symbol, we can set "Filter Out" to `${line:startsWith("#")}`. + We could also remove the first 2 lines of text by setting "Filter Out" to `${lineNo:le(2)}`. Note that we use the `le` function because we want lines numbers \ + less than or equal to `2`, since the line index is 1-based. + + Auto-terminate the "Filter Out" relationship. + Connect the "unmatched" relationship to the next processor in your flow. 
+ """ +) public class RouteText extends AbstractProcessor { public static final String ROUTE_ATTRIBUTE_KEY = "RouteText.Route"; @@ -646,4 +678,4 @@ public class RouteText extends AbstractProcessor { } } -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java index 020ad157d4..0feeba2d14 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java @@ -27,6 +27,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; @@ -75,6 +76,65 @@ import java.util.stream.Stream; description = "Allows users to specify values to use to replace fields in the record that match the RecordPath.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) @SeeAlso({ConvertRecord.class}) +@UseCase( + description = "Combine multiple fields into a single field.", + keywords = {"combine", "concatenate", "recordpath"}, + configuration = """ + "Replacement Value Strategy" = "Record Path Value" + + A single additional property is added to the Processor. The name of the property is a RecordPath identifying the field to place the result in. + The value of the property uses the CONCAT Record Path function to concatenate multiple values together, potentially using other string literal values. + For example, to combine the `title`, `firstName` and `lastName` fields into a single field named `fullName`, we add a property with the name `/fullName` \ + and a value of `CONCAT(/title, ' ', /firstName, ' ', /lastName)` + """ +) +@UseCase( + description = "Change the value of a record field to an explicit value.", + keywords = {"change", "update", "replace", "transform"}, + configuration = """ + "Replacement Value Strategy" = "Literal Value" + + A single additional property is added to the Processor. The name of the property is a RecordPath identifying the field to place the result in. + The value of the property is the explicit value to set the field to. For example, we can set any field with a name of `txId`, regardless of its level in the data's hierarchy, \ + to `1111-1111` by adding a property with a name of `//txId` and a value of `1111-1111` + """ +) +@UseCase( + description = "Copy the value of one record field to another record field.", + keywords = {"change", "update", "copy", "recordpath", "hierarchy", "transform"}, + configuration = """ + "Replacement Value Strategy" = "Record Path Value" + + A single additional property is added to the Processor. The name of the property is a RecordPath identifying the field to update. + The value of the property is a RecordPath identifying the field to copy the value from. 
+ For example, we can copy the value of `/identifiers/all/imei` to the `identifier` field at the root level, by adding a property named \ + `/identifier` with a value of `/identifiers/all/imei`. + """ +) +@UseCase( + description = "Enrich data by injecting the value of an attribute into each Record.", + keywords = {"enrich", "attribute", "change", "update", "replace", "insert", "transform"}, + configuration = """ + "Replacement Value Strategy" = "Literal Value" + + A single additional property is added to the Processor. The name of the property is a RecordPath identifying the field to place the result in. + The value of the property is an Expression Language expression that references the attribute of interest. We can, for example, insert a new field named \ + `filename` into each record by adding a property named `/filename` with a value of `${filename}` + """ +) +@UseCase( + description = "Change the format of a record field.", + keywords = {"change", "update", "replace", "insert", "transform", "format", "date/time", "timezone", "expression language"}, + configuration = """ + "Replacement Value Strategy" = "Literal Value" + + A single additional property is added to the Processor. The name of the property is a RecordPath identifying the field to update. + The value is an Expression Language expression that references the `field.value` variable. For example, to change the date/time format of \ + a field named `txDate` from `year-month-day` format to `month/day/year` format, we add a property named `/txDate` with a value of \ + `${field.value:toDate('yyyy-MM-dd'):format('MM/dd/yyyy')}`. We could also change the timezone of a timestamp field (and insert the timezone for clarity) by using a value of \ + `${field.value:toDate('yyyy-MM-dd HH:mm:ss', 'UTC-0400'):format('yyyy-MM-dd HH:mm:ss Z', 'UTC')}`. + """ +) public class UpdateRecord extends AbstractRecordProcessor { private static final String FIELD_NAME = "field.name"; private static final String FIELD_VALUE = "field.value"; @@ -291,4 +351,4 @@ public class UpdateRecord extends AbstractRecordProcessor { return selectedFields.get(0); } } -} +} \ No newline at end of file
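The @UseCase usage shown throughout this diff generalizes to any processor bundle. The following is a minimal, hypothetical sketch; the LogLineFilter class and its "Line Pattern" property are invented for illustration and are not part of this commit. It only demonstrates where the annotation sits on a processor class and which elements it accepts (description, notes, keywords, and a configuration text block), mirroring the usages above.

import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

@Tags({"example", "filter", "lines"})
@CapabilityDescription("Hypothetical processor used only to illustrate the @UseCase annotation.")
@UseCase(
    description = "Drop lines that match a configured pattern.",
    notes = "Illustrative only; the property and relationship names below are assumptions, not part of this commit.",
    keywords = {"filter", "drop", "lines"},
    configuration = """
        "Line Pattern" = a regular expression matching the lines to drop, e.g. `^#` to drop comment lines.
        Connect the 'kept' relationship to the next processor in the flow.
        """
)
public class LogLineFilter extends AbstractProcessor {
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // Filtering logic omitted; the class exists only to show where the annotation is applied.
    }
}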
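Since several of the processors above carry more than one @UseCase (MergeContent declares three), the annotation is evidently repeatable, and assuming it is retained at runtime, tooling can enumerate every declared use case with plain reflection. The sketch below illustrates that idea only; it is not a description of how the documentation writers touched by this commit work.

import org.apache.nifi.annotation.documentation.UseCase;

public class UseCaseLister {
    // Prints the description and keywords of each @UseCase declared on the given class.
    public static void printUseCases(final Class<?> componentClass) {
        // getAnnotationsByType resolves the repeatable container annotation transparently.
        for (final UseCase useCase : componentClass.getAnnotationsByType(UseCase.class)) {
            System.out.println(useCase.description());
            System.out.println("  keywords: " + String.join(", ", useCase.keywords()));
        }
    }

    public static void main(final String[] args) throws ClassNotFoundException {
        // Any annotated processor on the classpath works here, e.g. the standard MergeContent.
        printUseCases(Class.forName("org.apache.nifi.processors.standard.MergeContent"));
    }
}

Run against MergeContent, this would list the concatenation and defragmentation use cases added above, one description per line.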