NIFI-80: Truncate attribute values that exceed a configurable threshold. Expose the 
threshold as a property in the nifi.properties file


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/25146a58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/25146a58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/25146a58

Branch: refs/heads/NIFI-632
Commit: 25146a5828bc5dd1d04c5252843bef93ea87afac
Parents: 979671c
Author: Mark Payne <marka...@hotmail.com>
Authored: Sat Jun 20 08:52:14 2015 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Mon Jun 22 16:06:40 2015 -0400

----------------------------------------------------------------------
 nifi/nifi-assembly/pom.xml                      |  1 +
 .../src/main/resources/conf/nifi.properties     |  3 +
 .../nifi/provenance/IndexConfiguration.java     |  2 +-
 .../PersistentProvenanceRepository.java         | 90 +++++++++++++++++---
 .../provenance/RepositoryConfiguration.java     | 18 ++++
 .../nifi/provenance/StandardRecordReader.java   | 15 ++--
 .../provenance/lucene/DeleteIndexAction.java    |  6 +-
 .../nifi/provenance/lucene/DocsReader.java      |  9 +-
 .../nifi/provenance/lucene/IndexSearch.java     |  7 +-
 .../nifi/provenance/lucene/LineageQuery.java    |  6 +-
 .../provenance/serialization/RecordReaders.java | 17 +++-
 .../TestPersistentProvenanceRepository.java     | 39 ++++++++-
 .../TestStandardRecordReaderWriter.java         | 10 +--
 13 files changed, 180 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/25146a58/nifi/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml
index 02a16f9..9b17617 100644
--- a/nifi/nifi-assembly/pom.xml
+++ b/nifi/nifi-assembly/pom.xml
@@ -276,6 +276,7 @@ language governing permissions and limitations under the 
License. -->
         <nifi.provenance.repository.index.shard.size>500 
MB</nifi.provenance.repository.index.shard.size>
         
<nifi.provenance.repository.always.sync>false</nifi.provenance.repository.always.sync>
         
<nifi.provenance.repository.journal.count>16</nifi.provenance.repository.journal.count>
+        
<nifi.provenance.repository.max.attribute.length>65536</nifi.provenance.repository.max.attribute.length>
 
         <!-- volatile provenance repository properties -->
         
