http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventStore.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventStore.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventStore.java new file mode 100644 index 0000000..ba4acea --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventStore.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.nifi.provenance.store;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;

import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
import org.apache.nifi.provenance.authorization.EventTransformer;
import org.apache.nifi.provenance.index.EventIndex;
import org.apache.nifi.provenance.serialization.StorageSummary;

/**
 * <p>
 * An Event Store is responsible for storing Provenance Events and retrieving them at a later time.
 * </p>
 */
public interface EventStore extends Closeable {

    /**
     * Performs any initialization routines that need to happen before the store is used.
     *
     * @throws IOException if unable to perform initialization
     */
    void initialize() throws IOException;

    /**
     * Adds the given events to the store. All events will be written to the same Storage Location.
     * I.e., all of the {@link StorageSummary} objects that are provided when calling the {@link StorageResult#getStorageLocations()}
     * method will return the same value for the {@link StorageSummary#getStorageLocation()}. Each one, however, will
     * have a different Event ID and potentially a different Block Index.
     *
     * @param events the events to add
     * @return a StorageResult describing the location where each event was stored
     * @throws IOException if unable to add the events
     */
    StorageResult addEvents(Iterable<ProvenanceEventRecord> events) throws IOException;

    /**
     * @return the number of bytes occupied by the events in the store
     * @throws IOException if unable to determine the size of the store
     */
    long getSize() throws IOException;

    /**
     * @return the largest Event ID that has been written to this store, or -1 if no events have yet been stored
     */
    long getMaxEventId();

    /**
     * Retrieves the event with the given ID.
     *
     * @param id the ID of the event to retrieve
     * @return an Optional containing the Event with the given ID, or an empty Optional if the event cannot be found
     * @throws IOException if unable to read the event from storage
     */
    Optional<ProvenanceEventRecord> getEvent(long id) throws IOException;

    /**
     * Retrieves up to maxRecords events from the store, starting with the event whose ID is equal to firstRecordId. If that
     * event cannot be found, then the first event will be the oldest event in the store whose ID is greater than firstRecordId.
     * All events will be returned in the order that they were written to the store, i.e., all events will have monotonically
     * increasing Event IDs. No events will be filtered out, since no EventAuthorizer is provided.
     *
     * @param firstRecordId the ID of the first event to retrieve
     * @param maxRecords the maximum number of records to retrieve. The actual number of results returned may be less than this.
     * @return a List of ProvenanceEventRecords
     * @throws IOException if unable to retrieve records from the store
     */
    List<ProvenanceEventRecord> getEvents(long firstRecordId, int maxRecords) throws IOException;

    /**
     * Retrieves up to maxRecords events from the store, starting with the event whose ID is equal to firstRecordId. If that
     * event cannot be found, then the first event will be the oldest event in the store whose ID is greater than firstRecordId.
     * All events will be returned in the order that they were written to the store, i.e., all events will have monotonically
     * increasing Event IDs.
     *
     * @param firstRecordId the ID of the first event to retrieve
     * @param maxRecords the maximum number of records to retrieve. The actual number of results returned may be less than this.
     * @param authorizer the authorizer that should be used to filter out any events that the user doesn't have access to
     * @param unauthorizedTransformer the transformer to apply to unauthorized events
     * @return a List of ProvenanceEventRecords
     * @throws IOException if unable to retrieve records from the store
     */
    List<ProvenanceEventRecord> getEvents(long firstRecordId, int maxRecords, EventAuthorizer authorizer, EventTransformer unauthorizedTransformer) throws IOException;

    /**
     * Given a List of Event IDs, returns a List of the Provenance Events that have those corresponding Event IDs.
     * If any events cannot be found, a warning will be logged but no Exception will be thrown.
     *
     * @param eventIds the IDs of the events to retrieve
     * @param authorizer the authorizer that should be used to filter out any events that the user doesn't have access to
     * @param unauthorizedTransformer the transformer to apply to unauthorized events
     * @return a List of events that correspond to the given IDs
     * @throws IOException if unable to retrieve records from the store
     */
    List<ProvenanceEventRecord> getEvents(List<Long> eventIds, EventAuthorizer authorizer, EventTransformer unauthorizedTransformer) throws IOException;

    /**
     * Causes the latest events in this store to be re-indexed by the given Event Index.
     *
     * @param eventIndex the EventIndex to use for indexing events
     */
    void reindexLatestEvents(EventIndex eventIndex);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventStorePartition.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventStorePartition.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventStorePartition.java new file mode 100644 index 0000000..ccb94f8 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventStorePartition.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.nifi.provenance.store;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
import org.apache.nifi.provenance.store.iterator.EventIterator;

/**
 * A single partition of an event store, such as one of the partitions managed by {@link PartitionedEventStore}.
 * A partition stores a subset of the Provenance Events and can purge its own events when they expire or when
 * storage space must be reclaimed.
 */
public interface EventStorePartition extends Closeable {
    /**
     * Performs any initialization routines that need to happen before the partition is used.
     *
     * @throws IOException if unable to perform initialization
     */
    void initialize() throws IOException;

    /**
     * Adds the given events to the partition.
     *
     * @param events the events to add
     * @return a StorageResult describing the location where each event was stored
     * @throws IOException if unable to add the events
     */
    StorageResult addEvents(Iterable<ProvenanceEventRecord> events) throws IOException;

    /**
     * @return the number of bytes occupied by the events in the partition
     */
    long getSize();

    /**
     * @return the largest Event ID that has been written to this partition, or -1 if no events have yet been stored
     */
    long getMaxEventId();

    /**
     * Retrieves the event with the given ID.
     *
     * @param id the ID of the event to retrieve
     * @return an Optional containing the Event with the given ID, or an empty Optional if the event cannot be found
     * @throws IOException if unable to read the event from storage
     */
    Optional<ProvenanceEventRecord> getEvent(long id) throws IOException;

    /**
     * Retrieves up to maxEvents events from the partition, starting with the event whose ID is equal to firstRecordId. If that
     * event cannot be found, then the first event will be the oldest event in the partition whose ID is greater than firstRecordId.
     * All events will be returned in the order that they were written to the partition, i.e., all events will have monotonically
     * increasing Event IDs.
     *
     * @param firstRecordId the ID of the first event to retrieve
     * @param maxEvents the maximum number of events to retrieve. The actual number of results returned may be less than this.
     * @param authorizer the authorizer that should be used to filter out any events that the user doesn't have access to
     * @return a List of ProvenanceEventRecords
     * @throws IOException if unable to retrieve records from the partition
     */
    List<ProvenanceEventRecord> getEvents(long firstRecordId, int maxEvents, EventAuthorizer authorizer) throws IOException;

    /**
     * Returns an {@link EventIterator} that iterates over the events in the partition, beginning with the given
     * record ID. The events are provided in the order in which they were stored in the partition, so all events
     * retrieved from this EventIterator will have monotonically increasing Event IDs.
     *
     * @param minimumEventId the minimum value of any Event ID that should be returned
     * @return an EventIterator over the events in the partition
     */
    EventIterator createEventIterator(long minimumEventId);

    /**
     * Returns an {@link EventIterator} that iterates over the given Event IDs and returns one ProvenanceEventRecord
     * for each given ID that can be found. IDs that cannot be found are skipped and no error is reported.
     *
     * @param eventIds the IDs of the events to retrieve
     * @return an EventIterator over the events with the given IDs
     */
    EventIterator createEventIterator(List<Long> eventIds);

    /**
     * Purges any events from the partition that are older than the given amount of time.
     *
     * @param olderThan any event older than this amount of time should be removed
     * @param timeUnit the unit of time that applies to {@code olderThan}
     */
    void purgeOldEvents(long olderThan, TimeUnit timeUnit);

