http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git 
a/commons/wali/src/main/java/org/wali/MinimalLockingWriteAheadLog.java 
b/commons/wali/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
new file mode 100644
index 0000000..95cf4da
--- /dev/null
+++ b/commons/wali/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -0,0 +1,1008 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.wali;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.io.BufferedOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * This implementation provides as little Locking as possible in order to
+ * provide the highest throughput possible. However, this implementation is 
ONLY
+ * appropriate if it can be guaranteed that only a single thread will ever 
issue
+ * updates for a given Record at any one time.
+ * </p>
+ *
+ * @param <T>
+ */
+public final class MinimalLockingWriteAheadLog<T> implements 
WriteAheadRepository<T> {
+
+    private final Path basePath;
+    private final Path partialPath;
+    private final Path snapshotPath;
+
+    private final SerDe<T> serde;
+    private final SyncListener syncListener;
+    private final FileChannel lockChannel;
+    private final AtomicLong transactionIdGenerator = new AtomicLong(0L);
+
+    private final Partition<T>[] partitions;
+    private final AtomicLong partitionIndex = new AtomicLong(0L);
+    private final ConcurrentMap<Object, T> recordMap = new 
ConcurrentHashMap<>();
+    private final Map<Object, T> unmodifiableRecordMap = 
Collections.unmodifiableMap(recordMap);
+    private final Set<String> externalLocations = new CopyOnWriteArraySet<>();
+
+    private final Set<String> recoveredExternalLocations = new 
CopyOnWriteArraySet<>();
+
+    private final AtomicInteger numberBlackListedPartitions = new 
AtomicInteger(0);
+
+    private static final Logger logger = 
LoggerFactory.getLogger(MinimalLockingWriteAheadLog.class);
+
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Lock readLock = rwLock.readLock(); // required to update a 
partition
+    private final Lock writeLock = rwLock.writeLock(); // required for 
checkpoint
+
+    private volatile boolean updated = false;
+    private volatile boolean recovered = false;
+
+    public MinimalLockingWriteAheadLog(final Path path, final int 
partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws 
IOException {
+        this(new TreeSet<>(Collections.singleton(path)), partitionCount, 
serde, syncListener);
+    }
+
+    /**
+     *
+     * @param paths a sorted set of Paths to use for the partitions/journals 
and
+     * the snapshot. The snapshot will always be written to the first path
+     * specified.
+     *
+     * @param partitionCount the number of partitions/journals to use. For best
+     * performance, this should be close to the number of threads that are
+     * expected to update the repository simultaneously
+     *
+     * @param serde
+     * @param syncListener
+     * @throws IOException
+     */
+    @SuppressWarnings("unchecked")
+    public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int 
partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws 
IOException {
+        this.syncListener = syncListener;
+
+        requireNonNull(paths);
+        requireNonNull(serde);
+
+        if (paths.isEmpty()) {
+            throw new IllegalArgumentException("Paths must be non-empty");
+        }
+
+        int existingPartitions = 0;
+        for (final Path path : paths) {
+            if (!Files.exists(path)) {
+                Files.createDirectories(path);
+            }
+
+            final File file = path.toFile();
+            if (!file.isDirectory()) {
+                throw new IOException("Path given [" + path + "] is not a 
directory");
+            }
+            if (!file.canWrite()) {
+                throw new IOException("Path given [" + path + "] is not 
writable");
+            }
+            if (!file.canRead()) {
+                throw new IOException("Path given [" + path + "] is not 
readable");
+            }
+            if (!file.canExecute()) {
+                throw new IOException("Path given [" + path + "] is not 
executable");
+            }
+
+            final File[] children = file.listFiles();
+            if (children != null) {
+                for (final File child : children) {
+                    if (child.isDirectory() && 
child.getName().startsWith("partition-")) {
+                        existingPartitions++;
+                    }
+                }
+
+                if (existingPartitions != 0 && existingPartitions != 
partitionCount) {
+                    logger.warn("Constructing MinimalLockingWriteAheadLog with 
partitionCount={}, but the repository currently has "
+                            + "{} partitions; ignoring argument and proceeding 
with {} partitions",
+                            new Object[]{partitionCount, existingPartitions, 
existingPartitions});
+                }
+            }
+        }
+
+        this.basePath = paths.iterator().next();
+        this.partialPath = basePath.resolve("snapshot.partial");
+        this.snapshotPath = basePath.resolve("snapshot");
+        this.serde = serde;
+
+        final Path lockPath = basePath.resolve("wali.lock");
+        lockChannel = new FileOutputStream(lockPath.toFile()).getChannel();
+        lockChannel.lock();
+
+        partitions = new Partition[partitionCount];
+
+        Iterator<Path> pathIterator = paths.iterator();
+        for (int i = 0; i < partitionCount; i++) {
+            // If we're out of paths, create a new iterator to start over.
+            if (!pathIterator.hasNext()) {
+                pathIterator = paths.iterator();
+            }
+
+            final Path partitionBasePath = pathIterator.next();
+
+            partitions[i] = new 
Partition<>(partitionBasePath.resolve("partition-" + i), serde, i, 
getVersion());
+        }
+    }
+
+    @Override
+    public int update(final Collection<T> records, final boolean forceSync) 
throws IOException {
+        if (!recovered) {
+            throw new IllegalStateException("Cannot update repository until 
record recovery has been performed");
+        }
+
+        if (records.isEmpty()) {
+            return -1;
+        }
+
+        updated = true;
+        readLock.lock();
+        try {
+            while (true) {
+                final int numBlackListed = numberBlackListedPartitions.get();
+                if (numBlackListed >= partitions.length) {
+                    throw new IOException("All Partitions have been 
blacklisted due to failures when attempting to update. If the Write-Ahead Log 
is able to perform a checkpoint, this issue may resolve itself. Otherwise, 
manual intervention will be required.");
+                }
+
+                final long partitionIdx = partitionIndex.getAndIncrement();
+                final int resolvedIdx = (int) (partitionIdx % 
partitions.length);
+                final Partition<T> partition = partitions[resolvedIdx];
+                if (partition.tryClaim()) {
+                    try {
+                        final long transactionId = 
transactionIdGenerator.getAndIncrement();
+                        if (logger.isTraceEnabled()) {
+                            for (final T record : records) {
+                                logger.trace("Partition {} performing 
Transaction {}: {}", new Object[]{partition, transactionId, record});
+                            }
+                        }
+
+                        try {
+                            partition.update(records, transactionId, 
unmodifiableRecordMap, forceSync);
+                        } catch (final Exception e) {
+                            partition.blackList();
+                            numberBlackListedPartitions.incrementAndGet();
+                            throw e;
+                        }
+
+                        if (forceSync && syncListener != null) {
+                            syncListener.onSync(resolvedIdx);
+                        }
+                    } finally {
+                        partition.releaseClaim();
+                    }
+
+                    for (final T record : records) {
+                        final UpdateType updateType = 
serde.getUpdateType(record);
+                        final Object recordIdentifier = 
serde.getRecordIdentifier(record);
+
+                        if (updateType == UpdateType.DELETE) {
+                            recordMap.remove(recordIdentifier);
+                        } else if (updateType == UpdateType.SWAP_OUT) {
+                            final String newLocation = 
serde.getLocation(record);
+                            if (newLocation == null) {
+                                logger.error("Received Record (ID=" + 
recordIdentifier + ") with UpdateType of SWAP_OUT but no indicator of where the 
Record is to be Swapped Out to; these records may be lost when the repository 
is restored!");
+                            } else {
+                                recordMap.remove(recordIdentifier);
+                                this.externalLocations.add(newLocation);
+                            }
+                        } else if (updateType == UpdateType.SWAP_IN) {
+                            final String newLocation = 
serde.getLocation(record);
+                            if (newLocation == null) {
+                                logger.error("Received Record (ID=" + 
recordIdentifier + ") with UpdateType of SWAP_IN but no indicator of where the 
Record is to be Swapped In from; these records may be duplicated when the 
repository is restored!");
+                            } else {
+                                externalLocations.remove(newLocation);
+                            }
+                            recordMap.put(recordIdentifier, record);
+                        } else {
+                            recordMap.put(recordIdentifier, record);
+                        }
+                    }
+
+                    return resolvedIdx;
+                }
+            }
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public Collection<T> recoverRecords() throws IOException {
+        if (updated) {
+            throw new IllegalStateException("Cannot recover records after 
updating the repository; must call recoverRecords first");
+        }
+
+        final long recoverStart = System.nanoTime();
+        writeLock.lock();
+        try {
+            Long maxTransactionId = recoverFromSnapshot(recordMap);
+            recoverFromEdits(recordMap, maxTransactionId);
+
+            for (final Partition<T> partition : partitions) {
+                final long transId = partition.getMaxRecoveredTransactionId();
+                if (maxTransactionId == null || transId > maxTransactionId) {
+                    maxTransactionId = transId;
+                }
+            }
+
+            this.transactionIdGenerator.set(maxTransactionId + 1);
+            this.externalLocations.addAll(recoveredExternalLocations);
+            logger.info("{} finished recovering records. Performing Checkpoint 
to ensure proper state of Partitions before updates", this);
+        } finally {
+            writeLock.unlock();
+        }
+        final long recoverNanos = System.nanoTime() - recoverStart;
+        final long recoveryMillis = 
TimeUnit.MILLISECONDS.convert(recoverNanos, TimeUnit.NANOSECONDS);
+        logger.info("Successfully recovered {} records in {} milliseconds", 
recordMap.size(), recoveryMillis);
+        checkpoint();
+
+        recovered = true;
+        return recordMap.values();
+    }
+
+    @Override
+    public Set<String> getRecoveredSwapLocations() throws IOException {
+        return recoveredExternalLocations;
+    }
+
+    private Long recoverFromSnapshot(final Map<Object, T> recordMap) throws 
IOException {
+        final boolean partialExists = Files.exists(partialPath);
+        final boolean snapshotExists = Files.exists(snapshotPath);
+
+        if (!partialExists && !snapshotExists) {
+            return null;
+        }
+
+        if (partialExists && snapshotExists) {
+            // both files exist -- assume we failed while checkpointing. Delete
+            // the partial file
+            Files.delete(partialPath);
+        } else if (partialExists) {
+            // partial exists but snapshot does not -- we must have completed
+            // creating the partial, deleted the snapshot
+            // but crashed before renaming the partial to the snapshot. Just
+            // rename partial to snapshot
+            Files.move(partialPath, snapshotPath);
+        }
+
+        if (Files.size(snapshotPath) == 0) {
+            logger.warn("{} Found 0-byte Snapshot file; skipping Snapshot file 
in recovery", this);
+            return null;
+        }
+
+        // at this point, we know the snapshotPath exists because if it 
didn't, then we either returned null
+        // or we renamed partialPath to snapshotPath. So just Recover from 
snapshotPath.
+        try (final DataInputStream dataIn = new DataInputStream(new 
BufferedInputStream(Files.newInputStream(snapshotPath, 
StandardOpenOption.READ)))) {
+            final String waliImplementationClass = dataIn.readUTF();
+            final int waliImplementationVersion = dataIn.readInt();
+
+            if 
(!waliImplementationClass.equals(MinimalLockingWriteAheadLog.class.getName())) {
+                throw new IOException("Write-Ahead Log located at " + 
snapshotPath + " was written using the " + waliImplementationClass + " class; 
cannot restore using " + getClass().getName());
+            }
+
+            if (waliImplementationVersion > getVersion()) {
+                throw new IOException("Write-Ahead Log located at " + 
snapshotPath + " was written using version " + waliImplementationVersion + " of 
the " + waliImplementationClass + " class; cannot restore using Version " + 
getVersion());
+            }
+
+            dataIn.readUTF(); // ignore serde class name for now
+            final int serdeVersion = dataIn.readInt();
+            final long maxTransactionId = dataIn.readLong();
+            final int numRecords = dataIn.readInt();
+
+            for (int i = 0; i < numRecords; i++) {
+                final T record = serde.deserializeRecord(dataIn, serdeVersion);
+                if (record == null) {
+                    throw new EOFException();
+                }
+
+                final UpdateType updateType = serde.getUpdateType(record);
+                if (updateType == UpdateType.DELETE) {
+                    logger.warn("While recovering from snapshot, found record 
with type 'DELETE'; this record will not be restored");
+                    continue;
+                }
+
+                logger.trace("Recovered from snapshot: {}", record);
+                recordMap.put(serde.getRecordIdentifier(record), record);
+            }
+
+            final int numSwapRecords = dataIn.readInt();
+            final Set<String> swapLocations = new HashSet<>();
+            for (int i = 0; i < numSwapRecords; i++) {
+                swapLocations.add(dataIn.readUTF());
+            }
+            this.recoveredExternalLocations.addAll(swapLocations);
+
+            logger.debug("{} restored {} Records and {} Swap Files from 
Snapshot, ending with Transaction ID {}", new Object[]{this, numRecords, 
recoveredExternalLocations.size(), maxTransactionId});
+            return maxTransactionId;
+        }
+    }
+
+    /**
+     * Recovers records from the edit logs via the Partitions. Returns a 
boolean
+     * if recovery of a Partition requires the Write-Ahead Log be checkpointed
+     * before modification.
+     *
+     * @param modifiableRecordMap
+     * @param maxTransactionIdRestored
+     * @return
+     * @throws IOException
+     */
+    private void recoverFromEdits(final Map<Object, T> modifiableRecordMap, 
final Long maxTransactionIdRestored) throws IOException {
+        final Map<Object, T> updateMap = new HashMap<>();
+        final Map<Object, T> unmodifiableRecordMap = 
Collections.unmodifiableMap(modifiableRecordMap);
+        final Map<Object, T> ignorableMap = new HashMap<>();
+        final Set<String> ignorableSwapLocations = new HashSet<>();
+
+        // populate a map of the next transaction id for each partition to the
+        // partition that has that next transaction id.
+        final SortedMap<Long, Partition<T>> transactionMap = new TreeMap<>();
+        for (final Partition<T> partition : partitions) {
+            Long transactionId;
+            boolean keepTransaction;
+            do {
+                transactionId = partition.getNextRecoverableTransactionId();
+
+                keepTransaction = transactionId == null || 
maxTransactionIdRestored == null || transactionId > maxTransactionIdRestored;
+                if (keepTransaction && transactionId != null) {
+                    // map this transaction id to its partition so that we can
+                    // start restoring transactions from this partition,
+                    // starting at 'transactionId'
+                    transactionMap.put(transactionId, partition);
+                } else if (transactionId != null) {
+                    // skip the next transaction, because our snapshot already
+                    // contained this transaction.
+                    try {
+                        partition.recoverNextTransaction(ignorableMap, 
updateMap, ignorableSwapLocations);
+                    } catch (final EOFException e) {
+                        logger.error("{} unexpectedly reached End of File 
while reading from {} for Transaction {}; assuming crash and ignoring this 
transaction.",
+                                new Object[]{this, partition, transactionId});
+                    }
+                }
+            } while (!keepTransaction);
+        }
+
+        while (!transactionMap.isEmpty()) {
+            final Map.Entry<Long, Partition<T>> firstEntry = 
transactionMap.entrySet().iterator().next();
+            final Long firstTransactionId = firstEntry.getKey();
+            final Partition<T> nextPartition = firstEntry.getValue();
+
+            try {
+                updateMap.clear();
+                final Set<Object> idsRemoved = 
nextPartition.recoverNextTransaction(unmodifiableRecordMap, updateMap, 
recoveredExternalLocations);
+                modifiableRecordMap.putAll(updateMap);
+                for (final Object id : idsRemoved) {
+                    modifiableRecordMap.remove(id);
+                }
+            } catch (final EOFException e) {
+                logger.error("{} unexpectedly reached End-of-File when reading 
from {} for Transaction ID {}; assuming crash and ignoring this transaction",
+                        new Object[]{this, nextPartition, firstTransactionId});
+            }
+
+            transactionMap.remove(firstTransactionId);
+
+            Long subsequentTransactionId = null;
+            try {
+                subsequentTransactionId = 
nextPartition.getNextRecoverableTransactionId();
+            } catch (final IOException e) {
+                logger.error("{} unexpectedly found End-of-File when reading 
from {} for Transaction ID {}; assuming crash and ignoring this transaction",
+                        new Object[]{this, nextPartition, firstTransactionId});
+            }
+
+            if (subsequentTransactionId != null) {
+                transactionMap.put(subsequentTransactionId, nextPartition);
+            }
+        }
+
+        for (final Partition<T> partition : partitions) {
+            partition.endRecovery();
+        }
+    }
+
+    @Override
+    public synchronized int checkpoint() throws IOException {
+        final Set<T> records;
+        final Set<String> swapLocations;
+        final long maxTransactionId;
+
+        final long startNanos = System.nanoTime();
+
+        FileOutputStream fileOut = null;
+        DataOutputStream dataOut = null;
+
+        long stopTheWorldNanos = -1L;
+        long stopTheWorldStart = -1L;
+        try {
+            writeLock.lock();
+            try {
+                stopTheWorldStart = System.nanoTime();
+                // stop the world while we make a copy of the records that must
+                // be checkpointed and rollover the partitions.
+                // We copy the records because serializing them is potentially
+                // very expensive, especially when we have hundreds
+                // of thousands or even millions of them. We don't want to
+                // prevent WALI from being used during this time.
+
+                // So the design is to copy all of the records, determine the
+                // last transaction ID that the records represent,
+                // and roll over the partitions to new write-ahead logs.
+                // Then, outside of the write lock, we will serialize the data
+                // to disk, and then remove the old Partition data.
+                records = new HashSet<>(recordMap.values());
+                maxTransactionId = transactionIdGenerator.get() - 1;
+
+                swapLocations = new HashSet<>(externalLocations);
+                for (final Partition<T> partition : partitions) {
+                    partition.rollover();
+                }
+
+                // notify global sync with the write lock held. We do this 
because we don't want the repository to get updated
+                // while the listener is performing its necessary tasks
+                if (syncListener != null) {
+                    syncListener.onGlobalSync();
+                }
+            } finally {
+                writeLock.unlock();
+            }
+
+            stopTheWorldNanos = System.nanoTime() - stopTheWorldStart;
+
+            // perform checkpoint, writing to .partial file
+            fileOut = new FileOutputStream(partialPath.toFile());
+            dataOut = new DataOutputStream(fileOut);
+            dataOut.writeUTF(MinimalLockingWriteAheadLog.class.getName());
+            dataOut.writeInt(getVersion());
+            dataOut.writeUTF(serde.getClass().getName());
+            dataOut.writeInt(serde.getVersion());
+            dataOut.writeLong(maxTransactionId);
+            dataOut.writeInt(records.size());
+
+            for (final T record : records) {
+                logger.trace("Checkpointing {}", record);
+                serde.serializeRecord(record, dataOut);
+            }
+
+            dataOut.writeInt(swapLocations.size());
+            for (final String swapLocation : swapLocations) {
+                dataOut.writeUTF(swapLocation);
+            }
+        } finally {
+            if (dataOut != null) {
+                try {
+                    dataOut.flush();
+                    fileOut.getFD().sync();
+                    dataOut.close();
+                } catch (final IOException e) {
+                    logger.warn("Failed to close Data Stream due to {}", 
e.toString(), e);
+                }
+            }
+        }
+
+        // delete the snapshot, if it exists, and rename the .partial to
+        // snapshot
+        Files.deleteIfExists(snapshotPath);
+        Files.move(partialPath, snapshotPath);
+
+        // clear all of the edit logs
+        final long partitionStart = System.nanoTime();
+        for (final Partition<T> partition : partitions) {
+            // we can call clearOld without claiming the partition because it
+            // does not change the partition's state
+            // and the only member variable it touches cannot be modified, 
other
+            // than when #rollover() is called.
+            // And since this method is the only one that calls #rollover() and
+            // this method is synchronized,
+            // the value of that member variable will not change. And it's
+            // volatile, so we will get the correct value.
+            partition.clearOld();
+        }
+        final long partitionEnd = System.nanoTime();
+        numberBlackListedPartitions.set(0);
+
+        final long endNanos = System.nanoTime();
+        final long millis = TimeUnit.MILLISECONDS.convert(endNanos - 
startNanos, TimeUnit.NANOSECONDS);
+        final long partitionMillis = 
TimeUnit.MILLISECONDS.convert(partitionEnd - partitionStart, 
TimeUnit.NANOSECONDS);
+        final long stopTheWorldMillis = 
TimeUnit.NANOSECONDS.toMillis(stopTheWorldNanos);
+
+        logger.info("{} checkpointed with {} Records and {} Swap Files in {} 
milliseconds (Stop-the-world time = {} milliseconds, Clear Edit Logs time = {} 
millis), max Transaction ID {}",
+                new Object[]{this, records.size(), swapLocations.size(), 
millis, stopTheWorldMillis, partitionMillis, maxTransactionId});
+
+        return records.size();
+    }
+
+    @Override
+    public void shutdown() throws IOException {
+        writeLock.lock();
+        try {
+            for (final Partition<T> partition : partitions) {
+                partition.close();
+            }
+        } finally {
+            writeLock.unlock();
+            lockChannel.close();
+        }
+    }
+
+    public int getVersion() {
+        return 1;
+    }
+
+    /**
+     * Represents a partition of this repository, which maps directly to a
+     * .journal file.
+     *
+     * All methods with the exceptions of {@link #claim()}, {@link 
#tryClaim()},
+     * and {@link #releaseClaim()} in this Partition MUST be called while
+     * holding the claim (via {@link #claim} or {@link #tryClaim()).
+     *
+     * @param <S>
+     */
+    private static class Partition<S> {
+
+        public static final String JOURNAL_EXTENSION = ".journal";
+        private static final Pattern JOURNAL_FILENAME_PATTERN = 
Pattern.compile("\\d+\\.journal");
+
+        private final SerDe<S> serde;
+
+        private final Path editDirectory;
+        private final int writeAheadLogVersion;
+
+        private final Lock lock = new ReentrantLock();
+        private DataOutputStream dataOut = null;
+        private FileOutputStream fileOut = null;
+        private boolean blackListed = false;
+        private boolean closed = false;
+        private DataInputStream recoveryIn;
+        private int recoveryVersion;
+        private String currentJournalFilename = "";
+
+        private static final byte TRANSACTION_CONTINUE = 1;
+        private static final byte TRANSACTION_COMMIT = 2;
+
+        private final String description;
+        private final AtomicLong maxTransactionId = new AtomicLong(-1L);
+        private final Logger logger = 
LoggerFactory.getLogger(MinimalLockingWriteAheadLog.class);
+
+        private final Queue<Path> recoveryFiles;
+
+        public Partition(final Path path, final SerDe<S> serde, final int 
partitionIndex, final int writeAheadLogVersion) throws IOException {
+            this.editDirectory = path;
+            this.serde = serde;
+
+            final File file = path.toFile();
+            if (!file.exists() && !file.mkdirs()) {
+                throw new IOException("Could not create directory " + 
file.getAbsolutePath());
+            }
+
+            this.recoveryFiles = new LinkedBlockingQueue<>();
+            for (final Path recoveryPath : getRecoveryPaths()) {
+                recoveryFiles.add(recoveryPath);
+            }
+
+            this.description = "Partition-" + partitionIndex;
+            this.writeAheadLogVersion = writeAheadLogVersion;
+        }
+
+        public boolean tryClaim() {
+            final boolean obtainedLock = lock.tryLock();
+            if (!obtainedLock) {
+                return false;
+            }
+
+            // Check if the partition is blacklisted. If so, unlock it and 
return false. Otherwise,
+            // leave it locked and return true, so that the caller will need 
to unlock.
+            if (blackListed) {
+                lock.unlock();
+                return false;
+            }
+
+            return true;
+        }
+
+        public void releaseClaim() {
+            lock.unlock();
+        }
+
+        public void close() {
+            final DataOutputStream out = dataOut;
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (final Exception e) {
+
+                }
+            }
+
+            this.closed = true;
+            this.dataOut = null;
+        }
+
+        public void blackList() {
+            lock.lock();
+            try {
+                blackListed = true;
+            } finally {
+                lock.unlock();
+            }
+            logger.debug("Blacklisted {}", this);
+        }
+
+        /**
+         * Closes resources pointing to the current journal and begins writing
+         * to a new one
+         *
+         * @throws IOException
+         */
+        public void rollover() throws IOException {
+            lock.lock();
+            try {
+                final DataOutputStream out = dataOut;
+                if (out != null) {
+                    out.close();
+                }
+
+                final Path editPath = getNewEditPath();
+                final FileOutputStream fos = new 
FileOutputStream(editPath.toFile());
+                final DataOutputStream outStream = new DataOutputStream(new 
BufferedOutputStream(fos));
+                
outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName());
+                outStream.writeInt(writeAheadLogVersion);
+                outStream.writeUTF(serde.getClass().getName());
+                outStream.writeInt(serde.getVersion());
+                outStream.flush();
+                dataOut = outStream;
+                fileOut = fos;
+
+                currentJournalFilename = editPath.toFile().getName();
+
+                blackListed = false;
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        private long getJournalIndex(final File file) {
+            final String filename = file.getName();
+            final int dotIndex = filename.indexOf(".");
+            final String number = filename.substring(0, dotIndex);
+            return Long.parseLong(number);
+        }
+
+        private Path getNewEditPath() {
+            final List<Path> recoveryPaths = getRecoveryPaths();
+            final long newIndex;
+            if (recoveryPaths == null || recoveryPaths.isEmpty()) {
+                newIndex = 1;
+            } else {
+                final long lastFileIndex = 
getJournalIndex(recoveryPaths.get(recoveryPaths.size() - 1).toFile());
+                newIndex = lastFileIndex + 1;
+            }
+
+            return editDirectory.resolve(newIndex + JOURNAL_EXTENSION);
+        }
+
+        private List<Path> getRecoveryPaths() {
+            final List<Path> paths = new ArrayList<>();
+
+            final File directory = editDirectory.toFile();
+            final File[] partitionFiles = directory.listFiles();
+            if (partitionFiles == null) {
+                return paths;
+            }
+
+            for (final File file : partitionFiles) {
+                // if file is a journal file but no data has yet been 
persisted, it may
+                // very well be a 0-byte file (the journal is not SYNC'ed to 
disk after
+                // a header is written out, so it may be lost). In this case, 
the journal
+                // is empty, so we can just skip it.
+                if (file.isDirectory() || file.length() == 0L) {
+                    continue;
+                }
+
+                if 
(!JOURNAL_FILENAME_PATTERN.matcher(file.getName()).matches()) {
+                    continue;
+                }
+
+                if (isJournalFile(file)) {
+                    paths.add(file.toPath());
+                } else {
+                    logger.warn("Found file {}, but could not access it, or it 
was not in the expected format; will ignore this file", file.getAbsolutePath());
+                }
+            }
+
+            // Sort journal files by the numeric portion of the filename
+            Collections.sort(paths, new Comparator<Path>() {
+                @Override
+                public int compare(final Path o1, final Path o2) {
+                    if (o1 == null && o2 == null) {
+                        return 0;
+                    }
+                    if (o1 == null) {
+                        return 1;
+                    }
+                    if (o2 == null) {
+                        return -1;
+                    }
+
+                    final long index1 = getJournalIndex(o1.toFile());
+                    final long index2 = getJournalIndex(o2.toFile());
+                    return Long.compare(index1, index2);
+                }
+            });
+
+            return paths;
+        }
+
+        void clearOld() {
+            final List<Path> oldRecoveryFiles = getRecoveryPaths();
+
+            for (final Path path : oldRecoveryFiles) {
+                final File file = path.toFile();
+                if (file.getName().equals(currentJournalFilename)) {
+                    continue;
+                }
+                if (file.exists()) {
+                    file.delete();
+                }
+            }
+        }
+
+        private boolean isJournalFile(final File file) {
+            final String expectedStartsWith = 
MinimalLockingWriteAheadLog.class.getName();
+            try {
+                try (final FileInputStream fis = new FileInputStream(file);
+                        final InputStream bufferedIn = new 
BufferedInputStream(fis);
+                        final DataInputStream in = new 
DataInputStream(bufferedIn)) {
+                    final String waliImplClassName = in.readUTF();
+                    if (!expectedStartsWith.equals(waliImplClassName)) {
+                        return false;
+                    }
+                }
+            } catch (final IOException e) {
+                return false;
+            }
+
+            return true;
+        }
+
+        public void update(final Collection<S> records, final long 
transactionId, final Map<Object, S> recordMap, final boolean forceSync) throws 
IOException {
+            if (this.closed) {
+                throw new IllegalStateException("Partition is closed");
+            }
+
+            final DataOutputStream out = dataOut;
+            out.writeLong(transactionId);
+
+            final int numEditsToSerialize = records.size();
+            int editsSerialized = 0;
+            for (final S record : records) {
+                final Object recordId = serde.getRecordIdentifier(record);
+                final S previousVersion = recordMap.get(recordId);
+
+                serde.serializeEdit(previousVersion, record, out);
+                if (++editsSerialized < numEditsToSerialize) {
+                    out.write(TRANSACTION_CONTINUE);
+                } else {
+                    out.write(TRANSACTION_COMMIT);
+                }
+            }
+
+            out.flush();
+
+            if (forceSync) {
+                fileOut.getFD().sync();
+            }
+        }
+
+        private DataInputStream createDataInputStream(final Path path) throws 
IOException {
+            return new DataInputStream(new 
BufferedInputStream(Files.newInputStream(path)));
+        }
+
+        private DataInputStream getRecoveryStream() throws IOException {
+            if (recoveryIn != null && hasMoreData(recoveryIn)) {
+                return recoveryIn;
+            }
+
+            while (true) {
+                final Path nextRecoveryPath = recoveryFiles.poll();
+                if (nextRecoveryPath == null) {
+                    return null;
+                }
+
+                recoveryIn = createDataInputStream(nextRecoveryPath);
+                if (hasMoreData(recoveryIn)) {
+                    final String waliImplementationClass = 
recoveryIn.readUTF();
+                    if 
(!MinimalLockingWriteAheadLog.class.getName().equals(waliImplementationClass)) {
+                        continue;
+                    }
+
+                    final long waliVersion = recoveryIn.readInt();
+                    if (waliVersion > writeAheadLogVersion) {
+                        throw new IOException("Cannot recovery from file " + 
nextRecoveryPath + " because it was written using WALI version " + waliVersion 
+ ", but the version used to restore it is only " + writeAheadLogVersion);
+                    }
+
+                    @SuppressWarnings("unused")
+                    final String serdeClassName = recoveryIn.readUTF();
+                    this.recoveryVersion = recoveryIn.readInt();
+
+                    break;
+                }
+            }
+
+            return recoveryIn;
+        }
+
+        public Long getNextRecoverableTransactionId() throws IOException {
+            while (true) {
+                DataInputStream recoveryStream = getRecoveryStream();
+                if (recoveryStream == null) {
+                    return null;
+                }
+
+                final long transactionId;
+                try {
+                    transactionId = recoveryIn.readLong();
+                } catch (final EOFException e) {
+                    continue;
+                }
+
+                this.maxTransactionId.set(transactionId);
+                return transactionId;
+            }
+        }
+
+        private boolean hasMoreData(final InputStream in) throws IOException {
+            in.mark(1);
+            final int nextByte = in.read();
+            in.reset();
+            return nextByte >= 0;
+        }
+
+        public void endRecovery() throws IOException {
+            if (recoveryIn != null) {
+                recoveryIn.close();
+            }
+
+            final Path nextRecoveryPath = this.recoveryFiles.poll();
+            if (nextRecoveryPath != null) {
+                throw new IllegalStateException("Signaled to end recovery, but 
there are more recovery files for Partition in directory " + editDirectory);
+            }
+
+            final Path newEditPath = getNewEditPath();
+
+            final FileOutputStream fos = new 
FileOutputStream(newEditPath.toFile());
+            final DataOutputStream outStream = new DataOutputStream(new 
BufferedOutputStream(fos));
+            outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName());
+            outStream.writeInt(writeAheadLogVersion);
+            outStream.writeUTF(serde.getClass().getName());
+            outStream.writeInt(serde.getVersion());
+            outStream.flush();
+            dataOut = outStream;
+            fileOut = fos;
+        }
+
+        public Set<Object> recoverNextTransaction(final Map<Object, S> 
currentRecordMap, final Map<Object, S> updatedRecordMap, final Set<String> 
swapLocations) throws IOException {
+            final Set<Object> idsRemoved = new HashSet<>();
+
+            int transactionFlag;
+            do {
+                final S record = serde.deserializeEdit(recoveryIn, 
currentRecordMap, recoveryVersion);
+                if (logger.isTraceEnabled()) {
+                    logger.trace("{} Recovering Transaction {}: {}", new 
Object[]{this, maxTransactionId.get(), record});
+                }
+
+                final Object recordId = serde.getRecordIdentifier(record);
+                final UpdateType updateType = serde.getUpdateType(record);
+                if (updateType == UpdateType.DELETE) {
+                    updatedRecordMap.remove(recordId);
+                    idsRemoved.add(recordId);
+                } else if (updateType == UpdateType.SWAP_IN) {
+                    final String location = serde.getLocation(record);
+                    if (location == null) {
+                        logger.error("Recovered SWAP_IN record from edit log, 
but it did not contain a Location; skipping record");
+                    } else {
+                        swapLocations.remove(location);
+                        updatedRecordMap.put(recordId, record);
+                        idsRemoved.remove(recordId);
+                    }
+                } else if (updateType == UpdateType.SWAP_OUT) {
+                    final String location = serde.getLocation(record);
+                    if (location == null) {
+                        logger.error("Recovered SWAP_OUT record from edit log, 
but it did not contain a Location; skipping record");
+                    } else {
+                        swapLocations.add(location);
+                        updatedRecordMap.remove(recordId);
+                        idsRemoved.add(recordId);
+                    }
+                } else {
+                    updatedRecordMap.put(recordId, record);
+                    idsRemoved.remove(recordId);
+                }
+
+                transactionFlag = recoveryIn.read();
+            } while (transactionFlag != TRANSACTION_COMMIT);
+
+            return idsRemoved;
+        }
+
+        /**
+         * Must be called after recovery has finished
+         *
+         * @return
+         */
+        public long getMaxRecoveredTransactionId() {
+            return maxTransactionId.get();
+        }
+
+        @Override
+        public String toString() {
+            return description;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/src/main/java/org/wali/SerDe.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/main/java/org/wali/SerDe.java 
b/commons/wali/src/main/java/org/wali/SerDe.java
new file mode 100644
index 0000000..bbc7efb
--- /dev/null
+++ b/commons/wali/src/main/java/org/wali/SerDe.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.wali;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * A mechanism for Serializing and De-Serializing a Record of a given Type
+ *
+ * @param <T> the type of record that is to be Serialized and De-Serialized by
+ * this object
+ */
+public interface SerDe<T> {
+
+    /**
+     * <p>
+     * Serializes an Edit Record to the log via the given
+     * {@link DataOutputStream}.
+     * </p>
+     *
+     * @param previousRecordState
+     * @param newRecordState
+     * @param out
+     * @throws IOException
+     */
+    void serializeEdit(T previousRecordState, T newRecordState, 
DataOutputStream out) throws IOException;
+
+    /**
+     * <p>
+     * Serializes a Record in a form suitable for a Snapshot via the given
+     * {@link DataOutputStream}.
+     * </p>
+     *
+     * @param record
+     * @param out
+     * @throws IOException
+     */
+    void serializeRecord(T record, DataOutputStream out) throws IOException;
+
+    /**
+     * <p>
+     * Reads an Edit Record from the given {@link DataInputStream} and merges
+     * that edit with the current version of the record, returning the new,
+     * merged version. If the Edit Record indicates that the entity was 
deleted,
+     * must return a Record with an UpdateType of {@link UpdateType#DELETE}.
+     * This method must never return <code>null</code>.
+     * </p>
+     *
+     * @param in
+     * @param currentRecordStates an unmodifiable map of Record ID's to the
+     * current state of that record
+     * @param version the version of the SerDe that was used to serialize the
+     * edit record
+     * @return
+     * @throws IOException
+     */
+    T deserializeEdit(DataInputStream in, Map<Object, T> currentRecordStates, 
int version) throws IOException;
+
+    /**
+     * <p>
+     * Reads a Record from the given {@link DataInputStream} and returns this
+     * record. If no data is available, returns <code>null</code>.
+     * </p>
+     *
+     * @param in
+     * @param version the version of the SerDe that was used to serialize the
+     * record
+     * @return
+     * @throws IOException
+     */
+    T deserializeRecord(DataInputStream in, int version) throws IOException;
+
+    /**
+     * Returns the unique ID for the given record
+     *
+     * @param record
+     * @return
+     */
+    Object getRecordIdentifier(T record);
+
+    /**
+     * Returns the UpdateType for the given record
+     *
+     * @param record
+     * @return
+     */
+    UpdateType getUpdateType(T record);
+
+    /**
+     * Returns the external location of the given record; this is used when a
+     * record is moved away from WALI or is being re-introduced to WALI. For
+     * example, WALI can be updated with a record of type
+     * {@link UpdateType#SWAP_OUT} that indicates a Location of
+     * file://tmp/external1 and can then be re-introduced to WALI by updating
+     * WALI with a record of type {@link UpdateType#CREATE} that indicates a
+     * Location of file://tmp/external1
+     *
+     * @param record
+     * @return
+     */
+    String getLocation(T record);
+
+    /**
+     * Returns the version that this SerDe will use when writing. This used 
used
+     * when serializing/deserializing the edit logs so that if the version
+     * changes, we are still able to deserialize old versions
+     *
+     * @return
+     */
+    int getVersion();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/src/main/java/org/wali/SyncListener.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/main/java/org/wali/SyncListener.java 
b/commons/wali/src/main/java/org/wali/SyncListener.java
new file mode 100644
index 0000000..ffb11ca
--- /dev/null
+++ b/commons/wali/src/main/java/org/wali/SyncListener.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.wali;
+
+/**
+ * <p>
+ * Provides a callback mechanism by which applicable listeners can be notified
+ * when a WriteAheadRepository is synched (via the
+ * {@link WriteAheadRepository#sync()} method) or one of its partitions is
+ * synched via
+ * {@link WriteAheadRepository#update(java.util.Collection, boolean)} with a
+ * value of <code>true</code> for the second argument.
+ * </p>
+ *
+ * <p>
+ * It is not required that an implementation of {@link WriteAheadRepository}
+ * support this interface. Those that do generally will require that the
+ * listener be injected via the constructor.
+ * </p>
+ *
+ * <p>
+ * All implementations of this interface must be thread-safe.
+ * </p>
+ *
+ * <p>
+ * The {@link #onSync(int)} method will always be called while the associated
+ * partition is locked. The {@link #onGlobalSync()} will always be called while
+ * the entire repository is locked.
+ * </p>
+ *
+ */
+public interface SyncListener {
+
+    /**
+     * This method is called whenever a specific partition is synched via the
+     * {@link WriteAheadRepository#update(java.util.Collection, boolean)} 
method
+     *
+     * @param partitionIndex the index of the partition that was synched
+     */
+    void onSync(int partitionIndex);
+
+    /**
+     * This method is called whenever the entire
+     * <code>WriteAheadRepository</code> is synched via the
+     * {@link WriteAheadRepository#sync()} method.
+     */
+    void onGlobalSync();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/src/main/java/org/wali/UpdateType.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/main/java/org/wali/UpdateType.java 
b/commons/wali/src/main/java/org/wali/UpdateType.java
new file mode 100644
index 0000000..1b039f8
--- /dev/null
+++ b/commons/wali/src/main/java/org/wali/UpdateType.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.wali;
+
+/**
+ * <p>
+ * Enumerates the valid types of things that can cause a
+ * {@link WriteAheadRepository} to update its state</p>
+ */
+public enum UpdateType {
+
+    /**
+     * Used when a new Record has been created
+     */
+    CREATE,
+    /**
+     * Used when a Record has been updated in some way
+     */
+    UPDATE,
+    /**
+     * Used to indicate that a Record has been deleted and should be removed
+     * from the Repository
+     */
+    DELETE,
+    /**
+     * Used to indicate that a Record still exists but has been moved 
elsewhere,
+     * so that it is no longer maintained by the WALI instance
+     */
+    SWAP_OUT,
+    /**
+     * Used to indicate that a Record that was previously Swapped Out is now
+     * being Swapped In
+     */
+    SWAP_IN;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/src/main/java/org/wali/WriteAheadRepository.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/main/java/org/wali/WriteAheadRepository.java 
b/commons/wali/src/main/java/org/wali/WriteAheadRepository.java
new file mode 100644
index 0000000..4567872
--- /dev/null
+++ b/commons/wali/src/main/java/org/wali/WriteAheadRepository.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.wali;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * <p>
+ * A WriteAheadRepository is used to persist state that is otherwise kept
+ * in-memory. The Repository does not provide any query capability except to
+ * allow the data to be recovered upon restart of the system.
+ * </p>
+ *
+ * <p>
+ * A WriteAheadRepository operates by writing every update to an Edit Log. On
+ * restart, the data can be recovered by replaying all of the updates that are
+ * found in the Edit Log. This can, however, eventually result in very large
+ * Edit Logs, which can both take up massive amounts of disk space and take a
+ * long time to recover. In order to prevent this, the Repository provides a
+ * Checkpointing capability. This allows the current in-memory state of the
+ * Repository to be flushed to disk and the Edit Log to be deleted, thereby
+ * compacting the amount of space required to store the Repository. After a
+ * Checkpoint is performed, modifications are again written to an Edit Log. At
+ * this point, when the system is to be restored, it is restored by first
+ * loading the Checkpointed version of the Repository and then replaying the
+ * Edit Log.
+ * </p>
+ *
+ * <p>
+ * All implementations of <code>WriteAheadRepository</code> use one or more
+ * partitions to manage their Edit Logs. An implementation may require exactly
+ * one partition or may allow many partitions.
+ * </p>
+ *
+ * @param <T>
+ */
+public interface WriteAheadRepository<T> {
+
+    /**
+     * <p>
+     * Updates the repository with the specified Records. The Collection must
+     * not contain multiple records with the same ID
+     * </p>
+     *
+     * @param records the records to update
+     * @param forceSync specifies whether or not the Repository forces the data
+     * to be flushed to disk. If false, the data may be stored in Operating
+     * System buffers, which improves performance but could cause loss of data
+     * if power is lost or the Operating System crashes
+     * @throws IOException
+     * @throws IllegalArgumentException if multiple records within the given
+     * Collection have the same ID, as specified by {@link Record#getId()}
+     * method
+     *
+     * @return the index of the Partition that performed the update
+     */
+    int update(Collection<T> records, boolean forceSync) throws IOException;
+
+    /**
+     * <p>
+     * Recovers all records from the persisted state. This method must be 
called
+     * before any updates are issued to the Repository.
+     * </p>
+     *
+     * @return
+     * @throws IOException
+     * @throws IllegalStateException if any updates have been issued against
+     * this Repository before this method is invoked
+     */
+    Collection<T> recoverRecords() throws IOException;
+
+    /**
+     * <p>
+     * Recovers all External Swap locations that were persisted. If this method
+     * is to be called, it must be called AFTER {@link #recoverRecords()} and
+     * BEFORE {@link update}.
+     * </p>
+     *
+     * @return
+     * @throws IOException
+     */
+    Set<String> getRecoveredSwapLocations() throws IOException;
+
+    /**
+     * <p>
+     * Compacts the contents of the Repository so that rather than having a
+     * Snapshot and an Edit Log indicating many Updates to the Snapshot, the
+     * Snapshot is updated to contain the current state of the Repository, and
+     * the edit log is purged.
+     * </p>
+     *
+     *
+     * @return the number of records that were written to the new snapshot
+     * @throws java.io.IOException
+     */
+    int checkpoint() throws IOException;
+
+    /**
+     * <p>
+     * Causes the repository to checkpoint and then close any open resources.
+     * </p>
+     *
+     * @throws IOException
+     */
+    void shutdown() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/src/test/java/org/wali/DummyRecord.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/test/java/org/wali/DummyRecord.java 
b/commons/wali/src/test/java/org/wali/DummyRecord.java
new file mode 100644
index 0000000..e0f7f96
--- /dev/null
+++ b/commons/wali/src/test/java/org/wali/DummyRecord.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.wali;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DummyRecord {
+
+    private final String id;
+    private final Map<String, String> props;
+    private final UpdateType updateType;
+
+    public DummyRecord(final String id, final UpdateType updateType) {
+        this.id = id;
+        this.props = new HashMap<>();
+        this.updateType = updateType;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public UpdateType getUpdateType() {
+        return updateType;
+    }
+
+    public DummyRecord setProperties(final Map<String, String> props) {
+        this.props.clear();
+        this.props.putAll(props);
+        return this;
+    }
+
+    public DummyRecord setProperty(final String name, final String value) {
+        this.props.put(name, value);
+        return this;
+    }
+
+    public Map<String, String> getProperties() {
+        return Collections.unmodifiableMap(this.props);
+    }
+
+    public String getProperty(final String name) {
+        return props.get(name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/src/test/java/org/wali/DummyRecordSerde.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/test/java/org/wali/DummyRecordSerde.java 
b/commons/wali/src/test/java/org/wali/DummyRecordSerde.java
new file mode 100644
index 0000000..8cc7860
--- /dev/null
+++ b/commons/wali/src/test/java/org/wali/DummyRecordSerde.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.wali;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Map;
+
+public class DummyRecordSerde implements SerDe<DummyRecord> {
+
+    public static final int NUM_UPDATE_TYPES = UpdateType.values().length;
+    private int throwIOEAfterNserializeEdits = -1;
+    private int serializeEditCount = 0;
+
+    @Override
+    public void serializeEdit(final DummyRecord previousState, final 
DummyRecord record, final DataOutputStream out) throws IOException {
+        if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >= 
throwIOEAfterNserializeEdits)) {
+            throw new IOException("Serialized " + (serializeEditCount - 1) + " 
records successfully, so now it's time to throw IOE");
+        }
+
+        out.write(record.getUpdateType().ordinal());
+        out.writeUTF(record.getId());
+
+        if (record.getUpdateType() != UpdateType.DELETE) {
+            final Map<String, String> props = record.getProperties();
+            out.writeInt(props.size());
+            for (final Map.Entry<String, String> entry : props.entrySet()) {
+                out.writeUTF(entry.getKey());
+                out.writeUTF(entry.getValue());
+            }
+        }
+    }
+
+    @Override
+    public void serializeRecord(final DummyRecord record, final 
DataOutputStream out) throws IOException {
+        serializeEdit(null, record, out);
+    }
+
+    @Override
+    public DummyRecord deserializeRecord(final DataInputStream in, final int 
version) throws IOException {
+        final int index = in.read();
+        if (index < 0) {
+            throw new EOFException();
+        }
+        if (index >= NUM_UPDATE_TYPES) {
+            throw new IOException("Corrupt stream; got UpdateType value of " + 
index + " but there are only " + NUM_UPDATE_TYPES + " valid values");
+        }
+        final UpdateType updateType = UpdateType.values()[index];
+        final String id = in.readUTF();
+        final DummyRecord record = new DummyRecord(id, updateType);
+
+        if (record.getUpdateType() != UpdateType.DELETE) {
+            final int numProps = in.readInt();
+            for (int i = 0; i < numProps; i++) {
+                final String key = in.readUTF();
+                final String value = in.readUTF();
+                record.setProperty(key, value);
+            }
+        }
+        return record;
+    }
+
+    @Override
+    public Object getRecordIdentifier(final DummyRecord record) {
+        return record.getId();
+    }
+
+    @Override
+    public UpdateType getUpdateType(final DummyRecord record) {
+        return record.getUpdateType();
+    }
+
+    @Override
+    public DummyRecord deserializeEdit(final DataInputStream in, final 
Map<Object, DummyRecord> currentVersion, final int version) throws IOException {
+        return deserializeRecord(in, version);
+    }
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    public void setThrowIOEAfterNSerializeEdits(final int n) {
+        this.throwIOEAfterNserializeEdits = n;
+    }
+
+    @Override
+    public String getLocation(final DummyRecord record) {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git 
a/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java 
b/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
new file mode 100644
index 0000000..57f3495
--- /dev/null
+++ b/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.wali;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestMinimalLockingWriteAheadLog {
+
+    @Test
+    public void testWrite() throws IOException, InterruptedException {
+        final int numPartitions = 8;
+
+        final Path path = Paths.get("target/minimal-locking-repo");
+        deleteRecursively(path.toFile());
+        assertTrue(path.toFile().mkdirs());
+
+        final DummyRecordSerde serde = new DummyRecordSerde();
+        final WriteAheadRepository<DummyRecord> repo = new 
MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
+        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
+        assertTrue(initialRecs.isEmpty());
+
+        final List<InsertThread> threads = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            threads.add(new InsertThread(10000, 1000000 * i, repo));
+        }
+
+        final long start = System.nanoTime();
+        for (final InsertThread thread : threads) {
+            thread.start();
+        }
+        for (final InsertThread thread : threads) {
+            thread.join();
+        }
+        final long nanos = System.nanoTime() - start;
+        final long millis = TimeUnit.MILLISECONDS.convert(nanos, 
TimeUnit.NANOSECONDS);
+        System.out.println("Took " + millis + " millis to insert 1,000,000 
records each in its own transaction");
+        repo.shutdown();
+
+        final WriteAheadRepository<DummyRecord> recoverRepo = new 
MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
+        final Collection<DummyRecord> recoveredRecords = 
recoverRepo.recoverRecords();
+        assertFalse(recoveredRecords.isEmpty());
+        assertEquals(100000, recoveredRecords.size());
+        for (final DummyRecord record : recoveredRecords) {
+            final Map<String, String> recoveredProps = record.getProperties();
+            assertEquals(1, recoveredProps.size());
+            assertEquals("B", recoveredProps.get("A"));
+        }
+    }
+
+    @Test
+    public void testRecoverAfterIOException() throws IOException {
+        final int numPartitions = 5;
+        final Path path = 
Paths.get("target/minimal-locking-repo-test-recover-after-ioe");
+        deleteRecursively(path.toFile());
+        Files.createDirectories(path);
+
+        final DummyRecordSerde serde = new DummyRecordSerde();
+        final WriteAheadRepository<DummyRecord> repo = new 
MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
+        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
+        assertTrue(initialRecs.isEmpty());
+
+        serde.setThrowIOEAfterNSerializeEdits(7);   // serialize the 2 
transactions, then the first edit of the third transaction; then throw 
IOException
+
+        final List<DummyRecord> firstTransaction = new ArrayList<>();
+        firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
+        firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
+        firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
+
+        final List<DummyRecord> secondTransaction = new ArrayList<>();
+        secondTransaction.add(new DummyRecord("1", 
UpdateType.UPDATE).setProperty("abc", "123"));
+        secondTransaction.add(new DummyRecord("2", 
UpdateType.UPDATE).setProperty("cba", "123"));
+        secondTransaction.add(new DummyRecord("3", 
UpdateType.UPDATE).setProperty("aaa", "123"));
+
+        final List<DummyRecord> thirdTransaction = new ArrayList<>();
+        thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
+        thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
+
+        repo.update(firstTransaction, true);
+        repo.update(secondTransaction, true);
+        try {
+            repo.update(thirdTransaction, true);
+            Assert.fail("Did not throw IOException on third transaction");
+        } catch (final IOException e) {
+            // expected behavior.
+        }
+
+        repo.shutdown();
+
+        serde.setThrowIOEAfterNSerializeEdits(-1);
+        final WriteAheadRepository<DummyRecord> recoverRepo = new 
MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
+        final Collection<DummyRecord> recoveredRecords = 
recoverRepo.recoverRecords();
+        assertFalse(recoveredRecords.isEmpty());
+        assertEquals(3, recoveredRecords.size());
+
+        boolean record1 = false, record2 = false, record3 = false;
+        for (final DummyRecord record : recoveredRecords) {
+            switch (record.getId()) {
+                case "1":
+                    record1 = true;
+                    assertEquals("123", record.getProperty("abc"));
+                    break;
+                case "2":
+                    record2 = true;
+                    assertEquals("123", record.getProperty("cba"));
+                    break;
+                case "3":
+                    record3 = true;
+                    assertEquals("123", record.getProperty("aaa"));
+                    break;
+            }
+        }
+
+        assertTrue(record1);
+        assertTrue(record2);
+        assertTrue(record3);
+    }
+
+    @Test
+    public void testCannotModifyLogAfterAllAreBlackListed() throws IOException 
{
+        final int numPartitions = 5;
+        final Path path = 
Paths.get("target/minimal-locking-repo-test-cannot-modify-after-all-blacklisted");
+        deleteRecursively(path.toFile());
+        Files.createDirectories(path);
+
+        final DummyRecordSerde serde = new DummyRecordSerde();
+        final WriteAheadRepository<DummyRecord> repo = new 
MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
+        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
+        assertTrue(initialRecs.isEmpty());
+
+        serde.setThrowIOEAfterNSerializeEdits(3);   // serialize the first 
transaction, then fail on all subsequent transactions
+
+        final List<DummyRecord> firstTransaction = new ArrayList<>();
+        firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
+        firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
+        firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
+
+        final List<DummyRecord> secondTransaction = new ArrayList<>();
+        secondTransaction.add(new DummyRecord("1", 
UpdateType.UPDATE).setProperty("abc", "123"));
+        secondTransaction.add(new DummyRecord("2", 
UpdateType.UPDATE).setProperty("cba", "123"));
+        secondTransaction.add(new DummyRecord("3", 
UpdateType.UPDATE).setProperty("aaa", "123"));
+
+        final List<DummyRecord> thirdTransaction = new ArrayList<>();
+        thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
+        thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
+
+        repo.update(firstTransaction, true);
+
+        try {
+            repo.update(secondTransaction, true);
+            Assert.fail("Did not throw IOException on second transaction");
+        } catch (final IOException e) {
+            // expected behavior.
+        }
+
+        for (int i = 0; i < 4; i++) {
+            try {
+                repo.update(thirdTransaction, true);
+                Assert.fail("Did not throw IOException on third transaction");
+            } catch (final IOException e) {
+                // expected behavior.
+            }
+        }
+
+        serde.setThrowIOEAfterNSerializeEdits(-1);
+        final List<DummyRecord> fourthTransaction = new ArrayList<>();
+        fourthTransaction.add(new DummyRecord("1", UpdateType.DELETE));
+
+        try {
+            repo.update(fourthTransaction, true);
+            Assert.fail("Successfully updated repo for 4th transaction");
+        } catch (final IOException e) {
+            // expected behavior
+            assertTrue(e.getMessage().contains("All Partitions have been 
blacklisted"));
+        }
+
+        repo.shutdown();
+        serde.setThrowIOEAfterNSerializeEdits(-1);
+
+        final WriteAheadRepository<DummyRecord> recoverRepo = new 
MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
+        final Collection<DummyRecord> recoveredRecords = 
recoverRepo.recoverRecords();
+        assertFalse(recoveredRecords.isEmpty());
+        assertEquals(3, recoveredRecords.size());
+    }
+
+    @Test
+    public void testStriping() throws IOException {
+        final int numPartitions = 6;
+        final Path path = Paths.get("target/minimal-locking-repo-striped");
+        deleteRecursively(path.toFile());
+        Files.createDirectories(path);
+
+        final SortedSet<Path> paths = new TreeSet<>();
+        paths.add(path.resolve("stripe-1"));
+        paths.add(path.resolve("stripe-2"));
+
+        final DummyRecordSerde serde = new DummyRecordSerde();
+        final WriteAheadRepository<DummyRecord> repo = new 
MinimalLockingWriteAheadLog<>(paths, numPartitions, serde, null);
+        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
+        assertTrue(initialRecs.isEmpty());
+
+        final InsertThread inserter = new InsertThread(100000, 0, repo);
+        inserter.run();
+
+        for (final Path partitionPath : paths) {
+            final File[] files = partitionPath.toFile().listFiles(new 
FileFilter() {
+                @Override
+                public boolean accept(File pathname) {
+                    return pathname.getName().startsWith("partition");
+                }
+            });
+            assertEquals(3, files.length);
+
+            for (final File file : files) {
+                final File[] journalFiles = file.listFiles();
+                assertEquals(1, journalFiles.length);
+            }
+        }
+
+        repo.checkpoint();
+
+    }
+
+    private static class InsertThread extends Thread {
+
+        private final List<List<DummyRecord>> records;
+        private final WriteAheadRepository<DummyRecord> repo;
+
+        public InsertThread(final int numInsertions, final int startIndex, 
final WriteAheadRepository<DummyRecord> repo) {
+            records = new ArrayList<>();
+            for (int i = 0; i < numInsertions; i++) {
+                final DummyRecord record = new DummyRecord(String.valueOf(i + 
startIndex), UpdateType.CREATE);
+                record.setProperty("A", "B");
+                final List<DummyRecord> list = new ArrayList<>();
+                list.add(record);
+                records.add(list);
+            }
+            this.repo = repo;
+        }
+
+        @Override
+        public void run() {
+            try {
+                int counter = 0;
+                for (final List<DummyRecord> list : records) {
+                    final boolean forceSync = (++counter == records.size());
+                    repo.update(list, forceSync);
+                }
+            } catch (IOException e) {
+                Assert.fail("Failed to update: " + e.toString());
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void deleteRecursively(final File file) {
+        final File[] children = file.listFiles();
+        if (children != null) {
+            for (final File child : children) {
+                deleteRecursively(child);
+            }
+        }
+
+        file.delete();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/extensions/file-authorization-provider/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/file-authorization-provider/pom.xml 
b/extensions/file-authorization-provider/pom.xml
new file mode 100644
index 0000000..f8d823f
--- /dev/null
+++ b/extensions/file-authorization-provider/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>file-authorization-provider</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>Authorization Provider: File</name>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+            <resource>
+                <directory>src/main/xsd</directory>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>com.sun.tools.xjc.maven2</groupId>
+                <artifactId>maven-jaxb-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>current</id>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <configuration>
+                            <schemaDirectory>src/main/xsd</schemaDirectory>
+                            <includeSchemas>
+                                <includeSchema>**/*.xsd</includeSchema>
+                            </includeSchemas>
+                            
<generatePackage>org.apache.nifi.user.generated</generatePackage>
+                        </configuration>
+                    </execution>
+                </executions>
+                <configuration>
+                    
<generateDirectory>${project.build.directory}/generated-sources/jaxb</generateDirectory>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>[0.0.1-SNAPSHOT, 1.0.0-SNAPSHOT)</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-file-utils</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+            <version>[0.0.1-SNAPSHOT, 1.0.0-SNAPSHOT)</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+            <version>2.6</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.10</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

Reply via email to