http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocumentToEventConverter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocumentToEventConverter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocumentToEventConverter.java new file mode 100644 index 0000000..18d3860 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocumentToEventConverter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.lucene; + +import java.io.IOException; +import java.util.Set; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.search.TopDocs; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public interface DocumentToEventConverter { + + Set<ProvenanceEventRecord> convert(TopDocs topDocs, IndexReader indexReader) throws IOException; +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java index f84021f..331d141 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java @@ -21,17 +21,19 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.search.IndexSearcher; +import org.apache.nifi.provenance.index.EventIndexSearcher; +import org.apache.nifi.provenance.index.EventIndexWriter; public interface IndexManager extends Closeable { - IndexSearcher borrowIndexSearcher(File indexDir) throws IOException; + EventIndexSearcher borrowIndexSearcher(File indexDir) throws IOException; - IndexWriter borrowIndexWriter(File indexingDirectory) throws IOException; + EventIndexWriter borrowIndexWriter(File indexDirectory) throws IOException; - void removeIndex(final File indexDirectory); + boolean removeIndex(final File indexDirectory); - void returnIndexSearcher(File indexDirectory, IndexSearcher searcher); + void returnIndexSearcher(EventIndexSearcher searcher); - void returnIndexWriter(File indexingDirectory, IndexWriter writer); + void returnIndexWriter(EventIndexWriter writer, boolean commit, boolean isCloseable); + + void returnIndexWriter(EventIndexWriter writer); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java index 8d7df8b..514af38 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java @@ -21,18 +21,20 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collections; import java.util.Date; +import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.TopDocs; +import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.provenance.PersistentProvenanceRepository; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.StandardQueryResult; -import org.apache.nifi.provenance.authorization.AuthorizationCheck; +import org.apache.nifi.provenance.authorization.EventAuthorizer; +import org.apache.nifi.provenance.index.EventIndexSearcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,14 +88,14 @@ public class IndexSearch { final Query luceneQuery = LuceneUtil.convertQuery(provenanceQuery); final long start = System.nanoTime(); - IndexSearcher searcher = null; + EventIndexSearcher searcher = null; try { searcher = indexManager.borrowIndexSearcher(indexDirectory); final long searchStartNanos = System.nanoTime(); final long openSearcherNanos = searchStartNanos - start; logger.debug("Searching {} for {}", this, provenanceQuery); - final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults()); + final TopDocs topDocs = searcher.getIndexSearcher().search(luceneQuery, provenanceQuery.getMaxResults()); final long finishSearch = System.nanoTime(); final long searchNanos = finishSearch - searchStartNanos; @@ -107,9 +109,29 @@ public class IndexSearch { final DocsReader docsReader = new DocsReader(); - final AuthorizationCheck authCheck = event -> repository.isAuthorized(event, user); - - matchingRecords = docsReader.read(topDocs, authCheck, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, + final EventAuthorizer authorizer = new EventAuthorizer() { + @Override + public boolean isAuthorized(ProvenanceEventRecord event) { + return repository.isAuthorized(event, user); + } + + @Override + public void authorize(ProvenanceEventRecord event) throws AccessDeniedException { + repository.authorize(event, user); + } + + @Override + public List<ProvenanceEventRecord> filterUnauthorizedEvents(List<ProvenanceEventRecord> events) { + return repository.filterUnauthorizedEvents(events, user); + } + + @Override + public Set<ProvenanceEventRecord> replaceUnauthorizedWithPlaceholders(Set<ProvenanceEventRecord> events) { + return repository.replaceUnauthorizedWithPlaceholders(events, user); + } + }; + + matchingRecords = docsReader.read(topDocs, authorizer, searcher.getIndexSearcher().getIndexReader(), repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults(), maxAttributeChars); final long readRecordsNanos = System.nanoTime() - finishSearch; @@ -133,7 +155,7 @@ public class IndexSearch { return sqr; } finally { if ( searcher != null ) { - indexManager.returnIndexSearcher(indexDirectory, searcher); + indexManager.returnIndexSearcher(searcher); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java index f725208..a0be319 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java @@ -19,6 +19,7 @@ package org.apache.nifi.provenance.lucene; import java.io.IOException; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.lucene.document.Document; @@ -28,23 +29,22 @@ import org.apache.lucene.document.LongField; import org.apache.lucene.document.StringField; import org.apache.lucene.index.IndexWriter; import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.provenance.PersistentProvenanceRepository; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.SearchableFields; import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.provenance.search.SearchableField; public class IndexingAction { - private final Set<SearchableField> nonAttributeSearchableFields; - private final Set<SearchableField> attributeSearchableFields; + private final Set<SearchableField> searchableEventFields; + private final Set<SearchableField> searchableAttributeFields; - public IndexingAction(final PersistentProvenanceRepository repo) { - attributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(repo.getConfiguration().getSearchableAttributes())); - nonAttributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(repo.getConfiguration().getSearchableFields())); + public IndexingAction(final List<SearchableField> searchableEventFields, final List<SearchableField> searchableAttributes) { + this.searchableEventFields = Collections.unmodifiableSet(new HashSet<>(searchableEventFields)); + this.searchableAttributeFields = Collections.unmodifiableSet(new HashSet<>(searchableAttributes)); } private void addField(final Document doc, final SearchableField field, final String value, final Store store) { - if (value == null || (!field.isAttribute() && !nonAttributeSearchableFields.contains(field))) { + if (value == null || (!field.isAttribute() && !searchableEventFields.contains(field))) { return; } @@ -67,7 +67,7 @@ public class IndexingAction { addField(doc, SearchableFields.SourceQueueIdentifier, record.getSourceQueueIdentifier(), Store.NO); addField(doc, SearchableFields.TransitURI, record.getTransitUri(), Store.NO); - for (final SearchableField searchableField : attributeSearchableFields) { + for (final SearchableField searchableField : searchableAttributeFields) { addField(doc, searchableField, LuceneUtil.truncateIndexField(record.getAttribute(searchableField.getSearchableFieldName())), Store.NO); } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java index 1b13504..2388483 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java @@ -25,18 +25,15 @@ import java.util.Collection; import java.util.Collections; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.index.Term; import org.apache.lucene.search.BooleanClause.Occur; import org.apache.lucene.search.BooleanQuery; -import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; -import org.apache.nifi.provenance.PersistentProvenanceRepository; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.SearchableFields; -import org.apache.nifi.provenance.authorization.AuthorizationCheck; +import org.apache.nifi.provenance.index.EventIndexSearcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,8 +43,8 @@ public class LineageQuery { public static final int MAX_LINEAGE_UUIDS = 100; private static final Logger logger = LoggerFactory.getLogger(LineageQuery.class); - public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final IndexManager indexManager, final File indexDirectory, - final String lineageIdentifier, final Collection<String> flowFileUuids, final int maxAttributeChars) throws IOException { + public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final IndexManager indexManager, final File indexDirectory, + final String lineageIdentifier, final Collection<String> flowFileUuids, final DocumentToEventConverter docsToEventConverter) throws IOException { if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) { throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size())); } @@ -56,7 +53,7 @@ public class LineageQuery { throw new IllegalArgumentException("Must specify either Lineage Identifier or FlowFile UUIDs to compute lineage"); } - final IndexSearcher searcher; + final EventIndexSearcher searcher; try { searcher = indexManager.borrowIndexSearcher(indexDirectory); try { @@ -75,16 +72,10 @@ public class LineageQuery { final long searchStart = System.nanoTime(); logger.debug("Searching {} for {}", indexDirectory, flowFileIdQuery); - final TopDocs uuidQueryTopDocs = searcher.search(flowFileIdQuery, MAX_QUERY_RESULTS); + final TopDocs uuidQueryTopDocs = searcher.getIndexSearcher().search(flowFileIdQuery, MAX_QUERY_RESULTS); final long searchEnd = System.nanoTime(); - // Always authorized. We do this because we need to pull back the event, regardless of whether or not - // the user is truly authorized, because instead of ignoring unauthorized events, we want to replace them. - final AuthorizationCheck authCheck = event -> true; - - final DocsReader docsReader = new DocsReader(); - final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, authCheck, searcher.getIndexReader(), repo.getAllLogFiles(), - new AtomicInteger(0), Integer.MAX_VALUE, maxAttributeChars); + final Set<ProvenanceEventRecord> recs = docsToEventConverter.convert(uuidQueryTopDocs, searcher.getIndexSearcher().getIndexReader()); final long readDocsEnd = System.nanoTime(); logger.debug("Finished Lineage Query against {}; Lucene search took {} millis, reading records took {} millis", @@ -92,7 +83,7 @@ public class LineageQuery { return recs; } finally { - indexManager.returnIndexSearcher(indexDirectory, searcher); + indexManager.returnIndexSearcher(searcher); } } catch (final FileNotFoundException fnfe) { // nothing has been indexed yet, or the data has already aged off http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneEventIndexSearcher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneEventIndexSearcher.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneEventIndexSearcher.java new file mode 100644 index 0000000..07b9167 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneEventIndexSearcher.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.lucene; + +import java.io.Closeable; +import java.io.File; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.store.Directory; +import org.apache.nifi.provenance.index.EventIndexSearcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LuceneEventIndexSearcher implements EventIndexSearcher { + private static final Logger logger = LoggerFactory.getLogger(LuceneEventIndexSearcher.class); + + private final IndexSearcher indexSearcher; + private final File indexDirectory; + private final Directory directory; + private final DirectoryReader directoryReader; + + // guarded by synchronizing on 'this' + private int usageCounter = 0; + private boolean closed = false; + + public LuceneEventIndexSearcher(final IndexSearcher indexSearcher, final File indexDirectory, final Directory directory, final DirectoryReader directoryReader) { + this.indexSearcher = indexSearcher; + this.indexDirectory = indexDirectory; + this.directory = directory; + this.directoryReader = directoryReader; + } + + @Override + public IndexSearcher getIndexSearcher() { + return indexSearcher; + } + + @Override + public File getIndexDirectory() { + return indexDirectory; + } + + @Override + public synchronized void close() { + closed = true; + if (usageCounter == 0) { + closeQuietly(directoryReader); + closeQuietly(directory); + } + } + + public synchronized void incrementUsageCounter() { + usageCounter++; + } + + public synchronized void decrementUsageCounter() { + usageCounter--; + if (usageCounter == 0 && closed) { + closeQuietly(directoryReader); + closeQuietly(directory); + } + } + + private void closeQuietly(final Closeable closeable) { + if (closeable == null) { + return; + } + + try { + closeable.close(); + } catch (final Exception e) { + logger.warn("Failed to close {} due to {}", closeable, e); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneEventIndexWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneEventIndexWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneEventIndexWriter.java new file mode 100644 index 0000000..db8c528 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneEventIndexWriter.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.lucene; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexWriter; +import org.apache.nifi.provenance.index.EventIndexWriter; + +public class LuceneEventIndexWriter implements EventIndexWriter { + private final IndexWriter indexWriter; + private final File directory; + private final long maxCommitNanos; + + private final AtomicReference<CommitStats> commitStats = new AtomicReference<>(); + private final AtomicLong totalIndexed = new AtomicLong(0L); + private final AtomicLong lastCommitTotalIndexed = new AtomicLong(0L); + + public LuceneEventIndexWriter(final IndexWriter indexWriter, final File directory) { + this(indexWriter, directory, TimeUnit.SECONDS.toNanos(30L)); + } + + public LuceneEventIndexWriter(final IndexWriter indexWriter, final File directory, final long maxCommitNanos) { + this.indexWriter = indexWriter; + this.directory = directory; + this.maxCommitNanos = maxCommitNanos; + + commitStats.set(new CommitStats(0, System.nanoTime() + maxCommitNanos)); + } + + @Override + public void close() throws IOException { + indexWriter.close(); + } + + @Override + public boolean index(final Document document, final int commitThreshold) throws IOException { + return index(Collections.singletonList(document), commitThreshold); + } + + @Override + public boolean index(List<Document> documents, final int commitThreshold) throws IOException { + if (documents.isEmpty()) { + return false; + } + + final int numDocs = documents.size(); + indexWriter.addDocuments(documents); + totalIndexed.addAndGet(numDocs); + + boolean updated = false; + while (!updated) { + final CommitStats stats = commitStats.get(); + CommitStats updatedStats = new CommitStats(stats.getIndexedSinceCommit() + numDocs, stats.getNextCommitTimestamp()); + + if (updatedStats.getIndexedSinceCommit() >= commitThreshold || System.nanoTime() >= updatedStats.getNextCommitTimestamp()) { + updatedStats = new CommitStats(0, System.nanoTime() + maxCommitNanos); + updated = commitStats.compareAndSet(stats, updatedStats); + if (updated) { + return true; + } + } else { + updated = commitStats.compareAndSet(stats, updatedStats); + } + } + + return false; + } + + @Override + public File getDirectory() { + return directory; + } + + @Override + public long commit() throws IOException { + final long lastCommitCount = lastCommitTotalIndexed.get(); + final long currentCommitCount = totalIndexed.get(); + indexWriter.commit(); + commitStats.set(new CommitStats(0, System.nanoTime() + maxCommitNanos)); + lastCommitTotalIndexed.set(currentCommitCount); + return currentCommitCount - lastCommitCount; + } + + @Override + public int getEventsIndexedSinceCommit() { + return commitStats.get().getIndexedSinceCommit(); + } + + @Override + public long getEventsIndexed() { + return totalIndexed.get(); + } + + @Override + public IndexWriter getIndexWriter() { + return indexWriter; + } + + @Override + public String toString() { + return "LuceneEventIndexWriter[dir=" + directory + "]"; + } + + private static class CommitStats { + private final long nextCommitTimestamp; + private final int indexedSinceCommit; + + public CommitStats(final int indexedCount, final long nextCommitTime) { + this.nextCommitTimestamp = nextCommitTime; + this.indexedSinceCommit = indexedCount; + } + + public long getNextCommitTimestamp() { + return nextCommitTimestamp; + } + + public int getIndexedSinceCommit() { + return indexedSinceCommit; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java index 81816d2..b0b01e5 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java @@ -24,155 +24,306 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.index.ConcurrentMergeScheduler; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; +import org.apache.nifi.provenance.RepositoryConfiguration; +import org.apache.nifi.provenance.index.EventIndexSearcher; +import org.apache.nifi.provenance.index.EventIndexWriter; +import org.apache.nifi.provenance.util.NamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SimpleIndexManager implements IndexManager { private static final Logger logger = LoggerFactory.getLogger(SimpleIndexManager.class); - private final ConcurrentMap<Object, List<Closeable>> closeables = new ConcurrentHashMap<>(); - private final Map<File, IndexWriterCount> writerCounts = new HashMap<>(); - - private final ExecutorService searchExecutor = Executors.newCachedThreadPool(); + private final Map<File, IndexWriterCount> writerCounts = new HashMap<>(); // guarded by synchronizing on map itself + private final ExecutorService searchExecutor; + private final RepositoryConfiguration repoConfig; + public SimpleIndexManager(final RepositoryConfiguration repoConfig) { + this.repoConfig = repoConfig; + this.searchExecutor = Executors.newFixedThreadPool(repoConfig.getQueryThreadPoolSize(), new NamedThreadFactory("Search Lucene Index")); + } @Override public void close() throws IOException { logger.debug("Shutting down SimpleIndexManager search executor"); - this.searchExecutor.shutdown(); + + searchExecutor.shutdown(); try { - if (!this.searchExecutor.awaitTermination(5, TimeUnit.SECONDS)) { - this.searchExecutor.shutdownNow(); + if (!searchExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + searchExecutor.shutdownNow(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - this.searchExecutor.shutdownNow(); + searchExecutor.shutdownNow(); } } @Override - public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException { - logger.debug("Creating index searcher for {}", indexDir); - final Directory directory = FSDirectory.open(indexDir); - final DirectoryReader directoryReader = DirectoryReader.open(directory); - final IndexSearcher searcher = new IndexSearcher(directoryReader, this.searchExecutor); + public EventIndexSearcher borrowIndexSearcher(final File indexDir) throws IOException { + final File absoluteFile = indexDir.getAbsoluteFile(); + + final IndexWriterCount writerCount; + synchronized (writerCounts) { + writerCount = writerCounts.remove(absoluteFile); - final List<Closeable> closeableList = new ArrayList<>(2); - closeableList.add(directoryReader); - closeableList.add(directory); - closeables.put(searcher, closeableList); - logger.debug("Created index searcher {} for {}", searcher, indexDir); + if (writerCount != null) { + // Increment writer count and create an Index Searcher based on the writer + writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), writerCount.getAnalyzer(), + writerCount.getDirectory(), writerCount.getCount() + 1, writerCount.isCloseableWhenUnused())); + } + } + + final DirectoryReader directoryReader; + if (writerCount == null) { + logger.trace("Creating index searcher for {}", indexDir); + final Directory directory = FSDirectory.open(indexDir); + directoryReader = DirectoryReader.open(directory); + } else { + final EventIndexWriter eventIndexWriter = writerCount.getWriter(); + directoryReader = DirectoryReader.open(eventIndexWriter.getIndexWriter(), false); + } - return searcher; + final IndexSearcher searcher = new IndexSearcher(directoryReader, this.searchExecutor); + + logger.trace("Created index searcher {} for {}", searcher, indexDir); + return new LuceneEventIndexSearcher(searcher, indexDir, null, directoryReader); } @Override - public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) { + public void returnIndexSearcher(final EventIndexSearcher searcher) { + final File indexDirectory = searcher.getIndexDirectory(); logger.debug("Closing index searcher {} for {}", searcher, indexDirectory); + closeQuietly(searcher); + logger.debug("Closed index searcher {}", searcher); + + final IndexWriterCount count; + boolean closeWriter = false; + synchronized (writerCounts) { + final File absoluteFile = searcher.getIndexDirectory().getAbsoluteFile(); + count = writerCounts.get(absoluteFile); + if (count == null) { + logger.debug("Returning EventIndexSearcher for {}; there is no active writer for this searcher so will not decrement writerCounts", absoluteFile); + return; + } - final List<Closeable> closeableList = closeables.get(searcher); - if (closeableList != null) { - for (final Closeable closeable : closeableList) { - closeQuietly(closeable); + if (count.getCount() <= 1) { + // we are finished with this writer. + final boolean close = count.isCloseableWhenUnused(); + logger.debug("Decrementing count for Index Writer for {} to {}{}", indexDirectory, count.getCount() - 1, close ? "; closing writer" : ""); + + if (close) { + writerCounts.remove(absoluteFile); + closeWriter = true; + } else { + writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), + count.getCount() - 1, count.isCloseableWhenUnused())); + } + } else { + writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), + count.getCount() - 1, count.isCloseableWhenUnused())); } } - logger.debug("Closed index searcher {}", searcher); + if (closeWriter) { + try { + close(count); + } catch (final Exception e) { + logger.warn("Failed to close Index Writer {} due to {}", count.getWriter(), e.toString(), e); + } + } } @Override - public void removeIndex(final File indexDirectory) { + public boolean removeIndex(final File indexDirectory) { + final File absoluteFile = indexDirectory.getAbsoluteFile(); + logger.debug("Attempting to remove index {} from SimpleIndexManager", absoluteFile); + + IndexWriterCount writerCount; + synchronized (writerCounts) { + writerCount = writerCounts.remove(absoluteFile); + if (writerCount == null) { + logger.debug("Allowing removal of index {} because there is no IndexWriterCount for this directory", absoluteFile); + return true; // return true since directory has no writers + } + + if (writerCount.getCount() > 0) { + logger.debug("Not allowing removal of index {} because the active writer count for this directory is {}", absoluteFile, writerCount.getCount()); + writerCounts.put(absoluteFile, writerCount); + return false; + } + } + + try { + logger.debug("Removing index {} from SimpleIndexManager and closing the writer", absoluteFile); + + close(writerCount); + } catch (final Exception e) { + logger.error("Failed to close Index Writer for {} while removing Index from the repository;" + + "this directory may need to be cleaned up manually.", e); + } + + return true; } - @Override - public synchronized IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException { - final File absoluteFile = indexingDirectory.getAbsoluteFile(); - logger.trace("Borrowing index writer for {}", indexingDirectory); + private IndexWriterCount createWriter(final File indexDirectory) throws IOException { + final List<Closeable> closeables = new ArrayList<>(); + final Directory directory = FSDirectory.open(indexDirectory); + closeables.add(directory); - IndexWriterCount writerCount = writerCounts.remove(absoluteFile); - if (writerCount == null) { - final List<Closeable> closeables = new ArrayList<>(); - final Directory directory = FSDirectory.open(indexingDirectory); - closeables.add(directory); + try { + final Analyzer analyzer = new StandardAnalyzer(); + closeables.add(analyzer); - try { - final Analyzer analyzer = new StandardAnalyzer(); - closeables.add(analyzer); - - final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer); - config.setWriteLockTimeout(300000L); - - final IndexWriter indexWriter = new IndexWriter(directory, config); - writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1); - logger.debug("Providing new index writer for {}", indexingDirectory); - } catch (final IOException ioe) { - for (final Closeable closeable : closeables) { - try { - closeable.close(); - } catch (final IOException ioe2) { - ioe.addSuppressed(ioe2); - } + final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer); + + final ConcurrentMergeScheduler mergeScheduler = new ConcurrentMergeScheduler(); + final int mergeThreads = repoConfig.getConcurrentMergeThreads(); + mergeScheduler.setMaxMergesAndThreads(mergeThreads, mergeThreads); + config.setMergeScheduler(mergeScheduler); + + final IndexWriter indexWriter = new IndexWriter(directory, config); + final EventIndexWriter eventIndexWriter = new LuceneEventIndexWriter(indexWriter, indexDirectory); + + final IndexWriterCount writerCount = new IndexWriterCount(eventIndexWriter, analyzer, directory, 1, false); + logger.debug("Providing new index writer for {}", indexDirectory); + return writerCount; + } catch (final IOException ioe) { + for (final Closeable closeable : closeables) { + try { + closeable.close(); + } catch (final IOException ioe2) { + ioe.addSuppressed(ioe2); } + } + + throw ioe; + } + } - throw ioe; + @Override + public EventIndexWriter borrowIndexWriter(final File indexDirectory) throws IOException { + final File absoluteFile = indexDirectory.getAbsoluteFile(); + logger.trace("Borrowing index writer for {}", indexDirectory); + + IndexWriterCount writerCount = null; + synchronized (writerCounts) { + writerCount = writerCounts.get(absoluteFile); + + if (writerCount == null) { + writerCount = createWriter(indexDirectory); + writerCounts.put(absoluteFile, writerCount); + } else { + logger.trace("Providing existing index writer for {} and incrementing count to {}", indexDirectory, writerCount.getCount() + 1); + writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), + writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1, writerCount.isCloseableWhenUnused())); } - writerCounts.put(absoluteFile, writerCount); - } else { - logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1); - writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), - writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1)); + if (writerCounts.size() > repoConfig.getStorageDirectories().size() * 2) { + logger.debug("Index Writer returned; writer count map now has size {}; writerCount = {}; full writerCounts map = {}", + writerCounts.size(), writerCount, writerCounts); + } } return writerCount.getWriter(); } + @Override + public void returnIndexWriter(final EventIndexWriter writer) { + returnIndexWriter(writer, true, true); + } @Override - public synchronized void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) { - final File absoluteFile = indexingDirectory.getAbsoluteFile(); - logger.trace("Returning Index Writer for {} to IndexManager", indexingDirectory); + public void returnIndexWriter(final EventIndexWriter writer, final boolean commit, final boolean isCloseable) { + final File indexDirectory = writer.getDirectory(); + final File absoluteFile = indexDirectory.getAbsoluteFile(); + logger.trace("Returning Index Writer for {} to IndexManager", indexDirectory); + + boolean unused = false; + IndexWriterCount count = null; + boolean close = isCloseable; + try { + synchronized (writerCounts) { + count = writerCounts.get(absoluteFile); + if (count != null && count.isCloseableWhenUnused()) { + close = true; + } - final IndexWriterCount count = writerCounts.remove(absoluteFile); + if (count == null) { + logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. " + + "This could potentially lead to a resource leak", writer, indexDirectory); + writer.close(); + } else if (count.getCount() <= 1) { + // we are finished with this writer. + unused = true; + if (close) { + logger.debug("Decrementing count for Index Writer for {} to {}; closing writer", indexDirectory, count.getCount() - 1); + writerCounts.remove(absoluteFile); + } else { + logger.trace("Decrementing count for Index Writer for {} to {}", indexDirectory, count.getCount() - 1); + + // If writer is not closeable, then we need to decrement its count. + writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), + count.getCount() - 1, close)); + } + } else { + // decrement the count. + if (close) { + logger.debug("Decrementing count for Index Writer for {} to {} and marking as closeable when no longer in use", indexDirectory, count.getCount() - 1); + } else { + logger.trace("Decrementing count for Index Writer for {} to {}", indexDirectory, count.getCount() - 1); + } - try { - if (count == null) { - logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. " - + "This could potentially lead to a resource leak", writer, indexingDirectory); - writer.close(); - } else if (count.getCount() <= 1) { - // we are finished with this writer. - logger.debug("Decrementing count for Index Writer for {} to {}; Closing writer", indexingDirectory, count.getCount() - 1); + writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), + count.getDirectory(), count.getCount() - 1, close)); + } + + if (writerCounts.size() > repoConfig.getStorageDirectories().size() * 2) { + logger.debug("Index Writer returned; writer count map now has size {}; writer = {}, commit = {}, isCloseable = {}, writerCount = {}; full writerCounts Map = {}", + writerCounts.size(), writer, commit, isCloseable, count, writerCounts); + } + } + + // Committing and closing are very expensive, so we want to do those outside of the synchronized block. + // So we use an 'unused' variable to tell us whether or not we should actually do so. + if (unused) { try { - writer.commit(); + if (commit) { + writer.commit(); + } } finally { - count.close(); + if (close) { + logger.info("Index Writer for {} has been returned to Index Manager and is no longer in use. Closing Index Writer", indexDirectory); + close(count); + } } - } else { - // decrement the count. - logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1); - writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1)); - } - } catch (final IOException ioe) { - logger.warn("Failed to close Index Writer {} due to {}", writer, ioe); - if (logger.isDebugEnabled()) { - logger.warn("", ioe); } + } catch (final Exception e) { + logger.warn("Failed to close Index Writer {} due to {}", writer, e.toString(), e); + } + } + + // This method exists solely for unit testing purposes. + protected void close(final IndexWriterCount count) throws IOException { + count.close(); + } + + protected int getWriterCount() { + synchronized (writerCounts) { + return writerCounts.size(); } } @@ -191,17 +342,23 @@ public class SimpleIndexManager implements IndexManager { } - private static class IndexWriterCount implements Closeable { - private final IndexWriter writer; + protected static class IndexWriterCount implements Closeable { + private final EventIndexWriter writer; private final Analyzer analyzer; private final Directory directory; private final int count; + private final boolean closeableWhenUnused; - public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) { + public IndexWriterCount(final EventIndexWriter writer, final Analyzer analyzer, final Directory directory, final int count, final boolean closeableWhenUnused) { this.writer = writer; this.analyzer = analyzer; this.directory = directory; this.count = count; + this.closeableWhenUnused = closeableWhenUnused; + } + + public boolean isCloseableWhenUnused() { + return closeableWhenUnused; } public Analyzer getAnalyzer() { @@ -212,7 +369,7 @@ public class SimpleIndexManager implements IndexManager { return directory; } - public IndexWriter getWriter() { + public EventIndexWriter getWriter() { return writer; } @@ -224,5 +381,10 @@ public class SimpleIndexManager implements IndexManager { public void close() throws IOException { closeQuietly(writer, analyzer, directory); } + + @Override + public String toString() { + return "IndexWriterCount[count=" + count + ", writer=" + writer + ", closeableWhenUnused=" + closeableWhenUnused + "]"; + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java new file mode 100644 index 0000000..d6f50dd --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.schema; + +public class EventFieldNames { + public static final String EVENT_IDENTIFIER = "Event ID"; + public static final String EVENT_TYPE = "Event Type"; + public static final String EVENT_TIME = "Event Time"; + public static final String FLOWFILE_ENTRY_DATE = "FlowFile Entry Date"; + public static final String EVENT_DURATION = "Event Duration"; + public static final String LINEAGE_START_DATE = "Lineage Start Date"; + public static final String COMPONENT_ID = "Component ID"; + public static final String COMPONENT_TYPE = "Component Type"; + public static final String FLOWFILE_UUID = "FlowFile UUID"; + public static final String EVENT_DETAILS = "Event Details"; + public static final String SOURCE_QUEUE_IDENTIFIER = "Source Queue Identifier"; + public static final String CONTENT_CLAIM = "Content Claim"; + public static final String PREVIOUS_CONTENT_CLAIM = "Previous Content Claim"; + public static final String EXPLICIT_CURRENT_CONTENT_CLAIM = "Full Current Content Claim"; + public static final String PARENT_UUIDS = "Parent UUIDs"; + public static final String CHILD_UUIDS = "Child UUIDs"; + + public static final String ATTRIBUTE_NAME = "Attribute Name"; + public static final String ATTRIBUTE_VALUE = "Attribute Value"; + public static final String PREVIOUS_ATTRIBUTES = "Previous Attributes"; + public static final String UPDATED_ATTRIBUTES = "Updated Attributes"; + + public static final String CONTENT_CLAIM_CONTAINER = "Content Claim Container"; + public static final String CONTENT_CLAIM_SECTION = "Content Claim Section"; + public static final String CONTENT_CLAIM_IDENTIFIER = "Content Claim Identifier"; + public static final String CONTENT_CLAIM_OFFSET = "Content Claim Offset"; + public static final String CONTENT_CLAIM_SIZE = "Content Claim Size"; + + public static final String TRANSIT_URI = "Transit URI"; + public static final String SOURCE_SYSTEM_FLOWFILE_IDENTIFIER = "Source System FlowFile Identifier"; + public static final String ALTERNATE_IDENTIFIER = "Alternate Identifier"; + public static final String RELATIONSHIP = "Relationship"; + + // For Lookup Tables + public static final String NO_VALUE = "No Value"; + public static final String EXPLICIT_VALUE = "Explicit Value"; + public static final String LOOKUP_VALUE = "Lookup Value"; + public static final String UNCHANGED_VALUE = "Unchanged"; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventIdFirstHeaderSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventIdFirstHeaderSchema.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventIdFirstHeaderSchema.java new file mode 100644 index 0000000..1c35c5a --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventIdFirstHeaderSchema.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.schema; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.repository.schema.FieldType; +import org.apache.nifi.repository.schema.RecordField; +import org.apache.nifi.repository.schema.RecordSchema; +import org.apache.nifi.repository.schema.Repetition; +import org.apache.nifi.repository.schema.SimpleRecordField; + +public class EventIdFirstHeaderSchema { + + public static RecordSchema SCHEMA = buildSchema(); + + public static final class FieldNames { + public static final String FIRST_EVENT_ID = "First Event ID"; + public static final String TIMESTAMP_OFFSET = "Timestamp Offset"; + public static final String COMPONENT_IDS = "Component Identifiers"; + public static final String COMPONENT_TYPES = "Component Types"; + public static final String QUEUE_IDS = "Queue Identifiers"; + public static final String EVENT_TYPES = "Event Types"; + } + + private static RecordSchema buildSchema() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new SimpleRecordField(FieldNames.FIRST_EVENT_ID, FieldType.LONG, Repetition.EXACTLY_ONE)); + fields.add(new SimpleRecordField(FieldNames.TIMESTAMP_OFFSET, FieldType.LONG, Repetition.EXACTLY_ONE)); + fields.add(new SimpleRecordField(FieldNames.COMPONENT_IDS, FieldType.STRING, Repetition.ZERO_OR_MORE)); + fields.add(new SimpleRecordField(FieldNames.COMPONENT_TYPES, FieldType.STRING, Repetition.ZERO_OR_MORE)); + fields.add(new SimpleRecordField(FieldNames.QUEUE_IDS, FieldType.STRING, Repetition.ZERO_OR_MORE)); + fields.add(new SimpleRecordField(FieldNames.EVENT_TYPES, FieldType.STRING, Repetition.ZERO_OR_MORE)); + return new RecordSchema(fields); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java index c9e7dc8..8c82b11 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java @@ -69,47 +69,47 @@ public class EventRecord implements Record { @Override public Object getFieldValue(final String fieldName) { switch (fieldName) { - case EventRecordFields.Names.EVENT_IDENTIFIER: + case EventFieldNames.EVENT_IDENTIFIER: return eventId; - case EventRecordFields.Names.ALTERNATE_IDENTIFIER: + case EventFieldNames.ALTERNATE_IDENTIFIER: return event.getAlternateIdentifierUri(); - case EventRecordFields.Names.CHILD_UUIDS: + case EventFieldNames.CHILD_UUIDS: return event.getChildUuids(); - case EventRecordFields.Names.COMPONENT_ID: + case EventFieldNames.COMPONENT_ID: return event.getComponentId(); - case EventRecordFields.Names.COMPONENT_TYPE: + case EventFieldNames.COMPONENT_TYPE: return event.getComponentType(); - case EventRecordFields.Names.CONTENT_CLAIM: + case EventFieldNames.CONTENT_CLAIM: return contentClaimRecord; - case EventRecordFields.Names.EVENT_DETAILS: + case EventFieldNames.EVENT_DETAILS: return event.getDetails(); - case EventRecordFields.Names.EVENT_DURATION: + case EventFieldNames.EVENT_DURATION: return event.getEventDuration(); - case EventRecordFields.Names.EVENT_TIME: + case EventFieldNames.EVENT_TIME: return event.getEventTime(); - case EventRecordFields.Names.EVENT_TYPE: + case EventFieldNames.EVENT_TYPE: return event.getEventType().name(); - case EventRecordFields.Names.FLOWFILE_ENTRY_DATE: + case EventFieldNames.FLOWFILE_ENTRY_DATE: return event.getFlowFileEntryDate(); - case EventRecordFields.Names.FLOWFILE_UUID: + case EventFieldNames.FLOWFILE_UUID: return event.getFlowFileUuid(); - case EventRecordFields.Names.LINEAGE_START_DATE: + case EventFieldNames.LINEAGE_START_DATE: return event.getLineageStartDate(); - case EventRecordFields.Names.PARENT_UUIDS: + case EventFieldNames.PARENT_UUIDS: return event.getParentUuids(); - case EventRecordFields.Names.PREVIOUS_ATTRIBUTES: + case EventFieldNames.PREVIOUS_ATTRIBUTES: return event.getPreviousAttributes(); - case EventRecordFields.Names.PREVIOUS_CONTENT_CLAIM: + case EventFieldNames.PREVIOUS_CONTENT_CLAIM: return previousClaimRecord; - case EventRecordFields.Names.RELATIONSHIP: + case EventFieldNames.RELATIONSHIP: return event.getRelationship(); - case EventRecordFields.Names.SOURCE_QUEUE_IDENTIFIER: + case EventFieldNames.SOURCE_QUEUE_IDENTIFIER: return event.getSourceQueueIdentifier(); - case EventRecordFields.Names.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER: + case EventFieldNames.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER: return event.getSourceSystemFlowFileIdentifier(); - case EventRecordFields.Names.TRANSIT_URI: + case EventFieldNames.TRANSIT_URI: return event.getTransitUri(); - case EventRecordFields.Names.UPDATED_ATTRIBUTES: + case EventFieldNames.UPDATED_ATTRIBUTES: return event.getUpdatedAttributes(); } @@ -119,48 +119,52 @@ public class EventRecord implements Record { @SuppressWarnings("unchecked") public static StandardProvenanceEventRecord getEvent(final Record record, final String storageFilename, final long storageByteOffset, final int maxAttributeLength) { final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder(); - builder.setAlternateIdentifierUri((String) record.getFieldValue(EventRecordFields.Names.ALTERNATE_IDENTIFIER)); - builder.setChildUuids((List<String>) record.getFieldValue(EventRecordFields.Names.CHILD_UUIDS)); - builder.setComponentId((String) record.getFieldValue(EventRecordFields.Names.COMPONENT_ID)); - builder.setComponentType((String) record.getFieldValue(EventRecordFields.Names.COMPONENT_TYPE)); - builder.setDetails((String) record.getFieldValue(EventRecordFields.Names.EVENT_DETAILS)); - builder.setEventDuration((Long) record.getFieldValue(EventRecordFields.Names.EVENT_DURATION)); - builder.setEventTime((Long) record.getFieldValue(EventRecordFields.Names.EVENT_TIME)); - builder.setEventType(ProvenanceEventType.valueOf((String) record.getFieldValue(EventRecordFields.Names.EVENT_TYPE))); - builder.setFlowFileEntryDate((Long) record.getFieldValue(EventRecordFields.Names.FLOWFILE_ENTRY_DATE)); - builder.setFlowFileUUID((String) record.getFieldValue(EventRecordFields.Names.FLOWFILE_UUID)); - builder.setLineageStartDate((Long) record.getFieldValue(EventRecordFields.Names.LINEAGE_START_DATE)); - builder.setParentUuids((List<String>) record.getFieldValue(EventRecordFields.Names.PARENT_UUIDS)); - builder.setPreviousAttributes(truncateAttributes((Map<String, String>) record.getFieldValue(EventRecordFields.Names.PREVIOUS_ATTRIBUTES), maxAttributeLength)); - builder.setEventId((Long) record.getFieldValue(EventRecordFields.Names.EVENT_IDENTIFIER)); - builder.setRelationship((String) record.getFieldValue(EventRecordFields.Names.RELATIONSHIP)); - builder.setSourceQueueIdentifier((String) record.getFieldValue(EventRecordFields.Names.SOURCE_QUEUE_IDENTIFIER)); - builder.setSourceSystemFlowFileIdentifier((String) record.getFieldValue(EventRecordFields.Names.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER)); - builder.setTransitUri((String) record.getFieldValue(EventRecordFields.Names.TRANSIT_URI)); - builder.setUpdatedAttributes(truncateAttributes((Map<String, String>) record.getFieldValue(EventRecordFields.Names.UPDATED_ATTRIBUTES), maxAttributeLength)); + builder.setAlternateIdentifierUri((String) record.getFieldValue(EventFieldNames.ALTERNATE_IDENTIFIER)); + builder.setChildUuids((List<String>) record.getFieldValue(EventFieldNames.CHILD_UUIDS)); + builder.setComponentId((String) record.getFieldValue(EventFieldNames.COMPONENT_ID)); + builder.setComponentType((String) record.getFieldValue(EventFieldNames.COMPONENT_TYPE)); + builder.setDetails((String) record.getFieldValue(EventFieldNames.EVENT_DETAILS)); + builder.setEventDuration((Long) record.getFieldValue(EventFieldNames.EVENT_DURATION)); + builder.setEventTime((Long) record.getFieldValue(EventFieldNames.EVENT_TIME)); + builder.setEventType(ProvenanceEventType.valueOf((String) record.getFieldValue(EventFieldNames.EVENT_TYPE))); + builder.setFlowFileEntryDate((Long) record.getFieldValue(EventFieldNames.FLOWFILE_ENTRY_DATE)); + builder.setFlowFileUUID((String) record.getFieldValue(EventFieldNames.FLOWFILE_UUID)); + builder.setLineageStartDate((Long) record.getFieldValue(EventFieldNames.LINEAGE_START_DATE)); + builder.setParentUuids((List<String>) record.getFieldValue(EventFieldNames.PARENT_UUIDS)); + builder.setPreviousAttributes(truncateAttributes((Map<String, String>) record.getFieldValue(EventFieldNames.PREVIOUS_ATTRIBUTES), maxAttributeLength)); + builder.setRelationship((String) record.getFieldValue(EventFieldNames.RELATIONSHIP)); + builder.setSourceQueueIdentifier((String) record.getFieldValue(EventFieldNames.SOURCE_QUEUE_IDENTIFIER)); + builder.setSourceSystemFlowFileIdentifier((String) record.getFieldValue(EventFieldNames.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER)); + builder.setTransitUri((String) record.getFieldValue(EventFieldNames.TRANSIT_URI)); + builder.setUpdatedAttributes(truncateAttributes((Map<String, String>) record.getFieldValue(EventFieldNames.UPDATED_ATTRIBUTES), maxAttributeLength)); + + final Long eventId = (Long) record.getFieldValue(EventFieldNames.EVENT_IDENTIFIER); + if (eventId != null) { + builder.setEventId(eventId); + } builder.setStorageLocation(storageFilename, storageByteOffset); - final Record currentClaimRecord = (Record) record.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM); + final Record currentClaimRecord = (Record) record.getFieldValue(EventFieldNames.CONTENT_CLAIM); if (currentClaimRecord == null) { builder.setCurrentContentClaim(null, null, null, null, 0L); } else { builder.setCurrentContentClaim( - (String) currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_CONTAINER), - (String) currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_SECTION), - (String) currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_IDENTIFIER), - (Long) currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_OFFSET), - (Long) currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_SIZE)); + (String) currentClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_CONTAINER), + (String) currentClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_SECTION), + (String) currentClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_IDENTIFIER), + (Long) currentClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_OFFSET), + (Long) currentClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_SIZE)); } - final Record previousClaimRecord = (Record) record.getFieldValue(EventRecordFields.Names.PREVIOUS_CONTENT_CLAIM); + final Record previousClaimRecord = (Record) record.getFieldValue(EventFieldNames.PREVIOUS_CONTENT_CLAIM); if (previousClaimRecord != null) { builder.setPreviousContentClaim( - (String) previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_CONTAINER), - (String) previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_SECTION), - (String) previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_IDENTIFIER), - (Long) previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_OFFSET), - (Long) previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_SIZE)); + (String) previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_CONTAINER), + (String) previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_SECTION), + (String) previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_IDENTIFIER), + (Long) previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_OFFSET), + (Long) previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_SIZE)); } return builder.build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java index 0582dd8..3d79ab4 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java @@ -29,82 +29,48 @@ import org.apache.nifi.repository.schema.SimpleRecordField; public class EventRecordFields { - public static class Names { - public static final String EVENT_IDENTIFIER = "Event ID"; - public static final String EVENT_TYPE = "Event Type"; - public static final String EVENT_TIME = "Event Time"; - public static final String FLOWFILE_ENTRY_DATE = "FlowFile Entry Date"; - public static final String EVENT_DURATION = "Event Duration"; - public static final String LINEAGE_START_DATE = "Lineage Start Date"; - public static final String COMPONENT_ID = "Component ID"; - public static final String COMPONENT_TYPE = "Component Type"; - public static final String FLOWFILE_UUID = "FlowFile UUID"; - public static final String EVENT_DETAILS = "Event Details"; - public static final String SOURCE_QUEUE_IDENTIFIER = "Source Queue Identifier"; - public static final String CONTENT_CLAIM = "Content Claim"; - public static final String PREVIOUS_CONTENT_CLAIM = "Previous Content Claim"; - public static final String PARENT_UUIDS = "Parent UUIDs"; - public static final String CHILD_UUIDS = "Child UUIDs"; - - public static final String ATTRIBUTE_NAME = "Attribute Name"; - public static final String ATTRIBUTE_VALUE = "Attribute Value"; - public static final String PREVIOUS_ATTRIBUTES = "Previous Attributes"; - public static final String UPDATED_ATTRIBUTES = "Updated Attributes"; - - public static final String CONTENT_CLAIM_CONTAINER = "Content Claim Container"; - public static final String CONTENT_CLAIM_SECTION = "Content Claim Section"; - public static final String CONTENT_CLAIM_IDENTIFIER = "Content Claim Identifier"; - public static final String CONTENT_CLAIM_OFFSET = "Content Claim Offset"; - public static final String CONTENT_CLAIM_SIZE = "Content Claim Size"; - - public static final String TRANSIT_URI = "Transit URI"; - public static final String SOURCE_SYSTEM_FLOWFILE_IDENTIFIER = "Source System FlowFile Identifier"; - public static final String ALTERNATE_IDENTIFIER = "Alternate Identifier"; - public static final String RELATIONSHIP = "Relationship"; - } - // General Event fields. - public static final RecordField RECORD_IDENTIFIER = new SimpleRecordField(Names.EVENT_IDENTIFIER, FieldType.LONG, EXACTLY_ONE); - public static final RecordField EVENT_TYPE = new SimpleRecordField(Names.EVENT_TYPE, FieldType.STRING, EXACTLY_ONE); - public static final RecordField EVENT_TIME = new SimpleRecordField(Names.EVENT_TIME, FieldType.LONG, EXACTLY_ONE); - public static final RecordField FLOWFILE_ENTRY_DATE = new SimpleRecordField(Names.FLOWFILE_ENTRY_DATE, FieldType.LONG, EXACTLY_ONE); - public static final RecordField EVENT_DURATION = new SimpleRecordField(Names.EVENT_DURATION, FieldType.LONG, EXACTLY_ONE); - public static final RecordField LINEAGE_START_DATE = new SimpleRecordField(Names.LINEAGE_START_DATE, FieldType.LONG, EXACTLY_ONE); - public static final RecordField COMPONENT_ID = new SimpleRecordField(Names.COMPONENT_ID, FieldType.STRING, ZERO_OR_ONE); - public static final RecordField COMPONENT_TYPE = new SimpleRecordField(Names.COMPONENT_TYPE, FieldType.STRING, ZERO_OR_ONE); - public static final RecordField FLOWFILE_UUID = new SimpleRecordField(Names.FLOWFILE_UUID, FieldType.STRING, EXACTLY_ONE); - public static final RecordField EVENT_DETAILS = new SimpleRecordField(Names.EVENT_DETAILS, FieldType.STRING, ZERO_OR_ONE); - public static final RecordField SOURCE_QUEUE_IDENTIFIER = new SimpleRecordField(Names.SOURCE_QUEUE_IDENTIFIER, FieldType.STRING, ZERO_OR_ONE); + public static final RecordField RECORD_IDENTIFIER = new SimpleRecordField(EventFieldNames.EVENT_IDENTIFIER, FieldType.LONG, EXACTLY_ONE); + public static final RecordField EVENT_TYPE = new SimpleRecordField(EventFieldNames.EVENT_TYPE, FieldType.STRING, EXACTLY_ONE); + public static final RecordField EVENT_TIME = new SimpleRecordField(EventFieldNames.EVENT_TIME, FieldType.LONG, EXACTLY_ONE); + public static final RecordField FLOWFILE_ENTRY_DATE = new SimpleRecordField(EventFieldNames.FLOWFILE_ENTRY_DATE, FieldType.LONG, EXACTLY_ONE); + public static final RecordField EVENT_DURATION = new SimpleRecordField(EventFieldNames.EVENT_DURATION, FieldType.LONG, EXACTLY_ONE); + public static final RecordField LINEAGE_START_DATE = new SimpleRecordField(EventFieldNames.LINEAGE_START_DATE, FieldType.LONG, EXACTLY_ONE); + public static final RecordField COMPONENT_ID = new SimpleRecordField(EventFieldNames.COMPONENT_ID, FieldType.STRING, ZERO_OR_ONE); + public static final RecordField COMPONENT_TYPE = new SimpleRecordField(EventFieldNames.COMPONENT_TYPE, FieldType.STRING, ZERO_OR_ONE); + public static final RecordField FLOWFILE_UUID = new SimpleRecordField(EventFieldNames.FLOWFILE_UUID, FieldType.STRING, EXACTLY_ONE); + public static final RecordField EVENT_DETAILS = new SimpleRecordField(EventFieldNames.EVENT_DETAILS, FieldType.STRING, ZERO_OR_ONE); + public static final RecordField SOURCE_QUEUE_IDENTIFIER = new SimpleRecordField(EventFieldNames.SOURCE_QUEUE_IDENTIFIER, FieldType.STRING, ZERO_OR_ONE); // Attributes - public static final RecordField ATTRIBUTE_NAME = new SimpleRecordField(Names.ATTRIBUTE_NAME, FieldType.LONG_STRING, EXACTLY_ONE); - public static final RecordField ATTRIBUTE_VALUE_REQUIRED = new SimpleRecordField(Names.ATTRIBUTE_VALUE, FieldType.LONG_STRING, EXACTLY_ONE); - public static final RecordField ATTRIBUTE_VALUE_OPTIONAL = new SimpleRecordField(Names.ATTRIBUTE_VALUE, FieldType.LONG_STRING, ZERO_OR_ONE); + public static final RecordField ATTRIBUTE_NAME = new SimpleRecordField(EventFieldNames.ATTRIBUTE_NAME, FieldType.LONG_STRING, EXACTLY_ONE); + public static final RecordField ATTRIBUTE_VALUE_REQUIRED = new SimpleRecordField(EventFieldNames.ATTRIBUTE_VALUE, FieldType.LONG_STRING, EXACTLY_ONE); + public static final RecordField ATTRIBUTE_VALUE_OPTIONAL = new SimpleRecordField(EventFieldNames.ATTRIBUTE_VALUE, FieldType.LONG_STRING, ZERO_OR_ONE); - public static final RecordField PREVIOUS_ATTRIBUTES = new MapRecordField(Names.PREVIOUS_ATTRIBUTES, ATTRIBUTE_NAME, ATTRIBUTE_VALUE_REQUIRED, EXACTLY_ONE); - public static final RecordField UPDATED_ATTRIBUTES = new MapRecordField(Names.UPDATED_ATTRIBUTES, ATTRIBUTE_NAME, ATTRIBUTE_VALUE_OPTIONAL, EXACTLY_ONE); + public static final RecordField PREVIOUS_ATTRIBUTES = new MapRecordField(EventFieldNames.PREVIOUS_ATTRIBUTES, ATTRIBUTE_NAME, ATTRIBUTE_VALUE_REQUIRED, EXACTLY_ONE); + public static final RecordField UPDATED_ATTRIBUTES = new MapRecordField(EventFieldNames.UPDATED_ATTRIBUTES, ATTRIBUTE_NAME, ATTRIBUTE_VALUE_OPTIONAL, EXACTLY_ONE); // Content Claims - public static final RecordField CONTENT_CLAIM_CONTAINER = new SimpleRecordField(Names.CONTENT_CLAIM_CONTAINER, FieldType.STRING, EXACTLY_ONE); - public static final RecordField CONTENT_CLAIM_SECTION = new SimpleRecordField(Names.CONTENT_CLAIM_SECTION, FieldType.STRING, EXACTLY_ONE); - public static final RecordField CONTENT_CLAIM_IDENTIFIER = new SimpleRecordField(Names.CONTENT_CLAIM_IDENTIFIER, FieldType.STRING, EXACTLY_ONE); - public static final RecordField CONTENT_CLAIM_OFFSET = new SimpleRecordField(Names.CONTENT_CLAIM_OFFSET, FieldType.LONG, EXACTLY_ONE); - public static final RecordField CONTENT_CLAIM_SIZE = new SimpleRecordField(Names.CONTENT_CLAIM_SIZE, FieldType.LONG, EXACTLY_ONE); - public static final RecordField CURRENT_CONTENT_CLAIM = new ComplexRecordField(Names.CONTENT_CLAIM, ZERO_OR_ONE, + public static final RecordField CONTENT_CLAIM_CONTAINER = new SimpleRecordField(EventFieldNames.CONTENT_CLAIM_CONTAINER, FieldType.STRING, EXACTLY_ONE); + public static final RecordField CONTENT_CLAIM_SECTION = new SimpleRecordField(EventFieldNames.CONTENT_CLAIM_SECTION, FieldType.STRING, EXACTLY_ONE); + public static final RecordField CONTENT_CLAIM_IDENTIFIER = new SimpleRecordField(EventFieldNames.CONTENT_CLAIM_IDENTIFIER, FieldType.STRING, EXACTLY_ONE); + public static final RecordField CONTENT_CLAIM_OFFSET = new SimpleRecordField(EventFieldNames.CONTENT_CLAIM_OFFSET, FieldType.LONG, EXACTLY_ONE); + public static final RecordField CONTENT_CLAIM_SIZE = new SimpleRecordField(EventFieldNames.CONTENT_CLAIM_SIZE, FieldType.LONG, EXACTLY_ONE); + public static final RecordField CURRENT_CONTENT_CLAIM = new ComplexRecordField(EventFieldNames.CONTENT_CLAIM, ZERO_OR_ONE, CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE); - public static final RecordField PREVIOUS_CONTENT_CLAIM = new ComplexRecordField(Names.PREVIOUS_CONTENT_CLAIM, ZERO_OR_ONE, + public static final RecordField PREVIOUS_CONTENT_CLAIM = new ComplexRecordField(EventFieldNames.PREVIOUS_CONTENT_CLAIM, ZERO_OR_ONE, CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE); // EventType-Specific fields // for FORK, JOIN, CLONE, REPLAY - public static final RecordField PARENT_UUIDS = new SimpleRecordField(Names.PARENT_UUIDS, FieldType.STRING, ZERO_OR_MORE); - public static final RecordField CHILD_UUIDS = new SimpleRecordField(Names.CHILD_UUIDS, FieldType.STRING, ZERO_OR_MORE); + public static final RecordField PARENT_UUIDS = new SimpleRecordField(EventFieldNames.PARENT_UUIDS, FieldType.STRING, ZERO_OR_MORE); + public static final RecordField CHILD_UUIDS = new SimpleRecordField(EventFieldNames.CHILD_UUIDS, FieldType.STRING, ZERO_OR_MORE); // for SEND/RECEIVE/FETCH - public static final RecordField TRANSIT_URI = new SimpleRecordField(Names.TRANSIT_URI, FieldType.STRING, ZERO_OR_ONE); - public static final RecordField SOURCE_SYSTEM_FLOWFILE_IDENTIFIER = new SimpleRecordField(Names.SOURCE_QUEUE_IDENTIFIER, FieldType.STRING, ZERO_OR_ONE); + public static final RecordField TRANSIT_URI = new SimpleRecordField(EventFieldNames.TRANSIT_URI, FieldType.STRING, ZERO_OR_ONE); + public static final RecordField SOURCE_SYSTEM_FLOWFILE_IDENTIFIER = new SimpleRecordField(EventFieldNames.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER, FieldType.STRING, ZERO_OR_ONE); // for ADD_INFO - public static final RecordField ALTERNATE_IDENTIFIER = new SimpleRecordField(Names.ALTERNATE_IDENTIFIER, FieldType.STRING, ZERO_OR_ONE); - public static final RecordField RELATIONSHIP = new SimpleRecordField(Names.RELATIONSHIP, FieldType.STRING, ZERO_OR_ONE); + public static final RecordField ALTERNATE_IDENTIFIER = new SimpleRecordField(EventFieldNames.ALTERNATE_IDENTIFIER, FieldType.STRING, ZERO_OR_ONE); + public static final RecordField RELATIONSHIP = new SimpleRecordField(EventFieldNames.RELATIONSHIP, FieldType.STRING, ZERO_OR_ONE); }