This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 64acd8b1c5 NIFI-2827 Adding zstd-jni to the necessary pom.xml. It's 
already in the LICENSE. NIFI-2827 Update CompressContent.java to use zstd 
compression format NIFI-2827 Update test cases for CompressContent.java to 
include zstd format NIFI-2827 Update JsonRecordSetWriter.java to enable zstd 
compression format
64acd8b1c5 is described below

commit 64acd8b1c5f84473e3d07e23450570a9dc8c4242
Author: Matthew Hawkins <hawko2...@gmail.com>
AuthorDate: Fri Aug 12 02:36:27 2022 +1000

    NIFI-2827 Adding zstd-jni to the necessary pom.xml. It's already in the LICENSE.
    NIFI-2827 Update CompressContent.java to use zstd compression format
    NIFI-2827 Update test cases for CompressContent.java to include zstd format
    NIFI-2827 Update JsonRecordSetWriter.java to enable zstd compression format
    
    This closes #6294
    
    Signed-off-by: Mike Thomsen <mthom...@apache.org>
---
 minifi/pom.xml                                     |   5 ++++
 .../nifi-standard-processors/pom.xml               |   4 +++
 .../nifi/processors/standard/CompressContent.java  |  23 +++++++++++++---
 .../processors/standard/TestCompressContent.java   |  30 +++++++++++++++++++++
 .../resources/CompressedData/SampleFile.txt.zst    | Bin 0 -> 80 bytes
 nifi-nar-bundles/nifi-standard-bundle/pom.xml      |   5 ++++
 .../org/apache/nifi/json/JsonRecordSetWriter.java  |   9 +++++--
 7 files changed, 70 insertions(+), 6 deletions(-)

diff --git a/minifi/pom.xml b/minifi/pom.xml
index e06e6543f9..9b367f8336 100644
--- a/minifi/pom.xml
+++ b/minifi/pom.xml
@@ -504,6 +504,11 @@ limitations under the License.
                 <artifactId>lzma-java</artifactId>
                 <version>1.3</version>
             </dependency>
+            <dependency>
+                <groupId>com.github.luben</groupId>
+                <artifactId>zstd-jni</artifactId>
+                <version>1.5.2-3</version>
+            </dependency>
             <dependency>
                 <groupId>org.tukaani</groupId>
                 <artifactId>xz</artifactId>
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index b191e369be..9956a75881 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -221,6 +221,10 @@
             <groupId>com.github.jponge</groupId>
             <artifactId>lzma-java</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.github.luben</groupId>
+            <artifactId>zstd-jni</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.tukaani</groupId>
             <artifactId>xz</artifactId>
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
index 60fb4c3653..6ad4192cd1 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
@@ -23,6 +23,8 @@ import 
org.apache.commons.compress.compressors.CompressorStreamFactory;
 import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import 
org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import 
org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
+import 
org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -82,7 +84,7 @@ import java.util.zip.InflaterInputStream;
 @SideEffectFree
 @SupportsBatching
 @InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", 
"xz-lzma2", "snappy", "snappy-hadoop", "snappy framed", "lz4-framed", 
"deflate"})
+@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", 
"xz-lzma2", "snappy", "snappy-hadoop", "snappy framed", "lz4-framed", 
"deflate", "zstd"})
 @CapabilityDescription("Compresses or decompresses the contents of FlowFiles 
using a user-specified compression algorithm and updates the mime.type "
     + "attribute as appropriate. This processor operates in a very memory 
efficient way so very large objects well beyond the heap size "
     + "are generally fine to process")
