Repository: incubator-nifi
Updated Branches:
  refs/heads/journaling-prov-repo [created] f23f36d73


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java
new file mode 100644
index 0000000..eca664e
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.toc;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
+import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
+import org.apache.nifi.provenance.journaling.journals.JournalReader;
+import org.apache.nifi.provenance.journaling.journals.StandardJournalReader;
+
+/**
+ * Reads the events of a journal file in order and pairs each one with a
+ * {@link JournaledStorageLocation}. The block index stored in that location is
+ * derived from the journal's Table of Contents, which is read from the file that
+ * sits next to the journal file and carries the journal file's name plus ".toc".
+ */
+public class TocJournalReader implements Closeable {
+
+    private final TocReader tocReader;
+    private final JournalReader reader;
+
+    private final String containerName;
+    private final String sectionName;
+    private final String journalId;
+
+    // Block index that is recorded in the location of the next event returned.
+    private int blockIndex;
+    // Offset that the TOC reports for the block following blockIndex.
+    private long nextBlockOffset;
+
+
+    /**
+     * Opens the given journal file together with its Table of Contents.
+     *
+     * @param containerName name of the container, copied into every location produced
+     * @param sectionName name of the section, copied into every location produced
+     * @param journalId id of the journal, copied into every location produced
+     * @param journalFile the journal file to read; its TOC is looked up in the same
+     *            directory under the journal file's name with ".toc" appended
+     * @throws IOException if the TOC or the journal cannot be opened
+     */
+    public TocJournalReader(final String containerName, final String sectionName, final String journalId, final File journalFile) throws IOException {
+        this.containerName = containerName;
+        this.sectionName = sectionName;
+        this.journalId = journalId;
+
+        final File tocFile = new File(journalFile.getParentFile(), journalFile.getName() + ".toc");
+        final TocReader toc = new StandardTocReader(tocFile);
+
+        final JournalReader journal;
+        try {
+            journal = new StandardJournalReader(journalFile);
+        } catch (final IOException | RuntimeException e) {
+            // The TOC reader was already created; close it so that it is not leaked
+            // when the journal itself cannot be opened.
+            try {
+                toc.close();
+            } catch (final IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+
+        tocReader = toc;
+        reader = journal;
+
+        blockIndex = 0;
+        nextBlockOffset = tocReader.getBlockOffset(1);
+    }
+
+    /**
+     * Closes the TOC reader and the journal reader. Both closes are always attempted.
+     * If only one fails, its exception is thrown; if both fail, the journal reader's
+     * exception is thrown with the TOC reader's exception attached as suppressed.
+     *
+     * @throws IOException if either reader fails to close
+     */
+    @Override
+    public void close() throws IOException {
+        IOException suppressed = null;
+        try {
+            tocReader.close();
+        } catch (final IOException ioe) {
+            suppressed = ioe;
+        }
+
+        try {
+            reader.close();
+        } catch (final IOException ioe) {
+            if ( suppressed != null ) {
+                ioe.addSuppressed(suppressed);
+            }
+            throw ioe;
+        }
+
+        if ( suppressed != null ) {
+            throw suppressed;
+        }
+    }
+
+    /**
+     * Reads the next event from the journal and wraps it together with its storage location.
+     *
+     * @return the next event and its location, or <code>null</code> if the journal reader
+     *         returns no further event
+     * @throws IOException if the journal cannot be read
+     */
+    public JournaledProvenanceEvent nextJournaledEvent() throws IOException {
+        final ProvenanceEventRecord event = reader.nextEvent();
+        if ( event == null ) {
+            return null;
+        }
+
+        // The location uses the block index as it was before the position check below.
+        final JournaledStorageLocation location = new JournaledStorageLocation(containerName, sectionName,
+                journalId, blockIndex, event.getEventId());
+
+        // Check if we've read beyond the offset of the next block. If so, advance to
+        // that block and look up the offset of the block that follows it. A non-positive
+        // offset is never treated as a block boundary (presumably that is what the TOC
+        // reports when there is no further block -- confirm against the TocReader implementation).
+        final long newPosition = reader.getPosition();
+        if ( newPosition > nextBlockOffset && nextBlockOffset > 0 ) {
+            blockIndex++;
+            nextBlockOffset = tocReader.getBlockOffset(blockIndex + 1);
+        }
+
+        return new JournaledProvenanceEvent(event, location);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java
new file mode 100644
index 0000000..9f6a264
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.toc;
+
+import java.io.Closeable;
+
+/**
+ * <p>
+ * Reads a Table of Contents (.toc file) for a corresponding Journal File. We use a Table of Contents
+ * to map a Block Index to the offset into the Journal file where that Block begins. We do this so that
+ * we can persist a Block Index for an event and then compress the Journal later. This way, we can
+ * get good compression by compressing a large batch of events at once, and we can still look up
+ * an event in a Journal that has not been compressed by looking in the Table of Contents, or look up
+ * an event in a Journal post-compression by simply rewriting the TOC while we compress the data.
+ * </p>
+ */
+public interface TocReader extends Closeable {
+
+    /**
+     * Indicates whether or not the corresponding Journal file is compressed.
+     *
+     * @return <code>true</code> if the corresponding Journal file is compressed, <code>false</code> otherwise
+     */
+    boolean isCompressed();
+
+    /**
+     * Returns the byte offset into the Journal File for the Block with the given index.
+     * NOTE(review): the value returned for an index that has no entry in the Table of
+     * Contents is not specified by this interface -- confirm against the implementation.
+     *
+     * @param blockIndex the index of the Block whose offset is requested
+     * @return the byte offset into the Journal File at which that Block begins
+     */
+    long getBlockOffset(int blockIndex);
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocWriter.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocWriter.java
new file mode 100644
index 0000000..b44b55b
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocWriter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.toc;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Writes a .toc (Table of Contents) file by appending Block Offsets to it.
+ */
+public interface TocWriter extends Closeable {
+
+    /**
+     * Adds the given block offset as the next Block Offset in the Table of Contents.
+     *
+     * @param offset the block offset to add
+     * @throws IOException if an I/O error occurs while adding the offset
+     */
+    void addBlockOffset(long offset) throws IOException;
+    
+    /**
+     * Returns the index of the current Block.
+     *
+     * @return the index of the current Block
+     */
+    int getCurrentBlockIndex();
+    
+    /**
+     * Returns the file that is currently being written to.
+     *
+     * @return the file that is currently being written to
+     */
+    File getFile();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java
new file mode 100644
index 0000000..45b7338
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling;
+
+import java.util.UUID;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+
+/**
+ * Helpers shared by the journaling provenance repository unit tests.
+ */
+public class TestUtil {
+
+    /**
+     * Builds a CREATE provenance event for use in tests. The given id is only used to
+     * derive the FlowFile UUID: it is zero-padded to 12 characters and appended to
+     * "00000000-0000-0000-0000-". No event id is set on the builder here.
+     *
+     * @param id the value from which the last group of the FlowFile UUID is derived
+     * @return the newly built event
+     */
+    public static ProvenanceEventRecord generateEvent(final long id) {
+        final String flowFileUuid = "00000000-0000-0000-0000-" + pad(String.valueOf(id), 12, '0');
+
+        return new StandardProvenanceEventRecord.Builder()
+            .setEventType(ProvenanceEventType.CREATE)
+            .setFlowFileUUID(flowFileUuid)
+            .setComponentType("Unit Test")
+            .setComponentId(UUID.randomUUID().toString())
+            .setEventTime(System.currentTimeMillis())
+            .setFlowFileEntryDate(System.currentTimeMillis() - 1000L)
+            .setLineageStartDate(System.currentTimeMillis() - 2000L)
+            .setCurrentContentClaim(null, null, null, null, 0L)
+            .build();
+    }
+
+    /**
+     * Left-pads the given value with the padding character until it is charCount
+     * characters long. A value that is already at least that long is returned as is.
+     *
+     * @param value the text to pad
+     * @param charCount the minimum length of the result
+     * @param padding the character to prepend
+     * @return the padded text
+     */
+    public static String pad(final String value, final int charCount, final char padding) {
+        if ( charCount <= value.length() ) {
+            return value;
+        }
+
+        final StringBuilder padded = new StringBuilder(charCount);
+        for (int remaining = charCount - value.length(); remaining > 0; remaining--) {
+            padded.append(padding);
+        }
+
+        return padded.append(value).toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java
new file mode 100644
index 0000000..dfaeb1a
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
+import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
+import org.apache.nifi.provenance.journaling.TestUtil;
+import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.SearchTerms;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.junit.Test;
+
+public class TestEventIndexWriter {
+
+    /**
+     * Indexes a single event and verifies that a search on its FlowFile UUID returns
+     * exactly the storage location the event was indexed with.
+     */
+    @Test
+    public void testIndexAndFetch() throws IOException {
+        final JournalingRepositoryConfig config = new JournalingRepositoryConfig();
+        config.setSearchableAttributes(Arrays.asList(new SearchableField[] {
+                SearchableFields.newSearchableAttribute("test.1")
+        }));
+        config.setSearchableFields(Arrays.asList(new SearchableField[] {
+                SearchableFields.FlowFileUUID
+        }));
+
+        // Use a uniquely named directory so that repeated runs do not see each other's index.
+        final File indexDir = new File("target/" + UUID.randomUUID().toString());
+
+        try (final LuceneIndexWriter indexWriter = new LuceneIndexWriter(indexDir, config)) {
+
+            final ProvenanceEventRecord event = TestUtil.generateEvent(23L);
+            final JournaledStorageLocation location = new JournaledStorageLocation("container", "section", "journalId", 2, 23L);
+            final JournaledProvenanceEvent storedEvent = new JournaledProvenanceEvent(event, location);
+            indexWriter.index(Collections.singleton(storedEvent));
+
+            final Query query = new Query("123");
+            query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000-000000000023"));
+
+            try (final EventIndexSearcher searcher = indexWriter.newIndexSearcher()) {
+                final SearchResult searchResult = searcher.search(query);
+                final List<JournaledStorageLocation> locations = searchResult.getLocations();
+                assertNotNull(locations);
+                assertEquals(1, locations.size());
+
+                final JournaledStorageLocation found = locations.get(0);
+                assertNotNull(found);
+                assertEquals("container", found.getContainerName());
+                assertEquals("section", found.getSectionName());
+                assertEquals("journalId", found.getJournalId());
+                assertEquals(2, found.getBlockIndex());
+                assertEquals(23L, found.getEventId());
+            }
+        } finally {
+            // try-with-resources has already closed the index writer at this point,
+            // so the directory handed to it can be removed instead of being left in target/.
+            deleteRecursively(indexDir);
+        }
+
+    }
+
+    /**
+     * Deletes the given file; if it is a directory, its contents are deleted first.
+     * A file that does not exist is ignored.
+     */
+    private static void deleteRecursively(final File file) {
+        // listFiles() returns null for anything that is not a readable directory
+        final File[] children = file.listFiles();
+        if ( children != null ) {
+            for (final File child : children) {
+                deleteRecursively(child);
+            }
+        }
+
+        file.delete();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java
new file mode 100644
index 0000000..f2266e2
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.journals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.journaling.TestUtil;
+import org.apache.nifi.provenance.journaling.io.StandardEventSerializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestJournalReadWrite {
+
+    /**
+     * Runs the write-then-read round trip once with compression and once without.
+     */
+    @Test
+    public void testReadWrite100Blocks() throws IOException {
+        testReadWrite100Blocks(true);
+        testReadWrite100Blocks(false);
+    }
+
+    /**
+     * Writes 100 blocks of 100 events each (ids 0 through 99 within every block) and
+     * then reads all of them back in order, checking the id of every event.
+     */
+    private void testReadWrite100Blocks(final boolean compressed) throws IOException {
+        final File journalFile = new File("target/1.journal");
+        final long journalId = 1L;
+        final StandardEventSerializer serializer = new StandardEventSerializer();
+
+        try {
+            try (final StandardJournalWriter writer = new StandardJournalWriter(journalId, journalFile, compressed, serializer)) {
+                for (int blockNumber = 0; blockNumber < 100; blockNumber++) {
+                    writer.beginNewBlock();
+
+                    // ids 0-4, one write call per event
+                    writeIndividually(writer, 0);
+
+                    // ids 5-94, a single write call for the whole batch
+                    final List<ProvenanceEventRecord> batch = new ArrayList<>();
+                    for (int offset = 0; offset < 90; offset++) {
+                        batch.add(TestUtil.generateEvent(offset + 5));
+                    }
+                    writer.write(batch, 5);
+
+                    // ids 95-99, again one write call per event
+                    writeIndividually(writer, 95);
+
+                    writer.finishBlock();
+                }
+            }
+
+            try (final StandardJournalReader reader = new StandardJournalReader(journalFile)) {
+                // 100 blocks of 100 events; within each block the ids restart at 0
+                for (int eventNumber = 0; eventNumber < 100 * 100; eventNumber++) {
+                    final ProvenanceEventRecord record = reader.nextEvent();
+                    Assert.assertNotNull(record);
+                    Assert.assertEquals((long) (eventNumber % 100), record.getEventId());
+                }
+            }
+        } finally {
+            journalFile.delete();
+        }
+    }
+
+    /**
+     * Writes five events with one write call each, passing firstEventId, firstEventId + 1, ...
+     * as the second argument of the write call.
+     */
+    private static void writeIndividually(final StandardJournalWriter writer, final int firstEventId) throws IOException {
+        for (int n = 0; n < 5; n++) {
+            final ProvenanceEventRecord event = TestUtil.generateEvent(n);
+            writer.write(Collections.singleton(event), firstEventId + n);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
new file mode 100644
index 0000000..e1ecf7d
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.journals;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.journaling.io.StandardEventSerializer;
+import org.apache.nifi.remote.io.CompressionOutputStream;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestStandardJournalReader {
+
+    private ByteArrayOutputStream baos;
+    private DataOutputStream dos;
+    
+    /**
+     * Gives every test a fresh in-memory buffer that already holds the header:
+     * the codec name followed by the serialization version.
+     */
+    @Before
+    public void setup() throws IOException {
+        this.baos = new ByteArrayOutputStream();
+        this.dos = new DataOutputStream(this.baos);
+
+        // header: codec name first, then serialization version
+        this.dos.writeUTF(StandardEventSerializer.CODEC_NAME);
+        this.dos.writeInt(0);
+    }
+    
+    
+    /**
+     * Writes <code>false</code> after the header plus a single record with id 88, and
+     * verifies the first event that the journal reader returns from the resulting file.
+     */
+    @Test
+    public void testReadFirstEventUncompressed() throws IOException {
+        dos.writeBoolean(false);
+        writeRecord(88L);
+
+        // persist the buffered bytes so that the journal reader has a file to open
+        final File testDataDir = new File("target/testData");
+        testDataDir.mkdirs();
+        final File journalFile = new File(testDataDir, UUID.randomUUID().toString() + ".journal");
+
+        try (final FileOutputStream out = new FileOutputStream(journalFile)) {
+            baos.writeTo(out);
+        }
+
+        // the first event read back must be the one written above
+        try (final StandardJournalReader journalReader = new StandardJournalReader(journalFile)) {
+            final ProvenanceEventRecord first = journalReader.nextEvent();
+            assertNotNull(first);
+            assertEquals(88L, first.getEventId());
+            assertEquals(ProvenanceEventType.CREATE, first.getEventType());
+            assertEquals("00000000-0000-0000-0000-000000000000", first.getFlowFileUuid());
+        } finally {
+            journalFile.delete();
+        }
+    }
+    
+    
+    @Test
+    public void testReadManyUncompressed() throws IOException {
+        dos.writeBoolean(false);
+        writeRecords(0, 1024, false);
+        
+        // write data to a file so that we can read it with the journal reader
+        final File dir = new File("target/testData");
+        final File file = new File(dir, UUID.randomUUID().toString() + 
".journal");
+        dir.mkdirs();
+        
+        try (final FileOutputStream fos = new FileOutputStream(file)) {
+            baos.writeTo(fos);
+        }
+        
+        // read the record and verify its contents
+        try (final StandardJournalReader reader = new 
StandardJournalReader(file)) {
+            for (int i=0; i < 1024; i++) {
+                final ProvenanceEventRecord restored = reader.nextEvent();
+                assertNotNull(restored);
+                assertEquals((long) i, restored.getEventId());
+                assertEquals(ProvenanceEventType.CREATE, 
restored.getEventType());
+                assertEquals("00000000-0000-0000-0000-000000000000", 
restored.getFlowFileUuid());
+            }
+            
+            assertNull(reader.nextEvent());
+        } finally {
+            file.delete();
+        }
+    }
+    
+    /**
+     * Writes two runs of ten records, remembers the buffer size at which the second run
+     * starts, and verifies that the event with id 10 can be fetched by that offset.
+     */
+    @Test
+    public void testReadFirstEventWithBlockOffsetUncompressed() throws IOException {
+        dos.writeBoolean(false);
+        writeRecords(0, 10, false);
+
+        // everything written from here on starts at this offset
+        final int secondBlockOffset = baos.size();
+        writeRecords(10, 10, false);
+
+        // persist the buffered bytes so that the journal reader has a file to open
+        final File testDataDir = new File("target/testData");
+        testDataDir.mkdirs();
+        final File journalFile = new File(testDataDir, UUID.randomUUID().toString() + ".journal");
+
+        try (final FileOutputStream out = new FileOutputStream(journalFile)) {
+            baos.writeTo(out);
+        }
+
+        try (final StandardJournalReader journalReader = new StandardJournalReader(journalFile)) {
+            final ProvenanceEventRecord fetched = journalReader.getEvent(secondBlockOffset, 10L);
+            assertNotNull(fetched);
+            assertEquals(10L, fetched.getEventId());
+            assertEquals(ProvenanceEventType.CREATE, fetched.getEventType());
+            assertEquals("00000000-0000-0000-0000-000000000000", fetched.getFlowFileUuid());
+        } finally {
+            journalFile.delete();
+        }
+    }
+    
+    /**
+     * Writes two runs of ten records, remembers the buffer size at which the second run
+     * starts, and verifies that an event that is NOT the first one at that offset can be
+     * fetched by block offset and event id.
+     */
+    @Test
+    public void testReadSubsequentEventWithBlockOffsetUncompressed() throws IOException {
+        dos.writeBoolean(false);
+        writeRecords(0, 10, false);
+        
+        final int secondBlockOffset = baos.size();
+        // presumably writes ids 10 through 19 -- the multi-event test in this class
+        // looks up id 13 at the same offset
+        writeRecords(10, 10, false);
+
+        // write data to a file so that we can read it with the journal reader
+        final File dir = new File("target/testData");
+        final File file = new File(dir, UUID.randomUUID().toString() + ".journal");
+        dir.mkdirs();
+        
+        try (final FileOutputStream fos = new FileOutputStream(file)) {
+            baos.writeTo(fos);
+        }
+        
+        // Ask for an event that lies behind the first one at the offset. Asking for
+        // id 10 here would only repeat the "first event" test.
+        try (final StandardJournalReader reader = new StandardJournalReader(file)) {
+            final ProvenanceEventRecord restored = reader.getEvent(secondBlockOffset, 13L);
+            assertNotNull(restored);
+            assertEquals(13L, restored.getEventId());
+            assertEquals(ProvenanceEventType.CREATE, restored.getEventType());
+            assertEquals("00000000-0000-0000-0000-000000000000", restored.getFlowFileUuid());
+        } finally {
+            file.delete();
+        }
+    }
+    
+    /**
+     * Writes two runs of ten records, remembers the buffer size at which the second run
+     * starts, and verifies twice in a row that the events with ids 10 and 13 can be
+     * fetched by that offset from the same reader.
+     */
+    @Test
+    public void testReadMultipleEventsWithBlockOffsetUncompressed() throws IOException {
+        dos.writeBoolean(false);
+        writeRecords(0, 10, false);
+
+        // everything written from here on starts at this offset
+        final int secondBlockOffset = baos.size();
+        writeRecords(10, 10, false);
+
+        // persist the buffered bytes so that the journal reader has a file to open
+        final File testDataDir = new File("target/testData");
+        testDataDir.mkdirs();
+        final File journalFile = new File(testDataDir, UUID.randomUUID().toString() + ".journal");
+
+        try (final FileOutputStream out = new FileOutputStream(journalFile)) {
+            baos.writeTo(out);
+        }
+
+        try (final StandardJournalReader journalReader = new StandardJournalReader(journalFile)) {
+            // the second pass asks for id 10 again after id 13 has been read
+            for (int pass = 0; pass < 2; pass++) {
+                final ProvenanceEventRecord tenth = journalReader.getEvent(secondBlockOffset, 10L);
+                assertNotNull(tenth);
+                assertEquals(10L, tenth.getEventId());
+                assertEquals(ProvenanceEventType.CREATE, tenth.getEventType());
+                assertEquals("00000000-0000-0000-0000-000000000000", tenth.getFlowFileUuid());
+
+                final ProvenanceEventRecord thirteenth = journalReader.getEvent(secondBlockOffset, 13L);
+                assertNotNull(thirteenth);
+                assertEquals(13L, thirteenth.getEventId());
+                assertEquals(ProvenanceEventType.CREATE, thirteenth.getEventType());
+                assertEquals("00000000-0000-0000-0000-000000000000", thirteenth.getFlowFileUuid());
+            }
+        } finally {
+            journalFile.delete();
+        }
+    }
+    
+    
+    /**
+     * Writes a single compressed record with id 88 and checks that
+     * {@code nextEvent()} returns it with the expected id, type and FlowFile UUID.
+     */
+    @Test
+    public void testReadFirstEventCompressed() throws IOException {
+        // flag written ahead of the records; matches the 'compressed' argument given to writeRecords
+        dos.writeBoolean(true);
+        writeRecords(88L, 1, true);
+
+        // persist the in-memory journal so that the journal reader can open it from disk
+        final File testDir = new File("target/testData");
+        final File journal = new File(testDir, UUID.randomUUID().toString() + ".journal");
+        testDir.mkdirs();
+
+        try (final FileOutputStream fileOut = new FileOutputStream(journal)) {
+            baos.writeTo(fileOut);
+        }
+
+        // the only record in the journal must come back as the first event
+        try (final StandardJournalReader journalReader = new StandardJournalReader(journal)) {
+            final ProvenanceEventRecord firstEvent = journalReader.nextEvent();
+            assertNotNull(firstEvent);
+            assertEquals(88L, firstEvent.getEventId());
+            assertEquals(ProvenanceEventType.CREATE, firstEvent.getEventType());
+            assertEquals("00000000-0000-0000-0000-000000000000", firstEvent.getFlowFileUuid());
+        } finally {
+            journal.delete();
+        }
+    }
+    
+    /**
+     * Writes 1024 compressed records (ids 0-1023) and checks that
+     * {@code nextEvent()} yields them in id order and then returns {@code null}.
+     */
+    @Test
+    public void testReadManyCompressed() throws IOException {
+        // flag written ahead of the records; matches the 'compressed' argument given to writeRecords
+        dos.writeBoolean(true);
+        writeRecords(0, 1024, true);
+
+        // persist the in-memory journal so that the journal reader can open it from disk
+        final File testDir = new File("target/testData");
+        final File journal = new File(testDir, UUID.randomUUID().toString() + ".journal");
+        testDir.mkdirs();
+
+        try (final FileOutputStream fileOut = new FileOutputStream(journal)) {
+            baos.writeTo(fileOut);
+        }
+
+        try (final StandardJournalReader journalReader = new StandardJournalReader(journal)) {
+            // every record must be returned, in the order in which it was written
+            for (long expectedId = 0L; expectedId < 1024L; expectedId++) {
+                final ProvenanceEventRecord event = journalReader.nextEvent();
+                assertNotNull(event);
+                assertEquals(expectedId, event.getEventId());
+                assertEquals(ProvenanceEventType.CREATE, event.getEventType());
+                assertEquals("00000000-0000-0000-0000-000000000000", event.getFlowFileUuid());
+            }
+
+            // the journal is exhausted after the last record
+            assertNull(journalReader.nextEvent());
+        } finally {
+            journal.delete();
+        }
+    }
+    
+    
+    /**
+     * Writes two compressed batches of ten records (ids 0-9 and 10-19) and checks
+     * that event 10, the first record of the second batch, can be fetched using the
+     * offset recorded just before that batch was written.
+     */
+    @Test
+    public void testReadFirstEventWithBlockOffsetCompressed() throws IOException {
+        // flag written ahead of the records; matches the 'compressed' argument given to writeRecords
+        dos.writeBoolean(true);
+        writeRecords(0, 10, true);
+
+        // size of the buffer so far == position at which the second batch starts
+        final int offsetOfSecondBlock = baos.size();
+        writeRecords(10, 10, true);
+
+        // persist the in-memory journal so that the journal reader can open it from disk
+        final File testDir = new File("target/testData");
+        final File journal = new File(testDir, UUID.randomUUID().toString() + ".journal");
+        testDir.mkdirs();
+
+        try (final FileOutputStream fileOut = new FileOutputStream(journal)) {
+            baos.writeTo(fileOut);
+        }
+
+        // fetch the first record of the second batch via its block offset
+        try (final StandardJournalReader journalReader = new StandardJournalReader(journal)) {
+            final ProvenanceEventRecord firstOfBlock = journalReader.getEvent(offsetOfSecondBlock, 10L);
+            assertNotNull(firstOfBlock);
+            assertEquals(10L, firstOfBlock.getEventId());
+            assertEquals(ProvenanceEventType.CREATE, firstOfBlock.getEventType());
+            assertEquals("00000000-0000-0000-0000-000000000000", firstOfBlock.getFlowFileUuid());
+        } finally {
+            journal.delete();
+        }
+    }
+    
+    /**
+     * Writes two compressed batches of ten records (ids 0-9 and 10-19) and checks
+     * that an event which is NOT the first record of the second batch can be
+     * fetched using the offset recorded just before that batch was written.
+     *
+     * This test previously requested id 10, the first record of the batch, which
+     * made it an exact duplicate of testReadFirstEventWithBlockOffsetCompressed.
+     * It now requests id 13 so that a record after the start of the batch is
+     * actually looked up.
+     */
+    @Test
+    public void testReadSubsequentEventWithBlockOffsetCompressed() throws IOException {
+        // flag written ahead of the records; matches the 'compressed' argument given to writeRecords
+        dos.writeBoolean(true);
+        writeRecords(0, 10, true);
+
+        // size of the buffer so far == position at which the second batch starts
+        final int secondBlockOffset = baos.size();
+        writeRecords(10, 10, true);
+
+        // write data to a file so that we can read it with the journal reader
+        final File dir = new File("target/testData");
+        final File file = new File(dir, UUID.randomUUID().toString() + ".journal");
+        dir.mkdirs();
+
+        try (final FileOutputStream fos = new FileOutputStream(file)) {
+            baos.writeTo(fos);
+        }
+
+        // id of a record that sits after the first record (id 10) of the second batch
+        final long subsequentEventId = 13L;
+
+        // read the record and verify its contents
+        try (final StandardJournalReader reader = new StandardJournalReader(file)) {
+            final ProvenanceEventRecord restored = reader.getEvent(secondBlockOffset, subsequentEventId);
+            assertNotNull(restored);
+            assertEquals(subsequentEventId, restored.getEventId());
+            assertEquals(ProvenanceEventType.CREATE, restored.getEventType());
+            assertEquals("00000000-0000-0000-0000-000000000000", restored.getFlowFileUuid());
+        } finally {
+            file.delete();
+        }
+    }
+    
+    /**
+     * Writes two compressed batches of ten records (ids 0-9 and 10-19) and checks
+     * that events 10 and 13 can each be fetched, twice in a row, using the offset
+     * recorded just before the second batch was written.
+     */
+    @Test
+    public void testReadMultipleEventsWithBlockOffsetCompressed() throws IOException {
+        // flag written ahead of the records; matches the 'compressed' argument given to writeRecords
+        dos.writeBoolean(true);
+        writeRecords(0, 10, true);
+
+        // size of the buffer so far == position at which the second batch starts
+        final int offsetOfSecondBlock = baos.size();
+        writeRecords(10, 10, true);
+
+        // persist the in-memory journal so that the journal reader can open it from disk
+        final File testDir = new File("target/testData");
+        final File journal = new File(testDir, UUID.randomUUID().toString() + ".journal");
+        testDir.mkdirs();
+
+        try (final FileOutputStream fileOut = new FileOutputStream(journal)) {
+            baos.writeTo(fileOut);
+        }
+
+        // look up both events, then repeat the same two lookups on the same reader
+        try (final StandardJournalReader journalReader = new StandardJournalReader(journal)) {
+            for (int pass = 0; pass < 2; pass++) {
+                for (final long expectedId : new long[] {10L, 13L}) {
+                    final ProvenanceEventRecord event = journalReader.getEvent(offsetOfSecondBlock, expectedId);
+                    assertNotNull(event);
+                    assertEquals(expectedId, event.getEventId());
+                    assertEquals(ProvenanceEventType.CREATE, event.getEventType());
+                    assertEquals("00000000-0000-0000-0000-000000000000", event.getFlowFileUuid());
+                }
+            }
+        } finally {
+            journal.delete();
+        }
+    }
+    
+    
+    /**
+     * Writes two uncompressed batches of ten records (ids 0-9 and 10-19) and checks
+     * that one reader can alternate between them: events 10 and 13 are fetched via
+     * the second batch's offset, then events 2 and 6 via the first batch's offset.
+     * Each pair is looked up twice and the whole sequence is run twice.
+     */
+    @Test
+    public void testReadEventWithBlockOffsetThenPreviousBlockOffsetUncompressed() throws IOException {
+        // flag written ahead of the records; matches the 'compressed' argument given to writeRecords
+        dos.writeBoolean(false);
+
+        // buffer size before each batch == position at which that batch starts
+        final int offsetOfFirstBlock = baos.size();
+        writeRecords(0, 10, false);
+
+        final int offsetOfSecondBlock = baos.size();
+        writeRecords(10, 10, false);
+
+        // persist the in-memory journal so that the journal reader can open it from disk
+        final File testDir = new File("target/testData");
+        final File journal = new File(testDir, UUID.randomUUID().toString() + ".journal");
+        testDir.mkdirs();
+
+        try (final FileOutputStream fileOut = new FileOutputStream(journal)) {
+            baos.writeTo(fileOut);
+        }
+
+        try (final StandardJournalReader journalReader = new StandardJournalReader(journal)) {
+            for (int round = 0; round < 2; round++) {
+                // later batch first ...
+                for (int repeat = 0; repeat < 2; repeat++) {
+                    for (final long expectedId : new long[] {10L, 13L}) {
+                        final ProvenanceEventRecord event = journalReader.getEvent(offsetOfSecondBlock, expectedId);
+                        assertNotNull(event);
+                        assertEquals(expectedId, event.getEventId());
+                        assertEquals(ProvenanceEventType.CREATE, event.getEventType());
+                        assertEquals("00000000-0000-0000-0000-000000000000", event.getFlowFileUuid());
+                    }
+                }
+
+                // ... then go back to the earlier batch
+                for (int repeat = 0; repeat < 2; repeat++) {
+                    for (final long expectedId : new long[] {2L, 6L}) {
+                        final ProvenanceEventRecord event = journalReader.getEvent(offsetOfFirstBlock, expectedId);
+                        assertNotNull(event);
+                        assertEquals(expectedId, event.getEventId());
+                        assertEquals(ProvenanceEventType.CREATE, event.getEventType());
+                        assertEquals("00000000-0000-0000-0000-000000000000", event.getFlowFileUuid());
+                    }
+                }
+            }
+        } finally {
+            journal.delete();
+        }
+    }
+    
+    
+    /**
+     * Writes two compressed batches of ten records (ids 0-9 and 10-19) and checks
+     * that one reader can alternate between them: events 10 and 13 are fetched via
+     * the second batch's offset, then events 2 and 6 via the first batch's offset.
+     * Each pair is looked up twice and the whole sequence is run twice.
+     */
+    @Test
+    public void testReadEventWithBlockOffsetThenPreviousBlockOffsetCompressed() throws IOException {
+        // flag written ahead of the records; matches the 'compressed' argument given to writeRecords
+        dos.writeBoolean(true);
+
+        // buffer size before each batch == position at which that batch starts
+        final int offsetOfFirstBlock = baos.size();
+        writeRecords(0, 10, true);
+
+        final int offsetOfSecondBlock = baos.size();
+        writeRecords(10, 10, true);
+
+        // persist the in-memory journal so that the journal reader can open it from disk
+        final File testDir = new File("target/testData");
+        final File journal = new File(testDir, UUID.randomUUID().toString() + ".journal");
+        testDir.mkdirs();
+
+        try (final FileOutputStream fileOut = new FileOutputStream(journal)) {
+            baos.writeTo(fileOut);
+        }
+
+        try (final StandardJournalReader journalReader = new StandardJournalReader(journal)) {
+            for (int round = 0; round < 2; round++) {
+                // later batch first ...
+                for (int repeat = 0; repeat < 2; repeat++) {
+                    for (final long expectedId : new long[] {10L, 13L}) {
+                        final ProvenanceEventRecord event = journalReader.getEvent(offsetOfSecondBlock, expectedId);
+                        assertNotNull(event);
+                        assertEquals(expectedId, event.getEventId());
+                        assertEquals(ProvenanceEventType.CREATE, event.getEventType());
+                        assertEquals("00000000-0000-0000-0000-000000000000", event.getFlowFileUuid());
+                    }
+                }
+
+                // ... then go back to the earlier batch
+                for (int repeat = 0; repeat < 2; repeat++) {
+                    for (final long expectedId : new long[] {2L, 6L}) {
+                        final ProvenanceEventRecord event = journalReader.getEvent(offsetOfFirstBlock, expectedId);
+                        assertNotNull(event);
+                        assertEquals(expectedId, event.getEventId());
+                        assertEquals(ProvenanceEventType.CREATE, event.getEventType());
+                        assertEquals("00000000-0000-0000-0000-000000000000", event.getFlowFileUuid());
+                    }
+                }
+            }
+        } finally {
+            journal.delete();
+        }
+    }
+    
+
+    
+    
+    /**
+     * Convenience overload: writes one record with the given id to {@code dos}
+     * by delegating to {@code writeRecord(long, DataOutputStream)}.
+     *
+     * @param id the event id to store with the record
+     * @throws IOException if the record cannot be written
+     */
+    private void writeRecord(final long id) throws IOException {
+        writeRecord(id, dos);
+    }
+    
+    private void writeRecords(final long startId, final int numRecords, final 
boolean compressed) throws IOException {
+        if ( compressed ) {
+            final CompressionOutputStream compressedOut = new 
CompressionOutputStream(dos);
+            for (long id = startId; id < startId + numRecords; id++) {
+                writeRecord(id, new DataOutputStream(compressedOut));
+            }
+            compressedOut.close();
+        } else {
+            for (long id = startId; id < startId + numRecords; id++) {
+                writeRecord(id, dos);
+            }
+        }
+    }
+    
+    /**
+     * Builds a CREATE provenance event, serializes it with a
+     * {@code StandardEventSerializer} and writes it to the given stream as:
+     * an int record length, the long event id, then the serialized event bytes.
+     * The record length counts the id and the serialized event, not the length
+     * field itself.
+     *
+     * @param id  the event id to store with the record
+     * @param dos the stream to write the record to
+     * @throws IOException if serializing or writing fails
+     */
+    private void writeRecord(final long id, final DataOutputStream dos) throws IOException {
+        // event to be stored; only the FlowFile UUID and event type are checked by the tests
+        final ProvenanceEventRecord record = new StandardProvenanceEventRecord.Builder()
+            .setEventType(ProvenanceEventType.CREATE)
+            .setFlowFileUUID("00000000-0000-0000-0000-000000000000")
+            .setComponentType("Unit Test")
+            .setComponentId(UUID.randomUUID().toString())
+            .setEventTime(System.currentTimeMillis())
+            .setFlowFileEntryDate(System.currentTimeMillis() - 1000L)
+            .setLineageStartDate(System.currentTimeMillis() - 2000L)
+            .setCurrentContentClaim(null, null, null, null, 0L)
+            .build();
+
+        // serialize into a scratch buffer first, because the length has to be written ahead of the bytes
+        final ByteArrayOutputStream scratch = new ByteArrayOutputStream();
+        new StandardEventSerializer().serialize(record, new DataOutputStream(scratch));
+        final byte[] payload = scratch.toByteArray();
+
+        // 8 bytes for the event id + the serialized event
+        dos.writeInt(8 + payload.length);
+        dos.writeLong(id);
+        dos.write(payload);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
new file mode 100644
index 0000000..d5eab8e
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.journals;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.journaling.TestUtil;
+import org.apache.nifi.provenance.journaling.io.StandardEventDeserializer;
+import org.apache.nifi.provenance.journaling.io.StandardEventSerializer;
+import org.apache.nifi.remote.io.CompressionInputStream;
+import org.junit.Test;
+
+public class TestStandardJournalWriter {
+
+    @Test
+    public void testOneBlockOneRecordWriteCompressed() throws IOException {
+        final File journalFile = new File("target/" + 
UUID.randomUUID().toString());
+        
+        final StandardEventSerializer serializer = new 
StandardEventSerializer();
+        try {
+            try (final StandardJournalWriter writer = new 
StandardJournalWriter(1L, journalFile, true, serializer)) {
+                writer.beginNewBlock();
+                
writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L);
+                writer.finishBlock();
+            }
+        
+            final byte[] data = Files.readAllBytes(journalFile.toPath());
+            final ByteArrayInputStream bais = new ByteArrayInputStream(data);
+            final DataInputStream dis = new DataInputStream(bais);
+
+            final String codecName = dis.readUTF();
+            assertEquals(StandardEventSerializer.CODEC_NAME, codecName);
+            
+            final int version = dis.readInt();
+            assertEquals(1, version);
+            
+            // compression flag
+            assertEquals(true, dis.readBoolean());
+            
+            // read block start
+            final CompressionInputStream decompressedIn = new 
CompressionInputStream(bais);
+            final StandardEventDeserializer deserializer = new 
StandardEventDeserializer();
+            
+            final DataInputStream decompressedDis = new 
DataInputStream(decompressedIn);
+            final int eventLength = decompressedDis.readInt();
+            assertEquals(131, eventLength);
+            final ProvenanceEventRecord event = 
deserializer.deserialize(decompressedDis, 0);
+            assertEquals(1, event.getEventId());
+            assertEquals(ProvenanceEventType.CREATE, event.getEventType());
+            
+            assertEquals(-1, decompressedIn.read());
+        } finally {
+            journalFile.delete();
+        }
+    }
+    
+    @Test
+    public void testManyBlocksOneRecordWriteCompressed() throws IOException {
+        final File journalFile = new File("target/" + 
UUID.randomUUID().toString());
+        
+        final StandardEventSerializer serializer = new 
StandardEventSerializer();
+        try {
+            try (final StandardJournalWriter writer = new 
StandardJournalWriter(1L, journalFile, true, serializer)) {
+                for (int i=0; i < 1024; i++) {
+                    writer.beginNewBlock();
+                    
writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L);
+                    writer.finishBlock();
+                }
+            }
+        
+            final byte[] data = Files.readAllBytes(journalFile.toPath());
+            final ByteArrayInputStream bais = new ByteArrayInputStream(data);
+            final DataInputStream dis = new DataInputStream(bais);
+
+            final String codecName = dis.readUTF();
+            assertEquals(StandardEventSerializer.CODEC_NAME, codecName);
+            
+            final int version = dis.readInt();
+            assertEquals(1, version);
+            
+            // compression flag
+            assertEquals(true, dis.readBoolean());
+            
+            // read block start
+            for (int i=0; i < 1024; i++) {
+                final CompressionInputStream decompressedIn = new 
CompressionInputStream(bais);
+                final StandardEventDeserializer deserializer = new 
StandardEventDeserializer();
+                
+                final DataInputStream decompressedDis = new 
DataInputStream(decompressedIn);
+                final int eventLength = decompressedDis.readInt();
+                assertEquals(131, eventLength);
+                final ProvenanceEventRecord event = 
deserializer.deserialize(decompressedDis, 0);
+                assertEquals(1, event.getEventId());
+                assertEquals(ProvenanceEventType.CREATE, event.getEventType());
+                
+                if ( i == 1023 ) {
+                    assertEquals(-1, decompressedIn.read());
+                }
+            }
+        } finally {
+            journalFile.delete();
+        }
+    }
+    
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/toc/TestStandardTocReader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/toc/TestStandardTocReader.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/toc/TestStandardTocReader.java
new file mode 100644
index 0000000..d5c4037
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/toc/TestStandardTocReader.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.toc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+
+import org.junit.Test;
+
+/**
+ * Tests for {@code StandardTocReader} using hand-written table-of-contents
+ * files: two leading bytes followed by a sequence of long block offsets.
+ */
+public class TestStandardTocReader {
+
+    /**
+     * Verifies that {@code isCompressed()} is false when the second header byte
+     * is 0 and true when it is 1.
+     */
+    @Test
+    public void testDetectsCompression() throws IOException {
+        final File tocFile = new File("target/" + UUID.randomUUID().toString());
+
+        writeTwoByteHeader(tocFile, 0);
+        try {
+            try (final StandardTocReader tocReader = new StandardTocReader(tocFile)) {
+                assertFalse(tocReader.isCompressed());
+            }
+        } finally {
+            tocFile.delete();
+        }
+
+        // same file name again, this time with the second byte set
+        writeTwoByteHeader(tocFile, 1);
+        try {
+            try (final StandardTocReader tocReader = new StandardTocReader(tocFile)) {
+                assertTrue(tocReader.isCompressed());
+            }
+        } finally {
+            tocFile.delete();
+        }
+    }
+
+    /**
+     * Writes 1024 block offsets (multiples of 1024) after an all-zero header and
+     * verifies that {@code getBlockOffset(i)} returns the i-th one.
+     */
+    @Test
+    public void testGetBlockIndex() throws IOException {
+        final File tocFile = new File("target/" + UUID.randomUUID().toString());
+        final int offsetCount = 1024;
+
+        try (final OutputStream fileOut = new FileOutputStream(tocFile);
+             final DataOutputStream offsetsOut = new DataOutputStream(fileOut)) {
+            fileOut.write(0);
+            fileOut.write(0);
+
+            for (int index = 0; index < offsetCount; index++) {
+                offsetsOut.writeLong(index * 1024L);
+            }
+        }
+
+        try {
+            try (final StandardTocReader tocReader = new StandardTocReader(tocFile)) {
+                assertFalse(tocReader.isCompressed());
+
+                for (int index = 0; index < offsetCount; index++) {
+                    assertEquals(index * 1024, tocReader.getBlockOffset(index));
+                }
+            }
+        } finally {
+            tocFile.delete();
+        }
+    }
+
+    /**
+     * Creates (or overwrites) {@code tocFile} with exactly two bytes: a zero
+     * byte followed by {@code secondByte}.
+     */
+    private static void writeTwoByteHeader(final File tocFile, final int secondByte) throws IOException {
+        try (final OutputStream fileOut = new FileOutputStream(tocFile)) {
+            fileOut.write(0);
+            fileOut.write(secondByte);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
index f4f9d12..777130e 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
@@ -64,7 +64,7 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
     // default property values
     public static final int DEFAULT_BUFFER_SIZE = 10000;
 
-    private final RingBuffer<ProvenanceEventRecord> ringBuffer;
+    private final RingBuffer<StoredProvenanceEvent> ringBuffer;
     private final List<SearchableField> searchableFields;
     private final List<SearchableField> searchableAttributes;
     private final ExecutorService queryExecService;
@@ -123,17 +123,17 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
     }
 
     @Override
-    public void registerEvents(final Iterable<ProvenanceEventRecord> events) {
+    public void registerEvents(final Collection<ProvenanceEventRecord> events) 
{
         for (final ProvenanceEventRecord event : events) {
             registerEvent(event);
         }
     }
 
     @Override
-    public List<ProvenanceEventRecord> getEvents(final long firstRecordId, 
final int maxRecords) throws IOException {
-        return ringBuffer.getSelectedElements(new 
Filter<ProvenanceEventRecord>() {
+    public List<StoredProvenanceEvent> getEvents(final long firstRecordId, 
final int maxRecords) throws IOException {
+        return ringBuffer.getSelectedElements(new 
Filter<StoredProvenanceEvent>() {
             @Override
-            public boolean select(final ProvenanceEventRecord value) {
+            public boolean select(final StoredProvenanceEvent value) {
                 return value.getEventId() >= firstRecordId;
             }
         }, maxRecords);
@@ -145,21 +145,22 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
         return (newest == null) ? null : newest.getEventId();
     }
 
-    public ProvenanceEventRecord getEvent(final String identifier) throws 
IOException {
-        final List<ProvenanceEventRecord> records = 
ringBuffer.getSelectedElements(new Filter<ProvenanceEventRecord>() {
+    public StoredProvenanceEvent getEvent(final String identifier) throws 
IOException {
+        final List<StoredProvenanceEvent> records = 
ringBuffer.getSelectedElements(new Filter<StoredProvenanceEvent>() {
             @Override
-            public boolean select(final ProvenanceEventRecord event) {
+            public boolean select(final StoredProvenanceEvent event) {
                 return identifier.equals(event.getFlowFileUuid());
             }
         }, 1);
+        
         return records.isEmpty() ? null : records.get(0);
     }
 
     @Override
-    public ProvenanceEventRecord getEvent(final long id) {
-        final List<ProvenanceEventRecord> records = 
ringBuffer.getSelectedElements(new Filter<ProvenanceEventRecord>() {
+    public StoredProvenanceEvent getEvent(final long id) {
+        final List<StoredProvenanceEvent> records = 
ringBuffer.getSelectedElements(new Filter<StoredProvenanceEvent>() {
             @Override
-            public boolean select(final ProvenanceEventRecord event) {
+            public boolean select(final StoredProvenanceEvent event) {
                 return event.getEventId() == id;
             }
         }, 1);
@@ -407,6 +408,44 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
         throw new UnsupportedOperationException();
     }
 
+
+    /**
+     * Resolves each storage location to its event by id. Locations for which no
+     * event is found are skipped, so the result may be shorter than the input.
+     *
+     * @throws IllegalArgumentException if any location is not an {@code IdLocation}
+     */
+    @Override
+    public List<StoredProvenanceEvent> getEvents(final List<StorageLocation> storageLocations) throws IOException {
+        final List<StoredProvenanceEvent> found = new ArrayList<>(storageLocations.size());
+
+        for ( final StorageLocation storageLocation : storageLocations ) {
+            final boolean idBased = storageLocation instanceof IdLocation;
+            if ( !idBased ) {
+                throw new IllegalArgumentException("Illegal Storage Location");
+            }
+
+            final long eventId = ((IdLocation) storageLocation).getId();
+            final StoredProvenanceEvent stored = getEvent(eventId);
+            if ( stored == null ) {
+                continue;
+            }
+
+            found.add(stored);
+        }
+
+        return found;
+    }
+
+    /**
+     * Resolves a single storage location to its event by id; the result is
+     * whatever {@code getEvent(long)} returns for that id.
+     *
+     * @throws IllegalArgumentException if the location is not an {@code IdLocation}
+     */
+    @Override
+    public StoredProvenanceEvent getEvent(final StorageLocation location) throws IOException {
+        if ( location instanceof IdLocation ) {
+            final long eventId = ((IdLocation) location).getId();
+            return getEvent(eventId);
+        }
+
+        throw new IllegalArgumentException("Illegal Storage Location");
+    }
+
+    /**
+     * Returns the event time of the first event selected by
+     * {@code getEvents(0L, 1)}, or {@code null} when no event is selected.
+     * NOTE(review): this is the earliest event only if the ring buffer hands back
+     * its oldest matching element first - confirm against RingBuffer.
+     */
+    @Override
+    public Long getEarliestEventTime() throws IOException {
+        final List<StoredProvenanceEvent> firstEvent = getEvents(0L, 1);
+        return firstEvent.isEmpty() ? null : firstEvent.get(0).getEventTime();
+    }
+    
     @Override
     public ComputeLineageSubmission submitExpandParents(final long eventId) {
         final ProvenanceEventRecord event = getEvent(eventId);
@@ -432,9 +471,6 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
         }
     }
 
-    public Lineage expandSpawnEventChildren(final String identifier) {
-        throw new UnsupportedOperationException();
-    }
 
     @Override
     public ComputeLineageSubmission submitExpandChildren(final long eventId) {
@@ -465,9 +501,9 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
         final AsyncLineageSubmission result = new 
AsyncLineageSubmission(computationType, eventId, flowFileUuids, 1);
         lineageSubmissionMap.put(result.getLineageIdentifier(), result);
 
-        final Filter<ProvenanceEventRecord> filter = new 
Filter<ProvenanceEventRecord>() {
+        final Filter<StoredProvenanceEvent> filter = new 
Filter<StoredProvenanceEvent>() {
             @Override
-            public boolean select(final ProvenanceEventRecord event) {
+            public boolean select(final StoredProvenanceEvent event) {
                 if (flowFileUuids.contains(event.getFlowFileUuid())) {
                     return true;
                 }
@@ -495,12 +531,12 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
 
     private static class QueryRunnable implements Runnable {
 
-        private final RingBuffer<ProvenanceEventRecord> ringBuffer;
+        private final RingBuffer<StoredProvenanceEvent> ringBuffer;
         private final Filter<ProvenanceEventRecord> filter;
         private final AsyncQuerySubmission submission;
         private final int maxRecords;
 
-        public QueryRunnable(final RingBuffer<ProvenanceEventRecord> 
ringBuffer, final Filter<ProvenanceEventRecord> filter, final int maxRecords, 
final AsyncQuerySubmission submission) {
+        public QueryRunnable(final RingBuffer<StoredProvenanceEvent> 
ringBuffer, final Filter<ProvenanceEventRecord> filter, final int maxRecords, 
final AsyncQuerySubmission submission) {
             this.ringBuffer = ringBuffer;
             this.filter = filter;
             this.submission = submission;
@@ -511,10 +547,10 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
         public void run() {
             // Retrieve the most recent results and count the total number of 
matches
             final IntegerHolder matchingCount = new IntegerHolder(0);
-            final List<ProvenanceEventRecord> matchingRecords = new 
ArrayList<>(maxRecords);
-            ringBuffer.forEach(new ForEachEvaluator<ProvenanceEventRecord>() {
+            final List<StoredProvenanceEvent> matchingRecords = new 
ArrayList<>(maxRecords);
+            ringBuffer.forEach(new ForEachEvaluator<StoredProvenanceEvent>() {
                 @Override
-                public boolean evaluate(final ProvenanceEventRecord record) {
+                public boolean evaluate(final StoredProvenanceEvent record) {
                     if (filter.select(record)) {
                         if (matchingCount.incrementAndGet() <= maxRecords) {
                             matchingRecords.add(record);
@@ -532,20 +568,21 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
 
     private static class ComputeLineageRunnable implements Runnable {
 
-        private final RingBuffer<ProvenanceEventRecord> ringBuffer;
-        private final Filter<ProvenanceEventRecord> filter;
+        private final RingBuffer<StoredProvenanceEvent> ringBuffer; // buffer the lineage events are selected from
+        private final Filter<StoredProvenanceEvent> filter; // passed to getSelectedElements to pick the events
         private final AsyncLineageSubmission submission;
 
-        public ComputeLineageRunnable(final RingBuffer<ProvenanceEventRecord> 
ringBuffer, final Filter<ProvenanceEventRecord> filter, final 
AsyncLineageSubmission submission) {
+        public ComputeLineageRunnable(final RingBuffer<StoredProvenanceEvent> 
ringBuffer, final Filter<StoredProvenanceEvent> filter, final 
AsyncLineageSubmission submission) { // only stores the three arguments; the selection itself happens in run()
             this.ringBuffer = ringBuffer;
             this.filter = filter;
             this.submission = submission;
         }
 
         @Override
+        @SuppressWarnings({ "unchecked", "rawtypes" }) // needed for the raw (List) cast below
         public void run() {
-            final List<ProvenanceEventRecord> records = 
ringBuffer.getSelectedElements(filter);
-            submission.getResult().update(records);
+            final List<StoredProvenanceEvent> records = 
ringBuffer.getSelectedElements(filter);
+            submission.getResult().update((List) records); // raw cast: update(...) was previously called with List<ProvenanceEventRecord>, and generic lists are invariant
         }
     }
 
@@ -577,7 +614,7 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
         }
     }
 
-    private static class IdEnrichedProvEvent implements ProvenanceEventRecord {
+    private static class IdEnrichedProvEvent implements StoredProvenanceEvent {
 
         private final ProvenanceEventRecord record;
         private final long id;
@@ -741,5 +778,23 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
         public Long getPreviousContentClaimOffset() {
             return record.getPreviousContentClaimOffset();
         }
+
+        @Override
+        public StorageLocation getStorageLocation() { // Exposes this event's ID as its StorageLocation.
+            return new IdLocation(getEventId()); // a new IdLocation is built on every call
+        }
     }
+    
+    private static class IdLocation implements StorageLocation { // StorageLocation that wraps a single numeric ID.
+        private final long id; // the wrapped ID; set once in the constructor
+        
+        public IdLocation(final long id) {
+            this.id = id;
+        }
+        
+        public long getId() { // returns the ID given at construction
+            return id;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
index 3c3e401..fd27470 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
@@ -65,7 +65,7 @@ public class TestVolatileProvenanceRepository {
             repo.registerEvent(builder.build());
         }
 
-        final List<ProvenanceEventRecord> retrieved = repo.getEvents(0L, 12);
+        final List<StoredProvenanceEvent> retrieved = repo.getEvents(0L, 12);
 
         assertEquals(10, retrieved.size());
         for (int i = 0; i < 10; i++) {

Reply via email to