NIFI-388: Refactored to make compression codec flexible
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/57ba3bc0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/57ba3bc0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/57ba3bc0 Branch: refs/heads/prov-query-language Commit: 57ba3bc046202e120b2e60fecc03d74fa0e1f18a Parents: cb05722 Author: Mark Payne <[email protected]> Authored: Mon Mar 2 13:48:22 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Mon Mar 2 13:48:22 2015 -0500 ---------------------------------------------------------------------- .../journals/CompressedOutputStream.java | 36 ++++++ .../journaling/journals/CompressionCodec.java | 45 ++++++++ .../journals/DeflatorCompressionCodec.java | 113 +++++++++++++++++++ .../journals/StandardJournalReader.java | 21 ++-- .../journals/StandardJournalWriter.java | 45 ++++---- .../partition/JournalingPartition.java | 2 +- .../journaling/tasks/CompressionTask.java | 3 +- .../journals/TestJournalReadWrite.java | 3 +- .../journals/TestStandardJournalReader.java | 11 ++ .../journals/TestStandardJournalWriter.java | 16 ++- 10 files changed, 257 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressedOutputStream.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressedOutputStream.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressedOutputStream.java new file mode 100644 index 0000000..618dd88 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressedOutputStream.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.journals; + +import java.io.IOException; +import java.io.OutputStream; + +public abstract class CompressedOutputStream extends OutputStream { + + /** + * Begins a new compression block + * @throws IOException + */ + public abstract void beginNewBlock() throws IOException; + + /** + * Ends the current compression block + * @throws IOException + */ + public abstract void finishBlock() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressionCodec.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressionCodec.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressionCodec.java new file mode 100644 index 0000000..f6e856e --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressionCodec.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.journals; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public interface CompressionCodec { + /** + * Returns the name of the compression codec + * @return + */ + String getName(); + + /** + * Wraps the given OutputStream so that data written will be compressed + * @param out + * @return + * @throws IOException + */ + CompressedOutputStream newCompressionOutputStream(OutputStream out) throws IOException; + + /** + * Wraps the given InputStream so that data read will be decompressed + * @param in + * @return + * @throws IOException + */ + InputStream newCompressionInputStream(InputStream in) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/DeflatorCompressionCodec.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/DeflatorCompressionCodec.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/DeflatorCompressionCodec.java new file mode 100644 index 0000000..b9f2959 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/DeflatorCompressionCodec.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.journals; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.nifi.remote.io.CompressionInputStream; +import org.apache.nifi.remote.io.CompressionOutputStream; + +public class DeflatorCompressionCodec implements CompressionCodec { + public static final String DEFLATOR_COMPRESSION_CODEC = "deflator-compression-codec"; + + @Override + public String getName() { + return DEFLATOR_COMPRESSION_CODEC; + } + + @Override + public CompressedOutputStream newCompressionOutputStream(final OutputStream out) throws IOException { + return new DeflatorOutputStream(out); + } + + @Override + public InputStream newCompressionInputStream(final InputStream in) throws IOException { + return new CompressionInputStream(in); + } + + + private static class DeflatorOutputStream extends CompressedOutputStream { + private final OutputStream originalOut; + private CompressionOutputStream compressionOutput; + + public DeflatorOutputStream(final OutputStream out) { + this.originalOut = out; + } + + private void verifyState() { + if ( compressionOutput == null ) { + throw new IllegalStateException("No Compression Block has been created"); + } + } + + @Override + public void write(final int b) throws IOException { + verifyState(); + compressionOutput.write(b); + } + + @Override + public void write(final byte[] b) throws IOException { + verifyState(); + compressionOutput.write(b); + } + + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + verifyState(); + compressionOutput.write(b, off, len); + } + + @Override + public void flush() throws IOException { + if ( compressionOutput != null ) { + compressionOutput.flush(); + } + } + + @Override + public void close() throws IOException { + if ( compressionOutput != null ) { + compressionOutput.close(); + } + + originalOut.close(); + } + + @Override + public void beginNewBlock() throws IOException { + compressionOutput = new CompressionOutputStream(originalOut); + } + + @Override + public void finishBlock() throws IOException { + // Calling close() on CompressionOutputStream doesn't close the underlying stream -- it is designed + // such that calling close() will write out the Compression footer and become unusable but not + // close the underlying stream because the whole point of CompressionOutputStream as opposed to + // GZIPOutputStream is that with CompressionOutputStream we can concatenate many together on a single + // stream. + if ( compressionOutput == null ) { + return; + } else { + compressionOutput.close(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java index 13878f8..9a937b4 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java @@ -27,7 +27,6 @@ import java.io.InputStream; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.journaling.io.Deserializer; import org.apache.nifi.provenance.journaling.io.Deserializers; -import org.apache.nifi.remote.io.CompressionInputStream; import org.apache.nifi.stream.io.ByteCountingInputStream; import org.apache.nifi.stream.io.LimitingInputStream; import org.apache.nifi.stream.io.MinimumLengthInputStream; @@ -49,7 +48,7 @@ public class StandardJournalReader implements JournalReader { private Deserializer deserializer; private int serializationVersion; - private boolean compressed; + private CompressionCodec compressionCodec = null; private long lastEventIdRead = -1L; @@ -68,7 +67,15 @@ public class StandardJournalReader implements JournalReader { StandardJournalMagicHeader.read(dis); final String codecName = dis.readUTF(); serializationVersion = dis.readInt(); - compressed = dis.readBoolean(); + final boolean compressed = dis.readBoolean(); + if ( compressed ) { + final String compressionCodecName = dis.readUTF(); + if ( DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC.equals(compressionCodecName) ) { + compressionCodec = new DeflatorCompressionCodec(); + } else { + throw new IOException(file + " is compressed using unknown Compression Codec " + compressionCodecName); + } + } deserializer = Deserializers.getDeserializer(codecName); resetDecompressedStream(); @@ -83,10 +90,10 @@ public class StandardJournalReader implements JournalReader { private void resetDecompressedStream() throws IOException { - if ( compressed ) { - decompressedStream = new ByteCountingInputStream(new BufferedInputStream(new CompressionInputStream(compressedStream)), compressedStream.getBytesConsumed()); - } else { + if ( compressionCodec == null ) { decompressedStream = compressedStream; + } else { + decompressedStream = new ByteCountingInputStream(new BufferedInputStream(compressionCodec.newCompressionInputStream(compressedStream)), compressedStream.getBytesConsumed()); } } @@ -129,7 +136,7 @@ public class StandardJournalReader implements JournalReader { // we are allowed to span blocks. We're out of data but if we are compressed, it could // just mean that the block has ended. - if ( !compressed ) { + if ( compressionCodec == null ) { return null; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java index a9cb361..d18b05b 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java @@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.journaling.io.Serializer; -import org.apache.nifi.remote.io.CompressionOutputStream; import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.stream.io.ByteCountingOutputStream; @@ -67,11 +66,13 @@ import org.slf4j.LoggerFactory; * * Where <header> is defined as: * <pre> + * magic header "NiFiProvJournal_1" * String: serialization codec name (retrieved from serializer) * --> 2 bytes for length of string * --> N bytes for actual serialization codec name * int: serialization version * boolean: compressed: 1 -> compressed, 0 -> not compressed + * String : if compressed, name of compression codec; otherwise, not present * </pre> * * And <record> is defined as: @@ -94,7 +95,7 @@ public class StandardJournalWriter implements JournalWriter { private final long journalId; private final File journalFile; - private final boolean compressed; + private final CompressionCodec compressionCodec; private final Serializer serializer; private final long creationTime = System.nanoTime(); private final String description; @@ -111,7 +112,7 @@ public class StandardJournalWriter implements JournalWriter { private long recordCount = 1L; - public StandardJournalWriter(final long journalId, final File journalFile, final boolean compressed, final Serializer serializer) throws IOException { + public StandardJournalWriter(final long journalId, final File journalFile, final CompressionCodec compressionCodec, final Serializer serializer) throws IOException { if ( journalFile.exists() ) { // Check if there is actually any data here. try (final InputStream fis = new FileInputStream(journalFile); @@ -133,7 +134,7 @@ public class StandardJournalWriter implements JournalWriter { this.journalId = journalId; this.journalFile = journalFile; - this.compressed = compressed; + this.compressionCodec = compressionCodec; this.serializer = serializer; this.description = "Journal Writer for " + journalFile; this.fos = new FileOutputStream(journalFile); @@ -141,8 +142,10 @@ public class StandardJournalWriter implements JournalWriter { uncompressedStream = new ByteCountingOutputStream(fos); writeHeader(uncompressedStream); - if (compressed) { - compressedStream = new CompressionOutputStream(uncompressedStream); + if (compressionCodec != null) { + final CompressedOutputStream cos = compressionCodec.newCompressionOutputStream(uncompressedStream); + cos.beginNewBlock(); + compressedStream = cos; } else { compressedStream = fos; } @@ -155,7 +158,13 @@ public class StandardJournalWriter implements JournalWriter { StandardJournalMagicHeader.write(out); dos.writeUTF(serializer.getCodecName()); dos.writeInt(serializer.getVersion()); + + final boolean compressed = compressionCodec != null; dos.writeBoolean(compressed); + if ( compressed ) { + dos.writeUTF(compressionCodec.getName()); + } + dos.flush(); } @@ -258,6 +267,7 @@ public class StandardJournalWriter implements JournalWriter { public long getAge(final TimeUnit timeUnit) { return timeUnit.convert(System.nanoTime() - creationTime, TimeUnit.NANOSECONDS); } + @Override public void finishBlock() throws IOException { @@ -266,16 +276,10 @@ public class StandardJournalWriter implements JournalWriter { } blockStarted = false; - if ( !compressed ) { - return; + + if ( compressedStream instanceof CompressedOutputStream ) { + ((CompressedOutputStream) compressedStream).finishBlock(); } - - // Calling close() on CompressionOutputStream doesn't close the underlying stream -- it is designed - // such that calling close() will write out the Compression footer and become unusable but not - // close the underlying stream because the whole point of CompressionOutputStream as opposed to - // GZIPOutputStream is that with CompressionOutputStream we can concatenate many together on a single - // stream. - compressedStream.close(); } @Override @@ -285,15 +289,10 @@ public class StandardJournalWriter implements JournalWriter { } blockStarted = true; - if ( !compressed ) { - return; - } - if ( eventCount == 0 ) { - return; + if ( compressedStream instanceof CompressedOutputStream ) { + ((CompressedOutputStream) compressedStream).beginNewBlock(); + this.out = new ByteCountingOutputStream(compressedStream, uncompressedStream.getBytesWritten()); } - - this.compressedStream = new CompressionOutputStream(uncompressedStream); - this.out = new ByteCountingOutputStream(compressedStream, uncompressedStream.getBytesWritten()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java index bba6899..31371af 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java @@ -252,7 +252,7 @@ public class JournalingPartition implements Partition { // create new writers and reset state. final File journalFile = new File(journalsDir, firstEventId + JOURNAL_FILE_EXTENSION); - journalWriter = new StandardJournalWriter(firstEventId, journalFile, false, new StandardEventSerializer()); + journalWriter = new StandardJournalWriter(firstEventId, journalFile, null, new StandardEventSerializer()); try { tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false, config.isAlwaysSync()); tocWriter.addBlockOffset(journalWriter.getSize()); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java index fc9fb46..7977620 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.journaling.io.StandardEventSerializer; +import org.apache.nifi.provenance.journaling.journals.DeflatorCompressionCodec; import org.apache.nifi.provenance.journaling.journals.JournalReader; import org.apache.nifi.provenance.journaling.journals.JournalWriter; import org.apache.nifi.provenance.journaling.journals.StandardJournalReader; @@ -153,7 +154,7 @@ public class CompressionTask implements Callable<Long> { } try (final JournalReader journalReader = new StandardJournalReader(journalFile); - final JournalWriter compressedWriter = new StandardJournalWriter(journalId, compressedFile, true, new StandardEventSerializer()); + final JournalWriter compressedWriter = new StandardJournalWriter(journalId, compressedFile, new DeflatorCompressionCodec(), new StandardEventSerializer()); final TocReader tocReader = new StandardTocReader(tocFile); final TocWriter compressedTocWriter = new StandardTocWriter(compressedTocFile, true, false)) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java index f2266e2..89eace7 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java @@ -42,7 +42,8 @@ public class TestJournalReadWrite { final StandardEventSerializer serializer = new StandardEventSerializer(); try { - try (final StandardJournalWriter writer = new StandardJournalWriter(journalId, journalFile, compressed, serializer)) { + final CompressionCodec codec = compressed ? new DeflatorCompressionCodec() : null; + try (final StandardJournalWriter writer = new StandardJournalWriter(journalId, journalFile, codec, serializer)) { for (int block=0; block < 100; block++) { writer.beginNewBlock(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java index 9f0ba99..f29af1b 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java @@ -209,6 +209,8 @@ public class TestStandardJournalReader { @Test public void testReadFirstEventCompressed() throws IOException { dos.writeBoolean(true); + dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC); + writeRecords(88L, 1, true); // write data to a file so that we can read it with the journal reader @@ -235,6 +237,8 @@ public class TestStandardJournalReader { @Test public void testReadManyCompressed() throws IOException { dos.writeBoolean(true); + dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC); + writeRecords(0, 1024, true); // write data to a file so that we can read it with the journal reader @@ -266,6 +270,7 @@ public class TestStandardJournalReader { @Test public void testReadFirstEventWithBlockOffsetCompressed() throws IOException { dos.writeBoolean(true); + dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC); writeRecords(0, 10, true); final int secondBlockOffset = baos.size(); @@ -295,6 +300,8 @@ public class TestStandardJournalReader { @Test public void testReadSubsequentEventWithBlockOffsetCompressed() throws IOException { dos.writeBoolean(true); + dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC); + writeRecords(0, 10, true); final int secondBlockOffset = baos.size(); @@ -324,6 +331,8 @@ public class TestStandardJournalReader { @Test public void testReadMultipleEventsWithBlockOffsetCompressed() throws IOException { dos.writeBoolean(true); + dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC); + writeRecords(0, 10, true); final int secondBlockOffset = baos.size(); @@ -417,6 +426,8 @@ public class TestStandardJournalReader { @Test public void testReadEventWithBlockOffsetThenPreviousBlockOffsetCompressed() throws IOException { dos.writeBoolean(true); + dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC); + final int firstBlockOffset = baos.size(); writeRecords(0, 10, true); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java index e8a6787..956df80 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java @@ -46,7 +46,7 @@ public class TestStandardJournalWriter { try { assertTrue( journalFile.createNewFile() ); - try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) { + try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), new StandardEventSerializer())) { } } finally { @@ -60,11 +60,11 @@ public class TestStandardJournalWriter { try { assertTrue( journalFile.createNewFile() ); - try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) { + try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), new StandardEventSerializer())) { writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L); } - try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) { + try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), new StandardEventSerializer())) { Assert.fail("StandardJournalWriter attempted to overwrite existing file"); } catch (final FileAlreadyExistsException faee) { // expected @@ -80,7 +80,7 @@ public class TestStandardJournalWriter { final StandardEventSerializer serializer = new StandardEventSerializer(); try { - try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, serializer)) { + try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), serializer)) { writer.beginNewBlock(); writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L); writer.finishBlock(); @@ -100,6 +100,10 @@ public class TestStandardJournalWriter { // compression flag assertEquals(true, dis.readBoolean()); + // compression codec name + final String compressionCodecName = dis.readUTF(); + assertEquals(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC, compressionCodecName); + // read block start final CompressionInputStream decompressedIn = new CompressionInputStream(bais); final StandardEventDeserializer deserializer = new StandardEventDeserializer(); @@ -123,7 +127,7 @@ public class TestStandardJournalWriter { final StandardEventSerializer serializer = new StandardEventSerializer(); try { - try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, serializer)) { + try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), serializer)) { for (int i=0; i < 1024; i++) { writer.beginNewBlock(); writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L); @@ -145,6 +149,8 @@ public class TestStandardJournalWriter { // compression flag assertEquals(true, dis.readBoolean()); + assertEquals(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC, dis.readUTF()); + // read block start for (int i=0; i < 1024; i++) { final CompressionInputStream decompressedIn = new CompressionInputStream(bais);
