NIFI-388: Refactored to make compression codec flexible

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/57ba3bc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/57ba3bc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/57ba3bc0

Branch: refs/heads/prov-query-language
Commit: 57ba3bc046202e120b2e60fecc03d74fa0e1f18a
Parents: cb05722
Author: Mark Payne <[email protected]>
Authored: Mon Mar 2 13:48:22 2015 -0500
Committer: Mark Payne <[email protected]>
Committed: Mon Mar 2 13:48:22 2015 -0500

----------------------------------------------------------------------
 .../journals/CompressedOutputStream.java        |  36 ++++++
 .../journaling/journals/CompressionCodec.java   |  45 ++++++++
 .../journals/DeflatorCompressionCodec.java      | 113 +++++++++++++++++++
 .../journals/StandardJournalReader.java         |  21 ++--
 .../journals/StandardJournalWriter.java         |  45 ++++----
 .../partition/JournalingPartition.java          |   2 +-
 .../journaling/tasks/CompressionTask.java       |   3 +-
 .../journals/TestJournalReadWrite.java          |   3 +-
 .../journals/TestStandardJournalReader.java     |  11 ++
 .../journals/TestStandardJournalWriter.java     |  16 ++-
 10 files changed, 257 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressedOutputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressedOutputStream.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressedOutputStream.java
