NIFI-3356: Initial implementation of writeahead provenance repository

- The idea behind NIFI-3356 was to improve the efficiency and throughput of the Provenance Repository, as it is often the bottleneck. Testing the newly designed repository also exposed a handful of other, fairly minor, inefficiencies, so the following changes were made as well:
- Use a BufferedOutputStream within StandardProcessSession (via a ClaimCache abstraction) to avoid continually writing to the FileOutputStream when writing many small FlowFiles.
- Updated the threading model of MinimalLockingWriteAheadLog: it now performs serialization outside of the lock and writes to a 'synchronized' OutputStream.
- Changed the minimum scheduling period for components from 30 microseconds to 1 nanosecond. The ScheduledExecutor is very inconsistent in the timing of task scheduling. The 30-microsecond minimum was originally chosen to keep processors that had no work from dominating the CPU. Now that bored.yield.duration exists and processors yield when they have no work, that minimum only slows down processors that are able to perform work.
- Allow nifi.properties to specify multiple directories for the FlowFile Repository.
- If backpressure is engaged while running a batch of sessions, stop batch processing earlier. This helps FlowFiles move through the system much more smoothly, instead of the herky-jerky queuing previously seen at very high FlowFile rates.
- Added the NiFi PID to the log message emitted when starting NiFi. This is only a change to the log message, but it provides helpful information.
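The first bullet relies on a general effect of buffering: many small writes are coalesced before they reach the underlying file stream. The sketch below measures that effect with plain java.io classes only; it is not NiFi's ContentClaimWriteCache, and `CountingOutputStream` and `SmallWriteDemo` are hypothetical names introduced for illustration. It counts how many write calls reach the underlying stream with and without a BufferedOutputStream in between.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: counts the write calls that reach the wrapped stream,
// standing in for the comparatively costly writes to a FileOutputStream.
class CountingOutputStream extends FilterOutputStream {
    int writeCalls = 0;

    CountingOutputStream(final OutputStream out) {
        super(out);
    }

    @Override
    public void write(final int b) throws IOException {
        writeCalls++;
        out.write(b);
    }

    @Override
    public void write(final byte[] b, final int off, final int len) throws IOException {
        writeCalls++;
        out.write(b, off, len);
    }
}

class SmallWriteDemo {
    // Writes `count` small payloads, either directly or through an 8 KB buffer,
    // and returns how many write calls reached the underlying stream.
    static int underlyingWrites(final int count, final boolean buffered) throws IOException {
        final CountingOutputStream sink = new CountingOutputStream(new ByteArrayOutputStream());
        final OutputStream out = buffered ? new BufferedOutputStream(sink, 8192) : sink;
        final byte[] payload = "small flowfile content".getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < count; i++) {
            out.write(payload);
        }
        out.flush();
        return sink.writeCalls;
    }
}
```

Writing 1,000 payloads of 22 bytes directly produces 1,000 calls on the underlying stream, while the buffered path produces only a few (one per full 8 KB buffer plus the final flush). Against a real FileOutputStream, each of those calls typically becomes a separate native write, which is the overhead the bullet describes avoiding.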
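The second bullet describes moving serialization out of the critical section. A minimal sketch of that general pattern, under stated assumptions: the record format (a count followed by UTF strings) and the class `SerializeOutsideLockJournal` are hypothetical and do not reflect the actual MinimalLockingWriteAheadLog code or its SerDe. Each caller serializes its transaction into a private byte array with no lock held, and only the copy into the shared stream is synchronized, so each transaction stays contiguous in the journal.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of "serialize outside the lock, write to a synchronized stream".
// The on-disk format (record count, then UTF strings) is invented for this sketch.
class SerializeOutsideLockJournal {
    private final OutputStream sink;              // shared journal stream; a file in a real log
    private final Object writeMonitor = new Object();

    SerializeOutsideLockJournal(final OutputStream sink) {
        this.sink = sink;
    }

    // Potentially expensive: runs on the caller's thread with no lock held,
    // so many threads can serialize their transactions in parallel.
    static byte[] serialize(final List<String> records) throws IOException {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final DataOutputStream dos = new DataOutputStream(baos);
        dos.writeInt(records.size());
        for (final String record : records) {
            dos.writeUTF(record);
        }
        dos.flush();
        return baos.toByteArray();
    }

    // Cheap: only the copy of already-serialized bytes is mutually exclusive,
    // which keeps each transaction contiguous in the journal.
    void append(final List<String> records) throws IOException {
        final byte[] bytes = serialize(records);
        synchronized (writeMonitor) {
            sink.write(bytes);
        }
    }

    // Reads the journal back in the same format, one transaction at a time.
    static List<String> readAll(final byte[] journal) throws IOException {
        final List<String> result = new ArrayList<>();
        final DataInputStream in = new DataInputStream(new ByteArrayInputStream(journal));
        while (in.available() > 0) {
            final int count = in.readInt();
            for (int i = 0; i < count; i++) {
                result.add(in.readUTF());
            }
        }
        return result;
    }
}
```

The trade-off is one extra in-memory copy per transaction in exchange for a much shorter critical section: threads contend only for the time it takes to copy bytes, not for the time it takes to serialize them.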
NIFI-3356: Fixed bug in ContentClaimWriteCache that resulted in data corruption, and fixed bug in RepositoryConfiguration that threw an exception if the cache warm duration was set to an empty string

NIFI-3356: Fixed NPE

NIFI-3356: Added debug-level performance monitoring

NIFI-3356: Updates to unit tests that failed after rebasing against master

NIFI-3356: Incorporated PR review feedback

NIFI-3356: Fixed bug where we would delete index directories that are still in use; also added additional debug logging and a simple util class that can be used to textualize provenance event files (useful in debugging)

This closes #1493


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/96ed405d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/96ed405d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/96ed405d

Branch: refs/heads/master
Commit: 96ed405d708894ee5400ebbdbf335325219faa09
Parents: 8d467f3
Author: Mark Payne <marka...@hotmail.com>
Authored: Fri Dec 9 10:52:33 2016 -0500
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Wed Feb 22 12:40:06 2017 -0500

----------------------------------------------------------------------
 .../nifi/provenance/ProvenanceEventRecord.java | 10 +
 .../java/org/apache/nifi/bootstrap/RunNiFi.java | 10 +-
 .../nifi/provenance/ProgressiveResult.java | 34 +
 .../nifi/provenance/StandardLineageResult.java | 47 +-
 .../StandardProvenanceEventRecord.java | 14 +-
 .../nifi/provenance/StandardQueryResult.java | 91 ++-
 .../nifi/provenance/lineage/EventNode.java | 3 +-
 .../nifi/repository/schema/FieldMapRecord.java | 5 +
 .../nifi/repository/schema/RecordSchema.java | 2 +-
 .../repository/schema/SchemaRecordReader.java | 10 +-
 .../repository/schema/SchemaRecordWriter.java | 6 +-
 .../org/wali/MinimalLockingWriteAheadLog.java | 165 ++---
 .../wali/TestMinimalLockingWriteAheadLog.java | 77 +-
 .../src/main/asciidoc/administration-guide.adoc | 88 ++-
 .../org/apache/nifi/controller/Triggerable.java | 2 +-
 .../nifi/provenance/IdentifierLookup.java | 88 +++
 .../nifi/provenance/ProvenanceRepository.java | 7 +-
 .../lineage/ComputeLineageResult.java | 3 +
 .../apache/nifi/provenance/search/Query.java | 4 +
 .../nifi/provenance/search/QueryResult.java | 3 +
 .../provenance/MockProvenanceRepository.java | 2 +-
 .../java/org/apache/nifi/util/MockFlowFile.java | 24 +-
 .../apache/nifi/controller/AbstractPort.java | 2 +-
 .../apache/nifi/controller/StandardFunnel.java | 2 +-
 .../apache/nifi/controller/FlowController.java | 40 +-
 .../repository/FileSystemRepository.java | 1 +
 .../repository/StandardProcessSession.java | 114 ++-
 .../WriteAheadFlowFileRepository.java | 36 +-
 .../claim/ContentClaimWriteCache.java | 166 +++++
 .../repository/schema/ContentClaimFieldMap.java | 33 +
 .../schema/RepositoryRecordUpdate.java | 6 +-
 .../scheduling/EventDrivenSchedulingAgent.java | 4 +-
 .../history/ProcessorStatusDescriptor.java | 48 +-
 .../VolatileComponentStatusRepository.java | 4 +-
 .../tasks/ContinuallyRunProcessorTask.java | 16 +-
 .../repository/TestFileSystemRepository.java | 39 +-
 .../repository/TestStandardProcessSession.java | 11 +-
 .../TestWriteAheadFlowFileRepository.java | 261 +++++++
 .../claim/TestContentClaimWriteCache.java | 98 +++
 .../nifi-framework/nifi-resources/pom.xml | 4 +-
 .../nifi/provenance/AbstractRecordWriter.java | 22 +-
 .../provenance/ByteArraySchemaRecordWriter.java | 17 +-
 .../EventIdFirstSchemaRecordReader.java | 145 ++++
 .../EventIdFirstSchemaRecordWriter.java | 241 ++++++
 .../nifi/provenance/IndexConfiguration.java | 12 +-
 .../PersistentProvenanceRepository.java | 260 +++----
 .../provenance/RepositoryConfiguration.java | 140 +++-
 .../nifi/provenance/StandardRecordReader.java | 7 +-
 .../nifi/provenance/StandardRecordWriter.java | 12 +-
 .../WriteAheadProvenanceRepository.java | 280 +++++++
 .../authorization/EventAuthorizer.java | 119 +++
 .../authorization/EventTransformer.java | 42 ++
 .../authorization/UserEventAuthorizer.java | 76 ++
 .../nifi/provenance/index/EventIndex.java | 160 ++++
 .../provenance/index/EventIndexSearcher.java | 29 +
 .../nifi/provenance/index/EventIndexWriter.java | 44 ++
 .../provenance/index/SearchFailedException.java | 28 +
 .../provenance/index/lucene/CachedQuery.java | 33 +
 .../index/lucene/CommitPreference.java | 24 +
 .../lucene/ConvertEventToLuceneDocument.java | 143 ++++
 .../provenance/index/lucene/EventIndexTask.java | 244 ++++++
 .../index/lucene/IndexDirectoryManager.java | 358 +++++++++
 .../provenance/index/lucene/IndexLocation.java | 90 +++
 .../index/lucene/IndexableDocument.java | 47 ++
 .../lucene/LatestEventsPerProcessorQuery.java | 77 ++
 .../index/lucene/LatestEventsQuery.java | 55 ++
 .../index/lucene/LuceneCacheWarmer.java | 67 ++
 .../index/lucene/LuceneEventIndex.java | 737 +++++++++++++++++++
 .../nifi/provenance/index/lucene/QueryTask.java | 208 ++++++
 .../provenance/index/lucene/StoredDocument.java | 39 +
 .../provenance/lucene/CachingIndexManager.java | 176 +++--
 .../provenance/lucene/DeleteIndexAction.java | 12 +-
 .../nifi/provenance/lucene/DocsReader.java | 65 +-
 .../lucene/DocumentToEventConverter.java | 30 +
 .../nifi/provenance/lucene/IndexManager.java | 16 +-
 .../nifi/provenance/lucene/IndexSearch.java | 38 +-
 .../nifi/provenance/lucene/IndexingAction.java | 16 +-
 .../nifi/provenance/lucene/LineageQuery.java | 23 +-
 .../lucene/LuceneEventIndexSearcher.java | 92 +++
 .../lucene/LuceneEventIndexWriter.java | 144 ++++
 .../provenance/lucene/SimpleIndexManager.java | 336 ++++++---
 .../nifi/provenance/schema/EventFieldNames.java | 59 ++
 .../schema/EventIdFirstHeaderSchema.java | 52 ++
 .../nifi/provenance/schema/EventRecord.java | 108 +--
 .../provenance/schema/EventRecordFields.java | 92 +--
 .../schema/LookupTableEventRecord.java | 363 +++++++++
 .../schema/LookupTableEventRecordFields.java | 89 +++
 .../schema/LookupTableEventSchema.java | 94 +++
 .../schema/ProvenanceEventSchema.java | 10 +-
 .../serialization/CompressableRecordReader.java | 107 ++-
 .../serialization/CompressableRecordWriter.java | 80 +-
 .../serialization/EmptyRecordReader.java | 12 +
 .../serialization/EventFileCompressor.java | 188 +++++
 .../provenance/serialization/RecordReader.java | 23 +-
 .../provenance/serialization/RecordReaders.java | 19 +-
 .../provenance/serialization/RecordWriter.java | 13 +-
 .../provenance/serialization/RecordWriters.java | 10 +-
 .../serialization/StorageSummary.java | 72 ++
 .../nifi/provenance/store/EventFileManager.java | 109 +++
 .../nifi/provenance/store/EventStore.java | 123 ++++
 .../provenance/store/EventStorePartition.java | 113 +++
 .../provenance/store/PartitionedEventStore.java | 284 +++++++
 .../store/PartitionedWriteAheadEventStore.java | 142 ++++
 .../provenance/store/RecordReaderFactory.java | 29 +
 .../provenance/store/RecordWriterFactory.java | 28 +
 .../provenance/store/RecordWriterLease.java | 93 +++
 .../nifi/provenance/store/StorageResult.java | 67 ++
 .../store/WriteAheadStorePartition.java | 637 ++++++++++++++++
 .../iterator/AuthorizingEventIterator.java | 63 ++
 .../store/iterator/EventIterator.java | 56 ++
 .../SelectiveRecordReaderEventIterator.java | 174 +++++
 .../SequentialRecordReaderEventIterator.java | 115 +++
 .../nifi/provenance/toc/StandardTocReader.java | 68 +-
 .../apache/nifi/provenance/toc/TocReader.java | 18 +-
 .../org/apache/nifi/provenance/toc/TocUtil.java | 2 +-
 .../nifi/provenance/util/CloseableUtil.java | 45 ++
 .../nifi/provenance/util/DirectoryUtils.java | 96 +++
 .../nifi/provenance/util/DumpEventFile.java | 79 ++
 .../provenance/util/NamedThreadFactory.java | 47 ++
 .../provenance/util/StorageSummaryEvent.java | 185 +++++
 ....apache.nifi.provenance.ProvenanceRepository | 3 +-
 .../AbstractTestRecordReaderWriter.java | 80 +-
 ...estEventIdFirstSchemaRecordReaderWriter.java | 477 ++++++++++++
 .../TestPersistentProvenanceRepository.java | 193 ++---
 .../TestSchemaRecordReaderWriter.java | 105 ++-
 .../TestStandardRecordReaderWriter.java | 29 +-
 .../org/apache/nifi/provenance/TestUtil.java | 19 +
 .../index/lucene/TestEventIndexTask.java | 142 ++++
 .../index/lucene/TestIndexDirectoryManager.java | 100 +++
 .../index/lucene/TestLuceneEventIndex.java | 538 ++++++++++++++
 .../lucene/TestCachingIndexManager.java | 28 +-
 .../lucene/TestSimpleIndexManager.java | 104 ++-
 .../provenance/store/ArrayListEventStore.java | 155 ++++
 .../provenance/store/TestEventFileManager.java | 240 ++++++
 .../TestPartitionedWriteAheadEventStore.java | 468 ++++++++++++
 .../store/TestWriteAheadStorePartition.java | 111 +++
 .../TestSelectiveRecordReaderEventIterator.java | 146 ++++
 .../VolatileProvenanceRepository.java | 9 +-
 138 files changed, 12165 insertions(+), 1030 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java
index 7ba1622..b05bd85 100644
--- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java
+++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java
@@ -80,6 +80,16 @@ public interface ProvenanceEventRecord {
     Map<String, String> getAttributes();
 
     /**
+     * Returns the attribute with the given name
+     *
+     * @param attributeName the name of the attribute to get
+     * @return the attribute with the given name or <code>null</code> if no attribute exists with the given name
+     */
+    default String getAttribute(String attributeName) {
+        return getAttributes().get(attributeName);
+    }
+
+    /**
      * @return all FlowFile attributes that existed on the FlowFile before this
      * event occurred
      */
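The `getAttribute(String)` addition is a Java 8 default method, so existing implementations of the interface keep compiling and inherit the map lookup. A minimal sketch of the same pattern, using hypothetical names (`AttributeSource`, `MapBackedEvent`, `SingleAttributeEvent`) rather than NiFi's classes; the second implementation shows that a class remains free to override the default, for example to answer a single lookup without building a map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ProvenanceEventRecord; only attribute access is modeled.
interface AttributeSource {
    Map<String, String> getAttributes();

    // Same shape as the default method in the diff: implementations inherit it
    // unchanged, and it returns null when the attribute is absent.
    default String getAttribute(final String attributeName) {
        return getAttributes().get(attributeName);
    }
}

// Relies entirely on the inherited default implementation.
class MapBackedEvent implements AttributeSource {
    private final Map<String, String> attributes;

    MapBackedEvent(final Map<String, String> attributes) {
        this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
    }

    @Override
    public Map<String, String> getAttributes() {
        return attributes;
    }
}

// Overrides the default so a single lookup does not need to build a map.
class SingleAttributeEvent implements AttributeSource {
    private final String key;
    private final String value;

    SingleAttributeEvent(final String key, final String value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public Map<String, String> getAttributes() {
        return Collections.singletonMap(key, value);
    }

    @Override
    public String getAttribute(final String attributeName) {
        return key.equals(attributeName) ? value : null;
    }
}
```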

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
index 8d92c44..10d5cde 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
@@ -1067,11 +1067,14 @@ public class RunNiFi {
         Process process = builder.start();
         handleLogging(process);
         Long pid = getPid(process, cmdLogger);
-        if (pid != null) {
+        if (pid == null) {
+            cmdLogger.info("Launched Apache NiFi but could not determined the Process ID");
+        } else {
             nifiPid = pid;
             final Properties pidProperties = new Properties();
             pidProperties.setProperty(PID_KEY, String.valueOf(nifiPid));
             savePidProperties(pidProperties, cmdLogger);
+            cmdLogger.info("Launched Apache NiFi with Process ID " + pid);
         }
 
         shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor);
@@ -1129,11 +1132,14 @@ public class RunNiFi {
                 handleLogging(process);
 
                 pid = getPid(process, defaultLogger);
-                if (pid != null) {
+                if (pid == null) {
+                    cmdLogger.info("Launched Apache NiFi but could not obtain the Process ID");
+                } else {
                     nifiPid = pid;
                     final Properties pidProperties = new Properties();
                     pidProperties.setProperty(PID_KEY, String.valueOf(nifiPid));
                     savePidProperties(pidProperties, defaultLogger);
+                    cmdLogger.info("Launched Apache NiFi with Process ID " + pid);
                 }
 
                 shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor);

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/ProgressiveResult.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/ProgressiveResult.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/ProgressiveResult.java
new file mode 100644
index 0000000..e2dd33b
--- /dev/null
+++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/ProgressiveResult.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import java.util.Collection;
+
+/**
+ * A Provenance query result that is capable of being updated
+ */
+public interface ProgressiveResult {
+
+    void update(Collection<ProvenanceEventRecord> records, long totalHitCount);
+
+    void setError(String error);
+
+    long getTotalHitCount();
+
+    boolean isFinished();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
index 78b3188..2b7b34a 100644
--- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
+++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
@@ -44,7 +44,7 @@ import org.apache.nifi.provenance.lineage.LineageNode;
 /**
  *
  */
-public class StandardLineageResult implements ComputeLineageResult {
+public class StandardLineageResult implements ComputeLineageResult, ProgressiveResult {
     public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES);
     private static final Logger logger = LoggerFactory.getLogger(StandardLineageResult.class);
 
@@ -66,6 +66,7 @@ public class StandardLineageResult implements ComputeLineageResult {
     private int numCompletedSteps = 0;
 
     private volatile boolean canceled = false;
+    private final Object completionMonitor = new Object();
 
     public StandardLineageResult(final int numSteps, final Collection<String> flowFileUuids) {
         this.numSteps = numSteps;
@@ -162,6 +163,7 @@ public class StandardLineageResult implements ComputeLineageResult {
         }
     }
 
+    @Override
     public void setError(final String error) {
         writeLock.lock();
         try {
@@ -178,7 +180,10 @@ public class StandardLineageResult implements ComputeLineageResult {
         }
     }
 
-    public void update(final Collection<ProvenanceEventRecord> records) {
+    @Override
+    public void update(final Collection<ProvenanceEventRecord> records, final long totalHitCount) {
+        boolean computationComplete = false;
+
         writeLock.lock();
         try {
             relevantRecords.addAll(records);
@@ -187,12 +192,22 @@ public class StandardLineageResult implements ComputeLineageResult {
             updateExpiration();
 
             if (numCompletedSteps >= numSteps && error == null) {
+                computationComplete = true;
                 computeLineage();
                 computationNanos = System.nanoTime() - creationNanos;
             }
         } finally {
             writeLock.unlock();
         }
+
+        if (computationComplete) {
+            final long computationMillis = TimeUnit.NANOSECONDS.toMillis(computationNanos);
+            logger.info("Completed computation of lineage for FlowFile UUID(s) {} comprised of {} steps in {} millis", flowFileUuids, numSteps, computationMillis);
+
+            synchronized (completionMonitor) {
+                completionMonitor.notifyAll();
+            }
+        }
     }
 
     /**
@@ -201,6 +216,7 @@ public class StandardLineageResult implements ComputeLineageResult {
      * useful after all of the records have been successfully obtained
      */
     private void computeLineage() {
+        logger.debug("Computing lineage with the following events: {}", relevantRecords);
         final long startNanos = System.nanoTime();
 
         nodes.clear();
@@ -324,4 +340,31 @@ public class StandardLineageResult implements ComputeLineageResult {
     private void updateExpiration() {
         expirationDate = new Date(System.currentTimeMillis() + TTL);
     }
+
+    @Override
+    public boolean awaitCompletion(final long time, final TimeUnit unit) throws InterruptedException {
+        final long finishTime = System.currentTimeMillis() + unit.toMillis(time);
+        synchronized (completionMonitor) {
+            while (!isFinished()) {
+                final long millisToWait = finishTime - System.currentTimeMillis();
+                if (millisToWait > 0) {
+                    completionMonitor.wait(millisToWait);
+                } else {
+                    return isFinished();
+                }
+            }
+        }
+
+        return isFinished();
+    }
+
+    @Override
+    public long getTotalHitCount() {
+        readLock.lock();
+        try {
+            return relevantRecords.size();
+        } finally {
+            readLock.unlock();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
index f8f4055..ac60d4f 100644
--- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
+++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
@@ -67,7 +67,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
     private final Map<String, String> previousAttributes;
     private final Map<String, String> updatedAttributes;
 
-    private volatile long eventId;
+    private volatile long eventId = -1L;
 
     private StandardProvenanceEventRecord(final Builder builder) {
         this.eventTime = builder.eventTime;
@@ -369,14 +369,22 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
             return false;
         }
 
-        if (a == null && b != null) {
+        if (a == null && b != null && !b.isEmpty()) {
             return true;
         }
 
-        if (a != null && b == null) {
+        if (a == null && b.isEmpty()) {
+            return false;
+        }
+
+        if (a != null && !a.isEmpty() && b == null) {
             return true;
         }
 
+        if (a.isEmpty() && b == null) {
+            return false;
+        }
+
         if (a.size() != b.size()) {
             return true;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
index 5c09e8e..2777339 100644
--- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
+++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
@@ -22,7 +22,7 @@ import java.util.Comparator;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -31,8 +31,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.QueryResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class StandardQueryResult implements QueryResult {
+public class StandardQueryResult implements QueryResult, ProgressiveResult {
+    private static final Logger logger = LoggerFactory.getLogger(StandardQueryResult.class);
     public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES);
 
     private final Query query;
@@ -44,12 +47,12 @@ public class StandardQueryResult implements QueryResult {
     private final Lock writeLock = rwLock.writeLock();
 
     // guarded by writeLock
-    private final Set<ProvenanceEventRecord> matchingRecords = new TreeSet<>(new EventIdComparator());
-    private long totalHitCount;
+    private final SortedSet<ProvenanceEventRecord> matchingRecords = new TreeSet<>(new EventIdComparator());
     private int numCompletedSteps = 0;
     private Date expirationDate;
     private String error;
     private long queryTime;
+    private final Object completionMonitor = new Object();
 
     private volatile boolean canceled = false;
 
@@ -65,22 +68,7 @@ public class StandardQueryResult implements QueryResult {
     public List<ProvenanceEventRecord> getMatchingEvents() {
         readLock.lock();
         try {
-            if (matchingRecords.size() <= query.getMaxResults()) {
-                return new ArrayList<>(matchingRecords);
-            }
-
-            final List<ProvenanceEventRecord> copy = new ArrayList<>(query.getMaxResults());
-
-            int i = 0;
-            final Iterator<ProvenanceEventRecord> itr = matchingRecords.iterator();
-            while (itr.hasNext()) {
-                copy.add(itr.next());
-                if (++i >= query.getMaxResults()) {
-                    break;
-                }
-            }
-
-            return copy;
+            return new ArrayList<>(matchingRecords);
         } finally {
             readLock.unlock();
         }
@@ -137,7 +125,7 @@ public class StandardQueryResult implements QueryResult {
     public boolean isFinished() {
         readLock.lock();
         try {
-            return numCompletedSteps >= numSteps || canceled;
+            return numCompletedSteps >= numSteps || canceled || matchingRecords.size() >= query.getMaxResults();
         } finally {
             readLock.unlock();
         }
@@ -147,6 +135,7 @@ public class StandardQueryResult implements QueryResult {
         this.canceled = true;
     }
 
+    @Override
     public void setError(final String error) {
         writeLock.lock();
         try {
@@ -163,22 +152,74 @@ public class StandardQueryResult implements QueryResult {
         }
     }
 
-    public void update(final Collection<ProvenanceEventRecord> matchingRecords, final long totalHits) {
+    @Override
+    public void update(final Collection<ProvenanceEventRecord> newEvents, final long totalHits) {
+        boolean queryComplete = false;
+
         writeLock.lock();
         try {
-            this.matchingRecords.addAll(matchingRecords);
-            this.totalHitCount += totalHits;
+            if (isFinished()) {
+                return;
+            }
+
+            this.matchingRecords.addAll(newEvents);
+
+            // If we've added more records than the query's max, then remove the trailing elements.
+            // We do this, rather than avoiding the addition of the elements because we want to choose
+            // the events with the largest ID.
+            if (matchingRecords.size() > query.getMaxResults()) {
+                final Iterator<ProvenanceEventRecord> itr = matchingRecords.iterator();
+                for (int i = 0; i < query.getMaxResults(); i++) {
+                    itr.next();
+                }
+
+                while (itr.hasNext()) {
+                    itr.next();
+                    itr.remove();
+                }
+            }
 
             numCompletedSteps++;
             updateExpiration();
 
-            if (numCompletedSteps >= numSteps) {
+            if (numCompletedSteps >= numSteps || this.matchingRecords.size() >= query.getMaxResults()) {
                 final long searchNanos = System.nanoTime() - creationNanos;
                 queryTime = TimeUnit.MILLISECONDS.convert(searchNanos, TimeUnit.NANOSECONDS);
+                queryComplete = true;
+
+                if (numCompletedSteps >= numSteps) {
+                    logger.info("Completed {} comprised of {} steps in {} millis", query, numSteps, queryTime);
+                } else {
+                    logger.info("Completed {} comprised of {} steps in {} millis (only completed {} steps because the maximum number of results was reached)",
+                        query, numSteps, queryTime, numCompletedSteps);
+                }
             }
         } finally {
             writeLock.unlock();
         }
+
+        if (queryComplete) {
+            synchronized (completionMonitor) {
+                completionMonitor.notifyAll();
+            }
+        }
+    }
+
+    @Override
+    public boolean awaitCompletion(final long time, final TimeUnit unit) throws InterruptedException {
+        final long finishTime = System.currentTimeMillis() + unit.toMillis(time);
+        synchronized (completionMonitor) {
+            while (!isFinished()) {
+                final long millisToWait = finishTime - System.currentTimeMillis();
+                if (millisToWait > 0) {
+                    completionMonitor.wait(millisToWait);
+                } else {
+                    return isFinished();
+                }
+            }
+        }
+
+        return isFinished();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java
index de516cb..4906ea3 100644
--- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java
+++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java
@@ -18,7 +18,6 @@ package org.apache.nifi.provenance.lineage;
 
 import java.util.List;
 
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 
@@ -58,7 +57,7 @@ public class EventNode implements ProvenanceEventLineageNode {
 
     @Override
     public String getFlowFileUuid() {
-        return record.getAttributes().get(CoreAttributes.UUID.key());
+        return record.getFlowFileUuid();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/FieldMapRecord.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/FieldMapRecord.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/FieldMapRecord.java
index c9368a7..acebcb9 100644
--- a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/FieldMapRecord.java
+++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/FieldMapRecord.java
@@ -29,6 +29,11 @@ public class FieldMapRecord implements Record {
         this.values = convertFieldToName(values);
     }
 
+    public FieldMapRecord(final RecordSchema schema, final Map<String, Object> values) {
+        this.schema = schema;
+        this.values = new HashMap<>(values);
+    }
+
     private static Map<String, Object> convertFieldToName(final Map<RecordField, Object> map) {
         final Map<String, Object> nameMap = new HashMap<>(map.size());
         for (final Map.Entry<RecordField, Object> entry : map.entrySet()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/RecordSchema.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/RecordSchema.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/RecordSchema.java
index 965254b..fe18765 100644
--- a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/RecordSchema.java
+++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/RecordSchema.java
@@ -120,8 +120,8 @@ public class RecordSchema {
 
     @SuppressWarnings("unchecked")
     private static RecordField readField(final DataInputStream dis) throws IOException {
-        final Map<String, Object> schemaFieldMap = new HashMap<>();
         final int numElementsToRead = dis.readInt();
+        final Map<String, Object> schemaFieldMap = new HashMap<>(numElementsToRead);
         for (int i = 0; i < numElementsToRead; i++) {
             final String fieldName = dis.readUTF();
             final String typeName = dis.readUTF();

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
index b296b13..84f3532 100644
--- a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
+++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
@@ -109,7 +109,15 @@ public class SchemaRecordReader {
             }
         }
 
-        return readFieldValue(in, field.getFieldType(), field.getFieldName(), field.getSubFields());
+        try {
+            return readFieldValue(in, field.getFieldType(), field.getFieldName(), field.getSubFields());
+        } catch (final EOFException eof) {
+            final EOFException exception = new EOFException("Failed to read field '" + field.getFieldName() + "'");
+            exception.addSuppressed(eof);
+            throw exception;
+        } catch (final IOException ioe) {
+            throw new IOException("Failed to read field '" + field.getFieldName() + "'", ioe);
+        }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
index 3e4a059..5305e5b 100644
--- a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
+++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
@@ -44,8 +44,12 @@ public class SchemaRecordWriter {
     }
 
     private void writeRecordFields(final Record record, final OutputStream out) throws IOException {
+        writeRecordFields(record, record.getSchema(), out);
+    }
+
+    private void writeRecordFields(final Record record, final RecordSchema schema, final OutputStream out) throws IOException {
         final DataOutputStream dos = out instanceof DataOutputStream ? (DataOutputStream) out : new DataOutputStream(out);
-        for (final RecordField field : record.getSchema().getFields()) {
+        for (final RecordField field : schema.getFields()) {
             final Object value = record.getFieldValue(field);
             try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index 12e2b10..1a9e219 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -18,6 +18,9 @@ package org.wali;
 
 import static java.util.Objects.requireNonNull;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -55,12 +58,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
 
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -230,7 +230,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
             final long transactionId = transactionIdGenerator.getAndIncrement();
             if (logger.isTraceEnabled()) {
                 for (final T record : records) {
-                    logger.trace("Partition {} performing Transaction {}: {}", new Object[]{partition, transactionId, record});
+                    logger.trace("Partition {} performing Transaction {}: {}", new Object[] {partition, transactionId, record});
                 }
             }
 
@@ -670,11 +670,10 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
         private final Path editDirectory;
         private final int writeAheadLogVersion;
 
-        private final Lock lock = new ReentrantLock();
         private DataOutputStream dataOut = null;
         private FileOutputStream fileOut = null;
-        private boolean blackListed = false;
-        private boolean closed = false;
+        private volatile boolean blackListed = false;
+        private volatile boolean closed = false;
         private DataInputStream recoveryIn;
         private int recoveryVersion;
         private String currentJournalFilename = "";
@@ -707,26 +706,15 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
         }
 
         public boolean tryClaim() {
-            final boolean obtainedLock = lock.tryLock();
-            if (!obtainedLock) {
-                return false;
-            }
-
-            // Check if the partition is blacklisted. If so, unlock it and return false. Otherwise,
-            // leave it locked and return true, so that the caller will need to unlock.
-            if (blackListed) {
-                lock.unlock();
-                return false;
-            }
-
-            return true;
+            return !blackListed;
         }
 
         public void releaseClaim() {
-            lock.unlock();
         }
 
         public void close() {
+            this.closed = true;
+
             // Note that here we are closing fileOut and NOT dataOut.
             // This is very much intentional, not an oversight. This is done because of
             // the way that the OutputStreams are structured. dataOut wraps a BufferedOutputStream,
@@ -761,18 +749,12 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
                 }
             }
 
-            this.closed = true;
             this.dataOut = null;
             this.fileOut = null;
         }
 
         public void blackList() {
-            lock.lock();
-            try {
-                blackListed = true;
-            } finally {
-                lock.unlock();
-            }
+            blackListed = true;
             logger.debug("Blacklisted {}", this);
         }
 
@@ -783,55 +765,50 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
          * @throws IOException if failure to rollover
          */
         public OutputStream rollover() throws IOException {
-            lock.lock();
-            try {
-                // Note that here we are closing fileOut and NOT dataOut. See the note in the close()
-                // method to understand the logic behind this.
-                final OutputStream oldOutputStream = fileOut;
-                dataOut = null;
-                fileOut = null;
+            // Note that here we are closing fileOut and NOT dataOut. See the note in the close()
+            // method to understand the logic behind this.
+            final OutputStream oldOutputStream = fileOut;
+            dataOut = null;
+            fileOut = null;
 
-                this.serde = serdeFactory.createSerDe(null);
-                final Path editPath = getNewEditPath();
-                final FileOutputStream fos = new FileOutputStream(editPath.toFile());
+            this.serde = serdeFactory.createSerDe(null);
+            final Path editPath = getNewEditPath();
+            final FileOutputStream fos = new FileOutputStream(editPath.toFile());
+            try {
+                final DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(fos));
+                outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName());
+                outStream.writeInt(writeAheadLogVersion);
+                outStream.writeUTF(serde.getClass().getName());
+                outStream.writeInt(serde.getVersion());
+                serde.writeHeader(outStream);
+
+                outStream.flush();
+                dataOut = outStream;
+                fileOut = fos;
+            } catch (final IOException ioe) {
                 try {
-                    final DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(fos));
-                    outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName());
-
outStream.writeInt(writeAheadLogVersion); - outStream.writeUTF(serde.getClass().getName()); - outStream.writeInt(serde.getVersion()); - serde.writeHeader(outStream); - - outStream.flush(); - dataOut = outStream; - fileOut = fos; - } catch (final IOException ioe) { - try { - oldOutputStream.close(); - } catch (final IOException ioe2) { - ioe.addSuppressed(ioe2); - } - - logger.error("Failed to create new journal for {} due to {}", new Object[] {this, ioe.toString()}, ioe); - try { - fos.close(); - } catch (final IOException innerIOE) { - } - - dataOut = null; - fileOut = null; - blackList(); + oldOutputStream.close(); + } catch (final IOException ioe2) { + ioe.addSuppressed(ioe2); + } - throw ioe; + logger.error("Failed to create new journal for {} due to {}", new Object[] {this, ioe.toString()}, ioe); + try { + fos.close(); + } catch (final IOException innerIOE) { } - currentJournalFilename = editPath.toFile().getName(); + dataOut = null; + fileOut = null; + blackList(); - blackListed = false; - return oldOutputStream; - } finally { - lock.unlock(); + throw ioe; } + + currentJournalFilename = editPath.toFile().getName(); + + blackListed = false; + return oldOutputStream; } private long getJournalIndex(final File file) { @@ -939,33 +916,39 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor return true; } - public void update(final Collection<S> records, final long transactionId, final Map<Object, S> recordMap, final boolean forceSync) - throws IOException { - if (this.closed) { - throw new IllegalStateException("Partition is closed"); - } + public void update(final Collection<S> records, final long transactionId, final Map<Object, S> recordMap, final boolean forceSync) throws IOException { + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(256); + final DataOutputStream out = new DataOutputStream(baos)) { - final DataOutputStream out = dataOut; - out.writeLong(transactionId); + out.writeLong(transactionId); + final 
int numEditsToSerialize = records.size(); + int editsSerialized = 0; + for (final S record : records) { + final Object recordId = serde.getRecordIdentifier(record); + final S previousVersion = recordMap.get(recordId); - final int numEditsToSerialize = records.size(); - int editsSerialized = 0; - for (final S record : records) { - final Object recordId = serde.getRecordIdentifier(record); - final S previousVersion = recordMap.get(recordId); + serde.serializeEdit(previousVersion, record, out); + if (++editsSerialized < numEditsToSerialize) { + out.write(TRANSACTION_CONTINUE); + } else { + out.write(TRANSACTION_COMMIT); + } + } - serde.serializeEdit(previousVersion, record, out); - if (++editsSerialized < numEditsToSerialize) { - out.write(TRANSACTION_CONTINUE); - } else { - out.write(TRANSACTION_COMMIT); + out.flush(); + + if (this.closed) { + throw new IllegalStateException("Partition is closed"); } - } - out.flush(); + baos.writeTo(dataOut); + dataOut.flush(); - if (forceSync) { - fileOut.getFD().sync(); + if (forceSync) { + synchronized (fileOut) { + fileOut.getFD().sync(); + } + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java index 7b7d2ca..5cdad82 100644 --- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java @@ -54,6 +54,64 @@ public class TestMinimalLockingWriteAheadLog { @Test + @Ignore("for local testing only") + public void testUpdatePerformance() throws IOException, InterruptedException { + final int numPartitions = 4; + + final Path path 
= Paths.get("target/minimal-locking-repo"); + deleteRecursively(path.toFile()); + assertTrue(path.toFile().mkdirs()); + + final DummyRecordSerde serde = new DummyRecordSerde(); + final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null); + final Collection<DummyRecord> initialRecs = repo.recoverRecords(); + assertTrue(initialRecs.isEmpty()); + + final int updateCountPerThread = 1_000_000; + final int numThreads = 16; + + final Thread[] threads = new Thread[numThreads]; + + for (int j = 0; j < 2; j++) { + for (int i = 0; i < numThreads; i++) { + final Thread t = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 0; i < updateCountPerThread; i++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + try { + repo.update(Collections.singleton(record), false); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } + } + } + }); + + threads[i] = t; + } + + final long start = System.nanoTime(); + for (final Thread t : threads) { + t.start(); + } + for (final Thread t : threads) { + t.join(); + } + + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + if (j == 0) { + System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numPartitions + " partitions and " + numThreads + " threads, *as a warmup!*"); + } else { + System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numPartitions + " partitions and " + numThreads + " threads"); + } + } + } + + + + @Test public void testRepoDoesntContinuallyGrowOnOutOfMemoryError() throws IOException, InterruptedException { final int numPartitions = 8; @@ -557,21 +615,10 @@ public class TestMinimalLockingWriteAheadLog { assertEquals(2, transactionIndicator); } - long transactionId = in.readLong(); - assertEquals(2L, transactionId); - - long thirdSize = in.readLong(); - 
assertEquals(8194, thirdSize); - - // should be 8176 A's because we threw an Exception after writing 8194 of them, - // but the BufferedOutputStream's buffer already had 8 bytes on it for the - // transaction id and the size. - for (int i = 0; i < 8176; i++) { - final int c = in.read(); - assertEquals("i = " + i, 'A', c); - } - - // Stream should now be out of data, because we threw an Exception! + // In previous implementations, we would still have a partial record written out. + // In the current version, however, the serde above would result in the data serialization + // failing and as a result no data would be written to the stream, so the stream should + // now be out of data final int nextByte = in.read(); assertEquals(-1, nextByte); } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-docs/src/main/asciidoc/administration-guide.adoc ---------------------------------------------------------------------- diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index f7f9920..fff0bdd 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -2047,8 +2047,8 @@ FlowFile Repository, if also on that disk, could become corrupt. To avoid this s + For example, to provide two additional locations to act as part of the content repository, a user could also specify additional properties with keys of: + + -nifi.provenance.repository.directory.content1=/repos/provenance1 + -nifi.provenance.repository.directory.content2=/repos/provenance2 + +nifi.content.repository.directory.content1=/repos/content1 + +nifi.content.repository.directory.content2=/repos/content2 + + Providing three total locations, including _nifi.content.repository.directory.default_. 
|nifi.content.repository.archive.max.retention.period|If archiving is enabled (see nifi.content.repository.archive.enabled below), then @@ -2073,7 +2073,25 @@ The Provenance Repository contains the information related to Data Provenance. T |==== |*Property*|*Description* -|nifi.provenance.repository.implementation|The Provenance Repository implementation. The default value is org.apache.nifi.provenance.PersistentProvenanceRepository and should only be changed with caution. To store provenance events in memory instead of on disk (at the risk of data loss in the event of power/machine failure), set this property to org.apache.nifi.provenance.VolatileProvenanceRepository. +|nifi.provenance.repository.implementation|The Provenance Repository implementation. The default value is org.apache.nifi.provenance.PersistentProvenanceRepository. +Two additional repositories are available as well. +To store provenance events in memory instead of on disk (in which case all events will be lost on restart, and events will be evicted in a first-in-first-out order), +set this property to org.apache.nifi.provenance.VolatileProvenanceRepository. This leaves a configurable number of Provenance Events in the Java heap, so the number +of events that can be retained is very limited. + +As of Apache NiFi 1.2.0, a third option is available: org.apache.nifi.provenance.WriteAheadProvenanceRepository. +This implementation was created to replace the PersistentProvenanceRepository. The PersistentProvenanceRepository was originally written with the simple goal of persisting +Provenance Events as they are generated and providing the ability to iterate over those events sequentially. Later, it was desired to be able to compress the data so that +more data could be stored. After that, the ability to index and query the data was added. As requirements evolved over time, the repository kept changing without any major +redesigns. 
When used in a NiFi instance that is responsible for processing large volumes of small FlowFiles, the PersistentProvenanceRepository can quickly become a bottleneck. +The WriteAheadProvenanceRepository was then written to provide the same capabilities as the PersistentProvenanceRepository while providing far better performance. +Changing to the WriteAheadProvenanceRepository is easy to accomplish, as the two repositories support most of the same properties. +*Note Well*, however, the following caveat: The WriteAheadProvenanceRepository will make use of the Provenance data stored by the PersistentProvenanceRepository. However, the +PersistentProvenanceRepository may not be able to read the data written by the WriteAheadProvenanceRepository. Therefore, once the Provenance Repository is changed to use +the WriteAheadProvenanceRepository, it cannot be changed back to the PersistentProvenanceRepository without deleting the data in the Provenance Repository. It is therefore +recommended that before changing the implementation, users ensure that their version of NiFi is stable, in case any issue arises that causes the user to need to roll back to +a previous version of NiFi that did not support the WriteAheadProvenanceRepository. It is for this reason that the default is still set to the PersistentProvenanceRepository +at this time. |==== === Persistent Provenance Repository Properties @@ -2115,6 +2133,70 @@ Providing three total locations, including _nifi.provenance.repository.director |nifi.provenance.repository.buffer.size|The Provenance Repository buffer size. The default value is 100000. |==== +=== Write Ahead Provenance Repository Properties + +|==== +|*Property*|*Description* +|nifi.provenance.repository.directory.default*|The location of the Provenance Repository. The default value is ./provenance_repository.
+ + + + *NOTE*: Multiple provenance repositories can be specified by using the *_nifi.provenance.repository.directory._* prefix with unique suffixes and separate paths as values. + + + + For example, to provide two additional locations to act as part of the provenance repository, a user could also specify additional properties with keys of: + + + + nifi.provenance.repository.directory.provenance1=/repos/provenance1 + + nifi.provenance.repository.directory.provenance2=/repos/provenance2 + + + + Providing three total locations, including _nifi.provenance.repository.directory.default_. +|nifi.provenance.repository.max.storage.time|The maximum amount of time to keep data provenance information. The default value is 24 hours. +|nifi.provenance.repository.max.storage.size|The maximum amount of data provenance information to store at a time. + The default is 1 GB. The Data Provenance capability can consume a great deal of storage space because so much data is kept. + For production environments, values of 1-2 TB or more are not uncommon. The repository will write to a single "event file" (or set of + "event files" if multiple storage locations are defined, as described above) for some period of time (defined by the + nifi.provenance.repository.rollover.time and nifi.provenance.repository.rollover.size properties). Data is always aged off one file at a time, + so it is not advisable to write to a single "event file" for a tremendous amount of time, as it will prevent old data from aging off as smoothly. +|nifi.provenance.repository.rollover.time|The amount of time to wait before rolling over the "event file" that the repository is writing to. +|nifi.provenance.repository.rollover.size|The amount of data to write to a single "event file." The default value is 100 MB. For production + environments where a very large amount of Data Provenance is generated, a value of 1 GB is also very reasonable.
+|nifi.provenance.repository.query.threads|The number of threads to use for Provenance Repository queries. The default value is 2. +|nifi.provenance.repository.index.threads|The number of threads to use for indexing Provenance events so that they are searchable. The default value is 1. + For flows that operate on a very high number of FlowFiles, the indexing of Provenance events could become a bottleneck. If this happens, increasing the + value of this property may increase the rate at which the Provenance Repository is able to process these records, resulting in better overall throughput. + It is advisable to use at least 1 thread per storage location (i.e., if there are 3 storage locations, at least 3 threads should be used). For high + throughput environments, where more CPU and disk I/O is available, it may make sense to increase this value significantly. Typically going beyond + 2-4 threads per storage location is not valuable. However, this can be tuned depending on the CPU resources available compared to the I/O resources. +|nifi.provenance.repository.compress.on.rollover|Indicates whether to compress the provenance information when an "event file" is rolled over. The default value is _true_. +|nifi.provenance.repository.always.sync|If set to _true_, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system + not to cache the information. This is very expensive and can significantly reduce NiFi performance. However, if it is _false_, there could be the potential for data + loss if either there is a sudden power loss or the operating system crashes. The default value is _false_. +|nifi.provenance.repository.indexed.fields|This is a comma-separated list of the fields that should be indexed and made searchable. + Fields that are not indexed will not be searchable. Valid fields are: EventType, FlowFileUUID, Filename, TransitURI, ProcessorID, + AlternateIdentifierURI, Relationship, Details. 
The default value is: EventType, FlowFileUUID, Filename, ProcessorID. +|nifi.provenance.repository.indexed.attributes|This is a comma-separated list of FlowFile Attributes that should be indexed and made searchable. It is blank by default. + But some good examples to consider are 'filename' and 'mime.type' as well as any custom attributes you might use which are valuable for your use case. +|nifi.provenance.repository.index.shard.size|The repository uses Apache Lucene to provide indexing and searching capabilities. This value indicates how large a Lucene Index should + become before the Repository starts writing to a new Index. Large values for the shard size will result in more Java heap usage when searching the Provenance Repository but should + provide better performance. The default value is 500 MB. This default is low because the defaults are tuned for the very small environments in which most users begin to use NiFi. + For production environments, it is advisable to change this value to *4 to 8 GB*. Once all Provenance Events in the index have been aged off from the "event files," the index + will be destroyed as well. +|nifi.provenance.repository.max.attribute.length|Indicates the maximum length that a FlowFile attribute can be when retrieving a Provenance Event from the repository. + If the length of any attribute exceeds this value, it will be truncated when the event is retrieved. The default is 65536. +|nifi.provenance.repository.concurrent.merge.threads|Apache Lucene creates several "segments" in an Index. These segments are periodically merged together in order to provide faster + querying. This property specifies the maximum number of threads that are allowed to be used for *each* of the storage directories. The default value is 2. For high throughput + environments, it is advisable to set the number of index threads larger than the number of merge threads * the number of storage locations. For example, if there are 2 storage
For example, if there are 2 storage + locations and the number of index threads is set to 8, then the number of merge threads should likely be less than 4. While it is not critical that this be done, setting the + number of merge threads larger than this can result in all index threads being used to merge, which would cause the NiFi flow to periodically pause while indexing is happening, + resulting in some data being processed with much higher latency than other data. +|nifi.provenance.repository.warm.cache.frequency|Each time that a Provenance query is run, the query must first search the Apache Lucene indices (at least, in most cases - there are + some queries that are run often and the results are cached to avoid searching the Lucene indices). When a Lucene index is opened for the first time, it can be very expensive and take + several seconds. This is compounded by having many different indices, and can result in a Provenance query taking much longer. After the index has been opened, the Operating System's + disk cache will typically hold onto enough data to make re-opening the index much faster - at least for a period of time, until the disk cache evicts this data. If this value is set, + NiFi will periodically open each Lucene index and then close it, in order to "warm" the cache. This will result in far faster queries when the Provenance Repository is large. As with + all great things, though, it comes with a cost. Warming the cache does take some CPU resources, but more importantly it will evict other data from the Operating System disk cache and + will result in reading (potentially a great deal of) data from the disk. This can result in lower NiFi performance. However, if NiFi is running in an environment where CPU and disk + are not fully utilized, this feature can result in far faster Provenance queries. 
+|==== + + === Component Status Repository The Component Status Repository contains the information for the Component Status History tool in the User Interface. These http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java ---------------------------------------------------------------------- diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java index 4b3149b..5255c05 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java @@ -25,7 +25,7 @@ import org.apache.nifi.processor.exception.ProcessException; public interface Triggerable { - public static final long MINIMUM_SCHEDULING_NANOS = 30000L; + public static final long MINIMUM_SCHEDULING_NANOS = 1L; /** * <p> http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-framework-api/src/main/java/org/apache/nifi/provenance/IdentifierLookup.java ---------------------------------------------------------------------- diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/IdentifierLookup.java b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/IdentifierLookup.java new file mode 100644 index 0000000..6d548d2 --- /dev/null +++ b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/IdentifierLookup.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Provides a mechanism for obtaining the identifiers of components, queues, etc. + */ +public interface IdentifierLookup { + + /** + * @return the identifiers of components that may generate Provenance Events + */ + List<String> getComponentIdentifiers(); + + /** + * @return a list of component types that may generate Provenance Events + */ + List<String> getComponentTypes(); + + /** + * + * @return the identifiers of FlowFile Queues that are in the flow + */ + List<String> getQueueIdentifiers(); + + default Map<String, Integer> invertQueueIdentifiers() { + return invertList(getQueueIdentifiers()); + } + + default Map<String, Integer> invertComponentTypes() { + return invertList(getComponentTypes()); + } + + default Map<String, Integer> invertComponentIdentifiers() { + return invertList(getComponentIdentifiers()); + } + + default Map<String, Integer> invertList(final List<String> values) { + final Map<String, Integer> inverted = new HashMap<>(values.size()); + for (int i = 0; i < values.size(); i++) { + inverted.put(values.get(i), i); + } + return inverted; + } + + + public static final IdentifierLookup EMPTY = new IdentifierLookup() { + @Override + public List<String> getComponentIdentifiers() { + return Collections.emptyList(); + } + + @Override + public List<String> getComponentTypes() { + return Collections.emptyList(); + } + + @Override + public List<String> getQueueIdentifiers() { + return 
Collections.emptyList(); + } + + @Override + public Map<String, Integer> invertList(List<String> values) { + return Collections.emptyMap(); + } + }; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java index 6a5954a..516a36d 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java @@ -34,12 +34,13 @@ public interface ProvenanceRepository extends ProvenanceEventRepository { * Performs any initialization needed. This should be called only by the * framework. * - * @param eventReporter to report to - * @param authorizer the authorizer to use for authorizing individual events + * @param eventReporter to report to + * @param authorizer the authorizer to use for authorizing individual events * @param resourceFactory the resource factory to use for generating Provenance Resource objects for authorization purposes + * @param identifierLookup a mechanism for looking up identifiers in the flow * @throws java.io.IOException if unable to initialize */ - void initialize(EventReporter eventReporter, Authorizer authorizer, ProvenanceAuthorizableFactory resourceFactory) throws IOException; + void initialize(EventReporter eventReporter, Authorizer authorizer, ProvenanceAuthorizableFactory resourceFactory, IdentifierLookup identifierLookup) throws IOException; ProvenanceEventRecord getEvent(long id, NiFiUser user) throws IOException; http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-framework-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageResult.java 
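The `invertList` default method in the new `IdentifierLookup` interface maps each identifier to its position in the list. One plausible use, assumed here rather than stated in the diff, is letting a provenance repository store a small integer in place of a repeated component or queue identifier string. The standalone sketch below reproduces that inversion and shows the round trip; the class name `IdentifierIndexSketch` and the sample identifiers are hypothetical and not part of the NiFi API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IdentifierIndexSketch {

    // Same logic as IdentifierLookup.invertList in the diff: each value maps to its list position.
    // If a value appears more than once, the position of its last occurrence wins.
    static Map<String, Integer> invertList(final List<String> values) {
        final Map<String, Integer> inverted = new HashMap<>(values.size());
        for (int i = 0; i < values.size(); i++) {
            inverted.put(values.get(i), i);
        }
        return inverted;
    }

    public static void main(final String[] args) {
        // Hypothetical component identifiers, for illustration only.
        final List<String> componentIds = Arrays.asList("proc-a", "proc-b", "proc-c");
        final Map<String, Integer> index = invertList(componentIds);

        // Encode an identifier as a small int, then decode it by list position.
        final int encoded = index.get("proc-b");
        final String decoded = componentIds.get(encoded);
        System.out.println(encoded + " -> " + decoded); // prints "1 -> proc-b"
    }
}
```

Because decoding is a plain list lookup, both sides must agree on the same list ordering; the `EMPTY` lookup in the diff sidesteps this by returning empty maps.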
---------------------------------------------------------------------- diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageResult.java b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageResult.java index 4d0f991..ad480be 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageResult.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageResult.java @@ -18,6 +18,7 @@ package org.apache.nifi.provenance.lineage; import java.util.Date; import java.util.List; +import java.util.concurrent.TimeUnit; /** * @@ -55,4 +56,6 @@ public interface ComputeLineageResult { * @return Indicates whether or not the lineage has finished running */ boolean isFinished(); + + boolean awaitCompletion(long time, TimeUnit unit) throws InterruptedException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/Query.java ---------------------------------------------------------------------- diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/Query.java b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/Query.java index 3519c14..4db8e0f 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/Query.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/Query.java @@ -93,4 +93,8 @@ public class Query { public String toString() { return "Query[ " + searchTerms + " ]"; } + + public boolean isEmpty() { + return searchTerms.isEmpty() && maxFileSize == null && minFileSize == null && startDate == null && endDate == null; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/QueryResult.java ---------------------------------------------------------------------- diff --git 
a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/QueryResult.java b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/QueryResult.java index 0079433..cc84ea1 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/QueryResult.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/QueryResult.java @@ -18,6 +18,7 @@ package org.apache.nifi.provenance.search; import java.util.Date; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.nifi.provenance.ProvenanceEventRecord; @@ -60,4 +61,6 @@ public interface QueryResult { * @return Indicates whether or not the query has finished running */ boolean isFinished(); + + boolean awaitCompletion(long time, TimeUnit unit) throws InterruptedException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java b/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java index 9bc5f0e..53c3c2e 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java +++ b/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java @@ -56,7 +56,7 @@ public class MockProvenanceRepository implements ProvenanceRepository { } @Override - public void initialize(EventReporter eventReporter, Authorizer authorizer, ProvenanceAuthorizableFactory resourceFactory) throws IOException { + public void initialize(EventReporter eventReporter, Authorizer authorizer, ProvenanceAuthorizableFactory resourceFactory, IdentifierLookup idLookup) throws IOException { } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java 
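`QueryResult` and `ComputeLineageResult` now declare `awaitCompletion(long time, TimeUnit unit)`, which lets a caller block until a query or lineage computation finishes instead of polling `isFinished()`. The diff shows only the interface method. The sketch below assumes one straightforward way to back it, a `CountDownLatch` released when the result is marked finished; it is not taken from NiFi's `StandardQueryResult`, and the class `AwaitableResultSketch` and its `addMatches`/`markFinished` helpers are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AwaitableResultSketch {
    // Released exactly once, when the (hypothetical) query worker finishes.
    private final CountDownLatch completionLatch = new CountDownLatch(1);
    private final List<String> matches = Collections.synchronizedList(new ArrayList<>());

    void addMatches(final List<String> newMatches) {
        matches.addAll(newMatches);
    }

    void markFinished() {
        completionLatch.countDown();
    }

    public boolean isFinished() {
        return completionLatch.getCount() == 0;
    }

    // Blocks for up to the given time; returns true if the result finished in time.
    public boolean awaitCompletion(final long time, final TimeUnit unit) throws InterruptedException {
        return completionLatch.await(time, unit);
    }

    public List<String> getMatches() {
        synchronized (matches) {
            return new ArrayList<>(matches);
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final AwaitableResultSketch result = new AwaitableResultSketch();
        final Thread worker = new Thread(() -> {
            result.addMatches(Collections.singletonList("event-1"));
            result.markFinished();
        });
        worker.start();

        final boolean done = result.awaitCompletion(5, TimeUnit.SECONDS);
        System.out.println(done + " " + result.getMatches()); // prints "true [event-1]"
        worker.join();
    }
}
```

A latch fits this contract because it can only move from "running" to "finished", and a successful `await` also guarantees the waiting thread sees every match added before `countDown()`.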
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
index 1ff1a2f..df87de5 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
@@ -63,7 +63,29 @@ public class MockFlowFile implements FlowFileRecord {
     }

     public MockFlowFile(final long id, final FlowFile toCopy) {
-        this(id);
+        this.creationTime = System.nanoTime();
+        this.id = id;
+        entryDate = System.currentTimeMillis();
+
+        final Map<String, String> attributesToCopy = toCopy.getAttributes();
+        String filename = attributesToCopy.get(CoreAttributes.FILENAME.key());
+        if (filename == null) {
+            filename = String.valueOf(System.nanoTime()) + ".mockFlowFile";
+        }
+        attributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        String path = attributesToCopy.get(CoreAttributes.PATH.key());
+        if (path == null) {
+            path = "target";
+        }
+        attributes.put(CoreAttributes.PATH.key(), path);
+
+        String uuid = attributesToCopy.get(CoreAttributes.UUID.key());
+        if (uuid == null) {
+            uuid = UUID.randomUUID().toString();
+        }
+        attributes.put(CoreAttributes.UUID.key(), uuid);
+
         attributes.putAll(toCopy.getAttributes());
         final byte[] dataToCopy = ((MockFlowFile) toCopy).data;
         this.data = new byte[dataToCopy.length];


http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
index ff42f47..1177dad 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
@@ -111,7 +111,7 @@ public abstract class AbstractPort implements Port {
         yieldPeriod = new AtomicReference<>("1 sec");
         yieldExpiration = new AtomicLong(0L);
         schedulingPeriod = new AtomicReference<>("0 millis");
-        schedulingNanos = new AtomicLong(30000);
+        schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS);
         scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
     }



http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
index 18bcc3c..34ffbac 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
@@ -100,7 +100,7 @@ public class StandardFunnel implements Funnel {
         yieldPeriod = new AtomicReference<>("250 millis");
         yieldExpiration = new AtomicLong(0L);
         schedulingPeriod = new AtomicReference<>("0 millis");
-        schedulingNanos = new AtomicLong(30000);
+        schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS);
         name = new AtomicReference<>("Funnel");
     }

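The `awaitCompletion(long time, TimeUnit unit)` method added to `QueryResult` and `ComputeLineageResult` in this commit lets a caller block until a query or lineage computation finishes instead of polling `isFinished()`. The diff shows only the interface signature, so what follows is a minimal sketch under assumptions: `ProgressiveResultSketch` is a hypothetical class, not NiFi's `StandardQueryResult`, that counts finished units of work and releases a `java.util.concurrent.CountDownLatch` once all of them are done. The return value is assumed to follow `CountDownLatch.await` semantics: `true` if the work finished, `false` if the timeout elapsed first.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a progressive query/lineage result (not NiFi's implementation).
class ProgressiveResultSketch {
    private final CountDownLatch completionLatch = new CountDownLatch(1);
    private final int totalSteps;
    private int completedSteps; // guarded by "this"

    ProgressiveResultSketch(final int totalSteps) {
        this.totalSteps = totalSteps;
        if (totalSteps <= 0) {
            completionLatch.countDown(); // nothing to do: already complete
        }
    }

    // Called by worker threads as each unit of work (for example, one index) finishes.
    synchronized void stepCompleted() {
        completedSteps++;
        if (completedSteps == totalSteps) {
            completionLatch.countDown(); // wake every thread blocked in awaitCompletion
        }
    }

    synchronized boolean isFinished() {
        return completedSteps >= totalSteps;
    }

    // Blocks until all steps are done or the timeout elapses; returns true if finished.
    boolean awaitCompletion(final long time, final TimeUnit unit) throws InterruptedException {
        return completionLatch.await(time, unit);
    }

    public static void main(final String[] args) throws InterruptedException {
        final ProgressiveResultSketch result = new ProgressiveResultSketch(3);
        final Thread worker = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                result.stepCompleted();
            }
        });
        worker.start();
        final boolean done = result.awaitCompletion(5, TimeUnit.SECONDS);
        System.out.println("finished=" + done + ", isFinished=" + result.isFinished());
        worker.join();
    }
}
```

Using a latch keeps `isFinished()` cheap and lets the waiting side honor both the timeout and thread interruption without a hand-written wait/notify loop.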
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 7fd85b9..191fc65 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -152,6 +152,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardProcessorInitializationContext;
 import org.apache.nifi.processor.StandardValidationContextFactory;
+import org.apache.nifi.provenance.IdentifierLookup;
 import org.apache.nifi.provenance.ProvenanceAuthorizableFactory;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
@@ -233,10 +234,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;

 import static java.util.Objects.requireNonNull;

-public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider {
+public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider,
+    QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider, IdentifierLookup {

     // default repository implementations
     public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository";
@@ -454,7 +457,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R

         try {
             this.provenanceRepository = createProvenanceRepository(nifiProperties);
-            this.provenanceRepository.initialize(createEventReporter(bulletinRepository), authorizer, this);
+            this.provenanceRepository.initialize(createEventReporter(bulletinRepository), authorizer, this, this);
         } catch (final Exception e) {
             throw new RuntimeException("Unable to create Provenance Repository", e);
         }
@@ -3886,6 +3889,39 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         return replayEvent;
     }

+    @Override
+    public List<String> getComponentIdentifiers() {
+        final List<String> componentIds = new ArrayList<>();
+        getGroup(getRootGroupId()).findAllProcessors().stream()
+            .forEach(proc -> componentIds.add(proc.getIdentifier()));
+        getGroup(getRootGroupId()).getInputPorts().stream()
+            .forEach(port -> componentIds.add(port.getIdentifier()));
+        getGroup(getRootGroupId()).getOutputPorts().stream()
+            .forEach(port -> componentIds.add(port.getIdentifier()));
+
+        return componentIds;
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    public List<String> getComponentTypes() {
+        final Set<Class> procClasses = ExtensionManager.getExtensions(Processor.class);
+        final List<String> componentTypes = new ArrayList<>(procClasses.size() + 2);
+        componentTypes.add(ProvenanceEventRecord.REMOTE_INPUT_PORT_TYPE);
+        componentTypes.add(ProvenanceEventRecord.REMOTE_OUTPUT_PORT_TYPE);
+        procClasses.stream()
+            .map(procClass -> procClass.getSimpleName())
+            .forEach(componentType -> componentTypes.add(componentType));
+        return componentTypes;
+    }
+
+    @Override
+    public List<String> getQueueIdentifiers() {
+        return getAllQueues().stream()
+            .map(q -> q.getIdentifier())
+            .collect(Collectors.toList());
+    }
+
     public boolean isConnected() {
         rwLock.readLock().lock();
         try {


http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 7108cfe..67df539 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -911,6 +911,7 @@ public class FileSystemRepository implements ContentRepository {
                 }

                 bytesWritten += len;
+                scc.setLength(bytesWritten + initialLength);
             }
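With this commit `FlowController` implements `IdentifierLookup`, exposing the component identifiers, component types, and queue identifiers it knows about, and passes itself to `ProvenanceRepository.initialize`. The diff does not show how the repository consumes these lists. One plausible use, assumed here rather than confirmed by the source, is to map each identifier to its list position so that an event writer can store a small integer instead of repeating the full identifier string in every record. `ComponentIdLookup` and `IdentifierIndex` below are hypothetical stand-ins for illustration only; `ComponentIdLookup` mirrors just the three methods implemented in the diff.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical: mirrors only the three methods FlowController implements in this diff.
interface ComponentIdLookup {
    List<String> getComponentIdentifiers();
    List<String> getComponentTypes();
    List<String> getQueueIdentifiers();
}

class IdentifierIndex {
    // Maps each identifier to its first position in the list, so a writer could
    // store a small int in place of the identifier string.
    static Map<String, Integer> invert(final List<String> identifiers) {
        final Map<String, Integer> index = new HashMap<>(identifiers.size() * 2);
        for (int i = 0; i < identifiers.size(); i++) {
            index.putIfAbsent(identifiers.get(i), i);
        }
        return index;
    }

    public static void main(final String[] args) {
        final ComponentIdLookup lookup = new ComponentIdLookup() {
            @Override
            public List<String> getComponentIdentifiers() { return Arrays.asList("proc-a", "port-b"); }
            @Override
            public List<String> getComponentTypes() { return Arrays.asList("TypeA", "TypeB"); }
            @Override
            public List<String> getQueueIdentifiers() { return Collections.singletonList("queue-1"); }
        };

        final Map<String, Integer> componentIndex = invert(lookup.getComponentIdentifiers());
        System.out.println(componentIndex.get("port-b"));  // 1
        System.out.println(componentIndex.get("unknown")); // null: a writer would need a fallback, e.g. the full string
    }
}
```

A lookup built this way is only a snapshot: identifiers created after the lists are read will not be in the map, so any real use would need a fallback for unknown values.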