<nifi.provenance.repository.buffer.size>100000</nifi.provenance.repository.buffer.size>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/25146a58/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 90b3cdd..4043076 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -82,6 +82,9 @@ 
nifi.provenance.repository.indexed.attributes=${nifi.provenance.repository.index
 # Large values for the shard size will result in more Java heap usage when 
searching the Provenance Repository
 # but should provide better performance
 
nifi.provenance.repository.index.shard.size=${nifi.provenance.repository.index.shard.size}
+# Indicates the maximum length that a FlowFile attribute can be when 
retrieving a Provenance Event from
+# the repository. If the length of any attribute exceeds this value, it will 
be truncated when the event is retrieved.
+nifi.provenance.repository.max.attribute.length=${nifi.provenance.repository.max.attribute.length}
 
 # Volatile Provenance Respository Properties
 
nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/25146a58/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
index 9ea793d..4e80811 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
@@ -85,7 +85,7 @@ public class IndexConfiguration {
     }
 
     private Long getFirstEntryTime(final File provenanceLogFile) {
-        try (final RecordReader reader = 
RecordReaders.newRecordReader(provenanceLogFile, null)) {
+        try (final RecordReader reader = 
RecordReaders.newRecordReader(provenanceLogFile, null, Integer.MAX_VALUE)) {
             final StandardProvenanceEventRecord firstRecord = 
reader.nextRecord();
             if (firstRecord == null) {
                 return provenanceLogFile.lastModified();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/25146a58/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 5da5d6f..81d883a 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -134,6 +134,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
     private final IndexManager indexManager;
     private final boolean alwaysSync;
     private final int rolloverCheckMillis;
+    private final int maxAttributeChars;
 
     private final ScheduledExecutorService scheduledExecService;
     private final ScheduledExecutorService rolloverExecutor;
@@ -167,6 +168,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         }
 
         this.configuration = configuration;
+        this.maxAttributeChars = configuration.getMaxAttributeChars();
 
         for (final File file : configuration.getStorageDirectories()) {
             final Path storageDirectory = file.toPath();
@@ -289,6 +291,21 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
         final Boolean alwaysSync = 
Boolean.parseBoolean(properties.getProperty("nifi.provenance.repository.always.sync",
 "false"));
 
+        final int defaultMaxAttrChars = 65536;
+        final String maxAttrLength = 
properties.getProperty("nifi.provenance.repository.max.attribute.length", 
String.valueOf(defaultMaxAttrChars));
+        int maxAttrChars;
+        try {
+            maxAttrChars = Integer.parseInt(maxAttrLength);
+            // must be at least 36 characters because that's the length of the 
uuid attribute,
+            // which must be kept intact
+            if (maxAttrChars < 36) {
+                maxAttrChars = 36;
+                logger.warn("Found max attribute length property set to " + 
maxAttrLength + " but minimum length is 36; using 36 instead");
+            }
+        } catch (final Exception e) {
+            maxAttrChars = defaultMaxAttrChars;
+        }
+
         final List<SearchableField> searchableFields = 
SearchableFieldParser.extractSearchableFields(indexedFieldString, true);
         final List<SearchableField> searchableAttributes = 
SearchableFieldParser.extractSearchableFields(indexedAttrString, false);
 
@@ -310,6 +327,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         config.setMaxStorageCapacity(maxStorageBytes);
         config.setQueryThreadPoolSize(queryThreads);
         config.setJournalCount(journalCount);
+        config.setMaxAttributeChars(maxAttrChars);
 
         if (shardSize != null) {
             config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, 
DataUnit.B).longValue());
@@ -337,6 +355,14 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         return writers;
     }
 
+    /**
+     * @return the maximum number of characters that any Event attribute 
should contain. If the event contains
+     *         more characters than this, the attribute may be truncated on 
retrieval
+     */
+    public int getMaxAttributeCharacters() {
+        return maxAttributeChars;
+    }
+
     @Override
     public StandardProvenanceEventRecord.Builder eventBuilder() {
         return new StandardProvenanceEventRecord.Builder();
@@ -362,7 +388,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         }
 
         for (final Path path : paths) {
-            try (RecordReader reader = 
RecordReaders.newRecordReader(path.toFile(), getAllLogFiles())) {
+            try (RecordReader reader = 
RecordReaders.newRecordReader(path.toFile(), getAllLogFiles(), 
maxAttributeChars)) {
                 // if this is the first record, try to find out the block 
index and jump directly to
                 // the block index. This avoids having to read through a lot 
of data that we don't care about
                 // just to get to the first record that we want.
@@ -377,7 +403,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                 }
 
                 StandardProvenanceEventRecord record;
-                while (records.size() < maxRecords && ((record = 
reader.nextRecord()) != null)) {
+                while (records.size() < maxRecords && (record = 
reader.nextRecord()) != null) {
                     if (record.getEventId() >= firstRecordId) {
                         records.add(record);
                     }
@@ -507,7 +533,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
         if (maxIdFile != null) {
             // Determine the max ID in the last file.
-            try (final RecordReader reader = 
RecordReaders.newRecordReader(maxIdFile, getAllLogFiles())) {
+            try (final RecordReader reader = 
RecordReaders.newRecordReader(maxIdFile, getAllLogFiles(), maxAttributeChars)) {
                 final long eventId = reader.getMaxEventId();
                 if (eventId > maxId) {
                     maxId = eventId;
@@ -571,7 +597,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
             // Read the records in the last file to find its max id
             if (greatestMinIdFile != null) {
-                try (final RecordReader recordReader = 
RecordReaders.newRecordReader(greatestMinIdFile, 
Collections.<Path>emptyList())) {
+                try (final RecordReader recordReader = 
RecordReaders.newRecordReader(greatestMinIdFile, Collections.<Path> 
emptyList(), maxAttributeChars)) {
                     maxId = recordReader.getMaxEventId();
                 }
             }
@@ -1224,7 +1250,10 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         try {
             for (final File journalFile : journalFiles) {
                 try {
-                    readers.add(RecordReaders.newRecordReader(journalFile, 
null));
+                    // Use MAX_VALUE for number of chars because we don't want 
to truncate the value as we write it
+                    // out. This allows us to later decide that we want more 
characters and still be able to retrieve
+                    // the entire event.
+                    readers.add(RecordReaders.newRecordReader(journalFile, 
null, Integer.MAX_VALUE));
                 } catch (final EOFException eof) {
                     // there's nothing here. Skip over it.
                 } catch (final IOException ioe) {
@@ -1314,7 +1343,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                         indexingAction.index(record, indexWriter, blockIndex);
                         maxId = record.getEventId();
 
-                        latestRecords.add(record);
+                        latestRecords.add(truncateAttributes(record));
                         records++;
 
                         // Remove this entry from the map
@@ -1383,6 +1412,39 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         return writerFile;
     }
 
+    private StandardProvenanceEventRecord truncateAttributes(final 
StandardProvenanceEventRecord original) {
+        boolean requireTruncation = false;
+
+        for (final Map.Entry<String, String> entry : 
original.getAttributes().entrySet()) {
+            if (entry.getValue().length() > maxAttributeChars) {
+                requireTruncation = true;
+                break;
+            }
+        }
+
+        if (!requireTruncation) {
+            return original;
+        }
+
+        final StandardProvenanceEventRecord.Builder builder = new 
StandardProvenanceEventRecord.Builder().fromEvent(original);
+        
builder.setAttributes(truncateAttributes(original.getPreviousAttributes()), 
truncateAttributes(original.getUpdatedAttributes()));
+        final StandardProvenanceEventRecord truncated = builder.build();
+        truncated.setEventId(original.getEventId());
+        return truncated;
+    }
+
+    private Map<String, String> truncateAttributes(final Map<String, String> 
original) {
+        final Map<String, String> truncatedAttrs = new HashMap<>();
+        for (final Map.Entry<String, String> entry : original.entrySet()) {
+            if (entry.getValue().length() > maxAttributeChars) {
+                truncatedAttrs.put(entry.getKey(), 
entry.getValue().substring(0, maxAttributeChars));
+            } else {
+                truncatedAttrs.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return truncatedAttrs;
+    }
+
     @Override
     public List<SearchableField> getSearchableFields() {
         final List<SearchableField> searchableFields = new 
ArrayList<>(configuration.getSearchableFields());
@@ -1612,7 +1674,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
                         for (final File file : potentialFiles) {
                             try {
-                                reader = RecordReaders.newRecordReader(file, 
allLogFiles);
+                                reader = RecordReaders.newRecordReader(file, 
allLogFiles, maxAttributeChars);
                             } catch (final IOException ioe) {
                                 continue;
                             }
@@ -1788,7 +1850,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             return true;
         }
 
-        if (repoDirty.get() || (writtenSinceRollover > 0 && 
System.currentTimeMillis() > streamStartTime.get() + maxPartitionMillis)) {
+        if (repoDirty.get() || writtenSinceRollover > 0 && 
System.currentTimeMillis() > streamStartTime.get() + maxPartitionMillis) {
             return true;
         }
 
@@ -1797,7 +1859,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
     public Collection<Path> getAllLogFiles() {
         final SortedMap<Long, Path> map = idToPathMap.get();
-        return (map == null) ? new ArrayList<Path>() : map.values();
+        return map == null ? new ArrayList<Path>() : map.values();
     }
 
     private static class PathMapComparator implements Comparator<Long> {
@@ -1885,7 +1947,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         @Override
         public void run() {
             try {
-                final IndexSearch search = new 
IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager);
+                final IndexSearch search = new 
IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager, 
maxAttributeChars);
                 final StandardQueryResult queryResult = search.search(query, 
retrievalCount);
                 submission.getResult().update(queryResult.getMatchingEvents(), 
queryResult.getTotalHitCount());
                 if (queryResult.isFinished()) {
@@ -1926,7 +1988,9 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             }
 
             try {
-                final Set<ProvenanceEventRecord> matchingRecords = 
LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this, 
indexManager, indexDir, null, flowFileUuids);
+                final Set<ProvenanceEventRecord> matchingRecords = 
LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this,
+                    indexManager, indexDir, null, flowFileUuids, 
maxAttributeChars);
+
                 final StandardLineageResult result = submission.getResult();
                 result.update(matchingRecords);
 
@@ -1959,7 +2023,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                     final Map.Entry<String, AsyncQuerySubmission> entry = 
queryIterator.next();
 
                     final StandardQueryResult result = 
entry.getValue().getResult();
-                    if (entry.getValue().isCanceled() || (result.isFinished() 
&& result.getExpiration().before(now))) {
+                    if (entry.getValue().isCanceled() || result.isFinished() 
&& result.getExpiration().before(now)) {
                         queryIterator.remove();
                     }
                 }
@@ -1969,7 +2033,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                     final Map.Entry<String, AsyncLineageSubmission> entry = 
lineageIterator.next();
 
                     final StandardLineageResult result = 
entry.getValue().getResult();
-                    if (entry.getValue().isCanceled() || (result.isFinished() 
&& result.getExpiration().before(now))) {
+                    if (entry.getValue().isCanceled() || result.isFinished() 
&& result.getExpiration().before(now)) {
                         lineageIterator.remove();
                     }
                 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/25146a58/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
index d0d147c..381d778 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
@@ -34,6 +34,7 @@ public class RepositoryConfiguration {
     private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
     private int journalCount = 16;
     private int compressionBlockBytes = 1024 * 1024;
+    private int maxAttributeChars = 65536;
 
     private List<SearchableField> searchableFields = new ArrayList<>();
     private List<SearchableField> searchableAttributes = new ArrayList<>();
@@ -278,4 +279,21 @@ public class RepositoryConfiguration {
     public void setAlwaysSync(boolean alwaysSync) {
         this.alwaysSync = alwaysSync;
     }
+
+    /**
+     * @return the maximum number of characters to include in any attribute. 
If an attribute in a Provenance
+     *         Event has more than this number of characters, it will be 
truncated when the event is retrieved.
+     */
+    public int getMaxAttributeChars() {
+        return maxAttributeChars;
+    }
+
+    /**
+     * Sets the maximum number of characters to include in any attribute. If 
an attribute in a Provenance
+     * Event has more than this number of characters, it will be truncated 
when it is retrieved.
+     */
+    public void setMaxAttributeChars(int maxAttributeChars) {
+        this.maxAttributeChars = maxAttributeChars;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/25146a58/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
index ca0d5ed..0939107 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
@@ -47,18 +47,20 @@ public class StandardRecordReader implements RecordReader {
     private final boolean compressed;
     private final TocReader tocReader;
     private final int headerLength;
+    private final int maxAttributeChars;
 
     private DataInputStream dis;
     private ByteCountingInputStream byteCountingIn;
 
-    public StandardRecordReader(final InputStream in, final String filename) 
throws IOException {
-        this(in, filename, null);
+    public StandardRecordReader(final InputStream in, final String filename, 
final int maxAttributeChars) throws IOException {
+        this(in, filename, null, maxAttributeChars);
     }
 
-    public StandardRecordReader(final InputStream in, final String filename, 
final TocReader tocReader) throws IOException {
+    public StandardRecordReader(final InputStream in, final String filename, 
final TocReader tocReader, final int maxAttributeChars) throws IOException {
         logger.trace("Creating RecordReader for {}", filename);
 
         rawInputStream = new ByteCountingInputStream(in);
+        this.maxAttributeChars = maxAttributeChars;
 
         final InputStream limitedStream;
         if ( tocReader == null ) {
@@ -367,7 +369,8 @@ public class StandardRecordReader implements RecordReader {
         for (int i = 0; i < numAttributes; i++) {
             final String key = readLongString(dis);
             final String value = valueNullable ? readLongNullableString(dis) : 
readLongString(dis);
-            attrs.put(key, value);
+            final String truncatedValue = value.length() > maxAttributeChars ? 
value.substring(0, maxAttributeChars) : value;
+            attrs.put(key, truncatedValue);
         }
 
         return attrs;
@@ -429,7 +432,7 @@ public class StandardRecordReader implements RecordReader {
             byteCountingIn.reset();
         }
 
-        return (nextByte >= 0);
+        return nextByte >= 0;
     }
 
     @Override
@@ -451,7 +454,7 @@ public class StandardRecordReader implements RecordReader {
             // committed, so we can just process the FlowFile again.
         }
 
-        return (lastRecord == null) ? -1L : lastRecord.getEventId();
+        return lastRecord == null ? -1L : lastRecord.getEventId();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/25146a58/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
index 70bf36e..7707352 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
@@ -46,9 +46,9 @@ public class DeleteIndexAction implements ExpirationAction {
     @Override
     public File execute(final File expiredFile) throws IOException {
         // count the number of records and determine the max event id that we 
are deleting.
-        long numDeleted = 0;
+        final long numDeleted = 0;
         long maxEventId = -1L;
-        try (final RecordReader reader = 
RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles())) {
+        try (final RecordReader reader = 
RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles(), 
Integer.MAX_VALUE)) {
             maxEventId = reader.getMaxEventId();
         } catch (final IOException ioe) {
             logger.warn("Failed to obtain max ID present in journal file {}", 
expiredFile.getAbsolutePath());
@@ -65,7 +65,7 @@ public class DeleteIndexAction implements ExpirationAction {
                 writer.deleteDocuments(term);
                 writer.commit();
                 final int docsLeft = writer.numDocs();
-                deleteDir = (docsLeft <= 0);
+                deleteDir = docsLeft <= 0;
                 logger.debug("After expiring {}, there are {} docs left for 
index {}", expiredFile, docsLeft, indexingDirectory);
             } finally {
                 indexManager.returnIndexWriter(indexingDirectory, writer);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/25146a58/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index 02fd5c3..eef4628 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -51,7 +51,7 @@ public class DocsReader {
     }
 
     public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final 
IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
-            final AtomicInteger retrievalCount, final int maxResults) throws 
IOException {
+            final AtomicInteger retrievalCount, final int maxResults, final 
int maxAttributeChars) throws IOException {
         if (retrievalCount.get() >= maxResults) {
             return Collections.emptySet();
         }
@@ -68,7 +68,7 @@ public class DocsReader {
 
         final long readDocuments = System.nanoTime() - start;
         logger.debug("Reading {} Lucene Documents took {} millis", 
docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments));
-        return read(docs, allProvenanceLogFiles, retrievalCount, maxResults);
+        return read(docs, allProvenanceLogFiles, retrievalCount, maxResults, 
maxAttributeChars);
     }
 
 
@@ -108,7 +108,8 @@ public class DocsReader {
     }
 
 
-    public Set<ProvenanceEventRecord> read(final List<Document> docs, final 
Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, 
final int maxResults) throws IOException {
+    public Set<ProvenanceEventRecord> read(final List<Document> docs, final 
Collection<Path> allProvenanceLogFiles,
+        final AtomicInteger retrievalCount, final int maxResults, final int 
maxAttributeChars) throws IOException {
         if (retrievalCount.get() >= maxResults) {
             return Collections.emptySet();
         }
@@ -161,7 +162,7 @@ public class DocsReader {
 
                         for (final File file : potentialFiles) {
                             try {
-                                reader = RecordReaders.newRecordReader(file, 
allProvenanceLogFiles);
+                                reader = RecordReaders.newRecordReader(file, 
allProvenanceLogFiles, maxAttributeChars);
                                 matchingRecords.add(getRecord(d, reader));
 
                                 if ( retrievalCount.incrementAndGet() >= 
maxResults ) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/25146a58/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index 53869f4..c9bb238 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -39,11 +39,13 @@ public class IndexSearch {
     private final PersistentProvenanceRepository repository;
     private final File indexDirectory;
     private final IndexManager indexManager;
+    private final int maxAttributeChars;
 
-    public IndexSearch(final PersistentProvenanceRepository repo, final File 
indexDirectory, final IndexManager indexManager) {
+    public IndexSearch(final PersistentProvenanceRepository repo, final File 
indexDirectory, final IndexManager indexManager, final int maxAttributeChars) {
         this.repository = repo;
         this.indexDirectory = indexDirectory;
         this.indexManager = indexManager;
+        this.maxAttributeChars = maxAttributeChars;
     }
 
     public StandardQueryResult search(final 
org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger 
retrievedCount) throws IOException {
@@ -82,7 +84,8 @@ public class IndexSearch {
             }
 
             final DocsReader docsReader = new 
DocsReader(repository.getConfiguration().getStorageDirectories());
-            matchingRecords = docsReader.read(topDocs, 
searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, 
provenanceQuery.getMaxResults());
+            matchingRecords = docsReader.read(topDocs, 
searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount,
+                provenanceQuery.getMaxResults(), maxAttributeChars);
 
             final long readRecordsNanos = System.nanoTime() - finishSearch;
             logger.debug("Reading {} records took {} millis for {}", 
matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/25146a58/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
index 502068b..e9e6e63 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
@@ -46,7 +46,7 @@ public class LineageQuery {
     private static final Logger logger = 
LoggerFactory.getLogger(LineageQuery.class);
 
     public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final 
PersistentProvenanceRepository repo, final IndexManager indexManager, final 
File indexDirectory,
-            final String lineageIdentifier, final Collection<String> 
flowFileUuids) throws IOException {
+        final String lineageIdentifier, final Collection<String> 
flowFileUuids, final int maxAttributeChars) throws IOException {
         if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) {
             throw new IllegalArgumentException(String.format("Cannot compute 
lineage for more than %s FlowFiles. This lineage contains %s.", 
MAX_LINEAGE_UUIDS, flowFileUuids.size()));
         }
@@ -94,7 +94,9 @@ public class LineageQuery {
                 final long searchEnd = System.nanoTime();
 
                 final DocsReader docsReader = new 
DocsReader(repo.getConfiguration().getStorageDirectories());
-                final Set<ProvenanceEventRecord> recs = 
docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), 
repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE);
+                final Set<ProvenanceEventRecord> recs = 
docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), 
repo.getAllLogFiles(),
+                    new AtomicInteger(0), Integer.MAX_VALUE, 
maxAttributeChars);
+
                 final long readDocsEnd = System.nanoTime();
                 logger.debug("Finished Lineage Query against {}; Lucene search 
took {} millis, reading records took {} millis",
                         indexDirectory, 
TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), 
TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/25146a58/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index cab5e6f..7889cd6 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -32,7 +32,18 @@ import org.apache.nifi.provenance.toc.TocUtil;
 
 public class RecordReaders {
 
-    public static RecordReader newRecordReader(File file, final 
Collection<Path> provenanceLogFiles) throws IOException {
+    /**
+     * Creates a new Record Reader that is capable of reading Provenance Event 
Journals
+     *
+     * @param file the Provenance Event Journal to read data from
+     * @param provenanceLogFiles collection of all provenance journal files
+     * @param maxAttributeChars the maximum number of characters to retrieve 
for any one attribute. This allows us to avoid
+     *            issues where a FlowFile has an extremely large attribute and 
reading events
+     *            for that FlowFile results in loading that attribute into 
memory many times, exhausting the Java Heap
+     * @return a Record Reader capable of reading Provenance Event Journals
+     * @throws IOException if unable to create a Record Reader for the given 
file
+     */
+    public static RecordReader newRecordReader(File file, final 
Collection<Path> provenanceLogFiles, final int maxAttributeChars) throws 
IOException {
         final File originalFile = file;
         InputStream fis = null;
 
@@ -92,9 +103,9 @@ public class RecordReaders {
             final File tocFile = TocUtil.getTocFile(file);
             if ( tocFile.exists() ) {
                 final TocReader tocReader = new StandardTocReader(tocFile);
-                return new StandardRecordReader(fis, filename, tocReader);
+                return new StandardRecordReader(fis, filename, tocReader, 
maxAttributeChars);
             } else {
-                return new StandardRecordReader(fis, filename);
+                return new StandardRecordReader(fis, filename, 
maxAttributeChars);
             }
         } catch (final IOException ioe) {
             if ( fis != null ) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/25146a58/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 7d97bcd..16f0312 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -252,7 +252,7 @@ public class TestPersistentProvenanceRepository {
         assertEquals(10, recoveredRecords.size());
         for (int i = 0; i < 10; i++) {
             final ProvenanceEventRecord recovered = recoveredRecords.get(i);
-            assertEquals((long) i, recovered.getEventId());
+            assertEquals(i, recovered.getEventId());
             assertEquals("nifi://unit-test", recovered.getTransitUri());
             assertEquals(ProvenanceEventType.RECEIVE, 
recovered.getEventType());
             assertEquals(attributes, recovered.getAttributes());
@@ -283,7 +283,7 @@ public class TestPersistentProvenanceRepository {
         builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
         builder.setComponentId("1234");
         builder.setComponentType("dummy processor");
-        ProvenanceEventRecord record = builder.build();
+        final ProvenanceEventRecord record = builder.build();
 
         for (int i = 0; i < 10; i++) {
             repo.registerEvent(record);
@@ -1106,7 +1106,7 @@ public class TestPersistentProvenanceRepository {
 
             final Query q = new Query("");
             q.setMaxResults(1000);
-            TopDocs topDocs = searcher.search(luceneQuery, 1000);
+            final TopDocs topDocs = searcher.search(luceneQuery, 1000);
 
             final List<Document> docs = new ArrayList<>();
             for (int i = 0; i < topDocs.scoreDocs.length; i++) {
@@ -1157,7 +1157,7 @@ public class TestPersistentProvenanceRepository {
         for (final File file : storageDir.listFiles()) {
             if (file.isFile()) {
 
-                try (RecordReader reader = RecordReaders.newRecordReader(file, 
null)) {
+                try (RecordReader reader = RecordReaders.newRecordReader(file, 
null, 2048)) {
                     ProvenanceEventRecord r = null;
 
                     while ((r = reader.nextRecord()) != null) {
@@ -1169,4 +1169,35 @@ public class TestPersistentProvenanceRepository {
 
         assertEquals(10000, counter);
     }
+
+    @Test
+    public void testTruncateAttributes() throws IOException, 
InterruptedException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxAttributeChars(50);
+        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+        repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
+        repo.initialize(getEventReporter());
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("75chars", 
"123456789012345678901234567890123456789012345678901234567890123456789012345");
+
+        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        final ProvenanceEventRecord record = builder.build();
+        repo.registerEvent(record);
+        repo.waitForRollover();
+
+        final ProvenanceEventRecord retrieved = repo.getEvent(0L);
+        assertNotNull(retrieved);
+        assertEquals("12345678-0000-0000-0000-012345678912", 
retrieved.getAttributes().get("uuid"));
+        assertEquals("12345678901234567890123456789012345678901234567890", 
retrieved.getAttributes().get("75chars"));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/25146a58/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
index f242642..d9e64e5 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
@@ -74,7 +74,7 @@ public class TestStandardRecordReaderWriter {
         final TocReader tocReader = new StandardTocReader(tocFile);
 
         try (final FileInputStream fis = new FileInputStream(journalFile);
-                final StandardRecordReader reader = new 
StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+            final StandardRecordReader reader = new StandardRecordReader(fis, 
journalFile.getName(), tocReader, 2048)) {
             assertEquals(0, reader.getBlockIndex());
             reader.skipToBlock(0);
             final StandardProvenanceEventRecord recovered = 
reader.nextRecord();
@@ -102,7 +102,7 @@ public class TestStandardRecordReaderWriter {
         final TocReader tocReader = new StandardTocReader(tocFile);
 
         try (final FileInputStream fis = new FileInputStream(journalFile);
-                final StandardRecordReader reader = new 
StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+            final StandardRecordReader reader = new StandardRecordReader(fis, 
journalFile.getName(), tocReader, 2048)) {
             assertEquals(0, reader.getBlockIndex());
             reader.skipToBlock(0);
             final StandardProvenanceEventRecord recovered = 
reader.nextRecord();
@@ -133,7 +133,7 @@ public class TestStandardRecordReaderWriter {
         final TocReader tocReader = new StandardTocReader(tocFile);
 
         try (final FileInputStream fis = new FileInputStream(journalFile);
-                final StandardRecordReader reader = new 
StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+            final StandardRecordReader reader = new StandardRecordReader(fis, 
journalFile.getName(), tocReader, 2048)) {
             for (int i=0; i < 10; i++) {
                 assertEquals(0, reader.getBlockIndex());
 
@@ -172,12 +172,12 @@ public class TestStandardRecordReaderWriter {
         final TocReader tocReader = new StandardTocReader(tocFile);
 
         try (final FileInputStream fis = new FileInputStream(journalFile);
-                final StandardRecordReader reader = new 
StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+            final StandardRecordReader reader = new StandardRecordReader(fis, 
journalFile.getName(), tocReader, 2048)) {
             for (int i=0; i < 10; i++) {
                 final StandardProvenanceEventRecord recovered = 
reader.nextRecord();
                 System.out.println(recovered);
                 assertNotNull(recovered);
-                assertEquals((long) i, recovered.getEventId());
+                assertEquals(i, recovered.getEventId());
                 assertEquals("nifi://unit-test", recovered.getTransitUri());
             }
 


Reply via email to