tpalfy commented on a change in pull request #5738: URL: https://github.com/apache/nifi/pull/5738#discussion_r798870414
########## File path: nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java ########## @@ -18,26 +18,44 @@ import org.apache.nifi.components.AbstractConfigurableComponent; import org.apache.nifi.controller.ControllerServiceInitializationContext; -import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.rules.Action; +import java.util.Collections; import java.util.List; import java.util.Map; public class MockRulesEngineService extends AbstractConfigurableComponent implements RulesEngineService { Review comment: Simplified tests deserve simplified helper classes: ```java public class MockRulesEngineService extends AbstractConfigurableComponent implements RulesEngineService { @Override public List<Action> fireRules(Map<String, Object> facts) { return Collections.singletonList(Mockito.mock(Action.class)); } @Override public void initialize(ControllerServiceInitializationContext context) { } @Override public String getIdentifier() { return "MockRulesEngineService"; } } ``` ########## File path: nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java ########## @@ -73,4 +73,10 @@ public void initialize(ControllerServiceInitializationContext context) throws In public String getIdentifier() { return "MockPropertyContextActionHandler"; } + + public void reset() { Review comment: The simpler the helper classes, the better: ```java public class MockPropertyContextActionHandler extends AbstractConfigurableComponent implements PropertyContextActionHandler{ private List<Map<String, Object>> rows = new ArrayList<>(); private List<PropertyContext> propertyContexts = new ArrayList<>(); @Override public void execute(PropertyContext context, Action action, Map<String, Object> facts) { propertyContexts.add(context); execute(action, facts); } @Override public void execute(Action action, Map<String, 
Object> facts) { rows.add(facts); } @Override public void initialize(ControllerServiceInitializationContext context) throws InitializationException { } public List<Map<String, Object>> getRows() { return rows; } public List<PropertyContext> getPropertyContexts() { return propertyContexts; } @Override public String getIdentifier() { return "MockPropertyContextActionHandler"; } public void reset() { rows.clear(); propertyContexts.clear(); } } ``` ########## File path: nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java ########## @@ -77,17 +83,37 @@ public void setup(final ConfigurationContext context) throws IOException { @Override public void onTrigger(ReportingContext context) { + String sql = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue(); Review comment: The duplicated logic that evaluates the start and end times in the SQL string can be avoided. I suggest extracting it into a trait-like interface, like this: ```java public class MetricsEventReportingTask extends AbstractReportingTask implements QueryTimeAware { private List<PropertyDescriptor> properties; private MetricsQueryService metricsQueryService; private volatile RulesEngineService rulesEngineService; private volatile PropertyContextActionHandler actionHandler; @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return properties; } @Override protected void init(final ReportingInitializationContext config) { final List<PropertyDescriptor> properties = new ArrayList<>(); properties.add(QueryMetricsUtil.QUERY); properties.add(QueryMetricsUtil.RULES_ENGINE); properties.add(QueryMetricsUtil.ACTION_HANDLER); properties.add(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION); properties.add(VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE); this.properties = Collections.unmodifiableList(properties); } @OnScheduled public void setup(final ConfigurationContext 
context) { actionHandler = context.getProperty(QueryMetricsUtil.ACTION_HANDLER).asControllerService(PropertyContextActionHandler.class); rulesEngineService = context.getProperty(QueryMetricsUtil.RULES_ENGINE).asControllerService(RulesEngineService.class); final Integer defaultPrecision = context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger(); final Integer defaultScale = context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE).evaluateAttributeExpressions().asInteger(); metricsQueryService = new MetricsSqlQueryService(getLogger(), defaultPrecision, defaultScale); } @Override public void onTrigger(ReportingContext context) { String sql = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue(); try { sql = processStartAndEndTimes(context, sql, BULLETIN_START_TIME, BULLETIN_END_TIME); sql = processStartAndEndTimes(context, sql, PROVENANCE_START_TIME, PROVENANCE_END_TIME); fireRules(context, actionHandler, rulesEngineService, sql); } catch (Exception e) { getLogger().error("Error opening loading rules: {}", new Object[]{e.getMessage()}, e); } } private void fireRules(ReportingContext context, PropertyContextActionHandler actionHandler, RulesEngineService engine, String query) throws Exception { getLogger().debug("Executing query: {}", query); QueryResult queryResult = metricsQueryService.query(context, query); ResultSetRecordSet recordSet = metricsQueryService.getResultSetRecordSet(queryResult); Record record; try { while ((record = recordSet.next()) != null) { final Map<String, Object> facts = new HashMap<>(); for (String fieldName : record.getRawFieldNames()) { facts.put(fieldName, record.getValue(fieldName)); } List<Action> actions = engine.fireRules(facts); if (actions == null || actions.isEmpty()) { getLogger().debug("No actions required for provided facts."); } else { actions.forEach(action -> actionHandler.execute(context, action, facts)); } } } finally { 
metricsQueryService.closeQuietly(recordSet); } } } public class QueryNiFiReportingTask extends AbstractReportingTask implements QueryTimeAware { private List<PropertyDescriptor> properties; private volatile RecordSinkService recordSinkService; private MetricsQueryService metricsQueryService; @Override protected void init(final ReportingInitializationContext config) { final List<PropertyDescriptor> properties = new ArrayList<>(); properties.add(QueryMetricsUtil.QUERY); properties.add(QueryMetricsUtil.RECORD_SINK); properties.add(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS); properties.add(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION); properties.add(VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE); this.properties = Collections.unmodifiableList(properties); } @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return properties; } @OnScheduled public void setup(final ConfigurationContext context) { recordSinkService = context.getProperty(QueryMetricsUtil.RECORD_SINK).asControllerService(RecordSinkService.class); recordSinkService.reset(); final Integer defaultPrecision = context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger(); final Integer defaultScale = context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE).evaluateAttributeExpressions().asInteger(); metricsQueryService = new MetricsSqlQueryService(getLogger(), defaultPrecision, defaultScale); } @Override public void onTrigger(ReportingContext context) { final StopWatch stopWatch = new StopWatch(true); String sql = context.getProperty(QueryMetricsUtil.QUERY).getValue(); try { sql = processStartAndEndTimes(context, sql, BULLETIN_START_TIME, BULLETIN_END_TIME); sql = processStartAndEndTimes(context, sql, PROVENANCE_START_TIME, PROVENANCE_END_TIME); getLogger().debug("Executing query: {}", sql); final QueryResult queryResult = metricsQueryService.query(context, sql); final ResultSetRecordSet recordSet; try { recordSet = 
metricsQueryService.getResultSetRecordSet(queryResult); } catch (final Exception e) { getLogger().error("Error creating record set from query results due to {}", new Object[]{e.getMessage()}, e); return; } try { final Map<String, String> attributes = new HashMap<>(); final String transactionId = UUID.randomUUID().toString(); attributes.put("reporting.task.transaction.id", transactionId); attributes.put("reporting.task.name", getName()); attributes.put("reporting.task.uuid", getIdentifier()); attributes.put("reporting.task.type", this.getClass().getSimpleName()); recordSinkService.sendData(recordSet, attributes, context.getProperty(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS).asBoolean()); } catch (Exception e) { getLogger().error("Error during transmission of query results due to {}", new Object[]{e.getMessage()}, e); return; } finally { metricsQueryService.closeQuietly(queryResult); } final long elapsedMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS); getLogger().debug("Successfully queried and sent in {} millis", elapsedMillis); } catch (Exception e) { getLogger().error("Error processing the query due to {}", new Object[]{e.getMessage()}, e); } } } public interface QueryTimeAware { default String processStartAndEndTimes( ReportingContext context, String sql, TrackedQueryTime queryStartTime, TrackedQueryTime queryEndTime ) throws IOException { StateManager stateManager = context.getStateManager(); final Map<String, String> stateMap = new HashMap<>(stateManager.getState(Scope.LOCAL).toMap()); if (sql.contains(queryStartTime.getSqlPlaceholder()) && sql.contains(queryEndTime.getSqlPlaceholder())) { final long startTime = stateMap.get(queryStartTime.name()) == null ? 
0 : Long.parseLong(stateMap.get(queryStartTime.name())); final long currentTime = getCurrentTime(); sql = sql.replace(queryStartTime.getSqlPlaceholder(), String.valueOf(startTime)); sql = sql.replace(queryEndTime.getSqlPlaceholder(), String.valueOf(currentTime)); stateMap.put(queryStartTime.name(), String.valueOf(currentTime)); stateManager.setState(stateMap, Scope.LOCAL); } return sql; } default long getCurrentTime() { return Instant.now().toEpochMilli(); } } ``` ########## File path: nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java ########## @@ -49,27 +61,40 @@ import org.mockito.stubbing.Answer; import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +class TestMetricsEventReportingTask { Review comment: This test is already a bit more complicated than it needs to be; we can make it simpler like this: ```java class TestMetricsEventReportingTask { private static final String LOG = "LOG"; private static final String ALERT = "ALERT"; private ReportingContext context; private MockMetricsEventReportingTask reportingTask; private MockPropertyContextActionHandler actionHandler; private ProcessGroupStatus status; private MockQueryBulletinRepository mockBulletinRepository; private MockProvenanceRepository mockProvenanceRepository; private AtomicLong currentTime; private MockStateManager mockStateManager; @BeforeEach public void 
setup() { currentTime = new AtomicLong(); status = new ProcessGroupStatus(); actionHandler = new MockPropertyContextActionHandler(); status.setId("1234"); status.setFlowFilesReceived(5); status.setBytesReceived(10000); status.setFlowFilesSent(10); status.setBytesRead(20000L); status.setBytesSent(20000); status.setQueuedCount(100); status.setQueuedContentSize(1024L); status.setBytesWritten(80000L); status.setActiveThreadCount(5); // create a processor status with processing time ProcessorStatus procStatus = new ProcessorStatus(); procStatus.setId("proc"); procStatus.setProcessingNanos(123456789); Collection<ProcessorStatus> processorStatuses = new ArrayList<>(); processorStatuses.add(procStatus); status.setProcessorStatus(processorStatuses); ConnectionStatusPredictions connectionStatusPredictions = new ConnectionStatusPredictions(); connectionStatusPredictions.setPredictedTimeToCountBackpressureMillis(1000); connectionStatusPredictions.setPredictedTimeToBytesBackpressureMillis(1000); connectionStatusPredictions.setNextPredictedQueuedCount(1000000000); connectionStatusPredictions.setNextPredictedQueuedBytes(1000000000000000L); ConnectionStatus root1ConnectionStatus = new ConnectionStatus(); root1ConnectionStatus.setId("root1"); root1ConnectionStatus.setQueuedCount(1000); root1ConnectionStatus.setPredictions(connectionStatusPredictions); ConnectionStatus root2ConnectionStatus = new ConnectionStatus(); root2ConnectionStatus.setId("root2"); root2ConnectionStatus.setQueuedCount(500); root2ConnectionStatus.setPredictions(connectionStatusPredictions); Collection<ConnectionStatus> rootConnectionStatuses = new ArrayList<>(); rootConnectionStatuses.add(root1ConnectionStatus); rootConnectionStatuses.add(root2ConnectionStatus); status.setConnectionStatus(rootConnectionStatuses); } @Test void testConnectionStatusTable() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.QUERY, "select connectionId, 
predictedQueuedCount, predictedTimeToBytesBackpressureMillis from CONNECTION_STATUS_PREDICTIONS"); reportingTask = initTask(properties); reportingTask.onTrigger(context); List<PropertyContext> propertyContexts = actionHandler.getPropertyContexts(); assertEquals(2, actionHandler.getRows().size()); assertEquals(2, propertyContexts.size()); } @Test void testUniqueBulletinQueryIsInTimeWindow() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.QUERY, "select bulletinCategory from BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime"); reportingTask = initTask(properties); currentTime.set(Instant.now().toEpochMilli()); reportingTask.onTrigger(context); assertEquals(1, actionHandler.getRows().size()); actionHandler.reset(); final Bulletin bulletin = BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(), "WARN", "test bulletin 2", "testFlowFileUuid"); mockBulletinRepository.addBulletin(bulletin); currentTime.set(bulletin.getTimestamp().getTime()); reportingTask.onTrigger(context); assertEquals(1, actionHandler.getRows().size()); } @Test void testUniqueBulletinQueryIsOutOfTimeWindow() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.QUERY, "select bulletinCategory from BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime"); reportingTask = initTask(properties); currentTime.set(Instant.now().toEpochMilli()); reportingTask.onTrigger(context); assertEquals(1, actionHandler.getRows().size()); actionHandler.reset(); final Bulletin bulletin = BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(), "WARN", "test bulletin 2", "testFlowFileUuid"); mockBulletinRepository.addBulletin(bulletin); currentTime.set(bulletin.getTimestamp().getTime() - 1); 
reportingTask.onTrigger(context); assertEquals(0, actionHandler.getRows().size()); } @Test void testUniqueProvenanceQueryIsInTimeWindow() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.QUERY, "select componentId from PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime"); reportingTask = initTask(properties); currentTime.set(Instant.now().toEpochMilli()); reportingTask.onTrigger(context); assertEquals(1, actionHandler.getRows().size()); actionHandler.reset(); MockFlowFile mockFlowFile = new MockFlowFile(2L); ProvenanceEventRecord prov2 = mockProvenanceRepository.eventBuilder() .setEventType(ProvenanceEventType.CREATE) .fromFlowFile(mockFlowFile) .setComponentId("2") .setComponentType("ReportingTask") .setFlowFileUUID("I am FlowFile 2") .setEventTime(Instant.now().toEpochMilli()) .setEventDuration(100) .setTransitUri("test://") .setSourceSystemFlowFileIdentifier("I am FlowFile 2") .setAlternateIdentifierUri("remote://test") .build(); mockProvenanceRepository.registerEvent(prov2); currentTime.set(prov2.getEventTime()); reportingTask.onTrigger(context); assertEquals(1, actionHandler.getRows().size()); } @Test void testUniqueProvenanceQueryIsOutOfTimeWindow() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.QUERY, "select componentId from PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime"); reportingTask = initTask(properties); currentTime.set(Instant.now().toEpochMilli()); reportingTask.onTrigger(context); assertEquals(1, actionHandler.getRows().size()); actionHandler.reset(); MockFlowFile mockFlowFile = new MockFlowFile(2L); ProvenanceEventRecord prov2 = mockProvenanceRepository.eventBuilder() .setEventType(ProvenanceEventType.CREATE) .fromFlowFile(mockFlowFile) .setComponentId("2") 
.setComponentType("ReportingTask") .setFlowFileUUID("I am FlowFile 2") .setEventTime(Instant.now().toEpochMilli()) .setEventDuration(100) .setTransitUri("test://") .setSourceSystemFlowFileIdentifier("I am FlowFile 2") .setAlternateIdentifierUri("remote://test") .build(); mockProvenanceRepository.registerEvent(prov2); currentTime.set(prov2.getEventTime() - 1); reportingTask.onTrigger(context); assertEquals(0, actionHandler.getRows().size()); } @Test void testTimeWindowFromStateMap() throws IOException, InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS, PROVENANCE where " + "bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime " + "and timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime"); reportingTask = initTask(properties); long testBulletinStartTime = 1609538145L; long testProvenanceStartTime = 1641074145L; final Map<String, String> stateMap = new HashMap<>(); stateMap.put(TrackedQueryTime.BULLETIN_START_TIME.name(), String.valueOf(testBulletinStartTime)); stateMap.put(TrackedQueryTime.PROVENANCE_START_TIME.name(), String.valueOf(testProvenanceStartTime)); mockStateManager.setState(stateMap, Scope.LOCAL); final long bulletinStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name())); final long provenanceStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name())); assertEquals(testBulletinStartTime, bulletinStartTime); assertEquals(testProvenanceStartTime, provenanceStartTime); final long currentTime = Instant.now().toEpochMilli(); this.currentTime.set(currentTime); reportingTask.onTrigger(context); final long updatedBulletinStartTime = 
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name())); final long updatedProvenanceStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name())); assertEquals(currentTime, updatedBulletinStartTime); assertEquals(currentTime, updatedProvenanceStartTime); } private MockMetricsEventReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException { final ComponentLog logger = Mockito.mock(ComponentLog.class); reportingTask = new MockMetricsEventReportingTask(); final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); Mockito.when(initContext.getLogger()).thenReturn(logger); reportingTask.initialize(initContext); Map<PropertyDescriptor, String> properties = new HashMap<>(); for (final PropertyDescriptor descriptor : reportingTask.getSupportedPropertyDescriptors()) { properties.put(descriptor, descriptor.getDefaultValue()); } properties.putAll(customProperties); context = Mockito.mock(ReportingContext.class); Mockito.when(context.isAnalyticsEnabled()).thenReturn(true); mockStateManager = new MockStateManager(reportingTask); Mockito.when(context.getStateManager()).thenReturn(mockStateManager); Mockito.doAnswer((Answer<PropertyValue>) invocation -> { final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class); return new MockPropertyValue(properties.get(descriptor)); }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); final EventAccess eventAccess = Mockito.mock(EventAccess.class); Mockito.when(context.getEventAccess()).thenReturn(eventAccess); Mockito.when(eventAccess.getControllerStatus()).thenReturn(status); final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class); actionHandler = new MockPropertyContextActionHandler(); 
Mockito.when(pValue.asControllerService(PropertyContextActionHandler.class)).thenReturn(actionHandler); final PropertyValue resValue = Mockito.mock(StandardPropertyValue.class); MockRulesEngineService rulesEngineService = new MockRulesEngineService(); Mockito.when(resValue.asControllerService(RulesEngineService.class)).thenReturn(rulesEngineService); ConfigurationContext configContext = Mockito.mock(ConfigurationContext.class); Mockito.when(configContext.getProperty(QueryMetricsUtil.RULES_ENGINE)).thenReturn(resValue); Mockito.when(configContext.getProperty(QueryMetricsUtil.ACTION_HANDLER)).thenReturn(pValue); Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION)).thenReturn(new MockPropertyValue("10")); Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE)).thenReturn(new MockPropertyValue("0")); reportingTask.setup(configContext); setupMockProvenanceRepository(eventAccess); setupMockBulletinRepository(); return reportingTask; } private final class MockMetricsEventReportingTask extends MetricsEventReportingTask { @Override public long getCurrentTime() { return currentTime.get(); } } private void setupMockBulletinRepository() { mockBulletinRepository = new MockQueryBulletinRepository(); mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.PROCESSOR.name().toLowerCase(), "WARN", "test bulletin 1", "testFlowFileUuid")); Mockito.when(context.getBulletinRepository()).thenReturn(mockBulletinRepository); } private void setupMockProvenanceRepository(final EventAccess eventAccess) { mockProvenanceRepository = new MockProvenanceRepository(); long currentTimeMillis = System.currentTimeMillis(); Map<String, String> previousAttributes = new HashMap<>(); previousAttributes.put("mime.type", "application/json"); previousAttributes.put("test.value", "A"); Map<String, String> updatedAttributes = new HashMap<>(previousAttributes); updatedAttributes.put("test.value", "B"); // 
Generate provenance events and put them in a repository Processor processor = mock(Processor.class); SharedSessionState sharedState = new SharedSessionState(processor, new AtomicLong(0)); MockProcessSession processSession = new MockProcessSession(sharedState, processor); MockFlowFile mockFlowFile = processSession.createFlowFile("Test content".getBytes()); ProvenanceEventRecord prov1 = mockProvenanceRepository.eventBuilder() .setEventType(ProvenanceEventType.CREATE) .fromFlowFile(mockFlowFile) .setComponentId("1") .setComponentType("ReportingTask") .setFlowFileUUID("I am FlowFile 1") .setEventTime(currentTimeMillis) .setEventDuration(100) .setTransitUri("test://") .setSourceSystemFlowFileIdentifier("I am FlowFile 1") .setAlternateIdentifierUri("remote://test") .setAttributes(previousAttributes, updatedAttributes) .build(); mockProvenanceRepository.registerEvent(prov1); Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(mockProvenanceRepository); } private static class MockQueryBulletinRepository extends MockBulletinRepository { Map<String, List<Bulletin>> bulletins = new HashMap<>(); @Override public void addBulletin(Bulletin bulletin) { bulletins.computeIfAbsent(bulletin.getCategory(), key -> new ArrayList<>()) .add(bulletin); } @Override public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) { return new ArrayList<>( Optional.ofNullable(bulletins.get(bulletinQuery.getSourceType().name().toLowerCase())) .orElse(Collections.emptyList()) ); } @Override public List<Bulletin> findBulletinsForController() { return Optional.ofNullable(bulletins.get("controller")) .orElse(Collections.emptyList()); } } } ``` ########## File path: nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java ########## @@ -67,21 +72,26 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; - import static 
org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; -public class TestQueryNiFiReportingTask { +class TestQueryNiFiReportingTask { Review comment: This test class can also be significantly simplified: ```suggestion class TestQueryNiFiReportingTask { private ReportingContext context; private MockQueryNiFiReportingTask reportingTask; private MockRecordSinkService mockRecordSinkService; private ProcessGroupStatus status; private BulletinRepository mockBulletinRepository; private MockProvenanceRepository mockProvenanceRepository; private AtomicLong currentTime; private MockStateManager mockStateManager; @BeforeEach public void setup() { currentTime = new AtomicLong(); status = new ProcessGroupStatus(); status.setId("1234"); status.setFlowFilesReceived(5); status.setBytesReceived(10000); status.setFlowFilesSent(10); status.setBytesRead(20000L); status.setBytesSent(20000); status.setQueuedCount(100); status.setQueuedContentSize(1024L); status.setBytesWritten(80000L); status.setActiveThreadCount(5); // create a processor status with processing time ProcessorStatus procStatus = new ProcessorStatus(); procStatus.setId("proc"); procStatus.setProcessingNanos(123456789); Collection<ProcessorStatus> processorStatuses = new ArrayList<>(); processorStatuses.add(procStatus); status.setProcessorStatus(processorStatuses); ConnectionStatus root1ConnectionStatus = new ConnectionStatus(); root1ConnectionStatus.setId("root1"); root1ConnectionStatus.setQueuedCount(1000); root1ConnectionStatus.setBackPressureObjectThreshold(1000); ConnectionStatus root2ConnectionStatus = new ConnectionStatus(); root2ConnectionStatus.setId("root2"); root2ConnectionStatus.setQueuedCount(500); root2ConnectionStatus.setBackPressureObjectThreshold(1000); Collection<ConnectionStatus> rootConnectionStatuses = new ArrayList<>(); rootConnectionStatuses.add(root1ConnectionStatus); 
rootConnectionStatuses.add(root2ConnectionStatus); status.setConnectionStatus(rootConnectionStatuses); // create a group status with processing time ProcessGroupStatus groupStatus1 = new ProcessGroupStatus(); groupStatus1.setProcessorStatus(processorStatuses); groupStatus1.setBytesRead(1234L); // Create a nested group status with a connection ProcessGroupStatus groupStatus2 = new ProcessGroupStatus(); groupStatus2.setProcessorStatus(processorStatuses); groupStatus2.setBytesRead(12345L); ConnectionStatus nestedConnectionStatus = new ConnectionStatus(); nestedConnectionStatus.setId("nested"); nestedConnectionStatus.setQueuedCount(1001); Collection<ConnectionStatus> nestedConnectionStatuses = new ArrayList<>(); nestedConnectionStatuses.add(nestedConnectionStatus); groupStatus2.setConnectionStatus(nestedConnectionStatuses); Collection<ProcessGroupStatus> nestedGroupStatuses = new ArrayList<>(); nestedGroupStatuses.add(groupStatus2); groupStatus1.setProcessGroupStatus(nestedGroupStatuses); ProcessGroupStatus groupStatus3 = new ProcessGroupStatus(); groupStatus3.setBytesRead(1L); ConnectionStatus nestedConnectionStatus2 = new ConnectionStatus(); nestedConnectionStatus2.setId("nested2"); nestedConnectionStatus2.setQueuedCount(3); Collection<ConnectionStatus> nestedConnectionStatuses2 = new ArrayList<>(); nestedConnectionStatuses2.add(nestedConnectionStatus2); groupStatus3.setConnectionStatus(nestedConnectionStatuses2); Collection<ProcessGroupStatus> nestedGroupStatuses2 = new ArrayList<>(); nestedGroupStatuses2.add(groupStatus3); Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>(); groupStatuses.add(groupStatus1); groupStatuses.add(groupStatus3); status.setProcessGroupStatus(groupStatuses); } @Test void testConnectionStatusTable() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.QUERY, "select 
id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by queuedCount desc"); reportingTask = initTask(properties); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(4, rows.size()); // Validate the first row Map<String, Object> row = rows.get(0); assertEquals(3, row.size()); // Only projected 3 columns Object id = row.get("id"); assertTrue(id instanceof String); assertEquals("nested", id); assertEquals(1001, row.get("queuedCount")); // Validate the second row row = rows.get(1); id = row.get("id"); assertEquals("root1", id); assertEquals(1000, row.get("queuedCount")); assertEquals(true, row.get("isBackPressureEnabled")); // Validate the third row row = rows.get(2); id = row.get("id"); assertEquals("root2", id); assertEquals(500, row.get("queuedCount")); assertEquals(false, row.get("isBackPressureEnabled")); // Validate the fourth row row = rows.get(3); id = row.get("id"); assertEquals("nested2", id); assertEquals(3, row.get("queuedCount")); } @Test void testBulletinIsInTimeWindow() throws InitializationException { String query = "select * from BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime"; final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.QUERY, query); reportingTask = initTask(properties); currentTime.set(Instant.now().toEpochMilli()); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(3, rows.size()); final Bulletin bulletin = BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(), "ERROR", "test bulletin 3", "testFlowFileUuid"); mockBulletinRepository.addBulletin(bulletin); currentTime.set(bulletin.getTimestamp().getTime()); reportingTask.onTrigger(context); List<Map<String, Object>> sameRows = mockRecordSinkService.getRows(); assertEquals(1, sameRows.size()); } @Test void testBulletinIsOutOfTimeWindow() 
throws InitializationException { String query = "select * from BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime"; final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.QUERY, query); reportingTask = initTask(properties); currentTime.set(Instant.now().toEpochMilli()); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(3, rows.size()); final Bulletin bulletin = BulletinFactory.createBulletin("input port", "ERROR", "test bulletin 3", "testFlowFileUuid"); mockBulletinRepository.addBulletin(bulletin); currentTime.set(bulletin.getTimestamp().getTime() - 1); reportingTask.onTrigger(context); List<Map<String, Object>> sameRows = mockRecordSinkService.getRows(); assertEquals(0, sameRows.size()); } @Test void testProvenanceEventIsInTimeWindow() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime"); reportingTask = initTask(properties); currentTime.set(Instant.now().toEpochMilli()); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(1001, rows.size()); MockFlowFile mockFlowFile = new MockFlowFile(1002L); ProvenanceEventRecord prov1002 = mockProvenanceRepository.eventBuilder() .setEventType(ProvenanceEventType.CREATE) .fromFlowFile(mockFlowFile) .setComponentId("12345") .setComponentType("ReportingTask") .setFlowFileUUID("I am FlowFile 1") .setEventTime(Instant.now().toEpochMilli()) .setEventDuration(100) .setTransitUri("test://") .setSourceSystemFlowFileIdentifier("I am FlowFile 1") .setAlternateIdentifierUri("remote://test") .build(); mockProvenanceRepository.registerEvent(prov1002); currentTime.set(prov1002.getEventTime()); reportingTask.onTrigger(context); 
List<Map<String, Object>> sameRows = mockRecordSinkService.getRows(); assertEquals(1, sameRows.size()); } @Test void testProvenanceEventIsOutOfTimeWindow() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime"); reportingTask = initTask(properties); currentTime.set(Instant.now().toEpochMilli()); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(1001, rows.size()); MockFlowFile mockFlowFile = new MockFlowFile(1002L); ProvenanceEventRecord prov1002 = mockProvenanceRepository.eventBuilder() .setEventType(ProvenanceEventType.CREATE) .fromFlowFile(mockFlowFile) .setComponentId("12345") .setComponentType("ReportingTask") .setFlowFileUUID("I am FlowFile 1") .setEventTime(Instant.now().toEpochMilli()) .setEventDuration(100) .setTransitUri("test://") .setSourceSystemFlowFileIdentifier("I am FlowFile 1") .setAlternateIdentifierUri("remote://test") .build(); mockProvenanceRepository.registerEvent(prov1002); currentTime.set(prov1002.getEventTime() - 1); reportingTask.onTrigger(context); List<Map<String, Object>> sameRows = mockRecordSinkService.getRows(); assertEquals(0, sameRows.size()); } @Test void testUniqueProvenanceAndBulletinQuery() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS, PROVENANCE where " + "bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime " + "and timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime"); reportingTask = initTask(properties); currentTime.set(Instant.now().toEpochMilli()); reportingTask.onTrigger(context); List<Map<String, Object>> rows = 
mockRecordSinkService.getRows(); assertEquals(3003, rows.size()); final Bulletin bulletin = BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(), "ERROR", "test bulletin 3", "testFlowFileUuid"); mockBulletinRepository.addBulletin(bulletin); MockFlowFile mockFlowFile = new MockFlowFile(1002L); ProvenanceEventRecord prov1002 = mockProvenanceRepository.eventBuilder() .setEventType(ProvenanceEventType.CREATE) .fromFlowFile(mockFlowFile) .setComponentId("12345") .setComponentType("ReportingTask") .setFlowFileUUID("I am FlowFile 1") .build(); mockProvenanceRepository.registerEvent(prov1002); currentTime.set(bulletin.getTimestamp().getTime()); reportingTask.onTrigger(context); List<Map<String, Object>> sameRows = mockRecordSinkService.getRows(); assertEquals(1, sameRows.size()); } @Test void testTimeWindowFromStateMap() throws IOException, InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS, PROVENANCE where " + "bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime " + "and timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime"); reportingTask = initTask(properties); long testBulletinStartTime = 1609538145L; long testProvenanceStartTime = 1641074145L; final Map<String, String> stateMap = new HashMap<>(); stateMap.put(TrackedQueryTime.BULLETIN_START_TIME.name(), String.valueOf(testBulletinStartTime)); stateMap.put(TrackedQueryTime.PROVENANCE_START_TIME.name(), String.valueOf(testProvenanceStartTime)); mockStateManager.setState(stateMap, Scope.LOCAL); final long bulletinStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name())); final long provenanceStartTime = 
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name())); assertEquals(testBulletinStartTime, bulletinStartTime); assertEquals(testProvenanceStartTime, provenanceStartTime); final long currentTime = Instant.now().toEpochMilli(); this.currentTime.set(currentTime); reportingTask.onTrigger(context); final long updatedBulletinStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name())); final long updatedProvenanceStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name())); assertEquals(currentTime, updatedBulletinStartTime); assertEquals(currentTime, updatedProvenanceStartTime); } //--NEW END @Test void testJvmMetricsTable() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.QUERY, "select " + Stream.of(MetricNames.JVM_DAEMON_THREAD_COUNT, MetricNames.JVM_THREAD_COUNT, MetricNames.JVM_THREAD_STATES_BLOCKED, MetricNames.JVM_THREAD_STATES_RUNNABLE, MetricNames.JVM_THREAD_STATES_TERMINATED, MetricNames.JVM_THREAD_STATES_TIMED_WAITING, MetricNames.JVM_UPTIME, MetricNames.JVM_HEAP_USED, MetricNames.JVM_HEAP_USAGE, MetricNames.JVM_NON_HEAP_USAGE, MetricNames.JVM_FILE_DESCRIPTOR_USAGE).map((s) -> s.replace(".", "_")).collect(Collectors.joining(",")) + " from JVM_METRICS"); reportingTask = initTask(properties); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(1, rows.size()); Map<String, Object> row = rows.get(0); assertEquals(11, row.size()); assertTrue(row.get(MetricNames.JVM_DAEMON_THREAD_COUNT.replace(".", "_")) instanceof Integer); assertTrue(row.get(MetricNames.JVM_HEAP_USAGE.replace(".", "_")) instanceof Double); } @Test void testProcessGroupStatusTable() throws 
InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.QUERY, "select * from PROCESS_GROUP_STATUS order by bytesRead asc"); reportingTask = initTask(properties); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(4, rows.size()); // Validate the first row Map<String, Object> row = rows.get(0); assertEquals(20, row.size()); assertEquals(1L, row.get("bytesRead")); // Validate the second row row = rows.get(1); assertEquals(1234L, row.get("bytesRead")); // Validate the third row row = rows.get(2); assertEquals(12345L, row.get("bytesRead")); // Validate the fourth row row = rows.get(3); assertEquals(20000L, row.get("bytesRead")); } @Test void testNoResults() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.QUERY, "select * from CONNECTION_STATUS where queuedCount > 2000"); reportingTask = initTask(properties); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(0, rows.size()); } @Test void testProvenanceTable() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE order by eventId asc"); reportingTask = initTask(properties); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(1001, rows.size()); // Validate the first row Map<String, Object> row = rows.get(0); assertEquals(24, row.size()); // Verify the first row contents assertEquals(0L, row.get("eventId")); assertEquals("CREATE", row.get("eventType")); assertEquals(12L, 
row.get("entitySize")); assertNull(row.get("contentPath")); assertNull(row.get("previousContentPath")); Object o = row.get("previousAttributes"); assertTrue(o instanceof Map); Map<String, String> previousAttributes = (Map<String, String>) o; assertEquals("A", previousAttributes.get("test.value")); o = row.get("updatedAttributes"); assertTrue(o instanceof Map); Map<String, String> updatedAttributes = (Map<String, String>) o; assertEquals("B", updatedAttributes.get("test.value")); // Verify some fields in the second row row = rows.get(1); assertEquals(24, row.size()); // Verify the second row contents assertEquals(1L, row.get("eventId")); assertEquals("DROP", row.get("eventType")); // Verify some fields in the last row row = rows.get(1000); assertEquals(24, row.size()); // Verify the last row contents assertEquals(1000L, row.get("eventId")); assertEquals("DROP", row.get("eventType")); } @Test void testBulletinTable() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS order by bulletinTimestamp asc"); reportingTask = initTask(properties); reportingTask.onTrigger(context); final List<Map<String, Object>> rows = mockRecordSinkService.getRows(); final String flowFileUuid = "testFlowFileUuid"; assertEquals(3, rows.size()); // Validate the first row Map<String, Object> row = rows.get(0); assertEquals(14, row.size()); assertNotNull(row.get("bulletinId")); assertEquals("controller", row.get("bulletinCategory")); assertEquals("WARN", row.get("bulletinLevel")); assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid")); // Validate the second row row = rows.get(1); assertEquals("processor", row.get("bulletinCategory")); assertEquals("INFO", row.get("bulletinLevel")); // Validate the third row row = rows.get(2); assertEquals("controller_service", row.get("bulletinCategory")); assertEquals("ERROR", 
row.get("bulletinLevel")); assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid")); } private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException { final ComponentLog logger = mock(ComponentLog.class); reportingTask = new MockQueryNiFiReportingTask(); final ReportingInitializationContext initContext = mock(ReportingInitializationContext.class); Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); Mockito.when(initContext.getLogger()).thenReturn(logger); reportingTask.initialize(initContext); Map<PropertyDescriptor, String> properties = new HashMap<>(); for (final PropertyDescriptor descriptor : reportingTask.getSupportedPropertyDescriptors()) { properties.put(descriptor, descriptor.getDefaultValue()); } properties.putAll(customProperties); context = mock(ReportingContext.class); mockStateManager = new MockStateManager(reportingTask); Mockito.when(context.getStateManager()).thenReturn(mockStateManager); Mockito.doAnswer((Answer<PropertyValue>) invocation -> { final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class); return new MockPropertyValue(properties.get(descriptor)); }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); final EventAccess eventAccess = mock(EventAccess.class); Mockito.when(context.getEventAccess()).thenReturn(eventAccess); Mockito.when(eventAccess.getControllerStatus()).thenReturn(status); final PropertyValue pValue = mock(StandardPropertyValue.class); mockRecordSinkService = new MockRecordSinkService(); Mockito.when(context.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue); Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService); ConfigurationContext configContext = mock(ConfigurationContext.class); Mockito.when(configContext.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue); 
Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION)).thenReturn(new MockPropertyValue("10")); Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE)).thenReturn(new MockPropertyValue("0")); reportingTask.setup(configContext); mockProvenanceRepository = new MockProvenanceRepository(); long currentTimeMillis = System.currentTimeMillis(); Map<String, String> previousAttributes = new HashMap<>(); previousAttributes.put("mime.type", "application/json"); previousAttributes.put("test.value", "A"); Map<String, String> updatedAttributes = new HashMap<>(previousAttributes); updatedAttributes.put("test.value", "B"); // Generate provenance events and put them in a repository Processor processor = mock(Processor.class); SharedSessionState sharedState = new SharedSessionState(processor, new AtomicLong(0)); MockProcessSession processSession = new MockProcessSession(sharedState, processor); MockFlowFile mockFlowFile = processSession.createFlowFile("Test content".getBytes()); ProvenanceEventRecord prov1 = mockProvenanceRepository.eventBuilder() .setEventType(ProvenanceEventType.CREATE) .fromFlowFile(mockFlowFile) .setComponentId("12345") .setComponentType("ReportingTask") .setFlowFileUUID("I am FlowFile 1") .setEventTime(currentTimeMillis) .setEventDuration(100) .setTransitUri("test://") .setSourceSystemFlowFileIdentifier("I am FlowFile 1") .setAlternateIdentifierUri("remote://test") .setAttributes(previousAttributes, updatedAttributes) .build(); mockProvenanceRepository.registerEvent(prov1); for (int i = 1; i < 1001; i++) { String indexString = Integer.toString(i); mockFlowFile = processSession.createFlowFile(("Test content " + indexString).getBytes()); ProvenanceEventRecord prov = mockProvenanceRepository.eventBuilder() .fromFlowFile(mockFlowFile) .setEventType(ProvenanceEventType.DROP) .setComponentId(indexString) .setComponentType("Processor") .setFlowFileUUID("I am FlowFile " + indexString) 
.setEventTime(currentTimeMillis - i) .build(); mockProvenanceRepository.registerEvent(prov); } Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(mockProvenanceRepository); mockBulletinRepository = new MockQueryBulletinRepository(); mockBulletinRepository.addBulletin(BulletinFactory.createBulletin("controller", "WARN", "test bulletin 2", "testFlowFileUuid")); mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.PROCESSOR.name().toLowerCase(), "INFO", "test bulletin 1", "testFlowFileUuid")); mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(), "ERROR", "test bulletin 2", "testFlowFileUuid")); Mockito.when(context.getBulletinRepository()).thenReturn(mockBulletinRepository); return reportingTask; } private final class MockQueryNiFiReportingTask extends QueryNiFiReportingTask { @Override public long getCurrentTime() { return currentTime.get(); } } private static class MockQueryBulletinRepository extends MockBulletinRepository { Map<String, List<Bulletin>> bulletins = new HashMap<>(); @Override public void addBulletin(Bulletin bulletin) { bulletins.computeIfAbsent(bulletin.getCategory(), __ -> new ArrayList<>()) .add(bulletin); } @Override public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) { return new ArrayList<>( Optional.ofNullable(bulletins.get(bulletinQuery.getSourceType().name().toLowerCase())) .orElse(Collections.emptyList())); } @Override public List<Bulletin> findBulletinsForController() { return Optional.ofNullable(bulletins.get("controller")) .orElse(Collections.emptyList()); } } } ``` ########## File path: nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java ########## @@ -67,21 +72,26 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; - import static 
org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; -public class TestQueryNiFiReportingTask { +class TestQueryNiFiReportingTask { Review comment: This test class can also be significantly simplified: ```java class TestQueryNiFiReportingTask { private ReportingContext context; private MockQueryNiFiReportingTask reportingTask; private MockRecordSinkService mockRecordSinkService; private ProcessGroupStatus status; private BulletinRepository mockBulletinRepository; private MockProvenanceRepository mockProvenanceRepository; private AtomicLong currentTime; private MockStateManager mockStateManager; @BeforeEach public void setup() { currentTime = new AtomicLong(); status = new ProcessGroupStatus(); status.setId("1234"); status.setFlowFilesReceived(5); status.setBytesReceived(10000); status.setFlowFilesSent(10); status.setBytesRead(20000L); status.setBytesSent(20000); status.setQueuedCount(100); status.setQueuedContentSize(1024L); status.setBytesWritten(80000L); status.setActiveThreadCount(5); // create a processor status with processing time ProcessorStatus procStatus = new ProcessorStatus(); procStatus.setId("proc"); procStatus.setProcessingNanos(123456789); Collection<ProcessorStatus> processorStatuses = new ArrayList<>(); processorStatuses.add(procStatus); status.setProcessorStatus(processorStatuses); ConnectionStatus root1ConnectionStatus = new ConnectionStatus(); root1ConnectionStatus.setId("root1"); root1ConnectionStatus.setQueuedCount(1000); root1ConnectionStatus.setBackPressureObjectThreshold(1000); ConnectionStatus root2ConnectionStatus = new ConnectionStatus(); root2ConnectionStatus.setId("root2"); root2ConnectionStatus.setQueuedCount(500); root2ConnectionStatus.setBackPressureObjectThreshold(1000); Collection<ConnectionStatus> rootConnectionStatuses = new ArrayList<>(); rootConnectionStatuses.add(root1ConnectionStatus); rootConnectionStatuses.add(root2ConnectionStatus); 
status.setConnectionStatus(rootConnectionStatuses); // create a group status with processing time ProcessGroupStatus groupStatus1 = new ProcessGroupStatus(); groupStatus1.setProcessorStatus(processorStatuses); groupStatus1.setBytesRead(1234L); // Create a nested group status with a connection ProcessGroupStatus groupStatus2 = new ProcessGroupStatus(); groupStatus2.setProcessorStatus(processorStatuses); groupStatus2.setBytesRead(12345L); ConnectionStatus nestedConnectionStatus = new ConnectionStatus(); nestedConnectionStatus.setId("nested"); nestedConnectionStatus.setQueuedCount(1001); Collection<ConnectionStatus> nestedConnectionStatuses = new ArrayList<>(); nestedConnectionStatuses.add(nestedConnectionStatus); groupStatus2.setConnectionStatus(nestedConnectionStatuses); Collection<ProcessGroupStatus> nestedGroupStatuses = new ArrayList<>(); nestedGroupStatuses.add(groupStatus2); groupStatus1.setProcessGroupStatus(nestedGroupStatuses); ProcessGroupStatus groupStatus3 = new ProcessGroupStatus(); groupStatus3.setBytesRead(1L); ConnectionStatus nestedConnectionStatus2 = new ConnectionStatus(); nestedConnectionStatus2.setId("nested2"); nestedConnectionStatus2.setQueuedCount(3); Collection<ConnectionStatus> nestedConnectionStatuses2 = new ArrayList<>(); nestedConnectionStatuses2.add(nestedConnectionStatus2); groupStatus3.setConnectionStatus(nestedConnectionStatuses2); Collection<ProcessGroupStatus> nestedGroupStatuses2 = new ArrayList<>(); nestedGroupStatuses2.add(groupStatus3); Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>(); groupStatuses.add(groupStatus1); groupStatuses.add(groupStatus3); status.setProcessGroupStatus(groupStatuses); } @Test void testConnectionStatusTable() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.QUERY, "select id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by 
queuedCount desc"); reportingTask = initTask(properties); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(4, rows.size()); // Validate the first row Map<String, Object> row = rows.get(0); assertEquals(3, row.size()); // Only the 3 projected columns Object id = row.get("id"); assertTrue(id instanceof String); assertEquals("nested", id); assertEquals(1001, row.get("queuedCount")); // Validate the second row row = rows.get(1); id = row.get("id"); assertEquals("root1", id); assertEquals(1000, row.get("queuedCount")); assertEquals(true, row.get("isBackPressureEnabled")); // Validate the third row row = rows.get(2); id = row.get("id"); assertEquals("root2", id); assertEquals(500, row.get("queuedCount")); assertEquals(false, row.get("isBackPressureEnabled")); // Validate the fourth row row = rows.get(3); id = row.get("id"); assertEquals("nested2", id); assertEquals(3, row.get("queuedCount")); } @Test void testBulletinIsInTimeWindow() throws InitializationException { String query = "select * from BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime"; final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.QUERY, query); reportingTask = initTask(properties); currentTime.set(Instant.now().toEpochMilli()); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(3, rows.size()); final Bulletin bulletin = BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(), "ERROR", "test bulletin 3", "testFlowFileUuid"); mockBulletinRepository.addBulletin(bulletin); currentTime.set(bulletin.getTimestamp().getTime()); reportingTask.onTrigger(context); List<Map<String, Object>> sameRows = mockRecordSinkService.getRows(); assertEquals(1, sameRows.size()); } @Test void testBulletinIsOutOfTimeWindow() throws InitializationException { String query = "select * from 
BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime"; final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.QUERY, query); reportingTask = initTask(properties); currentTime.set(Instant.now().toEpochMilli()); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(3, rows.size()); final Bulletin bulletin = BulletinFactory.createBulletin("input port", "ERROR", "test bulletin 3", "testFlowFileUuid"); mockBulletinRepository.addBulletin(bulletin); currentTime.set(bulletin.getTimestamp().getTime() - 1); reportingTask.onTrigger(context); List<Map<String, Object>> sameRows = mockRecordSinkService.getRows(); assertEquals(0, sameRows.size()); } @Test void testProvenanceEventIsInTimeWindow() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime"); reportingTask = initTask(properties); currentTime.set(Instant.now().toEpochMilli()); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(1001, rows.size()); MockFlowFile mockFlowFile = new MockFlowFile(1002L); ProvenanceEventRecord prov1002 = mockProvenanceRepository.eventBuilder() .setEventType(ProvenanceEventType.CREATE) .fromFlowFile(mockFlowFile) .setComponentId("12345") .setComponentType("ReportingTask") .setFlowFileUUID("I am FlowFile 1") .setEventTime(Instant.now().toEpochMilli()) .setEventDuration(100) .setTransitUri("test://") .setSourceSystemFlowFileIdentifier("I am FlowFile 1") .setAlternateIdentifierUri("remote://test") .build(); mockProvenanceRepository.registerEvent(prov1002); currentTime.set(prov1002.getEventTime()); reportingTask.onTrigger(context); List<Map<String, Object>> sameRows = 
mockRecordSinkService.getRows(); assertEquals(1, sameRows.size()); } @Test void testProvenanceEventIsOutOfTimeWindow() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime"); reportingTask = initTask(properties); currentTime.set(Instant.now().toEpochMilli()); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(1001, rows.size()); MockFlowFile mockFlowFile = new MockFlowFile(1002L); ProvenanceEventRecord prov1002 = mockProvenanceRepository.eventBuilder() .setEventType(ProvenanceEventType.CREATE) .fromFlowFile(mockFlowFile) .setComponentId("12345") .setComponentType("ReportingTask") .setFlowFileUUID("I am FlowFile 1") .setEventTime(Instant.now().toEpochMilli()) .setEventDuration(100) .setTransitUri("test://") .setSourceSystemFlowFileIdentifier("I am FlowFile 1") .setAlternateIdentifierUri("remote://test") .build(); mockProvenanceRepository.registerEvent(prov1002); currentTime.set(prov1002.getEventTime() - 1); reportingTask.onTrigger(context); List<Map<String, Object>> sameRows = mockRecordSinkService.getRows(); assertEquals(0, sameRows.size()); } @Test void testUniqueProvenanceAndBulletinQuery() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS, PROVENANCE where " + "bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime " + "and timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime"); reportingTask = initTask(properties); currentTime.set(Instant.now().toEpochMilli()); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); 
assertEquals(3003, rows.size()); final Bulletin bulletin = BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(), "ERROR", "test bulletin 3", "testFlowFileUuid"); mockBulletinRepository.addBulletin(bulletin); MockFlowFile mockFlowFile = new MockFlowFile(1002L); ProvenanceEventRecord prov1002 = mockProvenanceRepository.eventBuilder() .setEventType(ProvenanceEventType.CREATE) .fromFlowFile(mockFlowFile) .setComponentId("12345") .setComponentType("ReportingTask") .setFlowFileUUID("I am FlowFile 1") .build(); mockProvenanceRepository.registerEvent(prov1002); currentTime.set(bulletin.getTimestamp().getTime()); reportingTask.onTrigger(context); List<Map<String, Object>> sameRows = mockRecordSinkService.getRows(); assertEquals(1, sameRows.size()); } @Test void testTimeWindowFromStateMap() throws IOException, InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS, PROVENANCE where " + "bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime " + "and timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime"); reportingTask = initTask(properties); long testBulletinStartTime = 1609538145L; long testProvenanceStartTime = 1641074145L; final Map<String, String> stateMap = new HashMap<>(); stateMap.put(TrackedQueryTime.BULLETIN_START_TIME.name(), String.valueOf(testBulletinStartTime)); stateMap.put(TrackedQueryTime.PROVENANCE_START_TIME.name(), String.valueOf(testProvenanceStartTime)); mockStateManager.setState(stateMap, Scope.LOCAL); final long bulletinStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name())); final long provenanceStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name())); 
assertEquals(testBulletinStartTime, bulletinStartTime); assertEquals(testProvenanceStartTime, provenanceStartTime); final long currentTime = Instant.now().toEpochMilli(); this.currentTime.set(currentTime); reportingTask.onTrigger(context); final long updatedBulletinStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name())); final long updatedProvenanceStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name())); assertEquals(currentTime, updatedBulletinStartTime); assertEquals(currentTime, updatedProvenanceStartTime); } //--NEW END @Test void testJvmMetricsTable() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.QUERY, "select " + Stream.of(MetricNames.JVM_DAEMON_THREAD_COUNT, MetricNames.JVM_THREAD_COUNT, MetricNames.JVM_THREAD_STATES_BLOCKED, MetricNames.JVM_THREAD_STATES_RUNNABLE, MetricNames.JVM_THREAD_STATES_TERMINATED, MetricNames.JVM_THREAD_STATES_TIMED_WAITING, MetricNames.JVM_UPTIME, MetricNames.JVM_HEAP_USED, MetricNames.JVM_HEAP_USAGE, MetricNames.JVM_NON_HEAP_USAGE, MetricNames.JVM_FILE_DESCRIPTOR_USAGE).map((s) -> s.replace(".", "_")).collect(Collectors.joining(",")) + " from JVM_METRICS"); reportingTask = initTask(properties); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(1, rows.size()); Map<String, Object> row = rows.get(0); assertEquals(11, row.size()); assertTrue(row.get(MetricNames.JVM_DAEMON_THREAD_COUNT.replace(".", "_")) instanceof Integer); assertTrue(row.get(MetricNames.JVM_HEAP_USAGE.replace(".", "_")) instanceof Double); } @Test void testProcessGroupStatusTable() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); 
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.QUERY, "select * from PROCESS_GROUP_STATUS order by bytesRead asc"); reportingTask = initTask(properties); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(4, rows.size()); // Validate the first row Map<String, Object> row = rows.get(0); assertEquals(20, row.size()); assertEquals(1L, row.get("bytesRead")); // Validate the second row row = rows.get(1); assertEquals(1234L, row.get("bytesRead")); // Validate the third row row = rows.get(2); assertEquals(12345L, row.get("bytesRead")); // Validate the fourth row row = rows.get(3); assertEquals(20000L, row.get("bytesRead")); } @Test void testNoResults() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.QUERY, "select * from CONNECTION_STATUS where queuedCount > 2000"); reportingTask = initTask(properties); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(0, rows.size()); } @Test void testProvenanceTable() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE order by eventId asc"); reportingTask = initTask(properties); reportingTask.onTrigger(context); List<Map<String, Object>> rows = mockRecordSinkService.getRows(); assertEquals(1001, rows.size()); // Validate the first row Map<String, Object> row = rows.get(0); assertEquals(24, row.size()); // Verify the first row contents assertEquals(0L, row.get("eventId")); assertEquals("CREATE", row.get("eventType")); assertEquals(12L, row.get("entitySize")); assertNull(row.get("contentPath")); assertNull(row.get("previousContentPath")); 
Object o = row.get("previousAttributes"); assertTrue(o instanceof Map); Map<String, String> previousAttributes = (Map<String, String>) o; assertEquals("A", previousAttributes.get("test.value")); o = row.get("updatedAttributes"); assertTrue(o instanceof Map); Map<String, String> updatedAttributes = (Map<String, String>) o; assertEquals("B", updatedAttributes.get("test.value")); // Verify some fields in the second row row = rows.get(1); assertEquals(24, row.size()); // Verify the second row contents assertEquals(1L, row.get("eventId")); assertEquals("DROP", row.get("eventType")); // Verify some fields in the last row row = rows.get(1000); assertEquals(24, row.size()); // Verify the last row contents assertEquals(1000L, row.get("eventId")); assertEquals("DROP", row.get("eventType")); } @Test void testBulletinTable() throws InitializationException { final Map<PropertyDescriptor, String> properties = new HashMap<>(); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS order by bulletinTimestamp asc"); reportingTask = initTask(properties); reportingTask.onTrigger(context); final List<Map<String, Object>> rows = mockRecordSinkService.getRows(); final String flowFileUuid = "testFlowFileUuid"; assertEquals(3, rows.size()); // Validate the first row Map<String, Object> row = rows.get(0); assertEquals(14, row.size()); assertNotNull(row.get("bulletinId")); assertEquals("controller", row.get("bulletinCategory")); assertEquals("WARN", row.get("bulletinLevel")); assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid")); // Validate the second row row = rows.get(1); assertEquals("processor", row.get("bulletinCategory")); assertEquals("INFO", row.get("bulletinLevel")); // Validate the third row row = rows.get(2); assertEquals("controller_service", row.get("bulletinCategory")); assertEquals("ERROR", row.get("bulletinLevel")); assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid")); } private 
MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException { final ComponentLog logger = mock(ComponentLog.class); reportingTask = new MockQueryNiFiReportingTask(); final ReportingInitializationContext initContext = mock(ReportingInitializationContext.class); Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); Mockito.when(initContext.getLogger()).thenReturn(logger); reportingTask.initialize(initContext); Map<PropertyDescriptor, String> properties = new HashMap<>(); for (final PropertyDescriptor descriptor : reportingTask.getSupportedPropertyDescriptors()) { properties.put(descriptor, descriptor.getDefaultValue()); } properties.putAll(customProperties); context = mock(ReportingContext.class); mockStateManager = new MockStateManager(reportingTask); Mockito.when(context.getStateManager()).thenReturn(mockStateManager); Mockito.doAnswer((Answer<PropertyValue>) invocation -> { final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class); return new MockPropertyValue(properties.get(descriptor)); }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); final EventAccess eventAccess = mock(EventAccess.class); Mockito.when(context.getEventAccess()).thenReturn(eventAccess); Mockito.when(eventAccess.getControllerStatus()).thenReturn(status); final PropertyValue pValue = mock(StandardPropertyValue.class); mockRecordSinkService = new MockRecordSinkService(); Mockito.when(context.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue); Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService); ConfigurationContext configContext = mock(ConfigurationContext.class); Mockito.when(configContext.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue); Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION)).thenReturn(new MockPropertyValue("10")); 
Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE)).thenReturn(new MockPropertyValue("0")); reportingTask.setup(configContext); mockProvenanceRepository = new MockProvenanceRepository(); long currentTimeMillis = System.currentTimeMillis(); Map<String, String> previousAttributes = new HashMap<>(); previousAttributes.put("mime.type", "application/json"); previousAttributes.put("test.value", "A"); Map<String, String> updatedAttributes = new HashMap<>(previousAttributes); updatedAttributes.put("test.value", "B"); // Generate provenance events and put them in a repository Processor processor = mock(Processor.class); SharedSessionState sharedState = new SharedSessionState(processor, new AtomicLong(0)); MockProcessSession processSession = new MockProcessSession(sharedState, processor); MockFlowFile mockFlowFile = processSession.createFlowFile("Test content".getBytes()); ProvenanceEventRecord prov1 = mockProvenanceRepository.eventBuilder() .setEventType(ProvenanceEventType.CREATE) .fromFlowFile(mockFlowFile) .setComponentId("12345") .setComponentType("ReportingTask") .setFlowFileUUID("I am FlowFile 1") .setEventTime(currentTimeMillis) .setEventDuration(100) .setTransitUri("test://") .setSourceSystemFlowFileIdentifier("I am FlowFile 1") .setAlternateIdentifierUri("remote://test") .setAttributes(previousAttributes, updatedAttributes) .build(); mockProvenanceRepository.registerEvent(prov1); for (int i = 1; i < 1001; i++) { String indexString = Integer.toString(i); mockFlowFile = processSession.createFlowFile(("Test content " + indexString).getBytes()); ProvenanceEventRecord prov = mockProvenanceRepository.eventBuilder() .fromFlowFile(mockFlowFile) .setEventType(ProvenanceEventType.DROP) .setComponentId(indexString) .setComponentType("Processor") .setFlowFileUUID("I am FlowFile " + indexString) .setEventTime(currentTimeMillis - i) .build(); mockProvenanceRepository.registerEvent(prov); } 
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(mockProvenanceRepository); mockBulletinRepository = new MockQueryBulletinRepository(); mockBulletinRepository.addBulletin(BulletinFactory.createBulletin("controller", "WARN", "test bulletin 2", "testFlowFileUuid")); mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.PROCESSOR.name().toLowerCase(), "INFO", "test bulletin 1", "testFlowFileUuid")); mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(), "ERROR", "test bulletin 2", "testFlowFileUuid")); Mockito.when(context.getBulletinRepository()).thenReturn(mockBulletinRepository); return reportingTask; } private final class MockQueryNiFiReportingTask extends QueryNiFiReportingTask { @Override public long getCurrentTime() { return currentTime.get(); } } private static class MockQueryBulletinRepository extends MockBulletinRepository { Map<String, List<Bulletin>> bulletins = new HashMap<>(); @Override public void addBulletin(Bulletin bulletin) { bulletins.computeIfAbsent(bulletin.getCategory(), __ -> new ArrayList<>()) .add(bulletin); } @Override public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) { return new ArrayList<>( Optional.ofNullable(bulletins.get(bulletinQuery.getSourceType().name().toLowerCase())) .orElse(Collections.emptyList())); } @Override public List<Bulletin> findBulletinsForController() { return Optional.ofNullable(bulletins.get("controller")) .orElse(Collections.emptyList()); } } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org