GitHub user minifirocks commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143551465
  
    --- Diff: libminifi/include/processors/MergeContent.h ---
    @@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin {
     };
     
     
    +// Archive Class
    +class ArchiveMerge {
    +public:
    +  // Nested callback that copies one flow file's content into an archive that is
    +  // already open for writing. Content is streamed in fixed-size chunks instead of
    +  // being buffered whole, so stack/memory use stays bounded for any flow file size.
    +  class ReadCallback: public InputStreamCallback {
    +  public:
    +    // size:  number of content bytes to copy (the caller passes the flow file size).
    +    // arch:  archive being written; not owned by this callback.
    +    // entry: header for this archive member, written once before the data; not owned.
    +    ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) :
    +        buffer_size_(size), arch_(arch), entry_(entry) {
    +    }
    +    ~ReadCallback() {
    +    }
    +    // Writes entry_ as the member header, then copies up to buffer_size_ bytes from
    +    // stream into arch_. Returns the number of content bytes handed to the archive
    +    // on success, or a negative value if the header write, a stream read, or a data
    +    // write fails. A stream that ends early yields a short (truncated) member.
    +    int64_t process(std::shared_ptr<io::BaseStream> stream) {
    +      if (buffer_size_ > 0 && !stream)
    +        return -1;  // content expected but there is nothing to read it from
    +      const int64_t header_status = archive_write_header(arch_, entry_);
    +      // ARCHIVE_WARN still means the header was written; anything below it
    +      // (ARCHIVE_FAILED / ARCHIVE_FATAL) means the entry was not, so skip the data.
    +      if (header_status < ARCHIVE_WARN)
    +        return header_status;
    +      // Fixed-size chunk instead of a buffer_size_-sized VLA: VLAs are not ISO C++
    +      // and a large flow file would overflow the stack.
    +      uint8_t chunk[8192];
    +      const uint64_t chunk_capacity = sizeof(chunk);
    +      uint64_t remaining = buffer_size_;
    +      int64_t written = 0;
    +      while (remaining > 0) {
    +        const uint64_t wanted = remaining < chunk_capacity ? remaining : chunk_capacity;
    +        const int64_t got = stream->read(chunk, wanted);
    +        if (got < 0)
    +          return -1;  // stream read error
    +        if (got == 0)
    +          break;  // stream ended before buffer_size_ bytes were available
    +        // Only the bytes actually read are written, never uninitialised buffer space.
    +        const int64_t put = archive_write_data(arch_, chunk, static_cast<size_t>(got));
    +        if (put < 0)
    +          return -1;  // archive rejected the data
    +        written += put;
    +        remaining -= static_cast<uint64_t>(got);
    +      }
    +      return written;
    +    }
    +    uint64_t buffer_size_;
    +    struct archive *arch_;
    +    struct archive_entry *entry_;
    +  };
    +  // Nest Callback Class for write stream
    +  class WriteCallback: public OutputStreamCallback {
    +  public:
    +    WriteCallback(std::string merge_type, 
std::deque<std::shared_ptr<core::FlowFile>> &flows, core::ProcessSession 
*session) :
    +        merge_type_(merge_type), flows_(flows), session_(session) {
    +      size_ = 0;
    +      stream_ = nullptr;
    +    }
    +    ~WriteCallback() {
    +    }
    +
    +    std::string merge_type_;
    +    std::deque<std::shared_ptr<core::FlowFile>> &flows_;
    +    core::ProcessSession *session_;
    +    std::shared_ptr<io::BaseStream> stream_;
    +    int64_t size_;
    +
    +    static la_ssize_t archive_write(struct archive *arch, void *context, 
const void *buff, size_t size) {
    +      WriteCallback *callback = (WriteCallback *) context;
    +      la_ssize_t ret = 
callback->stream_->write(reinterpret_cast<uint8_t*>(const_cast<void*>(buff)), 
size);
    +      if (ret > 0)
    +        callback->size_ += (int64_t) ret;
    +      return ret;
    +    }
    +
    +    int64_t process(std::shared_ptr<io::BaseStream> stream) {
    +      int64_t ret = 0;
    +      struct archive *arch;
    +
    +      arch = archive_write_new();
    +      if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
    +        archive_write_set_format_pax_restricted(arch); // tar format
    +      }
    +      if (merge_type_ == MERGE_FORMAT_ZIP_VALUE) {
    +        archive_write_set_format_zip(arch); // zip format
    +      }
    +      archive_write_set_bytes_per_block(arch, 0);
    +      archive_write_add_filter_none(arch);
    +      this->stream_ = stream;
    +      archive_write_open(arch, this, NULL, archive_write, NULL);
    +
    +      for (auto flow : flows_) {
    +        struct archive_entry *entry = archive_entry_new();
    +        std::string fileName;
    +        flow->getAttribute(FlowAttributeKey(FILENAME), fileName);
    +        archive_entry_set_pathname(entry, fileName.c_str());
    +        archive_entry_set_size(entry, flow->getSize());
    +        archive_entry_set_mode(entry, S_IFREG | 0755);
    +        if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
    +          std::string perm;
    +          int permInt;
    +          if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, 
perm)) {
    +            try {
    +              permInt = std::stoi(perm);
    +              archive_entry_set_perm(entry, (mode_t) permInt);
    +            } catch (...) {
    +            }
    +          }
    +        }
    +        ReadCallback readCb(flow->getSize(), arch, entry);
    --- End diff --
    
    Will try to do an incremental read.


---

Reply via email to