This is an automated email from the ASF dual-hosted git repository. mattyb149 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new c998a72 NIFI-6842 - Introduce MetricsEventReportingTask c998a72 is described below commit c998a7259aca73e716433d0582b30d7ed986c4b2 Author: Yolanda M. Davis <yolanda.m.da...@gmail.com> AuthorDate: Tue Nov 5 14:25:58 2019 -0500 NIFI-6842 - Introduce MetricsEventReportingTask NIFI-6842 - Added AlertHandler for bulletin reporting. Update ReportingTask meta data. NIFI-6842 - corrected display names in action handlers, included metrics option for alert handlers, small refactor in reporting task NIFI-6842 - updated docs and tags NIFI-6842 - Added documentation for handlers. Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #3874 --- nifi-assembly/pom.xml | 12 + .../apache/nifi/rules/handlers/AlertHandler.java | 169 +++++++++++++ .../nifi/rules/handlers/ExpressionHandler.java | 10 +- .../org/apache/nifi/rules/handlers/LogHandler.java | 29 ++- .../nifi/rules/handlers/RecordSinkHandler.java | 5 +- .../org.apache.nifi.controller.ControllerService | 3 +- .../additionalDetails.html | 39 +++ .../additionalDetails.html | 38 +++ .../additionalDetails.html | 38 +++ .../additionalDetails.html | 37 +++ .../nifi/rules/handlers/TestAlertHandler.java | 264 +++++++++++++++++++++ .../nifi-sql-reporting-tasks/pom.xml | 6 + .../reporting/sql/MetricsEventReportingTask.java | 105 ++++++++ .../nifi/reporting/sql/QueryNiFiReportingTask.java | 84 +------ .../nifi/reporting/sql/util/QueryMetricsUtil.java | 114 +++++++++ .../org.apache.nifi.reporting.ReportingTask | 3 +- .../additionalDetails.html | 34 +++ ...ask.java => TestMetricsEventReportingTask.java} | 176 +++++--------- .../reporting/sql/TestQueryNiFiReportingTask.java | 21 +- .../rules/MockPropertyContextActionHandler.java | 76 ++++++ .../nifi/rules/engine/MockRulesEngineService.java | 47 ++++ 21 files changed, 1094 insertions(+), 216 deletions(-) diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index f3381a7..b036e65 100755 --- 
a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -878,6 +878,18 @@ language governing permissions and limitations under the License. --> <version>1.11.0-SNAPSHOT</version> <type>nar</type> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-easyrules-nar</artifactId> + <version>1.11.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-rules-action-handler-nar</artifactId> + <version>1.11.0-SNAPSHOT</version> + <type>nar</type> + </dependency> </dependencies> </profile> <profile> diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/AlertHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/AlertHandler.java new file mode 100644 index 0000000..234ea3c --- /dev/null +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/AlertHandler.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.rules.handlers; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.rules.Action; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Tags({"rules", "rules engine", "action", "action handler", "logging", "alerts", "bulletins"}) +@CapabilityDescription("Creates alerts as bulletins based on a provided action (usually created by a rules engine). 
" + + "Action objects executed with this Handler should contain \"category\", \"message\", and \"logLevel\" attributes.") +public class AlertHandler extends AbstractActionHandlerService { + + public static final PropertyDescriptor DEFAULT_LOG_LEVEL = new PropertyDescriptor.Builder() + .name("alert-default-log-level") + .displayName("Default Alert Log Level") + .required(true) + .description("The default Log Level that will be used to log an alert message" + + " if a log level was not provided in the received action's attributes.") + .allowableValues(DebugLevels.values()) + .defaultValue("info") + .build(); + + public static final PropertyDescriptor DEFAULT_CATEGORY = new PropertyDescriptor.Builder() + .name("alert-default-category") + .displayName("Default Category") + .required(true) + .description("The default category to use when logging alert message "+ + " if a category was not provided in the received action's attributes.") + .defaultValue("Rules Triggered Alert") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DEFAULT_MESSAGE = new PropertyDescriptor.Builder() + .name("alert-default-message") + .displayName("Default Message") + .required(true) + .description("The default message to include in alert if an alert message was " + + "not provided in the received action's attributes") + .defaultValue("An alert was triggered by a rules-based action.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private static final PropertyDescriptor INCLUDE_FACTS = new PropertyDescriptor.Builder() + .name("alert-include-facts") + .displayName("Include Fact Data") + .required(true) + .description("If true, the alert message will include the facts which triggered this action. 
Default is false.") + .defaultValue("true") + .allowableValues("true", "false") + .build(); + + private List<PropertyDescriptor> properties; + private String defaultCategory; + private String defaultLogLevel; + private String defaultMessage; + private Boolean includeFacts; + + @Override + protected void init(ControllerServiceInitializationContext config) throws InitializationException { + super.init(config); + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(DEFAULT_LOG_LEVEL); + properties.add(DEFAULT_CATEGORY); + properties.add(DEFAULT_MESSAGE); + properties.add(INCLUDE_FACTS); + this.properties = Collections.unmodifiableList(properties); + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + defaultLogLevel = context.getProperty(DEFAULT_LOG_LEVEL).getValue().toUpperCase(); + defaultCategory = context.getProperty(DEFAULT_CATEGORY).getValue(); + defaultMessage = context.getProperty(DEFAULT_MESSAGE).getValue(); + includeFacts = context.getProperty(INCLUDE_FACTS).asBoolean(); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void execute(Action action, Map<String, Object> facts) { + throw new UnsupportedOperationException("This method is not supported. 
The AlertHandler requires a Reporting Context"); + } + + @Override + public void execute(PropertyContext propertyContext, Action action, Map<String, Object> facts) { + ComponentLog logger = getLogger(); + + if (propertyContext instanceof ReportingContext) { + + ReportingContext context = (ReportingContext) propertyContext; + Map<String, String> attributes = action.getAttributes(); + if (context.getBulletinRepository() != null) { + final String category = attributes.getOrDefault("category", defaultCategory); + final String message = getMessage(attributes.getOrDefault("message", defaultMessage), facts); + final String level = attributes.getOrDefault("severity", attributes.getOrDefault("logLevel", defaultLogLevel)); + Severity severity; + try { + severity = Severity.valueOf(level.toUpperCase()); + } catch (IllegalArgumentException iae) { + severity = Severity.INFO; + } + BulletinRepository bulletinRepository = context.getBulletinRepository(); + bulletinRepository.addBulletin(context.createBulletin(category, severity, message)); + + } else { + logger.warn("Bulletin Repository is not available which is unusual. 
Cannot send a bulletin."); + } + + } else { + logger.warn("Reporting context was not provided to create bulletins."); + } + + } + + protected String getMessage(String alertMessage, Map<String, Object> facts){ + if (includeFacts) { + final StringBuilder message = new StringBuilder(alertMessage); + final Set<String> fields = facts.keySet(); + message.append("\n"); + message.append("Alert Facts:\n"); + fields.forEach(field -> { + message.append("Field: "); + message.append(field); + message.append(", Value: "); + message.append(facts.get(field)); + message.append("\n"); + }); + return message.toString(); + }else{ + return alertMessage; + } + } + +} diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/ExpressionHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/ExpressionHandler.java index be41a5c..711c0ec 100644 --- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/ExpressionHandler.java +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/ExpressionHandler.java @@ -39,7 +39,7 @@ import java.util.Map; @Tags({"rules", "rules engine", "action", "action handler", "expression language","MVEL","SpEL"}) @CapabilityDescription("Executes an action containing an expression written in MVEL or SpEL. The action " + -"is usually created by a rules engine. ") +"is usually created by a rules engine. 
Action objects executed with this Handler should contain \"command\" and \"type\" attributes.") public class ExpressionHandler extends AbstractActionHandlerService { enum ExpresssionType { @@ -47,9 +47,11 @@ public class ExpressionHandler extends AbstractActionHandlerService { } public static final PropertyDescriptor DEFAULT_EXPRESSION_LANGUAGE_TYPE = new PropertyDescriptor.Builder() - .name("Expression Language Type") + .name("default-expression-language-type") + .displayName("Default Expression Language Type") .required(true) - .description("The expression language that should be used to compile and execute action. Supported languages are MVEL and Spring Expression Language (SpEL).") + .description("If an expression language type is not provided as an attribute within an Action, the default expression language that " + + "should be used to compile and execute action. Supported languages are MVEL and Spring Expression Language (SpEL).") .allowableValues(ExpresssionType.values()) .defaultValue("MVEL") .build(); @@ -82,7 +84,7 @@ public class ExpressionHandler extends AbstractActionHandlerService { final String command = attributes.get("command"); if(StringUtils.isNotEmpty(command)) { try { - final String type = attributes.get("type"); + final String type = attributes.getOrDefault("type",this.type.toString()); ExpresssionType expresssionType = ExpresssionType.valueOf(type); if (expresssionType.equals(ExpresssionType.MVEL)) { executeMVEL(command, facts); diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/LogHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/LogHandler.java index 666afbc..20e9351 100644 --- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/LogHandler.java +++ 
b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/LogHandler.java @@ -38,19 +38,32 @@ import java.util.Map; import java.util.Set; @Tags({"rules", "rules engine", "action", "action handler", "logging"}) -@CapabilityDescription("Logs messages and fact information based on a provided action (usually created by a rules engine)") +@CapabilityDescription("Logs messages and fact information based on a provided action (usually created by a rules engine). " + + " Action objects executed with this Handler should contain \"logLevel\" and \"message\" attributes.") public class LogHandler extends AbstractActionHandlerService { public static final PropertyDescriptor DEFAULT_LOG_LEVEL = new PropertyDescriptor.Builder() - .name("Log Level") + .name("logger-default-log-level") + .displayName("Default Log Level") .required(true) - .description("The Log Level to use when logging the Attributes") + .description("If a log level is not provided as an attribute within an Action, the default log level will be used.") .allowableValues(DebugLevels.values()) .defaultValue("info") .build(); + public static final PropertyDescriptor DEFAULT_LOG_MESSAGE = new PropertyDescriptor.Builder() + .name("logger-default-log-message") + .displayName("Default Log Message") + .required(true) + .description("If a log message is not provided as an attribute within an Action, the default log message will be used.") + .defaultValue("Rules Action Triggered Log.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + private static final PropertyDescriptor LOG_FACTS = new PropertyDescriptor.Builder() - .name("Log Facts") + .name("log-facts") + .displayName("Log Facts") .required(true) .description("If true, the log message will include the facts which triggered this log action.") .defaultValue("true") @@ -58,7 +71,8 @@ public class LogHandler 
extends AbstractActionHandlerService { .build(); private static final PropertyDescriptor LOG_PREFIX = new PropertyDescriptor.Builder() - .name("Log prefix") + .name("log-prefix") + .displayName("Log Prefix") .required(false) .description("Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -69,6 +83,7 @@ public class LogHandler extends AbstractActionHandlerService { private String logPrefix; private Boolean logFacts; private String defaultLogLevel; + private String defaultLogMessage; @Override protected void init(ControllerServiceInitializationContext config) throws InitializationException { @@ -77,6 +92,7 @@ public class LogHandler extends AbstractActionHandlerService { properties.add(LOG_PREFIX); properties.add(LOG_FACTS); properties.add(DEFAULT_LOG_LEVEL); + properties.add(DEFAULT_LOG_MESSAGE); this.properties = Collections.unmodifiableList(properties); } @@ -85,6 +101,7 @@ public class LogHandler extends AbstractActionHandlerService { logPrefix = context.getProperty(LOG_PREFIX).evaluateAttributeExpressions().getValue(); logFacts = context.getProperty(LOG_FACTS).asBoolean(); defaultLogLevel = context.getProperty(DEFAULT_LOG_LEVEL).getValue().toUpperCase(); + defaultLogMessage = context.getProperty(DEFAULT_LOG_MESSAGE).evaluateAttributeExpressions().getValue(); } @Override @@ -98,7 +115,7 @@ public class LogHandler extends AbstractActionHandlerService { Map<String, String> attributes = action.getAttributes(); final String logLevel = attributes.get("logLevel"); final LogLevel level = getLogLevel(logLevel, LogLevel.valueOf(defaultLogLevel)); - final String eventMessage = StringUtils.isNotEmpty(attributes.get("message")) ? attributes.get("message") : "Rules Action Triggered Log."; + final String eventMessage = StringUtils.isNotEmpty(attributes.get("message")) ? 
attributes.get("message") : defaultLogMessage; final String factsMessage = createFactsLogMessage(facts, eventMessage); logger.log(level, factsMessage); } diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java index 760315e..ee3ca25 100644 --- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java @@ -43,8 +43,9 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -@Tags({"rules", "rules engine", "action", "action handler", "logging"}) -@CapabilityDescription("Logs messages and fact information based on a provided action (usually created by a rules engine)") +@Tags({"rules", "rules engine", "action", "action handler", "record", "record sink"}) +@CapabilityDescription("Sends fact information to sink based on a provided action (usually created by a rules engine)." 
+ + " Action objects executed with this Handler should contain \"sendZeroResult\" attribute.") public class RecordSinkHandler extends AbstractActionHandlerService{ static final PropertyDescriptor RECORD_SINK_SERVICE = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 7d2967f..461f818 100644 --- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -15,4 +15,5 @@ org.apache.nifi.rules.handlers.ActionHandlerLookup org.apache.nifi.rules.handlers.ExpressionHandler org.apache.nifi.rules.handlers.LogHandler -org.apache.nifi.rules.handlers.RecordSinkHandler \ No newline at end of file +org.apache.nifi.rules.handlers.RecordSinkHandler +org.apache.nifi.rules.handlers.AlertHandler \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.AlertHandler/additionalDetails.html b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.AlertHandler/additionalDetails.html new file mode 100644 index 0000000..ea1a498 --- /dev/null +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.AlertHandler/additionalDetails.html @@ -0,0 +1,39 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + 
Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<head> + <meta charset="utf-8" /> + <title>AlertHandler</title> + <!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /--> + <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> +</head> + +<body> +<h2>Summary</h2> +<p> + The AlertHandler is used to broadcast alerts (bulletins) as dictated by the action object. Action objects can include attributes to configure + the handler otherwise default values will be used. Possible attribute values are listed below. +</p> +<h3>AlertHandler Service Attributes</h3> +<table title="AlertHandler Attributes" border="1" width="500"> + <tr><th>Attribute</th><th>Description</th></tr> + <tr><td>category</td><td>The category the alert should be grouped under.</td></tr> + <tr><td>logLevel</td><td>Log Level for the alert. 
Possible values are trace, debug, info, warn, error.</td></tr> + <tr><td>message</td><td>Message for the alert.</td></tr> +</table> +<br/> +</body> +</html> \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.ExpressionHandler/additionalDetails.html b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.ExpressionHandler/additionalDetails.html new file mode 100644 index 0000000..8dff1bc --- /dev/null +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.ExpressionHandler/additionalDetails.html @@ -0,0 +1,38 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<head> + <meta charset="utf-8" /> + <title>ExpressionHandler</title> + <!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /--> + <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> +</head> + +<body> +<h2>Summary</h2> +<p> + The ExpressionHandler is used to execute dynamic commands written in MVEL or SpEL expression language. Action objects must include attributes to configure + the handler otherwise an exception will be thrown. Possible attribute values are listed below. +</p> +<h3>ExpressionHandler Service Attributes</h3> +<table title="ExpressionHandler Attributes" border="1" width="500"> + <tr><th>Attribute</th><th>Description</th></tr> + <tr><td>type</td><td>The expression language type of the command to be executed. Possible values are MVEL and SpEL (MVEL will be applied by default if type is not provided).</td></tr> + <tr><td>command</td><td>The expression language command that should be executed</td></tr> +</table> +<br/> +</body> +</html> \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.LogHandler/additionalDetails.html b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.LogHandler/additionalDetails.html new file mode 100644 index 0000000..b9b487d --- /dev/null +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.LogHandler/additionalDetails.html @@ -0,0 +1,38 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<head> + <meta charset="utf-8" /> + <title>LogHandler</title> + <!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /--> + <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> +</head> + +<body> +<h2>Summary</h2> +<p> + The LogHandler is used to execute actions that dictate to log a message and/or metrics. LogHandler can be invoked with any Action object. + Action objects can include attributes to configure the LogHandler or rely on the handler's default settings. Possible attribute values are listed below. +</p> +<h3>LogHandler Service Attributes</h3> +<table title="LogHandler Attributes" border="1" width="500"> + <tr><th>Attribute</th><th>Description</th></tr> + <tr><td>logLevel</td><td>Log Level for logged message. 
Possible values are trace, debug, info, warn, error.</td></tr> + <tr><td>message</td><td>Message for log.</td></tr> +</table> +<br/> +</body> +</html> \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.RecordSinkHandler/additionalDetails.html b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.RecordSinkHandler/additionalDetails.html new file mode 100644 index 0000000..4633f37 --- /dev/null +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.RecordSinkHandler/additionalDetails.html @@ -0,0 +1,37 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<head> + <meta charset="utf-8" /> + <title>RecordSinkHandler</title> + <!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /--> + <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> +</head> + +<body> +<h2>Summary</h2> +<p> + The RecordSinkHandler is used to execute actions that send metrics information to a configured sink. RecordSinkHandler can be invoked with any Action object. + Action objects can include attributes to configure the handler. Possible attribute values are listed below. +</p> +<h3>RecordSinkHandler Service Attributes</h3> +<table title="RecordSinkHandler Attributes" border="1" width="500"> + <tr><th>Attribute</th><th>Description</th></tr> + <tr><td>sendZeroResults</td><td>Allow empty results to be sent to sink. Possible values are true and false (default is false).</td></tr> +</table> +<br/> +</body> +</html> \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestAlertHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestAlertHandler.java new file mode 100644 index 0000000..5fda9de --- /dev/null +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestAlertHandler.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.rules.handlers; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.BulletinFactory; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.rules.Action; +import org.apache.nifi.util.MockBulletinRepository; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.hamcrest.core.IsInstanceOf.instanceOf; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.anyString; + +public class TestAlertHandler { + + private TestRunner runner; + private MockComponentLog mockComponentLog; + private ReportingContext reportingContext; + private AlertHandler alertHandler; + private MockAlertBulletinRepository mockAlertBulletinRepository; + + @Before 
+ public void setup() throws InitializationException { + runner = TestRunners.newTestRunner(TestProcessor.class); + mockComponentLog = new MockComponentLog(); + AlertHandler handler = new MockAlertHandler(mockComponentLog); + mockAlertBulletinRepository = new MockAlertBulletinRepository(); + runner.addControllerService("MockAlertHandler", handler); + runner.enableControllerService(handler); + alertHandler = (AlertHandler) runner.getProcessContext() + .getControllerServiceLookup() + .getControllerService("MockAlertHandler"); + reportingContext = Mockito.mock(ReportingContext.class); + Mockito.when(reportingContext.getBulletinRepository()).thenReturn(mockAlertBulletinRepository); + Mockito.when(reportingContext.createBulletin(anyString(), Mockito.any(Severity.class), anyString())) + .thenAnswer(invocation -> + BulletinFactory.createBulletin(invocation.getArgument(0), invocation.getArgument(1).toString(), invocation.getArgument(2))); + } + + @Test + public void testValidService() { + runner.assertValid(alertHandler); + assertThat(alertHandler, instanceOf(AlertHandler.class)); + } + + @Test + public void testAlertNoReportingContext() { + + final Map<String, String> attributes = new HashMap<>(); + final Map<String, Object> metrics = new HashMap<>(); + + attributes.put("logLevel", "INFO"); + attributes.put("message", "This should be not sent as an alert!"); + metrics.put("jvmHeap", "1000000"); + metrics.put("cpu", "90"); + + final Action action = new Action(); + action.setType("ALERT"); + action.setAttributes(attributes); + try { + alertHandler.execute(action, metrics); + fail(); + } catch (UnsupportedOperationException ex) { + assertTrue(true); + } + } + + @Test + public void testAlertWithBulletinLevel() { + + final Map<String, String> attributes = new HashMap<>(); + final Map<String, Object> metrics = new HashMap<>(); + + final String category = "Rules Alert"; + final String message = "This should be sent as an alert!"; + final String severity = "INFO"; + 
attributes.put("category", category); + attributes.put("message", message); + attributes.put("severity", severity); + metrics.put("jvmHeap", "1000000"); + metrics.put("cpu", "90"); + + final String expectedOutput = "This should be sent as an alert!\n" + + "Alert Facts:\n" + + "Field: cpu, Value: 90\n" + + "Field: jvmHeap, Value: 1000000\n"; + + final Action action = new Action(); + action.setType("ALERT"); + action.setAttributes(attributes); + alertHandler.execute(reportingContext, action, metrics); + BulletinRepository bulletinRepository = reportingContext.getBulletinRepository(); + List<Bulletin> bulletins = bulletinRepository.findBulletinsForController(); + assertFalse(bulletins.isEmpty()); + Bulletin bulletin = bulletins.get(0); + assertEquals(bulletin.getCategory(), category); + assertEquals(bulletin.getMessage(), expectedOutput); + assertEquals(bulletin.getLevel(), severity); + } + + @Test + public void testAlertWithDefaultValues() { + + final Map<String, String> attributes = new HashMap<>(); + final Map<String, Object> metrics = new HashMap<>(); + + final String category = "Rules Triggered Alert"; + final String message = "An alert was triggered by a rules based action."; + final String severity = "INFO"; + metrics.put("jvmHeap", "1000000"); + metrics.put("cpu", "90"); + + final String expectedOutput = "An alert was triggered by a rules-based action.\n" + + "Alert Facts:\n" + + "Field: cpu, Value: 90\n" + + "Field: jvmHeap, Value: 1000000\n"; + + final Action action = new Action(); + action.setType("ALERT"); + action.setAttributes(attributes); + alertHandler.execute(reportingContext, action, metrics); + BulletinRepository bulletinRepository = reportingContext.getBulletinRepository(); + List<Bulletin> bulletins = bulletinRepository.findBulletinsForController(); + assertFalse(bulletins.isEmpty()); + Bulletin bulletin = bulletins.get(0); + assertEquals(bulletin.getCategory(), category); + assertEquals(bulletin.getMessage(), expectedOutput); + 
assertEquals(bulletin.getLevel(), severity); + } + + @Test + public void testInvalidContext(){ + final Map<String, String> attributes = new HashMap<>(); + final Map<String, Object> metrics = new HashMap<>(); + + final String category = "Rules Alert"; + final String message = "This should be sent as an alert!"; + final String severity = "INFO"; + attributes.put("category", category); + attributes.put("message", message); + attributes.put("severity", severity); + metrics.put("jvmHeap", "1000000"); + metrics.put("cpu", "90"); + + final Action action = new Action(); + action.setType("ALERT"); + action.setAttributes(attributes); + PropertyContext fakeContext = new PropertyContext() { + @Override + public PropertyValue getProperty(PropertyDescriptor descriptor) { + return null; + } + + @Override + public Map<String, String> getAllProperties() { + return null; + } + }; + alertHandler.execute(fakeContext, action, metrics); + final String debugMessage = mockComponentLog.getWarnMessage(); + assertTrue(StringUtils.isNotEmpty(debugMessage)); + assertEquals(debugMessage,"Reporting context was not provided to create bulletins."); + } + + @Test + public void testEmptyBulletinRepository(){ + final Map<String, String> attributes = new HashMap<>(); + final Map<String, Object> metrics = new HashMap<>(); + + final String category = "Rules Alert"; + final String message = "This should be sent as an alert!"; + final String severity = "INFO"; + attributes.put("category", category); + attributes.put("message", message); + attributes.put("severity", severity); + metrics.put("jvmHeap", "1000000"); + metrics.put("cpu", "90"); + + final Action action = new Action(); + action.setType("ALERT"); + action.setAttributes(attributes); + ReportingContext fakeContext = Mockito.mock(ReportingContext.class); + Mockito.when(reportingContext.getBulletinRepository()).thenReturn(null); + alertHandler.execute(fakeContext, action, metrics); + final String debugMessage = mockComponentLog.getWarnMessage(); + 
assertTrue(StringUtils.isNotEmpty(debugMessage)); + assertEquals(debugMessage,"Bulletin Repository is not available which is unusual. Cannot send a bulletin."); + } + + private static class MockAlertHandler extends AlertHandler { + + private ComponentLog testLogger; + + public MockAlertHandler(ComponentLog testLogger) { + this.testLogger = testLogger; + } + + @Override + protected ComponentLog getLogger() { + return testLogger; + } + + } + + private static class MockAlertBulletinRepository extends MockBulletinRepository { + + List<Bulletin> bulletinList; + + + public MockAlertBulletinRepository() { + bulletinList = new ArrayList<>(); + } + + @Override + public void addBulletin(Bulletin bulletin) { + bulletinList.add(bulletin); + } + + @Override + public List<Bulletin> findBulletinsForController() { + return bulletinList; + } + + } + +} diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml index 1b00ce7..7cb3179 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml @@ -81,6 +81,12 @@ <version>1.11.0-SNAPSHOT</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-rules-engine-service-api</artifactId> + <version>1.11.0-SNAPSHOT</version> + <scope>provided</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java new file mode 100644 index 0000000..5254abe --- /dev/null +++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.sql; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.reporting.AbstractReportingTask; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.ReportingInitializationContext; +import org.apache.nifi.reporting.sql.util.QueryMetricsUtil; +import org.apache.nifi.rules.Action; +import org.apache.nifi.rules.PropertyContextActionHandler; +import org.apache.nifi.rules.engine.RulesEngineService; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.ResultSetRecordSet; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + 
+@Tags({"reporting", "rules", "action", "action handler", "status", "connection", "processor", "jvm", "metrics", "history", "bulletin", "sql"}) +@CapabilityDescription("Triggers rules-driven actions based on metrics values ") +public class MetricsEventReportingTask extends AbstractReportingTask { + + private List<PropertyDescriptor> properties; + private MetricsQueryService metricsQueryService; + private volatile RulesEngineService rulesEngineService; + private volatile PropertyContextActionHandler actionHandler; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected void init(final ReportingInitializationContext config) { + metricsQueryService = new MetricsSqlQueryService(getLogger()); + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(QueryMetricsUtil.QUERY); + properties.add(QueryMetricsUtil.RULES_ENGINE); + properties.add(QueryMetricsUtil.ACTION_HANDLER); + this.properties = Collections.unmodifiableList(properties); + } + + @OnScheduled + public void setup(final ConfigurationContext context) throws IOException { + actionHandler = context.getProperty(QueryMetricsUtil.ACTION_HANDLER).asControllerService(PropertyContextActionHandler.class); + rulesEngineService = context.getProperty(QueryMetricsUtil.RULES_ENGINE).asControllerService(RulesEngineService.class); + } + + @Override + public void onTrigger(ReportingContext context) { + try { + final String query = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue(); + fireRules(context, actionHandler, rulesEngineService, query); + } catch (Exception e) { + getLogger().error("Error opening loading rules: {}", new Object[]{e.getMessage()}, e); + } + } + + private void fireRules(ReportingContext context, PropertyContextActionHandler actionHandler, RulesEngineService engine, String query) throws Exception { + QueryResult queryResult = metricsQueryService.query(context, query); + 
getLogger().debug("Executing query: {}", new Object[]{ query }); + ResultSetRecordSet recordSet = metricsQueryService.getResultSetRecordSet(queryResult); + Record record; + try { + while ((record = recordSet.next()) != null) { + final Map<String, Object> facts = new HashMap<>(); + for (String fieldName : record.getRawFieldNames()) { + facts.put(fieldName, record.getValue(fieldName)); + } + List<Action> actions = engine.fireRules(facts); + if(actions == null || actions.isEmpty()){ + getLogger().debug("No actions required for provided facts."); + } else { + actions.forEach(action -> { + actionHandler.execute(context, action,facts); + }); + } + } + } finally { + metricsQueryService.closeQuietly(recordSet); + } + } +} diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java index ae0e326..6f3aa9e 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java @@ -16,21 +16,16 @@ */ package org.apache.nifi.reporting.sql; -import org.apache.calcite.config.Lex; -import org.apache.calcite.sql.parser.SqlParser; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.expression.ExpressionLanguageScope; 
import org.apache.nifi.record.sink.RecordSinkService; import org.apache.nifi.reporting.AbstractReportingTask; import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.ReportingInitializationContext; +import org.apache.nifi.reporting.sql.util.QueryMetricsUtil; import org.apache.nifi.serialization.record.ResultSetRecordSet; import org.apache.nifi.util.StopWatch; @@ -50,34 +45,6 @@ import java.util.concurrent.TimeUnit; + "query on the table when the capability is disabled will cause an error.") public class QueryNiFiReportingTask extends AbstractReportingTask { - static final PropertyDescriptor RECORD_SINK = new PropertyDescriptor.Builder() - .name("sql-reporting-record-sink") - .displayName("Record Destination Service") - .description("Specifies the Controller Service to use for writing out the query result records to some destination.") - .identifiesControllerService(RecordSinkService.class) - .required(true) - .build(); - - static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() - .name("sql-reporting-query") - .displayName("SQL Query") - .description("SQL SELECT statement specifies which tables to query and how data should be filtered/transformed. " - + "SQL SELECT can select from the CONNECTION_STATUS, PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, or CONNECTION_STATUS_PREDICTIONS tables. 
Note that the " - + "CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled).") - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .addValidator(new SqlValidator()) - .build(); - - static final PropertyDescriptor INCLUDE_ZERO_RECORD_RESULTS = new PropertyDescriptor.Builder() - .name("sql-reporting-include-zero-record-results") - .displayName("Include Zero Record Results") - .description("When running the SQL statement, if the result has no data, this property specifies whether or not the empty result set will be transmitted.") - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .allowableValues("true", "false") - .defaultValue("false") - .required(true) - .build(); private List<PropertyDescriptor> properties; @@ -89,9 +56,9 @@ public class QueryNiFiReportingTask extends AbstractReportingTask { protected void init(final ReportingInitializationContext config) { metricsQueryService = new MetricsSqlQueryService(getLogger()); final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(QUERY); - properties.add(RECORD_SINK); - properties.add(INCLUDE_ZERO_RECORD_RESULTS); + properties.add(QueryMetricsUtil.QUERY); + properties.add(QueryMetricsUtil.RECORD_SINK); + properties.add(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS); this.properties = Collections.unmodifiableList(properties); } @@ -102,7 +69,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask { @OnScheduled public void setup(final ConfigurationContext context) throws IOException { - recordSinkService = context.getProperty(RECORD_SINK).asControllerService(RecordSinkService.class); + recordSinkService = context.getProperty(QueryMetricsUtil.RECORD_SINK).asControllerService(RecordSinkService.class); recordSinkService.reset(); } @@ -110,7 +77,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask { public void onTrigger(ReportingContext context) { final StopWatch stopWatch = 
new StopWatch(true); try { - final String sql = context.getProperty(QUERY).evaluateAttributeExpressions().getValue(); + final String sql = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue(); final QueryResult queryResult = metricsQueryService.query(context, sql); final ResultSetRecordSet recordSet; @@ -129,7 +96,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask { attributes.put("reporting.task.name", getName()); attributes.put("reporting.task.uuid", getIdentifier()); attributes.put("reporting.task.type", this.getClass().getSimpleName()); - recordSinkService.sendData(recordSet, attributes, context.getProperty(INCLUDE_ZERO_RECORD_RESULTS).asBoolean()); + recordSinkService.sendData(recordSet, attributes, context.getProperty(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS).asBoolean()); } catch (Exception e) { getLogger().error("Error during transmission of query results due to {}", new Object[]{e.getMessage()}, e); return; @@ -143,41 +110,4 @@ public class QueryNiFiReportingTask extends AbstractReportingTask { } } - private static class SqlValidator implements Validator { - @Override - public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - if (context.isExpressionLanguagePresent(input)) { - return new ValidationResult.Builder() - .input(input) - .subject(subject) - .valid(true) - .explanation("Expression Language Present") - .build(); - } - - final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue(); - - final SqlParser.Config config = SqlParser.configBuilder() - .setLex(Lex.MYSQL_ANSI) - .build(); - - final SqlParser parser = SqlParser.create(substituted, config); - try { - parser.parseStmt(); - return new ValidationResult.Builder() - .subject(subject) - .input(input) - .valid(true) - .build(); - } catch (final Exception e) { - return new ValidationResult.Builder() - .subject(subject) - .input(input) - .valid(false) 
- .explanation("Not a valid SQL Statement: " + e.getMessage()) - .build(); - } - } - } - } diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/QueryMetricsUtil.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/QueryMetricsUtil.java new file mode 100644 index 0000000..159daec --- /dev/null +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/QueryMetricsUtil.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.reporting.sql.util; + +import org.apache.calcite.config.Lex; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.record.sink.RecordSinkService; +import org.apache.nifi.rules.PropertyContextActionHandler; +import org.apache.nifi.rules.engine.RulesEngineService; + +public class QueryMetricsUtil { + + public static final PropertyDescriptor RECORD_SINK = new PropertyDescriptor.Builder() + .name("sql-reporting-record-sink") + .displayName("Record Destination Service") + .description("Specifies the Controller Service to use for writing out the query result records to some destination.") + .identifiesControllerService(RecordSinkService.class) + .required(true) + .build(); + + public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("sql-reporting-query") + .displayName("SQL Query") + .description("SQL SELECT statement specifies which tables to query and how data should be filtered/transformed. " + + "SQL SELECT can select from the CONNECTION_STATUS, PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, or CONNECTION_STATUS_PREDICTIONS tables. 
Note that the " + + "CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled).") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(new SqlValidator()) + .build(); + + public static final PropertyDescriptor INCLUDE_ZERO_RECORD_RESULTS = new PropertyDescriptor.Builder() + .name("sql-reporting-include-zero-record-results") + .displayName("Include Zero Record Results") + .description("When running the SQL statement, if the result has no data, this property specifies whether or not the empty result set will be transmitted.") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + public static final PropertyDescriptor RULES_ENGINE = new PropertyDescriptor.Builder() + .name("rules-engine-service") + .displayName("Rules Engine Service") + .description("Specifies the Controller Service to use for applying rules to metrics.") + .identifiesControllerService(RulesEngineService.class) + .required(true) + .build(); + + public static final PropertyDescriptor ACTION_HANDLER = new PropertyDescriptor.Builder() + .name("action-handler") + .displayName("Event Action Handler") + .description("Handler that will execute the defined action returned from rules engine (if Action type is supported by the handler)") + .identifiesControllerService(PropertyContextActionHandler.class) + .required(true) + .build(); + + public static class SqlValidator implements Validator { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + if (context.isExpressionLanguagePresent(input)) { + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(true) + .explanation("Expression Language Present") + .build(); + } + + final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue(); + 
+ final SqlParser.Config config = SqlParser.configBuilder() + .setLex(Lex.MYSQL_ANSI) + .build(); + + final SqlParser parser = SqlParser.create(substituted, config); + try { + parser.parseStmt(); + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(true) + .build(); + } catch (final Exception e) { + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("Not a valid SQL Statement: " + e.getMessage()) + .build(); + } + } + } + +} diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask index ec340e8..c3f5883 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-org.apache.nifi.reporting.sql.QueryNiFiReportingTask \ No newline at end of file +org.apache.nifi.reporting.sql.QueryNiFiReportingTask +org.apache.nifi.reporting.sql.MetricsEventReportingTask \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html new file mode 100644 index 0000000..2392aab --- /dev/null +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html @@ -0,0 +1,34 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<head> + <meta charset="utf-8" /> + <title>Metrics Event Reporting Task</title> + <!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /--> + <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> +</head> + +<body> +<h2>Summary</h2> +<p> + This reporting task can be used to issue SQL queries against various NiFi metrics information, submit the returned data to a rules engine (which will determine if any actions should be performed) + and execute the prescribed actions using action handlers. This task requires a RulesEngineService (which will identify any actions that should be performed) and an ActionHandler, which will execute the action(s). + A single ActionHandler can be used to service all events, or an ActionHandlerLookup can be used for dynamic handler lookup. NOTE: Ideally, the action handler should support the action types expected to be + returned from the rules engine. +</p> +<br/> +</body> +</html> \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java similarity index 55% copy from nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java copy to nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java index 9f4cb0a..83992e4 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java @@ -16,24 +16,32 @@ 
*/ package org.apache.nifi.reporting.sql; - +import com.google.common.collect.Lists; import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.record.sink.MockRecordSinkService; -import org.apache.nifi.record.sink.RecordSinkService; +import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.EventAccess; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.ReportingInitializationContext; -import org.apache.nifi.reporting.util.metrics.MetricNames; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.reporting.sql.util.QueryMetricsUtil; +import org.apache.nifi.rules.Action; +import org.apache.nifi.rules.MockPropertyContextActionHandler; +import org.apache.nifi.rules.PropertyContextActionHandler; +import org.apache.nifi.rules.engine.MockRulesEngineService; +import org.apache.nifi.rules.engine.RulesEngineService; import org.apache.nifi.state.MockStateManager; import org.apache.nifi.util.MockPropertyValue; +import org.apache.nifi.util.Tuple; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -46,23 +54,23 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public 
class TestQueryNiFiReportingTask { +import static org.junit.Assert.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +public class TestMetricsEventReportingTask { private ReportingContext context; - private MockQueryNiFiReportingTask reportingTask; - private MockRecordSinkService mockRecordSinkService; + private MockMetricsEventReportingTask reportingTask; + private MockPropertyContextActionHandler actionHandler; + private MockRulesEngineService rulesEngineService; private ProcessGroupStatus status; @Before public void setup() { - mockRecordSinkService = new MockRecordSinkService(); status = new ProcessGroupStatus(); + actionHandler = new MockPropertyContextActionHandler(); status.setId("1234"); status.setFlowFilesReceived(5); status.setBytesReceived(10000); @@ -83,15 +91,21 @@ public class TestQueryNiFiReportingTask { processorStatuses.add(procStatus); status.setProcessorStatus(processorStatuses); + ConnectionStatusPredictions connectionStatusPredictions = new ConnectionStatusPredictions(); + connectionStatusPredictions.setPredictedTimeToCountBackpressureMillis(1000); + connectionStatusPredictions.setPredictedTimeToBytesBackpressureMillis(1000); + connectionStatusPredictions.setNextPredictedQueuedCount(1000000000); + connectionStatusPredictions.setNextPredictedQueuedBytes(1000000000000000L); + ConnectionStatus root1ConnectionStatus = new ConnectionStatus(); root1ConnectionStatus.setId("root1"); root1ConnectionStatus.setQueuedCount(1000); - root1ConnectionStatus.setBackPressureObjectThreshold(1000); + root1ConnectionStatus.setPredictions(connectionStatusPredictions); ConnectionStatus root2ConnectionStatus = new ConnectionStatus(); root2ConnectionStatus.setId("root2"); root2ConnectionStatus.setQueuedCount(500); - root2ConnectionStatus.setBackPressureObjectThreshold(1000); + root2ConnectionStatus.setPredictions(connectionStatusPredictions); Collection<ConnectionStatus> rootConnectionStatuses = new 
ArrayList<>(); rootConnectionStatuses.add(root1ConnectionStatus); @@ -138,120 +152,42 @@ public class TestQueryNiFiReportingTask { @Test public void testConnectionStatusTable() throws IOException, InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); - properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink"); - properties.put(QueryNiFiReportingTask.QUERY, "select id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by queuedCount desc"); + properties.put(QueryMetricsUtil.QUERY, "select connectionId, predictedQueuedCount, predictedTimeToBytesBackpressureMillis from CONNECTION_STATUS_PREDICTIONS"); reportingTask = initTask(properties); reportingTask.onTrigger(context); + List<Map<String,Object>> metricsList = actionHandler.getRows(); + List<Tuple<String, Action>> defaultLogActions = actionHandler.getDefaultActionsByType("LOG"); + List<Tuple<String, Action>> defaultAlertActions = actionHandler.getDefaultActionsByType("ALERT"); + List<PropertyContext> propertyContexts = actionHandler.getPropertyContexts(); + assertFalse(metricsList.isEmpty()); + assertEquals(2,defaultLogActions.size()); + assertEquals(2,defaultAlertActions.size()); + assertEquals(4,propertyContexts.size()); - List<Map<String, Object>> rows = mockRecordSinkService.getRows(); - assertEquals(4, rows.size()); - // Validate the first row - Map<String, Object> row = rows.get(0); - assertEquals(3, row.size()); // Only projected 2 columns - Object id = row.get("id"); - assertTrue(id instanceof String); - assertEquals("nested", id); - assertEquals(1001, row.get("queuedCount")); - // Validate the second row - row = rows.get(1); - id = row.get("id"); - assertEquals("root1", id); - assertEquals(1000, row.get("queuedCount")); - assertEquals(true, row.get("isBackPressureEnabled")); - // Validate the third row - row = rows.get(2); - id = row.get("id"); - assertEquals("root2", id); - assertEquals(500, row.get("queuedCount")); - assertEquals(false, 
row.get("isBackPressureEnabled")); - // Validate the fourth row - row = rows.get(3); - id = row.get("id"); - assertEquals("nested2", id); - assertEquals(3, row.get("queuedCount")); } - @Test - public void testJvmMetricsTable() throws IOException, InitializationException { - final Map<PropertyDescriptor, String> properties = new HashMap<>(); - properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink"); - properties.put(QueryNiFiReportingTask.QUERY, "select " - + Stream.of(MetricNames.JVM_DAEMON_THREAD_COUNT, - MetricNames.JVM_THREAD_COUNT, - MetricNames.JVM_THREAD_STATES_BLOCKED, - MetricNames.JVM_THREAD_STATES_RUNNABLE, - MetricNames.JVM_THREAD_STATES_TERMINATED, - MetricNames.JVM_THREAD_STATES_TIMED_WAITING, - MetricNames.JVM_UPTIME, - MetricNames.JVM_HEAP_USED, - MetricNames.JVM_HEAP_USAGE, - MetricNames.JVM_NON_HEAP_USAGE, - MetricNames.JVM_FILE_DESCRIPTOR_USAGE).map((s) -> s.replace(".", "_")).collect(Collectors.joining(",")) - + " from JVM_METRICS"); - reportingTask = initTask(properties); - reportingTask.onTrigger(context); - - List<Map<String, Object>> rows = mockRecordSinkService.getRows(); - assertEquals(1, rows.size()); - Map<String,Object> row = rows.get(0); - assertEquals(11, row.size()); - assertTrue(row.get(MetricNames.JVM_DAEMON_THREAD_COUNT.replace(".","_")) instanceof Integer); - assertTrue(row.get(MetricNames.JVM_HEAP_USAGE.replace(".","_")) instanceof Double); - } - - @Test - public void testProcessGroupStatusTable() throws IOException, InitializationException { - final Map<PropertyDescriptor, String> properties = new HashMap<>(); - properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink"); - properties.put(QueryNiFiReportingTask.QUERY, "select * from PROCESS_GROUP_STATUS order by bytesRead asc"); - reportingTask = initTask(properties); - reportingTask.onTrigger(context); - - List<Map<String, Object>> rows = mockRecordSinkService.getRows(); - assertEquals(4, rows.size()); - // Validate the first row - Map<String, Object> 
row = rows.get(0); - assertEquals(20, row.size()); - assertEquals(1L, row.get("bytesRead")); - // Validate the second row - row = rows.get(1); - assertEquals(1234L, row.get("bytesRead")); - // Validate the third row - row = rows.get(2); - assertEquals(12345L, row.get("bytesRead")); - // Validate the fourth row - row = rows.get(3); - assertEquals(20000L, row.get("bytesRead")); - } - - @Test - public void testNoResults() throws IOException, InitializationException { - final Map<PropertyDescriptor, String> properties = new HashMap<>(); - properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink"); - properties.put(QueryNiFiReportingTask.QUERY, "select * from CONNECTION_STATUS where queuedCount > 2000"); - reportingTask = initTask(properties); - reportingTask.onTrigger(context); - - List<Map<String, Object>> rows = mockRecordSinkService.getRows(); - assertEquals(0, rows.size()); - } - - private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException { + private MockMetricsEventReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException { final ComponentLog logger = Mockito.mock(ComponentLog.class); - reportingTask = new MockQueryNiFiReportingTask(); + final BulletinRepository bulletinRepository = Mockito.mock(BulletinRepository.class); + reportingTask = new MockMetricsEventReportingTask(); final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); Mockito.when(initContext.getLogger()).thenReturn(logger); reportingTask.initialize(initContext); Map<PropertyDescriptor, String> properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : reportingTask.getSupportedPropertyDescriptors()) { properties.put(descriptor, descriptor.getDefaultValue()); } properties.putAll(customProperties); context = 
Mockito.mock(ReportingContext.class); + Mockito.when(context.isAnalyticsEnabled()).thenReturn(true); Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(reportingTask)); + Mockito.when(context.getBulletinRepository()).thenReturn(bulletinRepository); + Mockito.when(context.createBulletin(anyString(),any(Severity.class), anyString())).thenReturn(null); + Mockito.doAnswer((Answer<PropertyValue>) invocation -> { final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class); return new MockPropertyValue(properties.get(descriptor)); @@ -262,17 +198,27 @@ public class TestQueryNiFiReportingTask { Mockito.when(eventAccess.getControllerStatus()).thenReturn(status); final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class); - mockRecordSinkService = new MockRecordSinkService(); - Mockito.when(context.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue); - Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService); + actionHandler = new MockPropertyContextActionHandler(); + Mockito.when(pValue.asControllerService(PropertyContextActionHandler.class)).thenReturn(actionHandler); + + Action action1 = new Action(); + action1.setType("LOG"); + Action action2 = new Action(); + action2.setType("ALERT"); + + final PropertyValue resValue = Mockito.mock(StandardPropertyValue.class); + rulesEngineService = new MockRulesEngineService(Lists.newArrayList(action1,action2)); + Mockito.when(resValue.asControllerService(RulesEngineService.class)).thenReturn(rulesEngineService); ConfigurationContext configContext = Mockito.mock(ConfigurationContext.class); - Mockito.when(configContext.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue); + Mockito.when(configContext.getProperty(QueryMetricsUtil.RULES_ENGINE)).thenReturn(resValue); + Mockito.when(configContext.getProperty(QueryMetricsUtil.ACTION_HANDLER)).thenReturn(pValue); reportingTask.setup(configContext); return 
reportingTask; } - private static final class MockQueryNiFiReportingTask extends QueryNiFiReportingTask { + private static final class MockMetricsEventReportingTask extends MetricsEventReportingTask { + } -} \ No newline at end of file +} diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java index 9f4cb0a..eae9e91 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java @@ -31,6 +31,7 @@ import org.apache.nifi.reporting.EventAccess; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.ReportingInitializationContext; +import org.apache.nifi.reporting.sql.util.QueryMetricsUtil; import org.apache.nifi.reporting.util.metrics.MetricNames; import org.apache.nifi.state.MockStateManager; import org.apache.nifi.util.MockPropertyValue; @@ -138,8 +139,8 @@ public class TestQueryNiFiReportingTask { @Test public void testConnectionStatusTable() throws IOException, InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); - properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink"); - properties.put(QueryNiFiReportingTask.QUERY, "select id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by queuedCount desc"); + properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); + properties.put(QueryMetricsUtil.QUERY, "select id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by queuedCount desc"); reportingTask = 
initTask(properties); reportingTask.onTrigger(context); @@ -174,8 +175,8 @@ public class TestQueryNiFiReportingTask { @Test public void testJvmMetricsTable() throws IOException, InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); - properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink"); - properties.put(QueryNiFiReportingTask.QUERY, "select " + properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); + properties.put(QueryMetricsUtil.QUERY, "select " + Stream.of(MetricNames.JVM_DAEMON_THREAD_COUNT, MetricNames.JVM_THREAD_COUNT, MetricNames.JVM_THREAD_STATES_BLOCKED, @@ -202,8 +203,8 @@ public class TestQueryNiFiReportingTask { @Test public void testProcessGroupStatusTable() throws IOException, InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); - properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink"); - properties.put(QueryNiFiReportingTask.QUERY, "select * from PROCESS_GROUP_STATUS order by bytesRead asc"); + properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); + properties.put(QueryMetricsUtil.QUERY, "select * from PROCESS_GROUP_STATUS order by bytesRead asc"); reportingTask = initTask(properties); reportingTask.onTrigger(context); @@ -227,8 +228,8 @@ public class TestQueryNiFiReportingTask { @Test public void testNoResults() throws IOException, InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); - properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink"); - properties.put(QueryNiFiReportingTask.QUERY, "select * from CONNECTION_STATUS where queuedCount > 2000"); + properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); + properties.put(QueryMetricsUtil.QUERY, "select * from CONNECTION_STATUS where queuedCount > 2000"); reportingTask = initTask(properties); reportingTask.onTrigger(context); @@ -263,11 +264,11 @@ public class TestQueryNiFiReportingTask { final 
PropertyValue pValue = Mockito.mock(StandardPropertyValue.class); mockRecordSinkService = new MockRecordSinkService(); - Mockito.when(context.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue); + Mockito.when(context.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue); Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService); ConfigurationContext configContext = Mockito.mock(ConfigurationContext.class); - Mockito.when(configContext.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue); + Mockito.when(configContext.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue); reportingTask.setup(configContext); return reportingTask; diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java new file mode 100644 index 0000000..323317d --- /dev/null +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.rules; + +import org.apache.nifi.components.AbstractConfigurableComponent; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.Tuple; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** Test double for PropertyContextActionHandler that performs no real action: every execute call is recorded in in-memory lists which tests read back through the getters below. */ public class MockPropertyContextActionHandler extends AbstractConfigurableComponent implements PropertyContextActionHandler{ + + /* Facts map passed to each execute call, appended in call order. */ private List<Map<String, Object>> rows = new ArrayList<>(); + /* One (action type, action) pair appended per execute call. */ private List<Tuple<String,Action>> defaultActions = new ArrayList<>(); + /* Contexts received by the three-argument execute overload only. */ private List<PropertyContext> propertyContexts = new ArrayList<>(); + + + /** Records the supplied context, then delegates to {@link #execute(Action, Map)} so that the action and facts are recorded as well. */ @Override + public void execute(PropertyContext context, Action action, Map<String, Object> facts) { + propertyContexts.add(context); + execute(action, facts); + } + + /** Records the facts map and a pair of the action's type and the action itself; nothing else is done with either argument. */ @Override + public void execute(Action action, Map<String, Object> facts) { + rows.add(facts); + defaultActions.add( new Tuple<>(action.getType(),action)); + } + + + /** Intentionally empty: this mock keeps no state derived from the initialization context. */ @Override + public void initialize(ControllerServiceInitializationContext context) throws InitializationException { + + } + + /** Returns the recorded facts maps; this is the internal list itself, not a copy. */ public List<Map<String, Object>> getRows() { + return rows; + } + + /** Returns every recorded (type, action) pair; this is the internal list itself, not a copy. */ public List<Tuple<String, Action>> getDefaultActions() { + return defaultActions; + } + + /** Returns a newly collected list of the recorded pairs whose type matches the given type, ignoring case. NOTE(review): presumably fails with a NullPointerException if an action was recorded with a null type - confirm against Tuple and Action. */ public List<Tuple<String,Action>> getDefaultActionsByType(final String type){ + return defaultActions.stream().filter(stringActionTuple -> stringActionTuple + .getKey().equalsIgnoreCase(type)).collect(Collectors.toList()); + } + + /** Returns the contexts recorded by the three-argument execute overload; this is the internal list itself, not a copy. */ public List<PropertyContext> getPropertyContexts() { + return propertyContexts; + } + + /** Returns a fixed identifier string for this mock. */ @Override + public String getIdentifier() { + return "MockPropertyContextActionHandler"; + } +} diff --git
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java new file mode 100644 index 0000000..e3ccc73 --- /dev/null +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.rules.engine; + +import org.apache.nifi.components.AbstractConfigurableComponent; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.rules.Action; + +import java.util.List; +import java.util.Map; + +/** Test stub for RulesEngineService that evaluates no rules: it hands back the action list it was constructed with, whatever facts it is given. */ public class MockRulesEngineService extends AbstractConfigurableComponent implements RulesEngineService { + /* Canned result returned by every fireRules call. */ private List<Action> actions; + + /** Stores the given list by reference (no defensive copy), so later changes made by the caller are visible through fireRules. */ public MockRulesEngineService(List<Action> actions) { + this.actions = actions; + } + + /** Ignores the supplied facts and returns the list passed to the constructor. */ @Override + public List<Action> fireRules(Map<String, Object> facts) { + return actions; + } + + /** Intentionally empty: the stub needs nothing from the initialization context. */ @Override + public void initialize(ControllerServiceInitializationContext context) throws InitializationException { + } + + /** Returns a fixed identifier string for this stub. */ @Override + public String getIdentifier() { + return "MockRulesEngineService"; + } +}