Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143551465 --- Diff: libminifi/include/processors/MergeContent.h --- @@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin { }; +// Archive Class +class ArchiveMerge { +public: + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: + ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) : + buffer_size_(size), arch_(arch), entry_(entry) { + } + ~ReadCallback() { + } + int64_t process(std::shared_ptr<io::BaseStream> stream) { + uint8_t buffer[buffer_size_]; + int64_t ret = 0; + uint64_t read_size; + ret = stream->read(buffer, buffer_size_); + if (!stream) + read_size = stream->getSize(); + else + read_size = buffer_size_; + ret = archive_write_header(arch_, entry_); + ret += archive_write_data(arch_, buffer, read_size); + return ret; + } + uint64_t buffer_size_; + struct archive *arch_; + struct archive_entry *entry_; + }; + // Nest Callback Class for write stream + class WriteCallback: public OutputStreamCallback { + public: + WriteCallback(std::string merge_type, std::deque<std::shared_ptr<core::FlowFile>> &flows, core::ProcessSession *session) : + merge_type_(merge_type), flows_(flows), session_(session) { + size_ = 0; + stream_ = nullptr; + } + ~WriteCallback() { + } + + std::string merge_type_; + std::deque<std::shared_ptr<core::FlowFile>> &flows_; + core::ProcessSession *session_; + std::shared_ptr<io::BaseStream> stream_; + int64_t size_; + + static la_ssize_t archive_write(struct archive *arch, void *context, const void *buff, size_t size) { + WriteCallback *callback = (WriteCallback *) context; + la_ssize_t ret = callback->stream_->write(reinterpret_cast<uint8_t*>(const_cast<void*>(buff)), size); + if (ret > 0) + callback->size_ += (int64_t) ret; + return ret; + } + + int64_t process(std::shared_ptr<io::BaseStream> stream) { + int64_t ret = 0; + 
struct archive *arch; + + arch = archive_write_new(); + if (merge_type_ == MERGE_FORMAT_TAR_VALUE) { + archive_write_set_format_pax_restricted(arch); // tar format + } + if (merge_type_ == MERGE_FORMAT_ZIP_VALUE) { + archive_write_set_format_zip(arch); // zip format + } + archive_write_set_bytes_per_block(arch, 0); + archive_write_add_filter_none(arch); + this->stream_ = stream; + archive_write_open(arch, this, NULL, archive_write, NULL); + + for (auto flow : flows_) { + struct archive_entry *entry = archive_entry_new(); + std::string fileName; + flow->getAttribute(FlowAttributeKey(FILENAME), fileName); + archive_entry_set_pathname(entry, fileName.c_str()); + archive_entry_set_size(entry, flow->getSize()); + archive_entry_set_mode(entry, S_IFREG | 0755); + if (merge_type_ == MERGE_FORMAT_TAR_VALUE) { + std::string perm; + int permInt; + if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) { + try { + permInt = std::stoi(perm); + archive_entry_set_perm(entry, (mode_t) permInt); + } catch (...) { + } + } + } + ReadCallback readCb(flow->getSize(), arch, entry); --- End diff -- Will try to do an incremental read.
---