http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/StandardEventSerializer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/StandardEventSerializer.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/StandardEventSerializer.java new file mode 100644 index 0000000..fae427e --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/StandardEventSerializer.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.nifi.provenance.journaling.io;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;

import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;

/**
 * {@link Serializer} that writes a {@link ProvenanceEventRecord} to a {@link DataOutputStream} as a fixed
 * sequence of binary fields. The codec is identified by {@link #CODEC_NAME} and reports version 1.
 *
 * <p>
 * The order and encoding of the fields written by {@link #serialize(ProvenanceEventRecord, DataOutputStream)}
 * make up the serialized format, so neither may be changed without changing the code that reads this codec.
 * </p>
 */
public class StandardEventSerializer implements Serializer {
    public static final String CODEC_NAME = "StandardProvCodec";

    /**
     * @return the version of the format written by this serializer (currently 1)
     */
    @Override
    public int getVersion() {
        return 1;
    }

    /**
     * @return {@link #CODEC_NAME}
     */
    @Override
    public String getCodecName() {
        return CODEC_NAME;
    }

    /**
     * Writes the given event to the stream. Fields are written in this order: event type name, event time,
     * FlowFile entry date, event duration, lineage identifiers, lineage start date, component id, component type,
     * FlowFile UUID, details, previous attributes, updated attributes, current content claim, previous content
     * claim, source queue identifier and finally the fields that are specific to the event type.
     *
     * @param event the event to serialize; its event type, FlowFile UUID and attribute maps are dereferenced
     *            without a null check
     * @param out the stream to write to
     * @throws IOException if the stream cannot be written to
     */
    @Override
    public void serialize(final ProvenanceEventRecord event, final DataOutputStream out) throws IOException {
        final ProvenanceEventType recordType = event.getEventType();

        out.writeUTF(recordType.name());
        out.writeLong(event.getEventTime());
        out.writeLong(event.getFlowFileEntryDate());
        out.writeLong(event.getEventDuration());

        writeUUIDs(out, event.getLineageIdentifiers());
        out.writeLong(event.getLineageStartDate());

        writeNullableString(out, event.getComponentId());
        writeNullableString(out, event.getComponentType());
        writeUUID(out, event.getFlowFileUuid());
        writeNullableString(out, event.getDetails());

        // Previous FlowFile attributes: both key and value are written as mandatory (non-null) strings.
        final Map<String, String> previousAttributes = event.getPreviousAttributes();
        out.writeInt(previousAttributes.size());
        for (final Map.Entry<String, String> entry : previousAttributes.entrySet()) {
            writeLongString(out, entry.getKey());
            writeLongString(out, entry.getValue());
        }

        // Updated FlowFile attributes: unlike the previous attributes, a value may be null here.
        final Map<String, String> updatedAttributes = event.getUpdatedAttributes();
        out.writeInt(updatedAttributes.size());
        for (final Map.Entry<String, String> entry : updatedAttributes.entrySet()) {
            writeLongString(out, entry.getKey());
            writeLongNullableString(out, entry.getValue());
        }

        // Current Content Claim, if any.
        writeContentClaim(out, event.getContentClaimContainer(), event.getContentClaimSection(),
                event.getContentClaimIdentifier(), event.getContentClaimOffset(), event.getFileSize());

        // Previous Content Claim, if any.
        writeContentClaim(out, event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(),
                event.getPreviousContentClaimIdentifier(), event.getPreviousContentClaimOffset(), event.getPreviousFileSize());

        // Write out the identifier of the source queue.
        writeNullableString(out, event.getSourceQueueIdentifier());

        // Write type-specific info. Event types that are not listed carry no additional fields.
        switch (recordType) {
            case FORK:
            case JOIN:
            case CLONE:
            case REPLAY:
                writeUUIDs(out, event.getParentUuids());
                writeUUIDs(out, event.getChildUuids());
                break;
            case RECEIVE:
                writeNullableString(out, event.getTransitUri());
                writeNullableString(out, event.getSourceSystemFlowFileIdentifier());
                break;
            case SEND:
                writeNullableString(out, event.getTransitUri());
                break;
            case ADDINFO:
                writeNullableString(out, event.getAlternateIdentifierUri());
                break;
            case ROUTE:
                writeNullableString(out, event.getRelationship());
                break;
            default:
                break;
        }
    }

    /**
     * Writes an optional content claim. If the container, section and identifier are all non-null, writes
     * {@code true} followed by the container, section, identifier, offset and size; otherwise writes a single
     * {@code false}. A null offset or size is written as 0.
     */
    private static void writeContentClaim(final DataOutputStream out, final String container, final String section,
            final String identifier, final Long offset, final Long size) throws IOException {
        if (section == null || container == null || identifier == null) {
            out.writeBoolean(false);
            return;
        }

        out.writeBoolean(true);
        out.writeUTF(container);
        out.writeUTF(section);
        out.writeUTF(identifier);
        out.writeLong(offset == null ? 0L : offset);
        out.writeLong(size == null ? 0L : size);
    }

    /**
     * Writes a presence flag followed, if the value is non-null, by the value in
     * {@link DataOutputStream#writeUTF(String)} format.
     */
    private static void writeNullableString(final DataOutputStream out, final String toWrite) throws IOException {
        if (toWrite == null) {
            out.writeBoolean(false);
        } else {
            out.writeBoolean(true);
            out.writeUTF(toWrite);
        }
    }

    /**
     * Writes a presence flag followed, if the value is non-null, by the value in the length-prefixed format of
     * {@link #writeLongString(DataOutputStream, String)}.
     */
    private static void writeLongNullableString(final DataOutputStream out, final String toWrite) throws IOException {
        if (toWrite == null) {
            out.writeBoolean(false);
        } else {
            out.writeBoolean(true);
            writeLongString(out, toWrite);
        }
    }

