Repository: incubator-nifi Updated Branches: refs/heads/prov-query-language [created] 6ea2c72b9
NIFI-388: Finished TODOs Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/cb057227 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/cb057227 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/cb057227 Branch: refs/heads/prov-query-language Commit: cb057227d82c63c6846bfa80c1f3d42763e7c105 Parents: b5638a8 Author: Mark Payne <marka...@hotmail.com> Authored: Mon Mar 2 12:44:53 2015 -0500 Committer: Mark Payne <marka...@hotmail.com> Committed: Mon Mar 2 12:44:53 2015 -0500 ---------------------------------------------------------------------- .../JournalingProvenanceRepository.java | 30 ++++- .../journaling/index/LuceneIndexManager.java | 33 +++++- .../journals/StandardJournalMagicHeader.java | 50 ++++++++ .../journals/StandardJournalReader.java | 3 + .../journals/StandardJournalWriter.java | 3 + .../journaling/partition/PartitionManager.java | 17 --- .../partition/QueuingPartitionManager.java | 116 ++++--------------- .../journals/TestStandardJournalReader.java | 1 + .../journals/TestStandardJournalWriter.java | 2 + .../partition/TestQueuingPartitionManager.java | 75 ++++++++++++ 10 files changed, 213 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java index 0aa20df..ffa9676 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java @@ -17,6 +17,7 @@ package org.apache.nifi.provenance.journaling; import java.io.File; +import java.io.FilenameFilter; import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; @@ -38,6 +39,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; import org.apache.nifi.events.EventReporter; import org.apache.nifi.processor.DataUnit; @@ -77,10 +79,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// TODO: Add header info to journals. Magic header. Prov repo implementation. Version. Encoding. Encoding Version. -// TODO: EXPIRE : backpressure if unable to delete fast enough! I.e., if size is greater than 110% of size specified -// TODO: Ensure number of partitions does not go below the current value. If it does, use the existing value -// so that we don't lose events. public class JournalingProvenanceRepository implements ProvenanceEventRepository { public static final String WORKER_THREAD_POOL_SIZE = "nifi.provenance.repository.worker.threads"; public static final String BLOCK_SIZE = "nifi.provenance.repository.writer.block.size"; @@ -190,6 +188,30 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository public synchronized void initialize(final EventReporter eventReporter) throws IOException { this.eventReporter = eventReporter; + // Ensure that the number of partitions specified by the config is at least as large as the + // number of sections that we have. If not, update the config to be equal to the number of + // sections that we have. + final Pattern numberPattern = Pattern.compile("\\d+"); + int numSections = 0; + for ( final File container : config.getContainers().values() ) { + final String[] sections = container.list(new FilenameFilter() { + @Override + public boolean accept(final File dir, final String name) { + return numberPattern.matcher(name).matches(); + } + }); + + if ( sections != null ) { + numSections += sections.length; + } + } + + if ( config.getPartitionCount() < numSections ) { + logger.warn("Configured number of partitions for Provenance Repository is {}, but {} partitions already exist. Using {} partitions instead of {}.", + config.getPartitionCount(), numSections, numSections, config.getPartitionCount()); + config.setPartitionCount(numSections); + } + // We use 3 different thread pools here because we don't want to threads from 1 pool to interfere with // each other. This is because the worker threads can be long running, and they shouldn't tie up the // compression threads. Likewise, there may be MANY compression tasks, which could delay the worker http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java index e212342..bf2a495 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -46,6 +48,7 @@ public class LuceneIndexManager implements IndexManager { private final Map<String, List<LuceneIndexWriter>> writers = new HashMap<>(); private final Map<String, AtomicLong> writerIndexes = new HashMap<>(); + private final ConcurrentMap<String, IndexSize> indexSizes = new ConcurrentHashMap<>(); public LuceneIndexManager(final JournalingRepositoryConfig config, final ScheduledExecutorService workerExecutor, final ExecutorService queryExecutor) throws IOException { this.config = config; @@ -356,10 +359,19 @@ public class LuceneIndexManager implements IndexManager { @Override public long getSize(final String containerName) { + // Cache index sizes so that we don't have to continually calculate it, as calculating it requires + // disk accesses, which are quite expensive. + final IndexSize indexSize = indexSizes.get(containerName); + if ( indexSize != null && !indexSize.isExpired() ) { + return indexSize.getSize(); + } + final File containerFile = config.getContainers().get(containerName); final File indicesDir = new File(containerFile, "indices"); - return getSize(indicesDir); + final long size = getSize(indicesDir); + indexSizes.put(containerName, new IndexSize(size)); + return size; } private long getSize(final File file) { @@ -378,4 +390,23 @@ public class LuceneIndexManager implements IndexManager { return file.length(); } } + + + private static class IndexSize { + private final long size; + private final long expirationTime; + + public IndexSize(final long size) { + this.size = size; + this.expirationTime = System.currentTimeMillis() + 5000L; // good for 5 seconds + } + + public long getSize() { + return size; + } + + public boolean isExpired() { + return System.currentTimeMillis() > expirationTime; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalMagicHeader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalMagicHeader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalMagicHeader.java new file mode 100644 index 0000000..5803100 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalMagicHeader.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.journals; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; + +import org.apache.nifi.stream.io.StreamUtils; + +public class StandardJournalMagicHeader { + private static final byte[] MAGIC_HEADER = "NiFiProvJournal_1".getBytes(); + + /** + * Writes the magic header to the output stream + * @param out + * @throws IOException + */ + static void write(final OutputStream out) throws IOException { + out.write(MAGIC_HEADER); + } + + /** + * Verifies that the magic header is present on the InputStream + * @param in + * @throws IOException + */ + static void read(final InputStream in) throws IOException { + final byte[] magicHeaderBuffer = new byte[MAGIC_HEADER.length]; + StreamUtils.fillBuffer(in, magicHeaderBuffer); + if ( !Arrays.equals(MAGIC_HEADER, magicHeaderBuffer) ) { + throw new IOException("File is not a Journaling Provenance Repository journal file"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java index df4a2d0..13878f8 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java @@ -64,6 +64,8 @@ public class StandardJournalReader implements JournalReader { compressedStream = new ByteCountingInputStream(bufferedIn); try { final DataInputStream dis = new DataInputStream(compressedStream); + + StandardJournalMagicHeader.read(dis); final String codecName = dis.readUTF(); serializationVersion = dis.readInt(); compressed = dis.readBoolean(); @@ -79,6 +81,7 @@ public class StandardJournalReader implements JournalReader { } } + private void resetDecompressedStream() throws IOException { if ( compressed ) { decompressedStream = new ByteCountingInputStream(new BufferedInputStream(new CompressionInputStream(compressedStream)), compressedStream.getBytesConsumed()); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java index 8d322b9..a9cb361 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java @@ -117,6 +117,8 @@ public class StandardJournalWriter implements JournalWriter { try (final InputStream fis = new FileInputStream(journalFile); final InputStream bufferedIn = new BufferedInputStream(fis); final DataInputStream dis = new DataInputStream(bufferedIn) ) { + + StandardJournalMagicHeader.read(dis); dis.readUTF(); dis.readInt(); dis.readBoolean(); @@ -150,6 +152,7 @@ public class StandardJournalWriter implements JournalWriter { private void writeHeader(final OutputStream out) throws IOException { final DataOutputStream dos = new DataOutputStream(out); + StandardJournalMagicHeader.write(out); dos.writeUTF(serializer.getCodecName()); dos.writeInt(serializer.getVersion()); dos.writeBoolean(compressed); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java index 14d5b17..e751543 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java @@ -45,14 +45,6 @@ public interface PartitionManager { */ void withPartition(VoidPartitionAction action, boolean writeAction) throws IOException; - /** - * Performs the given Action on each partition and returns the set of results. - * - * @param action the action to perform - * @param writeAction specifies whether or not the action writes to the repository - * @return - */ -// <T> Set<T> withEachPartition(PartitionAction<T> action, boolean writeAction) throws IOException; /** * Performs the given Action on each partition and returns the set of results. This method does @@ -80,15 +72,6 @@ public interface PartitionManager { */ void withEachPartitionSerially(VoidPartitionAction action, boolean writeAction) throws IOException; - /** - * Performs the given Action to each partition, optionally waiting for the action to complete - * @param action - * @param writeAction - * @param async if <code>true</code>, will perform the action asynchronously; if <code>false</code>, will - * wait for the action to complete before returning - */ -// void withEachPartition(VoidPartitionAction action, boolean async); - void shutdown(); /** http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java index 9b5c442..0b68f00 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java @@ -99,7 +99,7 @@ public class QueuingPartitionManager implements PartitionManager { } } - private Partition nextPartition(final boolean writeAction) { + Partition nextPartition(final boolean writeAction, final boolean waitIfNeeded) { Partition partition = null; final List<Partition> partitionsSkipped = new ArrayList<>(); @@ -124,20 +124,24 @@ public class QueuingPartitionManager implements PartitionManager { partitionQueue.addAll(partitionsSkipped); partitionsSkipped.clear(); } else if (writeAction) { - // determine if the container is full. - final String containerName = partition.getContainerName(); - long desiredMaxContainerCapacity = config.getMaxCapacity(containerName); - - // If no max capacity set for the container itself, use 1/N of repo max - // where N is the number of containers - if ( desiredMaxContainerCapacity == config.getMaxStorageCapacity() ) { - desiredMaxContainerCapacity = config.getMaxStorageCapacity() / config.getContainers().size(); - } - - // if the partition is more than 10% over its desired capacity, we don't want to write to it. - if ( partition.getContainerSize() > 1.1 * desiredMaxContainerCapacity ) { - partitionsSkipped.add(partition); - continue; + if ( waitIfNeeded ) { + // determine if the container is full. + final String containerName = partition.getContainerName(); + long desiredMaxContainerCapacity = config.getMaxCapacity(containerName); + + // If no max capacity set for the container itself, use 1/N of repo max + // where N is the number of containers + if ( desiredMaxContainerCapacity == config.getMaxStorageCapacity() ) { + desiredMaxContainerCapacity = config.getMaxStorageCapacity() / config.getContainers().size(); + } + + // if the partition is more than 10% over its desired capacity, we don't want to write to it. + if ( partition.getContainerSize() > 1.1 * desiredMaxContainerCapacity ) { + partitionsSkipped.add(partition); + continue; + } + } else { + return null; } } } @@ -155,7 +159,7 @@ public class QueuingPartitionManager implements PartitionManager { @Override public <T> T withPartition(final PartitionAction<T> action, final boolean writeAction) throws IOException { - final Partition partition = nextPartition(writeAction); + final Partition partition = nextPartition(writeAction, true); boolean ioe = false; try { @@ -174,7 +178,7 @@ public class QueuingPartitionManager implements PartitionManager { @Override public void withPartition(final VoidPartitionAction action, final boolean writeAction) throws IOException { - final Partition partition = nextPartition(writeAction); + final Partition partition = nextPartition(writeAction, true); boolean ioe = false; try { @@ -192,46 +196,6 @@ public class QueuingPartitionManager implements PartitionManager { } -// @Override -// public <T> Set<T> withEachPartition(final PartitionAction<T> action) throws IOException { -// if ( writeAction && blackListedPartitions.size() > 0 ) { -// throw new IOException("Cannot perform action {} because at least one partition has been blacklisted (i.e., writint to the partition failed)"); -// } -// -// final Set<T> results = new HashSet<>(partitionArray.length); -// -// final Map<Partition, Future<T>> futures = new HashMap<>(partitionArray.length); -// for ( final Partition partition : partitionArray ) { -// final Callable<T> callable = new Callable<T>() { -// @Override -// public T call() throws Exception { -// return action.perform(partition); -// } -// }; -// -// final Future<T> future = executor.submit(callable); -// futures.put(partition, future); -// } -// -// for ( final Map.Entry<Partition, Future<T>> entry : futures.entrySet() ) { -// try { -// final T result = entry.getValue().get(); -// results.add(result); -// } catch (final ExecutionException ee) { -// final Throwable cause = ee.getCause(); -// if ( cause instanceof IOException ) { -// throw (IOException) cause; -// } else { -// throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause); -// } -// } catch (InterruptedException e) { -// throw new RuntimeException(e); -// } -// } -// -// return results; -// } - @Override public <T> Set<T> withEachPartitionSerially(final PartitionAction<T> action, final boolean writeAction) throws IOException { if ( writeAction && blackListedPartitions.size() > 0 ) { @@ -257,44 +221,6 @@ public class QueuingPartitionManager implements PartitionManager { } } -// @Override -// public void withEachPartition(final VoidPartitionAction action, final boolean async) { -// // TODO: skip blacklisted partitions -// final Map<Partition, Future<?>> futures = new HashMap<>(partitionArray.length); -// for ( final Partition partition : partitionArray ) { -// final Runnable runnable = new Runnable() { -// @Override -// public void run() { -// try { -// action.perform(partition); -// } catch (final Throwable t) { -// logger.error("Failed to perform action against " + partition + " due to " + t); -// if ( logger.isDebugEnabled() ) { -// logger.error("", t); -// } -// } -// } -// }; -// -// final Future<?> future = executor.submit(runnable); -// futures.put(partition, future); -// } -// -// if ( !async ) { -// for ( final Map.Entry<Partition, Future<?>> entry : futures.entrySet() ) { -// try { -// // throw any exception thrown by runnable -// entry.getValue().get(); -// } catch (final ExecutionException ee) { -// final Throwable cause = ee.getCause(); -// throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause); -// } catch (InterruptedException e) { -// throw new RuntimeException(e); -// } -// } -// } -// } - private long getTotalSize() { long totalSize = 0L; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java index e1ecf7d..9f0ba99 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java @@ -47,6 +47,7 @@ public class TestStandardJournalReader { dos = new DataOutputStream(baos); // Write out header: codec name and serialization version + StandardJournalMagicHeader.write(dos); dos.writeUTF(StandardEventSerializer.CODEC_NAME); dos.writeInt(0); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java index 43efa7e..e8a6787 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java @@ -90,6 +90,7 @@ public class TestStandardJournalWriter { final ByteArrayInputStream bais = new ByteArrayInputStream(data); final DataInputStream dis = new DataInputStream(bais); + StandardJournalMagicHeader.read(dis); final String codecName = dis.readUTF(); assertEquals(StandardEventSerializer.CODEC_NAME, codecName); @@ -134,6 +135,7 @@ public class TestStandardJournalWriter { final ByteArrayInputStream bais = new ByteArrayInputStream(data); final DataInputStream dis = new DataInputStream(bais); + StandardJournalMagicHeader.read(dis); final String codecName = dis.readUTF(); assertEquals(StandardEventSerializer.CODEC_NAME, codecName); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/partition/TestQueuingPartitionManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/partition/TestQueuingPartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/partition/TestQueuingPartitionManager.java new file mode 100644 index 0000000..683e40a --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/partition/TestQueuingPartitionManager.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.partition; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; +import org.apache.nifi.provenance.journaling.index.IndexManager; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestQueuingPartitionManager { + + @Test(timeout=5000) + public void testWriteWaitsForDeletion() throws IOException { + final Map<String, File> containers = new HashMap<>(); + containers.put("container1", new File("target/" + UUID.randomUUID().toString())); + + final JournalingRepositoryConfig config = new JournalingRepositoryConfig(); + config.setCompressOnRollover(false); + config.setMaxStorageCapacity(50L); + config.setContainers(containers); + + final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + + final AtomicLong indexSize = new AtomicLong(0L); + final IndexManager indexManager = Mockito.mock(IndexManager.class); + Mockito.doAnswer(new Answer<Long>() { + @Override + public Long answer(InvocationOnMock invocation) throws Throwable { + return indexSize.get(); + } + }).when(indexManager).getSize(Mockito.any(String.class)); + + final QueuingPartitionManager mgr = new QueuingPartitionManager(indexManager, new AtomicLong(0L), config, exec, exec); + + Partition partition = mgr.nextPartition(true, true); + assertNotNull(partition); + + indexSize.set(1024L); + partition = mgr.nextPartition(true, false); + assertNull(partition); + + indexSize.set(0L); + partition = mgr.nextPartition(true, true); + assertNotNull(partition); + } + +}