http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index b60d187,01ad941..c1100c8 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@@ -51,23 -52,30 +52,35 @@@ import org.apache.nifi.connectable.Posi import org.apache.nifi.connectable.Size; import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.label.Label; + import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; ++import org.apache.nifi.controller.reporting.StandardReportingInitializationContext; + import org.apache.nifi.controller.service.ControllerServiceLoader; + import org.apache.nifi.controller.service.ControllerServiceNode; + import org.apache.nifi.controller.service.ControllerServiceState; + import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.events.BulletinFactory; - import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.fingerprint.FingerprintException; import org.apache.nifi.fingerprint.FingerprintFactory; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; ++import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.processor.Relationship; 
++import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; ++import org.apache.nifi.reporting.InitializationException; ++import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; + import org.apache.nifi.util.DomUtils; import org.apache.nifi.util.NiFiProperties; + import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; + import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.FunnelDTO; import org.apache.nifi.web.api.dto.LabelDTO; @@@ -313,7 -346,114 +351,124 @@@ public class StandardFlowSynchronizer i return baos.toByteArray(); } + + + private void updateControllerService(final FlowController controller, final Element controllerServiceElement, final StringEncryptor encryptor) { + final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); + + final ControllerServiceState state = ControllerServiceState.valueOf(dto.getState()); + final boolean dtoEnabled = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING); + + final ControllerServiceNode serviceNode = controller.getControllerServiceNode(dto.getId()); + final ControllerServiceState serviceState = serviceNode.getState(); + final boolean serviceEnabled = (serviceState == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING); + + if (dtoEnabled && !serviceEnabled) { + controller.enableControllerService(controller.getControllerServiceNode(dto.getId())); + } else if (!dtoEnabled && serviceEnabled) { + controller.disableControllerService(controller.getControllerServiceNode(dto.getId())); + } + } + + private void addReportingTask(final FlowController 
controller, final Element reportingTaskElement, final StringEncryptor encryptor) throws ReportingTaskInstantiationException { + final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor); + + final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), false); + reportingTask.setName(dto.getName()); + reportingTask.setComments(dto.getComment()); + reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod()); + reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy())); + + reportingTask.setAnnotationData(dto.getAnnotationData()); + + for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) { + if (entry.getValue() == null) { + reportingTask.removeProperty(entry.getKey()); + } else { + reportingTask.setProperty(entry.getKey(), entry.getValue()); + } + } + ++ final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask()); ++ final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(), ++ SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller); ++ ++ try { ++ reportingTask.getReportingTask().initialize(config); ++ } catch (final InitializationException ie) { ++ throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + dto.getType(), ie); ++ } ++ + if ( autoResumeState ) { + if ( ScheduledState.RUNNING.name().equals(dto.getState()) ) { + try { + controller.startReportingTask(reportingTask); + } catch (final Exception e) { + logger.error("Failed to start {} due to {}", reportingTask, e); + if ( logger.isDebugEnabled() ) { + logger.error("", e); + } + controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin( + "Reporting Tasks", Severity.ERROR.name(), "Failed to start " + reportingTask + " due to " + e)); + } + } else if ( 
ScheduledState.DISABLED.name().equals(dto.getState()) ) { + try { + controller.disableReportingTask(reportingTask); + } catch (final Exception e) { + logger.error("Failed to mark {} as disabled due to {}", reportingTask, e); + if ( logger.isDebugEnabled() ) { + logger.error("", e); + } + controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin( + "Reporting Tasks", Severity.ERROR.name(), "Failed to mark " + reportingTask + " as disabled due to " + e)); + } + } + } + } + private void updateReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) { + final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor); + final ReportingTaskNode taskNode = controller.getReportingTaskNode(dto.getId()); + + if (!taskNode.getScheduledState().name().equals(dto.getState())) { + try { + switch (ScheduledState.valueOf(dto.getState())) { + case DISABLED: + if ( taskNode.isRunning() ) { + controller.stopReportingTask(taskNode); + } + controller.disableReportingTask(taskNode); + break; + case RUNNING: + if ( taskNode.getScheduledState() == ScheduledState.DISABLED ) { + controller.enableReportingTask(taskNode); + } + controller.startReportingTask(taskNode); + break; + case STOPPED: + if (taskNode.getScheduledState() == ScheduledState.DISABLED) { + controller.enableReportingTask(taskNode); + } else if (taskNode.getScheduledState() == ScheduledState.RUNNING) { + controller.stopReportingTask(taskNode); + } + break; + } + } catch (final IllegalStateException ise) { + logger.error("Failed to change Scheduled State of {} from {} to {} due to {}", taskNode, taskNode.getScheduledState().name(), dto.getState(), ise.toString()); + logger.error("", ise); + + // create bulletin for the Processor Node + controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Node Reconnection", Severity.ERROR.name(), + "Failed to change Scheduled State of " + taskNode + " from " 
+ taskNode.getScheduledState().name() + " to " + dto.getState() + " due to " + ise.toString())); + + // create bulletin at Controller level. + controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Node Reconnection", Severity.ERROR.name(), + "Failed to change Scheduled State of " + taskNode + " from " + taskNode.getScheduledState().name() + " to " + dto.getState() + " due to " + ise.toString())); + } + } + } + + private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor) throws ProcessorInstantiationException { // get the parent group ID
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index 7c3734a,272c0ba..6b3cc95 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@@ -44,9 -48,9 +48,9 @@@ public abstract class AbstractReporting private final ControllerServiceLookup serviceLookup; private final AtomicReference<SchedulingStrategy> schedulingStrategy = new AtomicReference<>(SchedulingStrategy.TIMER_DRIVEN); -- private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("5 mins"); - private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY); - ++ private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("1 mins"); + + private volatile String comment; private volatile ScheduledState scheduledState = ScheduledState.STOPPED; public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id, http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 1627994,087ec68..7347632 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@@ -177,16 -182,11 +182,11 @@@ public final class StandardProcessSched } break; - } catch (final InvocationTargetException ite) { - LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", - new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration}); - LOG.error("", ite.getTargetException()); - - try { - Thread.sleep(administrativeYieldMillis); - } catch (final InterruptedException ie) { - } } catch (final Exception e) { + final Throwable cause = (e instanceof InvocationTargetException) ? 
e.getCause() : e; - final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask); ++ final ComponentLog componentLog = new SimpleProcessLogger(taskNode.getIdentifier(), reportingTask); + componentLog.error("Failed to invoke @OnEnabled method due to {}", cause); + LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e); try { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java index fc197e8,0000000..12f2911 mode 100644,000000..100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java +++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java @@@ -1,744 -1,0 +1,748 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.journaling; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.pql.ProvenanceQuery; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.provenance.ProvenanceEventBuilder; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.SearchableFieldParser; +import org.apache.nifi.provenance.SearchableFields; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.provenance.StorageLocation; +import org.apache.nifi.provenance.StoredProvenanceEvent; +import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; +import org.apache.nifi.provenance.journaling.exception.EventNotFoundException; +import org.apache.nifi.provenance.journaling.index.EventIndexSearcher; +import org.apache.nifi.provenance.journaling.index.IndexAction; +import org.apache.nifi.provenance.journaling.index.IndexManager; +import org.apache.nifi.provenance.journaling.index.LuceneIndexManager; +import 
org.apache.nifi.provenance.journaling.index.QueryUtils; +import org.apache.nifi.provenance.journaling.journals.JournalReader; +import org.apache.nifi.provenance.journaling.journals.StandardJournalReader; +import org.apache.nifi.provenance.journaling.partition.Partition; +import org.apache.nifi.provenance.journaling.partition.PartitionAction; +import org.apache.nifi.provenance.journaling.partition.PartitionManager; +import org.apache.nifi.provenance.journaling.partition.QueuingPartitionManager; +import org.apache.nifi.provenance.journaling.partition.VoidPartitionAction; +import org.apache.nifi.provenance.journaling.query.QueryManager; +import org.apache.nifi.provenance.journaling.query.StandardQueryManager; +import org.apache.nifi.provenance.journaling.toc.StandardTocReader; +import org.apache.nifi.provenance.journaling.toc.TocReader; +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; +import org.apache.nifi.provenance.query.ProvenanceQueryResult; +import org.apache.nifi.provenance.query.ProvenanceQuerySubmission; +import org.apache.nifi.provenance.query.ProvenanceResultSet; +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.search.QuerySubmission; +import org.apache.nifi.provenance.search.SearchableField; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// TODO: read-only is not checked everywhere! 
+public class JournalingProvenanceRepository implements ProvenanceEventRepository { + public static final String WORKER_THREAD_POOL_SIZE = "nifi.provenance.repository.worker.threads"; + public static final String BLOCK_SIZE = "nifi.provenance.repository.writer.block.size"; + + private static final Logger logger = LoggerFactory.getLogger(JournalingProvenanceRepository.class); + + private final JournalingRepositoryConfig config; + private final AtomicLong idGenerator = new AtomicLong(0L); + + // the follow member variables are effectively final. They are initialized + // in the initialize method rather than the constructor because we want to ensure + // that they are not created every time that the Java Service Loader instantiates the class. + private ScheduledExecutorService workerExecutor; + private ExecutorService queryExecutor; + private ExecutorService compressionExecutor; + private EventReporter eventReporter; + private PartitionManager partitionManager; + private QueryManager queryManager; + private IndexManager indexManager; + + public JournalingProvenanceRepository() throws IOException { + this(createConfig()); + } + + public JournalingProvenanceRepository(final JournalingRepositoryConfig config) throws IOException { + this.config = config; + } + + private static ThreadFactory createThreadFactory(final String namePrefix) { + final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); + final AtomicInteger counter = new AtomicInteger(0); + + return new ThreadFactory() { + @Override + public Thread newThread(final Runnable r) { + final Thread thread = defaultFactory.newThread(r); + thread.setName(namePrefix + "-" + counter.incrementAndGet()); + return thread; + } + }; + } + + private static JournalingRepositoryConfig createConfig() { + final NiFiProperties properties = NiFiProperties.getInstance(); + final Map<String, Path> storageDirectories = properties.getProvenanceRepositoryPaths(); + if (storageDirectories.isEmpty()) { + 
storageDirectories.put("provenance_repository", Paths.get("provenance_repository")); + } + final String storageTime = properties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_TIME, "24 hours"); + final String storageSize = properties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE, "1 GB"); + final String rolloverTime = properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_TIME, "5 mins"); + final String rolloverSize = properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB"); + final String shardSize = properties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB"); + final int queryThreads = properties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2); + final int workerThreads = properties.getIntegerProperty(WORKER_THREAD_POOL_SIZE, 4); + final int journalCount = properties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16); + + final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS); + final long maxStorageBytes = DataUnit.parseDataSize(storageSize, DataUnit.B).longValue(); + final long rolloverMillis = FormatUtils.getTimeDuration(rolloverTime, TimeUnit.MILLISECONDS); + final long rolloverBytes = DataUnit.parseDataSize(rolloverSize, DataUnit.B).longValue(); + + final boolean compressOnRollover = Boolean.parseBoolean(properties.getProperty(NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER)); + final String indexedFieldString = properties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS); + final String indexedAttrString = properties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES); + final int blockSize = properties.getIntegerProperty(BLOCK_SIZE, 5000); + + final Boolean alwaysSync = Boolean.parseBoolean(properties.getProperty("nifi.provenance.repository.always.sync", "false")); + + final List<SearchableField> searchableFields = SearchableFieldParser.extractSearchableFields(indexedFieldString, true); + final List<SearchableField> 
searchableAttributes = SearchableFieldParser.extractSearchableFields(indexedAttrString, false); + + // We always want to index the Event Time. + if (!searchableFields.contains(SearchableFields.EventTime)) { + searchableFields.add(SearchableFields.EventTime); + } + + final JournalingRepositoryConfig config = new JournalingRepositoryConfig(); + + final Map<String, File> containers = new HashMap<>(storageDirectories.size()); + for ( final Map.Entry<String, Path> entry : storageDirectories.entrySet() ) { + containers.put(entry.getKey(), entry.getValue().toFile()); + } + config.setContainers(containers); + config.setCompressOnRollover(compressOnRollover); + config.setSearchableFields(searchableFields); + config.setSearchableAttributes(searchableAttributes); + config.setJournalCapacity(rolloverBytes); + config.setJournalRolloverPeriod(rolloverMillis, TimeUnit.MILLISECONDS); + config.setEventExpiration(storageMillis, TimeUnit.MILLISECONDS); + config.setMaxStorageCapacity(maxStorageBytes); + config.setQueryThreadPoolSize(queryThreads); + config.setWorkerThreadPoolSize(workerThreads); + config.setPartitionCount(journalCount); + config.setBlockSize(blockSize); + + if (shardSize != null) { + config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue()); + } + + config.setAlwaysSync(alwaysSync); + + return config; + } + + @Override + public synchronized void initialize(final EventReporter eventReporter) throws IOException { + this.eventReporter = eventReporter; + + // Ensure that the number of partitions specified by the config is at least as large as the + // number of sections that we have. If not, update the config to be equal to the number of + // sections that we have. 
+ final Pattern numberPattern = Pattern.compile("\\d+"); + int numSections = 0; + for ( final File container : config.getContainers().values() ) { + final String[] sections = container.list(new FilenameFilter() { + @Override + public boolean accept(final File dir, final String name) { + return numberPattern.matcher(name).matches(); + } + }); + + if ( sections != null ) { + numSections += sections.length; + } + } + + if ( config.getPartitionCount() < numSections ) { + logger.warn("Configured number of partitions for Provenance Repository is {}, but {} partitions already exist. Using {} partitions instead of {}.", + config.getPartitionCount(), numSections, numSections, config.getPartitionCount()); + config.setPartitionCount(numSections); + } + + // We use 3 different thread pools here because we don't want to threads from 1 pool to interfere with + // each other. This is because the worker threads can be long running, and they shouldn't tie up the + // compression threads. Likewise, there may be MANY compression tasks, which could delay the worker + // threads. And the query threads need to run immediately when a user submits a query - they cannot + // wait until we finish compressing data and sync'ing the repository! 
+ final int workerThreadPoolSize = Math.max(2, config.getWorkerThreadPoolSize()); + this.workerExecutor = Executors.newScheduledThreadPool(workerThreadPoolSize, createThreadFactory("Provenance Repository Worker Thread")); + + final int queryThreadPoolSize = Math.max(2, config.getQueryThreadPoolSize()); + this.queryExecutor = Executors.newScheduledThreadPool(queryThreadPoolSize, createThreadFactory("Provenance Repository Query Thread")); + + final int compressionThreads = Math.max(1, config.getCompressionThreadPoolSize()); + this.compressionExecutor = Executors.newFixedThreadPool(compressionThreads, createThreadFactory("Provenance Repository Compression Thread")); + + this.indexManager = new LuceneIndexManager(this, config, workerExecutor, queryExecutor); + this.partitionManager = new QueuingPartitionManager(indexManager, idGenerator, config, workerExecutor, compressionExecutor); + this.queryManager = new StandardQueryManager(indexManager, queryExecutor, config, 10); + + final Long maxEventId = getMaxEventId(); + if ( maxEventId != null && maxEventId > 0 ) { + this.idGenerator.set(maxEventId); // maxEventId returns 1 greater than the last event id written + } + + // the partition manager may have caused journals to be re-indexed. We will sync the + // index manager to make sure that we are completely in sync before allowing any new data + // to be written to the repo. 
+ indexManager.sync(); + + final long expirationFrequencyNanos = config.getExpirationFrequency(TimeUnit.NANOSECONDS); + workerExecutor.scheduleWithFixedDelay(new ExpireOldEvents(), expirationFrequencyNanos, expirationFrequencyNanos, TimeUnit.NANOSECONDS); + + workerExecutor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + partitionManager.deleteEventsBasedOnSize(); + } + }, expirationFrequencyNanos, expirationFrequencyNanos, TimeUnit.NANOSECONDS); + } + + @Override + public ProvenanceEventBuilder eventBuilder() { + return new StandardProvenanceEventRecord.Builder(); + } + + @Override + public void registerEvent(final ProvenanceEventRecord event) throws IOException { + registerEvents(Collections.singleton(event)); + } + + @Override + public void registerEvents(final Collection<ProvenanceEventRecord> events) throws IOException { + try { + partitionManager.withPartition(new VoidPartitionAction() { + @Override + public void perform(final Partition partition) throws IOException { + partition.registerEvents(events, idGenerator.getAndAdd(events.size())); + } + }, true); + } catch (final IOException ioe) { + if ( eventReporter != null ) { + eventReporter.reportEvent(Severity.ERROR, "Provenance Repository", "Failed to persist " + events.size() + " events to Provenance Repository due to " + ioe); + } + throw ioe; + } + } + + @Override + public StoredProvenanceEvent getEvent(final long id) throws IOException { + final List<StoredProvenanceEvent> events = getEvents(id, 1); + if ( events.isEmpty() ) { + return null; + } + + // We have to check the id of the event returned, because we are requesting up to 1 record + // starting with the given id. However, if that ID doesn't exist, we could get a record + // with a larger id. 
+ final StoredProvenanceEvent event = events.get(0); + if ( event.getEventId() == id ) { + return event; + } + + return null; + } + + @Override + public List<StoredProvenanceEvent> getEvents(final long firstRecordId, final int maxRecords) throws IOException { + // Must generate query to determine the appropriate StorageLocation objects and then call + // getEvent(List<StorageLocation>) + final Set<List<JournaledStorageLocation>> resultSet = indexManager.withEachIndex( + new IndexAction<List<JournaledStorageLocation>>() { + @Override + public List<JournaledStorageLocation> perform(final EventIndexSearcher searcher) throws IOException { + return searcher.getEvents(firstRecordId, maxRecords); + } + }); + + final ArrayList<JournaledStorageLocation> locations = new ArrayList<>(maxRecords); + for ( final List<JournaledStorageLocation> list : resultSet ) { + for ( final JournaledStorageLocation location : list ) { + locations.add(location); + } + } + + Collections.sort(locations, new Comparator<JournaledStorageLocation>() { + @Override + public int compare(final JournaledStorageLocation o1, final JournaledStorageLocation o2) { + return Long.compare(o1.getEventId(), o2.getEventId()); + } + }); + + locations.trimToSize(); + + @SuppressWarnings({ "rawtypes", "unchecked" }) + final List<StorageLocation> storageLocations = (List<StorageLocation>) ((List) locations); + return getEvents(storageLocations); + } + + @Override + public StoredProvenanceEvent getEvent(final StorageLocation location) throws IOException { + final List<StoredProvenanceEvent> storedEvents = getEvents(Collections.singletonList(location)); + return (storedEvents == null || storedEvents.isEmpty()) ? null : storedEvents.get(0); + } + + + + @Override + public List<StoredProvenanceEvent> getEvents(final List<StorageLocation> locations) throws IOException { + // Group the locations by journal files because we want a single thread, at most, per journal file. 
+ final Map<File, List<JournaledStorageLocation>> orderedLocations = QueryUtils.orderLocations(locations, config); + + // Go through each journal file and create a callable that can lookup the records for that journal file. + final List<Future<List<StoredProvenanceEvent>>> futures = new ArrayList<>(); + for ( final Map.Entry<File, List<JournaledStorageLocation>> entry : orderedLocations.entrySet() ) { + final File journalFile = entry.getKey(); + final List<JournaledStorageLocation> locationsForFile = entry.getValue(); + + final Callable<List<StoredProvenanceEvent>> callable = new Callable<List<StoredProvenanceEvent>>() { + @Override + public List<StoredProvenanceEvent> call() throws Exception { + final File tocFile = QueryUtils.getTocFile(journalFile); + if ( !journalFile.exists() || !tocFile.exists() ) { + return Collections.emptyList(); + } + + try(final TocReader tocReader = new StandardTocReader(tocFile); + final JournalReader reader = new StandardJournalReader(journalFile)) + { + final List<StoredProvenanceEvent> storedEvents = new ArrayList<>(locationsForFile.size()); + + for ( final JournaledStorageLocation location : locationsForFile ) { + final long blockOffset = tocReader.getBlockOffset(location.getBlockIndex()); + final ProvenanceEventRecord event = reader.getEvent(blockOffset, location.getEventId()); + + storedEvents.add(new JournaledProvenanceEvent(event, location)); + } + + return storedEvents; + } + } + }; + + final Future<List<StoredProvenanceEvent>> future = queryExecutor.submit(callable); + futures.add(future); + } + + // Get all of the events from the futures, waiting for them to finish. 
+ final Map<StorageLocation, StoredProvenanceEvent> locationToEventMap = new HashMap<>(locations.size()); + for ( final Future<List<StoredProvenanceEvent>> future : futures ) { + try { + final List<StoredProvenanceEvent> events = future.get(); + + // Map the location to the event, so that we can then re-order the events in the same order + // that the locations were passed to us. + for ( final StoredProvenanceEvent event : events ) { + locationToEventMap.put(event.getStorageLocation(), event); + } + } catch (final ExecutionException ee) { + final Throwable cause = ee.getCause(); + if ( cause instanceof IOException ) { + throw (IOException) cause; + } else { + throw new RuntimeException(cause); + } + } catch (final InterruptedException ie) { + throw new RuntimeException(ie); + } + } + + // Sort Events by the order of the provided locations. + final List<StoredProvenanceEvent> sortedEvents = new ArrayList<>(locations.size()); + for ( final StorageLocation location : locations ) { + final StoredProvenanceEvent event = locationToEventMap.get(location); + if ( event != null ) { + sortedEvents.add(event); + } + } + + return sortedEvents; + } + + + @Override + public Long getMaxEventId() throws IOException { + final Set<Long> maxIds = partitionManager.withEachPartitionSerially(new PartitionAction<Long>() { + @Override + public Long perform(final Partition partition) throws IOException { + return partition.getMaxEventId(); + } + }, false); + + Long maxId = null; + for ( final Long id : maxIds ) { + if ( id == null ) { + continue; + } + + if ( maxId == null || id > maxId ) { + maxId = id; + } + } + + return maxId; + } + - ProgressAwareIterator<? extends StoredProvenanceEvent> selectMatchingEvents(final String query, final AtomicLong lastTimeProgressMade) throws IOException { ++ ProgressAwareIterator<? 
extends StoredProvenanceEvent> selectMatchingEvents(final String query, final Set<String> referencedFields, final AtomicLong lastTimeProgressMade) throws IOException { + final Set<EventIndexSearcher> searchers = indexManager.getSearchers(); + final Iterator<EventIndexSearcher> searchItr = searchers.iterator(); + + return new ProgressAwareIterator<StoredProvenanceEvent>() { + private Iterator<LazyInitializedProvenanceEvent> eventItr; + private int searchersComplete = 0; + private EventIndexSearcher currentSearcher; + + @Override + public int getPercentComplete() { + return searchers.isEmpty() ? 100 : searchersComplete / searchers.size() * 100; + } + + @Override + public boolean hasNext() { + // while the event iterator has no information... + while ( eventItr == null || !eventItr.hasNext() ) { + // if there's not another searcher then we're out of events. + if ( !searchItr.hasNext() ) { + return false; + } + + // we're finished with this searcher. Close it. + if ( currentSearcher != null ) { + try { + currentSearcher.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close {} due to {}", currentSearcher, ioe.toString()); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + + // We have a searcher. get events from it. If there are no matches, + // then our while loop will keep going. + currentSearcher = searchItr.next(); + searchersComplete++; + + try { - eventItr = currentSearcher.select(query); ++ eventItr = currentSearcher.select(query, referencedFields); + } catch (final IOException ioe) { + throw new EventNotFoundException("Could not find next event", ioe); + } + } + + // the event iterator has no events, and the search iterator has no more + // searchers. There are no more events. 
+ return eventItr != null && eventItr.hasNext(); + } + + @Override + public StoredProvenanceEvent next() { + lastTimeProgressMade.set(System.nanoTime()); + return eventItr.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + public ProvenanceResultSet query(final String query) throws IOException { + final ProvenanceQuerySubmission submission = submitQuery(query); + return submission.getResult().getResultSet(); + } + + + public ProvenanceQuerySubmission retrieveProvenanceQuerySubmission(final String queryIdentifier) { + return queryManager.retrieveProvenanceQuerySubmission(queryIdentifier); + } + + @Override + public ProvenanceQuerySubmission submitQuery(final String query) { + ProvenanceQuerySubmission submission; + final AtomicLong lastTimeProgressMade = new AtomicLong(System.nanoTime()); + final long tenMinsInNanos = TimeUnit.MINUTES.toNanos(10); + + try { - final ProgressAwareIterator<? extends StoredProvenanceEvent> eventItr = selectMatchingEvents(query, lastTimeProgressMade); - final ProvenanceResultSet rs = ProvenanceQuery.compile(query, getSearchableFields(), getSearchableAttributes()).evaluate(eventItr); ++ final ProvenanceQuery provenanceQuery = ProvenanceQuery.compile(query, getSearchableFields(), getSearchableAttributes()); ++ ++ final Set<String> referencedFields = provenanceQuery.getReferencedFields(); ++// final Set<String> referencedFields = null; ++ final ProgressAwareIterator<? 
extends StoredProvenanceEvent> eventItr = selectMatchingEvents(query, referencedFields, lastTimeProgressMade); ++ final ProvenanceResultSet rs = provenanceQuery.evaluate(eventItr); + + submission = new JournalingRepoQuerySubmission(query, new ProvenanceQueryResult() { + @Override + public ProvenanceResultSet getResultSet() { + return rs; + } + + @Override + public Date getExpiration() { + return new Date(tenMinsInNanos + lastTimeProgressMade.get()); + } + + @Override + public String getError() { + return null; + } + + @Override + public int getPercentComplete() { + return eventItr.getPercentComplete(); + } + + @Override + public boolean isFinished() { + return eventItr.getPercentComplete() >= 100; + } + }); + } catch (final IOException ioe) { + logger.error("Failed to perform query {} due to {}", query, ioe.toString()); + if ( logger.isDebugEnabled() ) { + logger.error("", ioe); + } + + submission = new JournalingRepoQuerySubmission(query, new ProvenanceQueryResult() { + @Override + public ProvenanceResultSet getResultSet() { + return null; + } + + @Override + public Date getExpiration() { + return new Date(tenMinsInNanos + lastTimeProgressMade.get()); + } + + @Override + public String getError() { + return "Failed to perform query due to " + ioe; + } + + @Override + public int getPercentComplete() { + return 0; + } + + @Override + public boolean isFinished() { + return true; + } + }); + } + + queryManager.registerSubmission(submission); + return submission; + } + + + @Override + public QuerySubmission submitQuery(final Query query) { + return queryManager.submitQuery(query); + } + + @Override + public QuerySubmission retrieveQuerySubmission(final String queryIdentifier) { + return queryManager.retrieveQuerySubmission(queryIdentifier); + } + + @Override + public ComputeLineageSubmission submitLineageComputation(final String flowFileUuid) { + return queryManager.submitLineageComputation(flowFileUuid); + } + + @Override + public ComputeLineageSubmission 
retrieveLineageSubmission(final String lineageIdentifier) { + return queryManager.retrieveLineageSubmission(lineageIdentifier); + } + + @Override + public ComputeLineageSubmission submitExpandParents(final long eventId) { + return queryManager.submitExpandParents(this, eventId); + } + + @Override + public ComputeLineageSubmission submitExpandChildren(final long eventId) { + return queryManager.submitExpandChildren(this, eventId); + } + + @Override + public void close() throws IOException { + if ( partitionManager != null ) { + partitionManager.shutdown(); + } + + if ( indexManager != null ) { + try { + indexManager.close(); + } catch (final IOException ioe) { + logger.warn("Failed to shutdown Index Manager due to {}", ioe.toString()); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + + if ( queryManager != null ) { + try { + queryManager.close(); + } catch (final IOException ioe) { + logger.warn("Failed to shutdown Query Manager due to {}", ioe.toString()); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + + compressionExecutor.shutdown(); + workerExecutor.shutdown(); + queryExecutor.shutdown(); + } + + @Override + public List<SearchableField> getSearchableFields() { + final List<SearchableField> searchableFields = new ArrayList<>(config.getSearchableFields()); + // we exclude the Event Time because it is always searchable and is a bit special in its handling + // because it dictates in some cases which index files we look at + searchableFields.remove(SearchableFields.EventTime); + return searchableFields; + } + + @Override + public List<SearchableField> getSearchableAttributes() { + return config.getSearchableAttributes(); + } + + @Override + public Long getEarliestEventTime() throws IOException { + // Get the earliest event timestamp for each partition + final Set<Long> earliestTimes = partitionManager.withEachPartitionSerially(new PartitionAction<Long>() { + @Override + public Long perform(final Partition partition) 
throws IOException { + return partition.getEarliestEventTime(); + } + }, false); + + // Find the latest timestamp for each of the "earliest" timestamps. + // This is a bit odd, but we're doing it for a good reason: + // The UI is going to show the earliest time available. Because we have a partitioned write-ahead + // log, if we just return the timestamp of the earliest event available, we could end up returning + // a time for an event that exists but the next event in its lineage does not exist because it was + // already aged off of a different journal. To avoid this, we return the "latest of the earliest" + // timestamps. This way, we know that no event with a larger ID has been aged off from any of the + // partitions. + Long latest = null; + for ( final Long earliestTime : earliestTimes ) { + if ( earliestTime == null ) { + continue; + } + + if ( latest == null || earliestTime > latest ) { + latest = earliestTime; + } + } + + return latest; + } + + + + private class ExpireOldEvents implements Runnable { + @Override + public void run() { + final long now = System.currentTimeMillis(); + final long expirationThreshold = now - config.getEventExpiration(TimeUnit.MILLISECONDS); + + try { + indexManager.deleteOldEvents(expirationThreshold); + } catch (final IOException ioe) { + logger.error("Failed to delete expired events from index due to {}", ioe.toString()); + if ( logger.isDebugEnabled() ) { + logger.error("", ioe); + } + } + + try { + partitionManager.withEachPartitionSerially(new VoidPartitionAction() { + @Override + public void perform(final Partition partition) throws IOException { + try { + partition.deleteOldEvents(expirationThreshold); + } catch (final IOException ioe) { + logger.error("Failed to delete expired events from Partition {} due to {}", partition, ioe.toString()); + if ( logger.isDebugEnabled() ) { + logger.error("", ioe); + } + } + } + }, false); + } catch (IOException ioe) { + logger.error("Failed to delete expired events from journals due 
to {}", ioe.toString()); + if ( logger.isDebugEnabled() ) { + logger.error("", ioe); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/LazyInitializedProvenanceEvent.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/LazyInitializedProvenanceEvent.java index d0562e2,0000000..1abd5c9 mode 100644,000000..100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/LazyInitializedProvenanceEvent.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/LazyInitializedProvenanceEvent.java @@@ -1,367 -1,0 +1,373 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.journaling; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexableField; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.SearchableFields; +import org.apache.nifi.provenance.StorageLocation; +import org.apache.nifi.provenance.StoredProvenanceEvent; +import org.apache.nifi.provenance.journaling.exception.EventNotFoundException; +import org.apache.nifi.provenance.journaling.index.IndexedFieldNames; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LazyInitializedProvenanceEvent implements StoredProvenanceEvent { + private static final Logger logger = LoggerFactory.getLogger(LazyInitializedProvenanceEvent.class); + + private final ProvenanceEventRepository repo; + private final StorageLocation storageLocation; + private final Document doc; + private ProvenanceEventRecord fullRecord; + + public LazyInitializedProvenanceEvent(final ProvenanceEventRepository repo, final StorageLocation storageLocation, final Document document) { + this.repo = repo; + this.storageLocation = storageLocation; + this.doc = document; + } + + @Override + public long getEventId() { + return doc.getField(IndexedFieldNames.EVENT_ID).numericValue().longValue(); + } + + @Override + public StorageLocation getStorageLocation() { + return storageLocation; + } + + @Override + public long getEventTime() { + return doc.getField(SearchableFields.EventTime.getSearchableFieldName()).numericValue().longValue(); + } + + private void ensureFullyLoaded() { + if ( fullRecord != null ) { + return; + } + + final long id = getEventId(); + try { + fullRecord = repo.getEvent(id); + } catch (final 
IOException ioe) { + final String containerName = doc.get(IndexedFieldNames.CONTAINER_NAME); + final String sectionName = doc.get(IndexedFieldNames.SECTION_NAME); + final String journalId = doc.get(IndexedFieldNames.JOURNAL_ID); + + final String error = "Failed to load event with ID " + id + " from container '" + containerName + "', section '" + sectionName + "', journal '" + journalId + "' due to " + ioe; + logger.error(error); + if ( logger.isDebugEnabled() ) { + logger.error("", ioe); + } + + throw new EventNotFoundException(error); + } + } + + @Override + public long getFlowFileEntryDate() { + ensureFullyLoaded(); + return fullRecord.getFlowFileEntryDate(); + } + + @Override + public long getLineageStartDate() { + final IndexableField field = doc.getField(SearchableFields.LineageStartDate.getSearchableFieldName()); + if ( field != null ) { + return field.numericValue().longValue(); + } + + ensureFullyLoaded(); + return fullRecord.getLineageStartDate(); + } + + @Override + public Set<String> getLineageIdentifiers() { + ensureFullyLoaded(); + return fullRecord.getLineageIdentifiers(); + } + + @Override + public long getFileSize() { + final IndexableField field = doc.getField(SearchableFields.FileSize.getSearchableFieldName()); + if ( field != null ) { + return field.numericValue().longValue(); + } + + ensureFullyLoaded(); + return fullRecord.getFileSize(); + } + + @Override + public Long getPreviousFileSize() { + ensureFullyLoaded(); + return fullRecord.getPreviousFileSize(); + } + + @Override + public long getEventDuration() { + // TODO: Allow Event Duration to be indexed; it could be interesting for reporting. 
+ ensureFullyLoaded(); + return fullRecord.getEventDuration(); + } + + @Override + public ProvenanceEventType getEventType() { + final String name = doc.get(SearchableFields.EventType.getSearchableFieldName()); + return ProvenanceEventType.valueOf(name.toUpperCase()); + } + + @Override + public Map<String, String> getAttributes() { + ensureFullyLoaded(); + return fullRecord.getAttributes(); + } + + @Override + public String getAttribute(final String attributeName) { + final String attr = doc.get(attributeName); + if ( attr == null ) { + ensureFullyLoaded(); + return fullRecord.getAttribute(attributeName); + } else { + return attr; + } + } + + @Override + public Map<String, String> getPreviousAttributes() { + ensureFullyLoaded(); + return fullRecord.getPreviousAttributes(); + } + + @Override + public Map<String, String> getUpdatedAttributes() { + ensureFullyLoaded(); + return fullRecord.getUpdatedAttributes(); + } + + @Override + public String getComponentId() { + final String componentId = doc.get(SearchableFields.ComponentID.getSearchableFieldName()); + if ( componentId == null ) { + ensureFullyLoaded(); + return fullRecord.getComponentId(); + } else { + return componentId; + } + } + + @Override + public String getComponentType() { + // TODO: Make indexable. 
+ ensureFullyLoaded(); + return fullRecord.getComponentType(); + } + + @Override + public String getTransitUri() { + final String transitUri = doc.get(SearchableFields.TransitURI.getSearchableFieldName()); + if ( transitUri == null ) { + final ProvenanceEventType eventType = getEventType(); + switch (eventType) { + case RECEIVE: + case SEND: + ensureFullyLoaded(); + return fullRecord.getTransitUri(); + default: + return null; + } + } else { + return transitUri; + } + } + + @Override + public String getSourceSystemFlowFileIdentifier() { + ensureFullyLoaded(); + return fullRecord.getSourceSystemFlowFileIdentifier(); + } + + @Override + public String getFlowFileUuid() { - return doc.get(SearchableFields.FlowFileUUID.getSearchableFieldName()); ++ String uuid = doc.get(SearchableFields.FlowFileUUID.getSearchableFieldName()); ++ if ( uuid != null ) { ++ return uuid; ++ } ++ ++ ensureFullyLoaded(); ++ return fullRecord.getFlowFileUuid(); + } + + @Override + public List<String> getParentUuids() { + final IndexableField[] uuids = doc.getFields(SearchableFields.FlowFileUUID.getSearchableFieldName()); - if ( uuids.length < 2 ) { ++ if ( uuids == null || uuids.length < 2 ) { + return Collections.emptyList(); + } + + switch (getEventType()) { + case JOIN: { + final List<String> parentUuids = new ArrayList<>(uuids.length - 1); + for (int i=1; i < uuids.length; i++) { + parentUuids.add(uuids[i].stringValue()); + } + return parentUuids; + } + default: + return Collections.emptyList(); + } + } + + @Override + public List<String> getChildUuids() { + final IndexableField[] uuids = doc.getFields(SearchableFields.FlowFileUUID.getSearchableFieldName()); - if ( uuids.length < 2 ) { ++ if ( uuids == null || uuids.length < 2 ) { + return Collections.emptyList(); + } + + switch (getEventType()) { + case REPLAY: + case CLONE: + case FORK: { + final List<String> childUuids = new ArrayList<>(uuids.length - 1); + for (int i=1; i < uuids.length; i++) { + childUuids.add(uuids[i].stringValue()); + 
} + return childUuids; + } + default: + return Collections.emptyList(); + } + } + + @Override + public String getAlternateIdentifierUri() { + final String altId = doc.get(SearchableFields.AlternateIdentifierURI.getSearchableFieldName()); + if ( altId == null && getEventType() == ProvenanceEventType.ADDINFO ) { + ensureFullyLoaded(); + return fullRecord.getAlternateIdentifierUri(); + } else { + return null; + } + } + + @Override + public String getDetails() { + final String details = doc.get(SearchableFields.Details.getSearchableFieldName()); + if ( details == null ) { + ensureFullyLoaded(); + return fullRecord.getDetails(); + } + return null; + } + + @Override + public String getRelationship() { + final String relationship = doc.get(SearchableFields.Relationship.getSearchableFieldName()); + if ( relationship == null ) { + ensureFullyLoaded(); + return fullRecord.getRelationship(); + } + return null; + } + + @Override + public String getSourceQueueIdentifier() { + final String queueId = doc.get(SearchableFields.SourceQueueIdentifier.getSearchableFieldName()); + if ( queueId == null ) { + ensureFullyLoaded(); + return fullRecord.getSourceQueueIdentifier(); + } + return null; + } + + @Override + public String getContentClaimSection() { + final String claimSection = doc.get(SearchableFields.ContentClaimSection.getSearchableFieldName()); + if ( claimSection == null ) { + ensureFullyLoaded(); + return fullRecord.getContentClaimSection(); + } + return null; + } + + @Override + public String getPreviousContentClaimSection() { + ensureFullyLoaded(); + return fullRecord.getPreviousContentClaimSection(); + } + + @Override + public String getContentClaimContainer() { + final String claimContainer = doc.get(SearchableFields.ContentClaimContainer.getSearchableFieldName()); + if ( claimContainer == null ) { + ensureFullyLoaded(); + return fullRecord.getContentClaimContainer(); + } + return null; + } + + @Override + public String getPreviousContentClaimContainer() { + 
ensureFullyLoaded(); + return fullRecord.getPreviousContentClaimContainer(); + } + + @Override + public String getContentClaimIdentifier() { + final String claimIdentifier = doc.get(SearchableFields.ContentClaimIdentifier.getSearchableFieldName()); + if ( claimIdentifier == null ) { + ensureFullyLoaded(); + return fullRecord.getContentClaimIdentifier(); + } + return null; + } + + @Override + public String getPreviousContentClaimIdentifier() { + ensureFullyLoaded(); + return fullRecord.getPreviousContentClaimIdentifier(); + } + + @Override + public Long getContentClaimOffset() { + final String claimOffset = doc.get(SearchableFields.ContentClaimOffset.getSearchableFieldName()); + if ( claimOffset == null ) { + ensureFullyLoaded(); + return fullRecord.getContentClaimOffset(); + } + return null; + } + + @Override + public Long getPreviousContentClaimOffset() { + ensureFullyLoaded(); + return fullRecord.getPreviousContentClaimOffset(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java index 1761057,0000000..fbc5746 mode 100644,000000..100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java @@@ -1,105 -1,0 +1,107 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.index; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; ++import java.util.Set; + +import org.apache.nifi.provenance.journaling.JournaledStorageLocation; +import org.apache.nifi.provenance.journaling.LazyInitializedProvenanceEvent; +import org.apache.nifi.provenance.search.Query; + +public interface EventIndexSearcher extends Closeable { + /** + * Searches the repository for any events that match the provided query and returns the locations + * where those events are stored + * @param query + * @return + */ + SearchResult search(Query query) throws IOException; + + /** + * Returns the locations of all events for a FlowFile that has a FlowFile UUID in the collection of + * UUIDs provided, if the event time occurs between earliestTime and latestTime. 
The return value is + * ordered in the order in which the records should be read from the journals in order to obtain + * maximum efficiency + * + * @param flowFileUuids + * @param earliestTime + * @param latestTime + * + * @return + * @throws IOException + */ + List<JournaledStorageLocation> getEventsForFlowFiles(Collection<String> flowFileUuids, long earliestTime, long latestTime) throws IOException; + + /** + * Returns the locations of events that have Event ID's at least equal to minEventId, and returns + * up to the given number of results + * + * @param minEventId + * @param maxResults + * @return + * @throws IOException + */ + List<JournaledStorageLocation> getEvents(long minEventId, int maxResults) throws IOException; + + /** + * Returns the largest event id that is known by the index being searched + * @param container + * @param section + * @return + * @throws IOException + */ + Long getMaxEventId(String container, String section) throws IOException; + + /** + * Returns the locations of the latest events for the index being searched + * @param numEvents + * @return + * @throws IOException + */ + List<JournaledStorageLocation> getLatestEvents(int numEvents) throws IOException; + + /** + * Returns the total number of events that exist for the index being searched + * @return + * @throws IOException + */ + long getNumberOfEvents() throws IOException; + + /** + * Evaluates the given query against the index, returning an iterator of lazily initialized provenance events + * + * @param query ++ * @param referencedFields the set of fields that are referenced in the query + * @throws IOException + */ - Iterator<LazyInitializedProvenanceEvent> select(String query) throws IOException; ++ Iterator<LazyInitializedProvenanceEvent> select(String query, Set<String> referencedFields) throws IOException; + + /** + * Evaluates the given query against the index, returning an iterator of locations from which the matching + * records can be retrieved + * + * @param query + * 
@return + * @throws IOException + */ + Iterator<JournaledStorageLocation> selectLocations(String query) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java index 3fb641e,0000000..9bc123f mode 100644,000000..100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java @@@ -1,293 -1,0 +1,333 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.nifi.provenance.journaling.index;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.NumericRangeQuery;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortField.Type;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.store.FSDirectory;
import org.apache.nifi.pql.LuceneTranslator;
import org.apache.nifi.pql.ProvenanceQuery;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
import org.apache.nifi.provenance.journaling.LazyInitializedProvenanceEvent;
import org.apache.nifi.provenance.journaling.exception.EventNotFoundException;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.util.ObjectHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link EventIndexSearcher} backed by a Lucene {@link IndexSearcher}.
 *
 * <p>An instance either opens (and therefore owns) the index directory itself, or is
 * handed an already-open {@link DirectoryReader}. In both cases {@link #close()} closes
 * the reader; the directory is closed only when this instance opened it.</p>
 */
public class LuceneIndexSearcher implements EventIndexSearcher {
    private static final Logger logger = LoggerFactory.getLogger(LuceneIndexSearcher.class);

    /** Upper bound on the number of locations returned by {@link #getEventsForFlowFiles}. */
    private static final int MAX_FLOWFILE_EVENT_RESULTS = 1000;

    /** Number of hits requested from Lucene per page while iterating a {@code select} result. */
    private static final int SELECT_BATCH_SIZE = 1000000;

    /**
     * Fields that are always loaded from a matching document by the {@code select}
     * methods whenever a restricted field set is used, in addition to the fields the
     * query references. NOTE(review): presumably these are the fields needed to build
     * a storage location and lazy event from the document -- confirm against QueryUtils.
     */
    private static final Set<String> REQUIRED_FIELDS;

    static {
        final Set<String> fields = new HashSet<>();
        fields.add(IndexedFieldNames.BLOCK_INDEX);
        fields.add(IndexedFieldNames.CONTAINER_NAME);
        fields.add(IndexedFieldNames.EVENT_ID);
        fields.add(IndexedFieldNames.JOURNAL_ID);
        fields.add(IndexedFieldNames.SECTION_NAME);
        fields.add(SearchableFields.EventTime.getSearchableFieldName());
        fields.add(SearchableFields.EventType.getSearchableFieldName());
        // Shared by every instance, so it must not be modifiable after class initialization.
        REQUIRED_FIELDS = Collections.unmodifiableSet(fields);
    }

    private final ProvenanceEventRepository repo;
    private final DirectoryReader reader;
    private final IndexSearcher searcher;
    /** Non-null only when this instance opened the directory itself. */
    private final FSDirectory fsDirectory;

    private final String description;

    /**
     * Creates a searcher that opens the index stored in the given directory.
     *
     * @param repo the repository used to resolve searchable fields/attributes and lazy events
     * @param indexDirectory the directory that holds the Lucene index
     * @throws IOException if the directory or the index reader cannot be opened
     */
    public LuceneIndexSearcher(final ProvenanceEventRepository repo, final File indexDirectory) throws IOException {
        this.repo = repo;
        this.fsDirectory = FSDirectory.open(indexDirectory);
        this.reader = DirectoryReader.open(fsDirectory);
        this.searcher = new IndexSearcher(reader);
        this.description = "LuceneIndexSearcher[indexDirectory=" + indexDirectory + "]";
    }

    /**
     * Creates a searcher on top of an already-open reader.
     *
     * @param repo the repository used to resolve searchable fields/attributes and lazy events
     * @param reader the reader to search; it is closed by {@link #close()}
     * @param indexDirectory the directory of the index, used only for {@link #toString()}
     */
    public LuceneIndexSearcher(final ProvenanceEventRepository repo, final DirectoryReader reader, final File indexDirectory) {
        this.repo = repo;
        this.reader = reader;
        this.searcher = new IndexSearcher(reader);
        this.fsDirectory = null;
        this.description = "LuceneIndexSearcher[indexDirectory=" + indexDirectory + "]";
    }

    /**
     * Closes the reader and, if this instance opened it, the directory. Both are always
     * attempted; if both fail, the directory failure is attached to the reader failure
     * as a suppressed exception so that neither is lost.
     *
     * @throws IOException if closing the reader or the directory fails
     */
    @Override
    public void close() throws IOException {
        IOException failure = null;
        try {
            reader.close();
        } catch (final IOException ioe) {
            failure = ioe;
        }

        if ( fsDirectory != null ) {
            try {
                fsDirectory.close();
            } catch (final IOException ioe) {
                if ( failure == null ) {
                    failure = ioe;
                } else {
                    failure.addSuppressed(ioe);
                }
            }
        }

        if ( failure != null ) {
            throw failure;
        }
    }


    /**
     * Converts the hits of a search into storage locations, preserving hit order.
     */
    private List<JournaledStorageLocation> getOrderedLocations(final TopDocs topDocs) throws IOException {
        final List<JournaledStorageLocation> locations = new ArrayList<>(topDocs.scoreDocs.length);
        populateLocations(topDocs, locations);
        return locations;
    }


    /**
     * Loads the document of every hit and appends its storage location to the given collection.
     */
    private void populateLocations(final TopDocs topDocs, final Collection<JournaledStorageLocation> locations) throws IOException {
        for ( final ScoreDoc scoreDoc : topDocs.scoreDocs ) {
            final Document document = reader.document(scoreDoc.doc);
            locations.add(QueryUtils.createLocation(document));
        }
    }


    @Override
    public SearchResult search(final Query provenanceQuery) throws IOException {
        final org.apache.lucene.search.Query luceneQuery = QueryUtils.convertQueryToLucene(provenanceQuery);
        final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
        final List<JournaledStorageLocation> locations = getOrderedLocations(topDocs);

        return new SearchResult(locations, topDocs.totalHits);
    }

    @Override
    public List<JournaledStorageLocation> getEvents(final long minEventId, final int maxResults) throws IOException {
        final BooleanQuery query = new BooleanQuery();
        query.add(NumericRangeQuery.newLongRange(IndexedFieldNames.EVENT_ID, minEventId, null, true, true), Occur.MUST);

        // Ascending Event ID order.
        final TopDocs topDocs = searcher.search(query, maxResults, new Sort(new SortField(IndexedFieldNames.EVENT_ID, Type.LONG)));
        return getOrderedLocations(topDocs);
    }

    /**
     * {@inheritDoc}
     *
     * <p>A {@code null} container or section means "do not filter on that field".
     * Returns {@code null} when no indexed event matches.</p>
     */
    @Override
    public Long getMaxEventId(final String container, final String section) throws IOException {
        final BooleanQuery filterQuery = new BooleanQuery();

        if ( container != null ) {
            filterQuery.add(new TermQuery(new Term(IndexedFieldNames.CONTAINER_NAME, container)), Occur.MUST);
        }

        if ( section != null ) {
            filterQuery.add(new TermQuery(new Term(IndexedFieldNames.SECTION_NAME, section)), Occur.MUST);
        }

        // A BooleanQuery without any clause matches no documents, so when neither filter
        // is supplied we must explicitly match everything.
        final org.apache.lucene.search.Query query = filterQuery.clauses().isEmpty() ? new MatchAllDocsQuery() : filterQuery;

        // Descending Event ID order, so the single hit is the maximum.
        final TopDocs topDocs = searcher.search(query, 1, new Sort(new SortField(IndexedFieldNames.EVENT_ID, Type.LONG, true)));
        final List<JournaledStorageLocation> locations = getOrderedLocations(topDocs);
        if ( locations.isEmpty() ) {
            return null;
        }

        return locations.get(0).getEventId();
    }

    /**
     * {@inheritDoc}
     *
     * <p>At most {@value #MAX_FLOWFILE_EVENT_RESULTS} locations are returned. A
     * {@code null} or empty UUID collection yields an empty list.</p>
     */
    @Override
    public List<JournaledStorageLocation> getEventsForFlowFiles(final Collection<String> flowFileUuids, final long earliestTime, final long latestTime) throws IOException {
        // No FlowFiles of interest means no events can match; previously this path
        // dereferenced a null query.
        if ( flowFileUuids == null || flowFileUuids.isEmpty() ) {
            return new ArrayList<>();
        }

        // Create a query for all Events related to the FlowFiles of interest. We do this by adding all ID's as
        // "SHOULD" clauses and then setting the minimum required to 1.
        final BooleanQuery flowFileIdQuery = new BooleanQuery();
        final String uuidField = SearchableFields.FlowFileUUID.getSearchableFieldName();
        for ( final String flowFileUuid : flowFileUuids ) {
            flowFileIdQuery.add(new TermQuery(new Term(uuidField, flowFileUuid)), Occur.SHOULD);
        }
        flowFileIdQuery.setMinimumNumberShouldMatch(1);

        // Additionally require the event time to fall within [earliestTime, latestTime].
        flowFileIdQuery.add(NumericRangeQuery.newLongRange(SearchableFields.EventTime.getSearchableFieldName(),
                earliestTime, latestTime, true, true), Occur.MUST);

        final TopDocs topDocs = searcher.search(flowFileIdQuery, MAX_FLOWFILE_EVENT_RESULTS);
        return getOrderedLocations(topDocs);
    }


    @Override
    public List<JournaledStorageLocation> getLatestEvents(final int numEvents) throws IOException {
        final MatchAllDocsQuery query = new MatchAllDocsQuery();

        // Descending Event ID order, so the newest events come first.
        final TopFieldDocs topDocs = searcher.search(query, numEvents, new Sort(new SortField(IndexedFieldNames.EVENT_ID, Type.LONG, true)));
        return getOrderedLocations(topDocs);
    }

    @Override
    public String toString() {
        return description;
    }

    @Override
    public long getNumberOfEvents() {
        return reader.numDocs();
    }


    /**
     * Compiles the query, runs its where-clause against the index and returns an iterator
     * that pages through the hits, transforming each matching document on demand.
     *
     * <p>The returned iterator closes this searcher once {@code hasNext()} reports that
     * all hits have been consumed.</p>
     *
     * @param query the query text to compile
     * @param referencedFields fields referenced by the query; when {@code null}, complete
     *            documents are loaded, otherwise only these fields plus {@link #REQUIRED_FIELDS}
     * @param transformer converts each matching document into the element type
     * @return an iterator over the transformed hits
     * @throws IOException declared for the callers' benefit; search failures during
     *             iteration surface as {@link EventNotFoundException}
     */
    private <T> Iterator<T> select(final String query, final Set<String> referencedFields, final DocumentTransformer<T> transformer) throws IOException {
        final org.apache.lucene.search.Query luceneQuery = LuceneTranslator.toLuceneQuery(
                ProvenanceQuery.compile(query, repo.getSearchableFields(), repo.getSearchableAttributes()).getWhereClause());
        final int batchSize = SELECT_BATCH_SIZE;

        final Set<String> fieldsToLoad;
        if ( referencedFields == null ) {
            fieldsToLoad = null;
        } else {
            fieldsToLoad = new HashSet<>(REQUIRED_FIELDS);
            fieldsToLoad.addAll(referencedFields);
        }

        // Holds the current page of hits; null until the first search has been run.
        final ObjectHolder<TopDocs> topDocsHolder = new ObjectHolder<>(null);
        return new Iterator<T>() {
            private int fetched = 0;        // number of elements handed out so far, across all pages
            private int scoreDocIndex = 0;  // position within the current page

            @Override
            public boolean hasNext() {
                // Run the initial search lazily, on first use.
                if ( topDocsHolder.get() == null ) {
                    try {
                        topDocsHolder.set(searcher.search(luceneQuery, batchSize));
                    } catch (final IOException ioe) {
                        throw new EventNotFoundException("Unable to obtain next record from " + LuceneIndexSearcher.this, ioe);
                    }
                }

                final boolean hasNext = fetched < topDocsHolder.get().totalHits;
                if ( !hasNext ) {
                    // Everything has been consumed, so release the index resources.
                    try {
                        LuceneIndexSearcher.this.close();
                    } catch (final IOException ioe) {
                        logger.warn("Failed to close {} due to {}", LuceneIndexSearcher.this, ioe.toString());
                        if ( logger.isDebugEnabled() ) {
                            logger.debug("", ioe);
                        }
                    }
                }
                return hasNext;
            }

            @Override
            public T next() {
                if ( !hasNext() ) {
                    throw new NoSuchElementException();
                }

                TopDocs topDocs = topDocsHolder.get();
                ScoreDoc[] scoreDocs = topDocs.scoreDocs;
                if ( scoreDocIndex >= scoreDocs.length ) {
                    // Current page exhausted: fetch the page that follows its last hit.
                    try {
                        topDocs = getTopDocs(scoreDocs[scoreDocs.length - 1], luceneQuery, batchSize);
                        topDocsHolder.set(topDocs);
                        scoreDocs = topDocs.scoreDocs;
                        scoreDocIndex = 0;
                    } catch (final IOException ioe) {
                        throw new EventNotFoundException("Unable to obtain next record from " + LuceneIndexSearcher.this, ioe);
                    }
                }

                final ScoreDoc scoreDoc = scoreDocs[scoreDocIndex++];
                final Document document;
                try {
                    document = getDocument(scoreDoc.doc, fieldsToLoad);
                } catch (final IOException ioe) {
                    throw new EventNotFoundException("Unable to obtain next record from " + LuceneIndexSearcher.this, ioe);
                }
                fetched++;

                return transformer.transform(document);
            }

            // this method exists solely for the use of a profiler so that I can see which methods are taking the longest when
            // profiling only org.apache.nifi.*
            private TopDocs getTopDocs(final ScoreDoc start, final org.apache.lucene.search.Query luceneQuery, final int batchSize) throws IOException {
                return searcher.searchAfter(start, luceneQuery, batchSize);
            }

            // this method exists solely for the use of a profiler so that I can see which methods are taking the longest when
            // profiling only org.apache.nifi.*
            private Document getDocument(final int docId, final Set<String> referencedFields) throws IOException {
                // No restriction requested: load the complete document.
                if ( referencedFields == null || referencedFields.isEmpty() ) {
                    return searcher.doc(docId);
                } else {
                    return searcher.doc(docId, referencedFields);
                }
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    @Override
    public Iterator<LazyInitializedProvenanceEvent> select(final String query, final Set<String> referencedFields) throws IOException {
        return select(query, referencedFields, new DocumentTransformer<LazyInitializedProvenanceEvent>() {
            @Override
            public LazyInitializedProvenanceEvent transform(final Document document) {
                return new LazyInitializedProvenanceEvent(repo, QueryUtils.createLocation(document), document);
            }
        });
    }

    @Override
    public Iterator<JournaledStorageLocation> selectLocations(final String query) throws IOException {
        // Only the required fields are loaded for each hit.
        return select(query, REQUIRED_FIELDS, new DocumentTransformer<JournaledStorageLocation>() {
            @Override
            public JournaledStorageLocation transform(final Document document) {
                return QueryUtils.createLocation(document);
            }
        });
    }

    /** Converts a Lucene document into the element type produced by a {@code select} iterator. */
    private interface DocumentTransformer<T> {
        T transform(Document document);
    }
}