http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java new file mode 100644 index 0000000..2130e73 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.journaling; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.provenance.ProvenanceEventBuilder; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.SearchableFieldParser; +import org.apache.nifi.provenance.SearchableFields; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.provenance.StorageLocation; +import org.apache.nifi.provenance.StoredProvenanceEvent; +import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; +import org.apache.nifi.provenance.journaling.index.QueryUtils; +import org.apache.nifi.provenance.journaling.journals.JournalReader; +import org.apache.nifi.provenance.journaling.journals.StandardJournalReader; +import org.apache.nifi.provenance.journaling.partition.Partition; +import org.apache.nifi.provenance.journaling.partition.PartitionAction; +import org.apache.nifi.provenance.journaling.partition.PartitionManager; +import org.apache.nifi.provenance.journaling.partition.QueuingPartitionManager; +import org.apache.nifi.provenance.journaling.partition.VoidPartitionAction; +import org.apache.nifi.provenance.journaling.toc.StandardTocReader; +import 
org.apache.nifi.provenance.journaling.toc.TocReader; +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.search.QuerySubmission; +import org.apache.nifi.provenance.search.SearchableField; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JournalingProvenanceRepository implements ProvenanceEventRepository { + private static final Logger logger = LoggerFactory.getLogger(JournalingProvenanceRepository.class); + + private final JournalingRepositoryConfig config; + private final PartitionManager partitionManager; + private final AtomicLong idGenerator = new AtomicLong(0L); + + private EventReporter eventReporter; // effectively final + private final ExecutorService executor; + + + public JournalingProvenanceRepository() throws IOException { + this(createConfig()); + } + + public JournalingProvenanceRepository(final JournalingRepositoryConfig config) throws IOException { + this.config = config; + this.executor = Executors.newFixedThreadPool(config.getThreadPoolSize()); + this.partitionManager = new QueuingPartitionManager(config, executor); + } + + + private static JournalingRepositoryConfig createConfig() { + final NiFiProperties properties = NiFiProperties.getInstance(); + final Map<String, Path> storageDirectories = properties.getProvenanceRepositoryPaths(); + if (storageDirectories.isEmpty()) { + storageDirectories.put("provenance_repository", Paths.get("provenance_repository")); + } + final String storageTime = properties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_TIME, "24 hours"); + final String storageSize = properties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE, "1 GB"); + final String rolloverTime = properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_TIME, "5 mins"); + final String rolloverSize = 
properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB"); + final String shardSize = properties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB"); + final int queryThreads = properties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2); + final int journalCount = properties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16); + + final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS); + final long maxStorageBytes = DataUnit.parseDataSize(storageSize, DataUnit.B).longValue(); + final long rolloverMillis = FormatUtils.getTimeDuration(rolloverTime, TimeUnit.MILLISECONDS); + final long rolloverBytes = DataUnit.parseDataSize(rolloverSize, DataUnit.B).longValue(); + + final boolean compressOnRollover = Boolean.parseBoolean(properties.getProperty(NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER)); + final String indexedFieldString = properties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS); + final String indexedAttrString = properties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES); + + final Boolean alwaysSync = Boolean.parseBoolean(properties.getProperty("nifi.provenance.repository.always.sync", "false")); + + final List<SearchableField> searchableFields = SearchableFieldParser.extractSearchableFields(indexedFieldString, true); + final List<SearchableField> searchableAttributes = SearchableFieldParser.extractSearchableFields(indexedAttrString, false); + + // We always want to index the Event Time. 
+ if (!searchableFields.contains(SearchableFields.EventTime)) { + searchableFields.add(SearchableFields.EventTime); + } + + final JournalingRepositoryConfig config = new JournalingRepositoryConfig(); + + final Map<String, File> containers = new HashMap<>(storageDirectories.size()); + for ( final Map.Entry<String, Path> entry : storageDirectories.entrySet() ) { + containers.put(entry.getKey(), entry.getValue().toFile()); + } + config.setContainers(containers); + config.setCompressOnRollover(compressOnRollover); + config.setSearchableFields(searchableFields); + config.setSearchableAttributes(searchableAttributes); + config.setJournalCapacity(rolloverBytes); + config.setJournalRolloverPeriod(rolloverMillis, TimeUnit.MILLISECONDS); + config.setEventExpiration(storageMillis, TimeUnit.MILLISECONDS); + config.setMaxStorageCapacity(maxStorageBytes); + config.setThreadPoolSize(queryThreads); + config.setPartitionCount(journalCount); + + if (shardSize != null) { + config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue()); + } + + config.setAlwaysSync(alwaysSync); + + return config; + } + + @Override + public synchronized void initialize(final EventReporter eventReporter) throws IOException { + this.eventReporter = eventReporter; + } + + @Override + public ProvenanceEventBuilder eventBuilder() { + return new StandardProvenanceEventRecord.Builder(); + } + + @Override + public void registerEvent(final ProvenanceEventRecord event) throws IOException { + registerEvents(Collections.singleton(event)); + } + + @Override + public void registerEvents(final Collection<ProvenanceEventRecord> events) throws IOException { + partitionManager.withPartition(new VoidPartitionAction() { + @Override + public void perform(final Partition partition) throws IOException { + partition.registerEvents(events, idGenerator.getAndAdd(events.size())); + } + }, true); + } + + @Override + public StoredProvenanceEvent getEvent(final long id) throws IOException { + final 
List<StoredProvenanceEvent> events = getEvents(id, 1); + if ( events.isEmpty() ) { + return null; + } + + // We have to check the id of the event returned, because we are requesting up to 1 record + // starting with the given id. However, if that ID doesn't exist, we could get a record + // with a larger id. + final StoredProvenanceEvent event = events.get(0); + if ( event.getEventId() == id ) { + return event; + } + + return null; + } + + @Override + public List<StoredProvenanceEvent> getEvents(final long firstRecordId, final int maxRecords) throws IOException { + // Must generate query to determine the appropriate StorageLocation objects and then call + // getEvent(List<StorageLocation>) + final Set<List<JournaledStorageLocation>> resultSet = partitionManager.withEachPartition( + new PartitionAction<List<JournaledStorageLocation>>() { + @Override + public List<JournaledStorageLocation> perform(final Partition partition) throws IOException { + return partition.getEvents(firstRecordId, maxRecords); + } + }); + + final ArrayList<JournaledStorageLocation> locations = new ArrayList<>(maxRecords); + for ( final List<JournaledStorageLocation> list : resultSet ) { + for ( final JournaledStorageLocation location : list ) { + locations.add(location); + } + } + + Collections.sort(locations, new Comparator<JournaledStorageLocation>() { + @Override + public int compare(final JournaledStorageLocation o1, final JournaledStorageLocation o2) { + return Long.compare(o1.getEventId(), o2.getEventId()); + } + }); + + locations.trimToSize(); + + @SuppressWarnings({ "rawtypes", "unchecked" }) + final List<StorageLocation> storageLocations = (List<StorageLocation>) ((List) locations); + return getEvents(storageLocations); + } + + @Override + public StoredProvenanceEvent getEvent(final StorageLocation location) throws IOException { + final List<StoredProvenanceEvent> storedEvents = getEvents(Collections.singletonList(location)); + return (storedEvents == null || storedEvents.isEmpty()) ? 
null : storedEvents.get(0); + } + + + + @Override + public List<StoredProvenanceEvent> getEvents(final List<StorageLocation> locations) throws IOException { + // Group the locations by journal files because we want a single thread, at most, per journal file. + final Map<File, List<JournaledStorageLocation>> orderedLocations = QueryUtils.orderLocations(locations, config); + + // Go through each journal file and create a callable that can lookup the records for that journal file. + final List<Future<List<StoredProvenanceEvent>>> futures = new ArrayList<>(); + for ( final Map.Entry<File, List<JournaledStorageLocation>> entry : orderedLocations.entrySet() ) { + final File journalFile = entry.getKey(); + final List<JournaledStorageLocation> locationsForFile = entry.getValue(); + + final Callable<List<StoredProvenanceEvent>> callable = new Callable<List<StoredProvenanceEvent>>() { + @Override + public List<StoredProvenanceEvent> call() throws Exception { + try(final TocReader tocReader = new StandardTocReader(new File(journalFile.getParentFile(), journalFile.getName() + ".toc")); + final JournalReader reader = new StandardJournalReader(journalFile)) + { + final List<StoredProvenanceEvent> storedEvents = new ArrayList<>(locationsForFile.size()); + + for ( final JournaledStorageLocation location : locationsForFile ) { + final long blockOffset = tocReader.getBlockOffset(location.getBlockIndex()); + final ProvenanceEventRecord event = reader.getEvent(blockOffset, location.getEventId()); + + storedEvents.add(new JournaledProvenanceEvent(event, location)); + } + + return storedEvents; + } + } + }; + + final Future<List<StoredProvenanceEvent>> future = executor.submit(callable); + futures.add(future); + } + + // Get all of the events from the futures, waiting for them to finish. 
+ final Map<StorageLocation, StoredProvenanceEvent> locationToEventMap = new HashMap<>(locations.size()); + for ( final Future<List<StoredProvenanceEvent>> future : futures ) { + try { + final List<StoredProvenanceEvent> events = future.get(); + + // Map the location to the event, so that we can then re-order the events in the same order + // that the locations were passed to us. + for ( final StoredProvenanceEvent event : events ) { + locationToEventMap.put(event.getStorageLocation(), event); + } + } catch (final ExecutionException ee) { + final Throwable cause = ee.getCause(); + if ( cause instanceof IOException ) { + throw (IOException) cause; + } else { + throw new RuntimeException(cause); + } + } catch (final InterruptedException ie) { + throw new RuntimeException(ie); + } + } + + // Sort Events by the order of the provided locations. + final List<StoredProvenanceEvent> sortedEvents = new ArrayList<>(locations.size()); + for ( final StorageLocation location : locations ) { + final StoredProvenanceEvent event = locationToEventMap.get(location); + if ( event != null ) { + sortedEvents.add(event); + } + } + + return sortedEvents; + } + + + @Override + public Long getMaxEventId() throws IOException { + final Set<Long> maxIds = partitionManager.withEachPartition(new PartitionAction<Long>() { + @Override + public Long perform(final Partition partition) throws IOException { + return partition.getMaxEventId(); + } + }); + + Long maxId = null; + for ( final Long id : maxIds ) { + if ( id == null ) { + continue; + } + + if ( maxId == null || id > maxId ) { + maxId = id; + } + } + + return maxId; + } + + + @Override + public QuerySubmission submitQuery(final Query query) { + // TODO Auto-generated method stub + return null; + } + + @Override + public QuerySubmission retrieveQuerySubmission(final String queryIdentifier) { + // TODO Auto-generated method stub + return null; + } + + @Override + public ComputeLineageSubmission submitLineageComputation(final String 
flowFileUuid) { + // TODO Auto-generated method stub + return null; + } + + @Override + public ComputeLineageSubmission retrieveLineageSubmission(final String lineageIdentifier) { + // TODO Auto-generated method stub + return null; + } + + @Override + public ComputeLineageSubmission submitExpandParents(final long eventId) { + // TODO Auto-generated method stub + return null; + } + + @Override + public ComputeLineageSubmission submitExpandChildren(final long eventId) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void close() throws IOException { + partitionManager.shutdown(); + executor.shutdown(); + } + + @Override + public List<SearchableField> getSearchableFields() { + return config.getSearchableFields(); + } + + @Override + public List<SearchableField> getSearchableAttributes() { + return config.getSearchableAttributes(); + } + + @Override + public Long getEarliestEventTime() throws IOException { + // Get the earliest event timestamp for each partition + final Set<Long> earliestTimes = partitionManager.withEachPartition(new PartitionAction<Long>() { + @Override + public Long perform(final Partition partition) throws IOException { + return partition.getEarliestEventTime(); + } + }); + + // Find the latest timestamp for each of the "earliest" timestamps. + // This is a bit odd, but we're doing it for a good reason: + // The UI is going to show the earliest time available. Because we have a partitioned write-ahead + // log, if we just return the timestamp of the earliest event available, we could end up returning + // a time for an event that exists but the next event in its lineage does not exist because it was + // already aged off of a different journal. To avoid this, we return the "latest of the earliest" + // timestamps. This way, we know that no event with a larger ID has been aged off from any of the + // partitions. 
+ Long latest = null; + for ( final Long earliestTime : earliestTimes ) { + if ( earliestTime == null ) { + continue; + } + + if ( latest == null || earliestTime > latest ) { + latest = earliestTime; + } + } + + return latest; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java new file mode 100644 index 0000000..6dd7be9 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.journaling.config; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.provenance.search.SearchableField; + +public class JournalingRepositoryConfig { + private Map<String, File> containers = new HashMap<>(); + private long expirationMillis = TimeUnit.MILLISECONDS.convert(24, TimeUnit.HOURS); + private long storageCapacity = 1024L * 1024L * 1024L; // 1 GB + private long rolloverMillis = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES); + private long journalCapacity = 1024L * 1024L * 5L; // 5 MB + private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB + private int partitionCount = 16; + private int blockSize = 100; + + private List<SearchableField> searchableFields = new ArrayList<>(); + private List<SearchableField> searchableAttributes = new ArrayList<>(); + private boolean compress = true; + private boolean alwaysSync = false; + private int threadPoolSize = 4; + private boolean readOnly = false; + + public void setReadOnly(final boolean readOnly) { + this.readOnly = readOnly; + } + + public boolean isReadOnly() { + return readOnly; + } + + /** + * Specifies where the repository will store data + * + * @return + */ + public Map<String, File> getContainers() { + return Collections.unmodifiableMap(containers); + } + + /** + * Specifies where the repository should store data + * + * @param storageDirectory + */ + public void setContainers(final Map<String, File> containers) { + this.containers = new HashMap<>(containers); + } + + /** + * Returns the maximum amount of time that a given record will stay in the + * repository + * + * @param timeUnit + * @return + */ + public long getEventExpiration(final TimeUnit timeUnit) { + return timeUnit.convert(expirationMillis, TimeUnit.MILLISECONDS); + } + + /** + * Specifies how long a record should stay 
in the repository + * + * @param expiration + * @param timeUnit + */ + public void setEventExpiration(final long expiration, final TimeUnit timeUnit) { + this.expirationMillis = TimeUnit.MILLISECONDS.convert(expiration, timeUnit); + } + + /** + * Returns the maximum amount of data to store in the repository (in bytes) + * + * @return + */ + public long getMaxStorageCapacity() { + return storageCapacity; + } + + /** + * Sets the maximum amount of data to store in the repository (in bytes) + * @param maxStorageCapacity + */ + public void setMaxStorageCapacity(final long maxStorageCapacity) { + this.storageCapacity = maxStorageCapacity; + } + + /** + * Returns the maximum amount of time to write to a single event file + * + * @param timeUnit + * @return + */ + public long getJournalRolloverPeriod(final TimeUnit timeUnit) { + return timeUnit.convert(rolloverMillis, TimeUnit.MILLISECONDS); + } + + /** + * Sets the maximum amount of time to write to a single event file + * + * @param rolloverPeriod + * @param timeUnit + */ + public void setJournalRolloverPeriod(final long rolloverPeriod, final TimeUnit timeUnit) { + this.rolloverMillis = TimeUnit.MILLISECONDS.convert(rolloverPeriod, timeUnit); + } + + /** + * Returns the number of bytes (pre-compression) that will be + * written to a single journal file before the file is rolled over + * + * @return + */ + public long getJournalCapacity() { + return journalCapacity; + } + + /** + * Sets the number of bytes (pre-compression) that will be written + * to a single journal file before the file is rolled over + * + * @param journalCapacity + */ + public void setJournalCapacity(final long journalCapacity) { + this.journalCapacity = journalCapacity; + } + + /** + * Returns the fields that can be indexed + * + * @return + */ + public List<SearchableField> getSearchableFields() { + return Collections.unmodifiableList(searchableFields); + } + + /** + * Sets the fields to index + * + * @param searchableFields + */ + public void 
setSearchableFields(final List<SearchableField> searchableFields) { + this.searchableFields = new ArrayList<>(searchableFields); + } + + /** + * Returns the FlowFile attributes that can be indexed + * + * @return + */ + public List<SearchableField> getSearchableAttributes() { + return Collections.unmodifiableList(searchableAttributes); + } + + /** + * Sets the FlowFile attributes to index + * + * @param searchableAttributes + */ + public void setSearchableAttributes(final List<SearchableField> searchableAttributes) { + this.searchableAttributes = new ArrayList<>(searchableAttributes); + } + + /** + * Indicates whether or not event files will be compressed when they are + * rolled over + * + * @return + */ + public boolean isCompressOnRollover() { + return compress; + } + + /** + * Specifies whether or not to compress event files on rollover + * + * @param compress + */ + public void setCompressOnRollover(final boolean compress) { + this.compress = compress; + } + + public int getThreadPoolSize() { + return threadPoolSize; + } + + public void setThreadPoolSize(final int queryThreadPoolSize) { + if (queryThreadPoolSize < 1) { + throw new IllegalArgumentException(); + } + this.threadPoolSize = queryThreadPoolSize; + } + + /** + * <p> + * Specifies the desired size of each Provenance Event index shard, in + * bytes. We shard the index for a few reasons: + * </p> + * + * <ol> + * <li> + * A very large index requires a significant amount of Java heap space to + * search. As the size of the shard increases, the required Java heap space + * also increases. + * </li> + * <li> + * By having multiple shards, we have the ability to use multiple concurrent + * threads to search the individual shards, resulting in far less latency + * when performing a search across millions or billions of records. + * </li> + * <li> + * We keep track of which time ranges each index shard spans. 
As a result, + * we are able to determine which shards need to be searched if a search + * provides a date range. This can greatly increase the speed of a search + * and reduce resource utilization. + * </li> + * </ol> + * + * @param bytes + */ + public void setDesiredIndexSize(final long bytes) { + this.desiredIndexBytes = bytes; + } + + /** + * Returns the desired size of each index shard. See the + * {@Link #setDesiredIndexSize} method for an explanation of why we choose + * to shard the index. + * + * @return + */ + public long getDesiredIndexSize() { + return desiredIndexBytes; + } + + /** + * Sets the number of Journal files to use when persisting records. + * + * @param numJournals + */ + public void setPartitionCount(final int numJournals) { + if (numJournals < 1) { + throw new IllegalArgumentException(); + } + + this.partitionCount = numJournals; + } + + /** + * Returns the number of Journal files that will be used when persisting + * records. + * + * @return + */ + public int getPartitionCount() { + return partitionCount; + } + + /** + * Specifies whether or not the Repository should sync all updates to disk. + * + * @return + */ + public boolean isAlwaysSync() { + return alwaysSync; + } + + /** + * Configures whether or not the Repository should sync all updates to disk. + * Setting this value to true means that updates are guaranteed to be + * persisted across restarted, even if there is a power failure or a sudden + * Operating System crash, but it can be very expensive. + * + * @param alwaysSync + */ + public void setAlwaysSync(boolean alwaysSync) { + this.alwaysSync = alwaysSync; + } + + /** + * Returns the minimum number of Provenance Events that should be written to a single Block. + * Events are written out in blocks, which are later optionally compressed. A larger block size + * will potentially result in better compression. However, a smaller block size will result + * in better performance when reading the data. 
The default value is 100 events per block. + * + * @return + */ + public int getBlockSize() { + return blockSize; + } + + /** + * Sets the minimum number of Provenance Events that should be written to a single Block. + * Events are written out in blocks, which are later optionally compressed. A larger block size + * will potentially result in better compression. However, a smaller block size will result + * in better performance when reading the data. The default value is 100 events per block. + * + * @return + */ + public void setBlockSize(final int blockSize) { + if ( blockSize < 1 ) { + throw new IllegalArgumentException("Cannot set block size to " + blockSize + "; must be a positive integer"); + } + this.blockSize = blockSize; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java new file mode 100644 index 0000000..b669c53 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.index; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +import org.apache.nifi.provenance.journaling.JournaledStorageLocation; +import org.apache.nifi.provenance.search.Query; + +public interface EventIndexSearcher extends Closeable { + /** + * Searches the repository for any events that match the provided query and returns the locations + * where those events are stored + * @param query + * @return + */ + SearchResult search(Query query) throws IOException; + + List<JournaledStorageLocation> getEvents(long minEventId, int maxResults) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java new file mode 100644 index 0000000..1f231e9 --- /dev/null +++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.index; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; + +import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent; + +public interface EventIndexWriter extends Closeable { + + /** + * Adds all of the events to the index. + * @param events + * @throws IOException + */ + void index(final Collection<JournaledProvenanceEvent> events) throws IOException; + + /** + * Forces all updates to the index to be pushed to disk. 
+ * @throws IOException + */ + void sync() throws IOException; + + /** + * Deletes any records that belong to the given container/section/journal + * @param containerName + * @param section + * @param journalId + * @throws IOException + */ + void delete(String containerName, String section, String journalId) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexedFieldNames.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexedFieldNames.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexedFieldNames.java new file mode 100644 index 0000000..977df9f --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexedFieldNames.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Names of the index document fields that describe where an event is stored
 * (container, section, journal, block) plus the event's id.
 */
public class IndexedFieldNames {

    /** Field holding the name of the container in which the event's journal lives. */
    public static final String CONTAINER_NAME = "containerName";

    /** Field holding the name of the section within the container. */
    public static final String SECTION_NAME = "sectionName";

    /** Field holding the identifier of the journal that contains the event. */
    public static final String JOURNAL_ID = "journalId";

    /** Field holding the index of the block within the journal. */
    public static final String BLOCK_INDEX = "blockIndex";

    /** Field holding the event's id. */
    public static final String EVENT_ID = "eventId";

    private IndexedFieldNames() {
        // Constants holder only; instances would serve no purpose.
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.index; + +import org.apache.nifi.provenance.NamedSearchableField; +import org.apache.nifi.provenance.search.SearchableField; + +public class JournalingSearchableFields { + public static SearchableField CONTAINER_NAME = new NamedSearchableField(IndexedFieldNames.CONTAINER_NAME, IndexedFieldNames.CONTAINER_NAME, "Container Name", false); + public static SearchableField SECTION_NAME = new NamedSearchableField(IndexedFieldNames.SECTION_NAME, IndexedFieldNames.SECTION_NAME, "Section Name", false); + public static SearchableField JOURNAL_ID = new NamedSearchableField(IndexedFieldNames.JOURNAL_ID, IndexedFieldNames.JOURNAL_ID, "Journal ID", false); + public static SearchableField BLOCK_INDEX = new NamedSearchableField(IndexedFieldNames.BLOCK_INDEX, IndexedFieldNames.BLOCK_INDEX, "Block Index", false); + public static SearchableField EVENT_ID = new NamedSearchableField(IndexedFieldNames.EVENT_ID, IndexedFieldNames.EVENT_ID, "Event ID", false); + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java new file mode 100644 index 0000000..32dc7c3 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.journaling.index; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.search.BooleanClause.Occur; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.NumericRangeQuery; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortField.Type; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.FSDirectory; +import org.apache.nifi.provenance.journaling.JournaledStorageLocation; +import org.apache.nifi.provenance.search.Query; + +public class LuceneIndexSearcher implements EventIndexSearcher { + private final DirectoryReader reader; + private final IndexSearcher searcher; + private final FSDirectory fsDirectory; + + public LuceneIndexSearcher(final File indexDirectory) throws IOException { + this.fsDirectory = FSDirectory.open(indexDirectory); + this.reader = DirectoryReader.open(fsDirectory); + this.searcher = new IndexSearcher(reader); + } + + public LuceneIndexSearcher(final DirectoryReader reader) { + this.reader = reader; + this.searcher = new IndexSearcher(reader); + this.fsDirectory = null; + } + + @Override + public void close() throws IOException { + IOException suppressed = null; + try { + reader.close(); + } catch (final IOException ioe) { + suppressed = ioe; + } + + if ( fsDirectory != null ) { + fsDirectory.close(); + } + + if ( suppressed != null ) { + throw suppressed; + } + } + + private JournaledStorageLocation createLocation(final Document document) { + final String containerName = document.get(IndexedFieldNames.CONTAINER_NAME); + final String sectionName = document.get(IndexedFieldNames.SECTION_NAME); + final String journalId = 
document.get(IndexedFieldNames.JOURNAL_ID); + final int blockIndex = document.getField(IndexedFieldNames.BLOCK_INDEX).numericValue().intValue(); + final long eventId = document.getField(IndexedFieldNames.EVENT_ID).numericValue().longValue(); + + return new JournaledStorageLocation(containerName, sectionName, journalId, blockIndex, eventId); + } + + private List<JournaledStorageLocation> getLocations(final TopDocs topDocs) throws IOException { + final ScoreDoc[] scoreDocs = topDocs.scoreDocs; + final List<JournaledStorageLocation> locations = new ArrayList<>(scoreDocs.length); + for ( final ScoreDoc scoreDoc : scoreDocs ) { + final Document document = reader.document(scoreDoc.doc); + locations.add(createLocation(document)); + } + + return locations; + } + + @Override + public SearchResult search(final Query provenanceQuery) throws IOException { + final org.apache.lucene.search.Query luceneQuery = QueryUtils.convertQueryToLucene(provenanceQuery); + final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults()); + final List<JournaledStorageLocation> locations = getLocations(topDocs); + + return new SearchResult(locations, topDocs.totalHits); + } + + @Override + public List<JournaledStorageLocation> getEvents(final long minEventId, final int maxResults) throws IOException { + final BooleanQuery query = new BooleanQuery(); + query.add(NumericRangeQuery.newLongRange(IndexedFieldNames.EVENT_ID, minEventId, null, true, true), Occur.MUST); + + final TopDocs topDocs = searcher.search(query, maxResults, new Sort(new SortField(IndexedFieldNames.EVENT_ID, Type.LONG))); + return getLocations(topDocs); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java ---------------------------------------------------------------------- diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java new file mode 100644 index 0000000..e955ae5 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.journaling.index; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.LongField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanClause.Occur; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.util.Version; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.SearchableFields; +import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent; +import org.apache.nifi.provenance.journaling.JournaledStorageLocation; +import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; +import org.apache.nifi.provenance.search.SearchableField; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LuceneIndexWriter implements EventIndexWriter { + private static final Logger logger = LoggerFactory.getLogger(LuceneIndexWriter.class); + + @SuppressWarnings("unused") + private final JournalingRepositoryConfig config; + private final Set<SearchableField> nonAttributeSearchableFields; + private final Set<SearchableField> attributeSearchableFields; + + private final 
Directory directory; + private final Analyzer analyzer; + private final IndexWriter indexWriter; + private final AtomicLong indexMaxId = new AtomicLong(-1L); + + public LuceneIndexWriter(final File indexDir, final JournalingRepositoryConfig config) throws IOException { + this.config = config; + + attributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(config.getSearchableAttributes())); + nonAttributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(config.getSearchableFields())); + + directory = FSDirectory.open(indexDir); + analyzer = new StandardAnalyzer(); + final IndexWriterConfig writerConfig = new IndexWriterConfig(Version.LATEST, analyzer); + indexWriter = new IndexWriter(directory, writerConfig); + } + + public EventIndexSearcher newIndexSearcher() throws IOException { + final DirectoryReader reader = DirectoryReader.open(indexWriter, false); + return new LuceneIndexSearcher(reader); + } + + @Override + public void close() throws IOException { + IOException suppressed = null; + try { + indexWriter.close(); + } catch (final IOException ioe) { + suppressed = ioe; + } + + analyzer.close(); + + try { + directory.close(); + } catch (final IOException ioe) { + if ( suppressed != null ) { + ioe.addSuppressed(suppressed); + } + + throw ioe; + } + } + + + private void addField(final Document doc, final SearchableField field, final String value, final Store store) { + if (value == null || (!nonAttributeSearchableFields.contains(field) && !field.isAttribute())) { + return; + } + + doc.add(new StringField(field.getSearchableFieldName(), value.toLowerCase(), store)); + } + + @Override + public void index(final Collection<JournaledProvenanceEvent> events) throws IOException { + long maxId = this.indexMaxId.get(); + + for ( final JournaledProvenanceEvent event : events ) { + maxId = event.getEventId(); + + final Map<String, String> attributes = event.getAttributes(); + + final Document doc = new Document(); + addField(doc, 
SearchableFields.FlowFileUUID, event.getFlowFileUuid(), Store.NO); + addField(doc, SearchableFields.Filename, attributes.get(CoreAttributes.FILENAME.key()), Store.NO); + addField(doc, SearchableFields.ComponentID, event.getComponentId(), Store.NO); + addField(doc, SearchableFields.AlternateIdentifierURI, event.getAlternateIdentifierUri(), Store.NO); + addField(doc, SearchableFields.EventType, event.getEventType().name(), Store.NO); + addField(doc, SearchableFields.Relationship, event.getRelationship(), Store.NO); + addField(doc, SearchableFields.Details, event.getDetails(), Store.NO); + addField(doc, SearchableFields.ContentClaimSection, event.getContentClaimSection(), Store.NO); + addField(doc, SearchableFields.ContentClaimContainer, event.getContentClaimContainer(), Store.NO); + addField(doc, SearchableFields.ContentClaimIdentifier, event.getContentClaimIdentifier(), Store.NO); + addField(doc, SearchableFields.SourceQueueIdentifier, event.getSourceQueueIdentifier(), Store.NO); + + if (nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) { + addField(doc, SearchableFields.TransitURI, event.getTransitUri(), Store.NO); + } + + for (final SearchableField searchableField : attributeSearchableFields) { + addField(doc, searchableField, attributes.get(searchableField.getSearchableFieldName()), Store.NO); + } + + // Index the fields that we always index (unless there's nothing else to index at all) + doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), event.getLineageStartDate(), Store.NO)); + doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), event.getEventTime(), Store.NO)); + doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), event.getFileSize(), Store.NO)); + + final JournaledStorageLocation location = event.getStorageLocation(); + doc.add(new StringField(IndexedFieldNames.CONTAINER_NAME, location.getContainerName(), Store.YES)); + doc.add(new 
StringField(IndexedFieldNames.SECTION_NAME, location.getSectionName(), Store.YES)); + doc.add(new StringField(IndexedFieldNames.JOURNAL_ID, location.getJournalId(), Store.YES)); + doc.add(new LongField(IndexedFieldNames.BLOCK_INDEX, location.getBlockIndex(), Store.YES)); + doc.add(new LongField(IndexedFieldNames.EVENT_ID, location.getEventId(), Store.YES)); + + for (final String lineageIdentifier : event.getLineageIdentifiers()) { + addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO); + } + + // If it's event is a FORK, or JOIN, add the FlowFileUUID for all child/parent UUIDs. + if (event.getEventType() == ProvenanceEventType.FORK || event.getEventType() == ProvenanceEventType.CLONE || event.getEventType() == ProvenanceEventType.REPLAY) { + for (final String uuid : event.getChildUuids()) { + if (!uuid.equals(event.getFlowFileUuid())) { + addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO); + } + } + } else if (event.getEventType() == ProvenanceEventType.JOIN) { + for (final String uuid : event.getParentUuids()) { + if (!uuid.equals(event.getFlowFileUuid())) { + addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO); + } + } + } else if (event.getEventType() == ProvenanceEventType.RECEIVE && event.getSourceSystemFlowFileIdentifier() != null) { + // If we get a receive with a Source System FlowFile Identifier, we add another Document that shows the UUID + // that the Source System uses to refer to the data. 
+ final String sourceIdentifier = event.getSourceSystemFlowFileIdentifier(); + final String sourceFlowFileUUID; + final int lastColon = sourceIdentifier.lastIndexOf(":"); + if (lastColon > -1 && lastColon < sourceIdentifier.length() - 2) { + sourceFlowFileUUID = sourceIdentifier.substring(lastColon + 1); + } else { + sourceFlowFileUUID = null; + } + + if (sourceFlowFileUUID != null) { + addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID, Store.NO); + } + } + + indexWriter.addDocument(doc); + } + + // Update the index's max id + boolean updated = false; + do { + long curMax = indexMaxId.get(); + if ( maxId > curMax ) { + updated = indexMaxId.compareAndSet(curMax, maxId); + } else { + updated = true; + } + } while (!updated); + } + + + @Override + public void delete(final String containerName, final String section, final String journalId) throws IOException { + final BooleanQuery query = new BooleanQuery(); + query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.CONTAINER_NAME, containerName)), Occur.MUST)); + query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.SECTION_NAME, section)), Occur.MUST)); + query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.JOURNAL_ID, journalId)), Occur.MUST)); + + indexWriter.deleteDocuments(query); + } + + + @Override + public void sync() throws IOException { + indexWriter.commit(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java new file mode 100644 index 0000000..4ae4b16 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.journaling.index; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.lucene.index.Term; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.NumericRangeQuery; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.WildcardQuery; +import org.apache.lucene.search.BooleanClause.Occur; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.provenance.SearchableFields; +import org.apache.nifi.provenance.StorageLocation; +import org.apache.nifi.provenance.journaling.JournaledStorageLocation; +import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; +import org.apache.nifi.provenance.search.SearchTerm; + +public class QueryUtils { + public static org.apache.lucene.search.Query convertQueryToLucene(final org.apache.nifi.provenance.search.Query query) { + if (query.getStartDate() == null && query.getEndDate() == null && query.getSearchTerms().isEmpty()) { + return new MatchAllDocsQuery(); + } + + final BooleanQuery luceneQuery = new BooleanQuery(); + for (final SearchTerm searchTerm : query.getSearchTerms()) { + final String searchValue = searchTerm.getValue(); + if (searchValue == null) { + throw new IllegalArgumentException("Empty search value not allowed (for term '" + searchTerm.getSearchableField().getFriendlyName() + "')"); + } + + if (searchValue.contains("*") || searchValue.contains("?")) { + luceneQuery.add(new BooleanClause(new WildcardQuery(new Term(searchTerm.getSearchableField().getSearchableFieldName(), searchTerm.getValue().toLowerCase())), Occur.MUST)); + } else { + luceneQuery.add(new BooleanClause(new TermQuery(new 
Term(searchTerm.getSearchableField().getSearchableFieldName(), searchTerm.getValue().toLowerCase())), Occur.MUST)); + } + } + + final Long minBytes = query.getMinFileSize() == null ? null : DataUnit.parseDataSize(query.getMinFileSize(), DataUnit.B).longValue(); + final Long maxBytes = query.getMaxFileSize() == null ? null : DataUnit.parseDataSize(query.getMaxFileSize(), DataUnit.B).longValue(); + if (minBytes != null || maxBytes != null) { + luceneQuery.add(NumericRangeQuery.newLongRange(SearchableFields.FileSize.getSearchableFieldName(), minBytes, maxBytes, true, true), Occur.MUST); + } + + final Long minDateTime = query.getStartDate() == null ? null : query.getStartDate().getTime(); + final Long maxDateTime = query.getEndDate() == null ? null : query.getEndDate().getTime(); + if (maxDateTime != null || minDateTime != null) { + luceneQuery.add(NumericRangeQuery.newLongRange(SearchableFields.EventTime.getSearchableFieldName(), minDateTime, maxDateTime, true, true), Occur.MUST); + } + + return luceneQuery; + } + + + private static File getJournalFile(final JournaledStorageLocation location, final JournalingRepositoryConfig config) throws FileNotFoundException { + final File containerDir = config.getContainers().get(location.getContainerName()); + if ( containerDir == null ) { + throw new FileNotFoundException("Could not find Container with name " + location.getContainerName()); + } + + final String sectionName = location.getSectionName(); + final File sectionFile = new File(containerDir, sectionName); + final File journalDir = new File(sectionFile, "journals"); + final File journalFile = new File(journalDir, location.getJournalId() + ".journal"); + + return journalFile; + } + + + public static Map<File, List<JournaledStorageLocation>> orderLocations(final List<StorageLocation> locations, final JournalingRepositoryConfig config) throws FileNotFoundException, IOException { + final Map<File, List<JournaledStorageLocation>> map = new HashMap<>(); + + for ( final 
StorageLocation location : locations ) { + if ( !(location instanceof JournaledStorageLocation) ) { + throw new IllegalArgumentException(location + " is not a valid StorageLocation for this repository"); + } + + final JournaledStorageLocation journaledLocation = (JournaledStorageLocation) location; + final File journalFile = getJournalFile(journaledLocation, config); + List<JournaledStorageLocation> list = map.get(journalFile); + if ( list == null ) { + list = new ArrayList<>(); + map.put(journalFile, list); + } + + list.add(journaledLocation); + } + + for ( final List<JournaledStorageLocation> list : map.values() ) { + Collections.sort(list); + } + + return map; + } + + public static File getTocFile(final File journalFile) { + return new File(journalFile.getParentFile(), journalFile.getName() + ".toc"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/SearchResult.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/SearchResult.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/SearchResult.java new file mode 100644 index 0000000..ac82438 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/SearchResult.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.index; + +import java.util.List; + +import org.apache.nifi.provenance.journaling.JournaledStorageLocation; + +public class SearchResult { + private final int totalCount; + private final List<JournaledStorageLocation> locations; + + public SearchResult(final List<JournaledStorageLocation> locations, final int totalCount) { + this.totalCount = totalCount; + this.locations = locations; + } + + public int getTotalCount() { + return totalCount; + } + + public List<JournaledStorageLocation> getLocations() { + return locations; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Deserializer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Deserializer.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Deserializer.java new file mode 100644 index 0000000..67b1cb6 --- /dev/null +++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Deserializer.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.io; + +import java.io.DataInputStream; +import java.io.IOException; + +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public interface Deserializer { + + String getCodecName(); + + ProvenanceEventRecord deserialize(DataInputStream in, int serializationVersion) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Deserializers.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Deserializers.java 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Deserializers.java new file mode 100644 index 0000000..4be87e3 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Deserializers.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.journaling.io; + +public class Deserializers { + + public static Deserializer getDeserializer(final String codecName) { + switch (codecName) { + case StandardEventDeserializer.CODEC_NAME: + return new StandardEventDeserializer(); + default: + throw new IllegalArgumentException("Unknown Provenance Serialization Codec: " + codecName); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Serializer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Serializer.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Serializer.java new file mode 100644 index 0000000..8219b4c --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Serializer.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.io; + +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public interface Serializer { + + /** + * Returns the serialization version that is used to serialize records + * @return + */ + int getVersion(); + + /** + * Returns the name of the codec used to serialize the records + * @return + */ + String getCodecName(); + + /** + * Serializes the given even to the given DataOutputStream. + * This method should NOT serialize the ID, as the ID is not yet known. The ID will instead by + * serialized to the stream appropriately by the JournalWriter. + * + * @param event + * @param out + * @throws IOException + */ + void serialize(ProvenanceEventRecord event, DataOutputStream out) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/StandardEventDeserializer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/StandardEventDeserializer.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/StandardEventDeserializer.java new file mode 100644 index 0000000..fb537ee --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/StandardEventDeserializer.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.io; + +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.provenance.journaling.IdEnrichedProvenanceEvent; +import org.apache.nifi.stream.io.StreamUtils; + +public class StandardEventDeserializer implements Deserializer { + public static final String CODEC_NAME = StandardEventSerializer.CODEC_NAME; + + @Override + public String getCodecName() { + return CODEC_NAME; + } + + @Override + public ProvenanceEventRecord deserialize(final DataInputStream in, final int serializationVersion) throws IOException { + final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder(); + + final long eventId = in.readLong(); + final String eventTypeName = in.readUTF(); + final ProvenanceEventType eventType = ProvenanceEventType.valueOf(eventTypeName); + builder.setEventType(eventType); + builder.setEventTime(in.readLong()); + + 
final Long flowFileEntryDate = in.readLong(); + builder.setEventDuration(in.readLong()); + + final Set<String> lineageIdentifiers = new HashSet<>(); + final int numLineageIdentifiers = in.readInt(); + for (int i = 0; i < numLineageIdentifiers; i++) { + lineageIdentifiers.add(readUUID(in)); + } + + final long lineageStartDate = in.readLong(); + + builder.setComponentId(readNullableString(in)); + builder.setComponentType(readNullableString(in)); + + final String uuid = readUUID(in); + builder.setFlowFileUUID(uuid); + builder.setDetails(readNullableString(in)); + + // Read in the FlowFile Attributes + final Map<String, String> previousAttrs = readAttributes(in, false); + final Map<String, String> attrUpdates = readAttributes(in, true); + builder.setAttributes(previousAttrs, attrUpdates); + + final boolean hasContentClaim = in.readBoolean(); + if (hasContentClaim) { + builder.setCurrentContentClaim(in.readUTF(), in.readUTF(), in.readUTF(), in.readLong(), in.readLong()); + } else { + builder.setCurrentContentClaim(null, null, null, null, 0L); + } + + final boolean hasPreviousClaim = in.readBoolean(); + if (hasPreviousClaim) { + builder.setPreviousContentClaim(in.readUTF(), in.readUTF(), in.readUTF(), in.readLong(), in.readLong()); + } + + builder.setSourceQueueIdentifier(readNullableString(in)); + + // Read Event-Type specific fields. 
+ if (eventType == ProvenanceEventType.FORK || eventType == ProvenanceEventType.JOIN || eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.REPLAY) { + final int numParents = in.readInt(); + for (int i = 0; i < numParents; i++) { + builder.addParentUuid(readUUID(in)); + } + + final int numChildren = in.readInt(); + for (int i = 0; i < numChildren; i++) { + builder.addChildUuid(readUUID(in)); + } + } else if (eventType == ProvenanceEventType.RECEIVE) { + builder.setTransitUri(readNullableString(in)); + builder.setSourceSystemFlowFileIdentifier(readNullableString(in)); + } else if (eventType == ProvenanceEventType.SEND) { + builder.setTransitUri(readNullableString(in)); + } else if (eventType == ProvenanceEventType.ADDINFO) { + builder.setAlternateIdentifierUri(readNullableString(in)); + } else if (eventType == ProvenanceEventType.ROUTE) { + builder.setRelationship(readNullableString(in)); + } + + builder.setFlowFileEntryDate(flowFileEntryDate); + builder.setLineageIdentifiers(lineageIdentifiers); + builder.setLineageStartDate(lineageStartDate); + final ProvenanceEventRecord event = builder.build(); + + return new IdEnrichedProvenanceEvent(event, eventId); + } + + + private static Map<String, String> readAttributes(final DataInputStream dis, final boolean valueNullable) throws IOException { + final int numAttributes = dis.readInt(); + final Map<String, String> attrs = new HashMap<>(); + for (int i = 0; i < numAttributes; i++) { + final String key = readLongString(dis); + final String value = valueNullable ? 
readLongNullableString(dis) : readLongString(dis); + attrs.put(key, value); + } + + return attrs; + } + + private static String readUUID(final DataInputStream in) throws IOException { + final long msb = in.readLong(); + final long lsb = in.readLong(); + return new UUID(msb, lsb).toString(); + } + + private static String readNullableString(final DataInputStream in) throws IOException { + final boolean valueExists = in.readBoolean(); + if (valueExists) { + return in.readUTF(); + } else { + return null; + } + } + + private static String readLongNullableString(final DataInputStream in) throws IOException { + final boolean valueExists = in.readBoolean(); + if (valueExists) { + return readLongString(in); + } else { + return null; + } + } + + private static String readLongString(final DataInputStream in) throws IOException { + final int length = in.readInt(); + final byte[] strBytes = new byte[length]; + StreamUtils.fillBuffer(in, strBytes); + return new String(strBytes, StandardCharsets.UTF_8); + } +}