This is an automated email from the ASF dual-hosted git repository. mthomsen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 64acd8b1c5 NIFI-2827 Adding zstd-jni to the necessary pom.xml. It's already in the LICENSE. NIFI-2827 Update CompressContent.java to use zstd compression format NIFI-2827 Update test cases for CompressContent.java to include zstd format NIFI-2827 Update JsonRecordSetWriter.java to enable zstd compression format 64acd8b1c5 is described below commit 64acd8b1c5f84473e3d07e23450570a9dc8c4242 Author: Matthew Hawkins <hawko2...@gmail.com> AuthorDate: Fri Aug 12 02:36:27 2022 +1000 NIFI-2827 Adding zstd-jni to the necessary pom.xml. It's already in the LICENSE. NIFI-2827 Update CompressContent.java to use zstd compression format NIFI-2827 Update test cases for CompressContent.java to include zstd format NIFI-2827 Update JsonRecordSetWriter.java to enable zstd compression format This closes #6294 Signed-off-by: Mike Thomsen <mthom...@apache.org> --- minifi/pom.xml | 5 ++++ .../nifi-standard-processors/pom.xml | 4 +++ .../nifi/processors/standard/CompressContent.java | 23 +++++++++++++--- .../processors/standard/TestCompressContent.java | 30 +++++++++++++++++++++ .../resources/CompressedData/SampleFile.txt.zst | Bin 0 -> 80 bytes nifi-nar-bundles/nifi-standard-bundle/pom.xml | 5 ++++ .../org/apache/nifi/json/JsonRecordSetWriter.java | 9 +++++-- 7 files changed, 70 insertions(+), 6 deletions(-) diff --git a/minifi/pom.xml b/minifi/pom.xml index e06e6543f9..9b367f8336 100644 --- a/minifi/pom.xml +++ b/minifi/pom.xml @@ -504,6 +504,11 @@ limitations under the License. <artifactId>lzma-java</artifactId> <version>1.3</version> </dependency> + <dependency> + <groupId>com.github.luben</groupId> + <artifactId>zstd-jni</artifactId> + <version>1.5.2-3</version> + </dependency> <dependency> <groupId>org.tukaani</groupId> <artifactId>xz</artifactId> diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index b191e369be..9956a75881 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -221,6 +221,10 @@ <groupId>com.github.jponge</groupId> <artifactId>lzma-java</artifactId> </dependency> + <dependency> + <groupId>com.github.luben</groupId> + <artifactId>zstd-jni</artifactId> + </dependency> <dependency> <groupId>org.tukaani</groupId> <artifactId>xz</artifactId> diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java index 60fb4c3653..6ad4192cd1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java @@ -23,6 +23,8 @@ import org.apache.commons.compress.compressors.CompressorStreamFactory; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream; +import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -82,7 +84,7 @@ import java.util.zip.InflaterInputStream; @SideEffectFree @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy-hadoop", "snappy framed", "lz4-framed", "deflate"}) +@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy-hadoop", "snappy framed", "lz4-framed", "deflate", "zstd"}) @CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type " + "attribute as appropriate. This processor operates in a very memory efficient way so very large objects well beyond the heap size " + "are generally fine to process") @@ -104,16 +106,17 @@ public class CompressContent extends AbstractProcessor { public static final String COMPRESSION_FORMAT_SNAPPY_HADOOP = "snappy-hadoop"; public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy framed"; public static final String COMPRESSION_FORMAT_LZ4_FRAMED ="lz4-framed"; + public static final String COMPRESSION_FORMAT_ZSTD = "zstd"; public static final String MODE_COMPRESS = "compress"; public static final String MODE_DECOMPRESS = "decompress"; public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder() .name("Compression Format") - .description("The compression format to use. Valid values are: GZIP, Deflate, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Hadoop, Snappy Framed, and LZ4-Framed") + .description("The compression format to use. Valid values are: GZIP, Deflate, ZSTD, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Hadoop, Snappy Framed, and LZ4-Framed") .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_HADOOP, COMPRESSION_FORMAT_SNAPPY_FRAMED, - COMPRESSION_FORMAT_LZ4_FRAMED) + COMPRESSION_FORMAT_LZ4_FRAMED, COMPRESSION_FORMAT_ZSTD) .defaultValue(COMPRESSION_FORMAT_ATTRIBUTE) .required(true) .build(); @@ -132,7 +135,7 @@ public class CompressContent extends AbstractProcessor { .defaultValue("1") .required(true) .allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9") - .dependsOn(COMPRESSION_FORMAT, COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, COMPRESSION_FORMAT_XZ_LZMA2) + .dependsOn(COMPRESSION_FORMAT, COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_ZSTD) .dependsOn(MODE, MODE_COMPRESS) .build(); @@ -184,6 +187,7 @@ public class CompressContent extends AbstractProcessor { mimeTypeMap.put("application/x-snappy-hadoop", COMPRESSION_FORMAT_SNAPPY_HADOOP); mimeTypeMap.put("application/x-snappy-framed", COMPRESSION_FORMAT_SNAPPY_FRAMED); mimeTypeMap.put("application/x-lz4-framed", COMPRESSION_FORMAT_LZ4_FRAMED); + mimeTypeMap.put("application/zstd", COMPRESSION_FORMAT_ZSTD); this.compressionFormatMimeTypeMap = Collections.unmodifiableMap(mimeTypeMap); } @@ -273,6 +277,9 @@ public class CompressContent extends AbstractProcessor { case COMPRESSION_FORMAT_LZ4_FRAMED: fileExtension = ".lz4"; break; + case COMPRESSION_FORMAT_ZSTD: + fileExtension = ".zst"; + break; default: fileExtension = ""; break; @@ -328,6 +335,11 @@ public class CompressContent extends AbstractProcessor { mimeTypeRef.set("application/x-lz4-framed"); compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut); break; + case COMPRESSION_FORMAT_ZSTD: + final int zstdcompressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger() * 2; + compressionOut = new ZstdCompressorOutputStream(bufferedOut, zstdcompressionLevel); + mimeTypeRef.set("application/zstd"); + break; case COMPRESSION_FORMAT_BZIP2: default: mimeTypeRef.set("application/x-bzip2"); @@ -364,6 +376,9 @@ public class CompressContent extends AbstractProcessor { case COMPRESSION_FORMAT_LZ4_FRAMED: compressionIn = new FramedLZ4CompressorInputStream(bufferedIn, true); break; + case COMPRESSION_FORMAT_ZSTD: + compressionIn = new ZstdCompressorInputStream(bufferedIn); + break; default: compressionIn = new CompressorStreamFactory().createCompressorInputStream(compressionFormat.toLowerCase(), bufferedIn); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java index 0445f2bd8b..0d8727a8e3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java @@ -347,4 +347,34 @@ public class TestCompressContent { flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt")); flowFile.assertAttributeEquals("filename", "SampleFile.txt"); } + + @Test + public void testZstdCompress() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(CompressContent.class); + runner.setProperty(CompressContent.MODE, CompressContent.MODE_COMPRESS); + runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_ZSTD); + runner.setProperty(CompressContent.UPDATE_FILENAME, "true"); + + runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt")); + runner.run(); + + runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/zstd"); + flowFile.assertAttributeEquals("filename", "SampleFile.txt.zst"); + } + + @Test + public void testZstdDecompress() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(CompressContent.class); + runner.setProperty(CompressContent.MODE, CompressContent.MODE_DECOMPRESS); + runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_ZSTD); + runner.setProperty(CompressContent.UPDATE_FILENAME, "true"); + runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.zst")); + runner.run(); + runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0); + flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt")); + flowFile.assertAttributeEquals("filename", "SampleFile.txt"); + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.zst b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.zst new file mode 100755 index 0000000000..e427761608 Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.zst differ diff --git a/nifi-nar-bundles/nifi-standard-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/pom.xml index 694fa2acc5..20c3f4fc3f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/pom.xml @@ -148,6 +148,11 @@ <artifactId>lzma-java</artifactId> <version>1.3</version> </dependency> + <dependency> + <groupId>com.github.luben</groupId> + <artifactId>zstd-jni</artifactId> + <version>1.5.2-3</version> + </dependency> <dependency> <groupId>org.tukaani</groupId> <artifactId>xz</artifactId> diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java index dee9a4f7e9..153b1308bd 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java @@ -72,6 +72,7 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements public static final String COMPRESSION_FORMAT_SNAPPY = "snappy"; public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy framed"; public static final String COMPRESSION_FORMAT_NONE = "none"; + public static final String COMPRESSION_FORMAT_ZSTD = "zstd"; static final PropertyDescriptor SUPPRESS_NULLS = new PropertyDescriptor.Builder() .name("suppress-nulls") @@ -101,9 +102,9 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder() .name("compression-format") .displayName("Compression Format") - .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, LZMA, Snappy, and Snappy Framed") + .description("The compression format to use. Valid values are: GZIP, BZIP2, ZSTD, XZ-LZMA2, LZMA, Snappy, and Snappy Framed") .allowableValues(COMPRESSION_FORMAT_NONE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2, - COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED) + COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED, COMPRESSION_FORMAT_ZSTD) .defaultValue(COMPRESSION_FORMAT_NONE) .required(true) .build(); @@ -203,6 +204,10 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements mimeTypeRef = "application/x-bzip2"; compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut); break; + case COMPRESSION_FORMAT_ZSTD: + mimeTypeRef = "application/zstd"; + compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut); + break; default: mimeTypeRef = "application/json"; compressionOut = out;