tpalfy commented on a change in pull request #5738:
URL: https://github.com/apache/nifi/pull/5738#discussion_r798870414



##########
File path: nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java
##########
@@ -18,26 +18,44 @@
 
 import org.apache.nifi.components.AbstractConfigurableComponent;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
-import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.rules.Action;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 public class MockRulesEngineService extends AbstractConfigurableComponent implements RulesEngineService {

Review comment:
       Simplified tests deserve simplified helper classes:
   ```java
    public class MockRulesEngineService extends AbstractConfigurableComponent implements RulesEngineService {
       @Override
       public List<Action> fireRules(Map<String, Object> facts) {
           return Collections.singletonList(Mockito.mock(Action.class));
       }
   
       @Override
       public void initialize(ControllerServiceInitializationContext context) {
       }
   
       @Override
       public String getIdentifier() {
           return "MockRulesEngineService";
       }
   }
   
   ```

##########
File path: nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java
##########
@@ -73,4 +73,10 @@ public void initialize(ControllerServiceInitializationContext context) throws In
     public String getIdentifier() {
         return "MockPropertyContextActionHandler";
     }
+
+    public void reset() {

Review comment:
       The simpler these helper classes are, the better:
   ```java
    public class MockPropertyContextActionHandler extends AbstractConfigurableComponent implements PropertyContextActionHandler {
       private List<Map<String, Object>> rows = new ArrayList<>();
       private List<PropertyContext> propertyContexts = new ArrayList<>();
   
   
       @Override
        public void execute(PropertyContext context, Action action, Map<String, Object> facts) {
           propertyContexts.add(context);
           execute(action, facts);
       }
   
       @Override
       public void execute(Action action, Map<String, Object> facts) {
           rows.add(facts);
       }
   
   
       @Override
        public void initialize(ControllerServiceInitializationContext context) throws InitializationException {
   
       }
   
       public List<Map<String, Object>> getRows() {
           return rows;
       }
   
       public List<PropertyContext> getPropertyContexts() {
           return propertyContexts;
       }
   
       @Override
       public String getIdentifier() {
           return "MockPropertyContextActionHandler";
       }
   
       public void reset() {
           rows.clear();
           propertyContexts.clear();
       }
   }
   
   ```
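   As an illustration only (not part of this diff; the snippet and its names are hypothetical), this is how such a simplified handler could be exercised and reset between `onTrigger` invocations in a test:
   ```java
       // Illustration: the handler captures the facts of every executed action,
       // and reset() clears the captured state between onTrigger invocations.
       MockPropertyContextActionHandler handler = new MockPropertyContextActionHandler();
       Map<String, Object> facts = new HashMap<>();
       facts.put("queuedCount", 1000);
       handler.execute(Mockito.mock(Action.class), facts);
       assertEquals(1, handler.getRows().size());

       handler.reset();
       assertTrue(handler.getRows().isEmpty());
   ```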

##########
File path: nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java
##########
@@ -77,17 +83,37 @@ public void setup(final ConfigurationContext context) throws IOException {
 
     @Override
     public void onTrigger(ReportingContext context) {
+        String sql = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();

Review comment:
       The duplication of the SQL start-time and end-time evaluation logic could be avoided.
   I suggest a trait-like interface to get the most reuse out of it, like this:
   ```java
    public class MetricsEventReportingTask extends AbstractReportingTask implements QueryTimeAware {
   
       private List<PropertyDescriptor> properties;
       private MetricsQueryService metricsQueryService;
       private volatile RulesEngineService rulesEngineService;
       private volatile PropertyContextActionHandler actionHandler;
   
       @Override
       protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
           return properties;
       }
   
       @Override
       protected void init(final ReportingInitializationContext config) {
           final List<PropertyDescriptor> properties = new ArrayList<>();
           properties.add(QueryMetricsUtil.QUERY);
           properties.add(QueryMetricsUtil.RULES_ENGINE);
           properties.add(QueryMetricsUtil.ACTION_HANDLER);
           properties.add(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION);
           properties.add(VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE);
           this.properties = Collections.unmodifiableList(properties);
       }
   
       @OnScheduled
       public void setup(final ConfigurationContext context) {
            actionHandler = context.getProperty(QueryMetricsUtil.ACTION_HANDLER).asControllerService(PropertyContextActionHandler.class);
            rulesEngineService = context.getProperty(QueryMetricsUtil.RULES_ENGINE).asControllerService(RulesEngineService.class);
            final Integer defaultPrecision = context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
            final Integer defaultScale = context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE).evaluateAttributeExpressions().asInteger();
            metricsQueryService = new MetricsSqlQueryService(getLogger(), defaultPrecision, defaultScale);
       }
   
       @Override
       public void onTrigger(ReportingContext context) {
            String sql = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
           try {
                sql = processStartAndEndTimes(context, sql, BULLETIN_START_TIME, BULLETIN_END_TIME);
                sql = processStartAndEndTimes(context, sql, PROVENANCE_START_TIME, PROVENANCE_END_TIME);
   
               fireRules(context, actionHandler, rulesEngineService, sql);
           } catch (Exception e) {
               getLogger().error("Error opening loading rules: {}", new 
Object[]{e.getMessage()}, e);
           }
       }
   
        private void fireRules(ReportingContext context, PropertyContextActionHandler actionHandler, RulesEngineService engine, String query) throws Exception {
           getLogger().debug("Executing query: {}", query);
           QueryResult queryResult = metricsQueryService.query(context, query);
            ResultSetRecordSet recordSet = metricsQueryService.getResultSetRecordSet(queryResult);
           Record record;
           try {
               while ((record = recordSet.next()) != null) {
                   final Map<String, Object> facts = new HashMap<>();
                   for (String fieldName : record.getRawFieldNames()) {
                       facts.put(fieldName, record.getValue(fieldName));
                   }
                   List<Action> actions = engine.fireRules(facts);
                   if (actions == null || actions.isEmpty()) {
                       getLogger().debug("No actions required for provided 
facts.");
                   } else {
                       actions.forEach(action -> actionHandler.execute(context, 
action, facts));
                   }
               }
           } finally {
               metricsQueryService.closeQuietly(recordSet);
           }
       }
   }
   
    public class QueryNiFiReportingTask extends AbstractReportingTask implements QueryTimeAware {
   
       private List<PropertyDescriptor> properties;
   
       private volatile RecordSinkService recordSinkService;
   
       private MetricsQueryService metricsQueryService;
   
       @Override
       protected void init(final ReportingInitializationContext config) {
           final List<PropertyDescriptor> properties = new ArrayList<>();
           properties.add(QueryMetricsUtil.QUERY);
           properties.add(QueryMetricsUtil.RECORD_SINK);
           properties.add(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS);
           properties.add(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION);
           properties.add(VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE);
           this.properties = Collections.unmodifiableList(properties);
       }
   
       @Override
       protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
           return properties;
       }
   
       @OnScheduled
       public void setup(final ConfigurationContext context) {
            recordSinkService = context.getProperty(QueryMetricsUtil.RECORD_SINK).asControllerService(RecordSinkService.class);
            recordSinkService.reset();
            final Integer defaultPrecision = context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
            final Integer defaultScale = context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE).evaluateAttributeExpressions().asInteger();
            metricsQueryService = new MetricsSqlQueryService(getLogger(), defaultPrecision, defaultScale);
       }
   
       @Override
       public void onTrigger(ReportingContext context) {
           final StopWatch stopWatch = new StopWatch(true);
           String sql = context.getProperty(QueryMetricsUtil.QUERY).getValue();
           try {
                sql = processStartAndEndTimes(context, sql, BULLETIN_START_TIME, BULLETIN_END_TIME);
                sql = processStartAndEndTimes(context, sql, PROVENANCE_START_TIME, PROVENANCE_END_TIME);
   
               getLogger().debug("Executing query: {}", sql);
                final QueryResult queryResult = metricsQueryService.query(context, sql);
               final ResultSetRecordSet recordSet;
   
               try {
                    recordSet = metricsQueryService.getResultSetRecordSet(queryResult);
                } catch (final Exception e) {
                    getLogger().error("Error creating record set from query results due to {}", new Object[]{e.getMessage()}, e);
                   return;
               }
   
               try {
                   final Map<String, String> attributes = new HashMap<>();
                   final String transactionId = UUID.randomUUID().toString();
                   attributes.put("reporting.task.transaction.id", 
transactionId);
                   attributes.put("reporting.task.name", getName());
                   attributes.put("reporting.task.uuid", getIdentifier());
                   attributes.put("reporting.task.type", 
this.getClass().getSimpleName());
                   recordSinkService.sendData(recordSet, attributes, 
context.getProperty(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS).asBoolean());
               } catch (Exception e) {
                   getLogger().error("Error during transmission of query 
results due to {}", new Object[]{e.getMessage()}, e);
                   return;
               } finally {
                   metricsQueryService.closeQuietly(queryResult);
               }
                final long elapsedMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
                getLogger().debug("Successfully queried and sent in {} millis", elapsedMillis);
            } catch (Exception e) {
                getLogger().error("Error processing the query due to {}", new Object[]{e.getMessage()}, e);
           }
       }
   }
   
   public interface QueryTimeAware {
       default String processStartAndEndTimes(
           ReportingContext context,
           String sql,
           TrackedQueryTime queryStartTime,
           TrackedQueryTime queryEndTime
       ) throws IOException {
           StateManager stateManager = context.getStateManager();
   
            final Map<String, String> stateMap = new HashMap<>(stateManager.getState(Scope.LOCAL).toMap());
   
            if (sql.contains(queryStartTime.getSqlPlaceholder()) && sql.contains(queryEndTime.getSqlPlaceholder())) {
                final long startTime = stateMap.get(queryStartTime.name()) == null ? 0 : Long.parseLong(stateMap.get(queryStartTime.name()));
               final long currentTime = getCurrentTime();
   
                sql = sql.replace(queryStartTime.getSqlPlaceholder(), String.valueOf(startTime));
                sql = sql.replace(queryEndTime.getSqlPlaceholder(), String.valueOf(currentTime));
   
               stateMap.put(queryStartTime.name(), String.valueOf(currentTime));
               stateManager.setState(stateMap, Scope.LOCAL);
           }
   
           return sql;
       }
   
       default long getCurrentTime() {
           return Instant.now().toEpochMilli();
       }
   }
   
   ```
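   For reference, `getSqlPlaceholder()` and `name()` above are called on the `TrackedQueryTime` enum, which is not shown in this diff. A minimal sketch of what it presumably looks like, inferred from the `$bulletinStartTime`/`$provenanceStartTime` placeholders used in the test queries below (an assumption, not the exact code of this PR):
   ```java
   // Sketch only: each constant pairs a state-map key (its name()) with the SQL
   // placeholder it replaces in the query text.
   public enum TrackedQueryTime {
       BULLETIN_START_TIME("$bulletinStartTime"),
       BULLETIN_END_TIME("$bulletinEndTime"),
       PROVENANCE_START_TIME("$provenanceStartTime"),
       PROVENANCE_END_TIME("$provenanceEndTime");

       private final String sqlPlaceholder;

       TrackedQueryTime(final String sqlPlaceholder) {
           this.sqlPlaceholder = sqlPlaceholder;
       }

       public String getSqlPlaceholder() {
           return sqlPlaceholder;
       }
   }
   ```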

##########
File path: nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java
##########
@@ -49,27 +61,40 @@
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+class TestMetricsEventReportingTask {

Review comment:
       This test is already a bit more complicated than it needs to be; we can make it simpler like this:
   ```java
   class TestMetricsEventReportingTask {
   
       private static final String LOG = "LOG";
       private static final String ALERT = "ALERT";
   
       private ReportingContext context;
       private MockMetricsEventReportingTask reportingTask;
       private MockPropertyContextActionHandler actionHandler;
       private ProcessGroupStatus status;
       private MockQueryBulletinRepository mockBulletinRepository;
       private MockProvenanceRepository mockProvenanceRepository;
       private AtomicLong currentTime;
       private MockStateManager mockStateManager;
   
       @BeforeEach
       public void setup() {
           currentTime = new AtomicLong();
           status = new ProcessGroupStatus();
           actionHandler = new MockPropertyContextActionHandler();
           status.setId("1234");
           status.setFlowFilesReceived(5);
           status.setBytesReceived(10000);
           status.setFlowFilesSent(10);
           status.setBytesRead(20000L);
           status.setBytesSent(20000);
           status.setQueuedCount(100);
           status.setQueuedContentSize(1024L);
           status.setBytesWritten(80000L);
           status.setActiveThreadCount(5);
   
           // create a processor status with processing time
           ProcessorStatus procStatus = new ProcessorStatus();
           procStatus.setId("proc");
           procStatus.setProcessingNanos(123456789);
   
           Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
           processorStatuses.add(procStatus);
           status.setProcessorStatus(processorStatuses);
   
            ConnectionStatusPredictions connectionStatusPredictions = new ConnectionStatusPredictions();
            connectionStatusPredictions.setPredictedTimeToCountBackpressureMillis(1000);
            connectionStatusPredictions.setPredictedTimeToBytesBackpressureMillis(1000);
            connectionStatusPredictions.setNextPredictedQueuedCount(1000000000);
            connectionStatusPredictions.setNextPredictedQueuedBytes(1000000000000000L);
   
           ConnectionStatus root1ConnectionStatus = new ConnectionStatus();
           root1ConnectionStatus.setId("root1");
           root1ConnectionStatus.setQueuedCount(1000);
           root1ConnectionStatus.setPredictions(connectionStatusPredictions);
   
           ConnectionStatus root2ConnectionStatus = new ConnectionStatus();
           root2ConnectionStatus.setId("root2");
           root2ConnectionStatus.setQueuedCount(500);
           root2ConnectionStatus.setPredictions(connectionStatusPredictions);
   
            Collection<ConnectionStatus> rootConnectionStatuses = new ArrayList<>();
           rootConnectionStatuses.add(root1ConnectionStatus);
           rootConnectionStatuses.add(root2ConnectionStatus);
           status.setConnectionStatus(rootConnectionStatuses);
       }
   
       @Test
       void testConnectionStatusTable() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.QUERY, "select connectionId, 
predictedQueuedCount, predictedTimeToBytesBackpressureMillis from 
CONNECTION_STATUS_PREDICTIONS");
           reportingTask = initTask(properties);
           reportingTask.onTrigger(context);
            List<PropertyContext> propertyContexts = actionHandler.getPropertyContexts();
           assertEquals(2, actionHandler.getRows().size());
           assertEquals(2, propertyContexts.size());
       }
   
       @Test
        void testUniqueBulletinQueryIsInTimeWindow() throws InitializationException {
            final Map<PropertyDescriptor, String> properties = new HashMap<>();
            properties.put(QueryMetricsUtil.QUERY, "select bulletinCategory from BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime");
           reportingTask = initTask(properties);
           currentTime.set(Instant.now().toEpochMilli());
           reportingTask.onTrigger(context);
           assertEquals(1, actionHandler.getRows().size());
   
           actionHandler.reset();
            final Bulletin bulletin = BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(), "WARN", "test bulletin 2", "testFlowFileUuid");
           mockBulletinRepository.addBulletin(bulletin);
           currentTime.set(bulletin.getTimestamp().getTime());
           reportingTask.onTrigger(context);
           assertEquals(1, actionHandler.getRows().size());
       }
   
       @Test
        void testUniqueBulletinQueryIsOutOfTimeWindow() throws InitializationException {
            final Map<PropertyDescriptor, String> properties = new HashMap<>();
            properties.put(QueryMetricsUtil.QUERY, "select bulletinCategory from BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime");
           reportingTask = initTask(properties);
           currentTime.set(Instant.now().toEpochMilli());
           reportingTask.onTrigger(context);
           assertEquals(1, actionHandler.getRows().size());
   
           actionHandler.reset();
            final Bulletin bulletin = BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(), "WARN", "test bulletin 2", "testFlowFileUuid");
           mockBulletinRepository.addBulletin(bulletin);
           currentTime.set(bulletin.getTimestamp().getTime() - 1);
           reportingTask.onTrigger(context);
           assertEquals(0, actionHandler.getRows().size());
       }
   
       @Test
        void testUniqueProvenanceQueryIsInTimeWindow() throws InitializationException {
            final Map<PropertyDescriptor, String> properties = new HashMap<>();
            properties.put(QueryMetricsUtil.QUERY, "select componentId from PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime");
           reportingTask = initTask(properties);
           currentTime.set(Instant.now().toEpochMilli());
           reportingTask.onTrigger(context);
           assertEquals(1, actionHandler.getRows().size());
   
           actionHandler.reset();
   
           MockFlowFile mockFlowFile = new MockFlowFile(2L);
           ProvenanceEventRecord prov2 = mockProvenanceRepository.eventBuilder()
                   .setEventType(ProvenanceEventType.CREATE)
                   .fromFlowFile(mockFlowFile)
                   .setComponentId("2")
                   .setComponentType("ReportingTask")
                   .setFlowFileUUID("I am FlowFile 2")
                   .setEventTime(Instant.now().toEpochMilli())
                   .setEventDuration(100)
                   .setTransitUri("test://")
                   .setSourceSystemFlowFileIdentifier("I am FlowFile 2")
                   .setAlternateIdentifierUri("remote://test")
                   .build();
           mockProvenanceRepository.registerEvent(prov2);
   
           currentTime.set(prov2.getEventTime());
           reportingTask.onTrigger(context);
   
           assertEquals(1, actionHandler.getRows().size());
       }
   
       @Test
        void testUniqueProvenanceQueryIsOutOfTimeWindow() throws InitializationException {
            final Map<PropertyDescriptor, String> properties = new HashMap<>();
            properties.put(QueryMetricsUtil.QUERY, "select componentId from PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime");
           reportingTask = initTask(properties);
           currentTime.set(Instant.now().toEpochMilli());
           reportingTask.onTrigger(context);
           assertEquals(1, actionHandler.getRows().size());
   
           actionHandler.reset();
   
           MockFlowFile mockFlowFile = new MockFlowFile(2L);
           ProvenanceEventRecord prov2 = mockProvenanceRepository.eventBuilder()
                   .setEventType(ProvenanceEventType.CREATE)
                   .fromFlowFile(mockFlowFile)
                   .setComponentId("2")
                   .setComponentType("ReportingTask")
                   .setFlowFileUUID("I am FlowFile 2")
                   .setEventTime(Instant.now().toEpochMilli())
                   .setEventDuration(100)
                   .setTransitUri("test://")
                   .setSourceSystemFlowFileIdentifier("I am FlowFile 2")
                   .setAlternateIdentifierUri("remote://test")
                   .build();
           mockProvenanceRepository.registerEvent(prov2);
   
           currentTime.set(prov2.getEventTime() - 1);
           reportingTask.onTrigger(context);
   
           assertEquals(0, actionHandler.getRows().size());
       }
   
       @Test
        void testTimeWindowFromStateMap() throws IOException, InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
           properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS, 
PROVENANCE where " +
                   "bulletinTimestamp > $bulletinStartTime and 
bulletinTimestamp <= $bulletinEndTime " +
                   "and timestampMillis > $provenanceStartTime and 
timestampMillis <= $provenanceEndTime");
           reportingTask = initTask(properties);
   
           long testBulletinStartTime = 1609538145L;
           long testProvenanceStartTime = 1641074145L;
           final Map<String, String> stateMap = new HashMap<>();
            stateMap.put(TrackedQueryTime.BULLETIN_START_TIME.name(), String.valueOf(testBulletinStartTime));
            stateMap.put(TrackedQueryTime.PROVENANCE_START_TIME.name(), String.valueOf(testProvenanceStartTime));
           mockStateManager.setState(stateMap, Scope.LOCAL);
   
            final long bulletinStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
            final long provenanceStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
   
           assertEquals(testBulletinStartTime, bulletinStartTime);
           assertEquals(testProvenanceStartTime, provenanceStartTime);
   
           final long currentTime = Instant.now().toEpochMilli();
           this.currentTime.set(currentTime);
   
           reportingTask.onTrigger(context);
   
            final long updatedBulletinStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
            final long updatedProvenanceStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
   
           assertEquals(currentTime, updatedBulletinStartTime);
           assertEquals(currentTime, updatedProvenanceStartTime);
       }
   
        private MockMetricsEventReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException {
           final ComponentLog logger = Mockito.mock(ComponentLog.class);
           reportingTask = new MockMetricsEventReportingTask();
            final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
            Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
           Mockito.when(initContext.getLogger()).thenReturn(logger);
           reportingTask.initialize(initContext);
           Map<PropertyDescriptor, String> properties = new HashMap<>();
   
            for (final PropertyDescriptor descriptor : reportingTask.getSupportedPropertyDescriptors()) {
               properties.put(descriptor, descriptor.getDefaultValue());
           }
           properties.putAll(customProperties);
   
           context = Mockito.mock(ReportingContext.class);
           Mockito.when(context.isAnalyticsEnabled()).thenReturn(true);
           mockStateManager = new MockStateManager(reportingTask);
           Mockito.when(context.getStateManager()).thenReturn(mockStateManager);
   
           Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
                final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
               return new MockPropertyValue(properties.get(descriptor));
           }).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
   
           final EventAccess eventAccess = Mockito.mock(EventAccess.class);
           Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
           Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
   
            final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class);
            actionHandler = new MockPropertyContextActionHandler();
            Mockito.when(pValue.asControllerService(PropertyContextActionHandler.class)).thenReturn(actionHandler);

            final PropertyValue resValue = Mockito.mock(StandardPropertyValue.class);
            MockRulesEngineService rulesEngineService = new MockRulesEngineService();
            Mockito.when(resValue.asControllerService(RulesEngineService.class)).thenReturn(rulesEngineService);
   
            ConfigurationContext configContext = Mockito.mock(ConfigurationContext.class);
            Mockito.when(configContext.getProperty(QueryMetricsUtil.RULES_ENGINE)).thenReturn(resValue);
            Mockito.when(configContext.getProperty(QueryMetricsUtil.ACTION_HANDLER)).thenReturn(pValue);
            Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION)).thenReturn(new MockPropertyValue("10"));
            Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE)).thenReturn(new MockPropertyValue("0"));
           reportingTask.setup(configContext);
   
           setupMockProvenanceRepository(eventAccess);
           setupMockBulletinRepository();
   
           return reportingTask;
       }
   
        private final class MockMetricsEventReportingTask extends MetricsEventReportingTask {
           @Override
           public long getCurrentTime() {
               return currentTime.get();
           }
       }
   
       private void setupMockBulletinRepository() {
            mockBulletinRepository = new MockQueryBulletinRepository();
            mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.PROCESSOR.name().toLowerCase(), "WARN", "test bulletin 1", "testFlowFileUuid"));

            Mockito.when(context.getBulletinRepository()).thenReturn(mockBulletinRepository);
       }
   
        private void setupMockProvenanceRepository(final EventAccess eventAccess) {
   
           mockProvenanceRepository = new MockProvenanceRepository();
           long currentTimeMillis = System.currentTimeMillis();
           Map<String, String> previousAttributes = new HashMap<>();
           previousAttributes.put("mime.type", "application/json");
           previousAttributes.put("test.value", "A");
            Map<String, String> updatedAttributes = new HashMap<>(previousAttributes);
           updatedAttributes.put("test.value", "B");
   
           // Generate provenance events and put them in a repository
           Processor processor = mock(Processor.class);
            SharedSessionState sharedState = new SharedSessionState(processor, new AtomicLong(0));
            MockProcessSession processSession = new MockProcessSession(sharedState, processor);
            MockFlowFile mockFlowFile = processSession.createFlowFile("Test content".getBytes());
   
           ProvenanceEventRecord prov1 = mockProvenanceRepository.eventBuilder()
                   .setEventType(ProvenanceEventType.CREATE)
                   .fromFlowFile(mockFlowFile)
                   .setComponentId("1")
                   .setComponentType("ReportingTask")
                   .setFlowFileUUID("I am FlowFile 1")
                   .setEventTime(currentTimeMillis)
                   .setEventDuration(100)
                   .setTransitUri("test://")
                   .setSourceSystemFlowFileIdentifier("I am FlowFile 1")
                   .setAlternateIdentifierUri("remote://test")
                   .setAttributes(previousAttributes, updatedAttributes)
                   .build();
   
            mockProvenanceRepository.registerEvent(prov1);

            Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(mockProvenanceRepository);
       }
   
        private static class MockQueryBulletinRepository extends MockBulletinRepository {
           Map<String, List<Bulletin>> bulletins = new HashMap<>();
   
           @Override
           public void addBulletin(Bulletin bulletin) {
                bulletins.computeIfAbsent(bulletin.getCategory(), key -> new ArrayList<>())
                   .add(bulletin);
           }
   
           @Override
           public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) {
                return new ArrayList<>(
                    Optional.ofNullable(bulletins.get(bulletinQuery.getSourceType().name().toLowerCase()))
                       .orElse(Collections.emptyList())
               );
           }
   
           @Override
           public List<Bulletin> findBulletinsForController() {
               return Optional.ofNullable(bulletins.get("controller"))
                   .orElse(Collections.emptyList());
           }
       }
   }
   
   
   ```

##########
File path: nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
##########
@@ -67,21 +72,26 @@
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
-public class TestQueryNiFiReportingTask {
+class TestQueryNiFiReportingTask {

Review comment:
       This test class can also be significantly simplified:
   ```suggestion
   class TestQueryNiFiReportingTask {
   
       private ReportingContext context;
       private MockQueryNiFiReportingTask reportingTask;
       private MockRecordSinkService mockRecordSinkService;
       private ProcessGroupStatus status;
       private BulletinRepository mockBulletinRepository;
       private MockProvenanceRepository mockProvenanceRepository;
       private AtomicLong currentTime;
       private MockStateManager mockStateManager;
   
       @BeforeEach
       public void setup() {
           currentTime = new AtomicLong();
           status = new ProcessGroupStatus();
           status.setId("1234");
           status.setFlowFilesReceived(5);
           status.setBytesReceived(10000);
           status.setFlowFilesSent(10);
           status.setBytesRead(20000L);
           status.setBytesSent(20000);
           status.setQueuedCount(100);
           status.setQueuedContentSize(1024L);
           status.setBytesWritten(80000L);
           status.setActiveThreadCount(5);
   
           // create a processor status with processing time
           ProcessorStatus procStatus = new ProcessorStatus();
           procStatus.setId("proc");
           procStatus.setProcessingNanos(123456789);
   
           Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
           processorStatuses.add(procStatus);
           status.setProcessorStatus(processorStatuses);
   
           ConnectionStatus root1ConnectionStatus = new ConnectionStatus();
           root1ConnectionStatus.setId("root1");
           root1ConnectionStatus.setQueuedCount(1000);
           root1ConnectionStatus.setBackPressureObjectThreshold(1000);
   
           ConnectionStatus root2ConnectionStatus = new ConnectionStatus();
           root2ConnectionStatus.setId("root2");
           root2ConnectionStatus.setQueuedCount(500);
           root2ConnectionStatus.setBackPressureObjectThreshold(1000);
   
            Collection<ConnectionStatus> rootConnectionStatuses = new ArrayList<>();
           rootConnectionStatuses.add(root1ConnectionStatus);
           rootConnectionStatuses.add(root2ConnectionStatus);
           status.setConnectionStatus(rootConnectionStatuses);
   
           // create a group status with processing time
           ProcessGroupStatus groupStatus1 = new ProcessGroupStatus();
           groupStatus1.setProcessorStatus(processorStatuses);
           groupStatus1.setBytesRead(1234L);
   
           // Create a nested group status with a connection
           ProcessGroupStatus groupStatus2 = new ProcessGroupStatus();
           groupStatus2.setProcessorStatus(processorStatuses);
           groupStatus2.setBytesRead(12345L);
           ConnectionStatus nestedConnectionStatus = new ConnectionStatus();
           nestedConnectionStatus.setId("nested");
           nestedConnectionStatus.setQueuedCount(1001);
            Collection<ConnectionStatus> nestedConnectionStatuses = new ArrayList<>();
           nestedConnectionStatuses.add(nestedConnectionStatus);
           groupStatus2.setConnectionStatus(nestedConnectionStatuses);
            Collection<ProcessGroupStatus> nestedGroupStatuses = new ArrayList<>();
           nestedGroupStatuses.add(groupStatus2);
           groupStatus1.setProcessGroupStatus(nestedGroupStatuses);
   
           ProcessGroupStatus groupStatus3 = new ProcessGroupStatus();
           groupStatus3.setBytesRead(1L);
           ConnectionStatus nestedConnectionStatus2 = new ConnectionStatus();
           nestedConnectionStatus2.setId("nested2");
           nestedConnectionStatus2.setQueuedCount(3);
            Collection<ConnectionStatus> nestedConnectionStatuses2 = new ArrayList<>();
           nestedConnectionStatuses2.add(nestedConnectionStatus2);
           groupStatus3.setConnectionStatus(nestedConnectionStatuses2);
            Collection<ProcessGroupStatus> nestedGroupStatuses2 = new ArrayList<>();
           nestedGroupStatuses2.add(groupStatus3);
   
           Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
           groupStatuses.add(groupStatus1);
           groupStatuses.add(groupStatus3);
           status.setProcessGroupStatus(groupStatuses);
   
       }
   
       @Test
       void testConnectionStatusTable() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
           properties.put(QueryMetricsUtil.QUERY, "select 
id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by 
queuedCount desc");
           reportingTask = initTask(properties);
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(4, rows.size());
           // Validate the first row
           Map<String, Object> row = rows.get(0);
            assertEquals(3, row.size()); // Only projected 3 columns
           Object id = row.get("id");
   
           assertTrue(id instanceof String);
           assertEquals("nested", id);
           assertEquals(1001, row.get("queuedCount"));
           // Validate the second row
           row = rows.get(1);
           id = row.get("id");
           assertEquals("root1", id);
           assertEquals(1000, row.get("queuedCount"));
           assertEquals(true, row.get("isBackPressureEnabled"));
           // Validate the third row
           row = rows.get(2);
           id = row.get("id");
           assertEquals("root2", id);
           assertEquals(500, row.get("queuedCount"));
           assertEquals(false, row.get("isBackPressureEnabled"));
           // Validate the fourth row
           row = rows.get(3);
           id = row.get("id");
           assertEquals("nested2", id);
           assertEquals(3, row.get("queuedCount"));
       }
   
       @Test
       void testBulletinIsInTimeWindow() throws InitializationException {
           String query = "select * from BULLETINS where bulletinTimestamp > 
$bulletinStartTime and bulletinTimestamp <= $bulletinEndTime";
   
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.QUERY, query);
           reportingTask = initTask(properties);
           currentTime.set(Instant.now().toEpochMilli());
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(3, rows.size());
   
            final Bulletin bulletin = BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(), "ERROR", "test bulletin 3", "testFlowFileUuid");
           mockBulletinRepository.addBulletin(bulletin);
           currentTime.set(bulletin.getTimestamp().getTime());
   
           reportingTask.onTrigger(context);
   
   
           List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
           assertEquals(1, sameRows.size());
       }
   
       @Test
       void testBulletinIsOutOfTimeWindow() throws InitializationException {
           String query = "select * from BULLETINS where bulletinTimestamp > 
$bulletinStartTime and bulletinTimestamp <= $bulletinEndTime";
   
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.QUERY, query);
           reportingTask = initTask(properties);
           currentTime.set(Instant.now().toEpochMilli());
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(3, rows.size());
   
           final Bulletin bulletin = BulletinFactory.createBulletin("input 
port", "ERROR", "test bulletin 3", "testFlowFileUuid");
           mockBulletinRepository.addBulletin(bulletin);
           currentTime.set(bulletin.getTimestamp().getTime() - 1);
   
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
           assertEquals(0, sameRows.size());
       }
   
       @Test
       void testProvenanceEventIsInTimeWindow() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE 
where timestampMillis > $provenanceStartTime and timestampMillis <= 
$provenanceEndTime");
           reportingTask = initTask(properties);
           currentTime.set(Instant.now().toEpochMilli());
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(1001, rows.size());
   
           MockFlowFile mockFlowFile = new MockFlowFile(1002L);
            ProvenanceEventRecord prov1002 = mockProvenanceRepository.eventBuilder()
                   .setEventType(ProvenanceEventType.CREATE)
                   .fromFlowFile(mockFlowFile)
                   .setComponentId("12345")
                   .setComponentType("ReportingTask")
                   .setFlowFileUUID("I am FlowFile 1")
                   .setEventTime(Instant.now().toEpochMilli())
                   .setEventDuration(100)
                   .setTransitUri("test://")
                   .setSourceSystemFlowFileIdentifier("I am FlowFile 1")
                   .setAlternateIdentifierUri("remote://test")
                   .build();
   
           mockProvenanceRepository.registerEvent(prov1002);
   
           currentTime.set(prov1002.getEventTime());
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
           assertEquals(1, sameRows.size());
       }
   
       @Test
        void testProvenanceEventIsOutOfTimeWindow() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
           properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE 
where timestampMillis > $provenanceStartTime and timestampMillis <= 
$provenanceEndTime");
           reportingTask = initTask(properties);
           currentTime.set(Instant.now().toEpochMilli());
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(1001, rows.size());
   
           MockFlowFile mockFlowFile = new MockFlowFile(1002L);
            ProvenanceEventRecord prov1002 = mockProvenanceRepository.eventBuilder()
                   .setEventType(ProvenanceEventType.CREATE)
                   .fromFlowFile(mockFlowFile)
                   .setComponentId("12345")
                   .setComponentType("ReportingTask")
                   .setFlowFileUUID("I am FlowFile 1")
                   .setEventTime(Instant.now().toEpochMilli())
                   .setEventDuration(100)
                   .setTransitUri("test://")
                   .setSourceSystemFlowFileIdentifier("I am FlowFile 1")
                   .setAlternateIdentifierUri("remote://test")
                   .build();
   
           mockProvenanceRepository.registerEvent(prov1002);
   
           currentTime.set(prov1002.getEventTime() - 1);
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
           assertEquals(0, sameRows.size());
       }
   
       @Test
        void testUniqueProvenanceAndBulletinQuery() throws InitializationException {
            final Map<PropertyDescriptor, String> properties = new HashMap<>();
            properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS, PROVENANCE where " +
                "bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime " +
                "and timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime");
           reportingTask = initTask(properties);
           currentTime.set(Instant.now().toEpochMilli());
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(3003, rows.size());
   
            final Bulletin bulletin = BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(), "ERROR", "test bulletin 3", "testFlowFileUuid");
           mockBulletinRepository.addBulletin(bulletin);
   
           MockFlowFile mockFlowFile = new MockFlowFile(1002L);
            ProvenanceEventRecord prov1002 = mockProvenanceRepository.eventBuilder()
                   .setEventType(ProvenanceEventType.CREATE)
                   .fromFlowFile(mockFlowFile)
                   .setComponentId("12345")
                   .setComponentType("ReportingTask")
                   .setFlowFileUUID("I am FlowFile 1")
                   .build();
   
           mockProvenanceRepository.registerEvent(prov1002);
   
           currentTime.set(bulletin.getTimestamp().getTime());
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
           assertEquals(1, sameRows.size());
       }
   
       @Test
        void testTimeWindowFromStateMap() throws IOException, InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
           properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS, 
PROVENANCE where " +
                   "bulletinTimestamp > $bulletinStartTime and 
bulletinTimestamp <= $bulletinEndTime " +
                   "and timestampMillis > $provenanceStartTime and 
timestampMillis <= $provenanceEndTime");
           reportingTask = initTask(properties);
   
           long testBulletinStartTime = 1609538145L;
           long testProvenanceStartTime = 1641074145L;
           final Map<String, String> stateMap = new HashMap<>();
            stateMap.put(TrackedQueryTime.BULLETIN_START_TIME.name(), String.valueOf(testBulletinStartTime));
            stateMap.put(TrackedQueryTime.PROVENANCE_START_TIME.name(), String.valueOf(testProvenanceStartTime));
           mockStateManager.setState(stateMap, Scope.LOCAL);
   
            final long bulletinStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
            final long provenanceStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
   
           assertEquals(testBulletinStartTime, bulletinStartTime);
           assertEquals(testProvenanceStartTime, provenanceStartTime);
   
           final long currentTime = Instant.now().toEpochMilli();
           this.currentTime.set(currentTime);
   
           reportingTask.onTrigger(context);
   
            final long updatedBulletinStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
            final long updatedProvenanceStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
   
           assertEquals(currentTime, updatedBulletinStartTime);
           assertEquals(currentTime, updatedProvenanceStartTime);
       }
   
       @Test
       void testJvmMetricsTable() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
           properties.put(QueryMetricsUtil.QUERY, "select "
                   + Stream.of(MetricNames.JVM_DAEMON_THREAD_COUNT,
                   MetricNames.JVM_THREAD_COUNT,
                   MetricNames.JVM_THREAD_STATES_BLOCKED,
                   MetricNames.JVM_THREAD_STATES_RUNNABLE,
                   MetricNames.JVM_THREAD_STATES_TERMINATED,
                   MetricNames.JVM_THREAD_STATES_TIMED_WAITING,
                   MetricNames.JVM_UPTIME,
                   MetricNames.JVM_HEAP_USED,
                   MetricNames.JVM_HEAP_USAGE,
                   MetricNames.JVM_NON_HEAP_USAGE,
                    MetricNames.JVM_FILE_DESCRIPTOR_USAGE).map((s) -> s.replace(".", "_")).collect(Collectors.joining(","))
                   + " from JVM_METRICS");
           reportingTask = initTask(properties);
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(1, rows.size());
           Map<String, Object> row = rows.get(0);
           assertEquals(11, row.size());
   
            assertTrue(row.get(MetricNames.JVM_DAEMON_THREAD_COUNT.replace(".", "_")) instanceof Integer);
            assertTrue(row.get(MetricNames.JVM_HEAP_USAGE.replace(".", "_")) instanceof Double);
       }
   
       @Test
       void testProcessGroupStatusTable() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
           properties.put(QueryMetricsUtil.QUERY, "select * from 
PROCESS_GROUP_STATUS order by bytesRead asc");
           reportingTask = initTask(properties);
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(4, rows.size());
           // Validate the first row
           Map<String, Object> row = rows.get(0);
           assertEquals(20, row.size());
           assertEquals(1L, row.get("bytesRead"));
           // Validate the second row
           row = rows.get(1);
           assertEquals(1234L, row.get("bytesRead"));
           // Validate the third row
           row = rows.get(2);
           assertEquals(12345L, row.get("bytesRead"));
           // Validate the fourth row
           row = rows.get(3);
           assertEquals(20000L, row.get("bytesRead"));
       }
   
       @Test
       void testNoResults() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
           properties.put(QueryMetricsUtil.QUERY, "select * from 
CONNECTION_STATUS where queuedCount > 2000");
           reportingTask = initTask(properties);
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(0, rows.size());
       }
   
       @Test
       void testProvenanceTable() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
           properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE 
order by eventId asc");
           reportingTask = initTask(properties);
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(1001, rows.size());
           // Validate the first row
           Map<String, Object> row = rows.get(0);
           assertEquals(24, row.size());
           // Verify the first row contents
           assertEquals(0L, row.get("eventId"));
           assertEquals("CREATE", row.get("eventType"));
           assertEquals(12L, row.get("entitySize"));
   
           assertNull(row.get("contentPath"));
           assertNull(row.get("previousContentPath"));
   
           Object o = row.get("previousAttributes");
           assertTrue(o instanceof Map);
           Map<String, String> previousAttributes = (Map<String, String>) o;
           assertEquals("A", previousAttributes.get("test.value"));
           o = row.get("updatedAttributes");
           assertTrue(o instanceof Map);
           Map<String, String> updatedAttributes = (Map<String, String>) o;
           assertEquals("B", updatedAttributes.get("test.value"));
   
           // Verify some fields in the second row
           row = rows.get(1);
           assertEquals(24, row.size());
           // Verify the second row contents
           assertEquals(1L, row.get("eventId"));
           assertEquals("DROP", row.get("eventType"));
   
           // Verify some fields in the last row
           row = rows.get(1000);
           assertEquals(24, row.size());
           // Verify the last row contents
           assertEquals(1000L, row.get("eventId"));
           assertEquals("DROP", row.get("eventType"));
       }
   
       @Test
       void testBulletinTable() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
           properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS 
order by bulletinTimestamp asc");
           reportingTask = initTask(properties);
           reportingTask.onTrigger(context);
   
            final List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           final String flowFileUuid = "testFlowFileUuid";
           assertEquals(3, rows.size());
           // Validate the first row
           Map<String, Object> row = rows.get(0);
           assertEquals(14, row.size());
           assertNotNull(row.get("bulletinId"));
           assertEquals("controller", row.get("bulletinCategory"));
           assertEquals("WARN", row.get("bulletinLevel"));
           assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid"));
   
           // Validate the second row
           row = rows.get(1);
           assertEquals("processor", row.get("bulletinCategory"));
           assertEquals("INFO", row.get("bulletinLevel"));
   
           // Validate the third row
           row = rows.get(2);
           assertEquals("controller_service", row.get("bulletinCategory"));
           assertEquals("ERROR", row.get("bulletinLevel"));
           assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid"));
       }
   
        private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException {
   
           final ComponentLog logger = mock(ComponentLog.class);
           reportingTask = new MockQueryNiFiReportingTask();
            final ReportingInitializationContext initContext = mock(ReportingInitializationContext.class);
            Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
           Mockito.when(initContext.getLogger()).thenReturn(logger);
           reportingTask.initialize(initContext);
   
           Map<PropertyDescriptor, String> properties = new HashMap<>();
            for (final PropertyDescriptor descriptor : reportingTask.getSupportedPropertyDescriptors()) {
               properties.put(descriptor, descriptor.getDefaultValue());
           }
           properties.putAll(customProperties);
   
           context = mock(ReportingContext.class);
   
           mockStateManager = new MockStateManager(reportingTask);
   
           Mockito.when(context.getStateManager()).thenReturn(mockStateManager);
           Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
                final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
               return new MockPropertyValue(properties.get(descriptor));
           }).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
   
           final EventAccess eventAccess = mock(EventAccess.class);
           Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
           Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
   
           final PropertyValue pValue = mock(StandardPropertyValue.class);
            mockRecordSinkService = new MockRecordSinkService();
            Mockito.when(context.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue);
            Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService);
   
            ConfigurationContext configContext = mock(ConfigurationContext.class);
            Mockito.when(configContext.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue);

            Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION)).thenReturn(new MockPropertyValue("10"));
            Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE)).thenReturn(new MockPropertyValue("0"));
           reportingTask.setup(configContext);
   
           mockProvenanceRepository = new MockProvenanceRepository();
           long currentTimeMillis = System.currentTimeMillis();
           Map<String, String> previousAttributes = new HashMap<>();
           previousAttributes.put("mime.type", "application/json");
           previousAttributes.put("test.value", "A");
            Map<String, String> updatedAttributes = new HashMap<>(previousAttributes);
           updatedAttributes.put("test.value", "B");
   
           // Generate provenance events and put them in a repository
           Processor processor = mock(Processor.class);
            SharedSessionState sharedState = new SharedSessionState(processor, new AtomicLong(0));
            MockProcessSession processSession = new MockProcessSession(sharedState, processor);
            MockFlowFile mockFlowFile = processSession.createFlowFile("Test content".getBytes());
   
           ProvenanceEventRecord prov1 = mockProvenanceRepository.eventBuilder()
                   .setEventType(ProvenanceEventType.CREATE)
                   .fromFlowFile(mockFlowFile)
                   .setComponentId("12345")
                   .setComponentType("ReportingTask")
                   .setFlowFileUUID("I am FlowFile 1")
                   .setEventTime(currentTimeMillis)
                   .setEventDuration(100)
                   .setTransitUri("test://")
                   .setSourceSystemFlowFileIdentifier("I am FlowFile 1")
                   .setAlternateIdentifierUri("remote://test")
                   .setAttributes(previousAttributes, updatedAttributes)
                   .build();
   
           mockProvenanceRepository.registerEvent(prov1);
   
           for (int i = 1; i < 1001; i++) {
               String indexString = Integer.toString(i);
               mockFlowFile = processSession.createFlowFile(("Test content " + 
indexString).getBytes());
               ProvenanceEventRecord prov = 
mockProvenanceRepository.eventBuilder()
                       .fromFlowFile(mockFlowFile)
                       .setEventType(ProvenanceEventType.DROP)
                       .setComponentId(indexString)
                       .setComponentType("Processor")
                       .setFlowFileUUID("I am FlowFile " + indexString)
                       .setEventTime(currentTimeMillis - i)
                       .build();
                mockProvenanceRepository.registerEvent(prov);
            }

            Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(mockProvenanceRepository);

            mockBulletinRepository = new MockQueryBulletinRepository();
            mockBulletinRepository.addBulletin(BulletinFactory.createBulletin("controller", "WARN", "test bulletin 2", "testFlowFileUuid"));
            mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.PROCESSOR.name().toLowerCase(), "INFO", "test bulletin 1", "testFlowFileUuid"));
            mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(), "ERROR", "test bulletin 2", "testFlowFileUuid"));

            Mockito.when(context.getBulletinRepository()).thenReturn(mockBulletinRepository);
   
           return reportingTask;
       }
   
        private final class MockQueryNiFiReportingTask extends QueryNiFiReportingTask {
           @Override
           public long getCurrentTime() {
               return currentTime.get();
           }
       }
   
        private static class MockQueryBulletinRepository extends MockBulletinRepository {
           Map<String, List<Bulletin>> bulletins = new HashMap<>();
   
           @Override
           public void addBulletin(Bulletin bulletin) {
                bulletins.computeIfAbsent(bulletin.getCategory(), __ -> new ArrayList<>())
                   .add(bulletin);
           }
   
           @Override
           public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) {
                return new ArrayList<>(
                    Optional.ofNullable(bulletins.get(bulletinQuery.getSourceType().name().toLowerCase()))
                       .orElse(Collections.emptyList()));
           }
   
           @Override
           public List<Bulletin> findBulletinsForController() {
               return Optional.ofNullable(bulletins.get("controller"))
                   .orElse(Collections.emptyList());
           }
       }
   }
   ```

##########
File path: 
nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
##########
@@ -67,21 +72,26 @@
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
-public class TestQueryNiFiReportingTask {
+class TestQueryNiFiReportingTask {

Review comment:
       This test class can also be significantly simplified:
   ```java
   class TestQueryNiFiReportingTask {
   
       private ReportingContext context;
       private MockQueryNiFiReportingTask reportingTask;
       private MockRecordSinkService mockRecordSinkService;
       private ProcessGroupStatus status;
       private BulletinRepository mockBulletinRepository;
       private MockProvenanceRepository mockProvenanceRepository;
       private AtomicLong currentTime;
       private MockStateManager mockStateManager;
   
       @BeforeEach
       public void setup() {
           currentTime = new AtomicLong();
           status = new ProcessGroupStatus();
           status.setId("1234");
           status.setFlowFilesReceived(5);
           status.setBytesReceived(10000);
           status.setFlowFilesSent(10);
           status.setBytesRead(20000L);
           status.setBytesSent(20000);
           status.setQueuedCount(100);
           status.setQueuedContentSize(1024L);
           status.setBytesWritten(80000L);
           status.setActiveThreadCount(5);
   
           // create a processor status with processing time
           ProcessorStatus procStatus = new ProcessorStatus();
           procStatus.setId("proc");
           procStatus.setProcessingNanos(123456789);
   
           Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
           processorStatuses.add(procStatus);
           status.setProcessorStatus(processorStatuses);
   
           ConnectionStatus root1ConnectionStatus = new ConnectionStatus();
           root1ConnectionStatus.setId("root1");
           root1ConnectionStatus.setQueuedCount(1000);
           root1ConnectionStatus.setBackPressureObjectThreshold(1000);
   
           ConnectionStatus root2ConnectionStatus = new ConnectionStatus();
           root2ConnectionStatus.setId("root2");
           root2ConnectionStatus.setQueuedCount(500);
           root2ConnectionStatus.setBackPressureObjectThreshold(1000);
   
            Collection<ConnectionStatus> rootConnectionStatuses = new ArrayList<>();
           rootConnectionStatuses.add(root1ConnectionStatus);
           rootConnectionStatuses.add(root2ConnectionStatus);
           status.setConnectionStatus(rootConnectionStatuses);
   
           // create a group status with processing time
           ProcessGroupStatus groupStatus1 = new ProcessGroupStatus();
           groupStatus1.setProcessorStatus(processorStatuses);
           groupStatus1.setBytesRead(1234L);
   
           // Create a nested group status with a connection
           ProcessGroupStatus groupStatus2 = new ProcessGroupStatus();
           groupStatus2.setProcessorStatus(processorStatuses);
           groupStatus2.setBytesRead(12345L);
           ConnectionStatus nestedConnectionStatus = new ConnectionStatus();
           nestedConnectionStatus.setId("nested");
           nestedConnectionStatus.setQueuedCount(1001);
            Collection<ConnectionStatus> nestedConnectionStatuses = new ArrayList<>();
            nestedConnectionStatuses.add(nestedConnectionStatus);
            groupStatus2.setConnectionStatus(nestedConnectionStatuses);
            Collection<ProcessGroupStatus> nestedGroupStatuses = new ArrayList<>();
           nestedGroupStatuses.add(groupStatus2);
           groupStatus1.setProcessGroupStatus(nestedGroupStatuses);
   
           ProcessGroupStatus groupStatus3 = new ProcessGroupStatus();
           groupStatus3.setBytesRead(1L);
           ConnectionStatus nestedConnectionStatus2 = new ConnectionStatus();
           nestedConnectionStatus2.setId("nested2");
           nestedConnectionStatus2.setQueuedCount(3);
            Collection<ConnectionStatus> nestedConnectionStatuses2 = new ArrayList<>();
            nestedConnectionStatuses2.add(nestedConnectionStatus2);
            groupStatus3.setConnectionStatus(nestedConnectionStatuses2);
            Collection<ProcessGroupStatus> nestedGroupStatuses2 = new ArrayList<>();
           nestedGroupStatuses2.add(groupStatus3);
   
           Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
           groupStatuses.add(groupStatus1);
           groupStatuses.add(groupStatus3);
           status.setProcessGroupStatus(groupStatuses);
   
       }
   
       @Test
       void testConnectionStatusTable() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
           properties.put(QueryMetricsUtil.QUERY, "select 
id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by 
queuedCount desc");
           reportingTask = initTask(properties);
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(4, rows.size());
           // Validate the first row
           Map<String, Object> row = rows.get(0);
            assertEquals(3, row.size()); // Only projected 3 columns
           Object id = row.get("id");
   
           assertTrue(id instanceof String);
           assertEquals("nested", id);
           assertEquals(1001, row.get("queuedCount"));
           // Validate the second row
           row = rows.get(1);
           id = row.get("id");
           assertEquals("root1", id);
           assertEquals(1000, row.get("queuedCount"));
           assertEquals(true, row.get("isBackPressureEnabled"));
           // Validate the third row
           row = rows.get(2);
           id = row.get("id");
           assertEquals("root2", id);
           assertEquals(500, row.get("queuedCount"));
           assertEquals(false, row.get("isBackPressureEnabled"));
           // Validate the fourth row
           row = rows.get(3);
           id = row.get("id");
           assertEquals("nested2", id);
           assertEquals(3, row.get("queuedCount"));
       }
   
       @Test
       void testBulletinIsInTimeWindow() throws InitializationException {
           String query = "select * from BULLETINS where bulletinTimestamp > 
$bulletinStartTime and bulletinTimestamp <= $bulletinEndTime";
   
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.QUERY, query);
           reportingTask = initTask(properties);
           currentTime.set(Instant.now().toEpochMilli());
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(3, rows.size());
   
            final Bulletin bulletin = BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(), "ERROR", "test bulletin 3", "testFlowFileUuid");
           mockBulletinRepository.addBulletin(bulletin);
           currentTime.set(bulletin.getTimestamp().getTime());
   
           reportingTask.onTrigger(context);
   
   
           List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
           assertEquals(1, sameRows.size());
       }
   
       @Test
       void testBulletinIsOutOfTimeWindow() throws InitializationException {
           String query = "select * from BULLETINS where bulletinTimestamp > 
$bulletinStartTime and bulletinTimestamp <= $bulletinEndTime";
   
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.QUERY, query);
           reportingTask = initTask(properties);
           currentTime.set(Instant.now().toEpochMilli());
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(3, rows.size());
   
           final Bulletin bulletin = BulletinFactory.createBulletin("input 
port", "ERROR", "test bulletin 3", "testFlowFileUuid");
           mockBulletinRepository.addBulletin(bulletin);
           currentTime.set(bulletin.getTimestamp().getTime() - 1);
   
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
           assertEquals(0, sameRows.size());
       }
   
       @Test
       void testProvenanceEventIsInTimeWindow() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE 
where timestampMillis > $provenanceStartTime and timestampMillis <= 
$provenanceEndTime");
           reportingTask = initTask(properties);
           currentTime.set(Instant.now().toEpochMilli());
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(1001, rows.size());
   
           MockFlowFile mockFlowFile = new MockFlowFile(1002L);
            ProvenanceEventRecord prov1002 = mockProvenanceRepository.eventBuilder()
                   .setEventType(ProvenanceEventType.CREATE)
                   .fromFlowFile(mockFlowFile)
                   .setComponentId("12345")
                   .setComponentType("ReportingTask")
                   .setFlowFileUUID("I am FlowFile 1")
                   .setEventTime(Instant.now().toEpochMilli())
                   .setEventDuration(100)
                   .setTransitUri("test://")
                   .setSourceSystemFlowFileIdentifier("I am FlowFile 1")
                   .setAlternateIdentifierUri("remote://test")
                   .build();
   
           mockProvenanceRepository.registerEvent(prov1002);
   
           currentTime.set(prov1002.getEventTime());
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
           assertEquals(1, sameRows.size());
       }
   
       @Test
        void testProvenanceEventIsOutOfTimeWindow() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
           properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE 
where timestampMillis > $provenanceStartTime and timestampMillis <= 
$provenanceEndTime");
           reportingTask = initTask(properties);
           currentTime.set(Instant.now().toEpochMilli());
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(1001, rows.size());
   
           MockFlowFile mockFlowFile = new MockFlowFile(1002L);
            ProvenanceEventRecord prov1002 = mockProvenanceRepository.eventBuilder()
                   .setEventType(ProvenanceEventType.CREATE)
                   .fromFlowFile(mockFlowFile)
                   .setComponentId("12345")
                   .setComponentType("ReportingTask")
                   .setFlowFileUUID("I am FlowFile 1")
                   .setEventTime(Instant.now().toEpochMilli())
                   .setEventDuration(100)
                   .setTransitUri("test://")
                   .setSourceSystemFlowFileIdentifier("I am FlowFile 1")
                   .setAlternateIdentifierUri("remote://test")
                   .build();
   
           mockProvenanceRepository.registerEvent(prov1002);
   
           currentTime.set(prov1002.getEventTime() - 1);
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
           assertEquals(0, sameRows.size());
       }
   
       @Test
        void testUniqueProvenanceAndBulletinQuery() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS, 
PROVENANCE where " +
               "bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= 
$bulletinEndTime " +
               "and timestampMillis > $provenanceStartTime and timestampMillis 
<= $provenanceEndTime");
           reportingTask = initTask(properties);
           currentTime.set(Instant.now().toEpochMilli());
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(3003, rows.size());
   
            final Bulletin bulletin = BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(), "ERROR", "test bulletin 3", "testFlowFileUuid");
           mockBulletinRepository.addBulletin(bulletin);
   
           MockFlowFile mockFlowFile = new MockFlowFile(1002L);
            ProvenanceEventRecord prov1002 = mockProvenanceRepository.eventBuilder()
                   .setEventType(ProvenanceEventType.CREATE)
                   .fromFlowFile(mockFlowFile)
                   .setComponentId("12345")
                   .setComponentType("ReportingTask")
                   .setFlowFileUUID("I am FlowFile 1")
                   .build();
   
           mockProvenanceRepository.registerEvent(prov1002);
   
           currentTime.set(bulletin.getTimestamp().getTime());
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
           assertEquals(1, sameRows.size());
       }
   
       @Test
        void testTimeWindowFromStateMap() throws IOException, InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
           properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS, 
PROVENANCE where " +
                   "bulletinTimestamp > $bulletinStartTime and 
bulletinTimestamp <= $bulletinEndTime " +
                   "and timestampMillis > $provenanceStartTime and 
timestampMillis <= $provenanceEndTime");
           reportingTask = initTask(properties);
   
           long testBulletinStartTime = 1609538145L;
           long testProvenanceStartTime = 1641074145L;
           final Map<String, String> stateMap = new HashMap<>();
            stateMap.put(TrackedQueryTime.BULLETIN_START_TIME.name(), String.valueOf(testBulletinStartTime));
            stateMap.put(TrackedQueryTime.PROVENANCE_START_TIME.name(), String.valueOf(testProvenanceStartTime));
           mockStateManager.setState(stateMap, Scope.LOCAL);
   
            final long bulletinStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
            final long provenanceStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
   
           assertEquals(testBulletinStartTime, bulletinStartTime);
           assertEquals(testProvenanceStartTime, provenanceStartTime);
   
           final long currentTime = Instant.now().toEpochMilli();
           this.currentTime.set(currentTime);
   
           reportingTask.onTrigger(context);
   
            final long updatedBulletinStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
            final long updatedProvenanceStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
   
           assertEquals(currentTime, updatedBulletinStartTime);
           assertEquals(currentTime, updatedProvenanceStartTime);
       }
   
       @Test
       void testJvmMetricsTable() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
           properties.put(QueryMetricsUtil.QUERY, "select "
                   + Stream.of(MetricNames.JVM_DAEMON_THREAD_COUNT,
                   MetricNames.JVM_THREAD_COUNT,
                   MetricNames.JVM_THREAD_STATES_BLOCKED,
                   MetricNames.JVM_THREAD_STATES_RUNNABLE,
                   MetricNames.JVM_THREAD_STATES_TERMINATED,
                   MetricNames.JVM_THREAD_STATES_TIMED_WAITING,
                   MetricNames.JVM_UPTIME,
                   MetricNames.JVM_HEAP_USED,
                   MetricNames.JVM_HEAP_USAGE,
                   MetricNames.JVM_NON_HEAP_USAGE,
                    MetricNames.JVM_FILE_DESCRIPTOR_USAGE).map((s) -> s.replace(".", "_")).collect(Collectors.joining(","))
                   + " from JVM_METRICS");
           reportingTask = initTask(properties);
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(1, rows.size());
           Map<String, Object> row = rows.get(0);
           assertEquals(11, row.size());
   
            assertTrue(row.get(MetricNames.JVM_DAEMON_THREAD_COUNT.replace(".", "_")) instanceof Integer);
            assertTrue(row.get(MetricNames.JVM_HEAP_USAGE.replace(".", "_")) instanceof Double);
       }
   
       @Test
       void testProcessGroupStatusTable() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
           properties.put(QueryMetricsUtil.QUERY, "select * from 
PROCESS_GROUP_STATUS order by bytesRead asc");
           reportingTask = initTask(properties);
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(4, rows.size());
           // Validate the first row
           Map<String, Object> row = rows.get(0);
           assertEquals(20, row.size());
           assertEquals(1L, row.get("bytesRead"));
           // Validate the second row
           row = rows.get(1);
           assertEquals(1234L, row.get("bytesRead"));
           // Validate the third row
           row = rows.get(2);
           assertEquals(12345L, row.get("bytesRead"));
           // Validate the fourth row
           row = rows.get(3);
           assertEquals(20000L, row.get("bytesRead"));
       }
   
       @Test
       void testNoResults() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
           properties.put(QueryMetricsUtil.QUERY, "select * from 
CONNECTION_STATUS where queuedCount > 2000");
           reportingTask = initTask(properties);
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(0, rows.size());
       }
   
       @Test
       void testProvenanceTable() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
           properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE 
order by eventId asc");
           reportingTask = initTask(properties);
           reportingTask.onTrigger(context);
   
           List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           assertEquals(1001, rows.size());
           // Validate the first row
           Map<String, Object> row = rows.get(0);
           assertEquals(24, row.size());
           // Verify the first row contents
           assertEquals(0L, row.get("eventId"));
           assertEquals("CREATE", row.get("eventType"));
           assertEquals(12L, row.get("entitySize"));
   
           assertNull(row.get("contentPath"));
           assertNull(row.get("previousContentPath"));
   
           Object o = row.get("previousAttributes");
           assertTrue(o instanceof Map);
           Map<String, String> previousAttributes = (Map<String, String>) o;
           assertEquals("A", previousAttributes.get("test.value"));
           o = row.get("updatedAttributes");
           assertTrue(o instanceof Map);
           Map<String, String> updatedAttributes = (Map<String, String>) o;
           assertEquals("B", updatedAttributes.get("test.value"));
   
           // Verify some fields in the second row
           row = rows.get(1);
           assertEquals(24, row.size());
           // Verify the second row contents
           assertEquals(1L, row.get("eventId"));
           assertEquals("DROP", row.get("eventType"));
   
           // Verify some fields in the last row
           row = rows.get(1000);
           assertEquals(24, row.size());
           // Verify the last row contents
           assertEquals(1000L, row.get("eventId"));
           assertEquals("DROP", row.get("eventType"));
       }
   
       @Test
       void testBulletinTable() throws InitializationException {
           final Map<PropertyDescriptor, String> properties = new HashMap<>();
           properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
           properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS 
order by bulletinTimestamp asc");
           reportingTask = initTask(properties);
           reportingTask.onTrigger(context);
   
            final List<Map<String, Object>> rows = mockRecordSinkService.getRows();
           final String flowFileUuid = "testFlowFileUuid";
           assertEquals(3, rows.size());
           // Validate the first row
           Map<String, Object> row = rows.get(0);
           assertEquals(14, row.size());
           assertNotNull(row.get("bulletinId"));
           assertEquals("controller", row.get("bulletinCategory"));
           assertEquals("WARN", row.get("bulletinLevel"));
           assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid"));
   
           // Validate the second row
           row = rows.get(1);
           assertEquals("processor", row.get("bulletinCategory"));
           assertEquals("INFO", row.get("bulletinLevel"));
   
           // Validate the third row
           row = rows.get(2);
           assertEquals("controller_service", row.get("bulletinCategory"));
           assertEquals("ERROR", row.get("bulletinLevel"));
           assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid"));
       }
   
        private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException {
   
           final ComponentLog logger = mock(ComponentLog.class);
           reportingTask = new MockQueryNiFiReportingTask();
            final ReportingInitializationContext initContext = mock(ReportingInitializationContext.class);
            Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
           Mockito.when(initContext.getLogger()).thenReturn(logger);
           reportingTask.initialize(initContext);
   
           Map<PropertyDescriptor, String> properties = new HashMap<>();
            for (final PropertyDescriptor descriptor : reportingTask.getSupportedPropertyDescriptors()) {
               properties.put(descriptor, descriptor.getDefaultValue());
           }
           properties.putAll(customProperties);
   
           context = mock(ReportingContext.class);
   
           mockStateManager = new MockStateManager(reportingTask);
   
           Mockito.when(context.getStateManager()).thenReturn(mockStateManager);
           Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
                final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
               return new MockPropertyValue(properties.get(descriptor));
           }).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
   
           final EventAccess eventAccess = mock(EventAccess.class);
           Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
           Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
   
           final PropertyValue pValue = mock(StandardPropertyValue.class);
            mockRecordSinkService = new MockRecordSinkService();
            Mockito.when(context.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue);
            Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService);

            ConfigurationContext configContext = mock(ConfigurationContext.class);
            Mockito.when(configContext.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue);

            Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION)).thenReturn(new MockPropertyValue("10"));
            Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE)).thenReturn(new MockPropertyValue("0"));
           reportingTask.setup(configContext);
   
           mockProvenanceRepository = new MockProvenanceRepository();
           long currentTimeMillis = System.currentTimeMillis();
           Map<String, String> previousAttributes = new HashMap<>();
           previousAttributes.put("mime.type", "application/json");
           previousAttributes.put("test.value", "A");
            Map<String, String> updatedAttributes = new HashMap<>(previousAttributes);
           updatedAttributes.put("test.value", "B");
   
           // Generate provenance events and put them in a repository
           Processor processor = mock(Processor.class);
            SharedSessionState sharedState = new SharedSessionState(processor, new AtomicLong(0));
            MockProcessSession processSession = new MockProcessSession(sharedState, processor);
            MockFlowFile mockFlowFile = processSession.createFlowFile("Test content".getBytes());
   
           ProvenanceEventRecord prov1 = mockProvenanceRepository.eventBuilder()
                   .setEventType(ProvenanceEventType.CREATE)
                   .fromFlowFile(mockFlowFile)
                   .setComponentId("12345")
                   .setComponentType("ReportingTask")
                   .setFlowFileUUID("I am FlowFile 1")
                   .setEventTime(currentTimeMillis)
                   .setEventDuration(100)
                   .setTransitUri("test://")
                   .setSourceSystemFlowFileIdentifier("I am FlowFile 1")
                   .setAlternateIdentifierUri("remote://test")
                   .setAttributes(previousAttributes, updatedAttributes)
                   .build();
   
           mockProvenanceRepository.registerEvent(prov1);
   
           for (int i = 1; i < 1001; i++) {
               String indexString = Integer.toString(i);
               mockFlowFile = processSession.createFlowFile(("Test content " + 
indexString).getBytes());
               ProvenanceEventRecord prov = 
mockProvenanceRepository.eventBuilder()
                       .fromFlowFile(mockFlowFile)
                       .setEventType(ProvenanceEventType.DROP)
                       .setComponentId(indexString)
                       .setComponentType("Processor")
                       .setFlowFileUUID("I am FlowFile " + indexString)
                       .setEventTime(currentTimeMillis - i)
                       .build();
                mockProvenanceRepository.registerEvent(prov);
            }

            Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(mockProvenanceRepository);

            mockBulletinRepository = new MockQueryBulletinRepository();
            mockBulletinRepository.addBulletin(BulletinFactory.createBulletin("controller", "WARN", "test bulletin 2", "testFlowFileUuid"));
            mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.PROCESSOR.name().toLowerCase(), "INFO", "test bulletin 1", "testFlowFileUuid"));
            mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(), "ERROR", "test bulletin 2", "testFlowFileUuid"));

            Mockito.when(context.getBulletinRepository()).thenReturn(mockBulletinRepository);
   
           return reportingTask;
       }
   
        private final class MockQueryNiFiReportingTask extends QueryNiFiReportingTask {
           @Override
           public long getCurrentTime() {
               return currentTime.get();
           }
       }
   
        private static class MockQueryBulletinRepository extends MockBulletinRepository {
           Map<String, List<Bulletin>> bulletins = new HashMap<>();
   
           @Override
           public void addBulletin(Bulletin bulletin) {
                bulletins.computeIfAbsent(bulletin.getCategory(), __ -> new ArrayList<>())
                   .add(bulletin);
           }
   
           @Override
           public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) {
                return new ArrayList<>(
                    Optional.ofNullable(bulletins.get(bulletinQuery.getSourceType().name().toLowerCase()))
                       .orElse(Collections.emptyList()));
           }
   
           @Override
           public List<Bulletin> findBulletinsForController() {
               return Optional.ofNullable(bulletins.get("controller"))
                   .orElse(Collections.emptyList());
           }
       }
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

