Repository: nifi Updated Branches: refs/heads/master a0d1aae60 -> 0ffdc2eb9
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 38722c5..2d0bbcb 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -179,8 +179,29 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { private Authorizer authorizer; // effectively final private ProvenanceAuthorizableFactory resourceFactory; // effectively final - public PersistentProvenanceRepository() throws IOException { - this(createRepositoryConfiguration(), 10000); + /** + * default no args constructor for service loading only. + */ + public PersistentProvenanceRepository() { + maxPartitionMillis = 0; + maxPartitionBytes = 0; + writers = null; + configuration = null; + indexConfig = null; + indexManager = null; + alwaysSync = false; + rolloverCheckMillis = 0; + maxAttributeChars = 0; + scheduledExecService = null; + rolloverExecutor = null; + queryExecService = null; + eventReporter = null; + authorizer = null; + resourceFactory = null; + } + + public PersistentProvenanceRepository(final NiFiProperties nifiProperties) throws IOException { + this(createRepositoryConfiguration(nifiProperties), 10000); } public PersistentProvenanceRepository(final RepositoryConfiguration configuration, final int rolloverCheckMillis) throws IOException { @@ -296,34 +317,33 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } } - private static RepositoryConfiguration createRepositoryConfiguration() throws IOException { - final NiFiProperties properties = NiFiProperties.getInstance(); - final Map<String, Path> storageDirectories = properties.getProvenanceRepositoryPaths(); + private static RepositoryConfiguration createRepositoryConfiguration(final NiFiProperties nifiProperties) throws IOException { + final Map<String, Path> storageDirectories = nifiProperties.getProvenanceRepositoryPaths(); if (storageDirectories.isEmpty()) { storageDirectories.put("provenance_repository", Paths.get("provenance_repository")); } - final String storageTime = properties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_TIME, "24 hours"); - final String storageSize = properties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE, "1 GB"); - final String rolloverTime = properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_TIME, "5 mins"); - final String rolloverSize = properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB"); - final String shardSize = properties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB"); - final int queryThreads = properties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2); - final int indexThreads = properties.getIntegerProperty(NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 1); - final int journalCount = properties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16); + final String storageTime = nifiProperties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_TIME, "24 hours"); + final String storageSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE, "1 GB"); + final String rolloverTime = nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_TIME, "5 mins"); + final String rolloverSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB"); + final String shardSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB"); + final int queryThreads = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2); + final int indexThreads = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 1); + final int journalCount = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16); final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS); final long maxStorageBytes = DataUnit.parseDataSize(storageSize, DataUnit.B).longValue(); final long rolloverMillis = FormatUtils.getTimeDuration(rolloverTime, TimeUnit.MILLISECONDS); final long rolloverBytes = DataUnit.parseDataSize(rolloverSize, DataUnit.B).longValue(); - final boolean compressOnRollover = Boolean.parseBoolean(properties.getProperty(NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER)); - final String indexedFieldString = properties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS); - final String indexedAttrString = properties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES); + final boolean compressOnRollover = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER)); + final String indexedFieldString = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS); + final String indexedAttrString = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES); - final Boolean alwaysSync = Boolean.parseBoolean(properties.getProperty("nifi.provenance.repository.always.sync", "false")); + final Boolean alwaysSync = Boolean.parseBoolean(nifiProperties.getProperty("nifi.provenance.repository.always.sync", "false")); final int defaultMaxAttrChars = 65536; - final String maxAttrLength = properties.getProperty("nifi.provenance.repository.max.attribute.length", String.valueOf(defaultMaxAttrChars)); + final String maxAttrLength = nifiProperties.getProperty("nifi.provenance.repository.max.attribute.length", String.valueOf(defaultMaxAttrChars)); int maxAttrChars; try { maxAttrChars = Integer.parseInt(maxAttrLength); @@ -389,8 +409,9 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } /** - * @return the maximum number of characters that any Event attribute should contain. If the event contains - * more characters than this, the attribute may be truncated on retrieval + * @return the maximum number of characters that any Event attribute should + * contain. If the event contains more characters than this, the attribute + * may be truncated on retrieval */ public int getMaxAttributeCharacters() { return maxAttributeChars; @@ -437,7 +458,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } private List<ProvenanceEventRecord> filterUnauthorizedEvents(final List<ProvenanceEventRecord> events, final NiFiUser user) { - return events.stream().filter(event -> isAuthorized(event, user)).collect(Collectors.<ProvenanceEventRecord> toList()); + return events.stream().filter(event -> isAuthorized(event, user)).collect(Collectors.<ProvenanceEventRecord>toList()); } private Set<ProvenanceEventRecord> replaceUnauthorizedWithPlaceholders(final Set<ProvenanceEventRecord> events, final NiFiUser user) { @@ -463,9 +484,9 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { // if this is the first record, try to find out the block index and jump directly to // the block index. This avoids having to read through a lot of data that we don't care about // just to get to the first record that we want. - if ( records.isEmpty() ) { + if (records.isEmpty()) { final TocReader tocReader = reader.getTocReader(); - if ( tocReader != null ) { + if (tocReader != null) { final Integer blockIndex = tocReader.getBlockIndexForEventId(firstRecordId); if (blockIndex != null) { reader.skipToBlock(blockIndex); @@ -661,7 +682,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { // Read the records in the last file to find its max id if (greatestMinIdFile != null) { - try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections.<Path> emptyList(), maxAttributeChars)) { + try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections.<Path>emptyList(), maxAttributeChars)) { maxId = recordReader.getMaxEventId(); } } @@ -700,7 +721,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { getIndexManager().close(); - if ( writers != null ) { + if (writers != null) { for (final RecordWriter writer : writers) { writer.close(); } @@ -730,7 +751,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { final int numDirty = dirtyWriterCount.get(); if (numDirty >= recordWriters.length) { throw new IllegalStateException("Cannot update repository because all partitions are unusable at this time. Writing to the repository would cause corruption. " - + "This most often happens as a result of the repository running out of disk space or the JVM running out of memory."); + + "This most often happens as a result of the repository running out of disk space or the JVM running out of memory."); } final long idx = writerIndex.getAndIncrement(); @@ -825,7 +846,8 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } /** - * @return all of the Provenance Event Log Files (not the journals, the merged files) available across all storage directories. + * @return all of the Provenance Event Log Files (not the journals, the + * merged files) available across all storage directories. */ private List<File> getLogFiles() { final List<File> files = new ArrayList<>(); @@ -844,8 +866,10 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { * Returns the size, in bytes, of the Repository storage * * @param logFiles the log files to consider - * @param timeCutoff if a log file's last modified date is before timeCutoff, it will be skipped - * @return the size of all log files given whose last mod date comes after (or equal to) timeCutoff + * @param timeCutoff if a log file's last modified date is before + * timeCutoff, it will be skipped + * @return the size of all log files given whose last mod date comes after + * (or equal to) timeCutoff */ public long getSize(final List<File> logFiles, final long timeCutoff) { long bytesUsed = 0L; @@ -977,9 +1001,9 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} due to {}; will not perform additional " + "Expiration Actions on this file at this time", currentAction, file, t.toString()); logger.warn("", t); - eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction + - " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions " + - "on this file at this time"); + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction + + " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions " + + "on this file at this time"); } } @@ -1052,7 +1076,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { maxEventId = reader.getMaxEventId(); } catch (final IOException ioe) { logger.warn("Unable to determine the maximum ID for Provenance Event Log File {}; values reported for the number of " - + "events in the Provenance Repository may be inaccurate.", firstLogFile); + + "events in the Provenance Repository may be inaccurate.", firstLogFile); } // check if we can delete the index safely. @@ -1095,8 +1119,8 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } /** - * Recursively deletes the given directory. If unable to delete the directory, will emit a WARN level - * log event and move on. + * Recursively deletes the given directory. If unable to delete the + * directory, will emit a WARN level log event and move on. * * @param dir the directory to delete */ @@ -1124,8 +1148,8 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } /** - * @return a List of all Index directories, sorted by timestamp of the earliest event that could - * be present in the index + * @return a List of all Index directories, sorted by timestamp of the + * earliest event that could be present in the index */ private List<File> getAllIndexDirectories() { final List<File> allIndexDirs = new ArrayList<>(); @@ -1157,8 +1181,8 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } /** - * Takes a File that has a filename "index-" followed by a Long and returns the - * value of that Long + * Takes a File that has a filename "index-" followed by a Long and returns + * the value of that Long * * @param indexDirectory the index directory to obtain the timestamp for * @return the timestamp associated with the given index @@ -1170,7 +1194,8 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } /** - * Blocks the calling thread until the repository rolls over. This is intended for unit testing. + * Blocks the calling thread until the repository rolls over. This is + * intended for unit testing. */ public void waitForRollover() { final int count = rolloverCompletions.get(); @@ -1183,16 +1208,17 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } /** - * @return the number of journal files that exist across all storage directories + * @return the number of journal files that exist across all storage + * directories */ // made protected for testing purposes protected int getJournalCount() { // determine how many 'journals' we have in the journals directories int journalFileCount = 0; - for ( final File storageDir : configuration.getStorageDirectories() ) { + for (final File storageDir : configuration.getStorageDirectories()) { final File journalsDir = new File(storageDir, "journals"); final File[] journalFiles = journalsDir.listFiles(); - if ( journalFiles != null ) { + if (journalFiles != null) { journalFileCount += journalFiles.length; } } @@ -1200,7 +1226,6 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { return journalFileCount; } - /** * Method is exposed for unit testing * @@ -1221,10 +1246,11 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { * MUST be called with the write lock held. * </p> * - * Rolls over the data in the journal files, merging them into a single Provenance Event Log File, and - * compressing and indexing as needed. + * Rolls over the data in the journal files, merging them into a single + * Provenance Event Log File, and compressing and indexing as needed. * - * @param force if true, will force a rollover regardless of whether or not data has been written + * @param force if true, will force a rollover regardless of whether or not + * data has been written * @throws IOException if unable to complete rollover */ private void rollover(final boolean force) throws IOException { @@ -1283,7 +1309,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { logger.error("", ioe); } - if (fileRolledOver != null) { + if (fileRolledOver != null) { final File file = fileRolledOver; @@ -1304,10 +1330,10 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } //if files were rolled over or if out of retries stop the future - if(fileRolledOver != null || retryAttempts.decrementAndGet() == 0) { + if (fileRolledOver != null || retryAttempts.decrementAndGet() == 0) { - if(fileRolledOver== null && retryAttempts.get() == 0){ - logger.error("Failed to merge Journal Files {} after {} attempts. ",journalsToMerge, MAX_JOURNAL_ROLLOVER_RETRIES); + if (fileRolledOver == null && retryAttempts.get() == 0) { + logger.error("Failed to merge Journal Files {} after {} attempts. ", journalsToMerge, MAX_JOURNAL_ROLLOVER_RETRIES); } rolloverCompletions.getAndIncrement(); @@ -1322,7 +1348,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } future.cancel(false); - }else{ + } else { logger.warn("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir); } } @@ -1427,7 +1453,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } for (final File journalFile : journalFiles) { - if ( journalFile.isDirectory() ) { + if (journalFile.isDirectory()) { continue; } @@ -1473,30 +1499,37 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { return mergedFile; } - /** * <p> - * Merges all of the given Journal Files into a single, merged Provenance Event Log File. As these records are merged, they will be compressed, if the repository is configured to compress records, - * and will be indexed. + * Merges all of the given Journal Files into a single, merged Provenance + * Event Log File. As these records are merged, they will be compressed, if + * the repository is configured to compress records, and will be indexed. * </p> * * <p> - * If the repository is configured to compress the data, the file written to may not be the same as the <code>suggestedMergeFile</code>, as a filename extension of '.gz' may be appended. If the - * journals are successfully merged, the file that they were merged into will be returned. If unable to merge the records (for instance, because the repository has been closed or because the list - * of journal files was empty), this method will return <code>null</code>. + * If the repository is configured to compress the data, the file written to + * may not be the same as the <code>suggestedMergeFile</code>, as a filename + * extension of '.gz' may be appended. If the journals are successfully + * merged, the file that they were merged into will be returned. If unable + * to merge the records (for instance, because the repository has been + * closed or because the list of journal files was empty), this method will + * return <code>null</code>. * </p> * * @param journalFiles the journal files to merge * @param suggestedMergeFile the file to write the merged records to - * @param eventReporter the event reporter to report any warnings or errors to; may be null. + * @param eventReporter the event reporter to report any warnings or errors + * to; may be null. * - * @return the file that the given journals were merged into, or <code>null</code> if no records were merged. + * @return the file that the given journals were merged into, or + * <code>null</code> if no records were merged. * - * @throws IOException if a problem occurs writing to the mergedFile, reading from a journal, or updating the Lucene Index. + * @throws IOException if a problem occurs writing to the mergedFile, + * reading from a journal, or updating the Lucene Index. */ File mergeJournals(final List<File> journalFiles, final File suggestedMergeFile, final EventReporter eventReporter) throws IOException { logger.debug("Merging {} to {}", journalFiles, suggestedMergeFile); - if ( this.closed.get() ) { + if (this.closed.get()) { logger.info("Provenance Repository has been closed; will not merge journal files to {}", suggestedMergeFile); return null; } @@ -1539,7 +1572,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { deleteAction.execute(suggestedMergeFile); } catch (final Exception e) { logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", suggestedMergeFile, e.toString()); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.warn("", e); } } @@ -1549,13 +1582,13 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { // file and the TOC file. Otherwise, we could get the wrong copy and have issues retrieving events. if (!suggestedMergeFile.delete()) { logger.error("Failed to delete partially written Provenance Journal File {}. This may result in events from this journal " - + "file not being able to be displayed. This file should be deleted manually.", suggestedMergeFile); + + "file not being able to be displayed. This file should be deleted manually.", suggestedMergeFile); } final File tocFile = TocUtil.getTocFile(suggestedMergeFile); - if ( tocFile.exists() && !tocFile.delete() ) { + if (tocFile.exists() && !tocFile.delete()) { logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. " - + "This can be corrected by manually deleting the {} file", tocFile, suggestedMergeFile, tocFile); + + "This can be corrected by manually deleting the {} file", tocFile, suggestedMergeFile, tocFile); } } } else { @@ -1619,8 +1652,8 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } if (eventReporter != null) { - eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + e + - "; it's possible that hte record wasn't completely written to the file. This record will be skipped."); + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + e + + "; it's possible that hte record wasn't completely written to the file. This record will be skipped."); } } @@ -1628,11 +1661,11 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { continue; } - if ( record.getEventTime() < earliestTimestamp ) { + if (record.getEventTime() < earliestTimestamp) { earliestTimestamp = record.getEventTime(); } - if ( record.getEventId() < minEventId ) { + if (record.getEventId() < minEventId) { minEventId = record.getEventId(); } @@ -1735,7 +1768,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { eventQueue.clear(); final String warning = String.format("Indexing Provenance Events for %s has failed %s times. This exceeds the maximum threshold of %s failures, " - + "so no more Provenance Events will be indexed for this Provenance file.", writerFile, indexingFailureCount.get(), MAX_INDEXING_FAILURE_COUNT); + + "so no more Provenance Events will be indexed for this Provenance file.", writerFile, indexingFailureCount.get(), MAX_INDEXING_FAILURE_COUNT); logger.warn(warning); if (eventReporter != null) { eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, warning); @@ -1814,8 +1847,8 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath()); if (eventReporter != null) { - eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + - journalFile.getAbsolutePath() + "; this file should be cleaned up manually"); + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + + journalFile.getAbsolutePath() + "; this file should be cleaned up manually"); } } @@ -1824,8 +1857,8 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { logger.warn("Failed to remove temporary journal TOC file {}; this file should be cleaned up manually", tocFile.getAbsolutePath()); if (eventReporter != null) { - eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal TOC file " + - tocFile.getAbsolutePath() + "; this file should be cleaned up manually"); + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal TOC file " + + tocFile.getAbsolutePath() + "; this file should be cleaned up manually"); } } } @@ -1844,8 +1877,9 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } /** - * This method is protected and exists for testing purposes. This allows unit tests to extend this class and - * override the createIndexingAction so that they can mock out the Indexing Action to throw Exceptions, count + * This method is protected and exists for testing purposes. This allows + * unit tests to extend this class and override the createIndexingAction so + * that they can mock out the Indexing Action to throw Exceptions, count * events indexed, etc. */ protected IndexingAction createIndexingAction() { @@ -1965,7 +1999,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { final AtomicInteger retrievalCount = new AtomicInteger(0); final List<File> indexDirectories = indexConfig.getIndexDirectories( query.getStartDate() == null ? null : query.getStartDate().getTime(), - query.getEndDate() == null ? null : query.getEndDate().getTime()); + query.getEndDate() == null ? null : query.getEndDate().getTime()); final AsyncQuerySubmission result = new AsyncQuerySubmission(query, indexDirectories.size(), userId); querySubmissionMap.put(query.getIdentifier(), result); @@ -2147,7 +2181,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } Lineage computeLineage(final String flowFileUuid, final NiFiUser user) throws IOException { - return computeLineage(Collections.<String> singleton(flowFileUuid), user, LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE); + return computeLineage(Collections.<String>singleton(flowFileUuid), user, LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE); } private Lineage computeLineage(final Collection<String> flowFileUuids, final NiFiUser user, final LineageComputationType computationType, final Long eventId, final Long startTimestamp, @@ -2175,13 +2209,13 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { event = getEvent(eventId); } catch (final Exception e) { logger.error("Failed to retrieve Provenance Event with ID " + eventId + " to calculate data lineage due to: " + e, e); - final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String> emptySet(), 1, user.getIdentity()); + final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String>emptySet(), 1, user.getIdentity()); result.getResult().setError("Failed to retrieve Provenance Event with ID " + eventId + ". See logs for more information."); return result; } if (event == null) { - final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String> emptySet(), 1, user.getIdentity()); + final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String>emptySet(), 1, user.getIdentity()); result.getResult().setError("Could not find Provenance Event with ID " + eventId); lineageSubmissionMap.put(result.getLineageIdentifier(), result); return result; @@ -2215,7 +2249,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { try { final ProvenanceEventRecord event = getEvent(eventId); if (event == null) { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1, userId); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList()); return submission; @@ -2228,13 +2262,13 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { case REPLAY: return submitLineageComputation(event.getChildUuids(), user, LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE); default: - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1, userId); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded"); return submission; } } catch (final IOException ioe) { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1, userId); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); if (ioe.getMessage() == null) { @@ -2254,7 +2288,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { try { final ProvenanceEventRecord event = getEvent(eventId); if (event == null) { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1, userId); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList()); return submission; @@ -2267,14 +2301,14 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { case REPLAY: return submitLineageComputation(event.getParentUuids(), user, LineageComputationType.EXPAND_PARENTS, eventId, event.getLineageStartDate(), event.getEventTime()); default: { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String> emptyList(), 1, userId); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded"); return submission; } } } catch (final IOException ioe) { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String> emptyList(), 1, userId); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); if (ioe.getMessage() == null) { @@ -2367,7 +2401,8 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } /** - * @return a List of all Provenance Event Log Files, sorted in ascending order by the first Event ID in each file + * @return a List of all Provenance Event Log Files, sorted in ascending + * order by the first Event ID in each file */ private List<File> getSortedLogFiles() { final List<Path> paths = new ArrayList<>(getAllLogFiles()); @@ -2391,7 +2426,8 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } /** - * Returns the Event ID of the first event in the given Provenance Event Log File. + * Returns the Event ID of the first event in the given Provenance Event Log + * File. * * @param logFile the log file from which to obtain the first Event ID * @return the ID of the first event in the given log file @@ -2536,7 +2572,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { try { final Set<ProvenanceEventRecord> matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this, - getIndexManager(), indexDir, null, flowFileUuids, maxAttributeChars); + getIndexManager(), indexDir, null, flowFileUuids, maxAttributeChars); final StandardLineageResult result = submission.getResult(); result.update(replaceUnauthorizedWithPlaceholders(matchingRecords, user)); http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java index 79f7d9f..6c04ecb 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java @@ -84,14 +84,26 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { private Authorizer authorizer; // effectively final private ProvenanceAuthorizableFactory resourceFactory; // effectively final + /** + * Default no args constructor for service loading only + */ public VolatileProvenanceRepository() { - final NiFiProperties properties = NiFiProperties.getInstance(); + ringBuffer = null; + searchableFields = null; + searchableAttributes = null; + queryExecService = null; + scheduledExecService = null; + authorizer = null; + resourceFactory = null; + } + + public VolatileProvenanceRepository(final NiFiProperties nifiProperties) { - final int bufferSize = properties.getIntegerProperty(BUFFER_SIZE, DEFAULT_BUFFER_SIZE); + final int bufferSize = nifiProperties.getIntegerProperty(BUFFER_SIZE, DEFAULT_BUFFER_SIZE); ringBuffer = new RingBuffer<>(bufferSize); - final String indexedFieldString = properties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS); - final String indexedAttrString = properties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES); + final String indexedFieldString = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS); + final String indexedAttrString = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES); searchableFields = Collections.unmodifiableList(SearchableFieldParser.extractSearchableFields(indexedFieldString, true)); searchableAttributes = Collections.unmodifiableList(SearchableFieldParser.extractSearchableFields(indexedAttrString, false)); @@ -237,7 +249,6 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { return result; } - public boolean isAuthorized(final ProvenanceEventRecord event, final NiFiUser user) { if (authorizer == null) { return true; @@ -312,10 +323,8 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { if (!pattern.matcher(eventAttributeValue).matches()) { return false; } - } else { - if (!searchValue.equalsIgnoreCase(eventAttributeValue)) { - return false; - } + } else if (!searchValue.equalsIgnoreCase(eventAttributeValue)) { + return false; } } else { // if FlowFileUUID, search parent & child UUID's also. @@ -363,10 +372,8 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { if (!pattern.matcher(String.valueOf(fieldValue)).matches()) { return false; } - } else { - if (!searchValue.equalsIgnoreCase(String.valueOf(fieldValue))) { - return false; - } + } else if (!searchValue.equalsIgnoreCase(String.valueOf(fieldValue))) { + return false; } } } @@ -455,7 +462,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { } public Lineage computeLineage(final String flowFileUUID, final NiFiUser user) throws IOException { - return computeLineage(Collections.<String> singleton(flowFileUUID), user, LineageComputationType.FLOWFILE_LINEAGE, null); + return computeLineage(Collections.<String>singleton(flowFileUUID), user, LineageComputationType.FLOWFILE_LINEAGE, null); } private Lineage computeLineage(final Collection<String> flowFileUuids, final NiFiUser user, final LineageComputationType computationType, final Long eventId) throws IOException { @@ -480,7 +487,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { final ProvenanceEventRecord event = getEvent(eventId); if (event == null) { final String userId = user.getIdentity(); - final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String> emptySet(), 1, userId); + final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String>emptySet(), 1, userId); result.getResult().setError("Could not find event with ID " + eventId); lineageSubmissionMap.put(result.getLineageIdentifier(), result); return result; @@ -524,7 +531,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { final ProvenanceEventRecord event = getEvent(eventId, user); if (event == null) { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String> emptyList(), 1, userId); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList()); return submission; @@ -537,7 +544,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { case CLONE: return submitLineageComputation(event.getParentUuids(), user, LineageComputationType.EXPAND_PARENTS, eventId); default: { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String> emptyList(), 1, userId); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded"); return submission; @@ -555,7 +562,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { final ProvenanceEventRecord event = getEvent(eventId, user); if (event == null) { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1, userId); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList()); return submission; @@ -568,7 +575,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { case CLONE: return submitLineageComputation(event.getChildUuids(), user, LineageComputationType.EXPAND_CHILDREN, eventId); default: { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1, userId); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded"); return submission; http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java index 7db650d..4190ebb 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java @@ -39,12 +39,13 @@ public class TestVolatileProvenanceRepository { @BeforeClass public static void setup() { - System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties"); + System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestVolatileProvenanceRepository.class.getResource("/nifi.properties").getFile()); } @Test public void testAddAndGet() throws IOException, InterruptedException { - repo = new VolatileProvenanceRepository(); + + repo = new VolatileProvenanceRepository(NiFiProperties.createBasicNiFiProperties(null, null)); final Map<String, String> attributes = new HashMap<>(); attributes.put("abc", "xyz"); @@ -77,7 +78,7 @@ public class TestVolatileProvenanceRepository { @Test public void testIndexAndCompressOnRolloverAndSubsequentSearchAsync() throws InterruptedException { - repo = new VolatileProvenanceRepository(); + repo = new VolatileProvenanceRepository(NiFiProperties.createBasicNiFiProperties(null, null)); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java index a8eac77..6e6cf26 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.script; +import java.io.File; import java.io.FileInputStream; import java.util.ArrayList; import java.util.Collection; @@ -69,9 +70,13 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { private AtomicBoolean scriptNeedsReload = new AtomicBoolean(true); private ScriptEngine scriptEngine = null; + private volatile String kerberosServicePrincipal = null; + private volatile File kerberosConfigFile = null; + private volatile File kerberosServiceKeytab = null; /** - * Returns the valid relationships for this processor as supplied by the script itself. + * Returns the valid relationships for this processor as supplied by the + * script itself. * * @return a Set of Relationships supported by this processor */ @@ -82,7 +87,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { if (instance != null) { try { final Set<Relationship> rels = instance.getRelationships(); - if(rels != null && !rels.isEmpty()){ + if (rels != null && !rels.isEmpty()) { relationships.addAll(rels); } } catch (final Throwable t) { @@ -98,10 +103,19 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { return Collections.unmodifiableSet(relationships); } + @Override + protected void init(final ProcessorInitializationContext context) { + kerberosServicePrincipal = context.getKerberosServicePrincipal(); + kerberosConfigFile = context.getKerberosConfigurationFile(); + kerberosServiceKeytab = context.getKerberosServiceKeytab(); + } + /** - * Returns a list of property descriptors supported by this processor. The list always includes properties such as - * script engine name, script file name, script body name, script arguments, and an external module path. If the - * scripted processor also defines supported properties, those are added to the list as well. + * Returns a list of property descriptors supported by this processor. The + * list always includes properties such as script engine name, script file + * name, script body name, script arguments, and an external module path. If + * the scripted processor also defines supported properties, those are added + * to the list as well. * * @return a List of PropertyDescriptor objects supported by this processor */ @@ -138,11 +152,14 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { } /** - * Returns a PropertyDescriptor for the given name. This is for the user to be able to define their own properties - * which will be available as variables in the script + * Returns a PropertyDescriptor for the given name. This is for the user to + * be able to define their own properties which will be available as + * variables in the script * - * @param propertyDescriptorName used to lookup if any property descriptors exist for that name - * @return a PropertyDescriptor object corresponding to the specified dynamic property name + * @param propertyDescriptorName used to lookup if any property descriptors + * exist for that name + * @return a PropertyDescriptor object corresponding to the specified + * dynamic property name */ @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { @@ -156,8 +173,9 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { } /** - * Performs setup operations when the processor is scheduled to run. This includes evaluating the processor's - * properties, as well as reloading the script (from file or the "Script Body" property) + * Performs setup operations when the processor is scheduled to run. This + * includes evaluating the processor's properties, as well as reloading the + * script (from file or the "Script Body" property) * * @param context the context in which to perform the setup operations */ @@ -193,14 +211,13 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { } } - /** - * Handles changes to this processor's properties. If changes are made to script- or engine-related properties, - * the script will be reloaded. + * Handles changes to this processor's properties. If changes are made to + * script- or engine-related properties, the script will be reloaded. * * @param descriptor of the modified property - * @param oldValue non-null property value (previous) - * @param newValue the new property value or if null indicates the property + * @param oldValue non-null property value (previous) + * @param newValue the new property value or if null indicates the property */ @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { @@ -212,15 +229,13 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { || MODULES.equals(descriptor) || SCRIPT_ENGINE.equals(descriptor)) { scriptNeedsReload.set(true); - } else { - if (instance != null) { - // If the script provides a Processor, call its onPropertyModified() method - try { - instance.onPropertyModified(descriptor, oldValue, newValue); - } catch (final Exception e) { - final String message = "Unable to invoke onPropertyModified from script Processor: " + e; - logger.error(message, e); - } + } else if (instance != null) { + // If the script provides a Processor, call its onPropertyModified() method + try { + instance.onPropertyModified(descriptor, oldValue, newValue); + } catch (final Exception e) { + final String message = "Unable to invoke onPropertyModified from script Processor: " + e; + logger.error(message, e); } } } @@ -354,6 +369,21 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { public NodeTypeProvider getNodeTypeProvider() { return InvokeScriptedProcessor.super.getNodeTypeProvider(); } + + @Override + public String getKerberosServicePrincipal() { + return InvokeScriptedProcessor.this.kerberosServicePrincipal; + } + + @Override + public File getKerberosServiceKeytab() { + return InvokeScriptedProcessor.this.kerberosServiceKeytab; + } + + @Override + public File getKerberosConfigurationFile() { + return InvokeScriptedProcessor.this.kerberosConfigFile; + } }); } catch (final Exception e) { logger.error("Unable to initialize scripted Processor: " + e.getLocalizedMessage(), e); @@ -386,12 +416,15 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { } /** - * Invokes the validate() routine provided by the script, allowing for custom validation code. - * This method assumes there is a valid Processor defined in the script and it has been loaded - * by the InvokeScriptedProcessor processor + * Invokes the validate() routine provided by the script, allowing for + * custom validation code. This method assumes there is a valid Processor + * defined in the script and it has been loaded by the + * InvokeScriptedProcessor processor * - * @param context The validation context to be passed into the custom validate method - * @return A collection of ValidationResults returned by the custom validate method + * @param context The validation context to be passed into the custom + * validate method + * @return A collection of ValidationResults returned by the custom validate + * method */ @Override protected Collection<ValidationResult> customValidate(final ValidationContext context) { @@ -446,17 +479,19 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { } /** - * Invokes the onTrigger() method of the scripted processor. If the script failed to reload, the processor yields - * until the script can be reloaded successfully. If the scripted processor's onTrigger() method throws an - * exception, a ProcessException will be thrown. If no processor is defined by the script, an error is logged - * with the system. + * Invokes the onTrigger() method of the scripted processor. If the script + * failed to reload, the processor yields until the script can be reloaded + * successfully. If the scripted processor's onTrigger() method throws an + * exception, a ProcessException will be thrown. If no processor is defined + * by the script, an error is logged with the system. * - * @param context provides access to convenience methods for obtaining - * property values, delaying the scheduling of the processor, provides - * access to Controller Services, etc. - * @param sessionFactory provides access to a {@link ProcessSessionFactory}, which - * can be used for accessing FlowFiles, etc. - * @throws ProcessException if the scripted processor's onTrigger() method throws an exception + * @param context provides access to convenience methods for obtaining + * property values, delaying the scheduling of the processor, provides + * access to Controller Services, etc. + * @param sessionFactory provides access to a {@link ProcessSessionFactory}, + * which can be used for accessing FlowFiles, etc. + * @throws ProcessException if the scripted processor's onTrigger() method + * throws an exception */ @Override public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/MonitorDiskUsage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/MonitorDiskUsage.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/MonitorDiskUsage.java index f2644b5..1cda544 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/MonitorDiskUsage.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/MonitorDiskUsage.java @@ -20,7 +20,6 @@ import java.io.File; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -33,13 +32,12 @@ import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Tags({"disk", "storage", "warning", "monitoring", "repo"}) -@CapabilityDescription("Checks the amount of storage space available for the Content Repository and FlowFile Repository" - + " and warns (via a log message and a System-Level Bulletin) if the partition on which either repository exceeds" +@CapabilityDescription("Checks the amount of storage space available for the specified directory" + + " and warns (via a log message and a System-Level Bulletin) if the partition on which it lives exceeds" + " some configurable threshold of storage space") public class MonitorDiskUsage extends AbstractReportingTask { @@ -47,50 +45,50 @@ public class MonitorDiskUsage extends AbstractReportingTask { private static final Pattern PERCENT_PATTERN = Pattern.compile("(\\d+{1,2})%"); - public static final PropertyDescriptor CONTENT_REPO_THRESHOLD = new PropertyDescriptor.Builder() - .name("Content Repository Threshold") - .description("The threshold at which a bulletin will be generated to indicate that the disk usage of the Content Repository is of concern") + public static final PropertyDescriptor DIR_THRESHOLD = new PropertyDescriptor.Builder() + .name("Threshold") + .description("The threshold at which a bulletin will be generated to indicate that the disk usage of the partition on which the directory found is of concern") .required(true) .addValidator(StandardValidators.createRegexMatchingValidator(PERCENT_PATTERN)) .defaultValue("80%") .build(); - public static final PropertyDescriptor FLOWFILE_REPO_THRESHOLD = new PropertyDescriptor.Builder() - .name("FlowFile Repository Threshold") - .description("The threshold at which a bulletin will be generated to indicate that the disk usage of the FlowFile Repository is of concern") + + public static final PropertyDescriptor DIR_LOCATION = new PropertyDescriptor.Builder() + .name("Directory Location") + .description("The directory path of the partition to be monitored.") .required(true) - .addValidator(StandardValidators.createRegexMatchingValidator(PERCENT_PATTERN)) - .defaultValue("80%") + .addValidator(StandardValidators.createDirectoryExistsValidator(false, false)) + .build(); + + public static final PropertyDescriptor DIR_DISPLAY_NAME = new PropertyDescriptor.Builder() + .name("Directory Display Name") + .description("The name to display for the directory in alerts.") + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .defaultValue("Un-Named") .build(); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { final List<PropertyDescriptor> descriptors = new ArrayList<>(2); - descriptors.add(CONTENT_REPO_THRESHOLD); - descriptors.add(FLOWFILE_REPO_THRESHOLD); + descriptors.add(DIR_THRESHOLD); + descriptors.add(DIR_LOCATION); return descriptors; } @Override public void onTrigger(final ReportingContext context) { - final String contentRepoTresholdValue = context.getProperty(CONTENT_REPO_THRESHOLD).getValue(); - final Matcher contentRepoMatcher = PERCENT_PATTERN.matcher(contentRepoTresholdValue.trim()); - contentRepoMatcher.find(); - final String contentRepoPercentageVal = contentRepoMatcher.group(1); - final int contentRepoThreshold = Integer.parseInt(contentRepoPercentageVal); - - final String flowfileRepoTresholdValue = context.getProperty(FLOWFILE_REPO_THRESHOLD).getValue(); - final Matcher flowFileRepoMatcher = PERCENT_PATTERN.matcher(flowfileRepoTresholdValue.trim()); - flowFileRepoMatcher.find(); - final String flowFileRepoPercentageVal = flowFileRepoMatcher.group(1); - final int flowFileRepoThreshold = Integer.parseInt(flowFileRepoPercentageVal); - - final NiFiProperties properties = NiFiProperties.getInstance(); - - for (final Map.Entry<String, Path> entry : properties.getContentRepositoryPaths().entrySet()) { - checkThreshold("Content Repository (" + entry.getKey() + ")", entry.getValue(), contentRepoThreshold, context); - } + final String thresholdValue = context.getProperty(DIR_THRESHOLD).getValue(); + final Matcher thresholdMatcher = PERCENT_PATTERN.matcher(thresholdValue.trim()); + thresholdMatcher.find(); + final String thresholdPercentageVal = thresholdMatcher.group(1); + final int contentRepoThreshold = Integer.parseInt(thresholdPercentageVal); + + final File dir = new File(context.getProperty(DIR_LOCATION).getValue()); + final String dirName = context.getProperty(DIR_DISPLAY_NAME).getValue(); + + checkThreshold(dirName, dir.toPath(), contentRepoThreshold, context); - checkThreshold("FlowFile Repository", properties.getFlowFileRepositoryPath(), flowFileRepoThreshold, context); } static void checkThreshold(final String pathName, final Path path, final int threshold, final ReportingContext context) { http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java index db8f394..52db337 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java @@ -32,6 +32,8 @@ import org.slf4j.Logger; import java.io.File; import java.lang.reflect.Field; import java.lang.reflect.Modifier; +import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -43,10 +45,11 @@ public class MonitorMemoryTest { @Before public void before() throws Exception { System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties"); - NiFiProperties.getInstance().setProperty(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec"); - NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml"); - NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, "local-provider"); - fc = this.buildFlowControllerForTest(); + final Map<String, String> addProps = new HashMap<>(); + addProps.put(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec"); + addProps.put(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml"); + addProps.put(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, "local-provider"); + fc = this.buildFlowControllerForTest(addProps); } @After @@ -127,17 +130,15 @@ public class MonitorMemoryTest { return capturingLogger; } - private FlowController buildFlowControllerForTest() throws Exception { - NiFiProperties properties = NiFiProperties.getInstance(); - - properties.setProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, - MockProvenanceRepository.class.getName()); - properties.setProperty("nifi.remote.input.socket.port", ""); - properties.setProperty("nifi.remote.input.secure", ""); + private FlowController buildFlowControllerForTest(final Map<String, String> addProps) throws Exception { + addProps.put(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, MockProvenanceRepository.class.getName()); + addProps.put("nifi.remote.input.socket.port", ""); + addProps.put("nifi.remote.input.secure", ""); + final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(null, addProps); return FlowController.createStandaloneInstance( mock(FlowFileEventRepository.class), - properties, + nifiProperties, mock(Authorizer.class), mock(AuditService.class), null, http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index e07b728..e91c32f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.hbase; +import java.io.File; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -55,7 +56,6 @@ import org.apache.nifi.hbase.scan.ResultCell; import org.apache.nifi.hbase.scan.ResultHandler; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.util.NiFiProperties; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -91,6 +91,9 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme private List<PropertyDescriptor> properties; private KerberosProperties kerberosProperties; + private volatile String kerberosServicePrincipal = null; + private volatile File kerberosConfigFile = null; + private volatile File kerberosServiceKeytab = null; // Holder of cached Configuration information so validation does not reload the same config over and over private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>(); @@ -108,10 +111,13 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme props.add(ZOOKEEPER_ZNODE_PARENT); props.add(HBASE_CLIENT_RETRIES); this.properties = Collections.unmodifiableList(props); + kerberosServicePrincipal = config.getKerberosServicePrincipal(); + kerberosConfigFile = config.getKerberosConfigurationFile(); + kerberosServiceKeytab = config.getKerberosServiceKeytab(); } protected KerberosProperties getKerberosProperties() { - return KerberosProperties.create(NiFiProperties.getInstance()); + return new KerberosProperties(kerberosConfigFile); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java index 0854d28..33536b5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java @@ -32,7 +32,6 @@ import org.apache.nifi.hbase.scan.Column; import org.apache.nifi.hbase.scan.ResultCell; import org.apache.nifi.hbase.scan.ResultHandler; import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Before; @@ -71,13 +70,9 @@ public class TestHBase_1_1_2_ClientService { System.setProperty("java.security.krb5.realm", "nifi.com"); System.setProperty("java.security.krb5.kdc", "nifi.kdc"); - NiFiProperties niFiPropertiesWithKerberos = Mockito.mock(NiFiProperties.class); - when(niFiPropertiesWithKerberos.getKerberosConfigurationFile()).thenReturn(new File("src/test/resources/krb5.conf")); - kerberosPropsWithFile = KerberosProperties.create(niFiPropertiesWithKerberos); + kerberosPropsWithFile = new KerberosProperties(new File("src/test/resources/krb5.conf")); - NiFiProperties niFiPropertiesWithoutKerberos = Mockito.mock(NiFiProperties.class); - when(niFiPropertiesWithKerberos.getKerberosConfigurationFile()).thenReturn(null); - kerberosPropsWithoutFile = KerberosProperties.create(niFiPropertiesWithoutKerberos); + kerberosPropsWithoutFile = new KerberosProperties(null); } @Test