    /**
     * Purges some number of events from the partition. The oldest events will be purged.
     *
     * @return the number of bytes purged from the partition
     */
    long purgeOldestEvents();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java new file mode 100644 index 0000000..5f922dd --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/

package org.apache.nifi.provenance.store;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import org.apache.lucene.util.NamedThreadFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
import org.apache.nifi.provenance.authorization.EventTransformer;
import org.apache.nifi.provenance.store.iterator.AuthorizingEventIterator;
import org.apache.nifi.provenance.store.iterator.EventIterator;
import org.apache.nifi.provenance.util.DirectoryUtils;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link EventStore} that spreads events across the {@link EventStorePartition}s supplied by a subclass
 * through {@link #getPartitions()}. Batches of events are written to the partitions in round-robin order, and
 * reads merge the per-partition results in ascending Event ID order. A single background thread periodically
 * expires events older than the configured maximum record life and purges the oldest events while the
 * repository exceeds its configured maximum storage capacity.
 */
public abstract class PartitionedEventStore implements EventStore {
    private static final Logger logger = LoggerFactory.getLogger(PartitionedEventStore.class);
    private static final String EVENT_CATEGORY = "Provenance Repository";

    private final AtomicLong partitionIndex = new AtomicLong(0L);
    private final RepositoryConfiguration repoConfig;
    private final EventReporter eventReporter;
    private ScheduledExecutorService maintenanceExecutor;

    public PartitionedEventStore(final RepositoryConfiguration config, final EventReporter eventReporter) {
        this.repoConfig = config;
        this.eventReporter = eventReporter;
    }

    /**
     * Initializes every partition and then starts the periodic maintenance task, which first runs one minute
     * later and then repeatedly with a one-minute delay between runs.
     *
     * <p>Partitions are initialized before the maintenance thread is created so that a failed partition
     * initialization does not leave a running maintenance thread behind, and so that maintenance never runs
     * against a partition that is still initializing.</p>
     *
     * @throws IOException if any partition fails to initialize
     */
    @Override
    public void initialize() throws IOException {
        for (final EventStorePartition partition : getPartitions()) {
            partition.initialize();
        }

        maintenanceExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Provenance Repository Maintenance"));
        maintenanceExecutor.scheduleWithFixedDelay(this::performMaintenance, 1, 1, TimeUnit.MINUTES);
    }

    /**
     * Stops the maintenance task and closes every partition. All partitions are closed even if some fail; the
     * first IOException is rethrown with any later ones attached as suppressed exceptions.
     *
     * @throws IOException if any partition fails to close
     */
    @Override
    public void close() throws IOException {
        if (maintenanceExecutor != null) {
            maintenanceExecutor.shutdownNow();
        }

        IOException thrown = null;

        for (final EventStorePartition partition : getPartitions()) {
            try {
                partition.close();
            } catch (final IOException ioe) {
                if (thrown == null) {
                    thrown = ioe;
                } else {
                    thrown.addSuppressed(ioe);
                }
            }
        }

        if (thrown != null) {
            throw thrown;
        }
    }

    /**
     * Writes the whole batch to a single partition. Partitions are chosen in round-robin order.
     */
    @Override
    public StorageResult addEvents(final Iterable<ProvenanceEventRecord> events) throws IOException {
        final List<? extends EventStorePartition> partitions = getPartitions();
        final int index = (int) (partitionIndex.getAndIncrement() % partitions.size());
        final EventStorePartition partition = partitions.get(index);
        return partition.addEvents(events);
    }

    /**
     * @return the sum of the sizes reported by all partitions
     */
    @Override
    public long getSize() {
        long size = 0;
        for (final EventStorePartition partition : getPartitions()) {
            size += partition.getSize();
        }

        return size;
    }

    /**
     * @return the on-disk size of all configured storage directories, as computed by {@link DirectoryUtils#getSize(File)}
     */
    private long getRepoSize() {
        long total = 0L;

        for (final File storageDir : repoConfig.getStorageDirectories().values()) {
            total += DirectoryUtils.getSize(storageDir);
        }

        return total;
    }

    /**
     * @return the largest Event ID across all partitions, or -1 if no partition has stored an event
     */
    @Override
    public long getMaxEventId() {
        return getPartitions().stream()
            .mapToLong(EventStorePartition::getMaxEventId)
            .max()
            .orElse(-1L);
    }

    /**
     * Asks each partition in turn for the event and returns the first match.
     */
    @Override
    public Optional<ProvenanceEventRecord> getEvent(final long id) throws IOException {
        for (final EventStorePartition partition : getPartitions()) {
            final Optional<ProvenanceEventRecord> option = partition.getEvent(id);
            if (option.isPresent()) {
                return option;
            }
        }

        return Optional.empty();
    }

    @Override
    public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords) throws IOException {
        return getEvents(firstRecordId, maxRecords, EventAuthorizer.GRANT_ALL, EventTransformer.EMPTY_TRANSFORMER);
    }

    @Override
    public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords, final EventAuthorizer authorizer,
        final EventTransformer transformer) throws IOException {
        // Nothing can match if the requested range ends before ID 0, if no records are requested,
        // or if the range starts after the newest event.
        if (firstRecordId + maxRecords < 1 || maxRecords < 1 || firstRecordId > getMaxEventId()) {
            return Collections.emptyList();
        }

        return getEvents(maxRecords, authorizer, part -> part.createEventIterator(firstRecordId), transformer);
    }

    @Override
    public List<ProvenanceEventRecord> getEvents(final List<Long> eventIds, final EventAuthorizer authorizer, final EventTransformer transformer) throws IOException {
        if (eventIds == null || eventIds.isEmpty()) {
            return Collections.emptyList();
        }

        return getEvents(eventIds.size(), authorizer, part -> part.createEventIterator(eventIds), transformer);
    }

    /**
     * Merges the events produced by one iterator per partition into a single list ordered by ascending Event ID.
     *
     * @param maxRecords the maximum number of events to return
     * @param authorizer the authorizer used to filter events; {@code null} is treated as {@link EventAuthorizer#GRANT_ALL}
     * @param eventIteratorFactory creates the EventIterator to read from for a given partition
     * @param transformer the transformer applied to unauthorized events
     * @return up to maxRecords events in ascending Event ID order
     * @throws IOException if unable to read events from a partition
     */
    private List<ProvenanceEventRecord> getEvents(final int maxRecords, final EventAuthorizer authorizer,
        final Function<EventStorePartition, EventIterator> eventIteratorFactory, final EventTransformer transformer) throws IOException {

        if (maxRecords < 1) {
            return Collections.emptyList();
        }

        final EventAuthorizer nonNullAuthorizer = authorizer == null ? EventAuthorizer.GRANT_ALL : authorizer;
        final List<ProvenanceEventRecord> selectedEvents = new ArrayList<>();

        // Every iterator that is opened is recorded here so that the finally block closes all of them. That includes
        // iterators that yielded no events and iterators that ran out of events, neither of which stays in
        // recordToIteratorMap below.
        final List<EventIterator> openedIterators = new ArrayList<>();

        // Each key is the next record available from a partition, and its value is the EventIterator that produced it.
        // Because the map is sorted by Event ID, its first key is always the lowest next record ID among all partitions.
        final SortedMap<ProvenanceEventRecord, EventIterator> recordToIteratorMap = new TreeMap<>(
            (o1, o2) -> Long.compare(o1.getEventId(), o2.getEventId()));

        try {
            // Seed the map with the first event from each partition.
            for (final EventStorePartition partition : getPartitions()) {
                final EventIterator partitionIterator = eventIteratorFactory.apply(partition);
                final EventIterator iterator = new AuthorizingEventIterator(partitionIterator, nonNullAuthorizer, transformer);
                openedIterators.add(iterator);

                final Optional<ProvenanceEventRecord> option = iterator.nextEvent();
                if (option.isPresent()) {
                    recordToIteratorMap.put(option.get(), iterator);
                }
            }

            // Repeatedly take the event with the lowest ID, then refill the map from the iterator it came from.
            // Each EventIterator yields monotonically increasing Event IDs, so the map's first key stays the global minimum.
            while (!recordToIteratorMap.isEmpty() && selectedEvents.size() < maxRecords) {
                final ProvenanceEventRecord nextEvent = recordToIteratorMap.firstKey();
                selectedEvents.add(nextEvent);

                final EventIterator iterator = recordToIteratorMap.remove(nextEvent);
                final Optional<ProvenanceEventRecord> nextRecordFromIterator = iterator.nextEvent();
                if (nextRecordFromIterator.isPresent()) {
                    recordToIteratorMap.put(nextRecordFromIterator.get(), iterator);
                }
            }

            return selectedEvents;
        } finally {
            // Close every record reader that was created, whether or not it still has events.
            for (final EventIterator iterator : openedIterators) {
                try {
                    iterator.close();
                } catch (final Exception e) {
                    if (logger.isDebugEnabled()) {
                        logger.warn("Failed to close Record Reader {}", iterator, e);
                    } else {
                        logger.warn("Failed to close Record Reader {}", iterator);
                    }
                }
            }
        }
    }

