[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-12 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r144319079
  
--- Diff: LICENSE ---
@@ -564,4 +564,55 @@ ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 SOFTWARE.
 
+The libarchive distribution as a whole is Copyright by Tim Kientzle
--- End diff --

We need to be more detailed and specific in our LICENSE than this. We will 
need to break out the individual items and their respective terms, because 
several licenses and considerations apply to the various components of this 
library. 

I would additionally recommend removing all extraneous parts of libarchive, 
such as the examples, test resources, and build scripts we don't need for our 
build, to reduce its footprint.  


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-11 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r144026084
  
--- Diff: extensions/http-curl/CMakeLists.txt ---
@@ -24,7 +24,7 @@ find_package(CURL REQUIRED)
 set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
 set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
 
-include_directories(../../libminifi/include ../../libminifi/include/c2  ../../libminifi/include/c2/protocols/  ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../../libminifi/include/core/yaml  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/)
+include_directories(../../libminifi/include ../../libminifi/include/c2  ../../libminifi/include/c2/protocols/  ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../../libminifi/include/core/yaml  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/libarchive-3.3.2/libarchive ../../thirdparty/)
--- End diff --

It is FlowConfiguration.h that includes MergeContent.h.



---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-11 Thread phrocker
Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r144019540
  
--- Diff: extensions/http-curl/CMakeLists.txt ---
@@ -24,7 +24,7 @@ find_package(CURL REQUIRED)
 set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
 set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
 
-include_directories(../../libminifi/include ../../libminifi/include/c2  ../../libminifi/include/c2/protocols/  ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../../libminifi/include/core/yaml  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/)
+include_directories(../../libminifi/include ../../libminifi/include/c2  ../../libminifi/include/c2/protocols/  ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../../libminifi/include/core/yaml  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/libarchive-3.3.2/libarchive ../../thirdparty/)
--- End diff --

Oh, that's far too generic an include_directories statement; that's why. 
We shouldn't be compiling MergeContent as part of the curl extension. 
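A minimal sketch of one way to address this on the C++ side, assuming the goal is that MergeContent.h (and therefore FlowConfiguration.h) stops pulling in archive_entry.h: the header only forward-declares libarchive's opaque struct archive and struct archive_entry and keeps the archive state behind a pointer to implementation, so only the .cpp file needs the libarchive include path. ArchiveWriterSketch and its members are hypothetical names, and the Impl below records entry names instead of calling libarchive so that the sketch compiles on its own.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// ---- What the header could expose (hypothetical) ----
struct archive;        // forward declaration only: no #include <archive.h>
struct archive_entry;  // pointers to incomplete types are fine in declarations

class ArchiveWriterSketch {
 public:
  ArchiveWriterSketch();
  ~ArchiveWriterSketch();  // defined out of line, where Impl is complete
  void addEntry(const std::string &name);
  std::size_t entryCount() const;

 private:
  struct Impl;  // defined only in the .cpp
  std::unique_ptr<Impl> impl_;
};

// ---- What the .cpp could contain (hypothetical) ----
// In a real build this file would #include <archive.h> and <archive_entry.h>
// and Impl would drive archive_write_*(). Here it only records entry names.
struct ArchiveWriterSketch::Impl {
  archive *arch = nullptr;  // would come from archive_write_new()
  std::vector<std::string> names;
};

ArchiveWriterSketch::ArchiveWriterSketch() : impl_(new Impl) {}
ArchiveWriterSketch::~ArchiveWriterSketch() = default;

void ArchiveWriterSketch::addEntry(const std::string &name) {
  impl_->names.push_back(name);
}

std::size_t ArchiveWriterSketch::entryCount() const {
  return impl_->names.size();
}
```

With this approach the ReadCallback and WriteCallback bodies quoted in the diff, which call archive_* functions inline in the header, would also have to move into MergeContent.cpp.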


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-10 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143912126
  
--- Diff: libminifi/src/processors/MergeContent.cpp ---
@@ -276,6 +287,46 @@ std::shared_ptr BinaryConcatenationMerge::merge(core::ProcessCon
   return flowFile;
 }
 
+std::shared_ptr TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header,
+std::string &footer, std::string &demarcator) {
+  std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+  ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session);
+  session->write(flowFile, &callback);
+  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType());
+  std::string fileName;
+  flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  if (flows.size() == 1) {
+    flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  } else {
+    flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName);
+  }
+  if (!fileName.empty()) {
+    fileName += ".tar";
--- End diff --

Same as the zip case below: we will tar the tar files into a tarball.


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-10 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143912068
  
--- Diff: extensions/http-curl/CMakeLists.txt ---
@@ -24,7 +24,7 @@ find_package(CURL REQUIRED)
 set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
 set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
 
-include_directories(../../libminifi/include ../../libminifi/include/c2  ../../libminifi/include/c2/protocols/  ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../../libminifi/include/core/yaml  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/)
+include_directories(../../libminifi/include ../../libminifi/include/c2  ../../libminifi/include/c2/protocols/  ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../../libminifi/include/core/yaml  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/libarchive-3.3.2/libarchive ../../thirdparty/)
--- End diff --

Scanning dependencies of target minifi-http-curl
[ 15%] Building CXX object extensions/http-curl/CMakeFiles/minifi-http-curl.dir/HttpCurlLoader.cpp.o
In file included from /Users/binqiu/m/nifi-minifi-cpp/extensions/http-curl/HttpCurlLoader.cpp:20:
In file included from /Users/binqiu/m/nifi-minifi-cpp/extensions/http-curl/../../libminifi/include/core/FlowConfiguration.h:37:
/Users/binqiu/m/nifi-minifi-cpp/extensions/http-curl/../../libminifi/include/processors/MergeContent.h:24:10: fatal error: 'archive_entry.h' file not found


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-10 Thread phrocker
Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143758285
  
--- Diff: libminifi/src/processors/MergeContent.cpp ---
@@ -276,6 +287,46 @@ std::shared_ptr BinaryConcatenationMerge::merge(core::ProcessCon
   return flowFile;
 }
 
+std::shared_ptr TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header,
+std::string &footer, std::string &demarcator) {
+  std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+  ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session);
+  session->write(flowFile, &callback);
+  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType());
+  std::string fileName;
+  flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  if (flows.size() == 1) {
+    flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  } else {
+    flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName);
+  }
+  if (!fileName.empty()) {
+    fileName += ".tar";
+    session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName);
+  }
+  return flowFile;
+}
+
+std::shared_ptr ZipMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header,
+std::string &footer, std::string &demarcator) {
+  std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+  ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_ZIP_VALUE), flows, session);
+  session->write(flowFile, &callback);
+  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType());
+  std::string fileName;
+  flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  if (flows.size() == 1) {
+    flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  } else {
+    flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName);
+  }
+  if (!fileName.empty()) {
+    fileName += ".zip";
--- End diff --

very cool, thanks!


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-09 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143552652
  
--- Diff: libminifi/src/processors/MergeContent.cpp ---
@@ -276,6 +287,46 @@ std::shared_ptr BinaryConcatenationMerge::merge(core::ProcessCon
   return flowFile;
 }
 
+std::shared_ptr TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header,
+std::string &footer, std::string &demarcator) {
+  std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+  ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session);
+  session->write(flowFile, &callback);
+  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType());
+  std::string fileName;
+  flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  if (flows.size() == 1) {
+    flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  } else {
+    flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName);
+  }
+  if (!fileName.empty()) {
+    fileName += ".tar";
+    session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName);
+  }
+  return flowFile;
+}
+
+std::shared_ptr ZipMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header,
+std::string &footer, std::string &demarcator) {
+  std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+  ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_ZIP_VALUE), flows, session);
+  session->write(flowFile, &callback);
+  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType());
+  std::string fileName;
+  flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  if (flows.size() == 1) {
+    flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  } else {
+    flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName);
+  }
+  if (!fileName.empty()) {
+    fileName += ".zip";
--- End diff --

If two separate zip files fall into the same bin, we will create a single zip 
file that contains both of them.


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-09 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143551961
  
--- Diff: libminifi/src/processors/MergeContent.cpp ---
@@ -276,6 +287,46 @@ std::shared_ptr BinaryConcatenationMerge::merge(core::ProcessCon
   return flowFile;
 }
 
+std::shared_ptr TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header,
+std::string &footer, std::string &demarcator) {
+  std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+  ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session);
+  session->write(flowFile, &callback);
+  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType());
+  std::string fileName;
+  flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  if (flows.size() == 1) {
+    flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  } else {
+    flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName);
+  }
+  if (!fileName.empty()) {
+    fileName += ".tar";
+    session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName);
+  }
+  return flowFile;
+}
+
+std::shared_ptr ZipMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header,
+std::string &footer, std::string &demarcator) {
+  std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+  ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_ZIP_VALUE), flows, session);
+  session->write(flowFile, &callback);
+  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType());
+  std::string fileName;
+  flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  if (flows.size() == 1) {
+    flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  } else {
+    flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName);
+  }
+  if (!fileName.empty()) {
+    fileName += ".zip";
--- End diff --

Yes, we will zip the two separate zips into a larger one.


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-09 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143551465
  
--- Diff: libminifi/include/processors/MergeContent.h ---
@@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin {
 };
 
 
+// Archive Class
+class ArchiveMerge {
+public:
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+    ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) :
+        buffer_size_(size), arch_(arch), entry_(entry) {
+    }
+    ~ReadCallback() {
+    }
+    int64_t process(std::shared_ptr stream) {
+      uint8_t buffer[buffer_size_];
+      int64_t ret = 0;
+      uint64_t read_size;
+      ret = stream->read(buffer, buffer_size_);
+      if (!stream)
+        read_size = stream->getSize();
+      else
+        read_size = buffer_size_;
+      ret = archive_write_header(arch_, entry_);
+      ret += archive_write_data(arch_, buffer, read_size);
+      return ret;
+    }
+    uint64_t buffer_size_;
+    struct archive *arch_;
+    struct archive_entry *entry_;
+  };
+  // Nest Callback Class for write stream
+  class WriteCallback: public OutputStreamCallback {
+  public:
+    WriteCallback(std::string merge_type, std::deque> &flows, core::ProcessSession *session) :
+        merge_type_(merge_type), flows_(flows), session_(session) {
+      size_ = 0;
+      stream_ = nullptr;
+    }
+    ~WriteCallback() {
+    }
+
+    std::string merge_type_;
+    std::deque> &flows_;
+    core::ProcessSession *session_;
+    std::shared_ptr stream_;
+    int64_t size_;
+
+    static la_ssize_t archive_write(struct archive *arch, void *context, const void *buff, size_t size) {
+      WriteCallback *callback = (WriteCallback *) context;
+      la_ssize_t ret = callback->stream_->write(reinterpret_cast(const_cast(buff)), size);
+      if (ret > 0)
+        callback->size_ += (int64_t) ret;
+      return ret;
+    }
+
+    int64_t process(std::shared_ptr stream) {
+      int64_t ret = 0;
+      struct archive *arch;
+
+      arch = archive_write_new();
+      if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
+        archive_write_set_format_pax_restricted(arch); // tar format
+      }
+      if (merge_type_ == MERGE_FORMAT_ZIP_VALUE) {
+        archive_write_set_format_zip(arch); // zip format
+      }
+      archive_write_set_bytes_per_block(arch, 0);
+      archive_write_add_filter_none(arch);
+      this->stream_ = stream;
+      archive_write_open(arch, this, NULL, archive_write, NULL);
+
+      for (auto flow : flows_) {
+        struct archive_entry *entry = archive_entry_new();
+        std::string fileName;
+        flow->getAttribute(FlowAttributeKey(FILENAME), fileName);
+        archive_entry_set_pathname(entry, fileName.c_str());
+        archive_entry_set_size(entry, flow->getSize());
+        archive_entry_set_mode(entry, S_IFREG | 0755);
+        if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
+          std::string perm;
+          int permInt;
+          if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) {
+            try {
+              permInt = std::stoi(perm);
+              archive_entry_set_perm(entry, (mode_t) permInt);
+            } catch (...) {
+            }
+          }
+        }
+        ReadCallback readCb(flow->getSize(), arch, entry);
--- End diff --

Will try to do an incremental read.
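An incremental read could look roughly like the loop below, offered as a sketch rather than the eventual patch. Instead of a variable-length array sized to the whole flow file (uint8_t buffer[buffer_size_] is a compiler extension, not standard C++, and can exhaust the stack for large flow files), it reads a fixed-size chunk at a time and forwards exactly the number of bytes each read returned. ChunkSource and the sink callback are hypothetical stand-ins for the MiNiFi input stream and for archive_write_data(); the assumed read() contract (bytes read, 0 at end of stream, negative on error) is not taken from the diff.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an input stream: read() returns the number of
// bytes copied into `buf`, 0 at end of stream, or -1 on error.
class ChunkSource {
 public:
  explicit ChunkSource(std::string data) : data_(std::move(data)) {}
  int64_t read(uint8_t *buf, std::size_t len) {
    std::size_t n = std::min(len, data_.size() - pos_);
    if (n > 0) std::memcpy(buf, data_.data() + pos_, n);
    pos_ += n;
    return static_cast<int64_t>(n);
  }

 private:
  std::string data_;
  std::size_t pos_ = 0;
};

// Copies the whole source to `sink` in chunks of at most `chunk_size` bytes.
// Returns the total number of bytes forwarded, or -1 if a read fails.
int64_t copyInChunks(ChunkSource &src, std::size_t chunk_size,
                     const std::function<void(const uint8_t *, std::size_t)> &sink) {
  assert(chunk_size > 0);
  std::vector<uint8_t> buffer(chunk_size);  // fixed-size heap buffer
  int64_t total = 0;
  for (;;) {
    int64_t n = src.read(buffer.data(), buffer.size());
    if (n < 0) return -1;  // read error
    if (n == 0) break;     // end of stream
    sink(buffer.data(), static_cast<std::size_t>(n));  // forward only what was read
    total += n;
  }
  return total;
}
```

In the archive case, archive_write_header() would be called once per entry before the loop, and the sink would wrap archive_write_data(arch, buf, n).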


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-09 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143551088
  
--- Diff: libminifi/include/processors/MergeContent.h ---
@@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin {
 };
 
 
+// Archive Class
+class ArchiveMerge {
+public:
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+    ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) :
+        buffer_size_(size), arch_(arch), entry_(entry) {
+    }
+    ~ReadCallback() {
+    }
+    int64_t process(std::shared_ptr stream) {
+      uint8_t buffer[buffer_size_];
+      int64_t ret = 0;
+      uint64_t read_size;
+      ret = stream->read(buffer, buffer_size_);
+      if (!stream)
+        read_size = stream->getSize();
+      else
+        read_size = buffer_size_;
+      ret = archive_write_header(arch_, entry_);
+      ret += archive_write_data(arch_, buffer, read_size);
+      return ret;
+    }
+    uint64_t buffer_size_;
+    struct archive *arch_;
+    struct archive_entry *entry_;
+  };
+  // Nest Callback Class for write stream
+  class WriteCallback: public OutputStreamCallback {
+  public:
+    WriteCallback(std::string merge_type, std::deque> &flows, core::ProcessSession *session) :
+        merge_type_(merge_type), flows_(flows), session_(session) {
+      size_ = 0;
+      stream_ = nullptr;
+    }
+    ~WriteCallback() {
+    }
+
+    std::string merge_type_;
+    std::deque> &flows_;
+    core::ProcessSession *session_;
+    std::shared_ptr stream_;
+    int64_t size_;
+
+    static la_ssize_t archive_write(struct archive *arch, void *context, const void *buff, size_t size) {
+      WriteCallback *callback = (WriteCallback *) context;
+      la_ssize_t ret = callback->stream_->write(reinterpret_cast(const_cast(buff)), size);
+      if (ret > 0)
+        callback->size_ += (int64_t) ret;
+      return ret;
+    }
+
+    int64_t process(std::shared_ptr stream) {
+      int64_t ret = 0;
+      struct archive *arch;
+
+      arch = archive_write_new();
+      if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
+        archive_write_set_format_pax_restricted(arch); // tar format
+      }
+      if (merge_type_ == MERGE_FORMAT_ZIP_VALUE) {
+        archive_write_set_format_zip(arch); // zip format
+      }
+      archive_write_set_bytes_per_block(arch, 0);
+      archive_write_add_filter_none(arch);
+      this->stream_ = stream;
+      archive_write_open(arch, this, NULL, archive_write, NULL);
+
+      for (auto flow : flows_) {
+        struct archive_entry *entry = archive_entry_new();
+        std::string fileName;
+        flow->getAttribute(FlowAttributeKey(FILENAME), fileName);
+        archive_entry_set_pathname(entry, fileName.c_str());
+        archive_entry_set_size(entry, flow->getSize());
+        archive_entry_set_mode(entry, S_IFREG | 0755);
+        if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
+          std::string perm;
+          int permInt;
+          if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) {
+            try {
+              permInt = std::stoi(perm);
+              archive_entry_set_perm(entry, (mode_t) permInt);
+            } catch (...) {
--- End diff --

will do.


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-09 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143550805
  
--- Diff: libminifi/include/processors/MergeContent.h ---
@@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin {
 };
 
 
+// Archive Class
+class ArchiveMerge {
+public:
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+    ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) :
+        buffer_size_(size), arch_(arch), entry_(entry) {
+    }
+    ~ReadCallback() {
+    }
+    int64_t process(std::shared_ptr stream) {
+      uint8_t buffer[buffer_size_];
+      int64_t ret = 0;
+      uint64_t read_size;
+      ret = stream->read(buffer, buffer_size_);
+      if (!stream)
--- End diff --

!stream means that we reached EOF during the read operation; in that case, 
we did not get the full size.


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-09 Thread phrocker
Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143514354
  
--- Diff: libminifi/include/processors/MergeContent.h ---
@@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin {
 };
 
 
+// Archive Class
+class ArchiveMerge {
+public:
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+    ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) :
+        buffer_size_(size), arch_(arch), entry_(entry) {
+    }
+    ~ReadCallback() {
+    }
+    int64_t process(std::shared_ptr stream) {
+      uint8_t buffer[buffer_size_];
+      int64_t ret = 0;
+      uint64_t read_size;
+      ret = stream->read(buffer, buffer_size_);
+      if (!stream)
+        read_size = stream->getSize();
+      else
+        read_size = buffer_size_;
+      ret = archive_write_header(arch_, entry_);
+      ret += archive_write_data(arch_, buffer, read_size);
+      return ret;
+    }
+    uint64_t buffer_size_;
+    struct archive *arch_;
+    struct archive_entry *entry_;
+  };
+  // Nest Callback Class for write stream
+  class WriteCallback: public OutputStreamCallback {
+  public:
+    WriteCallback(std::string merge_type, std::deque> &flows, core::ProcessSession *session) :
+        merge_type_(merge_type), flows_(flows), session_(session) {
+      size_ = 0;
+      stream_ = nullptr;
+    }
+    ~WriteCallback() {
+    }
+
+    std::string merge_type_;
+    std::deque> &flows_;
+    core::ProcessSession *session_;
+    std::shared_ptr stream_;
+    int64_t size_;
+
+    static la_ssize_t archive_write(struct archive *arch, void *context, const void *buff, size_t size) {
+      WriteCallback *callback = (WriteCallback *) context;
+      la_ssize_t ret = callback->stream_->write(reinterpret_cast(const_cast(buff)), size);
+      if (ret > 0)
+        callback->size_ += (int64_t) ret;
+      return ret;
+    }
+
+    int64_t process(std::shared_ptr stream) {
+      int64_t ret = 0;
+      struct archive *arch;
+
+      arch = archive_write_new();
+      if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
+        archive_write_set_format_pax_restricted(arch); // tar format
+      }
+      if (merge_type_ == MERGE_FORMAT_ZIP_VALUE) {
+        archive_write_set_format_zip(arch); // zip format
+      }
+      archive_write_set_bytes_per_block(arch, 0);
+      archive_write_add_filter_none(arch);
+      this->stream_ = stream;
+      archive_write_open(arch, this, NULL, archive_write, NULL);
+
+      for (auto flow : flows_) {
+        struct archive_entry *entry = archive_entry_new();
+        std::string fileName;
+        flow->getAttribute(FlowAttributeKey(FILENAME), fileName);
+        archive_entry_set_pathname(entry, fileName.c_str());
+        archive_entry_set_size(entry, flow->getSize());
+        archive_entry_set_mode(entry, S_IFREG | 0755);
+        if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
+          std::string perm;
+          int permInt;
+          if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) {
+            try {
+              permInt = std::stoi(perm);
+              archive_entry_set_perm(entry, (mode_t) permInt);
+            } catch (...) {
--- End diff --

Could we log perm when this happens?
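A possible shape for that, as a sketch: catch the two exceptions std::stoi is documented to throw (std::invalid_argument when nothing can be converted, std::out_of_range when the value does not fit) instead of catch (...), and report the raw attribute value. It also uses stoi's pos out-parameter to reject trailing junk such as "64x", which stoi would otherwise accept as 64. The diff calls std::stoi(perm) with the default base 10; this sketch passes base 8 on the assumption that the attribute carries conventional octal permission digits such as "644", which is worth confirming against what upstream NiFi writes. parseTarPermissions and the logWarn callback are hypothetical stand-ins for the processor's code and logger.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>

// Parses a tar permission string such as "644" (assumed octal) into `mode`.
// On any failure, reports the raw value through `logWarn` and returns false
// instead of silently swallowing the error.
bool parseTarPermissions(const std::string &perm, int &mode,
                         const std::function<void(const std::string &)> &logWarn) {
  try {
    std::size_t consumed = 0;
    int value = std::stoi(perm, &consumed, 8);  // base 8 is an assumption
    if (consumed != perm.size() || value < 0 || value > 07777) {
      logWarn("ignoring tar permissions '" + perm + "': not a valid octal mode");
      return false;
    }
    mode = value;
    return true;
  } catch (const std::invalid_argument &) {
    logWarn("ignoring tar permissions '" + perm + "': not a number");
  } catch (const std::out_of_range &) {
    logWarn("ignoring tar permissions '" + perm + "': out of range");
  }
  return false;
}
```

On success the mode would then be passed to archive_entry_set_perm(entry, (mode_t) mode), as in the diff.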


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-09 Thread phrocker
Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143513518
  
--- Diff: libminifi/include/processors/MergeContent.h ---
@@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin {
 };
 
 
+// Archive Class
+class ArchiveMerge {
+public:
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+    ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) :
+        buffer_size_(size), arch_(arch), entry_(entry) {
+    }
+    ~ReadCallback() {
+    }
+    int64_t process(std::shared_ptr stream) {
+      uint8_t buffer[buffer_size_];
+      int64_t ret = 0;
+      uint64_t read_size;
+      ret = stream->read(buffer, buffer_size_);
+      if (!stream)
--- End diff --

Checking whether stream is null after you attempt a function call on it 
seems counterintuitive. 
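To illustrate the distinction: !stream on a std::shared_ptr only asks whether the pointer is null. It says nothing about end of stream, and here it is evaluated after stream->read(...) has already dereferenced the pointer. A minimal sketch of the alternative, assuming read() returns the number of bytes actually read (0 at end of stream, negative on error): check the pointer before use and size the archive write from the return value. ShortReadStream and validBytes are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>

// Hypothetical stream: hands out the bytes in `data`, at most `len` per call.
struct ShortReadStream {
  std::string data;
  int read(uint8_t *buf, int len) {
    int n = len < static_cast<int>(data.size()) ? len : static_cast<int>(data.size());
    if (n > 0) std::memcpy(buf, data.data(), n);
    data.erase(0, n);
    return n;
  }
};

// Returns how many bytes of `buf` are valid after one read, or -1 on error.
// The null check happens before the pointer is used, and the byte count comes
// from read()'s return value, not from the state of the shared_ptr.
int validBytes(const std::shared_ptr<ShortReadStream> &stream, uint8_t *buf, int len) {
  if (!stream) return -1;  // null pointer: nothing to read from
  int ret = stream->read(buf, len);
  return ret < 0 ? -1 : ret;  // a short read yields ret < len
}
```

With a three-byte stream and an eight-byte buffer, validBytes returns 3 while the shared_ptr stays non-null, so the diff's "if (!stream)" branch would not fire and read_size would remain buffer_size_.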


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-09 Thread phrocker
Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143514069
  
--- Diff: extensions/http-curl/CMakeLists.txt ---
@@ -24,7 +24,7 @@ find_package(CURL REQUIRED)
 set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
 set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
 
-include_directories(../../libminifi/include ../../libminifi/include/c2  ../../libminifi/include/c2/protocols/  ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../../libminifi/include/core/yaml  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/)
+include_directories(../../libminifi/include ../../libminifi/include/c2  ../../libminifi/include/c2/protocols/  ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../../libminifi/include/core/yaml  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/libarchive-3.3.2/libarchive ../../thirdparty/)
--- End diff --

Do you recall the error that necessitated adding libarchive here?


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-09 Thread phrocker
Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143514525
  
--- Diff: libminifi/src/processors/MergeContent.cpp ---
@@ -276,6 +287,46 @@ std::shared_ptr BinaryConcatenationMerge::merge(core::ProcessCon
   return flowFile;
 }
 
+std::shared_ptr TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header,
+std::string &footer, std::string &demarcator) {
+  std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+  ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session);
+  session->write(flowFile, &callback);
+  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType());
+  std::string fileName;
+  flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  if (flows.size() == 1) {
+    flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  } else {
+    flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName);
+  }
+  if (!fileName.empty()) {
+    fileName += ".tar";
--- End diff --

What if your file is already a tar? Will we handle that case, and would we 
then rename it fileName.tar.tar?
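To make the behaviour in question concrete, here is a standalone sketch of the naming logic in TarMerge::merge and ZipMerge::merge as quoted in the diff: a bin holding a single flow file takes that file's filename, a larger bin takes the first file's segment original filename, and the extension is appended unconditionally. Attributes are modelled as a plain std::map; the key strings "filename" and "segment.original.filename" are assumptions standing in for FlowAttributeKey(FILENAME) and BinFiles::SEGMENT_ORIGINAL_FILENAME, and the sketch also assumes getAttribute() leaves its output untouched when the attribute is missing. mergedFileName is a hypothetical name.

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <string>
#include <utility>

// Hypothetical model of flow file attributes.
using Attributes = std::map<std::string, std::string>;

// Mirrors the naming logic quoted in the diff. `seed` is the filename read
// from the newly created flow file. Returns the new filename, or "" when the
// diff would leave the filename attribute untouched.
std::string mergedFileName(std::string seed, const std::deque<Attributes> &flows,
                           const std::string &extension) {
  std::string fileName = std::move(seed);
  if (!flows.empty()) {  // the diff calls flows.front() without this check
    const char *key = flows.size() == 1 ? "filename" : "segment.original.filename";
    auto it = flows.front().find(key);
    // Assumption: a missing attribute leaves fileName unchanged.
    if (it != flows.front().end()) fileName = it->second;
  }
  // The extension is appended unconditionally, so "data.tar" becomes "data.tar.tar".
  return fileName.empty() ? std::string() : fileName + extension;
}
```

Under these assumptions a bin holding only data.tar produces data.tar.tar, consistent with the reply in this thread that a tar input is archived as-is inside a new tarball.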


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-09 Thread phrocker
Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143514729
  
--- Diff: libminifi/src/processors/MergeContent.cpp ---
@@ -276,6 +287,46 @@ std::shared_ptr BinaryConcatenationMerge::merge(core::ProcessCon
   return flowFile;
 }
 
+std::shared_ptr TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header,
+std::string &footer, std::string &demarcator) {
+  std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+  ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session);
+  session->write(flowFile, &callback);
+  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType());
+  std::string fileName;
+  flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  if (flows.size() == 1) {
+    flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  } else {
+    flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName);
+  }
+  if (!fileName.empty()) {
+    fileName += ".tar";
+    session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName);
+  }
+  return flowFile;
+}
+
+std::shared_ptr ZipMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, std::string &header,
+std::string &footer, std::string &demarcator) {
+  std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+  ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_ZIP_VALUE), flows, session);
+  session->write(flowFile, &callback);
+  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType());
+  std::string fileName;
+  flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  if (flows.size() == 1) {
+    flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  } else {
+    flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName);
+  }
+  if (!fileName.empty()) {
+    fileName += ".zip";
--- End diff --

Same here. I don't really mind if it's named file.zip.zip, but if the flow 
goes through zip twice, will we zip the two separate zips into a larger one or 
combine them? This isn't clear to me from reading the code. 


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-09 Thread phrocker
Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143513901
  
--- Diff: libminifi/include/processors/MergeContent.h ---
@@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin {
 };
 
 
+// Archive Class
+class ArchiveMerge {
+public:
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+    ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) :
+        buffer_size_(size), arch_(arch), entry_(entry) {
+    }
+    ~ReadCallback() {
+    }
+    int64_t process(std::shared_ptr stream) {
+      uint8_t buffer[buffer_size_];
+      int64_t ret = 0;
+      uint64_t read_size;
+      ret = stream->read(buffer, buffer_size_);
+      if (!stream)
+        read_size = stream->getSize();
+      else
+        read_size = buffer_size_;
+      ret = archive_write_header(arch_, entry_);
+      ret += archive_write_data(arch_, buffer, read_size);
+      return ret;
+    }
+    uint64_t buffer_size_;
+    struct archive *arch_;
+    struct archive_entry *entry_;
+  };
+  // Nest Callback Class for write stream
+  class WriteCallback: public OutputStreamCallback {
+  public:
+    WriteCallback(std::string merge_type, std::deque> &flows, core::ProcessSession *session) :
+        merge_type_(merge_type), flows_(flows), session_(session) {
+      size_ = 0;
+      stream_ = nullptr;
+    }
+    ~WriteCallback() {
+    }
+
+    std::string merge_type_;
+    std::deque> &flows_;
+    core::ProcessSession *session_;
+    std::shared_ptr stream_;
+    int64_t size_;
+
+    static la_ssize_t archive_write(struct archive *arch, void *context, const void *buff, size_t size) {
+      WriteCallback *callback = (WriteCallback *) context;
+      la_ssize_t ret = callback->stream_->write(reinterpret_cast(const_cast(buff)), size);
+      if (ret > 0)
+        callback->size_ += (int64_t) ret;
+      return ret;
+    }
+
+    int64_t process(std::shared_ptr stream) {
+      int64_t ret = 0;
+      struct archive *arch;
+
+      arch = archive_write_new();
+      if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
+        archive_write_set_format_pax_restricted(arch); // tar format
+      }
+      if (merge_type_ == MERGE_FORMAT_ZIP_VALUE) {
+        archive_write_set_format_zip(arch); // zip format
+      }
+      archive_write_set_bytes_per_block(arch, 0);
+      archive_write_add_filter_none(arch);
+      this->stream_ = stream;
+      archive_write_open(arch, this, NULL, archive_write, NULL);
+
+      for (auto flow : flows_) {
+        struct archive_entry *entry = archive_entry_new();
+        std::string fileName;
+        flow->getAttribute(FlowAttributeKey(FILENAME), fileName);
+        archive_entry_set_pathname(entry, fileName.c_str());
+        archive_entry_set_size(entry, flow->getSize());
+        archive_entry_set_mode(entry, S_IFREG | 0755);
+        if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
+          std::string perm;
+          int permInt;
+          if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) {
+            try {
+              permInt = std::stoi(perm);
+              archive_entry_set_perm(entry, (mode_t) permInt);
+            } catch (...) {
+            }
+          }
+        }
+        ReadCallback readCb(flow->getSize(), arch, entry);
--- End diff --

We instantiate the entire buffer of the archive into memory? If that is our 
process, should we not have memory limits?
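For context on the memory question: the quoted `ReadCallback::process` declares `uint8_t buffer[buffer_size_]`, where `buffer_size_` comes from `flow->getSize()`. That is a variable-length array (a compiler extension, not standard C++) on the stack, sized to the whole flow file. One common alternative is to stream each entry through a fixed-size buffer and enforce a cap. The sketch below shows that pattern with standard streams only; `copyInChunks`, `chunkSize` and `maxBytes` are hypothetical names, and in the PR the `out.write` call would presumably become an `archive_write_data` call, which libarchive allows repeating for a single entry after `archive_write_header`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <istream>
#include <ostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical helper: copies `in` to `out` through one fixed-size heap
// buffer, so peak memory is bounded by chunkSize rather than the entry size.
// maxBytes is an assumed per-entry limit; exceeding it throws.
std::uint64_t copyInChunks(std::istream &in, std::ostream &out,
                           std::size_t chunkSize, std::uint64_t maxBytes) {
  assert(chunkSize > 0);
  std::vector<char> buffer(chunkSize);
  std::uint64_t total = 0;
  while (in) {
    in.read(buffer.data(), static_cast<std::streamsize>(buffer.size()));
    std::streamsize got = in.gcount();  // bytes actually read this round
    if (got <= 0) {
      break;  // nothing left to copy
    }
    total += static_cast<std::uint64_t>(got);
    if (total > maxBytes) {
      throw std::length_error("entry exceeds configured size limit");
    }
    out.write(buffer.data(), got);  // archive_write_data() in the PR's setting
    if (!out) {
      throw std::runtime_error("write failed");
    }
  }
  return total;
}
```

With this shape, memory use stays at `chunkSize` bytes per entry regardless of flow file size, and the cap turns an oversized input into a reportable error instead of a stack overflow.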


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-09 Thread minifirocks
GitHub user minifirocks opened a pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146

Archive merge

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced
 in the commit message?

- [ ] Does your PR title start with MINIFI-XXXX where XXXX is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file?
- [ ] If applicable, have you updated the NOTICE file?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/minifirocks/nifi-minifi-cpp archive_merge

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi-minifi-cpp/pull/146.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #146


commit 7f052c84312ba988a016de126f334576f6f121ed
Author: Marc Parisi 
Date:   2017-06-30T14:05:15Z

MINIFI-338: Convert processor threads to use thread pools

commit 97c8a7f11e4e5cd4fd86439dd1b8744382fd7e2c
Author: Marc Parisi 
Date:   2017-07-05T16:13:45Z

MINIFI-338: Address linter errors

commit 9d500354a3a4c5538ee425162482cb5e8af1bf00
Author: Marc 
Date:   2017-07-20T23:28:26Z

MINIFI-338: Improve wait decay per pull request comments

commit 58c21e0bf95197bf746b7f2c45ba5b3f66010d29
Author: Bin Qiu 
Date:   2017-08-02T23:18:16Z

MINIFI-338: Convert processor threads to use thread pools

Author: Marc Parisi 

Signed-off-by: Bin Qiu 

This closes #177

commit e98ff6844b059763d587b0151c016c466a330461
Author: Andrew I. Christianson 
Date:   2017-08-08T17:16:19Z

MINIFI-367 port tests to use boost::filesystem vs. stat.h for better 
portability

This closes #124.

Signed-off-by: Marc Parisi 

commit 058bb84ba18be6e3da26573849d05bde8ce8fdb0
Author: Marc Parisi 
Date:   2017-08-15T19:05:41Z

MINIFI-375: Remove forward slash from urls

This closes #127.

Signed-off-by: Aldrin Piri 

commit 951c03e3db705c0472f052dd3d883dca01913c80
Author: Andrew I. Christianson 
Date:   2017-08-16T15:56:34Z

MINIFI-376 removed defunct references to curlbuild.h

This closes #128.

Signed-off-by: Aldrin Piri 

commit 893e87d145031b48b98b353d6bf9137a0cc19d63
Author: Andrew I. Christianson 
Date:   2017-08-09T15:53:04Z

MINIFI-368 exclude hidden files when scanning for src files

This closes #125.

Signed-off-by: Aldrin Piri 

commit 7492146bdf3954268af9f3efac89e38bfd95dfd4
Author: Bin Qiu 
Date:   2017-09-20T00:15:10Z

Merge remote-tracking branch 'upstream/master'

commit 453307735b4853ead0f1216579861fe1056ef6a0
Author: Bin Qiu 
Date:   2017-10-05T04:43:08Z

Merge remote-tracking branch 'apache/master'

commit ed7989d39c7e460ddae661ef905f237cae8e9e8f
Author: Bin Qiu 
Date:   2017-10-09T15:57:15Z

MINIFICPP-72: Add Tar and Zip Support for MergeContent




---