new file mode 100644
index 0000000..618dd88
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressedOutputStream.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.journals;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public abstract class CompressedOutputStream extends OutputStream {
+
+    /**
+     * Begins a new compression block
+     * @throws IOException
+     */
+    public abstract void beginNewBlock() throws IOException;
+
+    /**
+     * Ends the current compression block
+     * @throws IOException
+     */
+    public abstract void finishBlock() throws IOException;
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressionCodec.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressionCodec.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressionCodec.java
new file mode 100644
index 0000000..f6e856e
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressionCodec.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.journals;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public interface CompressionCodec {
+    /**
+     * Returns the name of the compression codec
+     * @return
+     */
+    String getName();
+    
+    /**
+     * Wraps the given OutputStream so that data written will be compressed
+     * @param out
+     * @return
+     * @throws IOException
+     */
+    CompressedOutputStream newCompressionOutputStream(OutputStream out) throws 
IOException;
+    
+    /**
+     * Wraps the given InputStream so that data read will be decompressed
+     * @param in
+     * @return
+     * @throws IOException
+     */
+    InputStream newCompressionInputStream(InputStream in) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/DeflatorCompressionCodec.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/DeflatorCompressionCodec.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/DeflatorCompressionCodec.java
new file mode 100644
index 0000000..b9f2959
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/DeflatorCompressionCodec.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.journals;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.nifi.remote.io.CompressionInputStream;
+import org.apache.nifi.remote.io.CompressionOutputStream;
+
+public class DeflatorCompressionCodec implements CompressionCodec {
+    public static final String DEFLATOR_COMPRESSION_CODEC = 
"deflator-compression-codec";
+    
+    @Override
+    public String getName() {
+        return DEFLATOR_COMPRESSION_CODEC;
+    }
+
+    @Override
+    public CompressedOutputStream newCompressionOutputStream(final 
OutputStream out) throws IOException {
+        return new DeflatorOutputStream(out);
+    }
+
+    @Override
+    public InputStream newCompressionInputStream(final InputStream in) throws 
IOException {
+        return new CompressionInputStream(in);
+    }
+
+    
+    private static class DeflatorOutputStream extends CompressedOutputStream {
+        private final OutputStream originalOut;
+        private CompressionOutputStream compressionOutput;
+        
+        public DeflatorOutputStream(final OutputStream out) {
+            this.originalOut = out;
+        }
+        
+        private void verifyState() {
+            if ( compressionOutput == null ) {
+                throw new IllegalStateException("No Compression Block has been 
created");
+            }
+        }
+        
+        @Override
+        public void write(final int b) throws IOException {
+            verifyState();
+            compressionOutput.write(b);
+        }
+        
+        @Override
+        public void write(final byte[] b) throws IOException {
+            verifyState();
+            compressionOutput.write(b);
+        }
+        
+        @Override
+        public void write(final byte[] b, final int off, final int len) throws 
IOException {
+            verifyState();
+            compressionOutput.write(b, off, len);
+        }
+        
+        @Override
+        public void flush() throws IOException {
+            if ( compressionOutput != null ) {
+                compressionOutput.flush();
+            }
+        }
+        
+        @Override
+        public void close() throws IOException {
+            if ( compressionOutput != null ) {
+                compressionOutput.close();
+            }
+            
+            originalOut.close();
+        }
+        
+        @Override
+        public void beginNewBlock() throws IOException {
+            compressionOutput = new CompressionOutputStream(originalOut);
+        }
+        
+        @Override
+        public void finishBlock() throws IOException {
+            // Calling close() on CompressionOutputStream doesn't close the 
underlying stream -- it is designed
+            // such that calling close() will write out the Compression footer 
and become unusable but not
+            // close the underlying stream because the whole point of 
CompressionOutputStream as opposed to
+            // GZIPOutputStream is that with CompressionOutputStream we can 
concatenate many together on a single
+            // stream.
+            if ( compressionOutput == null ) {
+                return;
+            } else {
+                compressionOutput.close();
+            }
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
index 13878f8..9a937b4 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
@@ -27,7 +27,6 @@ import java.io.InputStream;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.journaling.io.Deserializer;
 import org.apache.nifi.provenance.journaling.io.Deserializers;
-import org.apache.nifi.remote.io.CompressionInputStream;
 import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.nifi.stream.io.LimitingInputStream;
 import org.apache.nifi.stream.io.MinimumLengthInputStream;
@@ -49,7 +48,7 @@ public class StandardJournalReader implements JournalReader {
     
     private Deserializer deserializer;
     private int serializationVersion;
-    private boolean compressed;
+    private CompressionCodec compressionCodec = null;
     
     private long lastEventIdRead = -1L;
     
@@ -68,7 +67,15 @@ public class StandardJournalReader implements JournalReader {
             StandardJournalMagicHeader.read(dis);
             final String codecName = dis.readUTF();
             serializationVersion = dis.readInt();
-            compressed = dis.readBoolean();
+            final boolean compressed = dis.readBoolean();
+            if ( compressed ) {
+                final String compressionCodecName = dis.readUTF();
+                if ( 
DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC.equals(compressionCodecName)
 ) {
+                    compressionCodec = new DeflatorCompressionCodec();
+                } else {
+                    throw new IOException(file + " is compressed using unknown 
Compression Codec " + compressionCodecName);
+                }
+            }
             deserializer = Deserializers.getDeserializer(codecName);
             
             resetDecompressedStream();
@@ -83,10 +90,10 @@ public class StandardJournalReader implements JournalReader 
{
     
     
     private void resetDecompressedStream() throws IOException {
-        if ( compressed ) {
-            decompressedStream = new ByteCountingInputStream(new 
BufferedInputStream(new CompressionInputStream(compressedStream)), 
compressedStream.getBytesConsumed());
-        } else {
+        if ( compressionCodec == null ) {
             decompressedStream = compressedStream;
+        } else {
+            decompressedStream = new ByteCountingInputStream(new 
BufferedInputStream(compressionCodec.newCompressionInputStream(compressedStream)),
 compressedStream.getBytesConsumed());
         }
     }
     
@@ -129,7 +136,7 @@ public class StandardJournalReader implements JournalReader 
{
             
             // we are allowed to span blocks. We're out of data but if we are 
compressed, it could
             // just mean that the block has ended.
-            if ( !compressed ) {
+            if ( compressionCodec == null ) {
                 return null;
             }
             

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
index a9cb361..d18b05b 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.journaling.io.Serializer;
-import org.apache.nifi.remote.io.CompressionOutputStream;
 import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
@@ -67,11 +66,13 @@ import org.slf4j.LoggerFactory;
  * 
  * Where &lt;header&gt; is defined as:
  * <pre>
+ *  magic header "NiFiProvJournal_1"
  *  String: serialization codec name (retrieved from serializer)
  *      --> 2 bytes for length of string
  *      --> N bytes for actual serialization codec name
  *  int: serialization version
  *  boolean: compressed: 1 -> compressed, 0 -> not compressed
+ *  String : if compressed, name of compression codec; otherwise, not present
  * </pre>
  * 
  * And &lt;record&gt; is defined as:
@@ -94,7 +95,7 @@ public class StandardJournalWriter implements JournalWriter {
     
     private final long journalId;
     private final File journalFile;
-    private final boolean compressed;
+    private final CompressionCodec compressionCodec;
     private final Serializer serializer;
     private final long creationTime = System.nanoTime();
     private final String description;
@@ -111,7 +112,7 @@ public class StandardJournalWriter implements JournalWriter 
{
     private long recordCount = 1L;
     
     
-    public StandardJournalWriter(final long journalId, final File journalFile, 
final boolean compressed, final Serializer serializer) throws IOException {
+    public StandardJournalWriter(final long journalId, final File journalFile, 
final CompressionCodec compressionCodec, final Serializer serializer) throws 
IOException {
         if ( journalFile.exists() ) {
             // Check if there is actually any data here.
             try (final InputStream fis = new FileInputStream(journalFile);
@@ -133,7 +134,7 @@ public class StandardJournalWriter implements JournalWriter 
{
         
         this.journalId = journalId;
         this.journalFile = journalFile;
-        this.compressed = compressed;
+        this.compressionCodec = compressionCodec;
         this.serializer = serializer;
         this.description = "Journal Writer for " + journalFile;
         this.fos = new FileOutputStream(journalFile);
@@ -141,8 +142,10 @@ public class StandardJournalWriter implements 
JournalWriter {
         uncompressedStream = new ByteCountingOutputStream(fos);
         writeHeader(uncompressedStream);
         
-        if (compressed) {
-            compressedStream = new CompressionOutputStream(uncompressedStream);
+        if (compressionCodec != null) {
+            final CompressedOutputStream cos = 
compressionCodec.newCompressionOutputStream(uncompressedStream);
+            cos.beginNewBlock();
+            compressedStream = cos;
         } else {
             compressedStream = fos;
         }
@@ -155,7 +158,13 @@ public class StandardJournalWriter implements 
JournalWriter {
         StandardJournalMagicHeader.write(out);
         dos.writeUTF(serializer.getCodecName());
         dos.writeInt(serializer.getVersion());
+        
+        final boolean compressed = compressionCodec != null;
         dos.writeBoolean(compressed);
+        if ( compressed ) {
+            dos.writeUTF(compressionCodec.getName());
+        }
+        
         dos.flush();
     }
     
@@ -258,6 +267,7 @@ public class StandardJournalWriter implements JournalWriter 
{
     public long getAge(final TimeUnit timeUnit) {
         return timeUnit.convert(System.nanoTime() - creationTime, 
TimeUnit.NANOSECONDS);
     }
+    
 
     @Override
     public void finishBlock() throws IOException {
@@ -266,16 +276,10 @@ public class StandardJournalWriter implements 
JournalWriter {
         }
         
         blockStarted = false;
-        if ( !compressed ) {
-            return;
+        
+        if ( compressedStream instanceof CompressedOutputStream ) {
+            ((CompressedOutputStream) compressedStream).finishBlock();
         }
-
-        // Calling close() on CompressionOutputStream doesn't close the 
underlying stream -- it is designed
-        // such that calling close() will write out the Compression footer and 
become unusable but not
-        // close the underlying stream because the whole point of 
CompressionOutputStream as opposed to
-        // GZIPOutputStream is that with CompressionOutputStream we can 
concatenate many together on a single
-        // stream.
-        compressedStream.close();
     }
     
     @Override
@@ -285,15 +289,10 @@ public class StandardJournalWriter implements 
JournalWriter {
         }
         blockStarted = true;
         
-        if ( !compressed ) {
-            return;
-        }
-        if ( eventCount == 0 ) {
-            return;
+        if ( compressedStream instanceof CompressedOutputStream ) {
+            ((CompressedOutputStream) compressedStream).beginNewBlock();
+            this.out = new ByteCountingOutputStream(compressedStream, 
uncompressedStream.getBytesWritten());
         }
-        
-        this.compressedStream = new 
CompressionOutputStream(uncompressedStream);
-        this.out = new ByteCountingOutputStream(compressedStream, 
uncompressedStream.getBytesWritten());
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
index bba6899..31371af 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
@@ -252,7 +252,7 @@ public class JournalingPartition implements Partition {
         
         // create new writers and reset state.
         final File journalFile = new File(journalsDir, firstEventId + 
JOURNAL_FILE_EXTENSION);
-        journalWriter = new StandardJournalWriter(firstEventId, journalFile, 
false, new StandardEventSerializer());
+        journalWriter = new StandardJournalWriter(firstEventId, journalFile, 
null, new StandardEventSerializer());
         try {
             tocWriter = new 
StandardTocWriter(QueryUtils.getTocFile(journalFile), false, 
config.isAlwaysSync());
             tocWriter.addBlockOffset(journalWriter.getSize());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
index fc9fb46..7977620 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.journaling.io.StandardEventSerializer;
+import org.apache.nifi.provenance.journaling.journals.DeflatorCompressionCodec;
 import org.apache.nifi.provenance.journaling.journals.JournalReader;
 import org.apache.nifi.provenance.journaling.journals.JournalWriter;
 import org.apache.nifi.provenance.journaling.journals.StandardJournalReader;
@@ -153,7 +154,7 @@ public class CompressionTask implements Callable<Long> {
             }
             
             try (final JournalReader journalReader = new 
StandardJournalReader(journalFile);
-                final JournalWriter compressedWriter = new 
StandardJournalWriter(journalId, compressedFile, true, new 
StandardEventSerializer());
+                final JournalWriter compressedWriter = new 
StandardJournalWriter(journalId, compressedFile, new 
DeflatorCompressionCodec(), new StandardEventSerializer());
                 final TocReader tocReader = new StandardTocReader(tocFile);
                 final TocWriter compressedTocWriter = new 
StandardTocWriter(compressedTocFile, true, false)) {
                 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java
index f2266e2..89eace7 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java
@@ -42,7 +42,8 @@ public class TestJournalReadWrite {
         final StandardEventSerializer serializer = new 
StandardEventSerializer();
         
         try {
-            try (final StandardJournalWriter writer = new 
StandardJournalWriter(journalId, journalFile, compressed, serializer)) {
+            final CompressionCodec codec = compressed ? new 
DeflatorCompressionCodec() : null;
+            try (final StandardJournalWriter writer = new 
StandardJournalWriter(journalId, journalFile, codec, serializer)) {
                 for (int block=0; block < 100; block++) {
                     writer.beginNewBlock();
                     

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
index 9f0ba99..f29af1b 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
@@ -209,6 +209,8 @@ public class TestStandardJournalReader {
     @Test
     public void testReadFirstEventCompressed() throws IOException {
         dos.writeBoolean(true);
+        dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);
+
         writeRecords(88L, 1, true);
         
         // write data to a file so that we can read it with the journal reader
@@ -235,6 +237,8 @@ public class TestStandardJournalReader {
     @Test
     public void testReadManyCompressed() throws IOException {
         dos.writeBoolean(true);
+        dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);
+
         writeRecords(0, 1024, true);
         
         // write data to a file so that we can read it with the journal reader
@@ -266,6 +270,7 @@ public class TestStandardJournalReader {
     @Test
     public void testReadFirstEventWithBlockOffsetCompressed() throws 
IOException {
         dos.writeBoolean(true);
+        dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);
         writeRecords(0, 10, true);
         
         final int secondBlockOffset = baos.size();
@@ -295,6 +300,8 @@ public class TestStandardJournalReader {
     @Test
     public void testReadSubsequentEventWithBlockOffsetCompressed() throws 
IOException {
         dos.writeBoolean(true);
+        dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);
+
         writeRecords(0, 10, true);
         
         final int secondBlockOffset = baos.size();
@@ -324,6 +331,8 @@ public class TestStandardJournalReader {
     @Test
     public void testReadMultipleEventsWithBlockOffsetCompressed() throws 
IOException {
         dos.writeBoolean(true);
+        dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);
+
         writeRecords(0, 10, true);
         
         final int secondBlockOffset = baos.size();
@@ -417,6 +426,8 @@ public class TestStandardJournalReader {
     @Test
     public void 
testReadEventWithBlockOffsetThenPreviousBlockOffsetCompressed() throws 
IOException {
         dos.writeBoolean(true);
+        dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);
+
         final int firstBlockOffset = baos.size();
         writeRecords(0, 10, true);
         

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57ba3bc0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
index e8a6787..956df80 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
@@ -46,7 +46,7 @@ public class TestStandardJournalWriter {
         try {
             assertTrue( journalFile.createNewFile() );
             
-            try (final StandardJournalWriter writer = new 
StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) {
+            try (final StandardJournalWriter writer = new 
StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), new 
StandardEventSerializer())) {
                 
             }
         } finally {
@@ -60,11 +60,11 @@ public class TestStandardJournalWriter {
         try {
             assertTrue( journalFile.createNewFile() );
             
-            try (final StandardJournalWriter writer = new 
StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) {
+            try (final StandardJournalWriter writer = new 
StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), new 
StandardEventSerializer())) {
                 
writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L);
             }
             
-            try (final StandardJournalWriter writer = new 
StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) {
+            try (final StandardJournalWriter writer = new 
StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), new 
StandardEventSerializer())) {
                 Assert.fail("StandardJournalWriter attempted to overwrite 
existing file");
             } catch (final FileAlreadyExistsException faee) {
                 // expected
@@ -80,7 +80,7 @@ public class TestStandardJournalWriter {
         
         final StandardEventSerializer serializer = new 
StandardEventSerializer();
         try {
-            try (final StandardJournalWriter writer = new 
StandardJournalWriter(1L, journalFile, true, serializer)) {
+            try (final StandardJournalWriter writer = new 
StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), 
serializer)) {
                 writer.beginNewBlock();
                 
writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L);
                 writer.finishBlock();
@@ -100,6 +100,10 @@ public class TestStandardJournalWriter {
             // compression flag
             assertEquals(true, dis.readBoolean());
             
+            // compression codec name
+            final String compressionCodecName = dis.readUTF();
+            assertEquals(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC, 
compressionCodecName);
+            
             // read block start
             final CompressionInputStream decompressedIn = new 
CompressionInputStream(bais);
             final StandardEventDeserializer deserializer = new 
StandardEventDeserializer();
@@ -123,7 +127,7 @@ public class TestStandardJournalWriter {
         
         final StandardEventSerializer serializer = new 
StandardEventSerializer();
         try {
-            try (final StandardJournalWriter writer = new 
StandardJournalWriter(1L, journalFile, true, serializer)) {
+            try (final StandardJournalWriter writer = new 
StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), 
serializer)) {
                 for (int i=0; i < 1024; i++) {
                     writer.beginNewBlock();
                     
writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L);
@@ -145,6 +149,8 @@ public class TestStandardJournalWriter {
             // compression flag
             assertEquals(true, dis.readBoolean());
             
+            assertEquals(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC, 
dis.readUTF());
+            
             // read block start
             for (int i=0; i < 1024; i++) {
                 final CompressionInputStream decompressedIn = new 
CompressionInputStream(bais);

Reply via email to