    /**
     * Periodic maintenance task.
     * <ol>
     * <li>Purges events older than the configured maximum record life from every partition.</li>
     * <li>While the on-disk size of the storage directories exceeds the configured maximum storage capacity,
     * purges the oldest events from every partition.</li>
     * </ol>
     * Failures are logged and reported through the EventReporter rather than thrown. If a full pass over the
     * partitions frees no space, capacity-based purging stops until the next run instead of looping forever.
     */
    void performMaintenance() {
        try {
            final long maxFileLife = repoConfig.getMaxRecordLife(TimeUnit.MILLISECONDS);
            for (final EventStorePartition partition : getPartitions()) {
                try {
                    partition.purgeOldEvents(maxFileLife, TimeUnit.MILLISECONDS);
                } catch (final Exception e) {
                    logger.error("Failed to purge expired events from " + partition, e);
                    eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY,
                        "Failed to purge expired events from Provenance Repository. See logs for more information.");
                }
            }

            final long maxStorageCapacity = repoConfig.getMaxStorageCapacity();
            long currentSize;
            try {
                currentSize = getRepoSize();
            } catch (final Exception e) {
                logger.error("Could not determine size of Provenance Repository. Will not expire any data due to storage limits", e);
                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to determine size of Provenance Repository. "
                    + "No data will be expired due to storage limits at this time. See logs for more information.");
                return;
            }

            while (currentSize > maxStorageCapacity) {
                long bytesPurgedThisPass = 0L;
                for (final EventStorePartition partition : getPartitions()) {
                    try {
                        final long removed = partition.purgeOldestEvents();
                        currentSize -= removed;
                        bytesPurgedThisPass += removed;
                    } catch (final Exception e) {
                        logger.error("Failed to purge oldest events from " + partition, e);
                        eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY,
                            "Failed to purge oldest events from Provenance Repository. See logs for more information.");
                    }
                }

                // If no partition freed any space (e.g. every partition is already empty, or every attempt failed),
                // another pass would do exactly the same thing. Stop instead of spinning forever on the maintenance thread.
                if (bytesPurgedThisPass <= 0) {
                    logger.warn("Provenance Repository is estimated to be {} bytes, which exceeds the configured maximum storage capacity of {} bytes, "
                        + "but no events could be purged. Will try again during the next maintenance cycle.", currentSize, maxStorageCapacity);
                    break;
                }
            }
        } catch (final Exception e) {
            logger.error("Failed to perform periodic maintenance", e);
            eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY,
                "Failed to perform periodic maintenance for Provenance Repository. See logs for more information.");
        }
    }

    /**
     * @return the partitions that make up this store
     */
    protected abstract List<? extends EventStorePartition> getPartitions();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedWriteAheadEventStore.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedWriteAheadEventStore.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedWriteAheadEventStore.java new file mode 100644 index 0000000..14de80e --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedWriteAheadEventStore.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
 */

package org.apache.nifi.provenance.store;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.lucene.util.NamedThreadFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.index.EventIndex;
import org.apache.nifi.provenance.serialization.EventFileCompressor;

/**
 * {@link PartitionedEventStore} that creates one {@link WriteAheadStorePartition} per configured storage directory.
 * All partitions share a single Event ID generator. When compress-on-rollover is enabled, a pool of
 * {@link EventFileCompressor}s is started. The compressors share a bounded queue (capacity 100) of files with
 * the partitions.
 */
public class PartitionedWriteAheadEventStore extends PartitionedEventStore {
    private final BlockingQueue<File> filesToCompress;
    private final List<WriteAheadStorePartition> partitions;
    private final RepositoryConfiguration repoConfig;

    private final ExecutorService compressionExecutor;
    private final List<EventFileCompressor> fileCompressors = Collections.synchronizedList(new ArrayList<>());
    private final EventReporter eventReporter;
    private final EventFileManager fileManager;

    public PartitionedWriteAheadEventStore(final RepositoryConfiguration repoConfig, final RecordWriterFactory recordWriterFactory,
        final RecordReaderFactory recordReaderFactory, final EventReporter eventReporter, final EventFileManager fileManager) {
        super(repoConfig, eventReporter);
        this.repoConfig = repoConfig;
        this.eventReporter = eventReporter;
        this.filesToCompress = new LinkedBlockingQueue<>(100);
        final AtomicLong idGenerator = new AtomicLong(0L);
        this.partitions = createPartitions(repoConfig, recordWriterFactory, recordReaderFactory, idGenerator);
        this.fileManager = fileManager;

        // Creates tasks to compress data on rollover
        if (repoConfig.isCompressOnRollover()) {
            compressionExecutor = Executors.newFixedThreadPool(repoConfig.getIndexThreadPoolSize(), new NamedThreadFactory("Compress Provenance Logs"));
        } else {
            compressionExecutor = null;
        }
    }

    /**
     * Creates one partition per configured storage directory. All partitions share the given ID generator.
     */
    private List<WriteAheadStorePartition> createPartitions(final RepositoryConfiguration repoConfig, final RecordWriterFactory recordWriterFactory,
        final RecordReaderFactory recordReaderFactory, final AtomicLong idGenerator) {
        final Map<String, File> storageDirectories = repoConfig.getStorageDirectories();
        final List<WriteAheadStorePartition> partitions = new ArrayList<>(storageDirectories.size());

        for (final Map.Entry<String, File> entry : storageDirectories.entrySet()) {
            // Need to ensure that the same partition directory always gets the same partition index.
            // If we don't, then we will end up re-indexing the events from 1 index into another index, and
            // this will result in a lot of duplicates (up to a million per index per restart). This is the reason
            // that we use a partition name here based on the properties file.
            final String partitionName = entry.getKey();
            final File storageDirectory = entry.getValue();
            partitions.add(new WriteAheadStorePartition(storageDirectory, partitionName, repoConfig,
                recordWriterFactory, recordReaderFactory, filesToCompress, idGenerator, eventReporter));
        }

        return partitions;
    }

    /**
     * Starts the file compressors (when compress-on-rollover is enabled), then initializes the partitions
     * through the parent class.
     */
    @Override
    public void initialize() throws IOException {
        if (repoConfig.isCompressOnRollover()) {
            for (int i = 0; i < repoConfig.getIndexThreadPoolSize(); i++) {
                final EventFileCompressor compressor = new EventFileCompressor(filesToCompress, fileManager);
                compressionExecutor.submit(compressor);
                fileCompressors.add(compressor);
            }
        }

        super.initialize();
    }

    /**
     * Closes the partitions, then stops the file compressors. The compressors are stopped even if closing the
     * partitions fails, so that their threads are not leaked.
     */
    @Override
    public void close() throws IOException {
        try {
            super.close();
        } finally {
            // Iteration over a Collections.synchronizedList must be manually synchronized on the list.
            synchronized (fileCompressors) {
                for (final EventFileCompressor compressor : fileCompressors) {
                    compressor.shutdown();
                }
            }

            if (compressionExecutor != null) {
                compressionExecutor.shutdown();
            }
        }
    }

