This is an automated email from the ASF dual-hosted git repository. mattyb149 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new 8e1452a NIFI-6649 - added separate query interval configuration for observation queries NIFI-6649 - documentation update 8e1452a is described below commit 8e1452a3f342ee45dfd589d343c9ecf6a7cf6825 Author: Yolanda M. Davis <yolanda.m.da...@gmail.com> AuthorDate: Tue Sep 10 15:54:42 2019 -0400 NIFI-6649 - added separate query interval configuration for observation queries NIFI-6649 - documentation update NIFI-6649 - add debug logging for score and prediction information NIFI-6649 - fix to ensure counts return minimum value of 0 if not infinite or NaN Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #3719 --- .../java/org/apache/nifi/util/NiFiProperties.java | 2 ++ .../src/main/asciidoc/administration-guide.adoc | 3 ++- .../org/apache/nifi/controller/FlowController.java | 13 ++++++++++- .../CachingConnectionStatusAnalyticsEngine.java | 5 ++-- .../analytics/ConnectionStatusAnalytics.java | 27 ++++++++++++++++++++-- .../analytics/ConnectionStatusAnalyticsEngine.java | 6 ++++- ...TestCachingConnectionStatusAnalyticsEngine.java | 8 ++++--- .../TestConnectionStatusAnalyticsEngine.java | 5 ++-- .../analytics/TestStatusAnalyticsEngine.java | 5 ++-- .../nifi-framework/nifi-resources/pom.xml | 1 + .../src/main/resources/conf/nifi.properties | 1 + 11 files changed, 62 insertions(+), 14 deletions(-) diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index 7da514c..afcd268 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -241,6 +241,7 @@ public abstract class NiFiProperties { // analytics properties public static final String ANALYTICS_PREDICTION_ENABLED = "nifi.analytics.predict.enabled"; public static final String ANALYTICS_PREDICTION_INTERVAL = "nifi.analytics.predict.interval"; + public static final String ANALYTICS_QUERY_INTERVAL = "nifi.analytics.query.interval"; public static final String ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION = "nifi.analytics.connection.model.implementation"; public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME= "nifi.analytics.connection.model.score.name"; public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD = "nifi.analytics.connection.model.score.threshold"; @@ -318,6 +319,7 @@ public abstract class NiFiProperties { // analytics defaults public static final String DEFAULT_ANALYTICS_PREDICTION_ENABLED = "false"; public static final String DEFAULT_ANALYTICS_PREDICTION_INTERVAL = "3 mins"; + public static final String DEFAULT_ANALYTICS_QUERY_INTERVAL = "3 mins"; public final static String DEFAULT_ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION = "org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares"; public static final String DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME = "rSquared"; public static final double DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD = .90; diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index 432ca9e..8c5cac9 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -2389,7 +2389,7 @@ In order to generate predictions, local status snapshot history is queried to ob NiFi evaluates the model's effectiveness before sending prediction information by using the model's R-Squared score by default. One important note: R-Square is a measure of how close the regression line fits the observation data vs. how accurate the prediction will be; therefore there may be some measure of error. If the R-Squared score for the calculated model meets the configured threshold (as defined by `nifi.analytics.connection.model.score.threshold`) then the model will be used for [...] -The prediction interval `nifi.analytics.predict.interval` can be configured to project out further when back pressure will occur. Predictions further out in time require more observations stored locally to generate an effective model. This may also require tuning of the model's scoring threshold value to select a score which can offer reasonable predictions. +The prediction interval `nifi.analytics.predict.interval` can be configured to project out further when back pressure will occur. The prediction query interval `nifi.analytics.query.interval` can also be configured to determine how far back in time past observations should be queried in order to generate the model. Adjustments to these settings may require tuning of the model's scoring threshold value to select a score that can offer reasonable predictions. See <<analytics_properties>> for complete information on configuring analytic properties. @@ -3341,6 +3341,7 @@ These properties determine the behavior of the internal NiFi predictive analytic |*Property*|*Description* |`nifi.analytics.predict.enabled`|This indicates whether prediction should be enabled for the cluster. The default is `false`. |`nifi.analytics.predict.interval`|This indicates a time interval for which analytical predictions (queue saturation, e.g.) should be made. The default value is `3 mins`. +|`nifi.analytics.query.interval`|This indicates a time interval to query for past observations (e.g. the last 3 minutes of snapshots). The default value is `3 mins`. NOTE: This value should be at least 3 times greater than `nifi.components.status.snapshot.frequency` to ensure enough observations are retrieved for predictions. |`nifi.analytics.connection.model.implementation`|This is the implementation class for the status analytics model used to make connection predictions. The default value is `org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares`. |`nifi.analytics.connection.model.score.name`|This is the name of the scoring type that should be used to evaluate model. The default value is `rSquared`. |`nifi.analytics.connection.model.score.threshold`|This is the threshold for the scoring value (where model score should be above given threshold). The default value is `.9`. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 98b4395..c6f942b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -614,6 +614,17 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node predictionIntervalMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL, TimeUnit.MILLISECONDS); } + // Determine interval for querying past observations + final String queryInterval = nifiProperties.getProperty(NiFiProperties.ANALYTICS_QUERY_INTERVAL, NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL); + long queryIntervalMillis; + try { + queryIntervalMillis = FormatUtils.getTimeDuration(queryInterval, TimeUnit.MILLISECONDS); + } catch (final Exception e) { + LOG.warn("Analytics is enabled however could not retrieve value for "+ NiFiProperties.ANALYTICS_QUERY_INTERVAL + ". This property has been set to '" + + NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL + "'"); + queryIntervalMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL, TimeUnit.MILLISECONDS); + } + // Determine score name to use for evaluating model performance String modelScoreName = nifiProperties.getProperty(NiFiProperties.ANALYTICS_CONNECTION_MODEL_SCORE_NAME, NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME); @@ -632,7 +643,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node .getConnectionStatusModelMap(extensionManager, nifiProperties); analyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository, modelMap, - predictionIntervalMillis, modelScoreName, modelScoreThreshold); + predictionIntervalMillis, queryIntervalMillis, modelScoreName, modelScoreThreshold); } eventAccess = new StandardEventAccess(this, flowFileEventRepository); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java index 3588ed5..aa67811 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java @@ -40,9 +40,10 @@ public class CachingConnectionStatusAnalyticsEngine extends ConnectionStatusAnal public CachingConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository, FlowFileEventRepository flowFileEventRepository, Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, - long predictionIntervalMillis, String scoreName, double scoreThreshold) { + long predictionIntervalMillis, long queryIntervalMillis, String scoreName, double scoreThreshold) { - super(flowManager,statusRepository,flowFileEventRepository,modelMap,predictionIntervalMillis,scoreName,scoreThreshold); + super(flowManager, statusRepository, flowFileEventRepository, modelMap, predictionIntervalMillis, + queryIntervalMillis, scoreName, scoreThreshold); this.cache = Caffeine.newBuilder() .expireAfterWrite(30, TimeUnit.MINUTES) .build(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java index 4bf1948..7abba4b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java @@ -24,6 +24,7 @@ import java.util.NoSuchElementException; import java.util.Optional; import java.util.stream.Stream; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.flow.FlowManager; @@ -56,6 +57,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { private final Boolean supportOnlineLearning; private Boolean extendWindow = false; private long intervalMillis = 3L * 60 * 1000; // Default is 3 minutes + private long queryIntervalMillis = 3L * 60 * 1000; //Default is 3 minutes private String scoreName = "rSquared"; private double scoreThreshold = .90; @@ -78,7 +80,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { //Obtain latest observations when available, extend window if needed to obtain minimum observations this.queryWindow = new QueryWindow(extendWindow ? queryWindow.getStartTimeMillis() : queryWindow.getEndTimeMillis(), System.currentTimeMillis()); } else { - this.queryWindow = new QueryWindow(System.currentTimeMillis() - getIntervalTimeMillis(), System.currentTimeMillis()); + this.queryWindow = new QueryWindow(System.currentTimeMillis() - getQueryIntervalMillis(), System.currentTimeMillis()); } modelMap.forEach((metric, modelFunction) -> { @@ -94,6 +96,13 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { try { LOG.debug("Refreshing model with new data for connection id: {} ", connectionIdentifier); model.learn(Stream.of(features), Stream.of(values)); + + if(MapUtils.isNotEmpty(model.getScores())){ + model.getScores().forEach((key, value) -> { + LOG.debug("Model Scores for prediction metric {} for connection id {}: {}={} ", metric, connectionIdentifier, key, value); + }); + } + extendWindow = false; } catch (Exception ex) { LOG.debug("Exception encountered while training model for connection id {}: {}", connectionIdentifier, ex.getMessage()); @@ -137,6 +146,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { predictFeatures.put(1, inOutRatio); return convertTimePrediction(bytesModel.predictVariable(0, predictFeatures, backPressureBytes), System.currentTimeMillis()); } else { + LOG.debug("Model is not valid for calculating time back pressure by content size in bytes. Returning -1"); return -1L; } } @@ -164,6 +174,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { predictFeatures.put(1, inOutRatio); return convertTimePrediction(countModel.predictVariable(0, predictFeatures, backPressureCountThreshold), System.currentTimeMillis()); } else { + LOG.debug("Model is not valid for calculating time to back pressure by object count. Returning -1"); return -1L; } } @@ -186,6 +197,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { predictFeatures.add(inOutRatio); return convertCountPrediction(bytesModel.predict(predictFeatures.toArray(new Double[2]))); } else { + LOG.debug("Model is not valid for predicting content size in bytes for next interval. Returning -1"); return -1L; } } @@ -208,6 +220,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { predictFeatures.add(inOutRatio); return convertCountPrediction(countModel.predict(predictFeatures.toArray(new Double[2]))); } else { + LOG.debug("Model is not valid for predicting object count for next interval. Returning -1"); return -1L; } @@ -266,6 +279,14 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { this.intervalMillis = intervalTimeMillis; } + public long getQueryIntervalMillis() { + return queryIntervalMillis; + } + + public void setQueryIntervalMillis(long queryIntervalMillis) { + this.queryIntervalMillis = queryIntervalMillis; + } + public String getScoreName() { return scoreName; } @@ -334,6 +355,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { */ private Long convertTimePrediction(Double prediction, Long timeMillis) { if (Double.isNaN(prediction) || Double.isInfinite(prediction) || prediction < timeMillis) { + LOG.debug("Time prediction value is invalid: {}. Returning -1.",prediction); return -1L; } else { return Math.max(0, Math.round(prediction) - timeMillis); @@ -346,7 +368,8 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { * @return prediction prediction value converted into valid value for consumption */ private Long convertCountPrediction(Double prediction) { - if (Double.isNaN(prediction) || Double.isInfinite(prediction) || prediction < 0) { + if (Double.isNaN(prediction) || Double.isInfinite(prediction)) { + LOG.debug("Count prediction value is invalid: {}. Returning -1.",prediction); return -1L; } else { return Math.max(0, Math.round(prediction)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java index 17a2704..e7eecac 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java @@ -37,16 +37,19 @@ public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine { protected final FlowFileEventRepository flowFileEventRepository; protected final Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap; protected final long predictionIntervalMillis; + protected final long queryIntervalMillis; protected final String scoreName; protected final double scoreThreshold; public ConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository, FlowFileEventRepository flowFileEventRepository, - Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, long predictionIntervalMillis, String scoreName, double scoreThreshold) { + Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, long predictionIntervalMillis, + long queryIntervalMillis, String scoreName, double scoreThreshold) { this.flowManager = flowManager; this.statusRepository = statusRepository; this.flowFileEventRepository = flowFileEventRepository; this.predictionIntervalMillis = predictionIntervalMillis; this.modelMap = modelMap; + this.queryIntervalMillis = queryIntervalMillis; this.scoreName = scoreName; this.scoreThreshold = scoreThreshold; } @@ -60,6 +63,7 @@ public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine { public StatusAnalytics getStatusAnalytics(String identifier) { ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager, flowFileEventRepository, modelMap, identifier, false); connectionStatusAnalytics.setIntervalTimeMillis(predictionIntervalMillis); + connectionStatusAnalytics.setQueryIntervalMillis(queryIntervalMillis); connectionStatusAnalytics.setScoreName(scoreName); connectionStatusAnalytics.setScoreThreshold(scoreThreshold); connectionStatusAnalytics.refresh(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java index 57b300f..7be0b8a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java @@ -33,15 +33,17 @@ public class TestCachingConnectionStatusAnalyticsEngine extends TestStatusAnalyt public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, ComponentStatusRepository componentStatusRepository, Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, - long predictIntervalMillis, String scoreName, double scoreThreshold) { + long predictIntervalMillis, long queryIntervalMillis, String scoreName, double scoreThreshold) { - return new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository, modelMap, predictIntervalMillis, scoreName, scoreThreshold); + return new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository, modelMap, predictIntervalMillis, + queryIntervalMillis, scoreName, scoreThreshold); } @Test public void testCachedStatusAnalytics() { StatusAnalyticsEngine statusAnalyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository, modelMap, - DEFAULT_PREDICT_INTERVAL_MILLIS, DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD); + DEFAULT_PREDICT_INTERVAL_MILLIS, DEFAULT_QUERY_INTERVAL_MILLIS, + DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD); StatusAnalytics statusAnalyticsA = statusAnalyticsEngine.getStatusAnalytics("A"); StatusAnalytics statusAnalyticsB = statusAnalyticsEngine.getStatusAnalytics("B"); StatusAnalytics statusAnalyticsTest = statusAnalyticsEngine.getStatusAnalytics("A"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java index 0b5612d..eb56129 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java @@ -28,8 +28,9 @@ public class TestConnectionStatusAnalyticsEngine extends TestStatusAnalyticsEngi @Override public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, ComponentStatusRepository statusRepository, Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, - long predictIntervalMillis, String scoreName, double scoreThreshold) { - return new ConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository,modelMap, DEFAULT_PREDICT_INTERVAL_MILLIS, scoreName, scoreThreshold); + long predictIntervalMillis, long queryIntervalMillis, String scoreName, double scoreThreshold) { + return new ConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository,modelMap, + DEFAULT_PREDICT_INTERVAL_MILLIS, DEFAULT_QUERY_INTERVAL_MILLIS, scoreName, scoreThreshold); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java index 477216c..f1dc0be 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java @@ -43,6 +43,7 @@ import org.mockito.stubbing.Answer; public abstract class TestStatusAnalyticsEngine { static final long DEFAULT_PREDICT_INTERVAL_MILLIS = 3L * 60 * 1000; + static final long DEFAULT_QUERY_INTERVAL_MILLIS = 3L * 60 * 1000; static final String DEFAULT_SCORE_NAME = "rSquared"; static final double DEFAULT_SCORE_THRESHOLD = .9; @@ -89,13 +90,13 @@ public abstract class TestStatusAnalyticsEngine { @Test public void testGetStatusAnalytics() { StatusAnalyticsEngine statusAnalyticsEngine = getStatusAnalyticsEngine(flowManager,flowFileEventRepository, statusRepository, modelMap, DEFAULT_PREDICT_INTERVAL_MILLIS, - DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD); + DEFAULT_QUERY_INTERVAL_MILLIS, DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD); StatusAnalytics statusAnalytics = statusAnalyticsEngine.getStatusAnalytics("1"); assertNotNull(statusAnalytics); } public abstract StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, ComponentStatusRepository componentStatusRepository, Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, - long predictIntervalMillis, String scoreName, double scoreThreshold); + long predictIntervalMillis, long queryIntervalMillis, String scoreName, double scoreThreshold); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml index fe69177..56bd941 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml @@ -212,6 +212,7 @@ <!-- nifi.properties: analytics properties --> <nifi.analytics.predict.enabled>false</nifi.analytics.predict.enabled> <nifi.analytics.predict.interval>3 mins</nifi.analytics.predict.interval> + <nifi.analytics.query.interval>3 mins</nifi.analytics.query.interval> <nifi.analytics.connection.model.implementation>org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares</nifi.analytics.connection.model.implementation> <nifi.analytics.connection.model.score.name>rSquared</nifi.analytics.connection.model.score.name> <nifi.analytics.connection.model.score.threshold>.90</nifi.analytics.connection.model.score.threshold> diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index 3df0005..617e722 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -260,6 +260,7 @@ nifi.variable.registry.properties= # analytics properties # nifi.analytics.predict.enabled=${nifi.analytics.predict.enabled} nifi.analytics.predict.interval=${nifi.analytics.predict.interval} +nifi.analytics.query.interval=${nifi.analytics.query.interval} nifi.analytics.connection.model.implementation=${nifi.analytics.connection.model.implementation} nifi.analytics.connection.model.score.name=${nifi.analytics.connection.model.score.name} nifi.analytics.connection.model.score.threshold=${nifi.analytics.connection.model.score.threshold}