This is an automated email from the ASF dual-hosted git repository. pvillard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 239b9362f4 NIFI-13465: Fix NPE in QueryNiFiReportingTask when analytics tables are queried without analytics enabled 239b9362f4 is described below commit 239b9362f423e14a0ca79934f707f7b9d8964c8f Author: Matt Burgess <mattyb...@apache.org> AuthorDate: Fri Jun 28 11:53:56 2024 -0400 NIFI-13465: Fix NPE in QueryNiFiReportingTask when analytics tables are queried without analytics enabled Signed-off-by: Pierre Villard <pierre.villard...@gmail.com> This closes #9016. --- .../nifi/reporting/sql/MetricsSqlQueryService.java | 11 +++-- .../ConnectionStatusPredictionDataSource.java | 4 ++ .../reporting/sql/TestQueryNiFiReportingTask.java | 51 ++++++++++++++++++++++ 3 files changed, 60 insertions(+), 6 deletions(-) diff --git a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java index 41cdd2d9bb..b60f61acff 100644 --- a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java +++ b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java @@ -148,12 +148,11 @@ public class MetricsSqlQueryService implements MetricsQueryService { final NiFiTable connectionStatusTable = new NiFiTable("CONNECTION_STATUS", connectionStatusDataSource, getLogger()); database.addTable(connectionStatusTable); - if (context.isAnalyticsEnabled()) { - final ResettableDataSource predictionDataSource = new ConnectionStatusPredictionDataSource(context, groupStatusCache); - final NiFiTable connectionStatusPredictionsTable = new NiFiTable("CONNECTION_STATUS_PREDICTIONS", predictionDataSource, getLogger()); - database.addTable(connectionStatusPredictionsTable); - } else { - getLogger().debug("Analytics is not enabled, CONNECTION_STATUS_PREDICTIONS table is not available for querying"); + final ResettableDataSource predictionDataSource = new ConnectionStatusPredictionDataSource(context, groupStatusCache); + final NiFiTable connectionStatusPredictionsTable = new NiFiTable("CONNECTION_STATUS_PREDICTIONS", predictionDataSource, getLogger()); + database.addTable(connectionStatusPredictionsTable); + if (!context.isAnalyticsEnabled()) { + getLogger().info("Analytics is not enabled, CONNECTION_STATUS_PREDICTIONS table will not contain any rows"); } final ResettableDataSource processorStatusDataSource = new ProcessorStatusDataSource(context, groupStatusCache); diff --git a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ConnectionStatusPredictionDataSource.java b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ConnectionStatusPredictionDataSource.java index 97d43f6b24..803c301f65 100644 --- a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ConnectionStatusPredictionDataSource.java +++ b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ConnectionStatusPredictionDataSource.java @@ -88,6 +88,10 @@ public class ConnectionStatusPredictionDataSource implements ResettableDataSourc private Object[] toArray(final ConnectionStatus status) { final ConnectionStatusPredictions predictions = status.getPredictions(); + if (predictions == null) { + return new Object[8]; + } + return new Object[] { status.getId(), predictions.getNextPredictedQueuedBytes(), diff --git a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java index a1512352fe..ab80bea754 100644 --- a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java +++ b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java @@ -30,6 +30,7 @@ import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.Processor; import org.apache.nifi.provenance.MockProvenanceEvent; @@ -128,11 +129,29 @@ class TestQueryNiFiReportingTask { root1ConnectionStatus.setId("root1"); root1ConnectionStatus.setQueuedCount(1000); root1ConnectionStatus.setBackPressureObjectThreshold(1000); + // Set backpressure predictions + ConnectionStatusPredictions connectionStatusPredictions1 = new ConnectionStatusPredictions(); + connectionStatusPredictions1.setPredictedTimeToCountBackpressureMillis(2000); + connectionStatusPredictions1.setPredictedTimeToBytesBackpressureMillis(2000); + connectionStatusPredictions1.setNextPredictedQueuedBytes(1024); + connectionStatusPredictions1.setNextPredictedQueuedCount(1); + connectionStatusPredictions1.setPredictedPercentBytes(55); + connectionStatusPredictions1.setPredictedPercentCount(30); + root1ConnectionStatus.setPredictions(connectionStatusPredictions1); ConnectionStatus root2ConnectionStatus = new ConnectionStatus(); root2ConnectionStatus.setId("root2"); root2ConnectionStatus.setQueuedCount(500); root2ConnectionStatus.setBackPressureObjectThreshold(1000); + // Set backpressure predictions + ConnectionStatusPredictions connectionStatusPredictions2 = new ConnectionStatusPredictions(); + connectionStatusPredictions2.setPredictedTimeToBytesBackpressureMillis(2000); + connectionStatusPredictions2.setPredictedTimeToBytesBackpressureMillis(2000); + connectionStatusPredictions2.setNextPredictedQueuedBytes(1024); + connectionStatusPredictions2.setNextPredictedQueuedCount(1); + connectionStatusPredictions2.setPredictedPercentBytes(55); + connectionStatusPredictions2.setPredictedPercentCount(30); + root2ConnectionStatus.setPredictions(connectionStatusPredictions2); Collection<ConnectionStatus> rootConnectionStatuses = new ArrayList<>(); rootConnectionStatuses.add(root1ConnectionStatus); @@ -151,6 +170,15 @@ class TestQueryNiFiReportingTask { ConnectionStatus nestedConnectionStatus = new ConnectionStatus(); nestedConnectionStatus.setId("nested"); nestedConnectionStatus.setQueuedCount(1001); + // Set backpressure predictions + ConnectionStatusPredictions connectionStatusPredictions3 = new ConnectionStatusPredictions(); + connectionStatusPredictions3.setPredictedTimeToBytesBackpressureMillis(2000); + connectionStatusPredictions3.setPredictedTimeToBytesBackpressureMillis(2000); + connectionStatusPredictions3.setNextPredictedQueuedBytes(1024); + connectionStatusPredictions3.setNextPredictedQueuedCount(1); + connectionStatusPredictions3.setPredictedPercentBytes(55); + connectionStatusPredictions3.setPredictedPercentCount(30); + nestedConnectionStatus.setPredictions(connectionStatusPredictions3); Collection<ConnectionStatus> nestedConnectionStatuses = new ArrayList<>(); nestedConnectionStatuses.add(nestedConnectionStatus); groupStatus2.setConnectionStatus(nestedConnectionStatuses); @@ -163,6 +191,15 @@ class TestQueryNiFiReportingTask { ConnectionStatus nestedConnectionStatus2 = new ConnectionStatus(); nestedConnectionStatus2.setId("nested2"); nestedConnectionStatus2.setQueuedCount(3); + // Set backpressure predictions + ConnectionStatusPredictions connectionStatusPredictions4 = new ConnectionStatusPredictions(); + connectionStatusPredictions4.setPredictedTimeToBytesBackpressureMillis(2000); + connectionStatusPredictions4.setPredictedTimeToBytesBackpressureMillis(2000); + connectionStatusPredictions4.setNextPredictedQueuedBytes(1024); + connectionStatusPredictions4.setNextPredictedQueuedCount(1); + connectionStatusPredictions4.setPredictedPercentBytes(55); + connectionStatusPredictions4.setPredictedPercentCount(30); + nestedConnectionStatus2.setPredictions(connectionStatusPredictions4); Collection<ConnectionStatus> nestedConnectionStatuses2 = new ArrayList<>(); nestedConnectionStatuses2.add(nestedConnectionStatus2); groupStatus3.setConnectionStatus(nestedConnectionStatuses2); @@ -226,6 +263,20 @@ class TestQueryNiFiReportingTask { assertEquals(3, row.get("queuedCount")); } + @Test + void testConnectionStatusTableJoin() throws InitializationException { + final Map<PropertyDescriptor, String> properties = new HashMap<>(); + properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); + properties.put(QueryMetricsUtil.QUERY, "SELECT id " + + "FROM CONNECTION_STATUS " + + "JOIN CONNECTION_STATUS_PREDICTIONS ON CONNECTION_STATUS_PREDICTIONS.connectionId = CONNECTION_STATUS.id"); + reportingTask = initTask(properties); + reportingTask.onTrigger(context); + + List<Map<String, Object>> rows = mockRecordSinkService.getRows(); + assertEquals(4, rows.size()); + } + @Test void testBulletinIsInTimeWindow() throws InitializationException { String query = "select * from BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime";