[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r144319079 --- Diff: LICENSE --- @@ -564,4 +564,55 @@ ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +The libarchive distribution as a whole is Copyright by Tim Kientzle --- End diff -- We need to be more detailed and specific in our LICENSE than this. Several licenses and considerations apply to the various components of this library, so we will need to break out the individual items and their terms. I would also recommend removing all extraneous parts of libarchive, such as examples, test resources, and build scripts not needed for our build, to reduce this footprint. ---
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r144026084 --- Diff: extensions/http-curl/CMakeLists.txt --- @@ -24,7 +24,7 @@ find_package(CURL REQUIRED) set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") -include_directories(../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/) +include_directories(../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/libarchive-3.3.2/libarchive ../../thirdparty/) --- End diff -- It is FlowConfiguration.h, which includes MergeContent.h. ---
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r144019540 --- Diff: extensions/http-curl/CMakeLists.txt --- @@ -24,7 +24,7 @@ find_package(CURL REQUIRED) set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") -include_directories(../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/) +include_directories(../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/libarchive-3.3.2/libarchive ../../thirdparty/) --- End diff -- Oh, that's far too generic an include_directories statement; that's why. We shouldn't be compiling MergeContent into the curl extension. ---
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143912126 --- Diff: libminifi/src/processors/MergeContent.cpp --- @@ -276,6 +287,46 @@ std::shared_ptr BinaryConcatenationMerge::merge(core::ProcessCon return flowFile; } +std::shared_ptr TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header, +std::string &footer, std::string &demarcator) { + std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session); + session->write(flowFile, &callback); + session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType()); + std::string fileName; + flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + if (flows.size() == 1) { +flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); + } else { +flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName); + } + if (!fileName.empty()) { +fileName += ".tar"; --- End diff -- Same as the zip case below: we will tar the existing tar file into a new tarball. ---
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143912068 --- Diff: extensions/http-curl/CMakeLists.txt --- @@ -24,7 +24,7 @@ find_package(CURL REQUIRED) set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") -include_directories(../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/) +include_directories(../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/libarchive-3.3.2/libarchive ../../thirdparty/) --- End diff -- Scanning dependencies of target minifi-http-curl [ 15%] Building CXX object extensions/http-curl/CMakeFiles/minifi-http-curl.dir/HttpCurlLoader.cpp.o In file included from /Users/binqiu/m/nifi-minifi-cpp/extensions/http-curl/HttpCurlLoader.cpp:20: In file included from /Users/binqiu/m/nifi-minifi-cpp/extensions/http-curl/../../libminifi/include/core/FlowConfiguration.h:37: /Users/binqiu/m/nifi-minifi-cpp/extensions/http-curl/../../libminifi/include/processors/MergeContent.h:24:10: fatal error: 'archive_entry.h' file not found ---
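The error above comes from MergeContent.h including archive_entry.h, which then reaches the curl extension through FlowConfiguration.h. A sketch of one way to keep libarchive headers out of headers that other extensions include, assuming the header only needs libarchive types as pointers: libarchive's public API treats `struct archive` and `struct archive_entry` as opaque handles, so a header can forward-declare them and leave `#include <archive.h>` / `#include <archive_entry.h>`, together with the inline bodies that call libarchive functions, to the .cpp file. `ArchiveHandles` is a hypothetical stand-in for the callback classes in the diff, not existing MiNiFi code, and this complements rather than replaces narrowing the include_directories list.

```cpp
// Sketch of a header that mentions libarchive types without including archive.h.
// Forward declarations are enough for pointer members; only the .cpp that calls
// archive_write_*() functions would include <archive.h> and <archive_entry.h>.
struct archive;
struct archive_entry;

// Hypothetical class, for illustration only.
class ArchiveHandles {
 public:
  ArchiveHandles(struct archive *arch, struct archive_entry *entry)
      : arch_(arch), entry_(entry) {}

  // Touches only the pointers, never the pointees, so it compiles without the
  // full libarchive definitions.
  bool valid() const { return arch_ != nullptr && entry_ != nullptr; }

 private:
  struct archive *arch_;
  struct archive_entry *entry_;
};
```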
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143758285 --- Diff: libminifi/src/processors/MergeContent.cpp --- @@ -276,6 +287,46 @@ std::shared_ptr BinaryConcatenationMerge::merge(core::ProcessCon return flowFile; } +std::shared_ptr TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header, +std::string &footer, std::string &demarcator) { + std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session); + session->write(flowFile, &callback); + session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType()); + std::string fileName; + flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + if (flows.size() == 1) { +flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); + } else { +flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName); + } + if (!fileName.empty()) { +fileName += ".tar"; +session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName); + } + return flowFile; +} + +std::shared_ptr ZipMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header, +std::string &footer, std::string &demarcator) { + std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_ZIP_VALUE), flows, session); + session->write(flowFile, &callback); + session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType()); + std::string fileName; + flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + if (flows.size() == 1) { +flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); + } else { +flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName); 
+ } + if (!fileName.empty()) { +fileName += ".zip"; --- End diff -- very cool, thanks! ---
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143552652 --- Diff: libminifi/src/processors/MergeContent.cpp --- @@ -276,6 +287,46 @@ std::shared_ptr BinaryConcatenationMerge::merge(core::ProcessCon return flowFile; } +std::shared_ptr TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header, +std::string &footer, std::string &demarcator) { + std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session); + session->write(flowFile, &callback); + session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType()); + std::string fileName; + flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + if (flows.size() == 1) { +flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); + } else { +flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName); + } + if (!fileName.empty()) { +fileName += ".tar"; +session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName); + } + return flowFile; +} + +std::shared_ptr ZipMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header, +std::string &footer, std::string &demarcator) { + std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_ZIP_VALUE), flows, session); + session->write(flowFile, &callback); + session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType()); + std::string fileName; + flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + if (flows.size() == 1) { +flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); + } else { +flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, 
fileName); + } + if (!fileName.empty()) { +fileName += ".zip"; --- End diff -- If two separate zip files fall into the same bin, we will create a single zip file that contains both of them. ---
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143551961 --- Diff: libminifi/src/processors/MergeContent.cpp --- @@ -276,6 +287,46 @@ std::shared_ptr BinaryConcatenationMerge::merge(core::ProcessCon return flowFile; } +std::shared_ptr TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header, +std::string &footer, std::string &demarcator) { + std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session); + session->write(flowFile, &callback); + session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType()); + std::string fileName; + flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + if (flows.size() == 1) { +flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); + } else { +flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName); + } + if (!fileName.empty()) { +fileName += ".tar"; +session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName); + } + return flowFile; +} + +std::shared_ptr ZipMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header, +std::string &footer, std::string &demarcator) { + std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_ZIP_VALUE), flows, session); + session->write(flowFile, &callback); + session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType()); + std::string fileName; + flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + if (flows.size() == 1) { +flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); + } else { +flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, 
fileName); + } + if (!fileName.empty()) { +fileName += ".zip"; --- End diff -- Yes, we will zip the two separate zip files into a larger one. ---
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143551465 --- Diff: libminifi/include/processors/MergeContent.h --- @@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin { }; +// Archive Class +class ArchiveMerge { +public: + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: +ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) : +buffer_size_(size), arch_(arch), entry_(entry) { +} +~ReadCallback() { +} +int64_t process(std::shared_ptr stream) { + uint8_t buffer[buffer_size_]; + int64_t ret = 0; + uint64_t read_size; + ret = stream->read(buffer, buffer_size_); + if (!stream) +read_size = stream->getSize(); + else +read_size = buffer_size_; + ret = archive_write_header(arch_, entry_); + ret += archive_write_data(arch_, buffer, read_size); + return ret; +} +uint64_t buffer_size_; +struct archive *arch_; +struct archive_entry *entry_; + }; + // Nest Callback Class for write stream + class WriteCallback: public OutputStreamCallback { + public: +WriteCallback(std::string merge_type, std::deque> &flows, core::ProcessSession *session) : +merge_type_(merge_type), flows_(flows), session_(session) { + size_ = 0; + stream_ = nullptr; +} +~WriteCallback() { +} + +std::string merge_type_; +std::deque> &flows_; +core::ProcessSession *session_; +std::shared_ptr stream_; +int64_t size_; + +static la_ssize_t archive_write(struct archive *arch, void *context, const void *buff, size_t size) { + WriteCallback *callback = (WriteCallback *) context; + la_ssize_t ret = callback->stream_->write(reinterpret_cast(const_cast(buff)), size); + if (ret > 0) +callback->size_ += (int64_t) ret; + return ret; +} + +int64_t process(std::shared_ptr stream) { + int64_t ret = 0; + struct archive *arch; + + arch = archive_write_new(); + if (merge_type_ == MERGE_FORMAT_TAR_VALUE) { +archive_write_set_format_pax_restricted(arch); // 
tar format + } + if (merge_type_ == MERGE_FORMAT_ZIP_VALUE) { +archive_write_set_format_zip(arch); // zip format + } + archive_write_set_bytes_per_block(arch, 0); + archive_write_add_filter_none(arch); + this->stream_ = stream; + archive_write_open(arch, this, NULL, archive_write, NULL); + + for (auto flow : flows_) { +struct archive_entry *entry = archive_entry_new(); +std::string fileName; +flow->getAttribute(FlowAttributeKey(FILENAME), fileName); +archive_entry_set_pathname(entry, fileName.c_str()); +archive_entry_set_size(entry, flow->getSize()); +archive_entry_set_mode(entry, S_IFREG | 0755); +if (merge_type_ == MERGE_FORMAT_TAR_VALUE) { + std::string perm; + int permInt; + if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) { +try { + permInt = std::stoi(perm); + archive_entry_set_perm(entry, (mode_t) permInt); +} catch (...) { +} + } +} +ReadCallback readCb(flow->getSize(), arch, entry); --- End diff -- Will try to do an incremental read. ---
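An incremental read could look roughly like the sketch below. Instead of allocating a buffer sized to the whole flow file (the `uint8_t buffer[buffer_size_]` in the diff is a variable-length array, which standard C++ does not allow and which puts the entire payload on the stack), it copies the input in fixed-size chunks and hands each chunk to a sink. `std::istream` stands in for MiNiFi's stream type, and `copyInChunks` / `roundTripInChunks` are hypothetical helpers; in the processor, the sink would call `archive_write_data(arch, data, size)` once per chunk, after a single `archive_write_header()` for the entry.

```cpp
#include <cstddef>
#include <cstdint>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Copies `in` to `sink` in chunks of at most `chunkSize` bytes (must be non-zero),
// so peak memory is bounded by the chunk size instead of by the flow file size.
// `sink(const char *data, std::size_t size)` returns false to abort the copy.
// Returns the number of bytes copied, or -1 if the sink reported an error.
template <typename Sink>
int64_t copyInChunks(std::istream &in, Sink &&sink, std::size_t chunkSize = 4096) {
  std::vector<char> chunk(chunkSize);  // one heap buffer, reused for every chunk
  int64_t total = 0;
  while (in) {
    in.read(chunk.data(), static_cast<std::streamsize>(chunk.size()));
    const std::streamsize got = in.gcount();  // may be short on the last chunk
    if (got <= 0) {
      break;
    }
    if (!sink(static_cast<const char *>(chunk.data()), static_cast<std::size_t>(got))) {
      return -1;
    }
    total += got;
  }
  return total;
}

// Example use: round-trip a string through the chunked copy.
inline std::string roundTripInChunks(const std::string &input, std::size_t chunkSize) {
  std::istringstream in(input);
  std::string out;
  copyInChunks(in, [&out](const char *data, std::size_t size) {
    out.append(data, size);
    return true;
  }, chunkSize);
  return out;
}
```

A libarchive-backed sink might be `[&](const char *d, std::size_t n) { return archive_write_data(arch, d, n) >= 0; }`; whether a short write should also count as an error is left open here.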
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143551088 --- Diff: libminifi/include/processors/MergeContent.h --- @@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin { }; +// Archive Class +class ArchiveMerge { +public: + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: +ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) : +buffer_size_(size), arch_(arch), entry_(entry) { +} +~ReadCallback() { +} +int64_t process(std::shared_ptr stream) { + uint8_t buffer[buffer_size_]; + int64_t ret = 0; + uint64_t read_size; + ret = stream->read(buffer, buffer_size_); + if (!stream) +read_size = stream->getSize(); + else +read_size = buffer_size_; + ret = archive_write_header(arch_, entry_); + ret += archive_write_data(arch_, buffer, read_size); + return ret; +} +uint64_t buffer_size_; +struct archive *arch_; +struct archive_entry *entry_; + }; + // Nest Callback Class for write stream + class WriteCallback: public OutputStreamCallback { + public: +WriteCallback(std::string merge_type, std::deque> &flows, core::ProcessSession *session) : +merge_type_(merge_type), flows_(flows), session_(session) { + size_ = 0; + stream_ = nullptr; +} +~WriteCallback() { +} + +std::string merge_type_; +std::deque> &flows_; +core::ProcessSession *session_; +std::shared_ptr stream_; +int64_t size_; + +static la_ssize_t archive_write(struct archive *arch, void *context, const void *buff, size_t size) { + WriteCallback *callback = (WriteCallback *) context; + la_ssize_t ret = callback->stream_->write(reinterpret_cast(const_cast(buff)), size); + if (ret > 0) +callback->size_ += (int64_t) ret; + return ret; +} + +int64_t process(std::shared_ptr stream) { + int64_t ret = 0; + struct archive *arch; + + arch = archive_write_new(); + if (merge_type_ == MERGE_FORMAT_TAR_VALUE) { +archive_write_set_format_pax_restricted(arch); // 
tar format + } + if (merge_type_ == MERGE_FORMAT_ZIP_VALUE) { +archive_write_set_format_zip(arch); // zip format + } + archive_write_set_bytes_per_block(arch, 0); + archive_write_add_filter_none(arch); + this->stream_ = stream; + archive_write_open(arch, this, NULL, archive_write, NULL); + + for (auto flow : flows_) { +struct archive_entry *entry = archive_entry_new(); +std::string fileName; +flow->getAttribute(FlowAttributeKey(FILENAME), fileName); +archive_entry_set_pathname(entry, fileName.c_str()); +archive_entry_set_size(entry, flow->getSize()); +archive_entry_set_mode(entry, S_IFREG | 0755); +if (merge_type_ == MERGE_FORMAT_TAR_VALUE) { + std::string perm; + int permInt; + if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) { +try { + permInt = std::stoi(perm); + archive_entry_set_perm(entry, (mode_t) permInt); +} catch (...) { --- End diff -- will do. ---
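The static `archive_write` function in this diff follows the usual C-callback trampoline pattern: `archive_write_open()` receives a plain function pointer plus an opaque `void *` client-data pointer (`this` in the diff) and passes that pointer back on every write, which is how the static function reaches the `WriteCallback` instance. A minimal sketch of the pattern, with a hypothetical C-style `emit` function standing in for libarchive and a hypothetical `Collector` playing the role of `WriteCallback` (libarchive's real write callback also receives the `struct archive *` as its first argument and returns `la_ssize_t`):

```cpp
#include <cstddef>
#include <string>

// Hypothetical C-style API standing in for libarchive: it is given a plain
// function pointer plus an opaque client-data pointer and hands that pointer
// back on every call.
using write_fn = long (*)(void *client_data, const void *buff, std::size_t size);

long emit(write_fn fn, void *client_data, const char *data, std::size_t size) {
  return fn(client_data, data, size);
}

// Hypothetical sink playing the role of WriteCallback.
class Collector {
 public:
  // A static member function has no implicit `this`, so it converts to a plain
  // function pointer; the object is recovered from the client-data pointer instead.
  static long onWrite(void *client_data, const void *buff, std::size_t size) {
    auto *self = static_cast<Collector *>(client_data);
    self->data_.append(static_cast<const char *>(buff), size);
    self->size_ += static_cast<long>(size);
    return static_cast<long>(size);  // report every byte as written
  }

  const std::string &data() const { return data_; }
  long size() const { return size_; }

 private:
  std::string data_;
  long size_ = 0;
};

// Example use: two writes routed through the C-style API land in the same object.
inline std::string collectTwoWrites() {
  Collector collector;
  emit(&Collector::onWrite, &collector, "abc", 3);
  emit(&Collector::onWrite, &collector, "de", 2);
  return collector.data();
}
```

`static_cast` from `void *` is used here in place of the diff's C-style cast; for this conversion the two behave the same, but the named cast makes the intent explicit.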
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143550805 --- Diff: libminifi/include/processors/MergeContent.h --- @@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin { }; +// Archive Class +class ArchiveMerge { +public: + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: +ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) : +buffer_size_(size), arch_(arch), entry_(entry) { +} +~ReadCallback() { +} +int64_t process(std::shared_ptr stream) { + uint8_t buffer[buffer_size_]; + int64_t ret = 0; + uint64_t read_size; + ret = stream->read(buffer, buffer_size_); + if (!stream) --- End diff -- !stream means that the stream reached EOF during the read; in that case, we did not get the full size. ---
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143514354 --- Diff: libminifi/include/processors/MergeContent.h --- @@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin { }; +// Archive Class +class ArchiveMerge { +public: + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: +ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) : +buffer_size_(size), arch_(arch), entry_(entry) { +} +~ReadCallback() { +} +int64_t process(std::shared_ptr stream) { + uint8_t buffer[buffer_size_]; + int64_t ret = 0; + uint64_t read_size; + ret = stream->read(buffer, buffer_size_); + if (!stream) +read_size = stream->getSize(); + else +read_size = buffer_size_; + ret = archive_write_header(arch_, entry_); + ret += archive_write_data(arch_, buffer, read_size); + return ret; +} +uint64_t buffer_size_; +struct archive *arch_; +struct archive_entry *entry_; + }; + // Nest Callback Class for write stream + class WriteCallback: public OutputStreamCallback { + public: +WriteCallback(std::string merge_type, std::deque> &flows, core::ProcessSession *session) : +merge_type_(merge_type), flows_(flows), session_(session) { + size_ = 0; + stream_ = nullptr; +} +~WriteCallback() { +} + +std::string merge_type_; +std::deque> &flows_; +core::ProcessSession *session_; +std::shared_ptr stream_; +int64_t size_; + +static la_ssize_t archive_write(struct archive *arch, void *context, const void *buff, size_t size) { + WriteCallback *callback = (WriteCallback *) context; + la_ssize_t ret = callback->stream_->write(reinterpret_cast(const_cast(buff)), size); + if (ret > 0) +callback->size_ += (int64_t) ret; + return ret; +} + +int64_t process(std::shared_ptr stream) { + int64_t ret = 0; + struct archive *arch; + + arch = archive_write_new(); + if (merge_type_ == MERGE_FORMAT_TAR_VALUE) { +archive_write_set_format_pax_restricted(arch); // tar 
format + } + if (merge_type_ == MERGE_FORMAT_ZIP_VALUE) { +archive_write_set_format_zip(arch); // zip format + } + archive_write_set_bytes_per_block(arch, 0); + archive_write_add_filter_none(arch); + this->stream_ = stream; + archive_write_open(arch, this, NULL, archive_write, NULL); + + for (auto flow : flows_) { +struct archive_entry *entry = archive_entry_new(); +std::string fileName; +flow->getAttribute(FlowAttributeKey(FILENAME), fileName); +archive_entry_set_pathname(entry, fileName.c_str()); +archive_entry_set_size(entry, flow->getSize()); +archive_entry_set_mode(entry, S_IFREG | 0755); +if (merge_type_ == MERGE_FORMAT_TAR_VALUE) { + std::string perm; + int permInt; + if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) { +try { + permInt = std::stoi(perm); + archive_entry_set_perm(entry, (mode_t) permInt); +} catch (...) { --- End diff -- Could we log perm when this happens? ---
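Logging the rejected value could look like the sketch below. It catches only the two exceptions `std::stoi` documents (`std::invalid_argument` and `std::out_of_range`) instead of `catch (...)`, and reports the offending string. Several assumptions, none taken from the PR: the attribute is meant to hold octal digits such as `644`, so it is parsed with base 8 (the diff's `std::stoi(perm)` parses base 10, so `755` would become decimal 755, i.e. octal 1363); `std::cerr` stands in for MiNiFi's logger; `std::optional` requires C++17; and `parseTarPermissions` is a hypothetical name.

```cpp
#include <cstddef>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>
#include <sys/types.h>  // mode_t (POSIX)

// Parses a tar permissions attribute such as "644" as octal digits (an assumption).
// On failure it logs the offending value and returns std::nullopt, so the caller
// can keep the entry's default mode.
std::optional<mode_t> parseTarPermissions(const std::string &perm) {
  auto reject = [&perm](const char *why) {
    std::cerr << "Ignoring tar permissions attribute '" << perm << "': " << why << '\n';
    return std::optional<mode_t>{};
  };
  std::size_t consumed = 0;
  int value = 0;
  try {
    value = std::stoi(perm, &consumed, 8);
  } catch (const std::invalid_argument &) {
    return reject("not an octal number");
  } catch (const std::out_of_range &) {
    return reject("out of range");
  }
  if (consumed != perm.size()) {
    return reject("trailing characters");
  }
  if (value < 0 || value > 07777) {
    return reject("outside 0-7777");
  }
  return static_cast<mode_t>(value);
}
```

In the diff's loop, the `try`/`catch (...)` block would then become a call to this function, followed by `archive_entry_set_perm(entry, ...)` only when it returns a value.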
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143513518 --- Diff: libminifi/include/processors/MergeContent.h --- @@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin { }; +// Archive Class +class ArchiveMerge { +public: + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: +ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) : +buffer_size_(size), arch_(arch), entry_(entry) { +} +~ReadCallback() { +} +int64_t process(std::shared_ptr stream) { + uint8_t buffer[buffer_size_]; + int64_t ret = 0; + uint64_t read_size; + ret = stream->read(buffer, buffer_size_); + if (!stream) --- End diff -- Checking whether stream is null after you attempt a function call on it seems counterintuitive. ---
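Because `stream` in the diff is a `std::shared_ptr`, `!stream` tests only whether the pointer is null. It says nothing about end of stream, and the `stream->read(...)` call on the line above would already have dereferenced it. A sketch of the alternative, with `std::istream` standing in for MiNiFi's stream type and `readUpTo` / `firstBytes` as hypothetical helpers: check the pointer first, then size the subsequent write by the byte count the read actually reports.

```cpp
#include <cstddef>
#include <cstdint>
#include <istream>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

// Reads up to `want` bytes from `stream` into `buffer` and returns how many bytes
// the read actually produced (fewer than `want` at end of stream).
// The pointer is checked before it is dereferenced; returns -1 if it is null.
int64_t readUpTo(const std::shared_ptr<std::istream> &stream, std::vector<char> &buffer,
                 std::size_t want) {
  if (!stream) {
    return -1;
  }
  buffer.resize(want);
  stream->read(buffer.data(), static_cast<std::streamsize>(want));
  const std::streamsize got = stream->gcount();
  buffer.resize(static_cast<std::size_t>(got));  // keep only the bytes actually read
  return static_cast<int64_t>(got);
}

// Example use: read at most `want` bytes of `text` back as a string.
inline std::string firstBytes(const std::string &text, std::size_t want) {
  auto stream = std::make_shared<std::istringstream>(text);
  std::vector<char> buffer;
  readUpTo(stream, buffer, want);
  return std::string(buffer.begin(), buffer.end());
}
```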
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143514069 --- Diff: extensions/http-curl/CMakeLists.txt --- @@ -24,7 +24,7 @@ find_package(CURL REQUIRED) set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") -include_directories(../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/) +include_directories(../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/libarchive-3.3.2/libarchive ../../thirdparty/) --- End diff -- Do you recall the error that necessitated adding lib archive here? ---
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143514525 --- Diff: libminifi/src/processors/MergeContent.cpp --- @@ -276,6 +287,46 @@ std::shared_ptr BinaryConcatenationMerge::merge(core::ProcessCon return flowFile; } +std::shared_ptr TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header, +std::string &footer, std::string &demarcator) { + std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session); + session->write(flowFile, &callback); + session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType()); + std::string fileName; + flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + if (flows.size() == 1) { +flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); + } else { +flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName); + } + if (!fileName.empty()) { +fileName += ".tar"; --- End diff -- what if your file is already a tar? Will we handle that case and then also rename it fileName.tar.tar? ---
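The naming rule in `TarMerge::merge` and `ZipMerge::merge` can be reduced to the sketch below, which shows why an input already named `data.tar` yields `data.tar.tar`. Several assumptions: attributes are modeled as a plain `std::map`; the keys `filename` and `segment.original.filename` are assumed to be what `FlowAttributeKey(FILENAME)` and `BinFiles::SEGMENT_ORIGINAL_FILENAME` resolve to; `getAttribute` is assumed to leave its output untouched when the attribute is missing; and `mergedFileName` is a hypothetical helper.

```cpp
#include <map>
#include <string>
#include <vector>

// Stand-in for a flow file's attributes.
using Attributes = std::map<std::string, std::string>;

// Mirrors the naming logic in the diff: start from the new flow file's own name,
// override it with the single input's "filename" or, for several inputs, the first
// input's "segment.original.filename" (when present), then append the extension
// unconditionally.
std::string mergedFileName(const std::vector<Attributes> &flows, const std::string &ownName,
                           const std::string &extension) {
  std::string name = ownName;
  if (!flows.empty()) {
    const std::string key = flows.size() == 1 ? "filename" : "segment.original.filename";
    const auto it = flows.front().find(key);
    if (it != flows.front().end()) {
      name = it->second;
    }
  }
  return name.empty() ? name : name + extension;
}
```

Under this rule, nested archives keep accumulating extensions, which is consistent with wrapping an existing archive as a single entry; skipping a duplicate extension would be a separate design decision.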
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143514729 --- Diff: libminifi/src/processors/MergeContent.cpp --- @@ -276,6 +287,46 @@ std::shared_ptr BinaryConcatenationMerge::merge(core::ProcessCon return flowFile; } +std::shared_ptr TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header, +std::string &footer, std::string &demarcator) { + std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session); + session->write(flowFile, &callback); + session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType()); + std::string fileName; + flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + if (flows.size() == 1) { +flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); + } else { +flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName); + } + if (!fileName.empty()) { +fileName += ".tar"; +session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName); + } + return flowFile; +} + +std::shared_ptr ZipMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header, +std::string &footer, std::string &demarcator) { + std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_ZIP_VALUE), flows, session); + session->write(flowFile, &callback); + session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType()); + std::string fileName; + flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + if (flows.size() == 1) { +flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); + } else { +flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName); 
+ } + if (!fileName.empty()) { +fileName += ".zip"; --- End diff -- Same here...I don't really mind if it's named file.zip.zip, but if we have the flow going through zip twice, will we zip the two separate zips into a larger one or combine them? This isn't clear to me from reading the code. ---
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143513901 --- Diff: libminifi/include/processors/MergeContent.h --- @@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin { }; +// Archive Class +class ArchiveMerge { +public: + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: +ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) : +buffer_size_(size), arch_(arch), entry_(entry) { +} +~ReadCallback() { +} +int64_t process(std::shared_ptr stream) { + uint8_t buffer[buffer_size_]; + int64_t ret = 0; + uint64_t read_size; + ret = stream->read(buffer, buffer_size_); + if (!stream) +read_size = stream->getSize(); + else +read_size = buffer_size_; + ret = archive_write_header(arch_, entry_); + ret += archive_write_data(arch_, buffer, read_size); + return ret; +} +uint64_t buffer_size_; +struct archive *arch_; +struct archive_entry *entry_; + }; + // Nest Callback Class for write stream + class WriteCallback: public OutputStreamCallback { + public: +WriteCallback(std::string merge_type, std::deque> &flows, core::ProcessSession *session) : +merge_type_(merge_type), flows_(flows), session_(session) { + size_ = 0; + stream_ = nullptr; +} +~WriteCallback() { +} + +std::string merge_type_; +std::deque> &flows_; +core::ProcessSession *session_; +std::shared_ptr stream_; +int64_t size_; + +static la_ssize_t archive_write(struct archive *arch, void *context, const void *buff, size_t size) { + WriteCallback *callback = (WriteCallback *) context; + la_ssize_t ret = callback->stream_->write(reinterpret_cast(const_cast(buff)), size); + if (ret > 0) +callback->size_ += (int64_t) ret; + return ret; +} + +int64_t process(std::shared_ptr stream) { + int64_t ret = 0; + struct archive *arch; + + arch = archive_write_new(); + if (merge_type_ == MERGE_FORMAT_TAR_VALUE) { +archive_write_set_format_pax_restricted(arch); // tar 
format + } + if (merge_type_ == MERGE_FORMAT_ZIP_VALUE) { +archive_write_set_format_zip(arch); // zip format + } + archive_write_set_bytes_per_block(arch, 0); + archive_write_add_filter_none(arch); + this->stream_ = stream; + archive_write_open(arch, this, NULL, archive_write, NULL); + + for (auto flow : flows_) { +struct archive_entry *entry = archive_entry_new(); +std::string fileName; +flow->getAttribute(FlowAttributeKey(FILENAME), fileName); +archive_entry_set_pathname(entry, fileName.c_str()); +archive_entry_set_size(entry, flow->getSize()); +archive_entry_set_mode(entry, S_IFREG | 0755); +if (merge_type_ == MERGE_FORMAT_TAR_VALUE) { + std::string perm; + int permInt; + if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) { +try { + permInt = std::stoi(perm); + archive_entry_set_perm(entry, (mode_t) permInt); +} catch (...) { +} + } +} +ReadCallback readCb(flow->getSize(), arch, entry); --- End diff -- We instantiate the entire buffer of the archive into memory? If that is our process, should we not have memory limits? ---
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
GitHub user minifirocks opened a pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146

Archive merge

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [ ] Does your PR title start with MINIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file?
- [ ] If applicable, have you updated the NOTICE file?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running:

$ git pull https://github.com/minifirocks/nifi-minifi-cpp archive_merge

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi-minifi-cpp/pull/146.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #146

commit 7f052c84312ba988a016de126f334576f6f121ed
Author: Marc Parisi
Date: 2017-06-30T14:05:15Z
    MINIFI-338: Convert processor threads to use thread pools

commit 97c8a7f11e4e5cd4fd86439dd1b8744382fd7e2c
Author: Marc Parisi
Date: 2017-07-05T16:13:45Z
    MINIFI-338: Address linter errors

commit 9d500354a3a4c5538ee425162482cb5e8af1bf00
Author: Marc
Date: 2017-07-20T23:28:26Z
    MINIFI-338: Improve wait decay per pull request comments

commit 58c21e0bf95197bf746b7f2c45ba5b3f66010d29
Author: Bin Qiu
Date: 2017-08-02T23:18:16Z
    MINIFI-338: Convert processor threads to use thread pools
    Author: Marc Parisi
    Signed-off-by: Bin Qiu
    This closes #177

commit e98ff6844b059763d587b0151c016c466a330461
Author: Andrew I. Christianson
Date: 2017-08-08T17:16:19Z
    MINIFI-367 port tests to use boost::filesystem vs. stat.h for better portability
    This closes #124.
    Signed-off-by: Marc Parisi

commit 058bb84ba18be6e3da26573849d05bde8ce8fdb0
Author: Marc Parisi
Date: 2017-08-15T19:05:41Z
    MINIFI-375: Remove forward slash from urls
    This closes #127.
    Signed-off-by: Aldrin Piri

commit 951c03e3db705c0472f052dd3d883dca01913c80
Author: Andrew I. Christianson
Date: 2017-08-16T15:56:34Z
    MINIFI-376 removed defunct references to curlbuild.h
    This closes #128.
    Signed-off-by: Aldrin Piri

commit 893e87d145031b48b98b353d6bf9137a0cc19d63
Author: Andrew I. Christianson
Date: 2017-08-09T15:53:04Z
    MINIFI-368 exclude hidden files when scanning for src files
    This closes #125.
    Signed-off-by: Aldrin Piri

commit 7492146bdf3954268af9f3efac89e38bfd95dfd4
Author: Bin Qiu
Date: 2017-09-20T00:15:10Z
    Merge remote-tracking branch 'upstream/master'

commit 453307735b4853ead0f1216579861fe1056ef6a0
Author: Bin Qiu
Date: 2017-10-05T04:43:08Z
    Merge remote-tracking branch 'apache/master'

commit ed7989d39c7e460ddae661ef905f237cae8e9e8f
Author: Bin Qiu
Date: 2017-10-09T15:57:15Z
    MINIFICPP-72: Add Tar and Zip Support for MergeContent

---