Repository: nifi
Updated Branches:
  refs/heads/master 5a25884f5 -> 1be087147


http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
new file mode 100644
index 0000000..bae2364
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.toc.StandardTocReader;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public abstract class AbstractTestRecordReaderWriter {
+    @BeforeClass
+    public static void setLogLevel() {
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "INFO");
+    }
+
+    protected ProvenanceEventRecord createEvent() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", "1.txt");
+        attributes.put("uuid", UUID.randomUUID().toString());
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+        final ProvenanceEventRecord record = builder.build();
+
+        return record;
+    }
+
+    @Test
+    public void testSimpleWriteWithToc() throws IOException {
+        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite");
+        final File tocFile = TocUtil.getTocFile(journalFile);
+        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+        final RecordWriter writer = createWriter(journalFile, tocWriter, false, 1024 * 1024);
+
+        writer.writeHeader(1L);
+        writer.writeRecord(createEvent(), 1L);
+        writer.close();
+
+        final TocReader tocReader = new StandardTocReader(tocFile);
+
+        try (final FileInputStream fis = new FileInputStream(journalFile);
+            final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
+            assertEquals(0, reader.getBlockIndex());
+            reader.skipToBlock(0);
+            final StandardProvenanceEventRecord recovered = reader.nextRecord();
+            assertNotNull(recovered);
+
+            assertEquals("nifi://unit-test", recovered.getTransitUri());
+            assertNull(reader.nextRecord());
+        }
+
+        FileUtils.deleteFile(journalFile.getParentFile(), true);
+    }
+
+
+    @Test
+    public void testSingleRecordCompressed() throws IOException {
+        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
+        final File tocFile = TocUtil.getTocFile(journalFile);
+        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+        final RecordWriter writer = createWriter(journalFile, tocWriter, true, 8192);
+
+        writer.writeHeader(1L);
+        writer.writeRecord(createEvent(), 1L);
+        writer.close();
+
+        final TocReader tocReader = new StandardTocReader(tocFile);
+
+        try (final FileInputStream fis = new FileInputStream(journalFile);
+            final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
+            assertEquals(0, reader.getBlockIndex());
+            reader.skipToBlock(0);
+            final StandardProvenanceEventRecord recovered = reader.nextRecord();
+            assertNotNull(recovered);
+
+            assertEquals("nifi://unit-test", recovered.getTransitUri());
+            assertNull(reader.nextRecord());
+        }
+
+        FileUtils.deleteFile(journalFile.getParentFile(), true);
+    }
+
+
+    @Test
+    public void testMultipleRecordsSameBlockCompressed() throws IOException {
+        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
+        final File tocFile = TocUtil.getTocFile(journalFile);
+        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+        // new block each 1 MB of uncompressed data
+        final RecordWriter writer = createWriter(journalFile, tocWriter, true, 1024 * 1024);
+
+        writer.writeHeader(1L);
+        for (int i = 0; i < 10; i++) {
+            writer.writeRecord(createEvent(), i);
+        }
+        writer.close();
+
+        final TocReader tocReader = new StandardTocReader(tocFile);
+
+        try (final FileInputStream fis = new FileInputStream(journalFile);
+            final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
+            for (int i = 0; i < 10; i++) {
+                assertEquals(0, reader.getBlockIndex());
+
+                // call skipToBlock half the time to ensure that we can; avoid calling it
+                // the other half of the time to ensure that it's okay.
+                if (i <= 5) {
+                    reader.skipToBlock(0);
+                }
+
+                final StandardProvenanceEventRecord recovered = reader.nextRecord();
+                assertNotNull(recovered);
+                assertEquals("nifi://unit-test", recovered.getTransitUri());
+            }
+
+            assertNull(reader.nextRecord());
+        }
+
+        FileUtils.deleteFile(journalFile.getParentFile(), true);
+    }
+
+
+    @Test
+    public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
+        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
+        final File tocFile = TocUtil.getTocFile(journalFile);
+        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+        // new block each 100 bytes of uncompressed data
+        final RecordWriter writer = createWriter(journalFile, tocWriter, true, 100);
+
+        writer.writeHeader(1L);
+        for (int i = 0; i < 10; i++) {
+            writer.writeRecord(createEvent(), i);
+        }
+        writer.close();
+
+        final TocReader tocReader = new StandardTocReader(tocFile);
+
+        try (final FileInputStream fis = new FileInputStream(journalFile);
+            final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
+            for (int i = 0; i < 10; i++) {
+                final StandardProvenanceEventRecord recovered = reader.nextRecord();
+                System.out.println(recovered);
+                assertNotNull(recovered);
+                assertEquals(i, recovered.getEventId());
+                assertEquals("nifi://unit-test", recovered.getTransitUri());
+
+                final Map<String, String> updatedAttrs = recovered.getUpdatedAttributes();
+                assertNotNull(updatedAttrs);
+                assertEquals(2, updatedAttrs.size());
+                assertEquals("1.txt", updatedAttrs.get("filename"));
+                assertTrue(updatedAttrs.containsKey("uuid"));
+            }
+
+            assertNull(reader.nextRecord());
+        }
+
+        FileUtils.deleteFile(journalFile.getParentFile(), true);
+    }
+
+    protected abstract RecordWriter createWriter(File file, TocWriter tocWriter, boolean compressed, int uncompressedBlockSize) throws IOException;
+
+    protected abstract RecordReader createReader(InputStream in, String journalFilename, TocReader tocReader, int maxAttributeSize) throws IOException;
+}
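
The two abstract factory methods above are the only hooks a concrete test class must supply; every inherited @Test method then runs against that reader/writer pair. A minimal subclass might look like the following sketch (it mirrors the concrete subclasses later in this patch; the class name is illustrative):

    // Illustrative subclass: the base class supplies the tests, the
    // subclass supplies the serialization format under test.
    public class TestMyRecordReaderWriter extends AbstractTestRecordReaderWriter {
        @Override
        protected RecordWriter createWriter(File file, TocWriter tocWriter, boolean compressed, int uncompressedBlockSize) throws IOException {
            return new StandardRecordWriter(file, tocWriter, compressed, uncompressedBlockSize);
        }

        @Override
        protected RecordReader createReader(InputStream in, String journalFilename, TocReader tocReader, int maxAttributeSize) throws IOException {
            return new StandardRecordReader(in, journalFilename, tocReader, maxAttributeSize);
        }
    }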

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/LoopingInputStream.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/LoopingInputStream.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/LoopingInputStream.java
new file mode 100644
index 0000000..b6a0bfa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/LoopingInputStream.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class LoopingInputStream extends InputStream {
+
+    private final byte[] buffer;
+    private int index;
+
+    private final byte[] header;
+    private int headerIndex;
+    private boolean headerComplete = false;
+
+    public LoopingInputStream(final byte[] header, final byte[] toRepeat) {
+        this.header = header;
+        this.buffer = toRepeat;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (headerComplete) {
+            final byte nextByte = buffer[index++];
+            if (index >= buffer.length) {
+                index = 0;
+            }
+
+            final int returnValue = nextByte & 0xFF;
+            return returnValue;
+        } else {
+            final byte nextByte = header[headerIndex++];
+            if (headerIndex >= header.length) {
+                headerComplete = true;
+            }
+
+            final int returnValue = nextByte & 0xFF;
+            return returnValue;
+        }
+    }
+
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        if (headerComplete) {
+            final int toRead = Math.min(len, buffer.length - index);
+            System.arraycopy(buffer, index, b, off, toRead);
+            index += toRead;
+            if (index >= buffer.length) {
+                index = 0;
+            }
+
+            return toRead;
+        } else {
+            final int toRead = Math.min(len, header.length - headerIndex);
+            System.arraycopy(header, headerIndex, b, off, toRead);
+            headerIndex += toRead;
+            if (headerIndex >= header.length) {
+                headerComplete = true;
+            }
+
+            return toRead;
+        }
+    }
+
+    @Override
+    public int available() throws IOException {
+        return 1;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+}
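
A note on the semantics: the header bytes are served exactly once, after which the repeat buffer cycles forever, and the stream never reports end-of-file, so callers must bound their own reads. A small illustrative demo (the demo class and byte values are hypothetical, not part of the patch):

    package org.apache.nifi.provenance;

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;

    public class LoopingInputStreamDemo {
        public static void main(final String[] args) throws IOException {
            final byte[] header = "HDR".getBytes(StandardCharsets.UTF_8);
            final byte[] repeat = "AB".getBytes(StandardCharsets.UTF_8);

            try (final InputStream in = new LoopingInputStream(header, repeat)) {
                final byte[] buf = new byte[9];
                int off = 0;
                while (off < buf.length) {
                    off += in.read(buf, off, buf.length - off); // header once, then "AB" repeated
                }
                System.out.println(new String(buf, StandardCharsets.UTF_8)); // prints HDRABABAB
            }
        }
    }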

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 80a67eb..dec2a7b 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -38,7 +38,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -86,9 +85,6 @@ import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1161,6 +1157,7 @@ public class TestPersistentProvenanceRepository {
     }
 
     @Test
+    @Ignore("This test relies too much on timing of background events by using Thread.sleep().")
     public void testIndexDirectoryRemoved() throws InterruptedException, IOException, ParseException {
         final RepositoryConfiguration config = createConfiguration();
         config.setMaxRecordLife(5, TimeUnit.MINUTES);
@@ -1198,6 +1195,10 @@ public class TestPersistentProvenanceRepository {
 
         Thread.sleep(2000L);
 
+        final FileFilter indexFileFilter = file -> file.getName().startsWith("index");
+        final int numIndexDirs = config.getStorageDirectories().get(0).listFiles(indexFileFilter).length;
+        assertEquals(1, numIndexDirs);
+
         // add more records so that we will create a new index
         final long secondBatchStartTime = System.currentTimeMillis();
         for (int i = 0; i < 10; i++) {
@@ -1221,12 +1222,6 @@ public class TestPersistentProvenanceRepository {
         assertEquals(20, result.getMatchingEvents().size());
 
         // Ensure index directories exists
-        final FileFilter indexFileFilter = new FileFilter() {
-            @Override
-            public boolean accept(File pathname) {
-                return pathname.getName().startsWith("index");
-            }
-        };
         File[] indexDirs = config.getStorageDirectories().get(0).listFiles(indexFileFilter);
         assertEquals(2, indexDirs.length);
 
@@ -1777,8 +1772,12 @@ public class TestPersistentProvenanceRepository {
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
         repo.initialize(getEventReporter(), null, null);
 
+        final String maxLengthChars = "12345678901234567890123456789012345678901234567890";
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345");
+        attributes.put("51chars", "123456789012345678901234567890123456789012345678901");
+        attributes.put("50chars", "12345678901234567890123456789012345678901234567890");
+        attributes.put("49chars", "1234567890123456789012345678901234567890123456789");
         attributes.put("nullChar", null);
 
         final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
@@ -1797,11 +1796,14 @@ public class TestPersistentProvenanceRepository {
         final ProvenanceEventRecord retrieved = repo.getEvent(0L, null);
         assertNotNull(retrieved);
         assertEquals("12345678-0000-0000-0000-012345678912", retrieved.getAttributes().get("uuid"));
-        assertEquals("12345678901234567890123456789012345678901234567890", retrieved.getAttributes().get("75chars"));
+        assertEquals(maxLengthChars, retrieved.getAttributes().get("75chars"));
+        assertEquals(maxLengthChars, retrieved.getAttributes().get("51chars"));
+        assertEquals(maxLengthChars, retrieved.getAttributes().get("50chars"));
+        assertEquals(maxLengthChars.substring(0, 49), retrieved.getAttributes().get("49chars"));
     }
 
 
-    @Test(timeout=5000)
+    @Test(timeout = 15000)
     public void testExceptionOnIndex() throws IOException {
         final RepositoryConfiguration config = createConfiguration();
         config.setMaxAttributeChars(50);
@@ -1914,112 +1916,6 @@ public class TestPersistentProvenanceRepository {
     }
 
 
-    @Test
-    public void testBehaviorOnOutOfMemory() throws IOException, InterruptedException {
-        final RepositoryConfiguration config = createConfiguration();
-        config.setMaxEventFileLife(3, TimeUnit.MINUTES);
-        config.setJournalCount(4);
-
-        // Create a repository that overrides the createWriters() method so that we can return writers that will throw
-        // OutOfMemoryError where we want to
-        final AtomicBoolean causeOOME = new AtomicBoolean(false);
-        repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
-            @Override
-            protected RecordWriter[] createWriters(RepositoryConfiguration config, long initialRecordId) throws IOException {
-                final RecordWriter[] recordWriters = super.createWriters(config, initialRecordId);
-
-                // Spy on each of the writers so that a call to writeUUID throws an OutOfMemoryError if we set the
-                // causeOOME flag to true
-                final StandardRecordWriter[] spiedWriters = new StandardRecordWriter[recordWriters.length];
-                for (int i = 0; i < recordWriters.length; i++) {
-                    final StandardRecordWriter writer = (StandardRecordWriter) recordWriters[i];
-
-                    spiedWriters[i] = Mockito.spy(writer);
-                    Mockito.doAnswer(new Answer<Object>() {
-                        @Override
-                        public Object answer(final InvocationOnMock invocation) throws Throwable {
-                            if (causeOOME.get()) {
-                                throw new OutOfMemoryError();
-                            } else {
-                                writer.writeUUID(invocation.getArgumentAt(0, DataOutputStream.class), invocation.getArgumentAt(1, String.class));
-                            }
-                            return null;
-                        }
-                    }).when(spiedWriters[i]).writeUUID(Mockito.any(DataOutputStream.class), Mockito.any(String.class));
-                }
-
-                // return the writers that we are spying on
-                return spiedWriters;
-            }
-        };
-        repo.initialize(getEventReporter(), null, null);
-
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345");
-
-        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
-        builder.setEventTime(System.currentTimeMillis());
-        builder.setEventType(ProvenanceEventType.RECEIVE);
-        builder.setTransitUri("nifi://unit-test");
-        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
-        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
-        builder.setComponentId("1234");
-        builder.setComponentType("dummy processor");
-
-        // first make sure that we are able to write to the repo successfully.
-        for (int i = 0; i < 4; i++) {
-            final ProvenanceEventRecord record = builder.build();
-            repo.registerEvent(record);
-        }
-
-        // cause OOME to occur
-        causeOOME.set(true);
-
-        // write 4 times to make sure that we mark all partitions as dirty
-        for (int i = 0; i < 4; i++) {
-            final ProvenanceEventRecord record = builder.build();
-            try {
-                repo.registerEvent(record);
-                Assert.fail("Expected OutOfMemoryError but was able to register event");
-            } catch (final OutOfMemoryError oome) {
-            }
-        }
-
-        // now that all partitions are dirty, ensure that as we keep trying to write, we get an IllegalStateException
-        // and that we don't corrupt the repository by writing partial records
-        for (int i = 0; i < 8; i++) {
-            final ProvenanceEventRecord record = builder.build();
-            try {
-                repo.registerEvent(record);
-                Assert.fail("Expected OutOfMemoryError but was able to register event");
-            } catch (final IllegalStateException ise) {
-            }
-        }
-
-        // close repo so that we can create a new one to recover records
-        repo.close();
-
-        // make sure we can recover
-        final PersistentProvenanceRepository recoveryRepo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
-            @Override
-            protected Set<File> recoverJournalFiles() throws IOException {
-                try {
-                    return super.recoverJournalFiles();
-                } catch (final IOException ioe) {
-                    Assert.fail("Failed to recover properly");
-                    return null;
-                }
-            }
-        };
-
-        try {
-            recoveryRepo.initialize(getEventReporter(), null, null);
-        } finally {
-            recoveryRepo.close();
-        }
-    }
-
-
     private static class ReportedEvent {
         private final Severity severity;
         private final String category;
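
The new 49/50/51/75-character attributes above probe the attribute-truncation boundary: with a 50-character limit configured, anything longer comes back clipped to the first 50 characters, while the 49-character value survives intact. A sketch of that rule (the helper below is illustrative, not the repository's actual code):

    // Illustrative truncation rule that the new assertions exercise.
    static String truncate(final String value, final int maxAttributeChars) {
        if (value == null || value.length() <= maxAttributeChars) {
            return value; // short enough: stored as-is (the "49chars" case)
        }
        return value.substring(0, maxAttributeChars); // clipped (the "51chars"/"75chars" cases)
    }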

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java
new file mode 100644
index 0000000..b9bb85e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.apache.nifi.provenance.schema.EventRecord;
+import org.apache.nifi.provenance.schema.EventRecordFields;
+import org.apache.nifi.provenance.schema.ProvenanceEventSchema;
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.toc.NopTocWriter;
+import org.apache.nifi.provenance.toc.StandardTocReader;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.repository.schema.FieldMapRecord;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+import org.apache.nifi.stream.io.DataOutputStream;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter {
+
+    private File journalFile;
+    private File tocFile;
+
+    @Before
+    public void setup() {
+        journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testFieldAddedToSchema");
+        tocFile = TocUtil.getTocFile(journalFile);
+    }
+
+
+    @Test
+    public void testFieldAddedToSchema() throws IOException {
+        final RecordField unitTestField = new SimpleRecordField("Unit Test Field", FieldType.STRING, Repetition.EXACTLY_ONE);
+        final Consumer<List<RecordField>> schemaModifier = fields -> fields.add(unitTestField);
+
+        final Map<RecordField, Object> toAdd = new HashMap<>();
+        toAdd.put(unitTestField, "hello");
+
+        try (final ByteArraySchemaRecordWriter writer = createSchemaWriter(schemaModifier, toAdd)) {
+            writer.writeHeader(1L);
+            writer.writeRecord(createEvent(), 3L);
+            writer.writeRecord(createEvent(), 3L);
+        }
+
+        try (final InputStream in = new FileInputStream(journalFile);
+            final TocReader tocReader = new StandardTocReader(tocFile);
+            final RecordReader reader = createReader(in, journalFile.getName(), tocReader, 10000)) {
+
+            for (int i = 0; i < 2; i++) {
+                final StandardProvenanceEventRecord event = reader.nextRecord();
+                assertNotNull(event);
+                assertEquals(3L, event.getEventId());
+                assertEquals("1234", event.getComponentId());
+                assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
+
+                assertNotNull(event.getUpdatedAttributes());
+                assertFalse(event.getUpdatedAttributes().isEmpty());
+            }
+        }
+    }
+
+    @Test
+    public void testFieldRemovedFromSchema() throws IOException {
+        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+        try {
+            // Create a schema that has the fields modified
+            final RecordSchema schemaV1 = ProvenanceEventSchema.PROVENANCE_EVENT_SCHEMA_V1;
+            final List<RecordField> fields = new ArrayList<>(schemaV1.getFields());
+            fields.remove(new SimpleRecordField(EventRecordFields.Names.UPDATED_ATTRIBUTES, FieldType.STRING, Repetition.EXACTLY_ONE));
+            fields.remove(new SimpleRecordField(EventRecordFields.Names.PREVIOUS_ATTRIBUTES, FieldType.STRING, Repetition.EXACTLY_ONE));
+            final RecordSchema recordSchema = new RecordSchema(fields);
+
+            // Create a record writer whose schema does not contain updated attributes or previous attributes.
+            // This means that we must also override the method that writes out attributes so that we are able
+            // to avoid actually writing them out.
+            final ByteArraySchemaRecordWriter writer = new ByteArraySchemaRecordWriter(journalFile, tocWriter, false, 0) {
+                @Override
+                public void writeHeader(long firstEventId, DataOutputStream out) throws IOException {
+                    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    recordSchema.writeTo(baos);
+
+                    out.writeInt(baos.size());
+                    baos.writeTo(out);
+                }
+
+                @Override
+                protected Record createRecord(final ProvenanceEventRecord event, final long eventId) {
+                    final RecordSchema contentClaimSchema = new RecordSchema(recordSchema.getField(EventRecordFields.Names.CONTENT_CLAIM).getSubFields());
+                    return new EventRecord(event, eventId, recordSchema, contentClaimSchema);
+                }
+            };
+
+            try {
+                writer.writeHeader(1L);
+                writer.writeRecord(createEvent(), 3L);
+                writer.writeRecord(createEvent(), 3L);
+            } finally {
+                writer.close();
+            }
+        } finally {
+            tocWriter.close();
+        }
+
+        // Read the records in and make sure that they have the info that we expect.
+        try (final InputStream in = new FileInputStream(journalFile);
+            final TocReader tocReader = new StandardTocReader(tocFile);
+            final RecordReader reader = createReader(in, journalFile.getName(), tocReader, 10000)) {
+
+            for (int i = 0; i < 2; i++) {
+                final StandardProvenanceEventRecord event = reader.nextRecord();
+                assertNotNull(event);
+                assertEquals(3L, event.getEventId());
+                assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
+
+                // We will still have a Map<String, String> for updated attributes because the
+                // Provenance Event Builder will create an empty map.
+                assertNotNull(event.getUpdatedAttributes());
+                assertTrue(event.getUpdatedAttributes().isEmpty());
+            }
+        }
+    }
+
+
+    /**
+     * Creates a SchemaRecordWriter that uses a modified schema
+     *
+     * @param fieldModifier the callback for modifying the schema
+     * @return a SchemaRecordWriter that uses the modified schema
+     * @throws IOException if unable to create the writer
+     */
+    private ByteArraySchemaRecordWriter createSchemaWriter(final Consumer<List<RecordField>> fieldModifier, final Map<RecordField, Object> fieldsToAdd) throws IOException {
+        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+
+        // Create a schema that has the fields modified
+        final RecordSchema schemaV1 = ProvenanceEventSchema.PROVENANCE_EVENT_SCHEMA_V1;
+        final List<RecordField> fields = new ArrayList<>(schemaV1.getFields());
+        fieldModifier.accept(fields);
+
+        final RecordSchema recordSchema = new RecordSchema(fields);
+        final RecordSchema contentClaimSchema = new RecordSchema(recordSchema.getField(EventRecordFields.Names.CONTENT_CLAIM).getSubFields());
+
+        final ByteArraySchemaRecordWriter writer = new ByteArraySchemaRecordWriter(journalFile, tocWriter, false, 0) {
+            @Override
+            public void writeHeader(long firstEventId, DataOutputStream out) throws IOException {
+                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                recordSchema.writeTo(baos);
+
+                out.writeInt(baos.size());
+                baos.writeTo(out);
+            }
+
+            @Override
+            protected Record createRecord(final ProvenanceEventRecord event, final long eventId) {
+                final Map<RecordField, Object> values = new HashMap<>();
+
+                final EventRecord eventRecord = new EventRecord(event, eventId, recordSchema, contentClaimSchema);
+                for (final RecordField field : recordSchema.getFields()) {
+                    final Object value = eventRecord.getFieldValue(field);
+                    values.put(field, value);
+                }
+
+                values.putAll(fieldsToAdd);
+                return new FieldMapRecord(values, recordSchema);
+            }
+        };
+
+        return writer;
+    }
+
+    @Test
+    @Ignore("For local testing only")
+    public void testWritePerformance() throws IOException {
+        // This is a simple micro-benchmarking test so that we can determine how fast the serialization/deserialization is before
+        // making significant changes. This allows us to ensure that changes that we make do not have significant adverse effects
+        // on performance of the repository.
+        final ProvenanceEventRecord event = createEvent();
+
+        final TocWriter tocWriter = new NopTocWriter();
+
+        final int numEvents = 10_000_000;
+        final long startNanos = System.nanoTime();
+        try (final OutputStream nullOut = new NullOutputStream();
+            final RecordWriter writer = new ByteArraySchemaRecordWriter(nullOut, tocWriter, false, 0)) {
+
+            writer.writeHeader(0L);
+
+            for (int i = 0; i < numEvents; i++) {
+                writer.writeRecord(event, i);
+            }
+
+        }
+
+        final long nanos = System.nanoTime() - startNanos;
+        final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
+        System.out.println("Took " + millis + " millis to write " + numEvents + " events");
+    }
+
+
+    @Test
+    @Ignore("For local performance testing only")
+    public void testReadPerformance() throws IOException, InterruptedException {
+        // This is a simple micro-benchmarking test so that we can determine how fast the serialization/deserialization is before
+        // making significant changes. This allows us to ensure that changes that we make do not have significant adverse effects
+        // on performance of the repository.
+        final ProvenanceEventRecord event = createEvent();
+
+        final TocReader tocReader = null;
+
+        final byte[] header;
+        try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
+            final DataOutputStream out = new DataOutputStream(headerOut)) {
+
+            final RecordWriter schemaWriter = new ByteArraySchemaRecordWriter(out, null, false, 0);
+            schemaWriter.writeHeader(1L);
+
+            header = headerOut.toByteArray();
+        }
+
+        final byte[] serializedRecord;
+        try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
+            final RecordWriter writer = new ByteArraySchemaRecordWriter(headerOut, null, false, 0)) {
+
+            writer.writeHeader(1L);
+            headerOut.reset();
+
+            writer.writeRecord(event, 1L);
+            writer.flush();
+            serializedRecord = headerOut.toByteArray();
+        }
+
+        final int numEvents = 10_000_000;
+        final int recordBytes = serializedRecord.length;
+        final long totalRecordBytes = (long) recordBytes * (long) numEvents;
+
+        final long startNanos = System.nanoTime();
+        try (final InputStream in = new LoopingInputStream(header, serializedRecord);
+            final RecordReader reader = new ByteArraySchemaRecordReader(in, "filename", tocReader, 100000)) {
+
+            for (int i = 0; i < numEvents; i++) {
+                reader.nextRecord();
+            }
+        }
+
+        final long nanos = System.nanoTime() - startNanos;
+        final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
+        final double seconds = millis / 1000D;
+        final long bytesPerSecond = (long) (totalRecordBytes / seconds);
+        final long megaBytesPerSecond = bytesPerSecond / 1024 / 1024;
+        System.out.println("Took " + millis + " millis to read " + numEvents + " events or " + megaBytesPerSecond + " MB/sec");
+    }
+
+
+    @Override
+    protected RecordWriter createWriter(File file, TocWriter tocWriter, boolean compressed, int uncompressedBlockSize) throws IOException {
+        return new ByteArraySchemaRecordWriter(file, tocWriter, compressed, uncompressedBlockSize);
+    }
+
+
+    @Override
+    protected RecordReader createReader(InputStream in, String journalFilename, TocReader tocReader, int maxAttributeSize) throws IOException {
+        final ByteArraySchemaRecordReader reader = new ByteArraySchemaRecordReader(in, journalFilename, tocReader, maxAttributeSize);
+        return reader;
+    }
+
+    private static interface WriteRecordInterceptor {
+        void writeRawRecord(ProvenanceEventRecord event, long recordIdentifier, DataOutputStream out) throws IOException;
+    }
+
+    private static WriteRecordInterceptor NOP_INTERCEPTOR = (event, id, out) -> {};
+    private static WriteRecordInterceptor WRITE_DUMMY_STRING_INTERCEPTOR = (event, id, out) -> out.writeUTF("hello");
+}
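
Both overridden writeHeader() implementations above pin down the header layout the schema-based format relies on: a 4-byte length prefix followed by the serialized schema. Reading it back might look like the following sketch, assuming RecordSchema.readFrom(...) in nifi-schema-utils is the inverse of writeTo(...):

    // Illustrative: recover the schema that the overridden writeHeader() emitted.
    private static RecordSchema readSchemaHeader(final java.io.InputStream in) throws IOException {
        final java.io.DataInputStream dis = new java.io.DataInputStream(in);
        final int schemaLength = dis.readInt();            // 4-byte length prefix
        final byte[] schemaBytes = new byte[schemaLength];
        dis.readFully(schemaBytes);                        // serialized schema bytes
        return RecordSchema.readFrom(new java.io.DataInputStream(new java.io.ByteArrayInputStream(schemaBytes)));
    }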

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
index e11502a..cc69b18 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
@@ -16,174 +16,106 @@
  */
 package org.apache.nifi.provenance;
 
-import static org.apache.nifi.provenance.TestUtil.createFlowFile;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
+import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.nifi.provenance.toc.StandardTocReader;
-import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.toc.NopTocWriter;
 import org.apache.nifi.provenance.toc.TocReader;
-import org.apache.nifi.provenance.toc.TocUtil;
 import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.util.file.FileUtils;
-import org.junit.BeforeClass;
+import org.apache.nifi.stream.io.DataOutputStream;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.junit.Ignore;
 import org.junit.Test;
 
-public class TestStandardRecordReaderWriter {
-    @BeforeClass
-    public static void setLogLevel() {
-        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
-    }
+public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWriter {
 
-    private ProvenanceEventRecord createEvent() {
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put("filename", "1.txt");
-        attributes.put("uuid", UUID.randomUUID().toString());
-
-        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
-        builder.setEventTime(System.currentTimeMillis());
-        builder.setEventType(ProvenanceEventType.RECEIVE);
-        builder.setTransitUri("nifi://unit-test");
-        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
-        builder.setComponentId("1234");
-        builder.setComponentType("dummy processor");
-        final ProvenanceEventRecord record = builder.build();
-
-        return record;
-    }
 
     @Test
-    public void testSimpleWriteWithToc() throws IOException {
-        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite");
-        final File tocFile = TocUtil.getTocFile(journalFile);
-        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
-        final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, false, 1024 * 1024);
-
-        writer.writeHeader(1L);
-        writer.writeRecord(createEvent(), 1L);
-        writer.close();
-
-        final TocReader tocReader = new StandardTocReader(tocFile);
-
-        try (final FileInputStream fis = new FileInputStream(journalFile);
-            final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) {
-            assertEquals(0, reader.getBlockIndex());
-            reader.skipToBlock(0);
-            final StandardProvenanceEventRecord recovered = reader.nextRecord();
-            assertNotNull(recovered);
-
-            assertEquals("nifi://unit-test", recovered.getTransitUri());
-            assertNull(reader.nextRecord());
-        }
+    @Ignore("For local testing only")
+    public void testWritePerformance() throws IOException {
+        // This is a simple micro-benchmarking test so that we can determine how fast the serialization/deserialization is before
+        // making significant changes. This allows us to ensure that changes that we make do not have significant adverse effects
+        // on performance of the repository.
+        final ProvenanceEventRecord event = createEvent();
 
-        FileUtils.deleteFile(journalFile.getParentFile(), true);
-    }
+        final TocWriter tocWriter = new NopTocWriter();
 
+        final int numEvents = 10_000_000;
+        final long startNanos = System.nanoTime();
+        try (final OutputStream nullOut = new NullOutputStream();
+            final RecordWriter writer = new StandardRecordWriter(nullOut, tocWriter, false, 100000)) {
 
-    @Test
-    public void testSingleRecordCompressed() throws IOException {
-        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
-        final File tocFile = TocUtil.getTocFile(journalFile);
-        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
-        final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100);
-
-        writer.writeHeader(1L);
-        writer.writeRecord(createEvent(), 1L);
-        writer.close();
-
-        final TocReader tocReader = new StandardTocReader(tocFile);
-
-        try (final FileInputStream fis = new FileInputStream(journalFile);
-            final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) {
-            assertEquals(0, reader.getBlockIndex());
-            reader.skipToBlock(0);
-            final StandardProvenanceEventRecord recovered = reader.nextRecord();
-            assertNotNull(recovered);
-
-            assertEquals("nifi://unit-test", recovered.getTransitUri());
-            assertNull(reader.nextRecord());
+            writer.writeHeader(0L);
+
+            for (int i = 0; i < numEvents; i++) {
+                writer.writeRecord(event, i);
+            }
         }
 
-        FileUtils.deleteFile(journalFile.getParentFile(), true);
+        final long nanos = System.nanoTime() - startNanos;
+        final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
+        System.out.println("Took " + millis + " millis to write " + numEvents + " events");
     }
 
-
     @Test
-    public void testMultipleRecordsSameBlockCompressed() throws IOException {
-        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
-        final File tocFile = TocUtil.getTocFile(journalFile);
-        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
-        // new record each 1 MB of uncompressed data
-        final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 1024 * 1024);
-
-        writer.writeHeader(1L);
-        for (int i=0; i < 10; i++) {
-            writer.writeRecord(createEvent(), i);
+    @Ignore("For local testing only")
+    public void testReadPerformance() throws IOException {
+        // This is a simple micro-benchmarking test so that we can determine how fast the serialization/deserialization is before
+        // making significant changes. This allows us to ensure that changes that we make do not have significant adverse effects
+        // on performance of the repository.
+        final ProvenanceEventRecord event = createEvent();
+
+        final TocReader tocReader = null;
+
+        final byte[] header;
+        try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
+            final DataOutputStream out = new DataOutputStream(headerOut)) {
+            out.writeUTF(PersistentProvenanceRepository.class.getName());
+            out.writeInt(9);
+            header = headerOut.toByteArray();
         }
-        writer.close();
 
-        final TocReader tocReader = new StandardTocReader(tocFile);
+        final byte[] serializedRecord;
+        try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
+            final StandardRecordWriter writer = new StandardRecordWriter(headerOut, null, false, 0)) {
+
+            writer.writeHeader(1L);
+            headerOut.reset();
 
-        try (final FileInputStream fis = new FileInputStream(journalFile);
-            final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) {
-            for (int i=0; i < 10; i++) {
-                assertEquals(0, reader.getBlockIndex());
+            writer.writeRecord(event, 1L);
+            writer.flush();
+            serializedRecord = headerOut.toByteArray();
+        }
 
-                // call skipToBlock half the time to ensure that we can; avoid calling it
-                // the other half of the time to ensure that it's okay.
-                if (i <= 5) {
-                    reader.skipToBlock(0);
-                }
+        final int numEvents = 10_000_000;
+        final long startNanos = System.nanoTime();
+        try (final InputStream in = new LoopingInputStream(header, serializedRecord);
+            final RecordReader reader = new StandardRecordReader(in, "filename", tocReader, 100000)) {
 
-                final StandardProvenanceEventRecord recovered = reader.nextRecord();
-                assertNotNull(recovered);
-                assertEquals("nifi://unit-test", recovered.getTransitUri());
+            for (int i = 0; i < numEvents; i++) {
+                reader.nextRecord();
             }
-
-            assertNull(reader.nextRecord());
         }
 
-        FileUtils.deleteFile(journalFile.getParentFile(), true);
+        final long nanos = System.nanoTime() - startNanos;
+        final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
+        System.out.println("Took " + millis + " millis to read " + numEvents + " events");
     }
 
+    @Override
+    protected RecordWriter createWriter(File file, TocWriter tocWriter, boolean compressed, int uncompressedBlockSize) throws IOException {
+        return new StandardRecordWriter(file, tocWriter, compressed, uncompressedBlockSize);
+    }
 
-    @Test
-    public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
-        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
-        final File tocFile = TocUtil.getTocFile(journalFile);
-        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
-        // new block each 10 bytes
-        final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100);
-
-        writer.writeHeader(1L);
-        for (int i=0; i < 10; i++) {
-            writer.writeRecord(createEvent(), i);
-        }
-        writer.close();
-
-        final TocReader tocReader = new StandardTocReader(tocFile);
-
-        try (final FileInputStream fis = new FileInputStream(journalFile);
-            final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) {
-            for (int i=0; i < 10; i++) {
-                final StandardProvenanceEventRecord recovered = reader.nextRecord();
-                System.out.println(recovered);
-                assertNotNull(recovered);
-                assertEquals(i, recovered.getEventId());
-                assertEquals("nifi://unit-test", recovered.getTransitUri());
-            }
-
-            assertNull(reader.nextRecord());
-        }
-
-        FileUtils.deleteFile(journalFile.getParentFile(), true);
+    @Override
+    protected RecordReader createReader(InputStream in, String journalFilename, TocReader tocReader, int maxAttributeSize) throws IOException {
+        return new StandardRecordReader(in, journalFilename, tocReader, maxAttributeSize);
     }
+
 }
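
The hand-built header in testReadPerformance() above documents the legacy layout this reader expects: a UTF string naming the serialization class followed by an int serialization version (9 here). A sketch of the mirror-image read, under that assumption:

    // Illustrative mirror image of the hand-built header in testReadPerformance():
    // a UTF string naming the serialization class, then an int serialization version.
    private static void readLegacyHeader(final java.io.InputStream in) throws IOException {
        final java.io.DataInputStream dis = new java.io.DataInputStream(in);
        final String serializationName = dis.readUTF(); // e.g. PersistentProvenanceRepository.class.getName()
        final int serializationVersion = dis.readInt(); // 9 in the benchmark above
        System.out.println(serializationName + " v" + serializationVersion);
    }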

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/NopTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/NopTocWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/NopTocWriter.java
new file mode 100644
index 0000000..edf62d8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/NopTocWriter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.toc;
+
+import java.io.File;
+import java.io.IOException;
+
+public class NopTocWriter implements TocWriter {
+    private int blockIndex;
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void addBlockOffset(long offset, long firstEventId) throws IOException {
+        blockIndex++;
+    }
+
+    @Override
+    public int getCurrentBlockIndex() {
+        return blockIndex;
+    }
+
+    @Override
+    public File getFile() {
+        return null;
+    }
+
+    @Override
+    public void sync() throws IOException {
+    }
+
+}
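
Because every method above is a no-op (only the block index advances), wiring NopTocWriter into a writer keeps benchmark hot loops free of TOC disk I/O, as in the testWritePerformance() methods earlier in this patch:

    // Sketch mirroring the testWritePerformance() methods above; "event" stands
    // for any pre-built ProvenanceEventRecord (see createEvent() in the tests).
    final TocWriter tocWriter = new NopTocWriter();
    try (final OutputStream nullOut = new NullOutputStream();
        final RecordWriter writer = new StandardRecordWriter(nullOut, tocWriter, false, 100000)) {
        writer.writeHeader(0L);
        for (int i = 0; i < 1_000_000; i++) {
            writer.writeRecord(event, i);
        }
    }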

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f6eed44..d19dba0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1307,6 +1307,11 @@ language governing permissions and limitations under the License. -->
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-schema-utils</artifactId>
+                <version>1.1.0-SNAPSHOT</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-hadoop-utils</artifactId>
                 <version>1.1.0-SNAPSHOT</version>
             </dependency>
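
With this dependencyManagement entry in place, child modules (such as the persistent provenance repository bundle) can declare the new artifact without repeating the version, e.g.:

    <!-- in a child module's pom.xml; the version is inherited from the root pom above -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-schema-utils</artifactId>
    </dependency>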