    /**
     * Re-indexes the latest events of every partition in parallel, using one thread per partition, and waits
     * for all partitions to finish.
     *
     * @param eventIndex the EventIndex to use for indexing events
     * @throws RuntimeException if any partition fails to re-index, or if the calling thread is interrupted while waiting
     */
    @Override
    public void reindexLatestEvents(final EventIndex eventIndex) {
        final List<WriteAheadStorePartition> partitions = getPartitions();
        final int numPartitions = partitions.size();
        if (numPartitions == 0) {
            // Nothing to re-index; Executors.newFixedThreadPool would also reject a pool size of 0.
            return;
        }

        final List<Future<?>> futures = new ArrayList<>(numPartitions);
        final ExecutorService executor = Executors.newFixedThreadPool(numPartitions, new NamedThreadFactory("Re-Index Provenance Events"));

        try {
            for (final WriteAheadStorePartition partition : partitions) {
                futures.add(executor.submit(() -> partition.reindexLatestEvents(eventIndex)));
            }
        } finally {
            // Already-submitted tasks keep running; the pool's threads exit once they complete.
            executor.shutdown();
        }

        for (final Future<?> future : futures) {
            try {
                future.get();
            } catch (final InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
                throw new RuntimeException("Failed to re-index events because Thread was interrupted", e);
            } catch (final ExecutionException e) {
                throw new RuntimeException("Failed to re-index events", e);
            }
        }
    }

