NIFI-35: Provide an EventReporter to the FlowFileSwapManager and provide events for any errors
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/1cc3ce57 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/1cc3ce57 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/1cc3ce57 Branch: refs/heads/NIFI-169 Commit: 1cc3ce57556eb7cf9a5f94b269eb24b284e518eb Parents: 9e60aa0 Author: Mark Payne <marka...@hotmail.com> Authored: Mon Dec 15 14:28:11 2014 -0500 Committer: Mark Payne <marka...@hotmail.com> Committed: Mon Dec 15 14:28:11 2014 -0500 ---------------------------------------------------------------------- .../nifi/controller/FileSystemSwapManager.java | 33 +++++++++++++++----- .../apache/nifi/controller/FlowController.java | 20 +++++++----- .../repository/FlowFileSwapManager.java | 5 ++- 3 files changed, 41 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1cc3ce57/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java index 3af2098..ad95f8e 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java @@ -61,11 +61,12 @@ import org.apache.nifi.controller.repository.StandardFlowFileRecord; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaimManager; import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.events.EventReporter; import org.apache.nifi.io.BufferedOutputStream; import org.apache.nifi.processor.QueueSize; +import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,10 +81,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager { public static final int MINIMUM_SWAP_COUNT = 10000; private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap"); public static final int SWAP_ENCODING_VERSION = 6; + public static final String EVENT_CATEGORY = "Swap FlowFiles"; private final ScheduledExecutorService swapQueueIdentifierExecutor; private final ScheduledExecutorService swapInExecutor; private volatile FlowFileRepository flowFileRepository; + private volatile EventReporter eventReporter; // Maintains a mapping of FlowFile Queue to the a QueueLockWrapper, which provides queue locking and necessary state for swapping back in private final ConcurrentMap<FlowFileQueue, QueueLockWrapper> swapMap = new ConcurrentHashMap<>(); @@ -129,9 +132,10 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } } - public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager) { + public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager, final EventReporter eventReporter) { this.claimManager = claimManager; this.flowFileRepository = flowFileRepository; + this.eventReporter = eventReporter; swapQueueIdentifierExecutor.scheduleWithFixedDelay(new QueueIdentifier(connectionProvider), swapOutMillis, swapOutMillis, TimeUnit.MILLISECONDS); swapInExecutor.scheduleWithFixedDelay(new SwapInTask(), swapInMillis, swapInMillis, TimeUnit.MILLISECONDS); } @@ -437,10 +441,15 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } if (!swapFile.delete()) { - logger.warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file can be cleaned up manually"); + final String errMsg = "Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually"; + logger.warn(errMsg); + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, errMsg); } } catch (final Exception e) { - logger.error("Failed to Swap In FlowFiles for {} due to {}", new Object[]{flowFileQueue, e.toString()}, e); + final String errMsg = "Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e; + logger.error(errMsg); + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg); + if (swapFile != null) { queue.add(swapFile); } @@ -488,7 +497,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } catch (final IOException ioe) { recordsSwapped = 0; flowFileQueue.putSwappedRecords(toSwap); - logger.error("Failed to swap out {} FlowFiles from {} to Swap File {} due to {}", new Object[]{toSwap.size(), flowFileQueue, swapLocation, ioe.toString()}, ioe); + final String errMsg = "Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe; + logger.error(errMsg); + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg); } if (recordsSwapped > 0) { @@ -549,14 +560,18 @@ public class FileSystemSwapManager implements FlowFileSwapManager { final int swapEncodingVersion = in.readInt(); if (swapEncodingVersion > SWAP_ENCODING_VERSION) { - throw new IOException("Cannot swap FlowFiles in from " + swapFile + " because the encoding version is " - + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)"); + final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is " + + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)"; + + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg); + throw new IOException(errMsg); } final String connectionId = in.readUTF(); final FlowFileQueue queue = queueMap.get(connectionId); if (queue == null) { logger.error("Cannot recover Swapped FlowFiles from Swap File {} because the FlowFiles belong to a Connection with ID {} and that Connection does not exist", swapFile, connectionId); + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist"); continue; } @@ -579,7 +594,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager { maxRecoveredId = maxId; } } catch (final IOException ioe) { - logger.error("Cannot recover Swapped FlowFiles from Swap File {} due to {}", swapFile, ioe.toString()); + final String errMsg = "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe; + logger.error(errMsg); + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg); if (logger.isDebugEnabled()) { logger.error("", ioe); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1cc3ce57/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java index e1abe4e..545017a 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -388,13 +388,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H try { this.provenanceEventRepository = createProvenanceRepository(properties); - this.provenanceEventRepository.initialize(new EventReporter() { - @Override - public void reportEvent(final Severity severity, final String category, final String message) { - final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message); - bulletinRepository.addBulletin(bulletin); - } - }); + this.provenanceEventRepository.initialize(createEventReporter(bulletinRepository)); this.contentRepository = createContentRepository(properties); } catch (final Exception e) { @@ -516,6 +510,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H } } + private static EventReporter createEventReporter(final BulletinRepository bulletinRepository) { + return new EventReporter() { + @Override + public void reportEvent(final Severity severity, final String category, final String message) { + final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message); + bulletinRepository.addBulletin(bulletin); + } + }; + } + public void initializeFlow() throws IOException { writeLock.lock(); try { @@ -537,7 +541,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H contentRepository.cleanup(); if (flowFileSwapManager != null) { - flowFileSwapManager.start(flowFileRepository, this, contentClaimManager); + flowFileSwapManager.start(flowFileRepository, this, contentClaimManager, createEventReporter(bulletinRepository)); } if (externalSiteListener != null) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1cc3ce57/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java index 739cb2b..c6daab8 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.repository; import org.apache.nifi.controller.repository.claim.ContentClaimManager; +import org.apache.nifi.events.EventReporter; /** * Defines a mechanism by which FlowFiles can be move into external storage or @@ -34,8 +35,10 @@ public interface FlowFileSwapManager { * can be obtained and restored * @param claimManager the ContentClaimManager to use for interacting with * Content Claims + * @param reporter the EventReporter that can be used for notifying users of + * important events */ - void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager); + void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager, EventReporter reporter); /** * Shuts down the manager