    /**
     * Writes the value as a 4-byte length followed by its UTF-8 bytes. This form is used for attribute keys and
     * values; it has an int-sized length prefix instead of the 2-byte prefix of
     * {@link DataOutputStream#writeUTF(String)}.
     */
    private static void writeLongString(final DataOutputStream out, final String value) throws IOException {
        final byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    /**
     * Writes a UUID as two longs: the most significant bits followed by the least significant bits.
     *
     * @param out the stream to write to
     * @param uuid the textual form of the UUID; must not be null
     * @throws IOException if the stream cannot be written to
     * @throws IllegalArgumentException if the value is not a valid UUID string
     */
    static void writeUUID(final DataOutputStream out, final String uuid) throws IOException {
        final UUID uuidObj = UUID.fromString(uuid);
        out.writeLong(uuidObj.getMostSignificantBits());
        out.writeLong(uuidObj.getLeastSignificantBits());
    }

    /**
     * Writes the number of UUIDs followed by each UUID in the format of
     * {@link #writeUUID(DataOutputStream, String)}. A null collection is written as a count of 0.
     *
     * @param out the stream to write to
     * @param list the textual UUIDs to write, or null
     * @throws IOException if the stream cannot be written to
     */
    static void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException {
        if (list == null) {
            out.writeInt(0);
            return;
        }

        out.writeInt(list.size());
        for (final String value : list) {
            writeUUID(out, value);
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java new file mode 100644 index 0000000..535d1dd --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.nifi.provenance.journaling.journals;

import java.io.Closeable;
import java.io.IOException;

import org.apache.nifi.provenance.ProvenanceEventRecord;

/**
 * Reads provenance events back out of a journal, either by looking up one specific event or by
 * walking the journal from its current position.
 */
public interface JournalReader extends Closeable {

    /**
     * Looks up a single event in the journal.
     *
     * @param blockOffset the offset of the Block that holds the event
     * @param eventId the ID of the event to retrieve
     * @return the event that has the given ID
     * @throws IOException if the journal cannot be read
     */
    ProvenanceEventRecord getEvent(long blockOffset, long eventId) throws IOException;

    /**
     * Reads the event that follows the current position in the journal.
     *
     * @return the next event, or <code>null</code> if no more events exist
     * @throws IOException if the journal cannot be read
     */
    ProvenanceEventRecord nextEvent() throws IOException;

    /**
     * Reports where in the journal this reader currently is.
     *
     * @return the current byte offset into the Journal from which the next event (if any) will be read
     */
    long getPosition();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalWriter.java new file mode 100644 index 0000000..5108f49 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalWriter.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.provenance.journaling.journals;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;

import org.apache.nifi.provenance.ProvenanceEventRecord;

/**
 * Responsible for writing events to an append-only journal, or write-ahead-log. Events are written in "Blocks."
 * These Blocks are used so that if we are compressing data, we can compress individual Blocks. This allows us
 * to store a "Block Index" so that we can quickly lookup the start of a Block when reading the data to quickly
 * obtain the data that we need.
 */
public interface JournalWriter extends Closeable {

    /**
     * Returns the identifier of this journal. The identifier is unique per 'section' of the repository.
     *
     * @return the identifier of this journal
     */
    long getJournalId();

    /**
     * Writes the given events to the journal and assigns the events sequential IDs starting with the
     * ID given.
     *
     * @param events the events to write
     * @param firstEventId the ID to assign to the first event; each following event is assigned the next ID
     * @throws IOException if the events cannot be written to the journal
     */
    void write(Collection<ProvenanceEventRecord> events, long firstEventId) throws IOException;

    /**
     * Returns the File that the Journal is writing to.
     *
     * @return the File that the Journal is writing to
     */
    File getJournalFile();

    /**
     * Synchronizes changes to the underlying file system.
     *
     * @throws IOException if the changes cannot be synchronized
     */
    void sync() throws IOException;

    /**
     * Returns the size of the journal.
     *
     * @return the size of the journal
     */
    long getSize();

    /**
     * Returns the number of events that have been written to this journal.
     *
     * @return the number of events that have been written to this journal
     */
    int getEventCount();

    /**
     * Returns the amount of time that has elapsed since the point at which the writer was created.
     *
     * @param timeUnit the unit in which the elapsed time is to be reported
     * @return the elapsed time, expressed in the given unit
     */
    long getAge(TimeUnit timeUnit);

    /**
     * Marks the end of a Block in the output file. If the previous Block has been finished and no new
     * Block has been started, this method will return silently without doing anything.
     *
     * @throws IOException if the end of the Block cannot be written
     */
    void finishBlock() throws IOException;

    /**
     * Starts a new Block in the output file.
     *
     * @throws IOException if the start of the Block cannot be written
     * @throws IllegalStateException if a Block has already been started
     */
    void beginNewBlock() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java new file mode 100644 index 0000000..82ef39b --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/
package org.apache.nifi.provenance.journaling.journals;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.journaling.io.Deserializer;
import org.apache.nifi.provenance.journaling.io.Deserializers;
import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.MinimumLengthInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Standard implementation of {@link JournalReader}. This reader reads data that is written
 * in the format specified by {@link StandardJournalWriter}: a header (codec name, serialization
 * version, compression flag) followed by length-prefixed events. When the journal is compressed,
 * the events are read through one {@link CompressionInputStream} per Block.
 */
public class StandardJournalReader implements JournalReader {
    private static final Logger logger = LoggerFactory.getLogger(StandardJournalReader.class);

    private final File file;

    // Stream over the raw bytes of the journal file; its byte count is compared against Block offsets.
    private ByteCountingInputStream compressedStream;
    // Stream that events are read from. The very same object as compressedStream if the journal is not compressed.
    private ByteCountingInputStream decompressedStream;

    private Deserializer deserializer;
    private int serializationVersion;
    private boolean compressed;

    // ID of the event most recently returned by nextEvent, or -1 if nothing has been read since the file was (re)opened.
    private long lastEventIdRead = -1L;


    /**
     * Opens the given journal file and reads its header.
     *
     * @param file the journal file to read
     * @throws IOException if the file cannot be opened or its header cannot be read
     */
    public StandardJournalReader(final File file) throws IOException {
        this.file = file;
        resetStreams();
    }

    /**
     * (Re)opens the journal file from the beginning, reads the header and positions this reader at the
     * first event. If anything fails, the newly opened file stream is closed again before the failure is
     * propagated so that no file handle is leaked.
     */
    private void resetStreams() throws IOException {
        final InputStream bufferedIn = new BufferedInputStream(new FileInputStream(file));

        boolean initialized = false;
        try {
            compressedStream = new ByteCountingInputStream(bufferedIn);
            final DataInputStream dis = new DataInputStream(compressedStream);
            final String codecName = dis.readUTF();
            serializationVersion = dis.readInt();
            compressed = dis.readBoolean();
            deserializer = Deserializers.getDeserializer(codecName);

            resetDecompressedStream();

            // We are back at the start of the file, so no event has been read yet.
            lastEventIdRead = -1L;
            initialized = true;
        } finally {
            if ( !initialized ) {
                try {
                    bufferedIn.close();
                } catch (final IOException ignored) {
                    // the failure that brought us here is the one worth reporting
                }
            }
        }
    }

    /**
     * Points {@link #decompressedStream} at the current position of {@link #compressedStream}, wrapping it
     * in a new {@link CompressionInputStream} if the journal is compressed.
     */
    private void resetDecompressedStream() throws IOException {
        if ( compressed ) {
            decompressedStream = new ByteCountingInputStream(new BufferedInputStream(new CompressionInputStream(compressedStream)), compressedStream.getBytesConsumed());
        } else {
            decompressedStream = compressedStream;
        }
    }

    @Override
    public ProvenanceEventRecord nextEvent() throws IOException {
        return nextEvent(true);
    }

    @Override
    public long getPosition() {
        return decompressedStream.getBytesConsumed();
    }

    /**
     * Determines whether at least one more byte can be read from the given stream, without consuming it.
     * The stream must support mark/reset.
     */
    private boolean isData(final InputStream in) throws IOException {
        in.mark(1);
        final int b = in.read();
        if ( b < 0 ) {
            return false;
        }
        in.reset();

        return true;
    }

    /**
     * Reads the next event from the journal.
     *
     * @param spanBlocks whether reading may continue into the next Block once the current Block is exhausted
     * @return the next event, or <code>null</code> if there is no more data to read (or, if
     *         <code>spanBlocks</code> is false, no more data in the current Block) or if the end of the file
     *         is hit in the middle of an event
     * @throws IOException if the journal cannot be read
     */
    ProvenanceEventRecord nextEvent(final boolean spanBlocks) throws IOException {
        boolean isData = isData(decompressedStream);
        if ( !isData ) {
            if ( !spanBlocks ) {
                return null;
            }

            // we are allowed to span blocks. We're out of data but if we are compressed, it could
            // just mean that the block has ended.
            if ( !compressed ) {
                return null;
            }

            isData = isData(compressedStream);
            if ( !isData ) {
                return null;
            }

            // There is no more data in the decompressed InputStream but there is in the underlying stream.
            // This means we've hit the end of our block. We will create a new CompressionInputStream
            // so that we can continue reading.
            resetDecompressedStream();
        }

        try {
            final DataInputStream dis = new DataInputStream(decompressedStream);
            final int eventLength = dis.readInt();

            final LimitingInputStream limitingInputStream = new LimitingInputStream(dis, eventLength);
            final MinimumLengthInputStream minStream = new MinimumLengthInputStream(limitingInputStream, eventLength);
            final ProvenanceEventRecord event = deserializer.deserialize(new DataInputStream(minStream), serializationVersion);
            lastEventIdRead = event.getEventId();
            return event;
        } catch (final EOFException eof) {
            logger.warn("{} Found unexpected End-of-File when reading from journal", this);
            return null;
        }
    }

    /**
     * Retrieves the event with the given ID, skipping forward to the given Block offset first if the reader
     * has not yet reached it.
     *
     * @param blockOffset the offset, in bytes of the journal file, of the Block that holds the event
     * @param eventId the ID of the event to retrieve
     * @return the event with the given ID
     * @throws IOException if the journal cannot be read or contains no event with the given ID at or after
     *             the position that reading starts from
     */
    @Override
    public ProvenanceEventRecord getEvent(final long blockOffset, final long eventId) throws IOException {
        // If the requested event ID is not greater than the last event that we read, we need to reset to the
        // beginning of the file. We do this because we know that the ID's are always increasing, so if we need an
        // ID that we have already read past (including the event that we read most recently), we have to go
        // backward in the file. We can't do this with streams, so start the stream over.
        if ( eventId <= lastEventIdRead ) {
            close();
            resetStreams();
        }

        final long bytesToSkip = blockOffset - compressedStream.getBytesConsumed();
        if ( bytesToSkip > 0 ) {
            StreamUtils.skip(compressedStream, bytesToSkip);
            resetDecompressedStream();
        }

        ProvenanceEventRecord event;
        while ((event = nextEvent()) != null) {
            if ( event.getEventId() == eventId ) {
                return event;
            }
        }

        throw new IOException("Could not find event with ID " + eventId);
    }

    /**
     * Closes this reader and the underlying journal file.
     *
     * @throws IOException if a stream cannot be closed
     */
    @Override
    public void close() throws IOException {
        try {
            decompressedStream.close();
        } finally {
            // When the journal is compressed, decompressedStream is only a wrapper around a CompressionInputStream.
            // Close the stream over the file explicitly rather than relying on that wrapper to propagate close();
            // closing a stream that is already closed has no effect.
            if ( compressedStream != decompressedStream ) {
                compressedStream.close();
            }
        }
    }

    @Override
    public String toString() {
        return "StandardJournalReader[" + file + "]";
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java new file mode 100644 index 0000000..5a289fe --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.provenance.journaling.journals;

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.concurrent.TimeUnit;

import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.journaling.io.Serializer;
import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.stream.io.ByteCountingOutputStream;


/**
 * <p>
 * Standard implementation of {@link JournalWriter}.
 * </p>
 *
 * <p>
 * Writes out to a journal file using the format:
 * </p>
 *
 * <pre>
 * &lt;header&gt;
 * &lt;begin block 1&gt;
 * &lt;record 1&gt;
 * &lt;record 2&gt;
 * &lt;record 3&gt;
 * &lt;end block 1&gt;
 * &lt;begin block 2&gt;
 * &lt;record 4&gt;
 * &lt;record 5&gt;
 * &lt;end block 2&gt;
 * ...
 * &lt;begin block N&gt;
 * &lt;record N&gt;
 * &lt;end block N&gt;
 * </pre>
 *
 * <p>
 * Where &lt;header&gt; is defined as:
 * </p>
 * <pre>
 *  String: serialization codec name (retrieved from serializer)
 *      --&gt; 2 bytes for length of string
 *      --&gt; N bytes for actual serialization codec name
 *  int: serialization version
 *  boolean: compressed: 1 -&gt; compressed, 0 -&gt; not compressed
 * </pre>
 *
 * <p>
 * And &lt;record&gt; is defined as:
 * </p>
 * <pre>
 * bytes 0-3: int: record length
 * bytes 4-11: long: record id
 * bytes 12-N: serialized event according to the applied {@link Serializer}
 * </pre>
 *
 * <p>
 * The structure of the &lt;begin block&gt; and &lt;end block&gt; element depend on whether or not
 * compression is enabled. If the journal is not compressed, these elements are 0 bytes.
 * If the journal is compressed, these are the compression header and compression footer, respectively.
 * </p>
 *
 */
public class StandardJournalWriter implements JournalWriter {
    private final long journalId;
    private final File journalFile;
    private final boolean compressed;
    private final Serializer serializer;
    private final long creationTime = System.nanoTime();

    private int eventCount;
    private boolean blockStarted = false;

    // True while compressedStream is a CompressionOutputStream that has been opened but not yet closed.
    // Always false when the journal is not compressed.
    private boolean compressionStreamOpen = false;

    private final FileOutputStream fos;
    private ByteCountingOutputStream uncompressedStream;
    private OutputStream compressedStream;
    private ByteCountingOutputStream out;


    /**
     * Creates the journal file and writes the journal header to it.
     *
     * @param journalId the identifier to report from {@link #getJournalId()}
     * @param journalFile the file to write the journal to
     * @param compressed whether or not the Blocks of the journal are to be compressed
     * @param serializer the serializer used to write each event; its codec name and version go into the header
     * @throws IOException if the file cannot be opened or the header cannot be written
     */
    public StandardJournalWriter(final long journalId, final File journalFile, final boolean compressed, final Serializer serializer) throws IOException {
        this.journalId = journalId;
        this.journalFile = journalFile;
        this.compressed = compressed;
        this.serializer = serializer;
        this.fos = new FileOutputStream(journalFile);

        boolean initialized = false;
        try {
            uncompressedStream = new ByteCountingOutputStream(fos);
            writeHeader(uncompressedStream);

            if (compressed) {
                compressedStream = new CompressionOutputStream(uncompressedStream);
                compressionStreamOpen = true;
            } else {
                compressedStream = fos;
            }

            this.out = new ByteCountingOutputStream(compressedStream, uncompressedStream.getBytesWritten());
            initialized = true;
        } finally {
            // Nobody can close a writer whose constructor failed, so release the file handle here.
            if (!initialized) {
                try {
                    fos.close();
                } catch (final IOException ignored) {
                    // the failure that brought us here is the one worth reporting
                }
            }
        }
    }

    /**
     * Writes the journal header: codec name, serialization version and compression flag.
     */
    private void writeHeader(final OutputStream out) throws IOException {
        final DataOutputStream dos = new DataOutputStream(out);
        dos.writeUTF(serializer.getCodecName());
        dos.writeInt(serializer.getVersion());
        dos.writeBoolean(compressed);
        dos.flush();
    }

    @Override
    public long getJournalId() {
        return journalId;
    }

    /**
     * Finishes the current Block, if any, and closes the journal file.
     *
     * @throws IOException if the remaining data cannot be written or the file cannot be closed
     */
    @Override
    public void close() throws IOException {
        try {
            finishBlock();

            // The compression stream is still open here if it was never terminated by finishBlock(),
            // for example if no Block was explicitly started.
            if ( compressionStreamOpen ) {
                compressedStream.flush();
                compressedStream.close();
                compressionStreamOpen = false;
            }
        } finally {
            // Closing a CompressionOutputStream does not close the stream beneath it (see finishBlock()),
            // so the file itself always has to be closed explicitly.
            fos.close();
        }
    }

    /**
     * Writes the given events to the journal as records, assigning them sequential IDs starting with the ID
     * given. Each event is first serialized into memory so that its length can be written in front of it.
     *
     * @param events the events to write
     * @param firstEventId the ID to assign to the first event
     * @throws IOException if an event cannot be serialized or written
     */
    @Override
    public void write(final Collection<ProvenanceEventRecord> events, final long firstEventId) throws IOException {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final DataOutputStream serializerDos = new DataOutputStream(baos);

        final BufferedOutputStream bos = new BufferedOutputStream(out);
        final DataOutputStream outDos = new DataOutputStream(bos);

        try {
            long id = firstEventId;
            for ( final ProvenanceEventRecord event : events ) {
                serializer.serialize(event, serializerDos);
                serializerDos.flush();

                final int recordLength = 8 + baos.size();   // record length is length of ID (8 bytes) plus length of serialized record
                outDos.writeInt(recordLength);
                outDos.writeLong(id++);
                baos.writeTo(outDos);
                baos.reset();

                eventCount++;
            }
        } finally {
            // the buffer is local to this call, so push whatever has been written through to 'out'
            outDos.flush();
        }
    }


    @Override
    public File getJournalFile() {
        return journalFile;
    }

    @Override
    public void sync() throws IOException {
        fos.getFD().sync();
    }

    /**
     * Returns the byte count tracked by the stream that records are written to.
     * NOTE(review): for a compressed journal this appears to count the bytes handed to the compressor for the
     * current Block on top of the bytes in the file when that Block began, rather than the bytes actually in
     * the file -- confirm against ByteCountingOutputStream.
     */
    @Override
    public long getSize() {
        return out.getBytesWritten();
    }

    @Override
    public int getEventCount() {
        return eventCount;
    }

    @Override
    public long getAge(final TimeUnit timeUnit) {
        return timeUnit.convert(System.nanoTime() - creationTime, TimeUnit.NANOSECONDS);
    }

    @Override
    public void finishBlock() throws IOException {
        if ( !blockStarted ) {
            return;
        }

        blockStarted = false;
        if ( !compressed ) {
            return;
        }

        // Calling close() on CompressionOutputStream doesn't close the underlying stream -- it is designed
        // such that calling close() will write out the Compression footer and become unusable but not
        // close the underlying stream because the whole point of CompressionOutputStream as opposed to
        // GZIPOutputStream is that with CompressionOutputStream we can concatenate many together on a single
        // stream.
        compressedStream.close();
        compressionStreamOpen = false;
    }

    @Override
    public void beginNewBlock() throws IOException {
        if ( blockStarted ) {
            throw new IllegalStateException("Block is already started");
        }
        blockStarted = true;

        if ( !compressed ) {
            return;
        }

        if ( compressionStreamOpen ) {
            if ( eventCount == 0 ) {
                // Nothing has been written yet, so the compression stream that was opened by the
                // constructor serves as the first Block.
                return;
            }

            // Events were written without a Block having been started. Terminate that data with a
            // compression footer before starting the new Block instead of abandoning the open stream.
            compressedStream.close();
            compressionStreamOpen = false;
        }

        // The previous compression stream has been closed and is unusable (even if the Block that it held
        // was empty), so every new Block needs a compression stream of its own.
        this.compressedStream = new CompressionOutputStream(uncompressedStream);
        this.compressionStreamOpen = true;
        this.out = new ByteCountingOutputStream(compressedStream, uncompressedStream.getBytesWritten());
    }

    @Override
    public String toString() {
        return "Journal Writer for " + journalFile;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java new file mode 100644 index 0000000..51f84a2 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java @@ -0,0 +1,424
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.partition; + +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent; +import org.apache.nifi.provenance.journaling.JournaledStorageLocation; +import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; +import org.apache.nifi.provenance.journaling.index.EventIndexSearcher; +import org.apache.nifi.provenance.journaling.index.LuceneIndexSearcher; +import org.apache.nifi.provenance.journaling.index.LuceneIndexWriter; +import org.apache.nifi.provenance.journaling.index.QueryUtils; +import org.apache.nifi.provenance.journaling.io.StandardEventSerializer; +import 
org.apache.nifi.provenance.journaling.journals.JournalReader; +import org.apache.nifi.provenance.journaling.journals.JournalWriter; +import org.apache.nifi.provenance.journaling.journals.StandardJournalReader; +import org.apache.nifi.provenance.journaling.journals.StandardJournalWriter; +import org.apache.nifi.provenance.journaling.tasks.CompressionTask; +import org.apache.nifi.provenance.journaling.toc.StandardTocWriter; +import org.apache.nifi.provenance.journaling.toc.TocJournalReader; +import org.apache.nifi.provenance.journaling.toc.TocWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JournalingPartition implements Partition { + private static final Logger logger = LoggerFactory.getLogger(JournalingPartition.class); + private static final String JOURNAL_FILE_EXTENSION = ".journal"; + + private final String containerName; + private final String sectionName; + + private final File section; + private final File journalsDir; + private final JournalingRepositoryConfig config; + private final ExecutorService executor; + private final LuceneIndexWriter indexWriter; + + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = rwLock.readLock(); + private final Lock writeLock = rwLock.writeLock(); + + private JournalWriter journalWriter; + private TocWriter tocWriter; + private int numEventsAtEndOfLastBlock = 0; + private volatile long maxEventId = -1L; + private volatile Long earliestEventTime = null; + + public JournalingPartition(final String containerName, final String sectionName, final File sectionDir, final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException { + this.containerName = containerName; + this.sectionName = sectionName; + this.section = sectionDir; + this.journalsDir = new File(section, "journals"); + this.config = config; + this.executor = executor; + + if (!journalsDir.exists() && !journalsDir.mkdirs()) { + throw new IOException("Could not 
create directory " + section); + } + + if ( journalsDir.exists() && journalsDir.isFile() ) { + throw new IOException("Could not create directory " + section + " because a file already exists with this name"); + } + + if ( config.isReadOnly() ) { + indexWriter = null; + } else { + final File indexDir = new File(section, "index"); + indexWriter = new LuceneIndexWriter(indexDir, config); + } + } + + + public EventIndexSearcher newIndexSearcher() throws IOException { + if (config.isReadOnly()) { + return new LuceneIndexSearcher(new File(section, "index")); + } + + return indexWriter.newIndexSearcher(); + } + + protected JournalWriter getJournalWriter(final long firstEventId) throws IOException { + if ( config.isReadOnly() ) { + throw new IllegalStateException("Cannot update repository because it is read-only"); + } + + if (isRolloverNecessary()) { + rollover(firstEventId); + } + + return journalWriter; + } + + @Override + public List<JournaledProvenanceEvent> registerEvents(final Collection<ProvenanceEventRecord> events, final long firstEventId) throws IOException { + writeLock.lock(); + try { + final JournalWriter writer = getJournalWriter(firstEventId); + + if ( !events.isEmpty() ) { + final int eventsWritten = writer.getEventCount(); + if ( eventsWritten - numEventsAtEndOfLastBlock > config.getBlockSize() ) { + writer.finishBlock(); + tocWriter.addBlockOffset(writer.getSize()); + numEventsAtEndOfLastBlock = eventsWritten; + writer.beginNewBlock(); + } + } + + writer.write(events, firstEventId); + + final List<JournaledProvenanceEvent> storedEvents = new ArrayList<>(events.size()); + long id = firstEventId; + for (final ProvenanceEventRecord event : events) { + final JournaledStorageLocation location = new JournaledStorageLocation(containerName, sectionName, + String.valueOf(writer.getJournalId()), tocWriter.getCurrentBlockIndex(), id++); + final JournaledProvenanceEvent storedEvent = new JournaledProvenanceEvent(event, location); + storedEvents.add(storedEvent); + } 
+ + indexWriter.index(storedEvents); + + if ( config.isAlwaysSync() ) { + writer.sync(); + } + + // update the maxEventId; we don't need a compareAndSet because the AtomicLong is modified + // only within a write lock. But we use AtomicLong so that we + if ( id > maxEventId ) { + maxEventId = id; + } + + if ( earliestEventTime == null ) { + Long earliest = null; + for ( final ProvenanceEventRecord event : events ) { + if ( earliest == null || event.getEventTime() < earliest ) { + earliest = event.getEventTime(); + } + } + + earliestEventTime = earliest; + } + + return storedEvents; + } finally { + writeLock.unlock(); + } + } + + // MUST be called with either the read lock or write lock held. + // determines whether or not we need to roll over the journal writer and toc writer. + private boolean isRolloverNecessary() { + if ( journalWriter == null ) { + return true; + } + + final long ageSeconds = journalWriter.getAge(TimeUnit.SECONDS); + final long rolloverSeconds = config.getJournalRolloverPeriod(TimeUnit.SECONDS); + if ( ageSeconds >= rolloverSeconds ) { + return true; + } + + if ( journalWriter.getSize() > config.getJournalCapacity() ) { + return true; + } + + return false; + } + + // MUST be called with write lock held. + private void rollover(final long firstEventId) throws IOException { + // if we have a writer already, close it and initiate rollover actions + if ( journalWriter != null ) { + journalWriter.finishBlock(); + journalWriter.close(); + tocWriter.close(); + indexWriter.sync(); + + if ( config.isCompressOnRollover() ) { + final File finishedFile = journalWriter.getJournalFile(); + final File finishedTocFile = tocWriter.getFile(); + executor.submit(new CompressionTask(finishedFile, journalWriter.getJournalId(), finishedTocFile)); + } + } + + // create new writers and reset state. 
+ final File journalFile = new File(journalsDir, firstEventId + JOURNAL_FILE_EXTENSION); + journalWriter = new StandardJournalWriter(firstEventId, journalFile, false, new StandardEventSerializer()); + tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false); + numEventsAtEndOfLastBlock = 0; + } + + + private Long getJournalId(final File file) { + long journalId; + final int dotIndex = file.getName().indexOf("."); + if ( dotIndex < 0 ) { + journalId = 0L; + } else { + try { + journalId = Long.parseLong(file.getName().substring(0, dotIndex)); + } catch (final NumberFormatException nfe) { + return null; + } + } + + return journalId; + } + + @Override + public void restore() throws IOException { + // delete or rename files if stopped during rollover; compress any files that haven't been compressed + if ( !config.isReadOnly() ) { + final File[] children = journalsDir.listFiles(); + if ( children != null ) { + // find the latest journal. + File latestJournal = null; + long latestJournalId = -1L; + + final List<File> journalFiles = new ArrayList<>(); + + // find any journal files that either haven't been compressed or were partially compressed when + // we last shutdown and then restart compression. + for ( final File file : children ) { + final String filename = file.getName(); + if ( !filename.contains(JOURNAL_FILE_EXTENSION) ) { + continue; + } + + final Long journalId = getJournalId(file); + if ( journalId != null && journalId > latestJournalId ) { + latestJournal = file; + latestJournalId = journalId; + } + + journalFiles.add(file); + + if ( !config.isCompressOnRollover() ) { + continue; + } + + if ( filename.endsWith(CompressionTask.FILE_EXTENSION) ) { + final File uncompressedFile = new File(journalsDir, filename.replace(CompressionTask.FILE_EXTENSION, "")); + if ( uncompressedFile.exists() ) { + // both the compressed and uncompressed version of this journal exist. The Compression Task was + // not complete when we shutdown. 
Delete the compressed journal and toc and re-start the Compression Task. + final File tocFile = QueryUtils.getTocFile(uncompressedFile); + executor.submit(new CompressionTask(uncompressedFile, getJournalId(uncompressedFile), tocFile)); + } else { + // The compressed file exists but the uncompressed file does not. This means that we have finished + // writing the compressed file and deleted the original journal file but then shutdown before + // renaming the compressed file to the original filename. We can simply rename the compressed file + // to the original file and then address the TOC file. + final boolean rename = CompressionTask.rename(file, uncompressedFile); + if ( !rename ) { + logger.warn("{} During recovery, failed to rename {} to {}", this, file, uncompressedFile); + continue; + } + + // Check if the compressed TOC file exists. If not, we are finished. + // If it does exist, then we know that it is complete, as described above, so we will go + // ahead and replace the uncompressed version. 
+ final File tocFile = QueryUtils.getTocFile(uncompressedFile); + final File compressedTocFile = new File(tocFile.getParentFile(), tocFile.getName() + CompressionTask.FILE_EXTENSION); + if ( !compressedTocFile.exists() ) { + continue; + } + + tocFile.delete(); + + final boolean renamedTocFile = CompressionTask.rename(compressedTocFile, tocFile); + if ( !renamedTocFile ) { + logger.warn("{} During recovery, failed to rename {} to {}", this, compressedTocFile, tocFile); + } + } + } + } + + // Get the first event in the earliest journal file so that we know what the earliest time available is + Collections.sort(journalFiles, new Comparator<File>() { + @Override + public int compare(final File o1, final File o2) { + return Long.compare(getJournalId(o1), getJournalId(o2)); + } + }); + + for ( final File journal : journalFiles ) { + try (final JournalReader reader = new StandardJournalReader(journal)) { + final ProvenanceEventRecord record = reader.nextEvent(); + this.earliestEventTime = record.getEventTime(); + break; + } catch (final IOException ioe) { + } + } + + // Whatever was the last journal for this partition, we need to remove anything for that journal + // from the index and re-add them, and then sync the index. This allows us to avoid syncing + // the index each time (we sync only on rollover) but allows us to still ensure that we index + // all events. 
+ if ( latestJournal != null ) { + try { + reindex(latestJournal); + } catch (final EOFException eof) { + } + } + } + } + } + + + private void reindex(final File journalFile) throws IOException { + try (final TocJournalReader reader = new TocJournalReader(containerName, sectionName, String.valueOf(getJournalId(journalFile)), journalFile)) { + indexWriter.delete(containerName, sectionName, String.valueOf(getJournalId(journalFile))); + + long maxId = -1L; + final List<JournaledProvenanceEvent> storedEvents = new ArrayList<>(1000); + JournaledProvenanceEvent event; + while ((event = reader.nextJournaledEvent()) != null ) { + storedEvents.add(event); + maxId = event.getEventId(); + + if ( storedEvents.size() == 1000 ) { + indexWriter.index(storedEvents); + storedEvents.clear(); + } + } + + if ( !storedEvents.isEmpty() ) { + indexWriter.index(storedEvents); + } + + indexWriter.sync(); + this.maxEventId = maxId; + } + } + + + @Override + public List<JournaledStorageLocation> getEvents(final long minEventId, final int maxRecords) throws IOException { + try (final EventIndexSearcher searcher = indexWriter.newIndexSearcher()) { + return searcher.getEvents(minEventId, maxRecords); + } + } + + @Override + public void shutdown() { + if ( journalWriter != null ) { + try { + journalWriter.finishBlock(); + } catch (final IOException ioe) { + logger.warn("Failed to finish writing Block to {} due to {}", journalWriter, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + + try { + journalWriter.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close {} due to {}", journalWriter, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + + try { + tocWriter.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close {} due to {}", tocWriter, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + + if ( indexWriter != null ) { + try { + indexWriter.close(); + } catch (final IOException 
ioe) { + logger.warn("Failed to close {} due to {}", indexWriter, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + } + + @Override + public long getMaxEventId() { + return maxEventId; + } + + @Override + public Long getEarliestEventTime() throws IOException { + return earliestEventTime; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java new file mode 100644 index 0000000..e77c8d5 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.partition;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
+import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
+import org.apache.nifi.provenance.journaling.index.EventIndexSearcher;
+
+
+/**
+ * Represents a single Partition of the Journaling Provenance Repository. The repository is split into multiple
+ * partitions in order to provide higher throughput.
+ *
+ * Implementations of this interface MUST be threadsafe.
+ */
+public interface Partition {
+
+    /**
+     * Returns a new EventIndexSearcher that can be used to search the events in this partition.
+     *
+     * @return a new searcher over the events in this partition
+     * @throws IOException if the searcher cannot be created
+     */
+    EventIndexSearcher newIndexSearcher() throws IOException;
+
+    /**
+     * Registers the given events with this partition. This includes persisting the events and indexing
+     * them so that they are searchable.
+     *
+     * @param events the events to persist and index
+     * @param firstEventId the event ID to assign to the first event in <code>events</code>
+     * @return the registered events, each paired with the location at which it was stored
+     * @throws IOException if the events cannot be persisted or indexed
+     */
+    List<JournaledProvenanceEvent> registerEvents(Collection<ProvenanceEventRecord> events, long firstEventId) throws IOException;
+
+    /**
+     * Restores the state of this Partition after a restart of NiFi.
+     *
+     * @throws IOException if the state cannot be restored
+     */
+    void restore() throws IOException;
+
+    /**
+     * Shuts down the Partition so that it can no longer be used.
+     */
+    void shutdown();
+
+    /**
+     * Returns the largest event ID stored in this partition.
+     *
+     * @return the largest event ID stored in this partition
+     */
+    long getMaxEventId();
+
+    /**
+     * Returns the locations of the events with the smallest IDs that are not less than
+     * <code>minEventId</code>, up to <code>maxRecords</code> of them.
+     * NOTE(review): the original wording said both "at least equal to" and "greater than" minEventId;
+     * confirm against the index searcher whether the lower bound is inclusive.
+     *
+     * @param minEventId the lower bound on the IDs of the events to return
+     * @param maxRecords the maximum number of locations to return
+     * @return the locations of the matching events
+     * @throws IOException if the events cannot be looked up
+     */
+    List<JournaledStorageLocation> getEvents(long minEventId, int maxRecords) throws IOException;
+
+    /**
+     * Returns the timestamp of the earliest event in this Partition, or <code>null</code> if the Partition
+     * contains no events.
+     *
+     * @return the timestamp of the earliest event, or <code>null</code> if there are no events
+     * @throws IOException if the timestamp cannot be determined
+     */
+    Long getEarliestEventTime() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionAction.java
new file mode 100644
index 0000000..8c680f5
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionAction.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the
Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.partition;
+
+import java.io.IOException;
+/**
+ * An action that is performed against a single {@link Partition} and produces a result.
+ * <code>perform</code> receives the partition to act on, returns the result of the action,
+ * and may throw an IOException if the action fails.
+ *
+ * @param <T> the type of the result produced by the action
+ */
+public interface PartitionAction<T> {
+    T perform(Partition partition) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
new file mode 100644
index 0000000..edbf75b
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.partition;
+
+import java.io.IOException;
+import java.util.Set;
+
+
+/**
+ * The PartitionManager is responsible for accessing and maintaining the Partitions so that they
+ * are written to efficiently and in a thread-safe manner.
+ */
+public interface PartitionManager {
+
+    /**
+     * Performs the given action against one of the partitions and returns its result.
+     *
+     * @param action the action to perform
+     * @param writeAction specifies whether or not the action writes to the repository
+     * @return the value returned by the action
+     * @throws IOException if the action fails with an IOException
+     */
+    <T> T withPartition(PartitionAction<T> action, boolean writeAction) throws IOException;
+
+    /**
+     * Performs the given action against one of the partitions.
+     *
+     * @param action the action to perform
+     * @param writeAction specifies whether or not the action writes to the repository
+     * @throws IOException if the action fails with an IOException
+     */
+    void withPartition(VoidPartitionAction action, boolean writeAction) throws IOException;
+
+    /**
+     * Performs the given Action on each partition and returns the set of results.
+     *
+     * @param action the action to perform
+     * @return the set of values returned by the action, one per partition (equal values collapse
+     *         into a single element, as the result is a Set)
+     * @throws IOException if the action fails with an IOException
+     */
+    <T> Set<T> withEachPartition(PartitionAction<T> action) throws IOException;
+
+    /**
+     * Performs the given Action against each partition, optionally waiting for the action to complete.
+     *
+     * @param action the action to perform
+     * @param async if <code>true</code>, will perform the action asynchronously; if <code>false</code>, will
+     * wait for the action to complete before returning
+     */
+    void withEachPartition(VoidPartitionAction action, boolean async);
+    /**
+     * Shuts down this PartitionManager.
+     */
+    void shutdown();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
new file mode 100644
index 0000000..4ac0fc6
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.partition; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; +import org.apache.nifi.util.Tuple; + +public class QueuingPartitionManager implements PartitionManager { + + private final JournalingRepositoryConfig config; + private final BlockingQueue<Partition> partitionQueue; + private final JournalingPartition[] partitionArray; + private final ExecutorService executor; + private volatile boolean shutdown = false; + + private final AtomicInteger blacklistedCount = new AtomicInteger(0); + + public QueuingPartitionManager(final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException { + this.config = config; + this.partitionQueue = new LinkedBlockingQueue<>(config.getPartitionCount()); + this.partitionArray = new JournalingPartition[config.getPartitionCount()]; + + final List<Tuple<String, File>> containerTuples = new ArrayList<>(config.getContainers().size()); + for ( final 
Map.Entry<String, File> entry : config.getContainers().entrySet() ) { + containerTuples.add(new Tuple<>(entry.getKey(), entry.getValue())); + } + + for (int i=0; i < config.getPartitionCount(); i++) { + final Tuple<String, File> tuple = containerTuples.get(i % containerTuples.size()); + final File section = new File(tuple.getValue(), String.valueOf(i)); + + final JournalingPartition partition = new JournalingPartition(tuple.getKey(), String.valueOf(i), section, config, executor); + partitionQueue.offer(partition); + partitionArray[i] = partition; + } + + this.executor = executor; + } + + @Override + public void shutdown() { + this.shutdown = true; + + for ( final Partition partition : partitionArray ) { + partition.shutdown(); + } + } + + private Partition nextPartition() { + Partition partition = null; + + while(partition == null) { + if (shutdown) { + throw new RuntimeException("Journaling Provenance Repository is shutting down"); + } + + try { + partition = partitionQueue.poll(1, TimeUnit.SECONDS); + } catch (final InterruptedException ie) { + } + + if ( partition == null ) { + if ( blacklistedCount.get() >= config.getPartitionCount() ) { + throw new RuntimeException("Cannot persist to the Journal Provenance Repository because all partitions have been blacklisted due to write failures"); + } + } + } + + return partition; + } + + @Override + public <T> T withPartition(final PartitionAction<T> action, final boolean writeAction) throws IOException { + final Partition partition = nextPartition(); + + boolean ioe = false; + try { + return action.perform(partition); + } catch (final IOException e) { + ioe = true; + throw e; + } finally { + if ( ioe && writeAction ) { + // We failed to write to this Partition. This partition will no longer be usable until NiFi is restarted! 
+ blacklistedCount.incrementAndGet(); + } else { + partitionQueue.offer(partition); + } + } + } + + @Override + public void withPartition(final VoidPartitionAction action, final boolean writeAction) throws IOException { + final Partition partition = nextPartition(); + + boolean ioe = false; + try { + action.perform(partition); + } catch (final IOException e) { + ioe = true; + throw e; + } finally { + if ( ioe && writeAction ) { + // We failed to write to this Partition. This partition will no longer be usable until NiFi is restarted! + blacklistedCount.incrementAndGet(); + } else { + partitionQueue.offer(partition); + } + } + } + + + @Override + public <T> Set<T> withEachPartition(final PartitionAction<T> action) throws IOException { + final Set<T> results = new HashSet<>(partitionArray.length); + + // TODO: Do not use blacklisted partitions. + final Map<Partition, Future<T>> futures = new HashMap<>(partitionArray.length); + for ( final Partition partition : partitionArray ) { + final Callable<T> callable = new Callable<T>() { + @Override + public T call() throws Exception { + return action.perform(partition); + } + }; + + final Future<T> future = executor.submit(callable); + futures.put(partition, future); + } + + for ( final Map.Entry<Partition, Future<T>> entry : futures.entrySet() ) { + try { + final T result = entry.getValue().get(); + results.add(result); + } catch (final ExecutionException ee) { + final Throwable cause = ee.getCause(); + if ( cause instanceof IOException ) { + throw (IOException) cause; + } else { + throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + return results; + } + + @Override + public void withEachPartition(final VoidPartitionAction action, final boolean async) { + + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/VoidPartitionAction.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/VoidPartitionAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/VoidPartitionAction.java new file mode 100644 index 0000000..beaa187 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/VoidPartitionAction.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.nifi.provenance.journaling.partition;

import java.io.IOException;

/**
 * A callback that is handed a {@link Partition} and performs some operation
 * against it without returning a result.
 */
public interface VoidPartitionAction {
    /**
     * Performs this action against the given partition.
     *
     * @param partition the partition to operate on
     * @throws IOException if the action fails with an I/O error
     */
    void perform(Partition partition) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
new file mode 100644
index 0000000..c23a405
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
@@ -0,0 +1,177 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ +package org.apache.nifi.provenance.journaling.tasks; + +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.util.Collections; + +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.journaling.io.StandardEventSerializer; +import org.apache.nifi.provenance.journaling.journals.JournalReader; +import org.apache.nifi.provenance.journaling.journals.JournalWriter; +import org.apache.nifi.provenance.journaling.journals.StandardJournalReader; +import org.apache.nifi.provenance.journaling.journals.StandardJournalWriter; +import org.apache.nifi.provenance.journaling.toc.StandardTocReader; +import org.apache.nifi.provenance.journaling.toc.StandardTocWriter; +import org.apache.nifi.provenance.journaling.toc.TocReader; +import org.apache.nifi.provenance.journaling.toc.TocWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CompressionTask implements Runnable { + public static final String FILE_EXTENSION = ".compress"; + + private static final Logger logger = LoggerFactory.getLogger(CompressionTask.class); + + private final File journalFile; + private final long journalId; + private final File tocFile; + + public CompressionTask(final File journalFile, final long journalId, final File tocFile) { + this.journalFile = journalFile; + this.journalId = journalId; + this.tocFile = tocFile; + } + + public void compress(final JournalReader reader, final JournalWriter writer, final TocReader tocReader, final TocWriter tocWriter) throws IOException { + ProvenanceEventRecord event; + + int blockIndex = 0; + long blockOffset = tocReader.getBlockOffset(blockIndex); + tocWriter.addBlockOffset(blockOffset); + long nextBlockOffset = tocReader.getBlockOffset(blockIndex + 1); + + try { + while ((event = reader.nextEvent()) != null) { + // Check if we've gone beyond the offset of the next block. If so, write + // out a new block in the TOC. 
+ final long newPosition = reader.getPosition(); + if ( newPosition > nextBlockOffset && nextBlockOffset > 0 ) { + blockIndex++; + blockOffset = tocReader.getBlockOffset(blockIndex); + tocWriter.addBlockOffset(writer.getSize()); + + nextBlockOffset = tocReader.getBlockOffset(blockIndex + 1); + } + + // Write the event to the compressed writer + writer.write(Collections.singleton(event), event.getEventId()); + } + } catch (final EOFException eof) { + logger.warn("Found unexpected End-of-File when compressing {}", reader); + } + } + + /** + * Attempts to delete the given file up to 10 times, waiting a bit in between each failed + * iteration, in case another process (for example, a virus scanner) has the file locked + * + * @param file + * @return + */ + private boolean delete(final File file) { + for (int i=0; i < 10; i++) { + if ( file.delete() || !file.exists() ) { + return true; + } + + try { + Thread.sleep(100L); + } catch (final InterruptedException ie) { + } + } + + return false; + } + + /** + * Attempts to rename the given original file to the renamed file up to 20 times, waiting a bit + * in between each failed iteration, in case another process (for example, a virus scanner) has + * the file locked + * + * @param original + * @param renamed + * @return + */ + public static boolean rename(final File original, final File renamed) { + for (int i=0; i < 20; i++) { + if ( original.renameTo(renamed) ) { + return true; + } + + try { + Thread.sleep(100L); + } catch (final InterruptedException ie) { + } + } + + return false; + } + + @Override + public void run() { + try { + final File compressedFile = new File(journalFile.getParentFile(), journalFile.getName() + FILE_EXTENSION); + final File compressedTocFile = new File(tocFile.getParentFile(), tocFile.getName() + FILE_EXTENSION); + + try (final JournalReader journalReader = new StandardJournalReader(journalFile); + final JournalWriter compressedWriter = new StandardJournalWriter(journalId, compressedFile, true, new 
StandardEventSerializer()); + final TocReader tocReader = new StandardTocReader(tocFile); + final TocWriter compressedTocWriter = new StandardTocWriter(compressedTocFile, true)) { + + compress(journalReader, compressedWriter, tocReader, compressedTocWriter); + compressedWriter.sync(); + } + + final boolean deletedJournal = delete(journalFile); + if ( !deletedJournal ) { + delete(compressedFile); + delete(compressedTocFile); + logger.error("Failed to remove Journal file {}; considering compression task a failure", journalFile); + return; + } + + final boolean deletedToc = delete(tocFile); + if ( !deletedToc ) { + delete(compressedFile); + delete(compressedTocFile); + logger.error("Failed to remove TOC file for {}; considering compression task a failure", journalFile); + return; + } + + final boolean renamedJournal = rename(compressedFile, journalFile); + if ( !renamedJournal ) { + logger.error("Failed to rename {} to {}; this journal file may be inaccessible until it is renamed", compressedFile, journalFile); + } + + final boolean renamedToc = rename(compressedTocFile, tocFile); + if ( !renamedToc ) { + logger.error("Failed to rename {} to {}; this journal file may be inaccessible until it is renamed", compressedTocFile, tocFile); + } + + logger.info("Successfully compressed Journal File {}"); + } catch (final IOException ioe) { + logger.error("Failed to compress Journal File {} due to {}", journalFile, ioe.toString()); + if ( logger.isDebugEnabled() ) { + logger.error("", ioe); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java ---------------------------------------------------------------------- diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java
new file mode 100644
index 0000000..995acf9
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java
@@ -0,0 +1,89 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.provenance.journaling.toc;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

/**
 * Standard implementation of TocReader.
 *
 * Expects .toc file to be in the following format;
 *
 * byte 0: version
 * byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = journal is compressed
 * byte 2-9: long: offset of block 0
 * byte 10-17: long: offset of block 1
 * ...
 * byte (N*8+2)-(N*8+9): long: offset of block N
 *
 * The whole file is read in the constructor; afterwards the reader holds no open resources.
 */
public class StandardTocReader implements TocReader {
    private final boolean compressed;
    private final long[] offsets;

    /**
     * Reads the given Table-of-Contents file fully into memory.
     *
     * @param file the .toc file to read
     * @throws EOFException if the file ends before the two header bytes or before all
     *         block offsets implied by the file length have been read
     * @throws IOException if the file cannot be read or the compression flag is neither 0 nor 1
     */
    public StandardTocReader(final File file) throws IOException {
        try (final FileInputStream fis = new FileInputStream(file);
             final DataInputStream dis = new DataInputStream(fis)) {

            // The version byte is consumed but not otherwise interpreted.
            final int version = dis.read();
            if ( version < 0 ) {
                throw new EOFException("Table of Contents " + file + " is empty: could not read 'version' from header");
            }

            final int compressionFlag = dis.read();
            if ( compressionFlag < 0 ) {
                throw new EOFException("Table of Contents " + file + " is truncated: could not read 'compression flag' from header");
            }

            if ( compressionFlag == 0 ) {
                compressed = false;
            } else if ( compressionFlag == 1 ) {
                compressed = true;
            } else {
                throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag);
            }

            // Everything after the 2-byte header is a sequence of 8-byte offsets;
            // a trailing partial offset is ignored by the integer division.
            final int numBlocks = (int) ((file.length() - 2) / 8);
            offsets = new long[numBlocks];

            for (int i=0; i < numBlocks; i++) {
                offsets[i] = dis.readLong();
            }
        }
    }

    /**
     * @return <code>true</code> if the header's compression flag was 1, <code>false</code> if it was 0
     */
    @Override
    public boolean isCompressed() {
        return compressed;
    }

    /**
     * Returns the offset recorded for the block with the given index.
     *
     * @param blockIndex the zero-based index of the block
     * @return the recorded offset, or <code>-1</code> if the index is outside the range of
     *         blocks in this Table of Contents
     */
    @Override
    public long getBlockOffset(final int blockIndex) {
        if ( blockIndex < 0 || blockIndex >= offsets.length ) {
            return -1L;
        }
        return offsets[blockIndex];
    }

    /**
     * Does nothing: the underlying file is closed at the end of the constructor.
     */
    @Override
    public void close() throws IOException {
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
new file mode 100644
index 0000000..6058282
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
@@ -0,0 +1,98 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.provenance.journaling.toc;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;

/**
 * Standard implementation of {@link TocWriter}.
 *
 * Format of .toc file:
 * byte 0: version
 * byte 1: compressed: 0 -> not compressed, 1 -> compressed
 * byte 2-9: long: offset of block 0
 * byte 10-17: long: offset of block 1
 * ...
 * byte (N*8+2)-(N*8+9): long: offset of block N
 */
public class StandardTocWriter implements TocWriter {
    public static final byte VERSION = 1;

    /** Number of bytes occupied by one block offset in the file. */
    private static final int OFFSET_LENGTH = 8;

    private final File file;
    private final FileOutputStream fos;

    /** Number of block offsets written so far. */
    private int blockCount = 0;

    /**
     * Creates a StandardTocWriter that writes to the given file. The two header bytes
     * (version and compression flag) are written and synced to disk immediately.
     *
     * @param file the file to write to; must not exist yet
     * @param compressionFlag whether or not the journal is compressed
     * @throws FileAlreadyExistsException if the given file already exists
     * @throws FileNotFoundException if the file cannot be opened for writing
     * @throws IOException if the parent directory cannot be created or the header cannot be written
     */
    public StandardTocWriter(final File file, final boolean compressionFlag) throws IOException {
        if ( file.exists() ) {
            throw new FileAlreadyExistsException(file.getAbsolutePath());
        }

        // A path without a directory component has no parent to create.
        final File parent = file.getParentFile();
        if ( parent != null && !parent.exists() && !parent.mkdirs() ) {
            throw new IOException("Could not create directory " + file.getParent());
        }

        this.file = file;

        final FileOutputStream stream = new FileOutputStream(file);
        try {
            stream.write(VERSION);
            stream.write(compressionFlag ? 1 : 0);
            stream.flush();
            stream.getFD().sync();
        } catch (final IOException | RuntimeException e) {
            // Do not leak the file handle when the header cannot be written.
            try {
                stream.close();
            } catch (final IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }

        this.fos = stream;
    }

    /**
     * Appends the given block offset to the file and syncs it to disk. After a successful
     * write the block just added becomes the current block.
     *
     * @param offset the offset to record
     * @throws IOException if the offset cannot be written or synced
     */
    @Override
    public void addBlockOffset(final long offset) throws IOException {
        // The buffer is sized for exactly one offset and is flushed before returning;
        // fos itself stays open for subsequent calls.
        final BufferedOutputStream bos = new BufferedOutputStream(fos, OFFSET_LENGTH);
        final DataOutputStream dos = new DataOutputStream(bos);
        dos.writeLong(offset);
        dos.flush();
        blockCount++;

        fos.getFD().sync();
    }

    /**
     * @return the zero-based index of the most recently added block, or <code>0</code>
     *         if no block has been added yet
     */
    @Override
    public int getCurrentBlockIndex() {
        return Math.max(0, blockCount - 1);
    }

    /**
     * Closes the underlying file.
     */
    @Override
    public void close() throws IOException {
        fos.close();
    }

    /**
     * @return the file this writer writes to
     */
    @Override
    public File getFile() {
        return file;
    }

    @Override
    public String toString() {
        return "TOC Writer for " + file;
    }
}