    @Override
    protected List<WriteAheadStorePartition> getPartitions() {
        return partitions;
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordReaderFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordReaderFactory.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordReaderFactory.java new file mode 100644 index 0000000..ddb8165 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordReaderFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.nifi.provenance.store;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collection;
+
+import org.apache.nifi.provenance.serialization.RecordReader;
+
+/**
+ * Creates {@link RecordReader}s used to read Provenance Events back out of an event file.
+ */
+public interface RecordReaderFactory {
+    /**
+     * Creates a reader for the given event file.
+     *
+     * @param file the event file to read
+     * @param provenanceLogFiles additional provenance log files (NOTE(review): purpose is not evident from this
+     *            interface -- confirm; WriteAheadStorePartition always passes an empty list)
+     * @param maxAttributeChars presumably the maximum number of characters kept per attribute value; callers pass
+     *            {@code Integer.MAX_VALUE} when no limit is wanted -- TODO confirm
+     * @return a reader for {@code file}
+     * @throws IOException if the reader cannot be created
+     */
+    RecordReader newRecordReader(File file, Collection<Path> provenanceLogFiles, int maxAttributeChars) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterFactory.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterFactory.java
new file mode 100644
index 0000000..89da603
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.provenance.serialization.RecordWriter;
+
+/**
+ * Creates {@link RecordWriter}s used to persist Provenance Events to an event file.
+ */
+public interface RecordWriterFactory {
+    /**
+     * Creates a writer for the given event file.
+     *
+     * @param file the event file to write to
+     * @param idGenerator the shared Event ID generator; presumably used to assign IDs to written events -- TODO confirm
+     * @param compressed whether the writer should compress its output (NOTE(review): format not visible here)
+     * @param createToc presumably whether a Table-of-Contents file is created alongside {@code file} -- TODO confirm
+     * @return the new writer
+     * @throws IOException if the writer cannot be created
+     */
+    RecordWriter createWriter(final File file, final AtomicLong idGenerator, final boolean compressed, final boolean createToc) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterLease.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterLease.java
new file mode 100644
index 0000000..8543d2b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterLease.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.store; + +import org.apache.nifi.provenance.serialization.RecordWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RecordWriterLease { + private final Logger logger = LoggerFactory.getLogger(RecordWriterLease.class); + + private final RecordWriter writer; + private final long maxBytes; + private final int maxEvents; + private long usageCounter; + private boolean markedRollable = false; + private boolean closed = false; + + public RecordWriterLease(final RecordWriter writer, final long maxBytes) { + this(writer, maxBytes, Integer.MAX_VALUE); + } + + public RecordWriterLease(final RecordWriter writer, final long maxBytes, final int maxEvents) { + this.writer = writer; + this.maxBytes = maxBytes; + this.maxEvents = maxEvents; + } + + public RecordWriter getWriter() { + return writer; + } + + public synchronized boolean tryClaim() { + if (markedRollable || writer.isClosed() || writer.isDirty() || writer.getBytesWritten() >= maxBytes || writer.getRecordsWritten() >= maxEvents) { + return false; + } + + usageCounter++; + return true; + } + + public synchronized void relinquishClaim() { + usageCounter--; + + if (closed && usageCounter < 1) { + try { + writer.close(); + } catch (final Exception e) { + logger.warn("Failed to close " + writer, e); + } + } + } + + public synchronized boolean shouldRoll() { + if (markedRollable) { + return true; + } + + if (usageCounter < 1 && (writer.isClosed() || writer.isDirty() || writer.getBytesWritten() >= maxBytes || writer.getRecordsWritten() >= 
maxEvents)) { + markedRollable = true; + return true; + } + + return false; + } + + public synchronized void close() { + closed = true; + + if (usageCounter < 1) { + try { + writer.close(); + } catch (final Exception e) { + logger.warn("Failed to close " + writer, e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/StorageResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/StorageResult.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/StorageResult.java new file mode 100644 index 0000000..94b1ece --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/StorageResult.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.nifi.provenance.store;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+
+/**
+ * The outcome of adding a batch of Provenance Events to an event store: where each event was stored, and
+ * whether storing them caused the store to roll over to a new storage location.
+ */
+public interface StorageResult {
+    /**
+     * @return a map of each Provenance Event Record to the location where it was stored
+     */
+    Map<ProvenanceEventRecord, StorageSummary> getStorageLocations();
+
+    /**
+     * Indicates whether or not the storage of events triggered the store to roll over
+     * the storage location that it is storing data to
+     *
+     * @return <code>true</code> if the store rolled over to a new storage location, <code>false</code> otherwise
+     */
+    boolean triggeredRollover();
+
+    /**
+     * @return the number of events that were stored in the storage location that was rolled over, or
+     *         <code>null</code> if no storage locations were rolled over.
+     */
+    Integer getEventsRolledOver();
+
+    /**
+     * A result describing no stored events: an empty location map, no rollover, and a <code>null</code>
+     * rolled-over count.
+     */
+    public static StorageResult EMPTY = new StorageResult() {
+        @Override
+        public Map<ProvenanceEventRecord, StorageSummary> getStorageLocations() {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public boolean triggeredRollover() {
+            return false;
+        }
+
+        @Override
+        public Integer getEventsRolledOver() {
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return "StorageResult.EMPTY";
+        }
+    };
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
new file mode 100644 index 0000000..a25043a --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java @@ -0,0 +1,637 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.nifi.provenance.store;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.RepositoryConfiguration;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.authorization.EventAuthorizer;
+import org.apache.nifi.provenance.index.EventIndex;
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+import org.apache.nifi.provenance.store.iterator.EventIterator;
+import org.apache.nifi.provenance.store.iterator.SelectiveRecordReaderEventIterator;
+import org.apache.nifi.provenance.store.iterator.SequentialRecordReaderEventIterator;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.util.DirectoryUtils;
+import org.apache.nifi.provenance.util.NamedThreadFactory;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link EventStorePartition} that appends Provenance Events to event files named {@code <first event id>.prov}
+ * in a single partition directory. It rolls over to a new file whenever the current {@link RecordWriterLease}
+ * reports that it should be rolled.
+ */
+public class WriteAheadStorePartition implements EventStorePartition {
+    private static final Logger logger = LoggerFactory.getLogger(WriteAheadStorePartition.class);
+
+
+    private final RepositoryConfiguration config;
+    private final File partitionDirectory;
+    private final String partitionName;
+    private final RecordWriterFactory recordWriterFactory;
+    private final RecordReaderFactory recordReaderFactory;
+    private final BlockingQueue<File> filesToCompress;
+    private final AtomicLong idGenerator;
+    private final AtomicLong maxEventId = new AtomicLong(-1L);
+    private volatile boolean closed = false;
+
+    // Lease on the writer for the event file currently being appended to; only replaced by tryRollover().
+    private final AtomicReference<RecordWriterLease> eventWriterLeaseRef = new AtomicReference<>();
+
+    private final SortedMap<Long, File> minEventIdToPathMap = new TreeMap<>(); // guarded by synchronizing on object
+
+    /**
+     * @param storageDirectory the directory holding this partition's event files; created by {@link #initialize()} if missing
+     * @param partitionName the partition name recorded in every {@link StorageSummary} produced by this partition
+     * @param repoConfig repository configuration (event file size/count limits, compression, sync and attribute settings)
+     * @param recordWriterFactory creates writers for new event files
+     * @param recordReaderFactory creates readers for existing event files
+     * @param filesToCompress queue to which rolled-over event files are offered when compress-on-rollover is enabled
+     * @param idGenerator the Event ID generator; advanced past this partition's largest Event ID by {@link #initialize()}
+     * @param eventReporter currently not used by this class
+     */
+    public WriteAheadStorePartition(final File storageDirectory, final String partitionName, final RepositoryConfiguration repoConfig, final RecordWriterFactory recordWriterFactory,
+        final RecordReaderFactory recordReaderFactory, final BlockingQueue<File> filesToCompress, final AtomicLong idGenerator, final EventReporter eventReporter) {
+
+        this.partitionName = partitionName;
+        this.config = repoConfig;
+        this.idGenerator = idGenerator;
+        this.partitionDirectory = storageDirectory;
+        this.recordWriterFactory = recordWriterFactory;
+        this.recordReaderFactory = recordReaderFactory;
+        this.filesToCompress = filesToCompress;
+    }
+
+    /**
+     * Marks this partition closed, so that subsequent {@link #addEvents(Iterable)} calls fail, and closes the
+     * current writer lease, if any.
+     */
+    @Override
+    public void close() throws IOException {
+        closed = true;
+
+        final RecordWriterLease lease = eventWriterLeaseRef.get();
+        if (lease != null) {
+            lease.close();
+        }
+    }
+
+    /**
+     * Recovers this partition's state from disk. It creates the directory if needed, determines the largest Event ID
+     * already stored, and registers every event file by its minimum Event ID. It also deletes partially-written
+     * {@code .gz} files when compress-on-rollover is enabled, and advances the ID generator past the largest stored ID.
+     *
+     * @throws IOException if the directory cannot be created or listed
+     */
+    @Override
+    public synchronized void initialize() throws IOException {
+        if (!partitionDirectory.exists()) {
+            Files.createDirectories(partitionDirectory.toPath());
+        }
+
+        final File[] files = partitionDirectory.listFiles(DirectoryUtils.EVENT_FILE_FILTER);
+        if (files == null) {
+            throw new IOException("Could not access files in the " + partitionDirectory + " directory");
+        }
+
+        // We need to determine what the largest Event ID is in this partition. To do this, we
+        // iterate over all files starting with the file that has the greatest ID, and try to find
+        // the largest Event ID in that file. Once we successfully determine the greatest Event ID
+        // in any one of the files, we are done, since we are iterating over the files in order of
+        // the Largest Event ID to the smallest.
+        long maxEventId = -1L;
+        final List<File> fileList = Arrays.asList(files);
+        Collections.sort(fileList, DirectoryUtils.LARGEST_ID_FIRST);
+        for (final File file : fileList) {
+            // try-with-resources: the reader is only needed to look up the max Event ID and must not leak its file handle.
+            try (final RecordReader reader = recordReaderFactory.newRecordReader(file, Collections.emptyList(), Integer.MAX_VALUE)) {
+                final long eventId = reader.getMaxEventId();
+                if (eventId > maxEventId) {
+                    maxEventId = eventId;
+                    break;
+                }
+            } catch (final IOException ioe) {
+                logger.warn("Could not read file {}; if this file contains Provenance Events, new events may be created with the same event identifiers", file, ioe);
+            }
+        }
+
+        synchronized (minEventIdToPathMap) {
+            for (final File file : fileList) {
+                final long minEventId = DirectoryUtils.getMinId(file);
+                minEventIdToPathMap.put(minEventId, file);
+            }
+        }
+
+        this.maxEventId.set(maxEventId);
+
+        // If configured to compress, clean up after any compression that was interrupted by a restart.
+        // NOTE(review): uncompressed files found here are not re-queued for compression.
+        if (config.isCompressOnRollover()) {
+            final File[] uncompressedFiles = partitionDirectory.listFiles(f -> f.getName().endsWith(".prov"));
+            if (uncompressedFiles != null) {
+                for (final File file : uncompressedFiles) {
+                    // If we have both a compressed file and an uncompressed file for the same .prov file, then
+                    // we must have been in the process of compressing it when NiFi was restarted. Delete the partial
+                    // .gz file and we will start compressing it again.
+                    final File compressed = new File(file.getParentFile(), file.getName() + ".gz");
+                    if (compressed.exists()) {
+                        compressed.delete();
+                    }
+                }
+            }
+        }
+
+        // Update the ID Generator to the max of the ID Generator or maxEventId
+        final long nextPartitionId = maxEventId + 1;
+        final long updatedId = idGenerator.updateAndGet(curVal -> Math.max(curVal, nextPartitionId));
+        logger.info("After recovering {}, next Event ID to be generated will be {}", partitionDirectory, updatedId);
+    }
+
+
+    /**
+     * Appends the given events to the current event file. The file is rolled over first if its lease cannot be
+     * claimed and reports that it should roll, and again afterwards if the write filled it up. A failure to roll
+     * over after the events were written is logged rather than thrown.
+     *
+     * @param events the events to store
+     * @return where each event was stored, and whether (and with how many events) a rollover occurred
+     * @throws IOException if this partition is closed or the events cannot be written
+     */
+    @Override
+    public StorageResult addEvents(final Iterable<ProvenanceEventRecord> events) throws IOException {
+        if (closed) {
+            throw new IOException(this + " is closed");
+        }
+
+        // Claim a Record Writer Lease so that we have a writer to persist the events to
+        boolean claimed = false;
+        RecordWriterLease lease = null;
+        while (!claimed) {
+            lease = getLease();
+            claimed = lease.tryClaim();
+
+            if (claimed) {
+                break;
+            }
+
+            if (lease.shouldRoll()) {
+                tryRollover(lease);
+            }
+        }
+
+        // Add the events to the writer and ensure that we always
+        // relinquish the claim that we've obtained on the writer
+        Map<ProvenanceEventRecord, StorageSummary> storageMap;
+        final RecordWriter writer = lease.getWriter();
+        try {
+            storageMap = addEvents(events, writer);
+        } finally {
+            lease.relinquishClaim();
+        }
+
+        // Roll over the writer if necessary
+        Integer eventsRolledOver = null;
+        final boolean shouldRoll = lease.shouldRoll();
+        try {
+            if (shouldRoll && tryRollover(lease)) {
+                eventsRolledOver = writer.getRecordsWritten();
+            }
+        } catch (final IOException ioe) {
+            logger.error("Updated {} but failed to rollover to a new Event File", this, ioe);
+        }
+
+        final Integer rolloverCount = eventsRolledOver;
+        return new StorageResult() {
+            @Override
+            public Map<ProvenanceEventRecord, StorageSummary> getStorageLocations() {
+                return storageMap;
+            }
+
+            @Override
+            public boolean triggeredRollover() {
+                return rolloverCount != null;
+            }
+
+            @Override
+            public Integer getEventsRolledOver() {
+                return rolloverCount;
+            }
+
+            @Override
+            public String toString() {
+                return getStorageLocations().toString();
+            }
+        };
+    }
+
+    // Returns the current writer lease, creating the first event file if no lease exists yet.
+    private RecordWriterLease getLease() throws IOException {
+        while (true) {
+            final RecordWriterLease lease = eventWriterLeaseRef.get();
+            if (lease != null) {
+                return lease;
+            }
+
+            if (tryRollover(null)) {
+                return eventWriterLeaseRef.get();
+            }
+        }
+    }
+
+    /**
+     * Replaces {@code lease} with a lease on a newly created event file, provided {@code lease} is still the current one.
+     * The previous lease is closed and, when compress-on-rollover is enabled, its file is queued for compression.
+     *
+     * @param lease the lease expected to be current, or {@code null} if no lease has been created yet
+     * @return {@code true} if this call performed the rollover, {@code false} if another thread already had
+     * @throws IOException if the new writer cannot be created or its header cannot be written, or if interrupted
+     *             while queuing the old file for compression
+     */
+    private synchronized boolean tryRollover(final RecordWriterLease lease) throws IOException {
+        if (!Objects.equals(lease, eventWriterLeaseRef.get())) {
+            return false;
+        }
+
+        final long nextEventId = idGenerator.get();
+        final File updatedEventFile = new File(partitionDirectory, nextEventId + ".prov");
+        final RecordWriter updatedWriter = recordWriterFactory.createWriter(updatedEventFile, idGenerator, false, true);
+        final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), config.getMaxEventFileCount());
+
+        // The header must be written BEFORE the new lease is published: as soon as it is visible through
+        // eventWriterLeaseRef, other threads may claim it and start writing records.
+        boolean updated = false;
+        try {
+            updatedWriter.writeHeader(nextEventId);
+            updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease);
+        } finally {
+            if (!updated) {
+                discardWriter(updatedWriter, updatedEventFile);
+            }
+        }
+
+        if (!updated) {
+            return false;
+        }
+
+        // Close the lease we rolled away from so its writer releases the file (RecordWriterLease defers the
+        // actual close until any outstanding claims are relinquished).
+        if (lease != null) {
+            lease.close();
+        }
+
+        synchronized (minEventIdToPathMap) {
+            minEventIdToPathMap.put(nextEventId, updatedEventFile);
+        }
+
+        if (config.isCompressOnRollover() && lease != null && lease.getWriter() != null) {
+            boolean offered = false;
+            while (!offered && !closed) {
+                try {
+                    offered = filesToCompress.offer(lease.getWriter().getFile(), 1, TimeUnit.SECONDS);
+                } catch (final InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile() + " for compression");
+                }
+            }
+        }
+
+        return true;
+    }
+
+    // Closes a writer that was never published as the current lease and deletes its event file.
+    private void discardWriter(final RecordWriter writer, final File eventFile) {
+        try {
+            writer.close();
+        } catch (final Exception e) {
+            logger.warn("Failed to close Record Writer {}; some resources may not be cleaned up properly.", writer, e);
+        }
+
+        eventFile.delete();
+    }
+
+    /**
+     * Writes the events to the given (claimed) writer and flushes it, syncing as well if configured to always sync.
+     * The writer is marked dirty on any failure so that no other thread appends after a partial record.
+     *
+     * @return a map of each written event to its storage summary, tagged with this partition's name
+     */
+    private Map<ProvenanceEventRecord, StorageSummary> addEvents(final Iterable<ProvenanceEventRecord> events, final RecordWriter writer) throws IOException {
+        final Map<ProvenanceEventRecord, StorageSummary> locationMap = new HashMap<>();
+
+        try {
+            long maxId = -1L;
+            int numEvents = 0;
+            for (final ProvenanceEventRecord nextEvent : events) {
+                final StorageSummary writerSummary = writer.writeRecord(nextEvent);
+                final StorageSummary summaryWithIndex = new StorageSummary(writerSummary.getEventId(), writerSummary.getStorageLocation(), this.partitionName,
+                    writerSummary.getBlockIndex(), writerSummary.getSerializedLength(), writerSummary.getBytesWritten());
+                locationMap.put(nextEvent, summaryWithIndex);
+                maxId = summaryWithIndex.getEventId();
+                numEvents++;
+            }
+
+            if (numEvents == 0) {
+                return locationMap;
+            }
+
+            writer.flush();
+
+            // Update max event id to be equal to be the greater of the current value or the
+            // max value just written.
+            final long maxIdWritten = maxId;
+            this.maxEventId.getAndUpdate(cur -> maxIdWritten > cur ? maxIdWritten : cur);
+
+            if (config.isAlwaysSync()) {
+                writer.sync();
+            }
+        } catch (final Exception e) {
+            // We need to set the repoDirty flag before we release the lock for this journal.
+            // Otherwise, another thread may write to this journal -- this is a problem because
+            // the journal contains part of our record but not all of it. Writing to the end of this
+            // journal will result in corruption!
+            writer.markDirty();
+            throw e;
+        }
+
+        return locationMap;
+    }
+
+
+    /**
+     * @return the total length, in bytes, of the event files currently in the partition directory
+     */
+    @Override
+    public long getSize() {
+        return getEventFilesFromDisk()
+            .collect(Collectors.summarizingLong(file -> file.length()))
+            .getSum();
+    }
+
+    // Lists the event files in the partition directory (empty if the directory cannot be listed).
+    private Stream<File> getEventFilesFromDisk() {
+        final File[] files = partitionDirectory.listFiles(DirectoryUtils.EVENT_FILE_FILTER);
+        return files == null ? Stream.empty() : Arrays.stream(files);
+    }
+
+    /**
+     * @return the largest Event ID written to (or recovered from) this partition, or -1 if none
+     */
+    @Override
+    public long getMaxEventId() {
+        return maxEventId.get();
+    }
+
+    /**
+     * Reads the event with exactly the given ID from the event file that should contain it.
+     *
+     * @return the event, or empty if no file covers the ID or the file does not contain that exact ID
+     */
+    @Override
+    public Optional<ProvenanceEventRecord> getEvent(final long id) throws IOException {
+        final Optional<File> option = getPathForEventId(id);
+        if (!option.isPresent()) {
+            return Optional.empty();
+        }
+
+        try (final RecordReader reader = recordReaderFactory.newRecordReader(option.get(), Collections.emptyList(), config.getMaxAttributeChars())) {
+            final Optional<ProvenanceEventRecord> eventOption = reader.skipToEvent(id);
+            if (!eventOption.isPresent()) {
+                return eventOption;
+            }
+
+            // If an event is returned, the event may be the one we want, or it may be an event with a
+            // higher event ID, if the desired event is not in the record reader. So we need to get the
+            // event and check the Event ID to know whether to return the empty optional or the Optional
+            // that was returned.
+            final ProvenanceEventRecord event = eventOption.get();
+            if (event.getEventId() == id) {
+                return eventOption;
+            } else {
+                return Optional.empty();
+            }
+        }
+    }
+
+    /**
+     * Returns up to {@code maxEvents} authorized events, starting at {@code firstRecordId}. Unauthorized events are
+     * skipped and do not count toward the limit.
+     */
+    @Override
+    public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxEvents, final EventAuthorizer authorizer) throws IOException {
+        final List<ProvenanceEventRecord> events = new ArrayList<>(Math.min(maxEvents, 1000));
+        try (final EventIterator iterator = createEventIterator(firstRecordId)) {
+            Optional<ProvenanceEventRecord> eventOption;
+            // Check the limit before reading, so that no event is read (and deserialized) only to be discarded.
+            while (events.size() < maxEvents && (eventOption = iterator.nextEvent()).isPresent()) {
+                final ProvenanceEventRecord event = eventOption.get();
+                if (authorizer.isAuthorized(event)) {
+                    events.add(event);
+                }
+            }
+        }
+
+        return events;
+    }
+
+    /**
+     * Creates an iterator over the events whose IDs are at least {@code minDesiredId}. It reads the file that may
+     * contain that ID, every later file, and always the newest file.
+     */
+    @Override
+    public EventIterator createEventIterator(final long minDesiredId) {
+        final List<File> filesOfInterest = new ArrayList<>();
+        synchronized (minEventIdToPathMap) {
+            File lastFile = null;
+
+            for (final Map.Entry<Long, File> entry : minEventIdToPathMap.entrySet()) {
+                final long minFileId = entry.getKey();
+
+                // If the minimum ID for the file is greater than the minDesiredId, then
+                // that means that we will want to iterate over this file.
+                if (minFileId > minDesiredId) {
+                    // The minimum ID for this file is greater than the desired ID, so
+                    // that means that the last file we saw may have the minimum desired
+                    // ID and any number of more events before we get to this file. So
+                    // if we've not already added the lastFile, add it now.
+                    if (filesOfInterest.isEmpty() && lastFile != null) {
+                        filesOfInterest.add(lastFile);
+                    }
+
+                    filesOfInterest.add(entry.getValue());
+                }
+
+                lastFile = entry.getValue();
+            }
+
+            // We don't know the max ID of the last file, so we always want to include it, since it may contain
+            // an event with an ID greater than minDesiredId.
+            if (lastFile != null && !filesOfInterest.contains(lastFile)) {
+                filesOfInterest.add(lastFile);
+            }
+        }
+
+        if (filesOfInterest.isEmpty()) {
+            return EventIterator.EMPTY;
+        }
+
+        return new SequentialRecordReaderEventIterator(filesOfInterest, recordReaderFactory, minDesiredId, config.getMaxAttributeChars());
+    }
+
+
+    /**
+     * Creates an iterator over the events with the given IDs, considering every known event file.
+     */
+    @Override
+    public EventIterator createEventIterator(final List<Long> eventIds) {
+        final List<File> allFiles;
+        synchronized (minEventIdToPathMap) {
+            allFiles = new ArrayList<>(minEventIdToPathMap.values());
+        }
+
+        if (allFiles.isEmpty()) {
+            return EventIterator.EMPTY;
+        }
+
+        return new SelectiveRecordReaderEventIterator(allFiles, recordReaderFactory, eventIds, config.getMaxAttributeChars());
+    }
+
+    // Returns the event file with the greatest minimum Event ID that is <= id (the file that would contain id), if any.
+    private Optional<File> getPathForEventId(final long id) {
+        File lastFile = null;
+
+        synchronized (minEventIdToPathMap) {
+            for (final Map.Entry<Long, File> entry : minEventIdToPathMap.entrySet()) {
+                final long minId = entry.getKey();
+                if (minId > id) {
+                    break;
+                }
+
+                lastFile = entry.getValue();
+            }
+        }
+
+        return Optional.ofNullable(lastFile);
+    }
+
+
+    /**
+     * Deletes, in order of smallest Event ID first, every event file whose last-modified time is older than
+     * {@code olderThan} {@code unit}s ago.
+     */
+    @Override
+    public void purgeOldEvents(final long olderThan, final TimeUnit unit) {
+        final long timeCutoff = System.currentTimeMillis() - unit.toMillis(olderThan);
+
+        getEventFilesFromDisk().filter(file -> file.lastModified() < timeCutoff)
+            .sorted(DirectoryUtils.SMALLEST_ID_FIRST)
+            .forEach(file -> delete(file));
+    }
+
+
+    /**
+     * Deletes the oldest event file that can be deleted.
+     *
+     * @return the size in bytes of the deleted file, or 0 if no file was deleted
+     */
+    @Override
+    public long purgeOldestEvents() {
+        final List<File> eventFiles = getEventFilesFromDisk().sorted(DirectoryUtils.SMALLEST_ID_FIRST).collect(Collectors.toList());
+        if (eventFiles.isEmpty()) {
+            return 0L;
+        }
+
+        for (final File eventFile : eventFiles) {
+            final long fileSize = eventFile.length();
+
+            if (delete(eventFile)) {
+                logger.debug("{} Deleted {} event file ({}) due to storage limits", this, eventFile, FormatUtils.formatDataSize(fileSize));
+                return fileSize;
+            } else {
+                logger.warn("{} Failed to delete oldest event file {}. This file should be cleaned up manually.", this, eventFile);
+                continue;
+            }
+        }
+
+        return 0L;
+    }
+
+    /**
+     * Unregisters the event file and deletes it along with its Table-of-Contents file.
+     * NOTE(review): the map entry is removed even when the file cannot be deleted.
+     *
+     * @return {@code true} if the event file itself was deleted
+     */
+    private boolean delete(final File file) {
+        final long firstEventId = DirectoryUtils.getMinId(file);
+        synchronized (minEventIdToPathMap) {
+            minEventIdToPathMap.remove(firstEventId);
+        }
+
+        if (!file.delete()) {
+            logger.warn("Failed to remove Provenance Event file {}; this file should be cleaned up manually", file);
+            return false;
+        }
+
+        final File tocFile = TocUtil.getTocFile(file);
+        if (tocFile.exists() && !tocFile.delete()) {
+            logger.warn("Failed to remove Provenance Table-of-Contents file {}; this file should be cleaned up manually", tocFile);
+        }
+
+        return true;
+    }
+
+    /**
+     * Re-indexes every event from the index's minimum-event-ID-to-reindex for this partition up to the end of the
+     * newest file, using up to 4 threads (one task per event file). Per-file failures are logged, and the index
+     * changes are committed at the end.
+     */
+    void reindexLatestEvents(final EventIndex eventIndex) {
+        final List<File> eventFiles = getEventFilesFromDisk().sorted(DirectoryUtils.SMALLEST_ID_FIRST).collect(Collectors.toList());
+        if (eventFiles.isEmpty()) {
+            return;
+        }
+
+        final long minEventIdToReindex = eventIndex.getMinimumEventIdToReindex(partitionName);
+        final long maxEventId = getMaxEventId();
+        final long eventsToReindex = maxEventId - minEventIdToReindex;
+
+        logger.info("The last Provenance Event indexed for partition {} is {}, but the last event written to partition has ID {}. "
+            + "Re-indexing up to the last {} events to ensure that the Event Index is accurate and up-to-date",
+            partitionName, minEventIdToReindex, maxEventId, eventsToReindex);
+
+        // Find the first event file that we care about: the newest file whose minimum ID is <= minEventIdToReindex.
+        int firstEventFileIndex = 0;
+        for (int i = eventFiles.size() - 1; i >= 0; i--) {
+            final File eventFile = eventFiles.get(i);
+            final long minIdInFile = DirectoryUtils.getMinId(eventFile);
+            if (minIdInFile <= minEventIdToReindex) {
+                firstEventFileIndex = i;
+                break;
+            }
+        }
+
+        // Create a subList that contains the files of interest
+        final List<File> eventFilesToReindex = eventFiles.subList(firstEventFileIndex, eventFiles.size());
+
+        final ExecutorService executor = Executors.newFixedThreadPool(Math.min(4, eventFilesToReindex.size()), new NamedThreadFactory("Re-Index Provenance Events", true));
+        final List<Future<?>> futures = new ArrayList<>(eventFilesToReindex.size());
+        final AtomicLong reindexedCount = new AtomicLong(0L);
+
+        // Re-Index the last bunch of events.
+        // We don't use an Event Iterator here because it's possible that one of the event files could be corrupt (for example, if NiFi dies while
+        // writing to the file, a record may be incomplete). We don't want to prevent us from moving on and continuing to index the rest of the
+        // un-indexed events. So we just use a List of files and create a reader for each one.
+        final long start = System.nanoTime();
+        try {
+            int fileCount = 0;
+            for (final File eventFile : eventFilesToReindex) {
+                // Only the first file may contain events older than minEventIdToReindex, so only it needs skipping.
+                final boolean skipToEvent = fileCount++ == 0;
+
+                final Runnable reindexTask = () -> {
+                    final Map<ProvenanceEventRecord, StorageSummary> storageMap = new HashMap<>(1000);
+
+                    try (final RecordReader recordReader = recordReaderFactory.newRecordReader(eventFile, Collections.emptyList(), Integer.MAX_VALUE)) {
+                        if (skipToEvent) {
+                            final Optional<ProvenanceEventRecord> eventOption = recordReader.skipToEvent(minEventIdToReindex);
+                            if (!eventOption.isPresent()) {
+                                return;
+                            }
+                        }
+
+                        StandardProvenanceEventRecord event = null;
+                        while (true) {
+                            final long startBytesConsumed = recordReader.getBytesConsumed();
+
+                            event = recordReader.nextRecord();
+                            if (event == null) {
+                                eventIndex.reindexEvents(storageMap);
+                                reindexedCount.addAndGet(storageMap.size());
+                                storageMap.clear();
+                                break; // stop reading from this file
+                            } else {
+                                final long eventSize = recordReader.getBytesConsumed() - startBytesConsumed;
+                                storageMap.put(event, new StorageSummary(event.getEventId(), eventFile.getName(), partitionName, recordReader.getBlockIndex(), eventSize, 0L));
+
+                                // Flush to the index in batches of 1000 events.
+                                if (storageMap.size() == 1000) {
+                                    eventIndex.reindexEvents(storageMap);
+                                    reindexedCount.addAndGet(storageMap.size());
+                                    storageMap.clear();
+                                }
+                            }
+                        }
+                    } catch (final EOFException eof) {
+                        // Ran out of data. Continue on.
+                        logger.warn("Failed to find event with ID {} in Event File {} due to {}", minEventIdToReindex, eventFile, eof.toString());
+                    } catch (final Exception e) {
+                        logger.error("Failed to index Provenance Events found in {}", eventFile, e);
+                    }
+                };
+
+                futures.add(executor.submit(reindexTask));
+            }
+        } finally {
+            // No further tasks will be submitted: let the queued tasks finish, then allow the pool's threads to exit.
+            executor.shutdown();
+        }
+
+        for (final Future<?> future : futures) {
+            try {
+                future.get();
+            } catch (final ExecutionException ee) {
+                logger.error("Failed to re-index some Provenance events. These events may not be query-able via the Provenance interface", ee.getCause());
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+                logger.error("Interrupted while waiting for Provenance events to be re-indexed", e);
+                break;
+            }
+        }
+
+        try {
+            eventIndex.commitChanges(partitionName);
+        } catch (final IOException e) {
+            logger.error("Failed to re-index Provenance Events for partition " + partitionName, e);
+        }
+
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        final long seconds = millis / 1000L;
+        final long millisRemainder = millis % 1000L;
+        logger.info("Finished re-indexing {} events across {} files for {} in {}.{} seconds",
+            reindexedCount.get(), eventFilesToReindex.size(), partitionDirectory, seconds, millisRemainder);
+    }
+
+    @Override
+    public String toString() {
+        return "Provenance Event Store Partition[directory=" + partitionDirectory + "]";
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/AuthorizingEventIterator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/AuthorizingEventIterator.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/AuthorizingEventIterator.java
new file mode 100644
index 0000000..7ff2be7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/AuthorizingEventIterator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.store.iterator; + +import java.io.IOException; +import java.util.Optional; + +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.authorization.EventAuthorizer; +import org.apache.nifi.provenance.authorization.EventTransformer; + +public class AuthorizingEventIterator implements EventIterator { + private final EventIterator iterator; + private final EventAuthorizer authorizer; + private final EventTransformer transformer; + + public AuthorizingEventIterator(final EventIterator iterator, final EventAuthorizer authorizer, + final EventTransformer unauthorizedTransformer) { + this.iterator = iterator; + this.authorizer = authorizer; + this.transformer = unauthorizedTransformer; + } + + @Override + public void close() throws IOException { + iterator.close(); + } + + @Override + public Optional<ProvenanceEventRecord> nextEvent() throws IOException { + while (true) { + final Optional<ProvenanceEventRecord> next = iterator.nextEvent(); + if (!next.isPresent()) { + return next; + } + + if (authorizer.isAuthorized(next.get())) { + return next; + } + + final Optional<ProvenanceEventRecord> eventOption = transformer.transform(next.get()); + if (eventOption.isPresent()) { + return eventOption; + } + } + } + +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/EventIterator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/EventIterator.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/EventIterator.java new file mode 100644 index 0000000..79acda3 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/EventIterator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.provenance.store.iterator; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Optional; + +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public interface EventIterator extends Closeable { + + Optional<ProvenanceEventRecord> nextEvent() throws IOException; + + public static EventIterator EMPTY = new EventIterator() { + @Override + public void close() throws IOException { + } + + @Override + public Optional<ProvenanceEventRecord> nextEvent() { + return Optional.empty(); + } + }; + + public static EventIterator of(final ProvenanceEventRecord... events) { + final Iterator<ProvenanceEventRecord> itr = Arrays.asList(events).iterator(); + return new EventIterator() { + @Override + public void close() throws IOException { + } + + @Override + public Optional<ProvenanceEventRecord> nextEvent() { + return itr.hasNext() ? Optional.empty() : Optional.of(itr.next()); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/SelectiveRecordReaderEventIterator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/SelectiveRecordReaderEventIterator.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/SelectiveRecordReaderEventIterator.java new file mode 100644 index 0000000..c4a130b --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/SelectiveRecordReaderEventIterator.java @@ -0,0 +1,174 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.store.iterator; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.serialization.RecordReader; +import org.apache.nifi.provenance.store.RecordReaderFactory; +import org.apache.nifi.provenance.util.DirectoryUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SelectiveRecordReaderEventIterator implements EventIterator { + private static final Logger logger = LoggerFactory.getLogger(SelectiveRecordReaderEventIterator.class); + private final List<File> files; + private final RecordReaderFactory readerFactory; + private final List<Long> eventIds; + private final Iterator<Long> idIterator; + private final int maxAttributeChars; + + private boolean closed = false; + private RecordReader reader; + private File currentFile; + + public SelectiveRecordReaderEventIterator(final List<File> filesToRead, final RecordReaderFactory readerFactory, final List<Long> 
eventIds, final int maxAttributeChars) { + this.readerFactory = readerFactory; + + this.eventIds = new ArrayList<>(eventIds); + Collections.sort(this.eventIds); + idIterator = this.eventIds.iterator(); + + // Make a copy of the list of files and prune out any Files that are not relevant to the Event ID's that we were given. + if (eventIds.isEmpty() || filesToRead.isEmpty()) { + this.files = Collections.emptyList(); + } else { + this.files = filterUnneededFiles(filesToRead, this.eventIds); + } + + this.maxAttributeChars = maxAttributeChars; + } + + protected static List<File> filterUnneededFiles(final List<File> filesToRead, final List<Long> eventIds) { + final List<File> files = new ArrayList<>(); + final Long firstEventId = eventIds.get(0); + final Long lastEventId = eventIds.get(eventIds.size() - 1); + + final List<File> sortedFileList = new ArrayList<>(filesToRead); + Collections.sort(sortedFileList, DirectoryUtils.SMALLEST_ID_FIRST); + + File lastFile = null; + for (final File file : filesToRead) { + final long firstIdInFile = DirectoryUtils.getMinId(file); + if (firstIdInFile > lastEventId) { + continue; + } + + if (firstIdInFile > firstEventId) { + if (files.isEmpty() && lastFile != null) { + files.add(lastFile); + } + + files.add(file); + } + + lastFile = file; + } + + if (files.isEmpty() && lastFile != null) { + files.add(lastFile); + } + + return files; + } + + @Override + public void close() throws IOException { + closed = true; + + if (reader != null) { + reader.close(); + } + } + + + @Override + public Optional<ProvenanceEventRecord> nextEvent() throws IOException { + if (closed) { + throw new IOException("EventIterator is already closed"); + } + + final long start = System.nanoTime(); + try { + while (idIterator.hasNext()) { + // Determine the next event ID to fetch + final long eventId = idIterator.next(); + + // Determine which file the event should be in. 
+ final File fileForEvent = getFileForEventId(eventId); + if (fileForEvent == null) { + continue; + } + + // If we determined which file the event should be in, and that's not the file that + // we are currently reading from, rotate the reader to the appropriate one. + if (!fileForEvent.equals(currentFile)) { + if (reader != null) { + try { + reader.close(); + } catch (final Exception e) { + logger.warn("Failed to close {}; some resources may not be cleaned up appropriately", reader); + } + } + + reader = readerFactory.newRecordReader(fileForEvent, Collections.emptyList(), maxAttributeChars); + this.currentFile = fileForEvent; + } + + final Optional<ProvenanceEventRecord> eventOption = reader.skipToEvent(eventId); + if (eventOption.isPresent() && eventOption.get().getEventId() == eventId) { + reader.nextRecord(); // consume the event from the stream. + return eventOption; + } + + continue; + } + + return Optional.empty(); + } finally { + final long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + logger.trace("Took {} ms to read next event", ms); + } + } + + private File getFileForEventId(final long eventId) { + File lastFile = null; + for (final File file : files) { + final long firstEventId = DirectoryUtils.getMinId(file); + if (firstEventId == eventId) { + return file; + } + + if (firstEventId > eventId) { + return lastFile; + } + + lastFile = file; + } + + return lastFile; + } +}