Repository: nifi
Updated Branches:
  refs/heads/master 14fef2de1 -> 0bcb241db


http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestBlockingQueuePool.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestBlockingQueuePool.java
 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestBlockingQueuePool.java
new file mode 100644
index 0000000..2492283
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestBlockingQueuePool.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import org.junit.Test;
+
+public class TestBlockingQueuePool {
+    private static final Consumer<AtomicBoolean> DO_NOTHING = ab -> {};
+
+    @Test
+    public void testReuse() {
+        final BlockingQueuePool<AtomicBoolean> pool = new 
BlockingQueuePool<>(10, AtomicBoolean::new, AtomicBoolean::get, DO_NOTHING);
+
+        final AtomicBoolean firstObject = pool.borrowObject();
+        firstObject.set(true);
+        pool.returnObject(firstObject);
+
+        for (int i = 0; i < 100; i++) {
+            final AtomicBoolean value = pool.borrowObject();
+            assertSame(firstObject, value);
+            pool.returnObject(value);
+        }
+    }
+
+    @Test
+    public void testCreateOnExhaustion() {
+        final BlockingQueuePool<AtomicBoolean> pool = new 
BlockingQueuePool<>(10, AtomicBoolean::new, AtomicBoolean::get, DO_NOTHING);
+
+        final AtomicBoolean firstObject = pool.borrowObject();
+        final AtomicBoolean secondObject = pool.borrowObject();
+
+        assertNotSame(firstObject, secondObject);
+    }
+
+    @Test
+    public void testCreateMoreThanMaxCapacity() {
+        final BlockingQueuePool<AtomicBoolean> pool = new 
BlockingQueuePool<>(10, AtomicBoolean::new, AtomicBoolean::get, DO_NOTHING);
+
+        for (int i = 0; i < 50; i++) {
+            final AtomicBoolean value = pool.borrowObject();
+            assertNotNull(value);
+        }
+    }
+
+    @Test
+    public void testDoesNotBufferMoreThanCapacity() {
+        final BlockingQueuePool<AtomicBoolean> pool = new 
BlockingQueuePool<>(10, AtomicBoolean::new, AtomicBoolean::get, DO_NOTHING);
+
+        final AtomicBoolean[] seen = new AtomicBoolean[50];
+        for (int i = 0; i < 50; i++) {
+            final AtomicBoolean value = pool.borrowObject();
+            assertNotNull(value);
+            value.set(true);
+            seen[i] = value;
+        }
+
+        for (final AtomicBoolean value : seen) {
+            pool.returnObject(value);
+        }
+
+        for (int i = 0; i < 10; i++) {
+            final AtomicBoolean value = pool.borrowObject();
+
+            // verify that the object exists in the 'seen' array
+            boolean found = false;
+            for (final AtomicBoolean seenBoolean : seen) {
+                if (value == seenBoolean) {
+                    found = true;
+                    break;
+                }
+            }
+
+            assertTrue(found);
+        }
+
+        for (int i = 0; i < 40; i++) {
+            final AtomicBoolean value = pool.borrowObject();
+
+            // verify that the object does not exist in the 'seen' array
+            boolean found = false;
+            for (final AtomicBoolean seenBoolean : seen) {
+                if (value == seenBoolean) {
+                    found = true;
+                    break;
+                }
+            }
+
+            assertFalse(found);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestHashMapSnapshot.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestHashMapSnapshot.java
 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestHashMapSnapshot.java
new file mode 100644
index 0000000..692500e
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestHashMapSnapshot.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.wali.DummyRecord;
+import org.wali.DummyRecordSerde;
+import org.wali.SerDeFactory;
+import org.wali.SingletonSerDeFactory;
+import org.wali.UpdateType;
+
+public class TestHashMapSnapshot {
+
+    private final File storageDirectory = new 
File("target/test-hashmap-snapshot");
+    private DummyRecordSerde serde;
+    private SerDeFactory<DummyRecord> serdeFactory;
+
+    @Before
+    public void setup() throws IOException {
+        if (!storageDirectory.exists()) {
+            Files.createDirectories(storageDirectory.toPath());
+        }
+
+        final File[] childFiles = storageDirectory.listFiles();
+        for (final File childFile : childFiles) {
+            if (childFile.isFile()) {
+                Files.delete(childFile.toPath());
+            }
+        }
+
+        serde = new DummyRecordSerde();
+        serdeFactory = new SingletonSerDeFactory<>(serde);
+
+    }
+
+    @Test
+    public void testSuccessfulRoundTrip() throws IOException {
+        final HashMapSnapshot<DummyRecord> snapshot = new 
HashMapSnapshot<>(storageDirectory, serdeFactory);
+        final Map<String, String> props = new HashMap<>();
+
+        for (int i = 0; i < 10; i++) {
+            final DummyRecord record = new DummyRecord(String.valueOf(i), 
UpdateType.CREATE);
+            props.put("key", String.valueOf(i));
+            record.setProperties(props);
+            snapshot.update(Collections.singleton(record));
+        }
+
+        for (int i = 2; i < 10; i += 2) {
+            final DummyRecord record = new DummyRecord(String.valueOf(i), 
UpdateType.DELETE);
+            snapshot.update(Collections.singleton(record));
+        }
+
+        for (int i = 1; i < 10; i += 2) {
+            final DummyRecord record = new DummyRecord(String.valueOf(i), 
UpdateType.SWAP_OUT);
+            record.setSwapLocation("swapFile-" + i);
+            snapshot.update(Collections.singleton(record));
+        }
+
+        final DummyRecord swapIn7 = new DummyRecord("7", UpdateType.SWAP_IN);
+        swapIn7.setSwapLocation("swapFile-7");
+        snapshot.update(Collections.singleton(swapIn7));
+
+        final Set<String> swappedOutLocations = new HashSet<>();
+        swappedOutLocations.add("swapFile-1");
+        swappedOutLocations.add("swapFile-3");
+        swappedOutLocations.add("swapFile-5");
+        swappedOutLocations.add("swapFile-9");
+
+        final SnapshotCapture<DummyRecord> capture = 
snapshot.prepareSnapshot(180L);
+        assertEquals(180L, capture.getMaxTransactionId());
+        assertEquals(swappedOutLocations, capture.getSwapLocations());
+
+        final Map<Object, DummyRecord> records = capture.getRecords();
+        assertEquals(2, records.size());
+        assertTrue(records.containsKey("0"));
+        assertTrue(records.containsKey("7"));
+
+        snapshot.writeSnapshot(capture);
+
+        final SnapshotRecovery<DummyRecord> recovery = snapshot.recover();
+        assertEquals(180L, recovery.getMaxTransactionId());
+        assertEquals(swappedOutLocations, 
recovery.getRecoveredSwapLocations());
+
+        final Map<Object, DummyRecord> recoveredRecords = 
recovery.getRecords();
+        assertEquals(records, recoveredRecords);
+    }
+
+    @Test
+    public void testOOMEWhenWritingResultsInPreviousSnapshotStillRecoverable() 
throws IOException {
+        final HashMapSnapshot<DummyRecord> snapshot = new 
HashMapSnapshot<>(storageDirectory, serdeFactory);
+        final Map<String, String> props = new HashMap<>();
+
+        for (int i = 0; i < 11; i++) {
+            final DummyRecord record = new DummyRecord(String.valueOf(i), 
UpdateType.CREATE);
+            props.put("key", String.valueOf(i));
+            record.setProperties(props);
+            snapshot.update(Collections.singleton(record));
+        }
+
+        final DummyRecord swapOutRecord = new DummyRecord("10", 
UpdateType.SWAP_OUT);
+        swapOutRecord.setSwapLocation("SwapLocation-1");
+        snapshot.update(Collections.singleton(swapOutRecord));
+
+        snapshot.writeSnapshot(snapshot.prepareSnapshot(25L));
+
+        serde.setThrowOOMEAfterNSerializeEdits(3);
+
+        try {
+            snapshot.writeSnapshot(snapshot.prepareSnapshot(150L));
+            Assert.fail("Expected OOME");
+        } catch (final OutOfMemoryError oome) {
+            // expected
+        }
+
+        final SnapshotRecovery<DummyRecord> recovery = snapshot.recover();
+        assertEquals(25L, recovery.getMaxTransactionId());
+
+        final Map<Object, DummyRecord> recordMap = recovery.getRecords();
+        assertEquals(10, recordMap.size());
+        for (int i = 0; i < 10; i++) {
+            assertTrue(recordMap.containsKey(String.valueOf(i)));
+        }
+        for (final Map.Entry<Object, DummyRecord> entry : 
recordMap.entrySet()) {
+            final DummyRecord record = entry.getValue();
+            final Map<String, String> properties = record.getProperties();
+            assertNotNull(properties);
+            assertEquals(1, properties.size());
+            assertEquals(entry.getKey(), properties.get("key"));
+        }
+
+        final Set<String> swapLocations = recovery.getRecoveredSwapLocations();
+        assertEquals(1, swapLocations.size());
+        assertTrue(swapLocations.contains("SwapLocation-1"));
+    }
+
+    @Test
+    public void 
testIOExceptionWhenWritingResultsInPreviousSnapshotStillRecoverable() throws 
IOException {
+        final HashMapSnapshot<DummyRecord> snapshot = new 
HashMapSnapshot<>(storageDirectory, serdeFactory);
+        final Map<String, String> props = new HashMap<>();
+
+        for (int i = 0; i < 11; i++) {
+            final DummyRecord record = new DummyRecord(String.valueOf(i), 
UpdateType.CREATE);
+            props.put("key", String.valueOf(i));
+            record.setProperties(props);
+            snapshot.update(Collections.singleton(record));
+        }
+
+        final DummyRecord swapOutRecord = new DummyRecord("10", 
UpdateType.SWAP_OUT);
+        swapOutRecord.setSwapLocation("SwapLocation-1");
+        snapshot.update(Collections.singleton(swapOutRecord));
+
+        snapshot.writeSnapshot(snapshot.prepareSnapshot(25L));
+
+        serde.setThrowIOEAfterNSerializeEdits(3);
+
+        for (int i = 0; i < 5; i++) {
+            try {
+                snapshot.writeSnapshot(snapshot.prepareSnapshot(150L));
+                Assert.fail("Expected IOE");
+            } catch (final IOException ioe) {
+                // expected
+            }
+        }
+
+        final SnapshotRecovery<DummyRecord> recovery = snapshot.recover();
+        assertEquals(25L, recovery.getMaxTransactionId());
+
+        final Map<Object, DummyRecord> recordMap = recovery.getRecords();
+        assertEquals(10, recordMap.size());
+        for (int i = 0; i < 10; i++) {
+            assertTrue(recordMap.containsKey(String.valueOf(i)));
+        }
+        for (final Map.Entry<Object, DummyRecord> entry : 
recordMap.entrySet()) {
+            final DummyRecord record = entry.getValue();
+            final Map<String, String> properties = record.getProperties();
+            assertNotNull(properties);
+            assertEquals(1, properties.size());
+            assertEquals(entry.getKey(), properties.get("key"));
+        }
+
+        final Set<String> swapLocations = recovery.getRecoveredSwapLocations();
+        assertEquals(1, swapLocations.size());
+        assertTrue(swapLocations.contains("SwapLocation-1"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
new file mode 100644
index 0000000..94df890
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.wali.DummyRecord;
+import org.wali.DummyRecordSerde;
+import org.wali.SerDeFactory;
+import org.wali.SingletonSerDeFactory;
+import org.wali.UpdateType;
+
+public class TestLengthDelimitedJournal {
+    private final File journalFile = new 
File("target/testLengthDelimitedJournal/testJournal.journal");
+    private SerDeFactory<DummyRecord> serdeFactory;
+    private DummyRecordSerde serde;
+    private ObjectPool<ByteArrayDataOutputStream> streamPool;
+    private static final int BUFFER_SIZE = 4096;
+
+    @Before
+    public void setupJournal() throws IOException {
+        Files.deleteIfExists(journalFile.toPath());
+
+        if (!journalFile.getParentFile().exists()) {
+            Files.createDirectories(journalFile.getParentFile().toPath());
+        }
+
+        serde = new DummyRecordSerde();
+        serdeFactory = new SingletonSerDeFactory<>(serde);
+        streamPool = new BlockingQueuePool<>(1,
+            () -> new ByteArrayDataOutputStream(BUFFER_SIZE),
+            stream -> stream.getByteArrayOutputStream().size() < BUFFER_SIZE,
+            stream -> stream.getByteArrayOutputStream().reset());
+    }
+
+    @Test
+    public void testHandlingOfTrailingNulBytes() throws IOException {
+        try (final LengthDelimitedJournal<DummyRecord> journal = new 
LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
+            journal.writeHeader();
+
+            final List<DummyRecord> firstTransaction = new ArrayList<>();
+            firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
+            firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
+            firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
+
+            final List<DummyRecord> secondTransaction = new ArrayList<>();
+            secondTransaction.add(new DummyRecord("1", 
UpdateType.UPDATE).setProperty("abc", "123"));
+            secondTransaction.add(new DummyRecord("2", 
UpdateType.UPDATE).setProperty("cba", "123"));
+            secondTransaction.add(new DummyRecord("3", 
UpdateType.UPDATE).setProperty("aaa", "123"));
+
+            final List<DummyRecord> thirdTransaction = new ArrayList<>();
+            thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
+            thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
+
+            journal.update(firstTransaction, id -> null);
+            journal.update(secondTransaction, id -> null);
+            journal.update(thirdTransaction, id -> null);
+        }
+
+        // Truncate the contents of the journal file by 8 bytes. Then replace 
with 28 trailing NUL bytes,
+        // as this is what we often see when we have a sudden power loss.
+        final byte[] contents = Files.readAllBytes(journalFile.toPath());
+        final byte[] truncated = Arrays.copyOfRange(contents, 0, 
contents.length - 8);
+        final byte[] withNuls = new byte[truncated.length + 28];
+        System.arraycopy(truncated, 0, withNuls, 0, truncated.length);
+
+        try (final OutputStream fos = new FileOutputStream(journalFile)) {
+            fos.write(withNuls);
+        }
+
+
+        try (final LengthDelimitedJournal<DummyRecord> journal = new 
LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
+            final Map<Object, DummyRecord> recordMap = new HashMap<>();
+            final Set<String> swapLocations = new HashSet<>();
+
+            journal.recoverRecords(recordMap, swapLocations);
+
+            assertFalse(recordMap.isEmpty());
+            assertEquals(3, recordMap.size());
+
+            final DummyRecord record1 = recordMap.get("1");
+            assertNotNull(record1);
+            assertEquals(Collections.singletonMap("abc", "123"), 
record1.getProperties());
+
+            final DummyRecord record2 = recordMap.get("2");
+            assertNotNull(record2);
+            assertEquals(Collections.singletonMap("cba", "123"), 
record2.getProperties());
+
+            final DummyRecord record3 = recordMap.get("3");
+            assertNotNull(record3);
+            assertEquals(Collections.singletonMap("aaa", "123"), 
record3.getProperties());
+        }
+    }
+
+    @Test
+    public void testUpdateOnlyAppliedIfEntireTransactionApplied() throws 
IOException {
+        try (final LengthDelimitedJournal<DummyRecord> journal = new 
LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
+            journal.writeHeader();
+
+            for (int i = 0; i < 3; i++) {
+                final DummyRecord record = new DummyRecord(String.valueOf(i), 
UpdateType.CREATE);
+                journal.update(Collections.singleton(record), key -> null);
+            }
+
+            final DummyRecord swapOut1Record = new DummyRecord("1", 
UpdateType.SWAP_OUT);
+            swapOut1Record.setSwapLocation("swap12");
+            journal.update(Collections.singleton(swapOut1Record), id -> null);
+
+            final DummyRecord swapOut2Record = new DummyRecord("2", 
UpdateType.SWAP_OUT);
+            swapOut2Record.setSwapLocation("swap12");
+            journal.update(Collections.singleton(swapOut2Record), id -> null);
+
+            final List<DummyRecord> records = new ArrayList<>();
+            for (int i = 0; i < 10; i++) {
+                final DummyRecord record = new DummyRecord("1" + i, 
UpdateType.CREATE);
+                records.add(record);
+            }
+
+            final DummyRecord swapIn1Record = new DummyRecord("1", 
UpdateType.SWAP_IN);
+            swapIn1Record.setSwapLocation("swap12");
+            records.add(swapIn1Record);
+
+            final DummyRecord swapOut1AgainRecord = new DummyRecord("1", 
UpdateType.SWAP_OUT);
+            swapOut1AgainRecord.setSwapLocation("swap12");
+            records.add(swapOut1AgainRecord);
+
+            final DummyRecord swapIn2Record = new DummyRecord("2", 
UpdateType.SWAP_IN);
+            swapIn2Record.setSwapLocation("swap12");
+            records.add(swapIn2Record);
+
+            final DummyRecord swapOut0Record = new DummyRecord("0", 
UpdateType.SWAP_OUT);
+            swapOut0Record.setSwapLocation("swap0");
+            records.add(swapOut0Record);
+
+            journal.update(records, id -> null);
+        }
+
+        // Truncate the last 8 bytes so that we will get an EOFException when 
reading the last transaction.
+        try (final FileOutputStream fos = new FileOutputStream(journalFile, 
true)) {
+            fos.getChannel().truncate(journalFile.length() - 8);
+        }
+
+
+        try (final LengthDelimitedJournal<DummyRecord> journal = new 
LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
+            final Map<Object, DummyRecord> recordMap = new HashMap<>();
+            final Set<String> swapLocations = new HashSet<>();
+
+            final JournalRecovery recovery = journal.recoverRecords(recordMap, 
swapLocations);
+            assertEquals(5L, recovery.getMaxTransactionId());
+            assertEquals(5, recovery.getUpdateCount());
+
+            final Set<String> expectedSwap = Collections.singleton("swap12");
+            assertEquals(expectedSwap, swapLocations);
+
+            final Map<Object, DummyRecord> expectedRecordMap = new HashMap<>();
+            expectedRecordMap.put("0", new DummyRecord("0", 
UpdateType.CREATE));
+            assertEquals(expectedRecordMap, recordMap);
+        }
+    }
+
+    @Test
+    public void testPoisonedJournalNotWritableAfterIOE() throws IOException {
+        try (final LengthDelimitedJournal<DummyRecord> journal = new 
LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
+            journal.writeHeader();
+
+            serde.setThrowIOEAfterNSerializeEdits(2);
+
+            final DummyRecord firstRecord = new DummyRecord("1", 
UpdateType.CREATE);
+            journal.update(Collections.singleton(firstRecord), key -> null);
+
+            final DummyRecord secondRecord = new DummyRecord("1", 
UpdateType.UPDATE);
+            journal.update(Collections.singleton(secondRecord), key -> 
firstRecord);
+
+            final DummyRecord thirdRecord = new DummyRecord("1", 
UpdateType.UPDATE);
+            final RecordLookup<DummyRecord> lookup = key -> secondRecord;
+            try {
+                journal.update(Collections.singleton(thirdRecord), lookup);
+                Assert.fail("Expected IOException");
+            } catch (final IOException ioe) {
+                // expected
+            }
+
+            serde.setThrowIOEAfterNSerializeEdits(-1);
+
+            final Collection<DummyRecord> records = 
Collections.singleton(thirdRecord);
+            for (int i = 0; i < 10; i++) {
+                try {
+                    journal.update(records, lookup);
+                    Assert.fail("Expected IOException");
+                } catch (final IOException expected) {
+                }
+
+                try {
+                    journal.fsync();
+                    Assert.fail("Expected IOException");
+                } catch (final IOException expected) {
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testPoisonedJournalNotWritableAfterOOME() throws IOException {
+        try (final LengthDelimitedJournal<DummyRecord> journal = new 
LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
+            journal.writeHeader();
+
+            serde.setThrowOOMEAfterNSerializeEdits(2);
+
+            final DummyRecord firstRecord = new DummyRecord("1", 
UpdateType.CREATE);
+            journal.update(Collections.singleton(firstRecord), key -> null);
+
+            final DummyRecord secondRecord = new DummyRecord("1", 
UpdateType.UPDATE);
+            journal.update(Collections.singleton(secondRecord), key -> 
firstRecord);
+
+            final DummyRecord thirdRecord = new DummyRecord("1", 
UpdateType.UPDATE);
+            final RecordLookup<DummyRecord> lookup = key -> secondRecord;
+            try {
+                journal.update(Collections.singleton(thirdRecord), lookup);
+                Assert.fail("Expected OOME");
+            } catch (final OutOfMemoryError oome) {
+                // expected
+            }
+
+            serde.setThrowOOMEAfterNSerializeEdits(-1);
+
+            final Collection<DummyRecord> records = 
Collections.singleton(thirdRecord);
+            for (int i = 0; i < 10; i++) {
+                try {
+                    journal.update(records, lookup);
+                    Assert.fail("Expected IOException");
+                } catch (final IOException expected) {
+                }
+
+                try {
+                    journal.fsync();
+                    Assert.fail("Expected IOException");
+                } catch (final IOException expected) {
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testSuccessfulRoundTrip() throws IOException {
+        try (final LengthDelimitedJournal<DummyRecord> journal = new 
LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
+            journal.writeHeader();
+
+            final DummyRecord firstRecord = new DummyRecord("1", 
UpdateType.CREATE);
+            journal.update(Collections.singleton(firstRecord), key -> null);
+
+            final DummyRecord secondRecord = new DummyRecord("1", 
UpdateType.UPDATE);
+            journal.update(Collections.singleton(secondRecord), key -> 
firstRecord);
+
+            final DummyRecord thirdRecord = new DummyRecord("1", 
UpdateType.UPDATE);
+            journal.update(Collections.singleton(thirdRecord), key -> 
secondRecord);
+
+            final Map<Object, DummyRecord> recordMap = new HashMap<>();
+            final Set<String> swapLocations = new HashSet<>();
+            final JournalRecovery recovery = journal.recoverRecords(recordMap, 
swapLocations);
+            assertFalse(recovery.isEOFExceptionEncountered());
+
+            assertEquals(2L, recovery.getMaxTransactionId());
+            assertEquals(3, recovery.getUpdateCount());
+
+            assertTrue(swapLocations.isEmpty());
+            assertEquals(1, recordMap.size());
+
+            final DummyRecord retrieved = recordMap.get("1");
+            assertNotNull(retrieved);
+            assertEquals(thirdRecord, retrieved);
+        }
+    }
+
+    @Test
+    public void testTruncatedJournalFile() throws IOException {
+        final DummyRecord firstRecord, secondRecord;
+        try (final LengthDelimitedJournal<DummyRecord> journal = new 
LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
+            journal.writeHeader();
+
+            firstRecord = new DummyRecord("1", UpdateType.CREATE);
+            journal.update(Collections.singleton(firstRecord), key -> null);
+
+            secondRecord = new DummyRecord("2", UpdateType.CREATE);
+            journal.update(Collections.singleton(secondRecord), key -> 
firstRecord);
+
+            final DummyRecord thirdRecord = new DummyRecord("1", 
UpdateType.UPDATE);
+            journal.update(Collections.singleton(thirdRecord), key -> 
secondRecord);
+        }
+
+        // Truncate the file
+        try (final FileOutputStream fos = new FileOutputStream(journalFile, 
true)) {
+            fos.getChannel().truncate(journalFile.length() - 8);
+        }
+
+        // Ensure that we are able to recover the first two records without an 
issue but the third is lost.
+        try (final LengthDelimitedJournal<DummyRecord> journal = new 
LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
+            final Map<Object, DummyRecord> recordMap = new HashMap<>();
+            final Set<String> swapLocations = new HashSet<>();
+            final JournalRecovery recovery = journal.recoverRecords(recordMap, 
swapLocations);
+            assertTrue(recovery.isEOFExceptionEncountered());
+
+            assertEquals(2L, recovery.getMaxTransactionId()); // transaction 
ID is still 2 because that's what was written to the journal
+            assertEquals(2, recovery.getUpdateCount()); // only 2 updates 
because the last update will incur an EOFException and be skipped
+
+            assertTrue(swapLocations.isEmpty());
+            assertEquals(2, recordMap.size());
+
+            final DummyRecord retrieved1 = recordMap.get("1");
+            assertNotNull(retrieved1);
+            assertEquals(firstRecord, retrieved1);
+
+            final DummyRecord retrieved2 = recordMap.get("2");
+            assertNotNull(retrieved2);
+            assertEquals(secondRecord, retrieved2);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestSequentialAccessWriteAheadLog.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestSequentialAccessWriteAheadLog.java
 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestSequentialAccessWriteAheadLog.java
new file mode 100644
index 0000000..4fc0fe7
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestSequentialAccessWriteAheadLog.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.wali.DummyRecord;
+import org.wali.DummyRecordSerde;
+import org.wali.SerDeFactory;
+import org.wali.SingletonSerDeFactory;
+import org.wali.UpdateType;
+import org.wali.WriteAheadRepository;
+
+public class TestSequentialAccessWriteAheadLog {
+    @Rule
+    public TestName testName = new TestName();
+
+    @Test
+    public void testRecoverWithNoCheckpoint() throws IOException {
+        final SequentialAccessWriteAheadLog<DummyRecord> repo = 
createWriteRepo();
+
+        final List<DummyRecord> records = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            final DummyRecord record = new DummyRecord(String.valueOf(i), 
UpdateType.CREATE);
+            records.add(record);
+        }
+
+        repo.update(records, false);
+        repo.shutdown();
+
+        final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = 
createRecoveryRepo();
+        final Collection<DummyRecord> recovered = 
recoveryRepo.recoverRecords();
+
+        // ensure that we get the same records back, but the order may be 
different, so wrap both collections
+        // in a HashSet so that we can compare unordered collections of the 
same type.
+        assertEquals(new HashSet<>(records), new HashSet<>(recovered));
+    }
+
+    @Test
+    public void testRecoverWithNoJournalUpdates() throws IOException {
+        final SequentialAccessWriteAheadLog<DummyRecord> repo = 
createWriteRepo();
+
+        final List<DummyRecord> records = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            final DummyRecord record = new DummyRecord(String.valueOf(i), 
UpdateType.CREATE);
+            records.add(record);
+        }
+
+        repo.update(records, false);
+        repo.checkpoint();
+        repo.shutdown();
+
+        final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = 
createRecoveryRepo();
+        final Collection<DummyRecord> recovered = 
recoveryRepo.recoverRecords();
+
+        // ensure that we get the same records back, but the order may be 
different, so wrap both collections
+        // in a HashSet so that we can compare unordered collections of the 
same type.
+        assertEquals(new HashSet<>(records), new HashSet<>(recovered));
+    }
+
+    @Test
+    public void testRecoverWithMultipleCheckpointsBetweenJournalUpdate() 
throws IOException {
+        final SequentialAccessWriteAheadLog<DummyRecord> repo = 
createWriteRepo();
+
+        final List<DummyRecord> records = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            final DummyRecord record = new DummyRecord(String.valueOf(i), 
UpdateType.CREATE);
+            records.add(record);
+        }
+
+        repo.update(records, false);
+
+        for (int i = 0; i < 8; i++) {
+            repo.checkpoint();
+        }
+
+        final DummyRecord updateRecord = new DummyRecord("4", 
UpdateType.UPDATE);
+        updateRecord.setProperties(Collections.singletonMap("updated", 
"true"));
+        repo.update(Collections.singleton(updateRecord), false);
+
+        repo.shutdown();
+
+        final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = 
createRecoveryRepo();
+        final Collection<DummyRecord> recovered = 
recoveryRepo.recoverRecords();
+
+        // what we expect is the same as what we updated with, except we don't 
want the DummyRecord for CREATE 4
+        // because we will instead recover an UPDATE only for 4.
+        final Set<DummyRecord> expected = new HashSet<>(records);
+        expected.remove(new DummyRecord("4", UpdateType.CREATE));
+        expected.add(updateRecord);
+
+        // ensure that we get the same records back, but the order may be 
different, so wrap both collections
+        // in a HashSet so that we can compare unordered collections of the 
same type.
+        assertEquals(expected, new HashSet<>(recovered));
+    }
+
+    private SequentialAccessWriteAheadLog<DummyRecord> createRecoveryRepo() 
throws IOException {
+        final File targetDir = new File("target");
+        final File storageDir = new File(targetDir, testName.getMethodName());
+
+        final DummyRecordSerde serde = new DummyRecordSerde();
+        final SerDeFactory<DummyRecord> serdeFactory = new 
SingletonSerDeFactory<>(serde);
+        final SequentialAccessWriteAheadLog<DummyRecord> repo = new 
SequentialAccessWriteAheadLog<>(storageDir, serdeFactory);
+
+        return repo;
+    }
+
+    private SequentialAccessWriteAheadLog<DummyRecord> createWriteRepo() 
throws IOException {
+        final File targetDir = new File("target");
+        final File storageDir = new File(targetDir, testName.getMethodName());
+        deleteRecursively(storageDir);
+        assertTrue(storageDir.mkdirs());
+
+        final DummyRecordSerde serde = new DummyRecordSerde();
+        final SerDeFactory<DummyRecord> serdeFactory = new 
SingletonSerDeFactory<>(serde);
+        final SequentialAccessWriteAheadLog<DummyRecord> repo = new 
SequentialAccessWriteAheadLog<>(storageDir, serdeFactory);
+
+        final Collection<DummyRecord> recovered = repo.recoverRecords();
+        assertNotNull(recovered);
+        assertTrue(recovered.isEmpty());
+
+        return repo;
+    }
+
+    /**
+     * This test is designed to update the repository in several different 
wants, testing CREATE, UPDATE, SWAP IN, SWAP OUT, and DELETE
+     * update types, as well as testing updates with single records and with 
multiple records in a transaction. It also verifies that we
+     * are able to checkpoint, then update journals, and then recover updates 
to both the checkpoint and the journals.
+     */
+    @Test
+    public void testUpdateThenRecover() throws IOException {
+        final SequentialAccessWriteAheadLog<DummyRecord> repo = 
createWriteRepo();
+
+        final DummyRecord firstCreate = new DummyRecord("0", 
UpdateType.CREATE);
+        repo.update(Collections.singleton(firstCreate), false);
+
+        final List<DummyRecord> creations = new ArrayList<>();
+        for (int i = 1; i < 11; i++) {
+            final DummyRecord record = new DummyRecord(String.valueOf(i), 
UpdateType.CREATE);
+            creations.add(record);
+        }
+        repo.update(creations, false);
+
+        final DummyRecord deleteRecord3 = new DummyRecord("3", 
UpdateType.DELETE);
+        repo.update(Collections.singleton(deleteRecord3), false);
+
+        final DummyRecord swapOutRecord4 = new DummyRecord("4", 
UpdateType.SWAP_OUT);
+        swapOutRecord4.setSwapLocation("swap");
+
+        final DummyRecord swapOutRecord5 = new DummyRecord("5", 
UpdateType.SWAP_OUT);
+        swapOutRecord5.setSwapLocation("swap");
+
+        final List<DummyRecord> swapOuts = new ArrayList<>();
+        swapOuts.add(swapOutRecord4);
+        swapOuts.add(swapOutRecord5);
+        repo.update(swapOuts, false);
+
+        final DummyRecord swapInRecord5 = new DummyRecord("5", 
UpdateType.SWAP_IN);
+        swapInRecord5.setSwapLocation("swap");
+        repo.update(Collections.singleton(swapInRecord5), false);
+
+        final int recordCount = repo.checkpoint();
+        assertEquals(9, recordCount);
+
+        final DummyRecord updateRecord6 = new DummyRecord("6", 
UpdateType.UPDATE);
+        updateRecord6.setProperties(Collections.singletonMap("greeting", 
"hello"));
+        repo.update(Collections.singleton(updateRecord6), false);
+
+        final List<DummyRecord> updateRecords = new ArrayList<>();
+        for (int i = 7; i < 11; i++) {
+            final DummyRecord updateRecord = new 
DummyRecord(String.valueOf(i), UpdateType.UPDATE);
+            updateRecord.setProperties(Collections.singletonMap("greeting", 
"hi"));
+            updateRecords.add(updateRecord);
+        }
+
+        final DummyRecord deleteRecord2 = new DummyRecord("2", 
UpdateType.DELETE);
+        updateRecords.add(deleteRecord2);
+
+        repo.update(updateRecords, false);
+
+        repo.shutdown();
+
+        final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = 
createRecoveryRepo();
+        final Collection<DummyRecord> recoveredRecords = 
recoveryRepo.recoverRecords();
+
+        // We should now have records:
+        // 0-10 CREATED
+        // 2 & 3 deleted
+        // 4 & 5 swapped out
+        // 5 swapped back in
+        // 6 updated with greeting = hello
+        // 7-10 updated with greeting = hi
+
+        assertEquals(8, recoveredRecords.size());
+        final Map<String, DummyRecord> recordMap = recoveredRecords.stream()
+            .collect(Collectors.toMap(record -> record.getId(), 
Function.identity()));
+
+        assertFalse(recordMap.containsKey("2"));
+        assertFalse(recordMap.containsKey("3"));
+        assertFalse(recordMap.containsKey("4"));
+
+        assertTrue(recordMap.get("1").getProperties().isEmpty());
+        assertTrue(recordMap.get("5").getProperties().isEmpty());
+
+        assertEquals("hello", 
recordMap.get("6").getProperties().get("greeting"));
+
+        for (int i = 7; i < 11; i++) {
+            assertEquals("hi", 
recordMap.get(String.valueOf(i)).getProperties().get("greeting"));
+        }
+
+        recoveryRepo.shutdown();
+    }
+
+
+    @Test
+    @Ignore("For manual performance testing")
+    public void testUpdatePerformance() throws IOException, 
InterruptedException {
+        final Path path = Paths.get("target/sequential-access-repo");
+        deleteRecursively(path.toFile());
+        assertTrue(path.toFile().mkdirs());
+
+        final DummyRecordSerde serde = new DummyRecordSerde();
+        final SerDeFactory<DummyRecord> serdeFactory = new 
SingletonSerDeFactory<>(serde);
+
+        final WriteAheadRepository<DummyRecord> repo = new 
SequentialAccessWriteAheadLog<>(path.toFile(), serdeFactory);
+        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
+        assertTrue(initialRecs.isEmpty());
+
+        final long updateCountPerThread = 1_000_000;
+        final int numThreads = 4;
+
+        final Thread[] threads = new Thread[numThreads];
+        final int batchSize = 1;
+
+        long previousBytes = 0L;
+
+        for (int j = 0; j < 2; j++) {
+            for (int i = 0; i < numThreads; i++) {
+                final Thread t = new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        final List<DummyRecord> batch = new ArrayList<>();
+                        for (int i = 0; i < updateCountPerThread / batchSize; 
i++) {
+                            batch.clear();
+                            for (int j = 0; j < batchSize; j++) {
+                                final DummyRecord record = new 
DummyRecord(String.valueOf(i), UpdateType.CREATE);
+                                batch.add(record);
+                            }
+
+                            try {
+                                repo.update(batch, false);
+                            } catch (Throwable t) {
+                                t.printStackTrace();
+                                Assert.fail(t.toString());
+                            }
+                        }
+                    }
+                });
+
+                threads[i] = t;
+            }
+
+            final long start = System.nanoTime();
+            for (final Thread t : threads) {
+                t.start();
+            }
+            for (final Thread t : threads) {
+                t.join();
+            }
+
+            long bytes = 0L;
+            for (final File journalFile : 
path.resolve("journals").toFile().listFiles()) {
+                bytes += journalFile.length();
+            }
+
+            bytes -= previousBytes;
+            previousBytes = bytes;
+
+            final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            final long eventsPerSecond = (updateCountPerThread * numThreads * 
1000) / millis;
+            final String eps = 
NumberFormat.getInstance().format(eventsPerSecond);
+            final long bytesPerSecond = bytes * 1000 / millis;
+            final String bps = 
NumberFormat.getInstance().format(bytesPerSecond);
+
+            if (j == 0) {
+                System.out.println(millis + " ms to insert " + 
updateCountPerThread * numThreads + " updates using " + numThreads
+                    + " threads, *as a warmup!*  " + eps + " events per 
second, " + bps + " bytes per second");
+            } else {
+                System.out.println(millis + " ms to insert " + 
updateCountPerThread * numThreads + " updates using " + numThreads
+                    + " threads, " + eps + " events per second, " + bps + " 
bytes per second");
+            }
+        }
+    }
+
+    private void deleteRecursively(final File file) {
+        final File[] children = file.listFiles();
+        if (children != null) {
+            for (final File child : children) {
+                deleteRecursively(child);
+            }
+        }
+
+        file.delete();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java
index bf15ba7..1ae7178 100644
--- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java
+++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java
@@ -19,12 +19,14 @@ package org.wali;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 public class DummyRecord {
 
     private final String id;
     private final Map<String, String> props;
     private final UpdateType updateType;
+    private String swapLocation;
 
     public DummyRecord(final String id, final UpdateType updateType) {
         this.id = id;
@@ -59,8 +61,37 @@ public class DummyRecord {
         return props.get(name);
     }
 
+    public String getSwapLocation() {
+        return swapLocation;
+    }
+
+    public void setSwapLocation(String swapLocation) {
+        this.swapLocation = swapLocation;
+    }
+
     @Override
     public String toString() {
         return "DummyRecord [id=" + id + ", props=" + props + ", updateType=" 
+ updateType + "]";
     }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.id, this.props, this.updateType, 
this.swapLocation);
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+
+        if (!(obj instanceof DummyRecord)) {
+            return false;
+        }
+        final DummyRecord other = (DummyRecord) obj;
+        return Objects.equals(id, other.id) && Objects.equals(props, 
other.props) && Objects.equals(updateType, other.updateType) && 
Objects.equals(swapLocation, other.swapLocation);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
index e9f3b01..1f6aede 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
@@ -27,6 +27,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
     private int throwOOMEAfterNserializeEdits = -1;
     private int serializeEditCount = 0;
 
+    @SuppressWarnings("fallthrough")
     @Override
     public void serializeEdit(final DummyRecord previousState, final 
DummyRecord record, final DataOutputStream out) throws IOException {
         if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >= 
throwIOEAfterNserializeEdits)) {
@@ -39,14 +40,28 @@ public class DummyRecordSerde implements SerDe<DummyRecord> 
{
         out.writeUTF(record.getUpdateType().name());
         out.writeUTF(record.getId());
 
-        if (record.getUpdateType() != UpdateType.DELETE) {
-            final Map<String, String> props = record.getProperties();
-            out.writeInt(props.size());
-            for (final Map.Entry<String, String> entry : props.entrySet()) {
-                out.writeUTF(entry.getKey());
-                out.writeUTF(entry.getValue());
+        switch (record.getUpdateType()) {
+            case DELETE:
+                break;
+            case SWAP_IN: {
+                out.writeUTF(record.getSwapLocation());
+                // intentionally fall through to CREATE/UPDATE block
             }
+            case CREATE:
+            case UPDATE: {
+                    final Map<String, String> props = record.getProperties();
+                    out.writeInt(props.size());
+                    for (final Map.Entry<String, String> entry : 
props.entrySet()) {
+                        out.writeUTF(entry.getKey());
+                        out.writeUTF(entry.getValue());
+                    }
+                }
+                break;
+            case SWAP_OUT:
+                out.writeUTF(record.getSwapLocation());
+                break;
         }
+
     }
 
     @Override
@@ -55,20 +70,36 @@ public class DummyRecordSerde implements SerDe<DummyRecord> 
{
     }
 
     @Override
+    @SuppressWarnings("fallthrough")
     public DummyRecord deserializeRecord(final DataInputStream in, final int 
version) throws IOException {
         final String updateTypeName = in.readUTF();
         final UpdateType updateType = UpdateType.valueOf(updateTypeName);
         final String id = in.readUTF();
         final DummyRecord record = new DummyRecord(id, updateType);
 
-        if (record.getUpdateType() != UpdateType.DELETE) {
-            final int numProps = in.readInt();
-            for (int i = 0; i < numProps; i++) {
-                final String key = in.readUTF();
-                final String value = in.readUTF();
-                record.setProperty(key, value);
+        switch (record.getUpdateType()) {
+            case DELETE:
+                break;
+            case SWAP_IN: {
+                final String swapLocation = in.readUTF();
+                record.setSwapLocation(swapLocation);
+                // intentionally fall through to the CREATE/UPDATE block
             }
+            case CREATE:
+            case UPDATE:
+                final int numProps = in.readInt();
+                for (int i = 0; i < numProps; i++) {
+                    final String key = in.readUTF();
+                    final String value = in.readUTF();
+                    record.setProperty(key, value);
+                }
+                break;
+            case SWAP_OUT:
+                final String swapLocation = in.readUTF();
+                record.setSwapLocation(swapLocation);
+                break;
         }
+
         return record;
     }
 
@@ -102,6 +133,6 @@ public class DummyRecordSerde implements SerDe<DummyRecord> 
{
 
     @Override
     public String getLocation(final DummyRecord record) {
-        return null;
+        return record.getSwapLocation();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
index ef33f57..20009d1 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
@@ -34,6 +34,7 @@ import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -139,9 +140,9 @@ public class TestMinimalLockingWriteAheadLog {
     }
 
     @Test
-    @Ignore("for local testing only")
+    @Ignore("For manual performance testing")
     public void testUpdatePerformance() throws IOException, 
InterruptedException {
-        final int numPartitions = 4;
+        final int numPartitions = 16;
 
         final Path path = Paths.get("target/minimal-locking-repo");
         deleteRecursively(path.toFile());
@@ -152,23 +153,34 @@ public class TestMinimalLockingWriteAheadLog {
         final Collection<DummyRecord> initialRecs = repo.recoverRecords();
         assertTrue(initialRecs.isEmpty());
 
-        final int updateCountPerThread = 1_000_000;
-        final int numThreads = 16;
+        final long updateCountPerThread = 1_000_000;
+        final int numThreads = 4;
 
         final Thread[] threads = new Thread[numThreads];
 
+        final int batchSize = 1;
+
+        long previousBytes = 0;
+
         for (int j = 0; j < 2; j++) {
             for (int i = 0; i < numThreads; i++) {
                 final Thread t = new Thread(new Runnable() {
                     @Override
                     public void run() {
-                        for (int i = 0; i < updateCountPerThread; i++) {
-                            final DummyRecord record = new 
DummyRecord(String.valueOf(i), UpdateType.CREATE);
+                        final List<DummyRecord> batch = new ArrayList<>();
+
+                        for (int i = 0; i < updateCountPerThread / batchSize; 
i++) {
+                            batch.clear();
+                            for (int j = 0; j < batchSize; j++) {
+                                final DummyRecord record = new 
DummyRecord(String.valueOf(i), UpdateType.CREATE);
+                                batch.add(record);
+                            }
+
                             try {
-                                repo.update(Collections.singleton(record), 
false);
-                            } catch (IOException e) {
-                                e.printStackTrace();
-                                Assert.fail(e.toString());
+                                repo.update(batch, false);
+                            } catch (Throwable t) {
+                                t.printStackTrace();
+                                Assert.fail(t.toString());
                             }
                         }
                     }
@@ -185,11 +197,30 @@ public class TestMinimalLockingWriteAheadLog {
                 t.join();
             }
 
+            long bytes = 0L;
+            for (final File file : path.toFile().listFiles()) {
+                if (file.getName().startsWith("partition-")) {
+                    for (final File journalFile : file.listFiles()) {
+                        bytes += journalFile.length();
+                    }
+                }
+            }
+
+            bytes -= previousBytes;
+            previousBytes = bytes;
+
             final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            final long eventsPerSecond = (updateCountPerThread * numThreads * 
1000) / millis;
+            final String eps = 
NumberFormat.getInstance().format(eventsPerSecond);
+            final long bytesPerSecond = bytes * 1000 / millis;
+            final String bps = 
NumberFormat.getInstance().format(bytesPerSecond);
+
             if (j == 0) {
-                System.out.println(millis + " ms to insert " + 
updateCountPerThread * numThreads + " updates using " + numPartitions + " 
partitions and " + numThreads + " threads, *as a warmup!*");
+                System.out.println(millis + " ms to insert " + 
updateCountPerThread * numThreads + " updates using " + numThreads + " threads, 
*as a warmup!*  "
+                    + eps + " events per second, " + bps + " bytes per 
second");
             } else {
-                System.out.println(millis + " ms to insert " + 
updateCountPerThread * numThreads + " updates using " + numPartitions + " 
partitions and " + numThreads + " threads");
+                System.out.println(millis + " ms to insert " + 
updateCountPerThread * numThreads + " updates using " + numThreads + " threads, 
"
+                    + eps + " events per second, " + bps + " bytes per 
second");
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 00dde06..3901029 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -16,10 +16,10 @@
  */
 package org.apache.nifi.controller.repository;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -47,6 +48,7 @@ import 
org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.wali.MinimalLockingWriteAheadLog;
@@ -86,7 +88,8 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
     private volatile ScheduledFuture<?> checkpointFuture;
 
     private final long checkpointDelayMillis;
-    private final SortedSet<Path> flowFileRepositoryPaths = new TreeSet<>();
+    private final File flowFileRepositoryPath;
+    private final List<File> recoveryFiles = new ArrayList<>();
     private final int numPartitions;
     private final ScheduledExecutorService checkpointExecutor;
 
@@ -126,16 +129,23 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         checkpointDelayMillis = 0l;
         numPartitions = 0;
         checkpointExecutor = null;
+        flowFileRepositoryPath = null;
     }
 
     public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) {
         alwaysSync = 
Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC,
 "false"));
 
         // determine the database file path and ensure it exists
+        final String directoryName = 
nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX);
+        flowFileRepositoryPath = new File(directoryName);
+
+        // We used to use the MinimalLockingWriteAheadLog, but we now use the 
SequentialAccessWriteAheadLog. Since the
+        // MinimalLockingWriteAheadLog supports multiple partitions, we need 
to ensure that we recover records from all
+        // partitions, so we build up a List of Files for the recovery files.
         for (final String propertyName : nifiProperties.getPropertyKeys()) {
             if (propertyName.startsWith(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX)) 
{
-                final String directoryName = 
nifiProperties.getProperty(propertyName);
-                flowFileRepositoryPaths.add(Paths.get(directoryName));
+                final String dirName = 
nifiProperties.getProperty(propertyName);
+                recoveryFiles.add(new File(dirName));
             }
         }
 
@@ -149,16 +159,14 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
     public void initialize(final ResourceClaimManager claimManager) throws 
IOException {
         this.claimManager = claimManager;
 
-        for (final Path path : flowFileRepositoryPaths) {
-            Files.createDirectories(path);
-        }
+        Files.createDirectories(flowFileRepositoryPath.toPath());
 
         // TODO: Should ensure that only 1 instance running and pointing at a 
particular path
         // TODO: Allow for backup path that can be used if disk out of space?? 
Would allow a snapshot to be stored on
         // backup and then the data deleted from the normal location; then can 
move backup to normal location and
         // delete backup. On restore, if no files exist in partition's 
directory, would have to check backup directory
         serdeFactory = new RepositoryRecordSerdeFactory(claimManager);
-        wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPaths, 
numPartitions, serdeFactory, this);
+        wal = new SequentialAccessWriteAheadLog<>(flowFileRepositoryPath, 
serdeFactory, this);
         logger.info("Initialized FlowFile Repository using {} partitions", 
numPartitions);
     }
 
@@ -179,22 +187,12 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
 
     @Override
     public long getStorageCapacity() throws IOException {
-        long capacity = 0L;
-        for (final Path path : flowFileRepositoryPaths) {
-            capacity += Files.getFileStore(path).getTotalSpace();
-        }
-
-        return capacity;
+        return 
Files.getFileStore(flowFileRepositoryPath.toPath()).getTotalSpace();
     }
 
     @Override
     public long getUsableStorageSpace() throws IOException {
-        long usableSpace = 0L;
-        for (final Path path : flowFileRepositoryPaths) {
-            usableSpace += Files.getFileStore(path).getUsableSpace();
-        }
-
-        return usableSpace;
+        return 
Files.getFileStore(flowFileRepositoryPath.toPath()).getUsableSpace();
     }
 
     @Override
@@ -371,6 +369,72 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         logger.info("Repository updated to reflect that {} FlowFiles were 
swapped in to {}", new Object[]{swapRecords.size(), queue});
     }
 
+
+    @SuppressWarnings("deprecation")
+    private Optional<Collection<RepositoryRecord>> 
recoverFromOldWriteAheadLog() throws IOException {
+        final List<File> partitionDirs = new ArrayList<>();
+        for (final File recoveryFile : recoveryFiles) {
+            final File[] partitions = recoveryFile.listFiles(file -> 
file.getName().startsWith("partition-"));
+            for (final File partition : partitions) {
+                partitionDirs.add(partition);
+            }
+        }
+
+        if (partitionDirs == null || partitionDirs.isEmpty()) {
+            return Optional.empty();
+        }
+
+        logger.info("Encountered FlowFile Repository that was written using an 
old version of the Write-Ahead Log. "
+            + "Will recover from this version and re-write the repository 
using the new version of the Write-Ahead Log.");
+
+        final SortedSet<Path> paths = recoveryFiles.stream()
+            .map(File::toPath)
+            .collect(Collectors.toCollection(TreeSet::new));
+
+        final Collection<RepositoryRecord> recordList;
+        final MinimalLockingWriteAheadLog<RepositoryRecord> minimalLockingWal 
= new MinimalLockingWriteAheadLog<>(paths, partitionDirs.size(), serdeFactory, 
null);
+        try {
+            recordList = minimalLockingWal.recoverRecords();
+        } finally {
+            minimalLockingWal.shutdown();
+        }
+
+        wal.update(recordList, true);
+
+        // Delete the old repository
+        logger.info("Successfully recovered files from existing Write-Ahead 
Log and transitioned to new implementation. Will now delete old files.");
+        for (final File partitionDir : partitionDirs) {
+            final File[] children = partitionDir.listFiles();
+
+            if (children != null) {
+                for (final File child : children) {
+                    final boolean deleted = child.delete();
+                    if (!deleted) {
+                        logger.warn("Failed to delete old file {}; this file 
should be cleaned up manually", child);
+                    }
+                }
+            }
+
+            if (!partitionDir.delete()) {
+                logger.warn("Failed to delete old directory {}; this directory 
should be cleaned up manually", partitionDir);
+            }
+        }
+
+        for (final File recoveryFile : recoveryFiles) {
+            final File snapshotFile = new File(recoveryFile, "snapshot");
+            if (!snapshotFile.delete() && snapshotFile.exists()) {
+                logger.warn("Failed to delete old file {}; this file should be 
cleaned up manually", snapshotFile);
+            }
+
+            final File partialFile = new File(recoveryFile, 
"snapshot.partial");
+            if (!partialFile.delete() && partialFile.exists()) {
+                logger.warn("Failed to delete old file {}; this file should be 
cleaned up manually", partialFile);
+            }
+        }
+
+        return Optional.of(recordList);
+    }
+
     @Override
     public long loadFlowFiles(final QueueProvider queueProvider, final long 
minimumSequenceNumber) throws IOException {
         final Map<String, FlowFileQueue> queueMap = new HashMap<>();
@@ -378,7 +442,15 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
             queueMap.put(queue.getIdentifier(), queue);
         }
         serdeFactory.setQueueMap(queueMap);
-        final Collection<RepositoryRecord> recordList = wal.recoverRecords();
+
+        // Since we used to use the MinimalLockingWriteAheadRepository, we 
need to ensure that if the FlowFile
+        // Repo was written using that impl, that we properly recover from the 
implementation.
+        Collection<RepositoryRecord> recordList = wal.recoverRecords();
+
+        if (recordList == null || recordList.isEmpty()) {
+            recordList = recoverFromOldWriteAheadLog().orElse(new 
ArrayList<>());
+        }
+
         serdeFactory.setQueueMap(null);
 
         for (final RepositoryRecord record : recordList) {

Reply via email to