NIFI-724: Ensure that bulletins generated for reporting tasks and controller services are shown at Controller level as well as component level
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/e7c0461b Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/e7c0461b Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/e7c0461b Branch: refs/heads/master Commit: e7c0461b15bff045d68e7ae8814eda2073cba209 Parents: 59aa8ff Author: Mark Payne <marka...@hotmail.com> Authored: Wed Jul 1 12:54:18 2015 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Wed Jul 1 12:54:18 2015 -0400 ---------------------------------------------------------------------- .../nifi/events/VolatileBulletinRepository.java | 105 ++++++++++++++----- 1 file changed, 78 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e7c0461b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java index c18fffd..8aeb34d 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java @@ -36,6 +36,8 @@ public class VolatileBulletinRepository implements BulletinRepository { private static final int CONTROLLER_BUFFER_SIZE = 10; private static final int COMPONENT_BUFFER_SIZE = 5; private static final String CONTROLLER_BULLETIN_STORE_KEY = "CONTROLLER"; + private static final String SERVICE_BULLETIN_STORE_KEY = "SERVICE"; + private static final String REPORTING_TASK_BULLETIN_STORE_KEY = "REPORTING_TASK"; private final ConcurrentMap<String, ConcurrentMap<String, RingBuffer<Bulletin>>> bulletinStoreMap = new ConcurrentHashMap<>(); private volatile BulletinProcessingStrategy processingStrategy = new DefaultBulletinProcessingStrategy(); @@ -170,18 +172,39 @@ public class VolatileBulletinRepository implements BulletinRepository { public List<Bulletin> findBulletinsForController(final int max) { final long fiveMinutesAgo = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5); - final ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(CONTROLLER_BULLETIN_STORE_KEY); - if (componentMap == null) { - return Collections.<Bulletin>emptyList(); - } - - final RingBuffer<Bulletin> buffer = componentMap.get(CONTROLLER_BULLETIN_STORE_KEY); - return buffer == null ? Collections.<Bulletin>emptyList() : buffer.getSelectedElements(new Filter<Bulletin>() { + final Filter<Bulletin> filter = new Filter<Bulletin>() { @Override public boolean select(final Bulletin bulletin) { return bulletin.getTimestamp().getTime() >= fiveMinutesAgo; } - }, max); + }; + + final List<Bulletin> controllerBulletins = new ArrayList<>(); + + final ConcurrentMap<String, RingBuffer<Bulletin>> controllerBulletinMap = bulletinStoreMap.get(CONTROLLER_BULLETIN_STORE_KEY); + if (controllerBulletinMap != null) { + final RingBuffer<Bulletin> buffer = controllerBulletinMap.get(CONTROLLER_BULLETIN_STORE_KEY); + if (buffer != null) { + controllerBulletins.addAll(buffer.getSelectedElements(filter, max)); + } + } + + for (final String key : new String[] { SERVICE_BULLETIN_STORE_KEY, REPORTING_TASK_BULLETIN_STORE_KEY }) { + final ConcurrentMap<String, RingBuffer<Bulletin>> bulletinMap = bulletinStoreMap.get(key); + if (bulletinMap != null) { + for (final RingBuffer<Bulletin> buffer : bulletinMap.values()) { + controllerBulletins.addAll(buffer.getSelectedElements(filter, max)); + } + } + } + + // We only want the newest bulletin, so we sort based on time and take the top 'max' entries + Collections.sort(controllerBulletins); + if (controllerBulletins.size() > max) { + return controllerBulletins.subList(0, max); + } + + return controllerBulletins; } /** @@ -203,7 +226,7 @@ public class VolatileBulletinRepository implements BulletinRepository { this.processingStrategy = new DefaultBulletinProcessingStrategy(); } - private RingBuffer<Bulletin> getBulletinBuffer(final Bulletin bulletin) { + private List<RingBuffer<Bulletin>> getBulletinBuffers(final Bulletin bulletin) { final String storageKey = getBulletinStoreKey(bulletin); ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(storageKey); @@ -215,40 +238,68 @@ public class VolatileBulletinRepository implements BulletinRepository { } } - final boolean controllerBulletin = isControllerBulletin(bulletin); - final String sourceId = controllerBulletin ? CONTROLLER_BULLETIN_STORE_KEY : bulletin.getSourceId(); - RingBuffer<Bulletin> bulletinBuffer = componentMap.get(sourceId); - if (bulletinBuffer == null) { - final int bufferSize = controllerBulletin ? CONTROLLER_BUFFER_SIZE : COMPONENT_BUFFER_SIZE; - bulletinBuffer = new RingBuffer<>(bufferSize); - final RingBuffer<Bulletin> existingBuffer = componentMap.putIfAbsent(sourceId, bulletinBuffer); - if (existingBuffer != null) { - bulletinBuffer = existingBuffer; + final List<RingBuffer<Bulletin>> buffers = new ArrayList<>(2); + + if (isControllerBulletin(bulletin)) { + RingBuffer<Bulletin> bulletinBuffer = componentMap.get(CONTROLLER_BULLETIN_STORE_KEY); + if (bulletinBuffer == null) { + bulletinBuffer = new RingBuffer<>(CONTROLLER_BUFFER_SIZE); + final RingBuffer<Bulletin> existingBuffer = componentMap.putIfAbsent(CONTROLLER_BULLETIN_STORE_KEY, bulletinBuffer); + if (existingBuffer != null) { + bulletinBuffer = existingBuffer; + } } + + buffers.add(bulletinBuffer); } - return bulletinBuffer; + if (bulletin.getSourceType() != ComponentType.FLOW_CONTROLLER) { + RingBuffer<Bulletin> bulletinBuffer = componentMap.get(bulletin.getSourceId()); + if (bulletinBuffer == null) { + bulletinBuffer = new RingBuffer<>(COMPONENT_BUFFER_SIZE); + final RingBuffer<Bulletin> existingBuffer = componentMap.putIfAbsent(bulletin.getSourceId(), bulletinBuffer); + if (existingBuffer != null) { + bulletinBuffer = existingBuffer; + } + } + + buffers.add(bulletinBuffer); + } + + return buffers; } private String getBulletinStoreKey(final Bulletin bulletin) { - if (isControllerBulletin(bulletin)) { - return CONTROLLER_BULLETIN_STORE_KEY; + switch (bulletin.getSourceType()) { + case FLOW_CONTROLLER: + return CONTROLLER_BULLETIN_STORE_KEY; + case CONTROLLER_SERVICE: + return SERVICE_BULLETIN_STORE_KEY; + case REPORTING_TASK: + return REPORTING_TASK_BULLETIN_STORE_KEY; + default: + return bulletin.getGroupId(); } - - final String groupId = bulletin.getGroupId(); - return groupId == null ? bulletin.getSourceId() : groupId; } private boolean isControllerBulletin(final Bulletin bulletin) { - return ComponentType.FLOW_CONTROLLER.equals(bulletin.getSourceType()); + switch (bulletin.getSourceType()) { + case FLOW_CONTROLLER: + case CONTROLLER_SERVICE: + case REPORTING_TASK: + return true; + default: + return false; + } } private class DefaultBulletinProcessingStrategy implements BulletinProcessingStrategy { @Override public void update(final Bulletin bulletin) { - final RingBuffer<Bulletin> bulletinBuffer = getBulletinBuffer(bulletin); - bulletinBuffer.add(bulletin); + for (final RingBuffer<Bulletin> bulletinBuffer : getBulletinBuffers(bulletin)) { + bulletinBuffer.add(bulletin); + } } } }