@@ -104,16 +106,17 @@ public class CompressContent extends AbstractProcessor {
     public static final String COMPRESSION_FORMAT_SNAPPY_HADOOP = 
"snappy-hadoop";
     public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy 
framed";
     public static final String COMPRESSION_FORMAT_LZ4_FRAMED ="lz4-framed";
+    public static final String COMPRESSION_FORMAT_ZSTD = "zstd";
 
     public static final String MODE_COMPRESS = "compress";
     public static final String MODE_DECOMPRESS = "decompress";
 
     public static final PropertyDescriptor COMPRESSION_FORMAT = new 
PropertyDescriptor.Builder()
         .name("Compression Format")
-        .description("The compression format to use. Valid values are: GZIP, 
Deflate, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Hadoop, Snappy Framed, and 
LZ4-Framed")
+        .description("The compression format to use. Valid values are: GZIP, 
Deflate, ZSTD, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Hadoop, Snappy Framed, and 
LZ4-Framed")
         .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, 
COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, COMPRESSION_FORMAT_BZIP2,
             COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, 
COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_HADOOP, 
COMPRESSION_FORMAT_SNAPPY_FRAMED,
-            COMPRESSION_FORMAT_LZ4_FRAMED)
+            COMPRESSION_FORMAT_LZ4_FRAMED, COMPRESSION_FORMAT_ZSTD)
         .defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
         .required(true)
         .build();
@@ -132,7 +135,7 @@ public class CompressContent extends AbstractProcessor {
         .defaultValue("1")
         .required(true)
         .allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
-        .dependsOn(COMPRESSION_FORMAT, COMPRESSION_FORMAT_ATTRIBUTE, 
COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, 
COMPRESSION_FORMAT_XZ_LZMA2)
+        .dependsOn(COMPRESSION_FORMAT, COMPRESSION_FORMAT_ATTRIBUTE, 
COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, 
COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_ZSTD)
         .dependsOn(MODE, MODE_COMPRESS)
         .build();
 
@@ -184,6 +187,7 @@ public class CompressContent extends AbstractProcessor {
         mimeTypeMap.put("application/x-snappy-hadoop", 
COMPRESSION_FORMAT_SNAPPY_HADOOP);
         mimeTypeMap.put("application/x-snappy-framed", 
COMPRESSION_FORMAT_SNAPPY_FRAMED);
         mimeTypeMap.put("application/x-lz4-framed", 
COMPRESSION_FORMAT_LZ4_FRAMED);
+        mimeTypeMap.put("application/zstd", COMPRESSION_FORMAT_ZSTD);
         this.compressionFormatMimeTypeMap = 
Collections.unmodifiableMap(mimeTypeMap);
     }
 
@@ -273,6 +277,9 @@ public class CompressContent extends AbstractProcessor {
             case COMPRESSION_FORMAT_LZ4_FRAMED:
                 fileExtension = ".lz4";
                 break;
+            case COMPRESSION_FORMAT_ZSTD:
+                fileExtension = ".zst";
+                break;
             default:
                 fileExtension = "";
                 break;
@@ -328,6 +335,11 @@ public class CompressContent extends AbstractProcessor {
                                     
mimeTypeRef.set("application/x-lz4-framed");
                                     compressionOut = new 
CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(),
 bufferedOut);
                                     break;
+                                case COMPRESSION_FORMAT_ZSTD:
+                                    final int zstdcompressionLevel = 
context.getProperty(COMPRESSION_LEVEL).asInteger() * 2;
+                                    compressionOut = new 
ZstdCompressorOutputStream(bufferedOut, zstdcompressionLevel);
+                                    mimeTypeRef.set("application/zstd");
+                                    break;
                                 case COMPRESSION_FORMAT_BZIP2:
                                 default:
                                     mimeTypeRef.set("application/x-bzip2");
@@ -364,6 +376,9 @@ public class CompressContent extends AbstractProcessor {
                                 case COMPRESSION_FORMAT_LZ4_FRAMED:
                                     compressionIn = new 
FramedLZ4CompressorInputStream(bufferedIn, true);
                                     break;
+                                case COMPRESSION_FORMAT_ZSTD:
+                                    compressionIn = new 
ZstdCompressorInputStream(bufferedIn);
+                                    break;
                                 default:
                                     compressionIn = new 
CompressorStreamFactory().createCompressorInputStream(compressionFormat.toLowerCase(),
 bufferedIn);
                             }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
index 0445f2bd8b..0d8727a8e3 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
@@ -347,4 +347,34 @@ public class TestCompressContent {
         
flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
         flowFile.assertAttributeEquals("filename", "SampleFile.txt");
     }
+
+    @Test
+    public void testZstdCompress() throws Exception {
+        final TestRunner runner = 
TestRunners.newTestRunner(CompressContent.class);
+        runner.setProperty(CompressContent.MODE, 
CompressContent.MODE_COMPRESS);
+        runner.setProperty(CompressContent.COMPRESSION_FORMAT, 
CompressContent.COMPRESSION_FORMAT_ZSTD);
+        runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+
+        
runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), 
"application/zstd");
+        flowFile.assertAttributeEquals("filename", "SampleFile.txt.zst");
+    }
+
+    @Test
+    public void testZstdDecompress() throws Exception {
+        final TestRunner runner = 
TestRunners.newTestRunner(CompressContent.class);
+        runner.setProperty(CompressContent.MODE, 
CompressContent.MODE_DECOMPRESS);
+        runner.setProperty(CompressContent.COMPRESSION_FORMAT, 
CompressContent.COMPRESSION_FORMAT_ZSTD);
+        runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+        
runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.zst"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+        
flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+        flowFile.assertAttributeEquals("filename", "SampleFile.txt");
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.zst
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.zst
new file mode 100755
index 0000000000..e427761608
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.zst
 differ
diff --git a/nifi-nar-bundles/nifi-standard-bundle/pom.xml 
b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
index 694fa2acc5..20c3f4fc3f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
@@ -148,6 +148,11 @@
                 <artifactId>lzma-java</artifactId>
                 <version>1.3</version>
             </dependency>
+            <dependency>
+                <groupId>com.github.luben</groupId>
+                <artifactId>zstd-jni</artifactId>
+                <version>1.5.2-3</version>
+            </dependency>
             <dependency>
                 <groupId>org.tukaani</groupId>
                 <artifactId>xz</artifactId>
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
index dee9a4f7e9..153b1308bd 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
@@ -72,6 +72,7 @@ public class JsonRecordSetWriter extends 
DateTimeTextRecordSetWriter implements
     public static final String COMPRESSION_FORMAT_SNAPPY = "snappy";
     public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy 
framed";
     public static final String COMPRESSION_FORMAT_NONE = "none";
+    public static final String COMPRESSION_FORMAT_ZSTD = "zstd";
 
     static final PropertyDescriptor SUPPRESS_NULLS = new 
PropertyDescriptor.Builder()
             .name("suppress-nulls")
@@ -101,9 +102,9 @@ public class JsonRecordSetWriter extends 
DateTimeTextRecordSetWriter implements
     public static final PropertyDescriptor COMPRESSION_FORMAT = new 
PropertyDescriptor.Builder()
             .name("compression-format")
             .displayName("Compression Format")
-            .description("The compression format to use. Valid values are: 
GZIP, BZIP2, XZ-LZMA2, LZMA, Snappy, and Snappy Framed")
+            .description("The compression format to use. Valid values are: 
GZIP, BZIP2, ZSTD, XZ-LZMA2, LZMA, Snappy, and Snappy Framed")
             .allowableValues(COMPRESSION_FORMAT_NONE, COMPRESSION_FORMAT_GZIP, 
COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2,
-                    COMPRESSION_FORMAT_SNAPPY, 
COMPRESSION_FORMAT_SNAPPY_FRAMED)
+                    COMPRESSION_FORMAT_SNAPPY, 
COMPRESSION_FORMAT_SNAPPY_FRAMED, COMPRESSION_FORMAT_ZSTD)
             .defaultValue(COMPRESSION_FORMAT_NONE)
             .required(true)
             .build();
@@ -203,6 +204,10 @@ public class JsonRecordSetWriter extends 
DateTimeTextRecordSetWriter implements
                     mimeTypeRef = "application/x-bzip2";
                     compressionOut = new 
CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(),
 bufferedOut);
                     break;
+                case COMPRESSION_FORMAT_ZSTD:
+                    mimeTypeRef = "application/zstd";
+                    compressionOut = new 
CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(),
 bufferedOut);
+                    break;
                 default:
                     mimeTypeRef = "application/json";
                     compressionOut = out;

Reply via email to