Repository: incubator-nifi Updated Branches: refs/heads/journaling-prov-repo [created] f23f36d73
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java new file mode 100644 index 0000000..eca664e --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.journaling.toc; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; + +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent; +import org.apache.nifi.provenance.journaling.JournaledStorageLocation; +import org.apache.nifi.provenance.journaling.journals.JournalReader; +import org.apache.nifi.provenance.journaling.journals.StandardJournalReader; + +public class TocJournalReader implements Closeable { + + private final TocReader tocReader; + private final JournalReader reader; + + private final String containerName; + private final String sectionName; + private final String journalId; + + private int blockIndex; + private long nextBlockOffset; + + + public TocJournalReader(final String containerName, final String sectionName, final String journalId, final File journalFile) throws IOException { + this.containerName = containerName; + this.sectionName = sectionName; + this.journalId = journalId; + + final File tocFile = new File(journalFile.getParentFile(), journalFile.getName() + ".toc"); + tocReader = new StandardTocReader(tocFile); + reader = new StandardJournalReader(journalFile); + + blockIndex = 0; + nextBlockOffset = tocReader.getBlockOffset(1); + } + + @Override + public void close() throws IOException { + IOException suppressed = null; + try { + tocReader.close(); + } catch (final IOException ioe) { + suppressed = ioe; + } + + try { + reader.close(); + } catch (final IOException ioe) { + if ( suppressed != null ) { + ioe.addSuppressed(suppressed); + } + throw ioe; + } + + if ( suppressed != null ) { + throw suppressed; + } + } + + public JournaledProvenanceEvent nextJournaledEvent() throws IOException { + ProvenanceEventRecord event = reader.nextEvent(); + if ( event == null ) { + return null; + } + + final JournaledStorageLocation location = new JournaledStorageLocation(containerName, sectionName, + journalId, 
blockIndex, event.getEventId()); + + // Check if we've gone beyond the offset of the next block. If so, write + // out a new block in the TOC. + final long newPosition = reader.getPosition(); + if ( newPosition > nextBlockOffset && nextBlockOffset > 0 ) { + blockIndex++; + nextBlockOffset = tocReader.getBlockOffset(blockIndex + 1); + } + + return new JournaledProvenanceEvent(event, location); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java new file mode 100644 index 0000000..9f6a264 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.toc;
+
+import java.io.Closeable;
+
+/**
+ * <p>
+ * Reads a Table of Contents (.toc file) for a corresponding Journal File. We use a Table of Contents
+ * to map a Block Index to an offset into the Journal file where that Block begins. We do this so that
+ * we can then persist a Block Index for an event and then compress the Journal later. This way, we can
+ * get good compression by compressing a large batch of events at once, and this way we can also look up
+ * an event in a Journal that has not been compressed by looking in the Table of Contents or lookup the
+ * event in a Journal post-compression by simply rewriting the TOC while we compress the data.
+ * </p>
+ */
+public interface TocReader extends Closeable {
+
+    /**
+     * Indicates whether or not the corresponding Journal file is compressed.
+     *
+     * @return {@code true} if the corresponding Journal file is compressed, {@code false} otherwise
+     */
+    boolean isCompressed();
+
+    /**
+     * Returns the byte offset into the Journal File for the Block with the given index.
+     *
+     * @param blockIndex the index of the Block whose starting offset is wanted
+     * @return the byte offset into the Journal File at which the Block begins.
+     *         NOTE(review): the value returned for an index that has no Block is not
+     *         specified here -- confirm against the implementation.
+     */
+    long getBlockOffset(int blockIndex);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocWriter.java
new file mode 100644
index 0000000..b44b55b
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocWriter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.toc;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Writes a .toc file
+ */
+public interface TocWriter extends Closeable {
+
+    /**
+     * Adds the given block offset as the next Block Offset in the Table of Contents.
+     *
+     * @param offset the block offset to add as the next entry
+     * @throws IOException if the offset cannot be added (the exact failure conditions
+     *             are up to the implementation)
+     */
+    void addBlockOffset(long offset) throws IOException;
+
+    /**
+     * Returns the index of the current Block.
+     *
+     * @return the index of the current Block
+     */
+    int getCurrentBlockIndex();
+
+    /**
+     * Returns the file that is currently being written to.
+     *
+     * @return the file that is currently being written to
+     */
+    File getFile();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java
new file mode 100644
index 0000000..45b7338
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling; + +import java.util.UUID; + +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; + +public class TestUtil { + public static ProvenanceEventRecord generateEvent(final long id) { + // Create prov event to add to the stream + final ProvenanceEventRecord event = new StandardProvenanceEventRecord.Builder() + .setEventType(ProvenanceEventType.CREATE) + .setFlowFileUUID("00000000-0000-0000-0000-" + pad(String.valueOf(id), 12, '0')) + .setComponentType("Unit Test") + .setComponentId(UUID.randomUUID().toString()) + .setEventTime(System.currentTimeMillis()) + .setFlowFileEntryDate(System.currentTimeMillis() - 1000L) + .setLineageStartDate(System.currentTimeMillis() - 2000L) + .setCurrentContentClaim(null, null, null, null, 0L) + .build(); + + return event; + } + + public static String pad(final String value, final int charCount, final char padding) { + if ( value.length() >= charCount ) { + return value; + } + + final StringBuilder sb = new StringBuilder(); + for (int i=value.length(); i < charCount; i++) { + sb.append(padding); + } + sb.append(value); + + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java 
---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java new file mode 100644 index 0000000..dfaeb1a --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.journaling.index; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.SearchableFields; +import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent; +import org.apache.nifi.provenance.journaling.JournaledStorageLocation; +import org.apache.nifi.provenance.journaling.TestUtil; +import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.search.SearchTerms; +import org.apache.nifi.provenance.search.SearchableField; +import org.junit.Test; + +public class TestEventIndexWriter { + + @Test + public void testIndexAndFetch() throws IOException { + final JournalingRepositoryConfig config = new JournalingRepositoryConfig(); + config.setSearchableAttributes(Arrays.asList(new SearchableField[] { + SearchableFields.newSearchableAttribute("test.1") + })); + config.setSearchableFields(Arrays.asList(new SearchableField[] { + SearchableFields.FlowFileUUID + })); + + final File indexDir = new File("target/" + UUID.randomUUID().toString()); + + final File journalFile = new File("target/" + UUID.randomUUID().toString()); + try (final LuceneIndexWriter indexWriter = new LuceneIndexWriter(indexDir, config)) { + + final ProvenanceEventRecord event = TestUtil.generateEvent(23L); + final JournaledStorageLocation location = new JournaledStorageLocation("container", "section", "journalId", 2, 23L); + final JournaledProvenanceEvent storedEvent = new JournaledProvenanceEvent(event, location); + indexWriter.index(Collections.singleton(storedEvent)); + + final Query query = new Query("123"); + 
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000-000000000023")); + + try (final EventIndexSearcher searcher = indexWriter.newIndexSearcher()) { + final SearchResult searchResult = searcher.search(query); + final List<JournaledStorageLocation> locations = searchResult.getLocations(); + assertNotNull(locations); + assertEquals(1, locations.size()); + + final JournaledStorageLocation found = locations.get(0); + assertNotNull(found); + assertEquals("container", found.getContainerName()); + assertEquals("section", found.getSectionName()); + assertEquals("journalId", found.getJournalId()); + assertEquals(2, found.getBlockIndex()); + assertEquals(23L, found.getEventId()); + } + } finally { + journalFile.delete(); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java new file mode 100644 index 0000000..f2266e2 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.journals; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.journaling.TestUtil; +import org.apache.nifi.provenance.journaling.io.StandardEventSerializer; +import org.junit.Assert; +import org.junit.Test; + +public class TestJournalReadWrite { + + @Test + public void testReadWrite100Blocks() throws IOException { + testReadWrite100Blocks(true); + testReadWrite100Blocks(false); + } + + private void testReadWrite100Blocks(final boolean compressed) throws IOException { + final long journalId = 1L; + final File journalFile = new File("target/1.journal"); + final StandardEventSerializer serializer = new StandardEventSerializer(); + + try { + try (final StandardJournalWriter writer = new StandardJournalWriter(journalId, journalFile, compressed, serializer)) { + for (int block=0; block < 100; block++) { + writer.beginNewBlock(); + + for (int i=0; i < 5; i++) { + final ProvenanceEventRecord event = TestUtil.generateEvent(i); + writer.write(Collections.singleton(event), i); + } + + final List<ProvenanceEventRecord> events = new ArrayList<>(); + for (int i=0; i < 90; i++) { + events.add(TestUtil.generateEvent(i + 5)); + } + writer.write(events, 5); + + for (int i=0; i < 
5; i++) { + final ProvenanceEventRecord event = TestUtil.generateEvent(i); + writer.write(Collections.singleton(event), 95 + i); + } + + writer.finishBlock(); + } + } + + try (final StandardJournalReader reader = new StandardJournalReader(journalFile)) { + for (int block=0; block < 100; block++) { + for (int i=0; i < 100; i++) { + final ProvenanceEventRecord record = reader.nextEvent(); + Assert.assertNotNull(record); + Assert.assertEquals((long) i, record.getEventId()); + } + } + } + } finally { + journalFile.delete(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java new file mode 100644 index 0000000..e1ecf7d --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.journals; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.UUID; + +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.provenance.journaling.io.StandardEventSerializer; +import org.apache.nifi.remote.io.CompressionOutputStream; +import org.junit.Before; +import org.junit.Test; + +public class TestStandardJournalReader { + + private ByteArrayOutputStream baos; + private DataOutputStream dos; + + @Before + public void setup() throws IOException { + // Create a BAOS to write the record to. 
+ baos = new ByteArrayOutputStream(); + dos = new DataOutputStream(baos); + + // Write out header: codec name and serialization version + dos.writeUTF(StandardEventSerializer.CODEC_NAME); + dos.writeInt(0); + } + + + @Test + public void testReadFirstEventUncompressed() throws IOException { + dos.writeBoolean(false); + writeRecord(88L); + + // write data to a file so that we can read it with the journal reader + final File dir = new File("target/testData"); + final File file = new File(dir, UUID.randomUUID().toString() + ".journal"); + dir.mkdirs(); + + try (final FileOutputStream fos = new FileOutputStream(file)) { + baos.writeTo(fos); + } + + // read the record and verify its contents + try (final StandardJournalReader reader = new StandardJournalReader(file)) { + final ProvenanceEventRecord restored = reader.nextEvent(); + assertNotNull(restored); + assertEquals(88L, restored.getEventId()); + assertEquals(ProvenanceEventType.CREATE, restored.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", restored.getFlowFileUuid()); + } finally { + file.delete(); + } + } + + + @Test + public void testReadManyUncompressed() throws IOException { + dos.writeBoolean(false); + writeRecords(0, 1024, false); + + // write data to a file so that we can read it with the journal reader + final File dir = new File("target/testData"); + final File file = new File(dir, UUID.randomUUID().toString() + ".journal"); + dir.mkdirs(); + + try (final FileOutputStream fos = new FileOutputStream(file)) { + baos.writeTo(fos); + } + + // read the record and verify its contents + try (final StandardJournalReader reader = new StandardJournalReader(file)) { + for (int i=0; i < 1024; i++) { + final ProvenanceEventRecord restored = reader.nextEvent(); + assertNotNull(restored); + assertEquals((long) i, restored.getEventId()); + assertEquals(ProvenanceEventType.CREATE, restored.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", restored.getFlowFileUuid()); + } + + 
assertNull(reader.nextEvent()); + } finally { + file.delete(); + } + } + + @Test + public void testReadFirstEventWithBlockOffsetUncompressed() throws IOException { + dos.writeBoolean(false); + writeRecords(0, 10, false); + + final int secondBlockOffset = baos.size(); + writeRecords(10, 10, false); + + // write data to a file so that we can read it with the journal reader + final File dir = new File("target/testData"); + final File file = new File(dir, UUID.randomUUID().toString() + ".journal"); + dir.mkdirs(); + + try (final FileOutputStream fos = new FileOutputStream(file)) { + baos.writeTo(fos); + } + + // read the record and verify its contents + try (final StandardJournalReader reader = new StandardJournalReader(file)) { + final ProvenanceEventRecord restored = reader.getEvent(secondBlockOffset, 10L); + assertNotNull(restored); + assertEquals(10L, restored.getEventId()); + assertEquals(ProvenanceEventType.CREATE, restored.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", restored.getFlowFileUuid()); + } finally { + file.delete(); + } + } + + @Test + public void testReadSubsequentEventWithBlockOffsetUncompressed() throws IOException { + dos.writeBoolean(false); + writeRecords(0, 10, false); + + final int secondBlockOffset = baos.size(); + writeRecords(10, 10, false); + + // write data to a file so that we can read it with the journal reader + final File dir = new File("target/testData"); + final File file = new File(dir, UUID.randomUUID().toString() + ".journal"); + dir.mkdirs(); + + try (final FileOutputStream fos = new FileOutputStream(file)) { + baos.writeTo(fos); + } + + // read the record and verify its contents + try (final StandardJournalReader reader = new StandardJournalReader(file)) { + final ProvenanceEventRecord restored = reader.getEvent(secondBlockOffset, 10L); + assertNotNull(restored); + assertEquals(10L, restored.getEventId()); + assertEquals(ProvenanceEventType.CREATE, restored.getEventType()); + 
assertEquals("00000000-0000-0000-0000-000000000000", restored.getFlowFileUuid()); + } finally { + file.delete(); + } + } + + @Test + public void testReadMultipleEventsWithBlockOffsetUncompressed() throws IOException { + dos.writeBoolean(false); + writeRecords(0, 10, false); + + final int secondBlockOffset = baos.size(); + writeRecords(10, 10, false); + + // write data to a file so that we can read it with the journal reader + final File dir = new File("target/testData"); + final File file = new File(dir, UUID.randomUUID().toString() + ".journal"); + dir.mkdirs(); + + try (final FileOutputStream fos = new FileOutputStream(file)) { + baos.writeTo(fos); + } + + // read the record and verify its contents + try (final StandardJournalReader reader = new StandardJournalReader(file)) { + for (int i=0; i < 2; i++) { + final ProvenanceEventRecord event10 = reader.getEvent(secondBlockOffset, 10L); + assertNotNull(event10); + assertEquals(10L, event10.getEventId()); + assertEquals(ProvenanceEventType.CREATE, event10.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", event10.getFlowFileUuid()); + + final ProvenanceEventRecord event13 = reader.getEvent(secondBlockOffset, 13L); + assertNotNull(event13); + assertEquals(13L, event13.getEventId()); + assertEquals(ProvenanceEventType.CREATE, event13.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", event13.getFlowFileUuid()); + } + } finally { + file.delete(); + } + } + + + @Test + public void testReadFirstEventCompressed() throws IOException { + dos.writeBoolean(true); + writeRecords(88L, 1, true); + + // write data to a file so that we can read it with the journal reader + final File dir = new File("target/testData"); + final File file = new File(dir, UUID.randomUUID().toString() + ".journal"); + dir.mkdirs(); + + try (final FileOutputStream fos = new FileOutputStream(file)) { + baos.writeTo(fos); + } + + // read the record and verify its contents + try (final StandardJournalReader reader 
= new StandardJournalReader(file)) { + final ProvenanceEventRecord restored = reader.nextEvent(); + assertNotNull(restored); + assertEquals(88L, restored.getEventId()); + assertEquals(ProvenanceEventType.CREATE, restored.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", restored.getFlowFileUuid()); + } finally { + file.delete(); + } + } + + @Test + public void testReadManyCompressed() throws IOException { + dos.writeBoolean(true); + writeRecords(0, 1024, true); + + // write data to a file so that we can read it with the journal reader + final File dir = new File("target/testData"); + final File file = new File(dir, UUID.randomUUID().toString() + ".journal"); + dir.mkdirs(); + + try (final FileOutputStream fos = new FileOutputStream(file)) { + baos.writeTo(fos); + } + + // read the record and verify its contents + try (final StandardJournalReader reader = new StandardJournalReader(file)) { + for (int i=0; i < 1024; i++) { + final ProvenanceEventRecord restored = reader.nextEvent(); + assertNotNull(restored); + assertEquals((long) i, restored.getEventId()); + assertEquals(ProvenanceEventType.CREATE, restored.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", restored.getFlowFileUuid()); + } + + assertNull(reader.nextEvent()); + } finally { + file.delete(); + } + } + + + @Test + public void testReadFirstEventWithBlockOffsetCompressed() throws IOException { + dos.writeBoolean(true); + writeRecords(0, 10, true); + + final int secondBlockOffset = baos.size(); + writeRecords(10, 10, true); + + // write data to a file so that we can read it with the journal reader + final File dir = new File("target/testData"); + final File file = new File(dir, UUID.randomUUID().toString() + ".journal"); + dir.mkdirs(); + + try (final FileOutputStream fos = new FileOutputStream(file)) { + baos.writeTo(fos); + } + + // read the record and verify its contents + try (final StandardJournalReader reader = new StandardJournalReader(file)) { + final 
ProvenanceEventRecord restored = reader.getEvent(secondBlockOffset, 10L); + assertNotNull(restored); + assertEquals(10L, restored.getEventId()); + assertEquals(ProvenanceEventType.CREATE, restored.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", restored.getFlowFileUuid()); + } finally { + file.delete(); + } + } + + @Test + public void testReadSubsequentEventWithBlockOffsetCompressed() throws IOException { + dos.writeBoolean(true); + writeRecords(0, 10, true); + + final int secondBlockOffset = baos.size(); + writeRecords(10, 10, true); + + // write data to a file so that we can read it with the journal reader + final File dir = new File("target/testData"); + final File file = new File(dir, UUID.randomUUID().toString() + ".journal"); + dir.mkdirs(); + + try (final FileOutputStream fos = new FileOutputStream(file)) { + baos.writeTo(fos); + } + + // read the record and verify its contents + try (final StandardJournalReader reader = new StandardJournalReader(file)) { + final ProvenanceEventRecord restored = reader.getEvent(secondBlockOffset, 10L); + assertNotNull(restored); + assertEquals(10L, restored.getEventId()); + assertEquals(ProvenanceEventType.CREATE, restored.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", restored.getFlowFileUuid()); + } finally { + file.delete(); + } + } + + @Test + public void testReadMultipleEventsWithBlockOffsetCompressed() throws IOException { + dos.writeBoolean(true); + writeRecords(0, 10, true); + + final int secondBlockOffset = baos.size(); + writeRecords(10, 10, true); + + // write data to a file so that we can read it with the journal reader + final File dir = new File("target/testData"); + final File file = new File(dir, UUID.randomUUID().toString() + ".journal"); + dir.mkdirs(); + + try (final FileOutputStream fos = new FileOutputStream(file)) { + baos.writeTo(fos); + } + + // read the record and verify its contents + try (final StandardJournalReader reader = new 
StandardJournalReader(file)) { + for (int i=0; i < 2; i++) { + final ProvenanceEventRecord event10 = reader.getEvent(secondBlockOffset, 10L); + assertNotNull(event10); + assertEquals(10L, event10.getEventId()); + assertEquals(ProvenanceEventType.CREATE, event10.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", event10.getFlowFileUuid()); + + final ProvenanceEventRecord event13 = reader.getEvent(secondBlockOffset, 13L); + assertNotNull(event13); + assertEquals(13L, event13.getEventId()); + assertEquals(ProvenanceEventType.CREATE, event13.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", event13.getFlowFileUuid()); + } + } finally { + file.delete(); + } + } + + + @Test + public void testReadEventWithBlockOffsetThenPreviousBlockOffsetUncompressed() throws IOException { + dos.writeBoolean(false); + final int firstBlockOffset = baos.size(); + writeRecords(0, 10, false); + + final int secondBlockOffset = baos.size(); + writeRecords(10, 10, false); + + // write data to a file so that we can read it with the journal reader + final File dir = new File("target/testData"); + final File file = new File(dir, UUID.randomUUID().toString() + ".journal"); + dir.mkdirs(); + + try (final FileOutputStream fos = new FileOutputStream(file)) { + baos.writeTo(fos); + } + + // read the record and verify its contents + try (final StandardJournalReader reader = new StandardJournalReader(file)) { + for (int j=0; j < 2; j++) { + for (int i=0; i < 2; i++) { + final ProvenanceEventRecord event10 = reader.getEvent(secondBlockOffset, 10L); + assertNotNull(event10); + assertEquals(10L, event10.getEventId()); + assertEquals(ProvenanceEventType.CREATE, event10.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", event10.getFlowFileUuid()); + + final ProvenanceEventRecord event13 = reader.getEvent(secondBlockOffset, 13L); + assertNotNull(event13); + assertEquals(13L, event13.getEventId()); + assertEquals(ProvenanceEventType.CREATE, 
event13.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", event13.getFlowFileUuid()); + } + + for (int i=0; i < 2; i++) { + final ProvenanceEventRecord event2 = reader.getEvent(firstBlockOffset, 2L); + assertNotNull(event2); + assertEquals(2L, event2.getEventId()); + assertEquals(ProvenanceEventType.CREATE, event2.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", event2.getFlowFileUuid()); + + final ProvenanceEventRecord event6 = reader.getEvent(firstBlockOffset, 6L); + assertNotNull(event6); + assertEquals(6L, event6.getEventId()); + assertEquals(ProvenanceEventType.CREATE, event6.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", event6.getFlowFileUuid()); + } + } + } finally { + file.delete(); + } + } + + + @Test + public void testReadEventWithBlockOffsetThenPreviousBlockOffsetCompressed() throws IOException { + dos.writeBoolean(true); + final int firstBlockOffset = baos.size(); + writeRecords(0, 10, true); + + final int secondBlockOffset = baos.size(); + writeRecords(10, 10, true); + + // write data to a file so that we can read it with the journal reader + final File dir = new File("target/testData"); + final File file = new File(dir, UUID.randomUUID().toString() + ".journal"); + dir.mkdirs(); + + try (final FileOutputStream fos = new FileOutputStream(file)) { + baos.writeTo(fos); + } + + // read the record and verify its contents + try (final StandardJournalReader reader = new StandardJournalReader(file)) { + for (int j=0; j < 2; j++) { + for (int i=0; i < 2; i++) { + final ProvenanceEventRecord event10 = reader.getEvent(secondBlockOffset, 10L); + assertNotNull(event10); + assertEquals(10L, event10.getEventId()); + assertEquals(ProvenanceEventType.CREATE, event10.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", event10.getFlowFileUuid()); + + final ProvenanceEventRecord event13 = reader.getEvent(secondBlockOffset, 13L); + assertNotNull(event13); + assertEquals(13L, 
event13.getEventId()); + assertEquals(ProvenanceEventType.CREATE, event13.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", event13.getFlowFileUuid()); + } + + for (int i=0; i < 2; i++) { + final ProvenanceEventRecord event2 = reader.getEvent(firstBlockOffset, 2L); + assertNotNull(event2); + assertEquals(2L, event2.getEventId()); + assertEquals(ProvenanceEventType.CREATE, event2.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", event2.getFlowFileUuid()); + + final ProvenanceEventRecord event6 = reader.getEvent(firstBlockOffset, 6L); + assertNotNull(event6); + assertEquals(6L, event6.getEventId()); + assertEquals(ProvenanceEventType.CREATE, event6.getEventType()); + assertEquals("00000000-0000-0000-0000-000000000000", event6.getFlowFileUuid()); + } + } + } finally { + file.delete(); + } + } + + + + + private void writeRecord(final long id) throws IOException { + writeRecord(id, dos); + } + + private void writeRecords(final long startId, final int numRecords, final boolean compressed) throws IOException { + if ( compressed ) { + final CompressionOutputStream compressedOut = new CompressionOutputStream(dos); + for (long id = startId; id < startId + numRecords; id++) { + writeRecord(id, new DataOutputStream(compressedOut)); + } + compressedOut.close(); + } else { + for (long id = startId; id < startId + numRecords; id++) { + writeRecord(id, dos); + } + } + } + + private void writeRecord(final long id, final DataOutputStream dos) throws IOException { + // Create prov event to add to the stream + final ProvenanceEventRecord event = new StandardProvenanceEventRecord.Builder() + .setEventType(ProvenanceEventType.CREATE) + .setFlowFileUUID("00000000-0000-0000-0000-000000000000") + .setComponentType("Unit Test") + .setComponentId(UUID.randomUUID().toString()) + .setEventTime(System.currentTimeMillis()) + .setFlowFileEntryDate(System.currentTimeMillis() - 1000L) + .setLineageStartDate(System.currentTimeMillis() - 2000L) + 
.setCurrentContentClaim(null, null, null, null, 0L) + .build(); + + // Serialize the prov event + final ByteArrayOutputStream serializationStream = new ByteArrayOutputStream(); + final StandardEventSerializer serializer = new StandardEventSerializer(); + serializer.serialize(event, new DataOutputStream(serializationStream)); + + // Write out to our stream the event length, followed by the id, and then the serialized event + final int recordLen = 8 + serializationStream.size(); + + dos.writeInt(recordLen); + dos.writeLong(id); + serializationStream.writeTo(dos); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java new file mode 100644 index 0000000..d5eab8e --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.journals; + +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Collections; +import java.util.UUID; + +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.journaling.TestUtil; +import org.apache.nifi.provenance.journaling.io.StandardEventDeserializer; +import org.apache.nifi.provenance.journaling.io.StandardEventSerializer; +import org.apache.nifi.remote.io.CompressionInputStream; +import org.junit.Test; + +public class TestStandardJournalWriter { + + @Test + public void testOneBlockOneRecordWriteCompressed() throws IOException { + final File journalFile = new File("target/" + UUID.randomUUID().toString()); + + final StandardEventSerializer serializer = new StandardEventSerializer(); + try { + try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, serializer)) { + writer.beginNewBlock(); + writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L); + writer.finishBlock(); + } + + final byte[] data = Files.readAllBytes(journalFile.toPath()); + final ByteArrayInputStream bais = new ByteArrayInputStream(data); + final DataInputStream dis = new DataInputStream(bais); + + final String codecName = dis.readUTF(); + assertEquals(StandardEventSerializer.CODEC_NAME, codecName); + + final int version = 
dis.readInt(); + assertEquals(1, version); + + // compression flag + assertEquals(true, dis.readBoolean()); + + // read block start + final CompressionInputStream decompressedIn = new CompressionInputStream(bais); + final StandardEventDeserializer deserializer = new StandardEventDeserializer(); + + final DataInputStream decompressedDis = new DataInputStream(decompressedIn); + final int eventLength = decompressedDis.readInt(); + assertEquals(131, eventLength); + final ProvenanceEventRecord event = deserializer.deserialize(decompressedDis, 0); + assertEquals(1, event.getEventId()); + assertEquals(ProvenanceEventType.CREATE, event.getEventType()); + + assertEquals(-1, decompressedIn.read()); + } finally { + journalFile.delete(); + } + } + + @Test + public void testManyBlocksOneRecordWriteCompressed() throws IOException { + final File journalFile = new File("target/" + UUID.randomUUID().toString()); + + final StandardEventSerializer serializer = new StandardEventSerializer(); + try { + try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, serializer)) { + for (int i=0; i < 1024; i++) { + writer.beginNewBlock(); + writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L); + writer.finishBlock(); + } + } + + final byte[] data = Files.readAllBytes(journalFile.toPath()); + final ByteArrayInputStream bais = new ByteArrayInputStream(data); + final DataInputStream dis = new DataInputStream(bais); + + final String codecName = dis.readUTF(); + assertEquals(StandardEventSerializer.CODEC_NAME, codecName); + + final int version = dis.readInt(); + assertEquals(1, version); + + // compression flag + assertEquals(true, dis.readBoolean()); + + // read block start + for (int i=0; i < 1024; i++) { + final CompressionInputStream decompressedIn = new CompressionInputStream(bais); + final StandardEventDeserializer deserializer = new StandardEventDeserializer(); + + final DataInputStream decompressedDis = new DataInputStream(decompressedIn); + 
final int eventLength = decompressedDis.readInt(); + assertEquals(131, eventLength); + final ProvenanceEventRecord event = deserializer.deserialize(decompressedDis, 0); + assertEquals(1, event.getEventId()); + assertEquals(ProvenanceEventType.CREATE, event.getEventType()); + + if ( i == 1023 ) { + assertEquals(-1, decompressedIn.read()); + } + } + } finally { + journalFile.delete(); + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/toc/TestStandardTocReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/toc/TestStandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/toc/TestStandardTocReader.java new file mode 100644 index 0000000..d5c4037 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/toc/TestStandardTocReader.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.toc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.UUID; + +import org.junit.Test; + +public class TestStandardTocReader { + + @Test + public void testDetectsCompression() throws IOException { + final File file = new File("target/" + UUID.randomUUID().toString()); + try (final OutputStream out = new FileOutputStream(file)) { + out.write(0); + out.write(0); + } + + try { + try(final StandardTocReader reader = new StandardTocReader(file)) { + assertFalse(reader.isCompressed()); + } + } finally { + file.delete(); + } + + + try (final OutputStream out = new FileOutputStream(file)) { + out.write(0); + out.write(1); + } + + try { + try(final StandardTocReader reader = new StandardTocReader(file)) { + assertTrue(reader.isCompressed()); + } + } finally { + file.delete(); + } + } + + + @Test + public void testGetBlockIndex() throws IOException { + final File file = new File("target/" + UUID.randomUUID().toString()); + try (final OutputStream out = new FileOutputStream(file); + final DataOutputStream dos = new DataOutputStream(out)) { + out.write(0); + out.write(0); + + for (int i=0; i < 1024; i++) { + dos.writeLong(i * 1024L); + } + } + + try { + try(final StandardTocReader reader = new StandardTocReader(file)) { + assertFalse(reader.isCompressed()); + + 
for (int i=0; i < 1024; i++) { + assertEquals(i * 1024, reader.getBlockOffset(i)); + } + } + } finally { + file.delete(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java index f4f9d12..777130e 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java @@ -64,7 +64,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { // default property values public static final int DEFAULT_BUFFER_SIZE = 10000; - private final RingBuffer<ProvenanceEventRecord> ringBuffer; + private final RingBuffer<StoredProvenanceEvent> ringBuffer; private final List<SearchableField> searchableFields; private final List<SearchableField> searchableAttributes; private final ExecutorService queryExecService; @@ -123,17 +123,17 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { } @Override - public void registerEvents(final Iterable<ProvenanceEventRecord> events) { + public void registerEvents(final Collection<ProvenanceEventRecord> events) { for (final ProvenanceEventRecord event : events) { registerEvent(event); } } @Override - public 
List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords) throws IOException { - return ringBuffer.getSelectedElements(new Filter<ProvenanceEventRecord>() { + public List<StoredProvenanceEvent> getEvents(final long firstRecordId, final int maxRecords) throws IOException { + return ringBuffer.getSelectedElements(new Filter<StoredProvenanceEvent>() { @Override - public boolean select(final ProvenanceEventRecord value) { + public boolean select(final StoredProvenanceEvent value) { return value.getEventId() >= firstRecordId; } }, maxRecords); @@ -145,21 +145,22 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { return (newest == null) ? null : newest.getEventId(); } - public ProvenanceEventRecord getEvent(final String identifier) throws IOException { - final List<ProvenanceEventRecord> records = ringBuffer.getSelectedElements(new Filter<ProvenanceEventRecord>() { + public StoredProvenanceEvent getEvent(final String identifier) throws IOException { + final List<StoredProvenanceEvent> records = ringBuffer.getSelectedElements(new Filter<StoredProvenanceEvent>() { @Override - public boolean select(final ProvenanceEventRecord event) { + public boolean select(final StoredProvenanceEvent event) { return identifier.equals(event.getFlowFileUuid()); } }, 1); + return records.isEmpty() ? 
null : records.get(0); } @Override - public ProvenanceEventRecord getEvent(final long id) { - final List<ProvenanceEventRecord> records = ringBuffer.getSelectedElements(new Filter<ProvenanceEventRecord>() { + public StoredProvenanceEvent getEvent(final long id) { + final List<StoredProvenanceEvent> records = ringBuffer.getSelectedElements(new Filter<StoredProvenanceEvent>() { @Override - public boolean select(final ProvenanceEventRecord event) { + public boolean select(final StoredProvenanceEvent event) { return event.getEventId() == id; } }, 1); @@ -407,6 +408,44 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { throw new UnsupportedOperationException(); } + + @Override + public List<StoredProvenanceEvent> getEvents(final List<StorageLocation> storageLocations) throws IOException { + final List<StoredProvenanceEvent> events = new ArrayList<>(storageLocations.size()); + for ( final StorageLocation location : storageLocations ) { + if ( !(location instanceof IdLocation) ) { + throw new IllegalArgumentException("Illegal Storage Location"); + } + + final long id = ((IdLocation) location).getId(); + final StoredProvenanceEvent event = getEvent(id); + if ( event != null ) { + events.add(event); + } + } + return events; + } + + @Override + public StoredProvenanceEvent getEvent(final StorageLocation location) throws IOException { + if ( !(location instanceof IdLocation) ) { + throw new IllegalArgumentException("Illegal Storage Location"); + } + + final long id = ((IdLocation) location).getId(); + return getEvent(id); + } + + @Override + public Long getEarliestEventTime() throws IOException { + final List<StoredProvenanceEvent> events = getEvents(0L, 1); + if ( events.isEmpty() ) { + return null; + } + + return events.get(0).getEventTime(); + } + @Override public ComputeLineageSubmission submitExpandParents(final long eventId) { final ProvenanceEventRecord event = getEvent(eventId); @@ -432,9 +471,6 @@ public class 
VolatileProvenanceRepository implements ProvenanceEventRepository { } } - public Lineage expandSpawnEventChildren(final String identifier) { - throw new UnsupportedOperationException(); - } @Override public ComputeLineageSubmission submitExpandChildren(final long eventId) { @@ -465,9 +501,9 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { final AsyncLineageSubmission result = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, 1); lineageSubmissionMap.put(result.getLineageIdentifier(), result); - final Filter<ProvenanceEventRecord> filter = new Filter<ProvenanceEventRecord>() { + final Filter<StoredProvenanceEvent> filter = new Filter<StoredProvenanceEvent>() { @Override - public boolean select(final ProvenanceEventRecord event) { + public boolean select(final StoredProvenanceEvent event) { if (flowFileUuids.contains(event.getFlowFileUuid())) { return true; } @@ -495,12 +531,12 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { private static class QueryRunnable implements Runnable { - private final RingBuffer<ProvenanceEventRecord> ringBuffer; + private final RingBuffer<StoredProvenanceEvent> ringBuffer; private final Filter<ProvenanceEventRecord> filter; private final AsyncQuerySubmission submission; private final int maxRecords; - public QueryRunnable(final RingBuffer<ProvenanceEventRecord> ringBuffer, final Filter<ProvenanceEventRecord> filter, final int maxRecords, final AsyncQuerySubmission submission) { + public QueryRunnable(final RingBuffer<StoredProvenanceEvent> ringBuffer, final Filter<ProvenanceEventRecord> filter, final int maxRecords, final AsyncQuerySubmission submission) { this.ringBuffer = ringBuffer; this.filter = filter; this.submission = submission; @@ -511,10 +547,10 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { public void run() { // Retrieve the most recent results and count the total number of matches final IntegerHolder 
matchingCount = new IntegerHolder(0); - final List<ProvenanceEventRecord> matchingRecords = new ArrayList<>(maxRecords); - ringBuffer.forEach(new ForEachEvaluator<ProvenanceEventRecord>() { + final List<StoredProvenanceEvent> matchingRecords = new ArrayList<>(maxRecords); + ringBuffer.forEach(new ForEachEvaluator<StoredProvenanceEvent>() { @Override - public boolean evaluate(final ProvenanceEventRecord record) { + public boolean evaluate(final StoredProvenanceEvent record) { if (filter.select(record)) { if (matchingCount.incrementAndGet() <= maxRecords) { matchingRecords.add(record); @@ -532,20 +568,21 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { private static class ComputeLineageRunnable implements Runnable { - private final RingBuffer<ProvenanceEventRecord> ringBuffer; - private final Filter<ProvenanceEventRecord> filter; + private final RingBuffer<StoredProvenanceEvent> ringBuffer; + private final Filter<StoredProvenanceEvent> filter; private final AsyncLineageSubmission submission; - public ComputeLineageRunnable(final RingBuffer<ProvenanceEventRecord> ringBuffer, final Filter<ProvenanceEventRecord> filter, final AsyncLineageSubmission submission) { + public ComputeLineageRunnable(final RingBuffer<StoredProvenanceEvent> ringBuffer, final Filter<StoredProvenanceEvent> filter, final AsyncLineageSubmission submission) { this.ringBuffer = ringBuffer; this.filter = filter; this.submission = submission; } @Override + @SuppressWarnings({ "unchecked", "rawtypes" }) public void run() { - final List<ProvenanceEventRecord> records = ringBuffer.getSelectedElements(filter); - submission.getResult().update(records); + final List<StoredProvenanceEvent> records = ringBuffer.getSelectedElements(filter); + submission.getResult().update((List) records); } } @@ -577,7 +614,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { } } - private static class IdEnrichedProvEvent implements ProvenanceEventRecord { + 
private static class IdEnrichedProvEvent implements StoredProvenanceEvent { private final ProvenanceEventRecord record; private final long id; @@ -741,5 +778,23 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { public Long getPreviousContentClaimOffset() { return record.getPreviousContentClaimOffset(); } + + @Override + public StorageLocation getStorageLocation() { + return new IdLocation(getEventId()); + } } + + private static class IdLocation implements StorageLocation { + private final long id; + + public IdLocation(final long id) { + this.id = id; + } + + public long getId() { + return id; + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java index 3c3e401..fd27470 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java @@ -65,7 +65,7 @@ public class TestVolatileProvenanceRepository { repo.registerEvent(builder.build()); } - final List<ProvenanceEventRecord> retrieved = repo.getEvents(0L, 12); + final List<StoredProvenanceEvent> retrieved = repo.getEvents(0L, 12); assertEquals(10, retrieved.size()); for (int i = 0; 
i < 10; i++) {