continuing to implement
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a5f557ad Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a5f557ad Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a5f557ad Branch: refs/heads/journaling-prov-repo Commit: a5f557ad966c4fa70ae0a0239e3bf70dcd788ff0 Parents: b95e756 Author: Mark Payne <marka...@hotmail.com> Authored: Sun Feb 15 10:45:29 2015 -0500 Committer: Mark Payne <marka...@hotmail.com> Committed: Sun Feb 15 10:45:29 2015 -0500 ---------------------------------------------------------------------- .../JournalingProvenanceRepository.java | 30 ++- .../config/JournalingRepositoryConfig.java | 5 + .../journaling/index/EventIndexSearcher.java | 2 + .../journaling/index/IndexManager.java | 41 +++ .../journaling/index/LuceneIndexManager.java | 178 +++++++++++++ .../journaling/index/LuceneIndexSearcher.java | 23 ++ .../journaling/index/LuceneIndexWriter.java | 7 +- .../journaling/index/MultiIndexSearcher.java | 112 ++++++++ .../journals/StandardJournalWriter.java | 40 ++- .../partition/JournalingPartition.java | 254 ++++++++++--------- .../journaling/partition/PartitionManager.java | 13 + .../partition/QueuingPartitionManager.java | 16 +- .../journaling/tasks/CompressionTask.java | 2 +- .../journaling/toc/StandardTocWriter.java | 25 +- 14 files changed, 607 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java ---------------------------------------------------------------------- diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java index 7911d73..cc97ee9 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java @@ -30,9 +30,10 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -47,6 +48,8 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.provenance.StorageLocation; import org.apache.nifi.provenance.StoredProvenanceEvent; import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; +import org.apache.nifi.provenance.journaling.index.IndexManager; +import org.apache.nifi.provenance.journaling.index.LuceneIndexManager; import org.apache.nifi.provenance.journaling.index.QueryUtils; import org.apache.nifi.provenance.journaling.journals.JournalReader; import org.apache.nifi.provenance.journaling.journals.StandardJournalReader; @@ -75,11 +78,12 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository private final JournalingRepositoryConfig config; 
private final AtomicLong idGenerator = new AtomicLong(0L); - private final ExecutorService executor; + private final ScheduledExecutorService executor; private EventReporter eventReporter; // effectively final private PartitionManager partitionManager; // effectively final private QueryManager queryManager; // effectively final + private IndexManager indexManager; // effectively final public JournalingProvenanceRepository() throws IOException { this(createConfig()); @@ -87,7 +91,16 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository public JournalingProvenanceRepository(final JournalingRepositoryConfig config) throws IOException { this.config = config; - this.executor = Executors.newFixedThreadPool(config.getThreadPoolSize()); + this.executor = Executors.newScheduledThreadPool(config.getThreadPoolSize(), new ThreadFactory() { + private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); + + @Override + public Thread newThread(final Runnable r) { + final Thread thread = defaultFactory.newThread(r); + thread.setName("Provenance Repository Worker Thread"); + return thread; + } + }); } @@ -156,7 +169,8 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository public synchronized void initialize(final EventReporter eventReporter) throws IOException { this.eventReporter = eventReporter; - this.partitionManager = new QueuingPartitionManager(config, executor); + this.indexManager = new LuceneIndexManager(config, executor); + this.partitionManager = new QueuingPartitionManager(indexManager, config, executor); this.queryManager = new StandardQueryManager(partitionManager, config, 10); } @@ -312,7 +326,7 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository @Override public Long getMaxEventId() throws IOException { - final Set<Long> maxIds = partitionManager.withEachPartition(new PartitionAction<Long>() { + final Set<Long> maxIds = 
partitionManager.withEachPartitionSerially(new PartitionAction<Long>() { @Override public Long perform(final Partition partition) throws IOException { return partition.getMaxEventId(); @@ -374,6 +388,10 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository partitionManager.shutdown(); } + indexManager.close(); + + // TODO: make sure that all are closed here! + executor.shutdown(); } @@ -390,7 +408,7 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository @Override public Long getEarliestEventTime() throws IOException { // Get the earliest event timestamp for each partition - final Set<Long> earliestTimes = partitionManager.withEachPartition(new PartitionAction<Long>() { + final Set<Long> earliestTimes = partitionManager.withEachPartitionSerially(new PartitionAction<Long>() { @Override public Long perform(final Partition partition) throws IOException { return partition.getEarliestEventTime(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java index 8998932..18871c7 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java +++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java @@ -35,6 +35,7 @@ public class JournalingRepositoryConfig { private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB private int partitionCount = 16; private int blockSize = 5000; + private int indexesPerContainer = 2; private List<SearchableField> searchableFields = new ArrayList<>(); private List<SearchableField> searchableAttributes = new ArrayList<>(); @@ -51,6 +52,10 @@ public class JournalingRepositoryConfig { return readOnly; } + public int getIndexesPerContainer() { + return indexesPerContainer; + } + /** * Specifies where the repository will store data * http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java index b669c53..753378d 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java @@ -33,4 +33,6 @@ public interface EventIndexSearcher extends Closeable { SearchResult search(Query query) throws IOException; List<JournaledStorageLocation> 
getEvents(long minEventId, int maxResults) throws IOException; + + Long getMaxEventId(String container, String section) throws IOException; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java new file mode 100644 index 0000000..141b84a --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.journaling.index; + +import java.io.Closeable; +import java.io.IOException; + +public interface IndexManager extends Closeable { + + /** + * Returns an EventIndexWriter for the given container. + * @param container + * @return + */ + EventIndexWriter getIndexWriter(final String container); + + /** + * Returns the max event ID that has been indexed for the given container and section. + * + * @param container + * @param section + * @return + */ + Long getMaxEventId(String container, String section) throws IOException; + + EventIndexSearcher newIndexSearcher(String containerName) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java new file mode 100644 index 0000000..d10fd00 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.index; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LuceneIndexManager implements IndexManager { + private static final Logger logger = LoggerFactory.getLogger(LuceneIndexManager.class); + + private final JournalingRepositoryConfig config; + private final ScheduledExecutorService executor; + + private final Map<String, List<LuceneIndexWriter>> writers = new HashMap<>(); + private final Map<String, AtomicLong> writerIndexes = new HashMap<>(); + + public LuceneIndexManager(final JournalingRepositoryConfig config, final ScheduledExecutorService executor) throws IOException { + this.config = config; + this.executor = executor; + + final int rolloverSeconds = (int) config.getJournalRolloverPeriod(TimeUnit.SECONDS); + if ( !config.isReadOnly() ) { + for ( final Map.Entry<String, File> entry : config.getContainers().entrySet() ) { + final String containerName = entry.getKey(); + final File container = entry.getValue(); + + final List<LuceneIndexWriter> writerList = new ArrayList<>(config.getIndexesPerContainer()); + writers.put(containerName, writerList); + writerIndexes.put(containerName, new 
AtomicLong(0L)); + + for ( int i=0; i < config.getIndexesPerContainer(); i++ ){ + final File indexDir = new File(container, "indices/" + i); + writerList.add(new LuceneIndexWriter(indexDir, config)); + } + + executor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + sync(containerName); + } catch (final Throwable t) { + logger.error("Failed to sync Provenance Repository Container {} due to {}", containerName, t); + if ( logger.isDebugEnabled() ) { + logger.error("", t); + } + } + } + }, rolloverSeconds, rolloverSeconds, TimeUnit.SECONDS); + } + } + } + + @Override + public EventIndexSearcher newIndexSearcher(final String containerName) throws IOException { + final File containerDir = config.getContainers().get(containerName); + if ( containerDir == null ) { + throw new IllegalArgumentException(); + } + + final List<EventIndexSearcher> searchers = new ArrayList<>(); + + try { + if (config.isReadOnly()) { + for (int i=0; i < config.getIndexesPerContainer(); i++) { + final File indexDir = new File(containerName, "indices/" + i); + searchers.add(new LuceneIndexSearcher(indexDir)); + } + } else { + final List<LuceneIndexWriter> writerList = writers.get(containerName); + for ( final LuceneIndexWriter writer : writerList ) { + searchers.add(writer.newIndexSearcher()); + } + } + } catch (final IOException ioe) { + // If we failed to create a searcher, we need to close all that we've already created. 
+ for ( final EventIndexSearcher searcher : searchers ) { + try { + searcher.close(); + } catch (final IOException ioe2) { + ioe.addSuppressed(ioe2); + } + } + + throw ioe; + } + + return new MultiIndexSearcher(searchers); + } + + @Override + public LuceneIndexWriter getIndexWriter(final String container) { + if (config.isReadOnly() ) { + throw new IllegalStateException("Cannot obtain Index Writer because repository is read-only"); + } + + final AtomicLong index = writerIndexes.get(container); + if (index == null ) { + throw new IllegalArgumentException(); + } + + final long curVal = index.get(); + final List<LuceneIndexWriter> writerList = writers.get(container); + return writerList.get((int) (curVal % writerList.size())); + } + + @Override + public Long getMaxEventId(final String container, final String section) throws IOException { + final List<LuceneIndexWriter> writerList = writers.get(container); + if ( writerList == null ) { + return null; + } + + Long max = null; + for ( final LuceneIndexWriter writer : writerList ) { + try (final EventIndexSearcher searcher = writer.newIndexSearcher()) { + final Long maxForWriter = searcher.getMaxEventId(container, section); + if ( maxForWriter != null ) { + if (max == null || maxForWriter.longValue() > max.longValue() ) { + max = maxForWriter; + } + } + } + } + + return max; + } + + + private void sync(final String containerName) throws IOException { + final AtomicLong index = writerIndexes.get(containerName); + final long curIndex = index.getAndIncrement(); + + final List<LuceneIndexWriter> writerList = writers.get(containerName); + final EventIndexWriter toSync = writerList.get((int) (curIndex % writerList.size())); + toSync.sync(); + } + + @Override + public void close() throws IOException { + for ( final List<LuceneIndexWriter> writerList : writers.values() ) { + for ( final LuceneIndexWriter writer : writerList ) { + try { + writer.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close {} due to 
{}", writer, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java index 32dc7c3..a9dd1a5 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.lucene.document.Document; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.Term; import org.apache.lucene.search.BooleanClause.Occur; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.IndexSearcher; @@ -31,6 +32,7 @@ import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortField.Type; +import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.FSDirectory; import org.apache.nifi.provenance.journaling.JournaledStorageLocation; @@ -110,4 +112,25 @@ public class LuceneIndexSearcher implements EventIndexSearcher { return 
getLocations(topDocs); } + @Override + public Long getMaxEventId(final String container, final String section) throws IOException { + final BooleanQuery query = new BooleanQuery(); + + if ( container != null ) { + query.add(new TermQuery(new Term(IndexedFieldNames.CONTAINER_NAME, container)), Occur.MUST); + } + + if ( section != null ) { + query.add(new TermQuery(new Term(IndexedFieldNames.SECTION_NAME, section)), Occur.MUST); + } + + final TopDocs topDocs = searcher.search(query, 1, new Sort(new SortField(IndexedFieldNames.EVENT_ID, Type.LONG, true))); + final List<JournaledStorageLocation> locations = getLocations(topDocs); + if ( locations.isEmpty() ) { + return null; + } + + return locations.get(0).getEventId(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java index e955ae5..b61ad34 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java @@ -18,9 +18,11 @@ package org.apache.nifi.provenance.journaling.index; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import 
java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -117,6 +119,7 @@ public class LuceneIndexWriter implements EventIndexWriter { public void index(final Collection<JournaledProvenanceEvent> events) throws IOException { long maxId = this.indexMaxId.get(); + final List<Document> documents = new ArrayList<>(events.size()); for ( final JournaledProvenanceEvent event : events ) { maxId = event.getEventId(); @@ -189,8 +192,10 @@ public class LuceneIndexWriter implements EventIndexWriter { } } - indexWriter.addDocument(doc); + documents.add(doc); } + + indexWriter.addDocuments(documents); // Update the index's max id boolean updated = false; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java new file mode 100644 index 0000000..d086ff5 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.index; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.nifi.provenance.journaling.JournaledStorageLocation; +import org.apache.nifi.provenance.search.Query; + +public class MultiIndexSearcher implements EventIndexSearcher { + private final List<EventIndexSearcher> searchers; + + public MultiIndexSearcher(final List<EventIndexSearcher> searchers) { + this.searchers = searchers; + } + + @Override + public void close() throws IOException { + IOException suppressed = null; + + for ( final EventIndexSearcher searcher : searchers ) { + try { + searcher.close(); + } catch (final IOException ioe) { + if ( suppressed == null ) { + suppressed = ioe; + } else { + suppressed.addSuppressed(ioe); + } + } + } + + if ( suppressed != null ) { + throw suppressed; + } + } + + @Override + public SearchResult search(final Query query) throws IOException { + int totalHitCount = 0; + final List<JournaledStorageLocation> locations = new ArrayList<>(); + + for ( final EventIndexSearcher searcher : searchers ) { + final SearchResult result = searcher.search(query); + totalHitCount += result.getTotalCount(); + locations.addAll(result.getLocations()); + } + + return new SearchResult(locations, totalHitCount); + } + + @Override + public List<JournaledStorageLocation> 
getEvents(final long minEventId, final int maxResults) throws IOException { + final List<JournaledStorageLocation> locations = new ArrayList<>(); + int results = 0; + + // Perform search against all searchers and aggregate results. + for ( final EventIndexSearcher searcher : searchers ) { + final List<JournaledStorageLocation> searchLocations = searcher.getEvents(minEventId, maxResults); + locations.addAll(searchLocations); + if ( !searchLocations.isEmpty() ) { + results++; + } + } + + // Results from this call are sorted. If we have only 0 or 1 searchers that had results, then + // we don't need to sort anything. Otherwise, we need to sort and return just the first X + // number of results. + if ( results > 1 ) { + Collections.sort(locations); + } + + if ( locations.size() > maxResults ) { + return locations.subList(0, maxResults); + } + + return locations; + } + + @Override + public Long getMaxEventId(final String container, final String section) throws IOException { + Long max = null; + for ( final EventIndexSearcher searcher : searchers ) { + final Long maxForWriter = searcher.getMaxEventId(container, section); + if ( maxForWriter != null ) { + if (max == null || maxForWriter.longValue() > max.longValue() ) { + max = maxForWriter; + } + } + } + + return max; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java index 
5a289fe..af5f8de 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java @@ -16,8 +16,6 @@ */ package org.apache.nifi.provenance.journaling.journals; -import java.io.BufferedOutputStream; -import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; @@ -29,6 +27,8 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.journaling.io.Serializer; import org.apache.nifi.remote.io.CompressionOutputStream; +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.stream.io.ByteCountingOutputStream; @@ -96,6 +96,9 @@ public class StandardJournalWriter implements JournalWriter { private OutputStream compressedStream; private ByteCountingOutputStream out; + private long recordBytes = 256L; + private long recordCount = 1L; + public StandardJournalWriter(final long journalId, final File journalFile, final boolean compressed, final Serializer serializer) throws IOException { this.journalId = journalId; @@ -132,16 +135,38 @@ public class StandardJournalWriter implements JournalWriter { @Override public void close() throws IOException { finishBlock(); - - if ( compressedStream != null ) { + + IOException suppressed = null; + try { compressedStream.flush(); compressedStream.close(); + } catch (final IOException ioe) { + suppressed = ioe; + } + + try { + try { + uncompressedStream.flush(); + } finally { + uncompressedStream.close(); + } + } catch (final IOException ioe) { + if ( suppressed != null ) { + ioe.addSuppressed(suppressed); + } + throw 
ioe; + } + + if ( suppressed != null ) { + throw suppressed; } } @Override public void write(final Collection<ProvenanceEventRecord> events, final long firstEventId) throws IOException { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final int avgRecordSize = (int) (recordBytes / recordCount); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(avgRecordSize); final DataOutputStream serializerDos = new DataOutputStream(baos); final BufferedOutputStream bos = new BufferedOutputStream(out); @@ -153,10 +178,13 @@ public class StandardJournalWriter implements JournalWriter { serializer.serialize(event, serializerDos); serializerDos.flush(); - final int recordLength = 8 + baos.size(); // record length is length of ID (8 bytes) plus length of serialized record + final int serializedLength = baos.size(); + final int recordLength = 8 + serializedLength; // record length is length of ID (8 bytes) plus length of serialized record outDos.writeInt(recordLength); outDos.writeLong(id++); baos.writeTo(outDos); + recordBytes += recordLength; + recordCount++; baos.reset(); eventCount++; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java index 651c41e..1ace37f 100644 --- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java @@ -35,8 +35,11 @@ import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent; import org.apache.nifi.provenance.journaling.JournaledStorageLocation; import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; import org.apache.nifi.provenance.journaling.index.EventIndexSearcher; +import org.apache.nifi.provenance.journaling.index.EventIndexWriter; +import org.apache.nifi.provenance.journaling.index.IndexManager; import org.apache.nifi.provenance.journaling.index.LuceneIndexSearcher; import org.apache.nifi.provenance.journaling.index.LuceneIndexWriter; +import org.apache.nifi.provenance.journaling.index.MultiIndexSearcher; import org.apache.nifi.provenance.journaling.index.QueryUtils; import org.apache.nifi.provenance.journaling.io.StandardEventSerializer; import org.apache.nifi.provenance.journaling.journals.JournalReader; @@ -55,13 +58,12 @@ public class JournalingPartition implements Partition { private static final String JOURNAL_FILE_EXTENSION = ".journal"; private final String containerName; - private final String sectionName; + private final int sectionIndex; private final File section; private final File journalsDir; private final JournalingRepositoryConfig config; private final ExecutorService executor; - private final LuceneIndexWriter indexWriter; private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); @@ -73,9 +75,12 @@ public class JournalingPartition implements Partition { private volatile long maxEventId = -1L; private volatile Long earliestEventTime = null; - public JournalingPartition(final 
String containerName, final String sectionName, final File sectionDir, final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException { + private final IndexManager indexManager; + + public JournalingPartition(final IndexManager indexManager, final String containerName, final int sectionIndex, final File sectionDir, final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException { + this.indexManager = indexManager; this.containerName = containerName; - this.sectionName = sectionName; + this.sectionIndex = sectionIndex; this.section = sectionDir; this.journalsDir = new File(section, "journals"); this.config = config; @@ -88,22 +93,11 @@ public class JournalingPartition implements Partition { if ( journalsDir.exists() && journalsDir.isFile() ) { throw new IOException("Could not create directory " + section + " because a file already exists with this name"); } - - if ( config.isReadOnly() ) { - indexWriter = null; - } else { - final File indexDir = new File(section, "index"); - indexWriter = new LuceneIndexWriter(indexDir, config); - } } public EventIndexSearcher newIndexSearcher() throws IOException { - if (config.isReadOnly()) { - return new LuceneIndexSearcher(new File(section, "index")); - } - - return indexWriter.newIndexSearcher(); + return indexManager.newIndexSearcher(containerName); } protected JournalWriter getJournalWriter(final long firstEventId) throws IOException { @@ -118,6 +112,11 @@ public class JournalingPartition implements Partition { return journalWriter; } + // MUST be called with writeLock or readLock held. 
+ private EventIndexWriter getIndexWriter() { + return indexManager.getIndexWriter(containerName); + } + @Override public List<JournaledProvenanceEvent> registerEvents(final Collection<ProvenanceEventRecord> events, final long firstEventId) throws IOException { writeLock.lock(); @@ -139,12 +138,13 @@ public class JournalingPartition implements Partition { final List<JournaledProvenanceEvent> storedEvents = new ArrayList<>(events.size()); long id = firstEventId; for (final ProvenanceEventRecord event : events) { - final JournaledStorageLocation location = new JournaledStorageLocation(containerName, sectionName, + final JournaledStorageLocation location = new JournaledStorageLocation(containerName, String.valueOf(sectionIndex), String.valueOf(writer.getJournalId()), tocWriter.getCurrentBlockIndex(), id++); final JournaledProvenanceEvent storedEvent = new JournaledProvenanceEvent(event, location); storedEvents.add(storedEvent); } + final EventIndexWriter indexWriter = getIndexWriter(); indexWriter.index(storedEvents); if ( config.isAlwaysSync() ) { @@ -196,13 +196,28 @@ public class JournalingPartition implements Partition { // MUST be called with write lock held. private void rollover(final long firstEventId) throws IOException { + // TODO: Rework how rollover works because we now have index manager!! 
+ // if we have a writer already, close it and initiate rollover actions if ( journalWriter != null ) { journalWriter.finishBlock(); journalWriter.close(); tocWriter.close(); - indexWriter.sync(); - + + final EventIndexWriter curWriter = getIndexWriter(); + executor.submit(new Runnable() { + @Override + public void run() { + try { + curWriter.sync(); + } catch (final IOException e) { + + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + }); + if ( config.isCompressOnRollover() ) { final File finishedFile = journalWriter.getJournalFile(); final File finishedTocFile = tocWriter.getFile(); @@ -213,7 +228,7 @@ public class JournalingPartition implements Partition { // create new writers and reset state. final File journalFile = new File(journalsDir, firstEventId + JOURNAL_FILE_EXTENSION); journalWriter = new StandardJournalWriter(firstEventId, journalFile, false, new StandardEventSerializer()); - tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false); + tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false, config.isAlwaysSync()); tocWriter.addBlockOffset(journalWriter.getSize()); numEventsAtEndOfLastBlock = 0; } @@ -237,112 +252,123 @@ public class JournalingPartition implements Partition { @Override public void restore() throws IOException { - // delete or rename files if stopped during rollover; compress any files that haven't been compressed - if ( !config.isReadOnly() ) { - final File[] children = journalsDir.listFiles(); - if ( children != null ) { - // find the latest journal. - File latestJournal = null; - long latestJournalId = -1L; - - final List<File> journalFiles = new ArrayList<>(); - - // find any journal files that either haven't been compressed or were partially compressed when - // we last shutdown and then restart compression. 
- for ( final File file : children ) { - final String filename = file.getName(); - if ( !filename.contains(JOURNAL_FILE_EXTENSION) ) { - continue; - } + writeLock.lock(); + try { + // delete or rename files if stopped during rollover; compress any files that haven't been compressed + if ( !config.isReadOnly() ) { + final File[] children = journalsDir.listFiles(); + if ( children != null ) { + // find the latest journal. + File latestJournal = null; + long latestJournalId = -1L; + + final List<File> journalFiles = new ArrayList<>(); - final Long journalId = getJournalId(file); - if ( journalId != null && journalId > latestJournalId ) { - latestJournal = file; - latestJournalId = journalId; + // find any journal files that either haven't been compressed or were partially compressed when + // we last shutdown and then restart compression. + for ( final File file : children ) { + final String filename = file.getName(); + if ( !filename.contains(JOURNAL_FILE_EXTENSION) ) { + continue; + } + + final Long journalId = getJournalId(file); + if ( journalId != null && journalId > latestJournalId ) { + latestJournal = file; + latestJournalId = journalId; + } + + journalFiles.add(file); + + if ( !config.isCompressOnRollover() ) { + continue; + } + + if ( filename.endsWith(CompressionTask.FILE_EXTENSION) ) { + final File uncompressedFile = new File(journalsDir, filename.replace(CompressionTask.FILE_EXTENSION, "")); + if ( uncompressedFile.exists() ) { + // both the compressed and uncompressed version of this journal exist. The Compression Task was + // not complete when we shutdown. Delete the compressed journal and toc and re-start the Compression Task. + final File tocFile = QueryUtils.getTocFile(uncompressedFile); + executor.submit(new CompressionTask(uncompressedFile, getJournalId(uncompressedFile), tocFile)); + } else { + // The compressed file exists but the uncompressed file does not. 
This means that we have finished + // writing the compressed file and deleted the original journal file but then shutdown before + // renaming the compressed file to the original filename. We can simply rename the compressed file + // to the original file and then address the TOC file. + final boolean rename = CompressionTask.rename(file, uncompressedFile); + if ( !rename ) { + logger.warn("{} During recovery, failed to rename {} to {}", this, file, uncompressedFile); + continue; + } + + // Check if the compressed TOC file exists. If not, we are finished. + // If it does exist, then we know that it is complete, as described above, so we will go + // ahead and replace the uncompressed version. + final File tocFile = QueryUtils.getTocFile(uncompressedFile); + final File compressedTocFile = new File(tocFile.getParentFile(), tocFile.getName() + CompressionTask.FILE_EXTENSION); + if ( !compressedTocFile.exists() ) { + continue; + } + + tocFile.delete(); + + final boolean renamedTocFile = CompressionTask.rename(compressedTocFile, tocFile); + if ( !renamedTocFile ) { + logger.warn("{} During recovery, failed to rename {} to {}", this, compressedTocFile, tocFile); + } + } + } } - journalFiles.add(file); + // Get the first event in the earliest journal file so that we know what the earliest time available is + Collections.sort(journalFiles, new Comparator<File>() { + @Override + public int compare(final File o1, final File o2) { + return Long.compare(getJournalId(o1), getJournalId(o2)); + } + }); - if ( !config.isCompressOnRollover() ) { - continue; + for ( final File journal : journalFiles ) { + try (final JournalReader reader = new StandardJournalReader(journal)) { + final ProvenanceEventRecord record = reader.nextEvent(); + this.earliestEventTime = record.getEventTime(); + break; + } catch (final IOException ioe) { + } } - if ( filename.endsWith(CompressionTask.FILE_EXTENSION) ) { - final File uncompressedFile = new File(journalsDir, 
filename.replace(CompressionTask.FILE_EXTENSION, "")); - if ( uncompressedFile.exists() ) { - // both the compressed and uncompressed version of this journal exist. The Compression Task was - // not complete when we shutdown. Delete the compressed journal and toc and re-start the Compression Task. - final File tocFile = QueryUtils.getTocFile(uncompressedFile); - executor.submit(new CompressionTask(uncompressedFile, getJournalId(uncompressedFile), tocFile)); - } else { - // The compressed file exists but the uncompressed file does not. This means that we have finished - // writing the compressed file and deleted the original journal file but then shutdown before - // renaming the compressed file to the original filename. We can simply rename the compressed file - // to the original file and then address the TOC file. - final boolean rename = CompressionTask.rename(file, uncompressedFile); - if ( !rename ) { - logger.warn("{} During recovery, failed to rename {} to {}", this, file, uncompressedFile); - continue; - } - - // Check if the compressed TOC file exists. If not, we are finished. - // If it does exist, then we know that it is complete, as described above, so we will go - // ahead and replace the uncompressed version. - final File tocFile = QueryUtils.getTocFile(uncompressedFile); - final File compressedTocFile = new File(tocFile.getParentFile(), tocFile.getName() + CompressionTask.FILE_EXTENSION); - if ( !compressedTocFile.exists() ) { - continue; - } - - tocFile.delete(); - - final boolean renamedTocFile = CompressionTask.rename(compressedTocFile, tocFile); - if ( !renamedTocFile ) { - logger.warn("{} During recovery, failed to rename {} to {}", this, compressedTocFile, tocFile); - } + // Whatever was the last journal for this partition, we need to remove anything for that journal + // from the index and re-add them, and then sync the index. 
This allows us to avoid syncing + // the index each time (we sync only on rollover) but allows us to still ensure that we index + // all events. + if ( latestJournal != null ) { + try { + reindex(latestJournal); + } catch (final EOFException eof) { } } } - - // Get the first event in the earliest journal file so that we know what the earliest time available is - Collections.sort(journalFiles, new Comparator<File>() { - @Override - public int compare(final File o1, final File o2) { - return Long.compare(getJournalId(o1), getJournalId(o2)); - } - }); - - for ( final File journal : journalFiles ) { - try (final JournalReader reader = new StandardJournalReader(journal)) { - final ProvenanceEventRecord record = reader.nextEvent(); - this.earliestEventTime = record.getEventTime(); - break; - } catch (final IOException ioe) { - } - } - - // Whatever was the last journal for this partition, we need to remove anything for that journal - // from the index and re-add them, and then sync the index. This allows us to avoid syncing - // the index each time (we sync only on rollover) but allows us to still ensure that we index - // all events. - if ( latestJournal != null ) { - try { - reindex(latestJournal); - } catch (final EOFException eof) { - } - } } + } finally { + writeLock.unlock(); } } private void reindex(final File journalFile) throws IOException { - try (final TocJournalReader reader = new TocJournalReader(containerName, sectionName, String.valueOf(getJournalId(journalFile)), journalFile)) { - indexWriter.delete(containerName, sectionName, String.valueOf(getJournalId(journalFile))); + // TODO: Rework how recovery works because we now have index manager!! + try (final TocJournalReader reader = new TocJournalReader(containerName, String.valueOf(sectionIndex), String.valueOf(getJournalId(journalFile)), journalFile)) { + // We don't know which index contains the data for this journal, so remove the journal + // from both. 
+ for (final LuceneIndexWriter indexWriter : indexWriters ) { + indexWriter.delete(containerName, String.valueOf(sectionIndex), String.valueOf(getJournalId(journalFile))); + } long maxId = -1L; final List<JournaledProvenanceEvent> storedEvents = new ArrayList<>(1000); JournaledProvenanceEvent event; + final LuceneIndexWriter indexWriter = indexWriters[0]; while ((event = reader.nextJournaledEvent()) != null ) { storedEvents.add(event); maxId = event.getEventId(); @@ -365,7 +391,7 @@ public class JournalingPartition implements Partition { @Override public List<JournaledStorageLocation> getEvents(final long minEventId, final int maxRecords) throws IOException { - try (final EventIndexSearcher searcher = indexWriter.newIndexSearcher()) { + try (final EventIndexSearcher searcher = newIndexSearcher()) { return searcher.getEvents(minEventId, maxRecords); } } @@ -401,16 +427,6 @@ public class JournalingPartition implements Partition { } } - if ( indexWriter != null ) { - try { - indexWriter.close(); - } catch (final IOException ioe) { - logger.warn("Failed to close {} due to {}", indexWriter, ioe); - if ( logger.isDebugEnabled() ) { - logger.warn("", ioe); - } - } - } } @Override @@ -425,6 +441,6 @@ public class JournalingPartition implements Partition { @Override public String toString() { - return "Partition[section=" + sectionName + "]"; + return "Partition[section=" + sectionIndex + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java index edbf75b..c0a56c4 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java @@ -55,6 +55,19 @@ public interface PartitionManager { <T> Set<T> withEachPartition(PartitionAction<T> action) throws IOException; /** + * Performs the given Action on each partition and returns the set of results. Unlike + * {@link #withEachPartition(PartitionAction)}, this method does not use the thread pool + * in order to perform the request in parallel. This is desirable for very quick functions, + * as the thread pool can be fully utilized, resulting in a quick function taking far longer + * than it should.
+ * + * @param action the action to perform on each partition + * @return the set of results, one from each partition + * @throws IOException if unable to perform the action against the partitions + */ + <T> Set<T> withEachPartitionSerially(PartitionAction<T> action) throws IOException; + + /** * Performs the given Action to each partition, optionally waiting for the action to complete * @param action * @param writeAction http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java index 51d90e2..10af697 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java @@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; +import org.apache.nifi.provenance.journaling.index.IndexManager; import org.apache.nifi.util.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +51,7 @@ public class QueuingPartitionManager implements PartitionManager { private final AtomicInteger blacklistedCount = new AtomicInteger(0); - public
QueuingPartitionManager(final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException { + public QueuingPartitionManager(final IndexManager indexManager, final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException { this.config = config; this.partitionQueue = new LinkedBlockingQueue<>(config.getPartitionCount()); this.partitionArray = new JournalingPartition[config.getPartitionCount()]; @@ -64,7 +65,7 @@ public class QueuingPartitionManager implements PartitionManager { final Tuple<String, File> tuple = containerTuples.get(i % containerTuples.size()); final File section = new File(tuple.getValue(), String.valueOf(i)); - final JournalingPartition partition = new JournalingPartition(tuple.getKey(), String.valueOf(i), section, config, executor); + final JournalingPartition partition = new JournalingPartition(indexManager, tuple.getKey(), i, section, config, executor); partitionQueue.offer(partition); partitionArray[i] = partition; } @@ -183,6 +184,17 @@ public class QueuingPartitionManager implements PartitionManager { } @Override + public <T> Set<T> withEachPartitionSerially(final PartitionAction<T> action) throws IOException { + // TODO: Do not use blacklisted partitions. + final Set<T> results = new HashSet<>(partitionArray.length); + for ( final Partition partition : partitionArray ) { + results.add( action.perform(partition) ); + } + + return results; + } + + @Override public void withEachPartition(final VoidPartitionAction action, final boolean async) { // TODO: Do not use blacklisted partitions. 
final Map<Partition, Future<?>> futures = new HashMap<>(partitionArray.length); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java index c23a405..a6a487b 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java @@ -133,7 +133,7 @@ public class CompressionTask implements Runnable { try (final JournalReader journalReader = new StandardJournalReader(journalFile); final JournalWriter compressedWriter = new StandardJournalWriter(journalId, compressedFile, true, new StandardEventSerializer()); final TocReader tocReader = new StandardTocReader(tocFile); - final TocWriter compressedTocWriter = new StandardTocWriter(compressedTocFile, true)) { + final TocWriter compressedTocWriter = new StandardTocWriter(compressedTocFile, true, false)) { compress(journalReader, compressedWriter, tocReader, compressedTocWriter); compressedWriter.sync(); 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java index 6058282..fea6057 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java @@ -40,6 +40,7 @@ public class StandardTocWriter implements TocWriter { private final File file; private final FileOutputStream fos; + private final boolean alwaysSync; private int index = 0; /** @@ -48,7 +49,7 @@ public class StandardTocWriter implements TocWriter { * @param compressionFlag whether or not the journal is compressed * @throws FileNotFoundException */ - public StandardTocWriter(final File file, final boolean compressionFlag) throws IOException { + public StandardTocWriter(final File file, final boolean compressionFlag, final boolean alwaysSync) throws IOException { if ( file.exists() ) { throw new FileAlreadyExistsException(file.getAbsolutePath()); } @@ -59,11 +60,17 @@ public class StandardTocWriter implements TocWriter { this.file = file; fos = new FileOutputStream(file); - - fos.write(VERSION); - fos.write(compressionFlag ? 
1 : 0); + this.alwaysSync = alwaysSync; + + final byte[] header = new byte[2]; + header[0] = VERSION; + header[1] = (byte) (compressionFlag ? 1 : 0); + fos.write(header); fos.flush(); - fos.getFD().sync(); + + if ( alwaysSync ) { + fos.getFD().sync(); + } } @Override @@ -73,7 +80,9 @@ public class StandardTocWriter implements TocWriter { dos.writeLong(offset); dos.flush(); - fos.getFD().sync(); + if ( alwaysSync ) { + fos.getFD().sync(); + } } @Override @@ -83,6 +92,10 @@ public class StandardTocWriter implements TocWriter { @Override public void close() throws IOException { + if (alwaysSync) { + fos.getFD().sync(); + } + fos.close(); }