Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 72b9a0e1a -> 9af7faab4
MINIFICPP-264: CompressContent Processor This closes #151. Signed-off-by: Marc Parisi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/9af7faab Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/9af7faab Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/9af7faab Branch: refs/heads/master Commit: 9af7faab452c66a77e9a0623469369c2e1a76478 Parents: 72b9a0e Author: Bin Qiu <[email protected]> Authored: Mon Oct 23 20:17:01 2017 -0700 Committer: Marc Parisi <[email protected]> Committed: Thu Oct 26 10:10:15 2017 -0400 ---------------------------------------------------------------------- CMakeLists.txt | 2 +- README.md | 1 + extensions/libarchive/ArchiveLoader.h | 7 +- extensions/libarchive/CompressContent.cpp | 173 ++++ extensions/libarchive/CompressContent.h | 381 ++++++++ extensions/rocksdb-repos/RocksDbStream.h | 2 +- libminifi/include/io/AtomicEntryStream.h | 2 +- libminifi/include/io/BaseStream.h | 2 +- libminifi/include/io/ClientSocket.h | 2 +- libminifi/include/io/DataStream.h | 4 +- libminifi/include/io/FileStream.h | 2 +- libminifi/src/io/FileStream.cpp | 3 +- .../test/archive-tests/CompressContentTests.cpp | 933 +++++++++++++++++++ 13 files changed, 1504 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 756be29..e28bae7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -130,7 +130,7 @@ endif() ## ENABLE FEATURE INFORMATION ADD_FEATURE_INFO("HTTP CURL" HTTP-CURL "This enables RESTProtocol, InvokeHTTP, and the HTTPClient for Site to Site") ADD_FEATURE_INFO("ROCKSDB REPOS" ROCKSDB-REPOS "This Enables persistent provenance, flowfile, and content repositories using 
RocksDB") -ADD_FEATURE_INFO("ARCHIVE EXTENSIONS" ARCHIVE-EXTENSIONS "This Enables libarchive functionality including MergeContent") +ADD_FEATURE_INFO("ARCHIVE EXTENSIONS" ARCHIVE-EXTENSIONS "This Enables libarchive functionality including MergeContent, CompressContent") ## NOW WE CAN ADD LIBRARIES AND EXTENSIONS TO MAIN http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 8f8af9c..1f7f269 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,7 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a * TailFile * MergeContent * ExtractText + * CompressContent * Provenance events generation is supported and are persisted using RocksDB. ## System Requirements http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/extensions/libarchive/ArchiveLoader.h ---------------------------------------------------------------------- diff --git a/extensions/libarchive/ArchiveLoader.h b/extensions/libarchive/ArchiveLoader.h index c7b1e36..e407483 100644 --- a/extensions/libarchive/ArchiveLoader.h +++ b/extensions/libarchive/ArchiveLoader.h @@ -19,6 +19,7 @@ #define EXTENSION_ARCHIVELOADER_H #include "MergeContent.h" +#include "CompressContent.h" #include "core/ClassLoader.h" class __attribute__((visibility("default"))) ArchiveFactory : public core::ObjectFactory { @@ -45,6 +46,7 @@ class __attribute__((visibility("default"))) ArchiveFactory : public core::Objec virtual std::vector<std::string> getClassNames() { std::vector<std::string> class_names; class_names.push_back("MergeContent"); + class_names.push_back("CompressContent"); return class_names; } @@ -53,8 +55,11 @@ class __attribute__((visibility("default"))) ArchiveFactory : public core::Objec std::transform(name.begin(), name.end(), name.begin(), ::tolower); if (name == "mergecontent") { return std::unique_ptr<ObjectFactory>(new 
core::DefautObjectFactory<minifi::processors::MergeContent>()); + } else if (name == "compresscontent") { + return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::CompressContent>()); + } else { + return nullptr; } - return nullptr; } static bool added; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/extensions/libarchive/CompressContent.cpp ---------------------------------------------------------------------- diff --git a/extensions/libarchive/CompressContent.cpp b/extensions/libarchive/CompressContent.cpp new file mode 100644 index 0000000..4f250e9 --- /dev/null +++ b/extensions/libarchive/CompressContent.cpp @@ -0,0 +1,173 @@ +/** + * @file CompressContent.cpp + * CompressContent class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "CompressContent.h" +#include <stdio.h> +#include <algorithm> +#include <memory> +#include <string> +#include <map> +#include <set> +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property CompressContent::CompressLevel("Compression Level", "The compression level to use; this is valid only when using GZIP compression.", "1"); +core::Property CompressContent::CompressMode("Mode", "Indicates whether the processor should compress content or decompress content.", MODE_COMPRESS); +core::Property CompressContent::CompressFormat("Compression Format", "The compression format to use.", COMPRESSION_FORMAT_ATTRIBUTE); +core::Property CompressContent::UpdateFileName("Update Filename", "Determines if filename extension need to be updated", "false"); +core::Relationship CompressContent::Success("success", "FlowFiles will be transferred to the success relationship after successfully being compressed or decompressed"); +core::Relationship CompressContent::Failure("failure", "FlowFiles will be transferred to the failure relationship if they fail to compress/decompress"); + +void CompressContent::initialize() { + // Set the supported properties + std::set<core::Property> properties; + properties.insert(CompressLevel); + properties.insert(CompressMode); + properties.insert(CompressFormat); + properties.insert(UpdateFileName); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Failure); + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +void CompressContent::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + if (context->getProperty(CompressLevel.getName(), value) && !value.empty()) { + 
core::Property::StringToInt(value, compressLevel_); + } + value = ""; + if (context->getProperty(CompressMode.getName(), value) && !value.empty()) { + this->compressMode_ = value; + } + value = ""; + if (context->getProperty(CompressFormat.getName(), value) && !value.empty()) { + this->compressFormat_ = value; + } + value = ""; + if (context->getProperty(UpdateFileName.getName(), value) && !value.empty()) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, updateFileName_); + } + logger_->log_info("Compress Content: Mode [%s] Format [%s] Level [%d] UpdateFileName [%d]", compressMode_, compressFormat_, compressLevel_, updateFileName_); + // update the mimeTypeMap + compressionFormatMimeTypeMap_["application/gzip"] = COMPRESSION_FORMAT_GZIP; + compressionFormatMimeTypeMap_["application/bzip2"] = COMPRESSION_FORMAT_BZIP2; + compressionFormatMimeTypeMap_["application/x-bzip2"] = COMPRESSION_FORMAT_BZIP2; + compressionFormatMimeTypeMap_["application/x-lzma"] = COMPRESSION_FORMAT_LZMA; + compressionFormatMimeTypeMap_["application/x-xz"] = COMPRESSION_FORMAT_XZ_LZMA2; + fileExtension_[COMPRESSION_FORMAT_GZIP] = ".gz"; + fileExtension_[COMPRESSION_FORMAT_LZMA] = ".lzma"; + fileExtension_[COMPRESSION_FORMAT_BZIP2] = ".bz2"; + fileExtension_[COMPRESSION_FORMAT_XZ_LZMA2] = ".xz"; +} + +void CompressContent::onTrigger(std::shared_ptr<core::ProcessContext> context, std::shared_ptr<core::ProcessSession> session) { + std::shared_ptr<core::FlowFile> flowFile = session->get(); + + if (!flowFile) { + return; + } + + std::string compressFormat = compressFormat_; + if (compressFormat_ == COMPRESSION_FORMAT_ATTRIBUTE) { + std::string attr; + flowFile->getAttribute(FlowAttributeKey(MIME_TYPE), attr); + if (attr.empty()) { + logger_->log_error("No %s attribute existed for the flow, route to failure", FlowAttributeKey(MIME_TYPE)); + session->transfer(flowFile, Failure); + return; + } + auto search = compressionFormatMimeTypeMap_.find(attr); + if (search != 
compressionFormatMimeTypeMap_.end()) { + compressFormat = search->second; + } else { + logger_->log_info("Mime type of %s is not indicated a support format, route to success", attr); + session->transfer(flowFile, Success); + return; + } + } + std::transform(compressFormat.begin(), compressFormat.end(), compressFormat.begin(), ::tolower); + std::string mimeType; + if (compressFormat == COMPRESSION_FORMAT_GZIP) { + mimeType = "application/gzip"; + } else if (compressFormat == COMPRESSION_FORMAT_BZIP2) { + mimeType = "application/bzip2"; + } else if (compressFormat == COMPRESSION_FORMAT_LZMA) { + mimeType = "application/x-lzma"; + } else if (compressFormat == COMPRESSION_FORMAT_XZ_LZMA2) { + mimeType = "application/x-xz"; + } else { + logger_->log_error("Compress format is invalid %s", compressFormat); + session->transfer(flowFile, Failure); + return; + } + + std::string fileExtension; + auto search = fileExtension_.find(compressFormat); + if (search != fileExtension_.end()) { + fileExtension = search->second; + } + std::shared_ptr<core::FlowFile> processFlowFile = session->create(flowFile); + CompressContent::WriteCallback callback(compressMode_, compressLevel_, compressFormat, flowFile, session); + session->write(processFlowFile, &callback); + + if (callback.status_ < 0) { + logger_->log_error("Compress Content processing fail for the flow with UUID %s", flowFile->getUUIDStr()); + session->transfer(flowFile, Failure); + session->remove(processFlowFile); + } else { + std::string fileName; + processFlowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + if (compressMode_ == MODE_COMPRESS) { + session->putAttribute(processFlowFile, FlowAttributeKey(MIME_TYPE), mimeType); + if (updateFileName_) { + fileName = fileName + fileExtension; + session->putAttribute(processFlowFile, FlowAttributeKey(FILENAME), fileName); + } + } else { + session->removeAttribute(processFlowFile, FlowAttributeKey(MIME_TYPE)); + if (updateFileName_) { + if (fileName.size() >= 
fileExtension.size() && + fileName.compare(fileName.size() - fileExtension.size(), fileExtension.size(), fileExtension) == 0) { + fileName = fileName.substr(0, fileName.size() - fileExtension.size()); + session->putAttribute(processFlowFile, FlowAttributeKey(FILENAME), fileName); + } + } + } + logger_->log_debug("Compress Content processing success for the flow with UUID %s name %s", processFlowFile->getUUIDStr(), fileName); + session->transfer(processFlowFile, Success); + session->remove(flowFile); + } +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/extensions/libarchive/CompressContent.h ---------------------------------------------------------------------- diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h new file mode 100644 index 0000000..b34aee7 --- /dev/null +++ b/extensions/libarchive/CompressContent.h @@ -0,0 +1,381 @@ +/** + * @file CompressContent.h + * CompressContent class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __COMPRESS_CONTENT_H__ +#define __COMPRESS_CONTENT_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "archive_entry.h" +#include "archive.h" +#include "core/logging/LoggerConfiguration.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +#define COMPRESSION_FORMAT_ATTRIBUTE "use mime.type attribute" +#define COMPRESSION_FORMAT_GZIP "gzip" +#define COMPRESSION_FORMAT_BZIP2 "bzip2" +#define COMPRESSION_FORMAT_XZ_LZMA2 "xz-lzma2" +#define COMPRESSION_FORMAT_LZMA "lzma" + +#define MODE_COMPRESS "compress" +#define MODE_DECOMPRESS "decompress" + +// CompressContent Class +class CompressContent: public core::Processor { +public: + // Constructor + /*! + * Create a new processor + */ + explicit CompressContent(std::string name, uuid_t uuid = NULL) : + core::Processor(name, uuid), logger_(logging::LoggerFactory<CompressContent>::getLogger()), updateFileName_(false) { + } + // Destructor + virtual ~CompressContent() { + } + // Processor Name + static constexpr char const* ProcessorName = "CompressContent"; + // Supported Properties + static core::Property CompressMode; + static core::Property CompressLevel; + static core::Property CompressFormat; + static core::Property UpdateFileName; + + // Supported Relationships + static core::Relationship Failure; + static core::Relationship Success; + +public: + // Nest Callback Class for read stream from flow for compress + class ReadCallbackCompress: public InputStreamCallback { + public: + ReadCallbackCompress(std::shared_ptr<core::FlowFile> &flow, struct archive *arch, struct archive_entry *entry) : + flow_(flow), arch_(arch), entry_(entry), status_(0), logger_(logging::LoggerFactory<CompressContent>::getLogger()) { + } + ~ReadCallbackCompress() { + } + int64_t process(std::shared_ptr<io::BaseStream> stream) { + int max_read = 
getpagesize(); + uint8_t buffer[max_read]; + int64_t ret = 0; + uint64_t read_size = 0; + + ret = archive_write_header(arch_, entry_); + if (ret != ARCHIVE_OK) { + logger_->log_error("Compress Content archive error %s", archive_error_string(arch_)); + status_ = -1; + return -1; + } + while (read_size < flow_->getSize()) { + ret = stream->read(buffer, sizeof(buffer)); + if (ret < 0) { + status_ = -1; + return -1; + } + if (ret > 0) { + ret = archive_write_data(arch_, buffer, ret); + if (ret < 0) { + logger_->log_error("Compress Content archive error %s", archive_error_string(arch_)); + status_ = -1; + return -1; + } + read_size += ret; + } else { + break; + } + } + return read_size; + } + std::shared_ptr<core::FlowFile> flow_; + struct archive *arch_; + struct archive_entry *entry_; + int status_; + std::shared_ptr<logging::Logger> logger_; + }; + // Nest Callback Class for read stream from flow for decompress + class ReadCallbackDecompress: public InputStreamCallback { + public: + ReadCallbackDecompress(std::shared_ptr<core::FlowFile> &flow) : + read_size_(0), offset_(0), flow_(flow) { + origin_offset_ = flow_->getOffset(); + } + ~ReadCallbackDecompress() { + } + int64_t process(std::shared_ptr<io::BaseStream> stream) { + read_size_ = 0; + stream->seek(offset_); + int readRet = stream->read(buffer_, sizeof(buffer_)); + read_size_ = readRet; + if (readRet > 0) { + offset_ += read_size_; + } + return readRet; + } + int64_t read_size_; + uint8_t buffer_[8192]; + uint64_t offset_; + uint64_t origin_offset_; + std::shared_ptr<core::FlowFile> flow_; + }; + // Nest Callback Class for write stream + class WriteCallback: public OutputStreamCallback { + public: + WriteCallback(std::string &compress_mode, int64_t compress_level, std::string &compress_format, + std::shared_ptr<core::FlowFile> &flow, std::shared_ptr<core::ProcessSession> &session) : + compress_mode_(compress_mode), compress_level_(compress_level), compress_format_(compress_format), + flow_(flow), 
session_(session), + logger_(logging::LoggerFactory<CompressContent>::getLogger()), + readDecompressCb_(flow) { + size_ = 0; + stream_ = nullptr; + status_ = 0; + } + ~WriteCallback() { + } + + std::string compress_mode_; + int64_t compress_level_; + std::string compress_format_; + std::shared_ptr<core::FlowFile> flow_; + std::shared_ptr<core::ProcessSession> session_; + std::shared_ptr<io::BaseStream> stream_; + int64_t size_; + std::shared_ptr<logging::Logger> logger_; + CompressContent::ReadCallbackDecompress readDecompressCb_; + int status_; + + static la_ssize_t archive_write(struct archive *arch, void *context, const void *buff, size_t size) { + WriteCallback *callback = (WriteCallback *) context; + la_ssize_t ret = callback->stream_->write(reinterpret_cast<uint8_t*>(const_cast<void*>(buff)), size); + if (ret > 0) + callback->size_ += (int64_t) ret; + return ret; + } + + static ssize_t archive_read(struct archive *arch, void *context, const void **buff) { + WriteCallback *callback = (WriteCallback *) context; + callback->session_->read(callback->flow_, &callback->readDecompressCb_); + if (callback->readDecompressCb_.read_size_ >= 0) { + *buff = callback->readDecompressCb_.buffer_; + return callback->readDecompressCb_.read_size_; + } else { + archive_set_error(arch, EIO, "Error reading flowfile"); + return -1; + } + } + + static la_int64_t archive_skip(struct archive *a, void *client_data, la_int64_t request) { + return 0; + } + + void archive_write_log_error_cleanup(struct archive *arch) { + logger_->log_error("Compress Content archive write error %s", archive_error_string(arch)); + status_ = -1; + archive_write_free(arch); + } + + void archive_read_log_error_cleanup(struct archive *arch) { + logger_->log_error("Compress Content archive read error %s", archive_error_string(arch)); + status_ = -1; + archive_read_free(arch); + } + + int64_t process(std::shared_ptr<io::BaseStream> stream) { + int64_t ret = 0; + struct archive *arch; + int r; + + if 
(compress_mode_ == MODE_COMPRESS) { + arch = archive_write_new(); + if (!arch) { + status_ = -1; + return -1; + } + r = archive_write_set_format_ustar(arch); + if (r != ARCHIVE_OK) { + archive_write_log_error_cleanup(arch); + return -1; + } + if (compress_format_ == COMPRESSION_FORMAT_GZIP) { + r = archive_write_add_filter_gzip(arch); + if (r != ARCHIVE_OK) { + archive_write_log_error_cleanup(arch); + return -1; + } + std::string option; + option = "gzip:compression-level=" + std::to_string((int) compress_level_); + r = archive_write_set_options(arch, option.c_str()); + if (r != ARCHIVE_OK) { + archive_write_log_error_cleanup(arch); + return -1; + } + } else if (compress_format_ == COMPRESSION_FORMAT_BZIP2) { + r = archive_write_add_filter_bzip2(arch); + if (r != ARCHIVE_OK) { + archive_write_log_error_cleanup(arch); + return -1; + } + } else if (compress_format_ == COMPRESSION_FORMAT_LZMA) { + r = archive_write_add_filter_lzma(arch); + if (r != ARCHIVE_OK) { + archive_write_log_error_cleanup(arch); + return -1; + } + } else if (compress_format_ == COMPRESSION_FORMAT_XZ_LZMA2) { + r = archive_write_add_filter_xz(arch); + if (r != ARCHIVE_OK) { + archive_write_log_error_cleanup(arch); + return -1; + } + } else { + archive_write_log_error_cleanup(arch); + return -1; + } + r = archive_write_set_bytes_per_block(arch, 0); + if (r != ARCHIVE_OK) { + archive_write_log_error_cleanup(arch); + return -1; + } + this->stream_ = stream; + r = archive_write_open(arch, this, NULL, archive_write, NULL); + if (r != ARCHIVE_OK) { + archive_write_log_error_cleanup(arch); + return -1; + } + struct archive_entry *entry = archive_entry_new(); + if (!entry) { + archive_write_log_error_cleanup(arch); + return -1; + } + std::string fileName; + flow_->getAttribute(FlowAttributeKey(FILENAME), fileName); + archive_entry_set_pathname(entry, fileName.c_str()); + archive_entry_set_size(entry, flow_->getSize()); + archive_entry_set_mode(entry, S_IFREG | 0755); + ReadCallbackCompress readCb(flow_, 
arch, entry); + session_->read(flow_, &readCb); + if (readCb.status_ < 0) { + archive_entry_free(entry); + archive_write_log_error_cleanup(arch); + status_ = -1; + return -1; + } + archive_entry_free(entry); + archive_write_close(arch); + archive_write_free(arch); + return size_; + } else { + arch = archive_read_new(); + if (!arch) { + status_ = -1; + return -1; + } + r = archive_read_support_format_all(arch); + if (r != ARCHIVE_OK) { + archive_read_log_error_cleanup(arch); + return -1; + } + r = archive_read_support_filter_all(arch); + if (r != ARCHIVE_OK) { + archive_read_log_error_cleanup(arch); + return -1; + } + this->stream_ = stream; + r = archive_read_open2(arch, this, NULL, archive_read, archive_skip, NULL); + if (r != ARCHIVE_OK) { + archive_read_log_error_cleanup(arch); + return -1; + } + struct archive_entry *entry; + if (archive_read_next_header(arch, &entry) != ARCHIVE_OK) { + archive_read_log_error_cleanup(arch); + return -1; + } + int entry_size = archive_entry_size(entry); + logger_->log_debug("Decompress Content archive entry size %d", entry_size); + size_ = 0; + while (size_ < entry_size) { + char buffer[8192]; + int ret = archive_read_data(arch, buffer, sizeof(buffer)); + if (ret < 0) { + archive_read_log_error_cleanup(arch); + return -1; + } + if (ret == 0) + break; + size_ += ret; + ret = stream_->write(reinterpret_cast<uint8_t*>(buffer), ret); + if (ret < 0) { + archive_read_log_error_cleanup(arch); + return -1; + } + } + archive_read_close(arch); + archive_read_free(arch); + return size_; + } + } + }; + +public: + /** + * Function that's executed when the processor is scheduled. + * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. 
+ */ + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + // OnTrigger method, implemented by NiFi CompressContent + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) { + } + // OnTrigger method, implemented by NiFi CompressContent + virtual void onTrigger(std::shared_ptr<core::ProcessContext> context, std::shared_ptr<core::ProcessSession> session); + // Initialize, over write by NiFi CompressContent + virtual void initialize(void); + +protected: + +private: + std::shared_ptr<logging::Logger> logger_; + int64_t compressLevel_; + std::string compressMode_; + std::string compressFormat_; + bool updateFileName_; + std::map<std::string, std::string> compressionFormatMimeTypeMap_; + std::map<std::string, std::string> fileExtension_; +}; + +REGISTER_RESOURCE (CompressContent); + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/extensions/rocksdb-repos/RocksDbStream.h ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h index da08899..e4feb62 100644 --- a/extensions/rocksdb-repos/RocksDbStream.h +++ b/extensions/rocksdb-repos/RocksDbStream.h @@ -65,7 +65,7 @@ class RocksDbStream : public io::BaseStream { */ void seek(uint64_t offset); - const uint32_t getSize() const { + const uint64_t getSize() const { return size_; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/libminifi/include/io/AtomicEntryStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/AtomicEntryStream.h b/libminifi/include/io/AtomicEntryStream.h index 3eb456e..181b69d 100644 --- a/libminifi/include/io/AtomicEntryStream.h +++ b/libminifi/include/io/AtomicEntryStream.h @@ -61,7 
+61,7 @@ class AtomicEntryStream : public BaseStream { */ void seek(uint64_t offset); - virtual const uint32_t getSize() const { + virtual const uint64_t getSize() const { return length_; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/libminifi/include/io/BaseStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/BaseStream.h b/libminifi/include/io/BaseStream.h index dc810e3..3410be6 100644 --- a/libminifi/include/io/BaseStream.h +++ b/libminifi/include/io/BaseStream.h @@ -60,7 +60,7 @@ class BaseStream : public DataStream, public Serializable { int writeData(uint8_t *value, int size); - virtual void seek(uint32_t offset) { + virtual void seek(uint64_t offset) { if (composable_stream_ != this) { composable_stream_->seek(offset); } else { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/libminifi/include/io/ClientSocket.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h index c9ad90e..216ef3d 100644 --- a/libminifi/include/io/ClientSocket.h +++ b/libminifi/include/io/ClientSocket.h @@ -212,7 +212,7 @@ class Socket : public BaseStream { * Retrieve size of data stream * @return size of data stream **/ - const uint32_t getSize() const { + const uint64_t getSize() const { return DataStream::getSize(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/libminifi/include/io/DataStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/DataStream.h b/libminifi/include/io/DataStream.h index 2ebc9a4..e0fa4be 100644 --- a/libminifi/include/io/DataStream.h +++ b/libminifi/include/io/DataStream.h @@ -60,7 +60,7 @@ class DataStream { return 0; } - virtual void seek(uint32_t offset) { + virtual void seek(uint64_t offset) { readBuffer += offset; } @@ -117,7 +117,7 @@ class DataStream { * Retrieve 
size of data stream * @return size of data stream **/ - virtual const uint32_t getSize() const { + virtual const uint64_t getSize() const { return buffer.size(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/libminifi/include/io/FileStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/FileStream.h b/libminifi/include/io/FileStream.h index 0cddcc2..94d13b2 100644 --- a/libminifi/include/io/FileStream.h +++ b/libminifi/include/io/FileStream.h @@ -64,7 +64,7 @@ class FileStream : public io::BaseStream { */ void seek(uint64_t offset); - const uint32_t getSize() const { + const uint64_t getSize() const { return length_; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/libminifi/src/io/FileStream.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp index 93d6411..93a38df 100644 --- a/libminifi/src/io/FileStream.cpp +++ b/libminifi/src/io/FileStream.cpp @@ -47,7 +47,8 @@ FileStream::FileStream(const std::string &path) FileStream::FileStream(const std::string &path, uint32_t offset, bool write_enable) : logger_(logging::LoggerFactory<FileStream>::getLogger()), - path_(path) { + path_(path), + offset_(offset) { file_stream_ = std::unique_ptr<std::fstream>(new std::fstream()); if (write_enable) { file_stream_->open(path.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/libminifi/test/archive-tests/CompressContentTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp new file mode 100644 index 0000000..a01d632 --- /dev/null +++ b/libminifi/test/archive-tests/CompressContentTests.cpp @@ -0,0 +1,933 @@ +/** + * + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file +#include <uuid/uuid.h> +#include <fstream> +#include <map> +#include <memory> +#include <utility> +#include <string> +#include <set> +#include "FlowController.h" +#include "../TestBase.h" +#include "core/Core.h" +#include "../../include/core/FlowFile.h" +#include "../unit/ProvenanceTestHelper.h" +#include "core/Processor.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessorNode.h" +#include "CompressContent.h" +#include "io/FileStream.h" +#include "FlowFileRecord.h" +#include <sstream> +#include <iostream> + +static const char* EXPECT_COMPRESS_CONTENT = "/tmp/minifi-expect-compresscontent.txt"; +static const char* COMPRESS_CONTENT = "/tmp/minifi-compresscontent"; +static unsigned int globalSeed; + +class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback { + public: + explicit ReadCallback(uint64_t size) : + read_size_(0) { + buffer_size_ = size; + buffer_ = new uint8_t[buffer_size_]; + archive_buffer_ = nullptr; + } + ~ReadCallback() { + if (buffer_) + delete[] buffer_; + if (archive_buffer_) + delete[] archive_buffer_; + } + int64_t 
process(std::shared_ptr<org::apache::nifi::minifi::io::BaseStream> stream) { + int64_t ret = 0; + ret = stream->read(buffer_, buffer_size_); + if (!stream) + read_size_ = stream->getSize(); + else + read_size_ = buffer_size_; + return ret; + } + void archive_read() { + struct archive *a; + a = archive_read_new(); + archive_read_support_format_all(a); + archive_read_support_filter_all(a); + archive_read_open_memory(a, buffer_, read_size_); + struct archive_entry *ae; + + if (archive_read_next_header(a, &ae) == ARCHIVE_OK) { + int size = archive_entry_size(ae); + archive_buffer_ = new char[size]; + archive_buffer_size_ = size; + archive_read_data(a, archive_buffer_, size); + } + archive_read_free(a); + } + + uint8_t *buffer_; + uint64_t buffer_size_; + uint64_t read_size_; + char *archive_buffer_; + int archive_buffer_size_; +}; + +TEST_CASE("CompressFileGZip", "[compressfiletest1]") { + try { + std::ofstream expectfile; + expectfile.open(EXPECT_COMPRESS_CONTENT); + + for (int i = 0; i < 100000; i++) { + expectfile << std::to_string(rand_r(&globalSeed)%100); + } + expectfile.close(); + + TestController testController; + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>(); + LogTestController::getInstance().setTrace<core::ProcessSession>(); + LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>(); + + std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); + + std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent"); + std::shared_ptr<core::Processor> logAttributeProcessor = 
std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + + uuid_t processoruuid; + REQUIRE(true == processor->getUUID(processoruuid)); + uuid_t logAttributeuuid; + REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid)); + + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>(); + + content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>()); + // connection from compress processor to log attribute + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection"); + connection->setRelationship(core::Relationship("success", "compress successful output")); + connection->setSource(processor); + connection->setDestination(logAttributeProcessor); + connection->setSourceUUID(processoruuid); + connection->setDestinationUUID(logAttributeuuid); + processor->addConnection(connection); + // connection to compress processor + std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection"); + compressconnection->setDestination(processor); + compressconnection->setDestinationUUID(processoruuid); + processor->addConnection(compressconnection); + + std::set<core::Relationship> autoTerminatedRelationships; + core::Relationship failure("failure", ""); + autoTerminatedRelationships.insert(failure); + processor->setAutoTerminatedRelationships(autoTerminatedRelationships); + + processor->incrementActiveTasks(); + processor->setScheduledState(core::ScheduledState::RUNNING); + logAttributeProcessor->incrementActiveTasks(); + logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING); + + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); + 
std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; + auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9"); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true"); + + core::ProcessSession sessionGenFlowFile(context); + std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection(); + std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income); + std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create()); + sessionGenFlowFile.import(EXPECT_COMPRESS_CONTENT, flow, true, 0); + income_connection->put(flow); + + REQUIRE(processor->getName() == "compresscontent"); + auto factory = std::make_shared<core::ProcessSessionFactory>(context); + processor->onSchedule(context, factory); + auto session = std::make_shared<core::ProcessSession>(context); + processor->onTrigger(context, session); + session->commit(); + + // validate the compress content + std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; + std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords); + REQUIRE(flow1->getSize() > 0); + { + REQUIRE(flow1->getSize() != flow->getSize()); + std::string mime; + flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime); + REQUIRE(mime == "application/gzip"); + ReadCallback callback(flow1->getSize()); + sessionGenFlowFile.read(flow1, &callback); + callback.archive_read(); + std::string flowFileName = 
std::string(EXPECT_COMPRESS_CONTENT); + std::ifstream file1; + file1.open(flowFileName, std::ios::in); + std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>()); + std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_); + REQUIRE(expectContents == contents); + // write the compress content for next test + std::ofstream file(COMPRESS_CONTENT); + file.write(reinterpret_cast<char *> (callback.buffer_), callback.read_size_); + file.close(); + file1.close(); + } + LogTestController::getInstance().reset(); + } catch (...) { + } +} + +TEST_CASE("DecompressFileGZip", "[compressfiletest2]") { + try { + TestController testController; + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>(); + LogTestController::getInstance().setTrace<core::ProcessSession>(); + LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>(); + // LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>(); + + std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); + + std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent"); + std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + + uuid_t processoruuid; + REQUIRE(true == processor->getUUID(processoruuid)); + uuid_t logAttributeuuid; + REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid)); + + 
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>(); + content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>()); + // connection from compress processor to log attribute + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection"); + connection->setRelationship(core::Relationship("success", "compress successful output")); + connection->setSource(processor); + connection->setDestination(logAttributeProcessor); + connection->setSourceUUID(processoruuid); + connection->setDestinationUUID(logAttributeuuid); + processor->addConnection(connection); + // connection to compress processor + std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection"); + compressconnection->setDestination(processor); + compressconnection->setDestinationUUID(processoruuid); + processor->addConnection(compressconnection); + + std::set<core::Relationship> autoTerminatedRelationships; + core::Relationship failure("failure", ""); + autoTerminatedRelationships.insert(failure); + processor->setAutoTerminatedRelationships(autoTerminatedRelationships); + + processor->incrementActiveTasks(); + processor->setScheduledState(core::ScheduledState::RUNNING); + logAttributeProcessor->incrementActiveTasks(); + logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING); + + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; + auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + 
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9"); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true"); + + core::ProcessSession sessionGenFlowFile(context); + std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection(); + std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income); + std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create()); + sessionGenFlowFile.import(COMPRESS_CONTENT, flow, true, 0); + income_connection->put(flow); + + REQUIRE(processor->getName() == "compresscontent"); + auto factory = std::make_shared<core::ProcessSessionFactory>(context); + processor->onSchedule(context, factory); + auto session = std::make_shared<core::ProcessSession>(context); + processor->onTrigger(context, session); + session->commit(); + + // validate the compress content + std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; + std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords); + REQUIRE(flow1->getSize() > 0); + { + REQUIRE(flow1->getSize() != flow->getSize()); + std::string mime; + REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false); + ReadCallback callback(flow1->getSize()); + sessionGenFlowFile.read(flow1, &callback); + std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT); + std::ifstream file1; + file1.open(flowFileName, std::ios::in); + std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>()); + std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_); + 
REQUIRE(expectContents == contents); + file1.close(); + } + LogTestController::getInstance().reset(); + unlink(COMPRESS_CONTENT); + unlink(EXPECT_COMPRESS_CONTENT); + } catch (...) { + } +} + +TEST_CASE("CompressFileBZip", "[compressfiletest3]") { + try { + std::ofstream expectfile; + expectfile.open(EXPECT_COMPRESS_CONTENT); + + for (int i = 0; i < 100000; i++) { + expectfile << std::to_string(rand_r(&globalSeed)%100); + } + expectfile.close(); + + TestController testController; + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>(); + LogTestController::getInstance().setTrace<core::ProcessSession>(); + LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>(); + + std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); + + std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent"); + std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + + uuid_t processoruuid; + REQUIRE(true == processor->getUUID(processoruuid)); + uuid_t logAttributeuuid; + REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid)); + + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>(); + + content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>()); + // connection from compress processor to log attribute + std::shared_ptr<minifi::Connection> 
connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection"); + connection->setRelationship(core::Relationship("success", "compress successful output")); + connection->setSource(processor); + connection->setDestination(logAttributeProcessor); + connection->setSourceUUID(processoruuid); + connection->setDestinationUUID(logAttributeuuid); + processor->addConnection(connection); + // connection to compress processor + std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection"); + compressconnection->setDestination(processor); + compressconnection->setDestinationUUID(processoruuid); + processor->addConnection(compressconnection); + + std::set<core::Relationship> autoTerminatedRelationships; + core::Relationship failure("failure", ""); + autoTerminatedRelationships.insert(failure); + processor->setAutoTerminatedRelationships(autoTerminatedRelationships); + + processor->incrementActiveTasks(); + processor->setScheduledState(core::ScheduledState::RUNNING); + logAttributeProcessor->incrementActiveTasks(); + logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING); + + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; + auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9"); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true"); + + core::ProcessSession 
sessionGenFlowFile(context); + std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection(); + std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income); + std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create()); + sessionGenFlowFile.import(EXPECT_COMPRESS_CONTENT, flow, true, 0); + income_connection->put(flow); + + REQUIRE(processor->getName() == "compresscontent"); + auto factory = std::make_shared<core::ProcessSessionFactory>(context); + processor->onSchedule(context, factory); + auto session = std::make_shared<core::ProcessSession>(context); + processor->onTrigger(context, session); + session->commit(); + + // validate the compress content + std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; + std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords); + REQUIRE(flow1->getSize() > 0); + { + REQUIRE(flow1->getSize() != flow->getSize()); + std::string mime; + flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime); + REQUIRE(mime == "application/bzip2"); + ReadCallback callback(flow1->getSize()); + sessionGenFlowFile.read(flow1, &callback); + callback.archive_read(); + std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT); + std::ifstream file1; + file1.open(flowFileName, std::ios::in); + std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>()); + std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_); + REQUIRE(expectContents == contents); + // write the compress content for next test + std::ofstream file(COMPRESS_CONTENT); + file.write(reinterpret_cast<char *> (callback.buffer_), callback.read_size_); + file.close(); + file1.close(); + } + LogTestController::getInstance().reset(); + } catch (...) 
{ + } +} + + +TEST_CASE("DecompressFileBZip", "[compressfiletest4]") { + try { + TestController testController; + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>(); + LogTestController::getInstance().setTrace<core::ProcessSession>(); + LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>(); + // LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>(); + + std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); + + std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent"); + std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + + uuid_t processoruuid; + REQUIRE(true == processor->getUUID(processoruuid)); + uuid_t logAttributeuuid; + REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid)); + + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>(); + content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>()); + // connection from compress processor to log attribute + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection"); + connection->setRelationship(core::Relationship("success", "compress successful output")); + 
connection->setSource(processor); + connection->setDestination(logAttributeProcessor); + connection->setSourceUUID(processoruuid); + connection->setDestinationUUID(logAttributeuuid); + processor->addConnection(connection); + // connection to compress processor + std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection"); + compressconnection->setDestination(processor); + compressconnection->setDestinationUUID(processoruuid); + processor->addConnection(compressconnection); + + std::set<core::Relationship> autoTerminatedRelationships; + core::Relationship failure("failure", ""); + autoTerminatedRelationships.insert(failure); + processor->setAutoTerminatedRelationships(autoTerminatedRelationships); + + processor->incrementActiveTasks(); + processor->setScheduledState(core::ScheduledState::RUNNING); + logAttributeProcessor->incrementActiveTasks(); + logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING); + + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; + auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9"); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true"); + + core::ProcessSession sessionGenFlowFile(context); + std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection(); + std::shared_ptr<minifi::Connection> income_connection = 
std::static_pointer_cast<minifi::Connection>(income); + std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create()); + sessionGenFlowFile.import(COMPRESS_CONTENT, flow, true, 0); + income_connection->put(flow); + + REQUIRE(processor->getName() == "compresscontent"); + auto factory = std::make_shared<core::ProcessSessionFactory>(context); + processor->onSchedule(context, factory); + auto session = std::make_shared<core::ProcessSession>(context); + processor->onTrigger(context, session); + session->commit(); + + // validate the compress content + std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; + std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords); + REQUIRE(flow1->getSize() > 0); + { + REQUIRE(flow1->getSize() != flow->getSize()); + std::string mime; + REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false); + ReadCallback callback(flow1->getSize()); + sessionGenFlowFile.read(flow1, &callback); + std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT); + std::ifstream file1; + file1.open(flowFileName, std::ios::in); + std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>()); + std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_); + REQUIRE(expectContents == contents); + file1.close(); + } + LogTestController::getInstance().reset(); + unlink(COMPRESS_CONTENT); + unlink(EXPECT_COMPRESS_CONTENT); + } catch (...) 
{ + } +} + +TEST_CASE("CompressFileLZMA", "[compressfiletest5]") { + try { + std::ofstream expectfile; + expectfile.open(EXPECT_COMPRESS_CONTENT); + + for (int i = 0; i < 100000; i++) { + expectfile << std::to_string(rand_r(&globalSeed)%100); + } + expectfile.close(); + + TestController testController; + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>(); + LogTestController::getInstance().setTrace<core::ProcessSession>(); + LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>(); + + std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); + + std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent"); + std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + + uuid_t processoruuid; + REQUIRE(true == processor->getUUID(processoruuid)); + uuid_t logAttributeuuid; + REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid)); + + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>(); + + content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>()); + // connection from compress processor to log attribute + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection"); + connection->setRelationship(core::Relationship("success", "compress successful 
output")); + connection->setSource(processor); + connection->setDestination(logAttributeProcessor); + connection->setSourceUUID(processoruuid); + connection->setDestinationUUID(logAttributeuuid); + processor->addConnection(connection); + // connection to compress processor + std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection"); + compressconnection->setDestination(processor); + compressconnection->setDestinationUUID(processoruuid); + processor->addConnection(compressconnection); + + std::set<core::Relationship> autoTerminatedRelationships; + core::Relationship failure("failure", ""); + autoTerminatedRelationships.insert(failure); + processor->setAutoTerminatedRelationships(autoTerminatedRelationships); + + processor->incrementActiveTasks(); + processor->setScheduledState(core::ScheduledState::RUNNING); + logAttributeProcessor->incrementActiveTasks(); + logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING); + + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; + auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_LZMA); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9"); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true"); + + core::ProcessSession sessionGenFlowFile(context); + std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection(); + std::shared_ptr<minifi::Connection> income_connection = 
std::static_pointer_cast<minifi::Connection>(income); + std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create()); + sessionGenFlowFile.import(EXPECT_COMPRESS_CONTENT, flow, true, 0); + income_connection->put(flow); + + REQUIRE(processor->getName() == "compresscontent"); + auto factory = std::make_shared<core::ProcessSessionFactory>(context); + processor->onSchedule(context, factory); + auto session = std::make_shared<core::ProcessSession>(context); + processor->onTrigger(context, session); + session->commit(); + + if (LogTestController::getInstance().contains("compression not supported on this platform")) { + // platform not support LZMA + LogTestController::getInstance().reset(); + return; + } + + // validate the compress content + std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; + std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords); + REQUIRE(flow1->getSize() > 0); + { + REQUIRE(flow1->getSize() != flow->getSize()); + std::string mime; + flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime); + REQUIRE(mime == "application/x-lzma"); + ReadCallback callback(flow1->getSize()); + sessionGenFlowFile.read(flow1, &callback); + callback.archive_read(); + std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT); + std::ifstream file1; + file1.open(flowFileName, std::ios::in); + std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>()); + std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_); + REQUIRE(expectContents == contents); + // write the compress content for next test + std::ofstream file(COMPRESS_CONTENT); + file.write(reinterpret_cast<char *> (callback.buffer_), callback.read_size_); + file.close(); + file1.close(); + } + LogTestController::getInstance().reset(); + } catch (...) 
{ + } +} + + +TEST_CASE("DecompressFileLZMA", "[compressfiletest6]") { + try { + TestController testController; + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>(); + LogTestController::getInstance().setTrace<core::ProcessSession>(); + LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>(); + // LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>(); + + std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); + + std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent"); + std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + + uuid_t processoruuid; + REQUIRE(true == processor->getUUID(processoruuid)); + uuid_t logAttributeuuid; + REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid)); + + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>(); + content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>()); + // connection from compress processor to log attribute + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection"); + connection->setRelationship(core::Relationship("success", "compress successful output")); + 
connection->setSource(processor); + connection->setDestination(logAttributeProcessor); + connection->setSourceUUID(processoruuid); + connection->setDestinationUUID(logAttributeuuid); + processor->addConnection(connection); + // connection to compress processor + std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection"); + compressconnection->setDestination(processor); + compressconnection->setDestinationUUID(processoruuid); + processor->addConnection(compressconnection); + + std::set<core::Relationship> autoTerminatedRelationships; + core::Relationship failure("failure", ""); + autoTerminatedRelationships.insert(failure); + processor->setAutoTerminatedRelationships(autoTerminatedRelationships); + + processor->incrementActiveTasks(); + processor->setScheduledState(core::ScheduledState::RUNNING); + logAttributeProcessor->incrementActiveTasks(); + logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING); + + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; + auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9"); + context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true"); + + core::ProcessSession sessionGenFlowFile(context); + std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection(); + std::shared_ptr<minifi::Connection> income_connection = 
std::static_pointer_cast<minifi::Connection>(income); + std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create()); + sessionGenFlowFile.import(COMPRESS_CONTENT, flow, true, 0); + flow->setAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), "application/x-lzma"); + income_connection->put(flow); + + REQUIRE(processor->getName() == "compresscontent"); + auto factory = std::make_shared<core::ProcessSessionFactory>(context); + processor->onSchedule(context, factory); + auto session = std::make_shared<core::ProcessSession>(context); + processor->onTrigger(context, session); + session->commit(); + + if (LogTestController::getInstance().contains("compression not supported on this platform")) { + // platform not support LZMA + LogTestController::getInstance().reset(); + return; + } + + // validate the compress content + std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; + std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords); + REQUIRE(flow1->getSize() > 0); + { + REQUIRE(flow1->getSize() != flow->getSize()); + std::string mime; + REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false); + ReadCallback callback(flow1->getSize()); + sessionGenFlowFile.read(flow1, &callback); + std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT); + std::ifstream file1; + file1.open(flowFileName, std::ios::in); + std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>()); + std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_); + REQUIRE(expectContents == contents); + file1.close(); + } + LogTestController::getInstance().reset(); + unlink(COMPRESS_CONTENT); + unlink(EXPECT_COMPRESS_CONTENT); + } catch (...) 
{
+  }
+}
+
+/**
+ * Compresses a generated text file with CompressContent in xz-lzma2 mode and
+ * checks the result.
+ *
+ * Steps, as performed below:
+ *  - write 100000 pseudo-random numbers (each in [0, 99], as decimal text) to
+ *    EXPECT_COMPRESS_CONTENT;
+ *  - wire a CompressContent processor whose "success" relationship feeds a
+ *    LogAttribute processor and whose "failure" relationship is auto-terminated;
+ *  - import the generated file as a flow file, enqueue it and trigger the processor;
+ *  - return early when the log reports that the compression is not supported
+ *    on this platform;
+ *  - otherwise require that the output flow file is non-empty, differs in size
+ *    from the input, carries MIME type "application/x-xz", and that the data
+ *    exposed by ReadCallback::archive_buffer_ equals the generated file;
+ *  - store the raw compressed bytes in COMPRESS_CONTENT, which the
+ *    DecompressFileXYLZMA test case imports.
+ */
+TEST_CASE("CompressFileXYLZMA", "[compressfiletest7]") {
+  try {
+    std::ofstream expectfile;
+    expectfile.open(EXPECT_COMPRESS_CONTENT);
+
+    for (int i = 0; i < 100000; i++) {
+      expectfile << std::to_string(rand_r(&globalSeed)%100);
+    }
+    expectfile.close();
+
+    TestController testController;
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+
+    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+    // connection from compress processor to log attribute
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "compress successful output"));
+    connection->setSource(processor);
+    connection->setDestination(logAttributeProcessor);
+    connection->setSourceUUID(processoruuid);
+    connection->setDestinationUUID(logAttributeuuid);
+    processor->addConnection(connection);
+    // connection to compress processor
+    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
+    compressconnection->setDestination(processor);
+    compressconnection->setDestinationUUID(processoruuid);
+    processor->addConnection(compressconnection);
+
+    // flow files routed to "failure" are dropped rather than queued anywhere
+    std::set<core::Relationship> autoTerminatedRelationships;
+    core::Relationship failure("failure", "");
+    autoTerminatedRelationships.insert(failure);
+    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+    processor->incrementActiveTasks();
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    logAttributeProcessor->incrementActiveTasks();
+    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_XZ_LZMA2);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+    // build the input flow file from the generated file and queue it for the processor
+    core::ProcessSession sessionGenFlowFile(context);
+    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
+    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    sessionGenFlowFile.import(EXPECT_COMPRESS_CONTENT, flow, true, 0);
+    income_connection->put(flow);
+
+    REQUIRE(processor->getName() == "compresscontent");
+    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+    processor->onSchedule(context, factory);
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+
+    if (LogTestController::getInstance().contains("compression not supported on this platform")) {
+      // platform not support LZMA
+      LogTestController::getInstance().reset();
+      return;
+    }
+
+    // validate the compress content
+    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+    // Fail the test case instead of dereferencing an empty pointer when nothing
+    // reached the "success" connection (failures are auto-terminated above).
+    REQUIRE(flow1 != nullptr);
+    REQUIRE(flow1->getSize() > 0);
+    {
+      REQUIRE(flow1->getSize() != flow->getSize());
+      std::string mime;
+      flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
+      REQUIRE(mime == "application/x-xz");
+      ReadCallback callback(flow1->getSize());
+      sessionGenFlowFile.read(flow1, &callback);
+      // archive_read() is defined elsewhere in this file; presumably it unpacks
+      // buffer_ into archive_buffer_ -- TODO confirm
+      callback.archive_read();
+      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
+      std::ifstream file1;
+      file1.open(flowFileName, std::ios::in);
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
+      REQUIRE(expectContents == contents);
+      // write the compress content for next test
+      std::ofstream file(COMPRESS_CONTENT);
+      file.write(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+      file.close();
+      file1.close();
+    }
+    LogTestController::getInstance().reset();
+  } catch (...) {
+    // NOTE(review): every exception is swallowed here, so an unexpected throw
+    // from the code under test is not reported by this handler.
+  }
+}
+
+
+/**
+ * Decompresses the file written by the CompressFileXYLZMA test case and checks
+ * that the original content comes back.
+ *
+ * Steps, as performed below:
+ *  - wire a CompressContent processor whose "success" relationship feeds a
+ *    LogAttribute processor and whose "failure" relationship is auto-terminated;
+ *  - configure decompress mode with the format taken from
+ *    COMPRESSION_FORMAT_ATTRIBUTE, import COMPRESS_CONTENT as a flow file and
+ *    tag it with MIME type "application/x-xz";
+ *  - trigger the processor and return early when the log reports that the
+ *    compression is not supported on this platform;
+ *  - otherwise require that the output flow file is non-empty, differs in size
+ *    from the input, no longer has a MIME type attribute, and that its content
+ *    equals EXPECT_COMPRESS_CONTENT;
+ *  - remove COMPRESS_CONTENT and EXPECT_COMPRESS_CONTENT from disk.
+ */
+TEST_CASE("DecompressFileXYLZMA", "[compressfiletest8]") {
+  try {
+    TestController testController;
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+    // LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+    // connection from compress processor to log attribute
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "compress successful output"));
+    connection->setSource(processor);
+    connection->setDestination(logAttributeProcessor);
+    connection->setSourceUUID(processoruuid);
+    connection->setDestinationUUID(logAttributeuuid);
+    processor->addConnection(connection);
+    // connection to compress processor
+    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
+    compressconnection->setDestination(processor);
+    compressconnection->setDestinationUUID(processoruuid);
+    processor->addConnection(compressconnection);
+
+    // flow files routed to "failure" are dropped rather than queued anywhere
+    std::set<core::Relationship> autoTerminatedRelationships;
+    core::Relationship failure("failure", "");
+    autoTerminatedRelationships.insert(failure);
+    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+    processor->incrementActiveTasks();
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    logAttributeProcessor->incrementActiveTasks();
+    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+    // build the input flow file from the previously compressed file and queue it
+    core::ProcessSession sessionGenFlowFile(context);
+    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
+    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    sessionGenFlowFile.import(COMPRESS_CONTENT, flow, true, 0);
+    flow->setAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), "application/x-xz");
+    income_connection->put(flow);
+
+    REQUIRE(processor->getName() == "compresscontent");
+    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+    processor->onSchedule(context, factory);
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+
+    if (LogTestController::getInstance().contains("compression not supported on this platform")) {
+      // platform not support LZMA
+      LogTestController::getInstance().reset();
+      return;
+    }
+
+    // validate the compress content
+    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+    // Fail the test case instead of dereferencing an empty pointer when nothing
+    // reached the "success" connection (failures are auto-terminated above).
+    REQUIRE(flow1 != nullptr);
+    REQUIRE(flow1->getSize() > 0);
+    {
+      REQUIRE(flow1->getSize() != flow->getSize());
+      std::string mime;
+      // the MIME type attribute set on the input must be absent from the output
+      REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);
+      ReadCallback callback(flow1->getSize());
+      sessionGenFlowFile.read(flow1, &callback);
+      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
+      std::ifstream file1;
+      file1.open(flowFileName, std::ios::in);
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+      REQUIRE(expectContents == contents);
+      file1.close();
+    }
+    LogTestController::getInstance().reset();
+    unlink(COMPRESS_CONTENT);
+    unlink(EXPECT_COMPRESS_CONTENT);
+  } catch (...) {
+    // NOTE(review): every exception is swallowed here, so an unexpected throw
+    // from the code under test is not reported by this handler.
+  }
+}
+
