This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 239b9362f4 NIFI-13465: Fix NPE in QueryNiFiReportingTask when 
analytics tables are queried without analytics enabled
239b9362f4 is described below

commit 239b9362f423e14a0ca79934f707f7b9d8964c8f
Author: Matt Burgess <mattyb...@apache.org>
AuthorDate: Fri Jun 28 11:53:56 2024 -0400

    NIFI-13465: Fix NPE in QueryNiFiReportingTask when analytics tables are 
queried without analytics enabled
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #9016.
---
 .../nifi/reporting/sql/MetricsSqlQueryService.java | 11 +++--
 .../ConnectionStatusPredictionDataSource.java      |  4 ++
 .../reporting/sql/TestQueryNiFiReportingTask.java  | 51 ++++++++++++++++++++++
 3 files changed, 60 insertions(+), 6 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java
 
b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java
index 41cdd2d9bb..b60f61acff 100644
--- 
a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java
+++ 
b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java
@@ -148,12 +148,11 @@ public class MetricsSqlQueryService implements 
MetricsQueryService {
         final NiFiTable connectionStatusTable = new 
NiFiTable("CONNECTION_STATUS", connectionStatusDataSource, getLogger());
         database.addTable(connectionStatusTable);
 
-        if (context.isAnalyticsEnabled()) {
-            final ResettableDataSource predictionDataSource = new 
ConnectionStatusPredictionDataSource(context, groupStatusCache);
-            final NiFiTable connectionStatusPredictionsTable = new 
NiFiTable("CONNECTION_STATUS_PREDICTIONS", predictionDataSource, getLogger());
-            database.addTable(connectionStatusPredictionsTable);
-        } else {
-            getLogger().debug("Analytics is not enabled, 
CONNECTION_STATUS_PREDICTIONS table is not available for querying");
+        final ResettableDataSource predictionDataSource = new 
ConnectionStatusPredictionDataSource(context, groupStatusCache);
+        final NiFiTable connectionStatusPredictionsTable = new 
NiFiTable("CONNECTION_STATUS_PREDICTIONS", predictionDataSource, getLogger());
+        database.addTable(connectionStatusPredictionsTable);
+        if (!context.isAnalyticsEnabled()) {
+            getLogger().info("Analytics is not enabled, 
CONNECTION_STATUS_PREDICTIONS table will not contain any rows");
         }
 
         final ResettableDataSource processorStatusDataSource = new 
ProcessorStatusDataSource(context, groupStatusCache);
diff --git 
a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ConnectionStatusPredictionDataSource.java
 
b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ConnectionStatusPredictionDataSource.java
index 97d43f6b24..803c301f65 100644
--- 
a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ConnectionStatusPredictionDataSource.java
+++ 
b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ConnectionStatusPredictionDataSource.java
@@ -88,6 +88,10 @@ public class ConnectionStatusPredictionDataSource implements 
ResettableDataSourc
     private Object[] toArray(final ConnectionStatus status) {
         final ConnectionStatusPredictions predictions = 
status.getPredictions();
 
+        if (predictions == null) {
+            return new Object[8];
+        }
+
         return new Object[] {
             status.getId(),
             predictions.getNextPredictedQueuedBytes(),
diff --git 
a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
 
b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
index a1512352fe..ab80bea754 100644
--- 
a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
+++ 
b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
@@ -30,6 +30,7 @@ import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.provenance.MockProvenanceEvent;
@@ -128,11 +129,29 @@ class TestQueryNiFiReportingTask {
         root1ConnectionStatus.setId("root1");
         root1ConnectionStatus.setQueuedCount(1000);
         root1ConnectionStatus.setBackPressureObjectThreshold(1000);
+        // Set backpressure predictions
+        ConnectionStatusPredictions connectionStatusPredictions1 = new 
ConnectionStatusPredictions();
+        
connectionStatusPredictions1.setPredictedTimeToCountBackpressureMillis(2000);
+        
connectionStatusPredictions1.setPredictedTimeToBytesBackpressureMillis(2000);
+        connectionStatusPredictions1.setNextPredictedQueuedBytes(1024);
+        connectionStatusPredictions1.setNextPredictedQueuedCount(1);
+        connectionStatusPredictions1.setPredictedPercentBytes(55);
+        connectionStatusPredictions1.setPredictedPercentCount(30);
+        root1ConnectionStatus.setPredictions(connectionStatusPredictions1);
 
         ConnectionStatus root2ConnectionStatus = new ConnectionStatus();
         root2ConnectionStatus.setId("root2");
         root2ConnectionStatus.setQueuedCount(500);
         root2ConnectionStatus.setBackPressureObjectThreshold(1000);
+        // Set backpressure predictions
+        ConnectionStatusPredictions connectionStatusPredictions2 = new 
ConnectionStatusPredictions();
+        
connectionStatusPredictions2.setPredictedTimeToBytesBackpressureMillis(2000);
+        
connectionStatusPredictions2.setPredictedTimeToBytesBackpressureMillis(2000);
+        connectionStatusPredictions2.setNextPredictedQueuedBytes(1024);
+        connectionStatusPredictions2.setNextPredictedQueuedCount(1);
+        connectionStatusPredictions2.setPredictedPercentBytes(55);
+        connectionStatusPredictions2.setPredictedPercentCount(30);
+        root2ConnectionStatus.setPredictions(connectionStatusPredictions2);
 
         Collection<ConnectionStatus> rootConnectionStatuses = new 
ArrayList<>();
         rootConnectionStatuses.add(root1ConnectionStatus);
@@ -151,6 +170,15 @@ class TestQueryNiFiReportingTask {
         ConnectionStatus nestedConnectionStatus = new ConnectionStatus();
         nestedConnectionStatus.setId("nested");
         nestedConnectionStatus.setQueuedCount(1001);
+        // Set backpressure predictions
+        ConnectionStatusPredictions connectionStatusPredictions3 = new 
ConnectionStatusPredictions();
+        
connectionStatusPredictions3.setPredictedTimeToBytesBackpressureMillis(2000);
+        
connectionStatusPredictions3.setPredictedTimeToBytesBackpressureMillis(2000);
+        connectionStatusPredictions3.setNextPredictedQueuedBytes(1024);
+        connectionStatusPredictions3.setNextPredictedQueuedCount(1);
+        connectionStatusPredictions3.setPredictedPercentBytes(55);
+        connectionStatusPredictions3.setPredictedPercentCount(30);
+        nestedConnectionStatus.setPredictions(connectionStatusPredictions3);
         Collection<ConnectionStatus> nestedConnectionStatuses = new 
ArrayList<>();
         nestedConnectionStatuses.add(nestedConnectionStatus);
         groupStatus2.setConnectionStatus(nestedConnectionStatuses);
@@ -163,6 +191,15 @@ class TestQueryNiFiReportingTask {
         ConnectionStatus nestedConnectionStatus2 = new ConnectionStatus();
         nestedConnectionStatus2.setId("nested2");
         nestedConnectionStatus2.setQueuedCount(3);
+        // Set backpressure predictions
+        ConnectionStatusPredictions connectionStatusPredictions4 = new 
ConnectionStatusPredictions();
+        
connectionStatusPredictions4.setPredictedTimeToBytesBackpressureMillis(2000);
+        
connectionStatusPredictions4.setPredictedTimeToBytesBackpressureMillis(2000);
+        connectionStatusPredictions4.setNextPredictedQueuedBytes(1024);
+        connectionStatusPredictions4.setNextPredictedQueuedCount(1);
+        connectionStatusPredictions4.setPredictedPercentBytes(55);
+        connectionStatusPredictions4.setPredictedPercentCount(30);
+        nestedConnectionStatus2.setPredictions(connectionStatusPredictions4);
         Collection<ConnectionStatus> nestedConnectionStatuses2 = new 
ArrayList<>();
         nestedConnectionStatuses2.add(nestedConnectionStatus2);
         groupStatus3.setConnectionStatus(nestedConnectionStatuses2);
@@ -226,6 +263,20 @@ class TestQueryNiFiReportingTask {
         assertEquals(3, row.get("queuedCount"));
     }
 
+    @Test
+    void testConnectionStatusTableJoin() throws InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
+        properties.put(QueryMetricsUtil.QUERY, "SELECT id "
+                + "FROM CONNECTION_STATUS "
+                + "JOIN CONNECTION_STATUS_PREDICTIONS ON 
CONNECTION_STATUS_PREDICTIONS.connectionId = CONNECTION_STATUS.id");
+        reportingTask = initTask(properties);
+        reportingTask.onTrigger(context);
+
+        List<Map<String, Object>> rows = mockRecordSinkService.getRows();
+        assertEquals(4, rows.size());
+    }
+
     @Test
     void testBulletinIsInTimeWindow() throws InitializationException {
         String query = "select * from BULLETINS where bulletinTimestamp > 
$bulletinStartTime and bulletinTimestamp <= $bulletinEndTime";

Reply via email to