Repository: incubator-nifi
Updated Branches:
  refs/heads/prov-query-language [created] 6ea2c72b9


NIFI-388: Finished TODOs


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/cb057227
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/cb057227
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/cb057227

Branch: refs/heads/prov-query-language
Commit: cb057227d82c63c6846bfa80c1f3d42763e7c105
Parents: b5638a8
Author: Mark Payne <marka...@hotmail.com>
Authored: Mon Mar 2 12:44:53 2015 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Mon Mar 2 12:44:53 2015 -0500

----------------------------------------------------------------------
 .../JournalingProvenanceRepository.java         |  30 ++++-
 .../journaling/index/LuceneIndexManager.java    |  33 +++++-
 .../journals/StandardJournalMagicHeader.java    |  50 ++++++++
 .../journals/StandardJournalReader.java         |   3 +
 .../journals/StandardJournalWriter.java         |   3 +
 .../journaling/partition/PartitionManager.java  |  17 ---
 .../partition/QueuingPartitionManager.java      | 116 ++++---------------
 .../journals/TestStandardJournalReader.java     |   1 +
 .../journals/TestStandardJournalWriter.java     |   2 +
 .../partition/TestQueuingPartitionManager.java  |  75 ++++++++++++
 10 files changed, 213 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
index 0aa20df..ffa9676 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.provenance.journaling;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -38,6 +39,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
 
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.processor.DataUnit;
@@ -77,10 +79,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-// TODO: Add header info to journals. Magic header. Prov repo implementation. Version. Encoding. Encoding Version.
-// TODO: EXPIRE : backpressure if unable to delete fast enough! I.e., if size is greater than 110% of size specified
-// TODO: Ensure number of partitions does not go below the current value. If it does, use the existing value
-// so that we don't lose events.
 public class JournalingProvenanceRepository implements ProvenanceEventRepository {
     public static final String WORKER_THREAD_POOL_SIZE = "nifi.provenance.repository.worker.threads";
     public static final String BLOCK_SIZE = "nifi.provenance.repository.writer.block.size";
@@ -190,6 +188,30 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
     public synchronized void initialize(final EventReporter eventReporter) throws IOException {
         this.eventReporter = eventReporter;
         
+        // Ensure that the number of partitions specified by the config is at least as large as the 
+        // number of sections that we have. If not, update the config to be equal to the number of
+        // sections that we have.
+        final Pattern numberPattern = Pattern.compile("\\d+");
+        int numSections = 0;
+        for ( final File container : config.getContainers().values() ) {
+            final String[] sections = container.list(new FilenameFilter() {
+                @Override
+                public boolean accept(final File dir, final String name) {
+                    return numberPattern.matcher(name).matches();
+                }
+            });
+            
+            if ( sections != null ) {
+                numSections += sections.length;
+            }
+        }
+        
+        if ( config.getPartitionCount() < numSections ) {
+            logger.warn("Configured number of partitions for Provenance Repository is {}, but {} partitions already exist. Using {} partitions instead of {}.", 
+                    config.getPartitionCount(), numSections, numSections, config.getPartitionCount());
+            config.setPartitionCount(numSections);
+        }
+        
         // We use 3 different thread pools here because we don't want to threads from 1 pool to interfere with
         // each other. This is because the worker threads can be long running, and they shouldn't tie up the
         // compression threads. Likewise, there may be MANY compression tasks, which could delay the worker

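The comment that closes the hunk above explains why three separate thread pools are used. Below is a minimal sketch of that arrangement; the pool names and sizes are illustrative assumptions and are not taken from this repository's configuration.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class ThreadPoolSeparationSketch {
    public static void main(final String[] args) {
        // Long-running worker tasks (e.g., journal rollover) get their own scheduled pool...
        final ScheduledExecutorService workerExecutor = Executors.newScheduledThreadPool(4);
        // ...compression tasks, which may be numerous, get a second pool so they cannot starve the workers...
        final ExecutorService compressionExecutor = Executors.newFixedThreadPool(2);
        // ...and query tasks get a third pool so reads are not delayed by either of the above.
        final ExecutorService queryExecutor = Executors.newFixedThreadPool(2);

        // Tasks would be submitted to whichever pool matches their role; shutting down here
        // simply keeps the sketch self-contained.
        workerExecutor.shutdown();
        compressionExecutor.shutdown();
        queryExecutor.shutdown();
    }
}
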
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
index e212342..bf2a495 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -46,6 +48,7 @@ public class LuceneIndexManager implements IndexManager {
     
     private final Map<String, List<LuceneIndexWriter>> writers = new HashMap<>();
     private final Map<String, AtomicLong> writerIndexes = new HashMap<>();
+    private final ConcurrentMap<String, IndexSize> indexSizes = new ConcurrentHashMap<>();
     
     public LuceneIndexManager(final JournalingRepositoryConfig config, final ScheduledExecutorService workerExecutor, final ExecutorService queryExecutor) throws IOException {
         this.config = config;
@@ -356,10 +359,19 @@ public class LuceneIndexManager implements IndexManager {
     
     @Override
     public long getSize(final String containerName) {
+        // Cache index sizes so that we don't have to continually calculate it, as calculating it requires
+        // disk accesses, which are quite expensive.
+        final IndexSize indexSize = indexSizes.get(containerName);
+        if ( indexSize != null && !indexSize.isExpired() ) {
+            return indexSize.getSize();
+        }
+        
         final File containerFile = config.getContainers().get(containerName);
         final File indicesDir = new File(containerFile, "indices");
         
-        return getSize(indicesDir);
+        final long size = getSize(indicesDir);
+        indexSizes.put(containerName, new IndexSize(size));
+        return size;
     }
     
     private long getSize(final File file) {
@@ -378,4 +390,23 @@ public class LuceneIndexManager implements IndexManager {
             return file.length();
         }
     }
+    
+    
+    private static class IndexSize {
+        private final long size;
+        private final long expirationTime;
+        
+        public IndexSize(final long size) {
+            this.size = size;
+            this.expirationTime = System.currentTimeMillis() + 5000L;   // good for 5 seconds
+        }
+        
+        public long getSize() {
+            return size;
+        }
+        
+        public boolean isExpired() {
+            return System.currentTimeMillis() > expirationTime;
+        }
+    }
 }
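
The IndexSize helper added above caches a computed size for five seconds so that repeated getSize() calls do not re-walk the directory tree. The sketch below is a generic version of that same expiring-value pattern; the class and method names are hypothetical and do not exist in the repository.

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ExpiringLongCache {
    private static class Entry {
        private final long value;
        private final long expiresAt;

        Entry(final long value, final long ttlMillis) {
            this.value = value;
            this.expiresAt = System.currentTimeMillis() + ttlMillis;
        }

        boolean isExpired() {
            return System.currentTimeMillis() > expiresAt;
        }
    }

    private final ConcurrentMap<String, Entry> cache = new ConcurrentHashMap<>();
    private final long ttlMillis;

    public ExpiringLongCache(final long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns the cached value for the key if it is still fresh; otherwise recomputes it
    // (for example, via an expensive directory-size walk) and caches it for ttlMillis.
    public long get(final String key, final Callable<Long> compute) throws Exception {
        final Entry entry = cache.get(key);
        if (entry != null && !entry.isExpired()) {
            return entry.value;
        }
        final long value = compute.call();
        cache.put(key, new Entry(value, ttlMillis));
        return value;
    }
}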

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalMagicHeader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalMagicHeader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalMagicHeader.java
new file mode 100644
index 0000000..5803100
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalMagicHeader.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.journals;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.apache.nifi.stream.io.StreamUtils;
+
+public class StandardJournalMagicHeader {
+    private static final byte[] MAGIC_HEADER = "NiFiProvJournal_1".getBytes();
+    
+    /**
+     * Writes the magic header to the output stream
+     * @param out
+     * @throws IOException
+     */
+    static void write(final OutputStream out) throws IOException {
+        out.write(MAGIC_HEADER);
+    }
+    
+    /**
+     * Verifies that the magic header is present on the InputStream
+     * @param in
+     * @throws IOException
+     */
+    static void read(final InputStream in) throws IOException {
+        final byte[] magicHeaderBuffer = new byte[MAGIC_HEADER.length];
+        StreamUtils.fillBuffer(in, magicHeaderBuffer);
+        if ( !Arrays.equals(MAGIC_HEADER, magicHeaderBuffer) ) {
+            throw new IOException("File is not a Journaling Provenance Repository journal file");
+        }
+    }
+}
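
A quick round trip of the new magic header against in-memory streams. Since write() and read() are package-private, a sketch like this would have to live in the same journals package; the class name here is hypothetical.

package org.apache.nifi.provenance.journaling.journals;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class MagicHeaderRoundTripSketch {
    public static void main(final String[] args) throws IOException {
        // Write the magic header to an in-memory stream, as StandardJournalWriter now does
        // at the start of each journal file.
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        StandardJournalMagicHeader.write(out);

        // Reading it back succeeds silently; a stream that does not begin with the header
        // would cause read() to throw an IOException instead.
        final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
        StandardJournalMagicHeader.read(in);
        System.out.println("Magic header verified");
    }
}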

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
index df4a2d0..13878f8 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
@@ -64,6 +64,8 @@ public class StandardJournalReader implements JournalReader {
         compressedStream = new ByteCountingInputStream(bufferedIn);
         try {
             final DataInputStream dis = new DataInputStream(compressedStream);
+            
+            StandardJournalMagicHeader.read(dis);
             final String codecName = dis.readUTF();
             serializationVersion = dis.readInt();
             compressed = dis.readBoolean();
@@ -79,6 +81,7 @@ public class StandardJournalReader implements JournalReader {
         }
     }
     
+    
     private void resetDecompressedStream() throws IOException {
         if ( compressed ) {
             decompressedStream = new ByteCountingInputStream(new BufferedInputStream(new CompressionInputStream(compressedStream)), compressedStream.getBytesConsumed());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
index 8d322b9..a9cb361 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
@@ -117,6 +117,8 @@ public class StandardJournalWriter implements JournalWriter {
             try (final InputStream fis = new FileInputStream(journalFile);
                  final InputStream bufferedIn = new BufferedInputStream(fis);
                  final DataInputStream dis = new DataInputStream(bufferedIn) ) {
+                
+                StandardJournalMagicHeader.read(dis);
                 dis.readUTF();
                 dis.readInt();
                 dis.readBoolean();
@@ -150,6 +152,7 @@ public class StandardJournalWriter implements JournalWriter {
 
     private void writeHeader(final OutputStream out) throws IOException {
         final DataOutputStream dos = new DataOutputStream(out);
+        StandardJournalMagicHeader.write(out);
         dos.writeUTF(serializer.getCodecName());
         dos.writeInt(serializer.getVersion());
         dos.writeBoolean(compressed);

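Taken together with the reader change above, a journal file now begins with four header fields in this order: the magic header bytes, the codec name (writeUTF), the serialization version (writeInt), and the compressed flag (writeBoolean). The sketch below reads that header back; it assumes the same package so it can call the package-private read(), and the class name is hypothetical.

package org.apache.nifi.provenance.journaling.journals;

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

public class JournalHeaderSketch {
    // Reads the header fields in the order StandardJournalWriter.writeHeader() writes them.
    static void describeHeader(final String journalPath) throws IOException {
        try (final InputStream fis = new FileInputStream(journalPath);
             final DataInputStream dis = new DataInputStream(fis)) {
            StandardJournalMagicHeader.read(dis);          // magic header; throws if absent
            final String codecName = dis.readUTF();        // serializer codec name
            final int version = dis.readInt();             // serialization version
            final boolean compressed = dis.readBoolean();  // whether blocks are compressed
            System.out.println(codecName + " v" + version + ", compressed=" + compressed);
        }
    }
}
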
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
index 14d5b17..e751543 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
@@ -45,14 +45,6 @@ public interface PartitionManager {
      */
     void withPartition(VoidPartitionAction action, boolean writeAction) throws IOException;
     
-    /**
-     * Performs the given Action on each partition and returns the set of results.
-     * 
-     * @param action the action to perform
-     * @param writeAction specifies whether or not the action writes to the repository
-     * @return
-     */
-//    <T> Set<T> withEachPartition(PartitionAction<T> action, boolean writeAction) throws IOException;
     
     /**
      * Performs the given Action on each partition and returns the set of results. This method does 
@@ -80,15 +72,6 @@ public interface PartitionManager {
      */
     void withEachPartitionSerially(VoidPartitionAction action, boolean writeAction) throws IOException;
     
-    /**
-     * Performs the given Action to each partition, optionally waiting for the action to complete
-     * @param action
-     * @param writeAction
-     * @param async if <code>true</code>, will perform the action asynchronously; if <code>false</code>, will
-     *  wait for the action to complete before returning
-     */
-//    void withEachPartition(VoidPartitionAction action, boolean async);
-    
     void shutdown();
     
     /**

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
index 9b5c442..0b68f00 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
@@ -99,7 +99,7 @@ public class QueuingPartitionManager implements PartitionManager {
         }
     }
     
-    private Partition nextPartition(final boolean writeAction) {
+    Partition nextPartition(final boolean writeAction, final boolean waitIfNeeded) {
         Partition partition = null;
         
         final List<Partition> partitionsSkipped = new ArrayList<>();
@@ -124,20 +124,24 @@ public class QueuingPartitionManager implements PartitionManager {
                     partitionQueue.addAll(partitionsSkipped);
                     partitionsSkipped.clear();
                 } else if (writeAction) {
-                    // determine if the container is full.
-                    final String containerName = partition.getContainerName();
-                    long desiredMaxContainerCapacity = config.getMaxCapacity(containerName);
-                    
-                    // If no max capacity set for the container itself, use 1/N of repo max
-                    // where N is the number of containers
-                    if ( desiredMaxContainerCapacity == config.getMaxStorageCapacity() ) {
-                        desiredMaxContainerCapacity = config.getMaxStorageCapacity() / config.getContainers().size();
-                    }
-                    
-                    // if the partition is more than 10% over its desired capacity, we don't want to write to it.
-                    if ( partition.getContainerSize() > 1.1 * desiredMaxContainerCapacity ) {
-                        partitionsSkipped.add(partition);
-                        continue;
+                    if ( waitIfNeeded ) {
+                        // determine if the container is full.
+                        final String containerName = partition.getContainerName();
+                        long desiredMaxContainerCapacity = config.getMaxCapacity(containerName);
+                        
+                        // If no max capacity set for the container itself, use 1/N of repo max
+                        // where N is the number of containers
+                        if ( desiredMaxContainerCapacity == config.getMaxStorageCapacity() ) {
+                            desiredMaxContainerCapacity = config.getMaxStorageCapacity() / config.getContainers().size();
+                        }
+                        
+                        // if the partition is more than 10% over its desired capacity, we don't want to write to it.
+                        if ( partition.getContainerSize() > 1.1 * desiredMaxContainerCapacity ) {
+                            partitionsSkipped.add(partition);
+                            continue;
+                        }
+                    } else {
+                        return null;
                     }
                 }
             }
@@ -155,7 +159,7 @@ public class QueuingPartitionManager implements PartitionManager {
     
     @Override
     public <T> T withPartition(final PartitionAction<T> action, final boolean writeAction) throws IOException {
-        final Partition partition = nextPartition(writeAction);
+        final Partition partition = nextPartition(writeAction, true);
 
         boolean ioe = false;
         try {
@@ -174,7 +178,7 @@ public class QueuingPartitionManager implements PartitionManager {
     
     @Override
     public void withPartition(final VoidPartitionAction action, final boolean writeAction) throws IOException {
-        final Partition partition = nextPartition(writeAction);
+        final Partition partition = nextPartition(writeAction, true);
 
         boolean ioe = false;
         try {
@@ -192,46 +196,6 @@ public class QueuingPartitionManager implements PartitionManager {
     }
 
     
-//    @Override
-//    public <T> Set<T> withEachPartition(final PartitionAction<T> action) throws IOException {
-//        if ( writeAction && blackListedPartitions.size() > 0 ) {
-//            throw new IOException("Cannot perform action {} because at least one partition has been blacklisted (i.e., writint to the partition failed)");
-//        }
-//
-//        final Set<T> results = new HashSet<>(partitionArray.length);
-//        
-//        final Map<Partition, Future<T>> futures = new HashMap<>(partitionArray.length);
-//        for ( final Partition partition : partitionArray ) {
-//            final Callable<T> callable = new Callable<T>() {
-//                @Override
-//                public T call() throws Exception {
-//                    return action.perform(partition);
-//                }
-//            };
-//            
-//            final Future<T> future = executor.submit(callable);
-//            futures.put(partition, future);
-//        }
-//        
-//        for ( final Map.Entry<Partition, Future<T>> entry : futures.entrySet() ) {
-//            try {
-//                final T result = entry.getValue().get();
-//                results.add(result);
-//            } catch (final ExecutionException ee) {
-//                final Throwable cause = ee.getCause();
-//                if ( cause instanceof IOException ) {
-//                    throw (IOException) cause;
-//                } else {
-//                    throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause);
-//                }
-//            } catch (InterruptedException e) {
-//                throw new RuntimeException(e);
-//            }
-//        }
-//        
-//        return results;
-//    }
-    
     @Override
     public <T> Set<T> withEachPartitionSerially(final PartitionAction<T> action, final boolean writeAction) throws IOException {
         if ( writeAction && blackListedPartitions.size() > 0 ) {
@@ -257,44 +221,6 @@ public class QueuingPartitionManager implements PartitionManager {
         }
     }
     
-//    @Override
-//    public void withEachPartition(final VoidPartitionAction action, final boolean async) {
-//        // TODO: skip blacklisted partitions
-//        final Map<Partition, Future<?>> futures = new HashMap<>(partitionArray.length);
-//        for ( final Partition partition : partitionArray ) {
-//            final Runnable runnable = new Runnable() {
-//                @Override
-//                public void run() {
-//                    try {
-//                        action.perform(partition);
-//                    } catch (final Throwable t) {
-//                        logger.error("Failed to perform action against " + partition + " due to " + t);
-//                        if ( logger.isDebugEnabled() ) {
-//                            logger.error("", t);
-//                        }
-//                    }
-//                }
-//            };
-//            
-//            final Future<?> future = executor.submit(runnable);
-//            futures.put(partition, future);
-//        }
-//        
-//        if ( !async ) {
-//            for ( final Map.Entry<Partition, Future<?>> entry : futures.entrySet() ) {
-//                try {
-//                    // throw any exception thrown by runnable
-//                    entry.getValue().get();
-//                } catch (final ExecutionException ee) {
-//                    final Throwable cause = ee.getCause();
-//                    throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause);
-//                } catch (InterruptedException e) {
-//                    throw new RuntimeException(e);
-//                }
-//            }
-//        }
-//    }
-    
     private long getTotalSize() {
         long totalSize = 0L;
 

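For reference, the capacity check now kept inside the new waitIfNeeded branch works out as follows: with a repository maximum of, say, 100 MB spread over 4 containers and no per-container limit configured, each container's budget is 25 MB, and a partition is skipped only once its container exceeds 110% of that budget (27.5 MB). The sketch below shows just that arithmetic; all names and values are illustrative, not taken from the repository.

public class CapacityCheckSketch {
    // Returns true if the container is more than 10% over its share of the repository
    // capacity and should therefore not be written to right now.
    static boolean overCapacity(final long containerSize, final long maxStorageCapacity,
                                final int containerCount, final long containerMaxCapacity) {
        long desired = containerMaxCapacity;
        // If no container-specific max was configured (signalled by it equaling the repo max),
        // fall back to an even 1/N split of the repository capacity.
        if (desired == maxStorageCapacity) {
            desired = maxStorageCapacity / containerCount;
        }
        return containerSize > 1.1 * desired;
    }

    public static void main(final String[] args) {
        final long repoMax = 100L * 1024 * 1024;   // 100 MB repository max, illustrative
        // 26 MB used: under the 27.5 MB threshold, so the partition is still writable
        System.out.println(overCapacity(26L * 1024 * 1024, repoMax, 4, repoMax));   // false
        // 28 MB used: over the threshold, so the partition would be skipped
        System.out.println(overCapacity(28L * 1024 * 1024, repoMax, 4, repoMax));   // true
    }
}
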
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
index e1ecf7d..9f0ba99 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
@@ -47,6 +47,7 @@ public class TestStandardJournalReader {
         dos = new DataOutputStream(baos);
         
         // Write out header: codec name and serialization version
+        StandardJournalMagicHeader.write(dos);
         dos.writeUTF(StandardEventSerializer.CODEC_NAME);
         dos.writeInt(0);
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
index 43efa7e..e8a6787 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
@@ -90,6 +90,7 @@ public class TestStandardJournalWriter {
             final ByteArrayInputStream bais = new ByteArrayInputStream(data);
             final DataInputStream dis = new DataInputStream(bais);
 
+            StandardJournalMagicHeader.read(dis);
             final String codecName = dis.readUTF();
             assertEquals(StandardEventSerializer.CODEC_NAME, codecName);
             
@@ -134,6 +135,7 @@ public class TestStandardJournalWriter {
             final ByteArrayInputStream bais = new ByteArrayInputStream(data);
             final DataInputStream dis = new DataInputStream(bais);
 
+            StandardJournalMagicHeader.read(dis);
             final String codecName = dis.readUTF();
             assertEquals(StandardEventSerializer.CODEC_NAME, codecName);
             

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb057227/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/partition/TestQueuingPartitionManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/partition/TestQueuingPartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/partition/TestQueuingPartitionManager.java
new file mode 100644
index 0000000..683e40a
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/partition/TestQueuingPartitionManager.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.partition;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.journaling.index.IndexManager;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestQueuingPartitionManager {
+
+    @Test(timeout=5000)
+    public void testWriteWaitsForDeletion() throws IOException {
+        final Map<String, File> containers = new HashMap<>();
+        containers.put("container1", new File("target/" + UUID.randomUUID().toString()));
+        
+        final JournalingRepositoryConfig config = new JournalingRepositoryConfig();
+        config.setCompressOnRollover(false);
+        config.setMaxStorageCapacity(50L);
+        config.setContainers(containers);
+        
+        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
+
+        final AtomicLong indexSize = new AtomicLong(0L);
+        final IndexManager indexManager = Mockito.mock(IndexManager.class);
+        Mockito.doAnswer(new Answer<Long>() {
+            @Override
+            public Long answer(InvocationOnMock invocation) throws Throwable {
+                return indexSize.get();
+            }
+        }).when(indexManager).getSize(Mockito.any(String.class));
+        
+        final QueuingPartitionManager mgr = new QueuingPartitionManager(indexManager, new AtomicLong(0L), config, exec, exec);
+        
+        Partition partition = mgr.nextPartition(true, true);
+        assertNotNull(partition);
+        
+        indexSize.set(1024L);
+        partition = mgr.nextPartition(true, false);
+        assertNull(partition);
+        
+        indexSize.set(0L);
+        partition = mgr.nextPartition(true, true);
+        assertNotNull(partition);
+    }
+    
+}
