[GitHub] nifi-minifi-cpp pull request #133: Merge Content processor
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/133#discussion_r138110601 --- Diff: libminifi/include/processors/BinFiles.h --- @@ -0,0 +1,295 @@ +/** + * @file BinFiles.h + * BinFiles class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __BIN_FILES_H__ +#define __BIN_FILES_H__ + +#include +#include +#include +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/Id.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// Bin Class +class Bin { + public: + // Constructor + /*! 
+* Create a new Bin +*/ + explicit Bin(uint64_t minSize, uint64_t maxSize, int minEntries, int maxEntries, std::string fileCount, std::string groupId) + : minSize_(minSize), maxSize_(maxSize), maxEntries_(maxEntries), minEntries_(minEntries), fileCount_(fileCount), +groupId_(groupId), logger_(logging::LoggerFactory::getLogger()) { +queued_data_size_ = 0; +creation_dated_ = getTimeMillis(); +std::shared_ptr id_generator = utils::IdGenerator::getIdGenerator(); +char uuidStr[37] = { 0 }; +id_generator->generate(uuid_); +uuid_unparse_lower(uuid_, uuidStr); +uuid_str_ = uuidStr; +logger_->log_info("Bin %s for group %s created", uuid_str_, groupId_); + } + virtual ~Bin() { +logger_->log_info("Bin %s for group %s destroyed", uuid_str_, groupId_); + } + // check whether the bin is full + bool isFull() { +if (queued_data_size_ >= maxSize_ || queue_.size() >= maxEntries_) + return true; +else + return false; + } + // check whether the bin meet the min required size and entries + bool isFullEnough() { +return isFull() || (queued_data_size_ >= minSize_ && queue_.size() >= minEntries_); + } + // check whether the bin is older than the time specified in msec + bool isOlderThan(uint64_t duration) { +uint64_t currentTime = getTimeMillis(); +if (currentTime > (creation_dated_ + duration)) + return true; +else + return false; + } + std::deque<std::shared_ptr> & getFlowFile() { +return queue_; + } + // offer the flowfile to the bin + bool offer(std::shared_ptr flow) { +if (!fileCount_.empty()) { + std::string value; + if (flow->getAttribute(fileCount_, value)) { +try { + // for defrag case using the identification + int count = std::stoi(value); + maxEntries_ = count; + minEntries_ = count; +} catch (...) 
{ + +} + } +} + +if ((queued_data_size_ + flow->getSize()) > maxSize_ || (queue_.size() + 1) > maxEntries_) + return false; + +queue_.push_back(flow); +queued_data_size_ += flow->getSize(); +logger_->log_info("Bin %s for group %s offer size %d byte %d min_entry %d max_entry %d", +uuid_str_, groupId_, queue_.size(), queued_data_size_, minEntries_, maxEntries_); + +return true; + } + // getBinAge + uint64_t getBinAge() { +return creation_dated_; + } + int getSize() { +return queue_.size(); + } + // Get the UUID as string + std::string getUUIDStr() { +return uuid_str_; + } + std::string getGroupId() { +return groupId_; + } + + protected: + + private: + uint64_t minSize_; + uint64_t maxSize_; + int maxEntries_; + int minE
[GitHub] nifi-minifi-cpp pull request #133: Merge Content processor
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/133#discussion_r138110287 --- Diff: libminifi/include/processors/BinFiles.h --- @@ -0,0 +1,295 @@ +/** + * @file BinFiles.h + * BinFiles class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __BIN_FILES_H__ +#define __BIN_FILES_H__ + +#include +#include +#include +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/Id.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// Bin Class +class Bin { + public: + // Constructor + /*! 
+* Create a new Bin +*/ + explicit Bin(uint64_t minSize, uint64_t maxSize, int minEntries, int maxEntries, std::string fileCount, std::string groupId) + : minSize_(minSize), maxSize_(maxSize), maxEntries_(maxEntries), minEntries_(minEntries), fileCount_(fileCount), +groupId_(groupId), logger_(logging::LoggerFactory::getLogger()) { +queued_data_size_ = 0; +creation_dated_ = getTimeMillis(); +std::shared_ptr id_generator = utils::IdGenerator::getIdGenerator(); +char uuidStr[37] = { 0 }; +id_generator->generate(uuid_); +uuid_unparse_lower(uuid_, uuidStr); +uuid_str_ = uuidStr; +logger_->log_info("Bin %s for group %s created", uuid_str_, groupId_); + } + virtual ~Bin() { +logger_->log_info("Bin %s for group %s destroyed", uuid_str_, groupId_); + } + // check whether the bin is full + bool isFull() { +if (queued_data_size_ >= maxSize_ || queue_.size() >= maxEntries_) + return true; +else + return false; + } + // check whether the bin meet the min required size and entries + bool isFullEnough() { +return isFull() || (queued_data_size_ >= minSize_ && queue_.size() >= minEntries_); + } + // check whether the bin is older than the time specified in msec + bool isOlderThan(uint64_t duration) { +uint64_t currentTime = getTimeMillis(); +if (currentTime > (creation_dated_ + duration)) + return true; +else + return false; + } + std::deque<std::shared_ptr> & getFlowFile() { +return queue_; + } + // offer the flowfile to the bin + bool offer(std::shared_ptr flow) { +if (!fileCount_.empty()) { + std::string value; + if (flow->getAttribute(fileCount_, value)) { +try { + // for defrag case using the identification + int count = std::stoi(value); + maxEntries_ = count; + minEntries_ = count; +} catch (...) 
{ + +} + } +} + +if ((queued_data_size_ + flow->getSize()) > maxSize_ || (queue_.size() + 1) > maxEntries_) + return false; + +queue_.push_back(flow); +queued_data_size_ += flow->getSize(); +logger_->log_info("Bin %s for group %s offer size %d byte %d min_entry %d max_entry %d", +uuid_str_, groupId_, queue_.size(), queued_data_size_, minEntries_, maxEntries_); + +return true; + } + // getBinAge + uint64_t getBinAge() { +return creation_dated_; + } + int getSize() { +return queue_.size(); + } + // Get the UUID as string + std::string getUUIDStr() { +return uuid_str_; + } + std::string getGroupId() { +return groupId_; + } + + protected: + + private: + uint64_t minSize_; + uint64_t maxSize_; + int maxEntries_; + int minE
[GitHub] nifi-minifi-cpp pull request #133: Merge Content processor
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/133#discussion_r138110422 --- Diff: libminifi/include/processors/BinFiles.h --- @@ -0,0 +1,295 @@ +/** + * @file BinFiles.h + * BinFiles class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __BIN_FILES_H__ +#define __BIN_FILES_H__ + +#include +#include +#include +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/Id.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// Bin Class +class Bin { + public: + // Constructor + /*! 
+* Create a new Bin +*/ + explicit Bin(uint64_t minSize, uint64_t maxSize, int minEntries, int maxEntries, std::string fileCount, std::string groupId) + : minSize_(minSize), maxSize_(maxSize), maxEntries_(maxEntries), minEntries_(minEntries), fileCount_(fileCount), +groupId_(groupId), logger_(logging::LoggerFactory::getLogger()) { +queued_data_size_ = 0; +creation_dated_ = getTimeMillis(); +std::shared_ptr id_generator = utils::IdGenerator::getIdGenerator(); +char uuidStr[37] = { 0 }; +id_generator->generate(uuid_); +uuid_unparse_lower(uuid_, uuidStr); +uuid_str_ = uuidStr; +logger_->log_info("Bin %s for group %s created", uuid_str_, groupId_); + } + virtual ~Bin() { +logger_->log_info("Bin %s for group %s destroyed", uuid_str_, groupId_); + } + // check whether the bin is full + bool isFull() { +if (queued_data_size_ >= maxSize_ || queue_.size() >= maxEntries_) + return true; +else + return false; + } + // check whether the bin meet the min required size and entries + bool isFullEnough() { +return isFull() || (queued_data_size_ >= minSize_ && queue_.size() >= minEntries_); + } + // check whether the bin is older than the time specified in msec + bool isOlderThan(uint64_t duration) { +uint64_t currentTime = getTimeMillis(); +if (currentTime > (creation_dated_ + duration)) + return true; +else + return false; + } + std::deque<std::shared_ptr> & getFlowFile() { +return queue_; + } + // offer the flowfile to the bin + bool offer(std::shared_ptr flow) { +if (!fileCount_.empty()) { + std::string value; + if (flow->getAttribute(fileCount_, value)) { +try { + // for defrag case using the identification + int count = std::stoi(value); + maxEntries_ = count; + minEntries_ = count; +} catch (...) 
{ + +} + } +} + +if ((queued_data_size_ + flow->getSize()) > maxSize_ || (queue_.size() + 1) > maxEntries_) + return false; + +queue_.push_back(flow); +queued_data_size_ += flow->getSize(); +logger_->log_info("Bin %s for group %s offer size %d byte %d min_entry %d max_entry %d", +uuid_str_, groupId_, queue_.size(), queued_data_size_, minEntries_, maxEntries_); + +return true; + } + // getBinAge + uint64_t getBinAge() { +return creation_dated_; + } + int getSize() { +return queue_.size(); + } + // Get the UUID as string + std::string getUUIDStr() { +return uuid_str_; + } + std::string getGroupId() { +return groupId_; + } + + protected: + + private: + uint64_t minSize_; + uint64_t maxSize_; + int maxEntries_; + int minE
[GitHub] nifi-minifi-cpp pull request #133: Merge Content processor
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/133#discussion_r138110384 --- Diff: libminifi/include/processors/BinFiles.h --- @@ -0,0 +1,295 @@ +/** + * @file BinFiles.h + * BinFiles class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __BIN_FILES_H__ +#define __BIN_FILES_H__ + +#include +#include +#include +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/Id.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// Bin Class +class Bin { + public: + // Constructor + /*! 
+* Create a new Bin +*/ + explicit Bin(uint64_t minSize, uint64_t maxSize, int minEntries, int maxEntries, std::string fileCount, std::string groupId) + : minSize_(minSize), maxSize_(maxSize), maxEntries_(maxEntries), minEntries_(minEntries), fileCount_(fileCount), +groupId_(groupId), logger_(logging::LoggerFactory::getLogger()) { +queued_data_size_ = 0; +creation_dated_ = getTimeMillis(); +std::shared_ptr id_generator = utils::IdGenerator::getIdGenerator(); +char uuidStr[37] = { 0 }; +id_generator->generate(uuid_); +uuid_unparse_lower(uuid_, uuidStr); +uuid_str_ = uuidStr; +logger_->log_info("Bin %s for group %s created", uuid_str_, groupId_); + } + virtual ~Bin() { +logger_->log_info("Bin %s for group %s destroyed", uuid_str_, groupId_); + } + // check whether the bin is full + bool isFull() { +if (queued_data_size_ >= maxSize_ || queue_.size() >= maxEntries_) + return true; +else + return false; + } + // check whether the bin meet the min required size and entries + bool isFullEnough() { +return isFull() || (queued_data_size_ >= minSize_ && queue_.size() >= minEntries_); + } + // check whether the bin is older than the time specified in msec + bool isOlderThan(uint64_t duration) { +uint64_t currentTime = getTimeMillis(); +if (currentTime > (creation_dated_ + duration)) + return true; +else + return false; + } + std::deque<std::shared_ptr> & getFlowFile() { +return queue_; + } + // offer the flowfile to the bin + bool offer(std::shared_ptr flow) { +if (!fileCount_.empty()) { + std::string value; + if (flow->getAttribute(fileCount_, value)) { +try { + // for defrag case using the identification + int count = std::stoi(value); + maxEntries_ = count; + minEntries_ = count; +} catch (...) 
{ + +} + } +} + +if ((queued_data_size_ + flow->getSize()) > maxSize_ || (queue_.size() + 1) > maxEntries_) + return false; + +queue_.push_back(flow); +queued_data_size_ += flow->getSize(); +logger_->log_info("Bin %s for group %s offer size %d byte %d min_entry %d max_entry %d", +uuid_str_, groupId_, queue_.size(), queued_data_size_, minEntries_, maxEntries_); + +return true; + } + // getBinAge + uint64_t getBinAge() { +return creation_dated_; + } + int getSize() { +return queue_.size(); + } + // Get the UUID as string + std::string getUUIDStr() { +return uuid_str_; + } + std::string getGroupId() { +return groupId_; + } + + protected: + + private: + uint64_t minSize_; + uint64_t maxSize_; + int maxEntries_; + int minE
[GitHub] nifi-minifi-cpp pull request #133: Merge Content processor
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/133#discussion_r138109572 --- Diff: libminifi/include/processors/BinFiles.h --- @@ -0,0 +1,295 @@ +/** + * @file BinFiles.h + * BinFiles class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __BIN_FILES_H__ +#define __BIN_FILES_H__ + +#include +#include +#include +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/Id.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// Bin Class +class Bin { + public: + // Constructor + /*! 
+* Create a new Bin +*/ + explicit Bin(uint64_t minSize, uint64_t maxSize, int minEntries, int maxEntries, std::string fileCount, std::string groupId) + : minSize_(minSize), maxSize_(maxSize), maxEntries_(maxEntries), minEntries_(minEntries), fileCount_(fileCount), +groupId_(groupId), logger_(logging::LoggerFactory::getLogger()) { +queued_data_size_ = 0; +creation_dated_ = getTimeMillis(); +std::shared_ptr id_generator = utils::IdGenerator::getIdGenerator(); +char uuidStr[37] = { 0 }; +id_generator->generate(uuid_); +uuid_unparse_lower(uuid_, uuidStr); +uuid_str_ = uuidStr; +logger_->log_info("Bin %s for group %s created", uuid_str_, groupId_); + } + virtual ~Bin() { +logger_->log_info("Bin %s for group %s destroyed", uuid_str_, groupId_); + } + // check whether the bin is full + bool isFull() { +if (queued_data_size_ >= maxSize_ || queue_.size() >= maxEntries_) + return true; +else + return false; + } + // check whether the bin meet the min required size and entries + bool isFullEnough() { +return isFull() || (queued_data_size_ >= minSize_ && queue_.size() >= minEntries_); + } + // check whether the bin is older than the time specified in msec + bool isOlderThan(uint64_t duration) { --- End diff -- sure. ---
[GitHub] nifi-minifi-cpp pull request #133: Merge Content processor
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/133#discussion_r138109269 --- Diff: libminifi/include/processors/BinFiles.h --- @@ -0,0 +1,295 @@ +/** + * @file BinFiles.h + * BinFiles class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __BIN_FILES_H__ +#define __BIN_FILES_H__ + +#include +#include +#include +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/Id.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// Bin Class +class Bin { + public: + // Constructor + /*! 
+* Create a new Bin +*/ + explicit Bin(uint64_t minSize, uint64_t maxSize, int minEntries, int maxEntries, std::string fileCount, std::string groupId) + : minSize_(minSize), maxSize_(maxSize), maxEntries_(maxEntries), minEntries_(minEntries), fileCount_(fileCount), +groupId_(groupId), logger_(logging::LoggerFactory::getLogger()) { +queued_data_size_ = 0; +creation_dated_ = getTimeMillis(); +std::shared_ptr id_generator = utils::IdGenerator::getIdGenerator(); +char uuidStr[37] = { 0 }; +id_generator->generate(uuid_); +uuid_unparse_lower(uuid_, uuidStr); +uuid_str_ = uuidStr; +logger_->log_info("Bin %s for group %s created", uuid_str_, groupId_); + } + virtual ~Bin() { +logger_->log_info("Bin %s for group %s destroyed", uuid_str_, groupId_); + } + // check whether the bin is full + bool isFull() { +if (queued_data_size_ >= maxSize_ || queue_.size() >= maxEntries_) + return true; +else + return false; + } + // check whether the bin meet the min required size and entries + bool isFullEnough() { --- End diff -- I mimicked what the Java implementation does; it indicates whether the bin is full enough (meets the minimum requirements) for processing. ---
[GitHub] nifi-minifi-cpp pull request #133: Merge Content processor
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/133#discussion_r138108806 --- Diff: libminifi/include/processors/BinFiles.h --- @@ -0,0 +1,295 @@ +/** + * @file BinFiles.h + * BinFiles class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __BIN_FILES_H__ +#define __BIN_FILES_H__ + +#include +#include +#include +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/Id.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// Bin Class +class Bin { + public: + // Constructor + /*! +* Create a new Bin --- End diff -- will do ---
[GitHub] nifi-minifi-cpp pull request #134: MINIFI-339: Add C2 base allowing for 1 pr...
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/134#discussion_r137859968 --- Diff: libminifi/include/c2/C2Payload.h --- @@ -0,0 +1,187 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_C2PAYLOAD_H_ +#define LIBMINIFI_INCLUDE_C2_C2PAYLOAD_H_ + +#include +#include +#include +#include "core/state/UpdateController.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +enum Operation { + ACKNOWLEDGE, + START, + STOP, + RESTART, + DESCRIBE, + HEARTBEAT, + UPDATE, + VALIDATE, + CLEAR +}; + +enum Direction { + TRANSMIT, + RECEIVE +}; + +class C2ContentResponse { + public: + C2ContentResponse(Operation op); + + C2ContentResponse(const C2ContentResponse ); + + C2ContentResponse(const C2ContentResponse &); + + C2ContentResponse & operator=(const C2ContentResponse &); + + C2ContentResponse & operator=(const C2ContentResponse ); + + Operation op; + // determines if the operation is required + bool required; + // identifier + std::string ident; + // delay before running + uint32_t delay; + // max time before this response will no longer be honored. 
+ uint64_t ttl; + // name applied to commands + std::string name; + // commands that correspond with the operation. + std::map<std::string, std::string> operation_arguments; +// std::vector content; +}; + +/** + * C2Payload is an update for the state manager. + * Note that the payload can either consist of other payloads or + * have content directly within it, represented by C2ContentResponse objects, above. + * + * Payloads can also contain raw data, which can be binary data. + */ +class C2Payload : public state::Update { + public: + virtual ~C2Payload() { + + } + + C2Payload(Operation op, std::string identifier, bool resp = false, bool isRaw = false); + + C2Payload(Operation op, bool resp = false, bool isRaw = false); + + C2Payload(Operation op, state::UpdateState state, bool resp = false, bool isRaw = false); + + C2Payload(const C2Payload ); + + C2Payload(const C2Payload &); + + void setIdentifier(const std::string ); + + std::string getIdentifier() const; + + void setLabel(const std::string label) { +label_ = label; + } + + std::string getLabel() const { +return label_; + } + + /** + * Gets the operation for this payload. May be nested or a single operation. + */ + Operation getOperation() const; + + /** + * Validate the payload, if necessary and/or possible. + */ + virtual bool validate(); + + /** + * Get content responses from this payload. + */ + const std::vector () const; + + /** + * Add a content response to this payload. + */ + void addContent(const C2ContentResponse &); + + /** + * Determines if this object contains raw data. + */ + bool isRaw() const; + + /** + * Sets raw data within this object. + */ + void setRawData(const std::string ); + + /** + * Returns raw data. + */ + std::string getRawData() const; + + /** + * Add a nested payload. + * @param payload payload to move into this object. + */ + void addPayload(const C2Payload &); + /** + * Get nested payloads. 
+ */ + const std::vector () const; + + C2Payload =(const C2Payload &); + C2Payload =(const C2Payload ); + + protected: + + // identifier for this payload. + std::string ident_; + + std::string label_; + + std::vector payloads_; + + std:
[GitHub] nifi-minifi-cpp pull request #134: MINIFI-339: Add C2 base allowing for 1 pr...
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/134#discussion_r137858437 --- Diff: libminifi/include/c2/C2Payload.h --- @@ -0,0 +1,187 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_C2PAYLOAD_H_ +#define LIBMINIFI_INCLUDE_C2_C2PAYLOAD_H_ + +#include +#include +#include +#include "core/state/UpdateController.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +enum Operation { + ACKNOWLEDGE, + START, + STOP, + RESTART, + DESCRIBE, + HEARTBEAT, + UPDATE, + VALIDATE, + CLEAR +}; + +enum Direction { + TRANSMIT, + RECEIVE +}; + +class C2ContentResponse { + public: + C2ContentResponse(Operation op); + + C2ContentResponse(const C2ContentResponse ); + + C2ContentResponse(const C2ContentResponse &); + + C2ContentResponse & operator=(const C2ContentResponse &); + + C2ContentResponse & operator=(const C2ContentResponse ); + + Operation op; + // determines if the operation is required + bool required; + // identifier + std::string ident; + // delay before running + uint32_t delay; + // max time before this response will no longer be honored. 
+ uint64_t ttl; + // name applied to commands + std::string name; + // commands that correspond with the operation. + std::map<std::string, std::string> operation_arguments; +// std::vector content; +}; + +/** + * C2Payload is an update for the state manager. + * Note that the payload can either consist of other payloads or + * have content directly within it, represented by C2ContentResponse objects, above. + * + * Payloads can also contain raw data, which can be binary data. + */ +class C2Payload : public state::Update { + public: + virtual ~C2Payload() { + + } + + C2Payload(Operation op, std::string identifier, bool resp = false, bool isRaw = false); + + C2Payload(Operation op, bool resp = false, bool isRaw = false); + + C2Payload(Operation op, state::UpdateState state, bool resp = false, bool isRaw = false); + + C2Payload(const C2Payload ); + + C2Payload(const C2Payload &); + + void setIdentifier(const std::string ); + + std::string getIdentifier() const; + + void setLabel(const std::string label) { +label_ = label; + } + + std::string getLabel() const { +return label_; + } + + /** + * Gets the operation for this payload. May be nested or a single operation. + */ + Operation getOperation() const; + + /** + * Validate the payload, if necessary and/or possible. + */ + virtual bool validate(); + + /** + * Get content responses from this payload. + */ + const std::vector () const; + + /** + * Add a content response to this payload. + */ + void addContent(const C2ContentResponse &); + + /** + * Determines if this object contains raw data. + */ + bool isRaw() const; + + /** + * Sets raw data within this object. + */ + void setRawData(const std::string ); + + /** + * Returns raw data. + */ + std::string getRawData() const; + + /** + * Add a nested payload. + * @param payload payload to move into this object. + */ + void addPayload(const C2Payload &); + /** + * Get nested payloads. 
+ */ + const std::vector () const; + + C2Payload =(const C2Payload &); + C2Payload =(const C2Payload ); + + protected: + + // identifier for this payload. + std::string ident_; + + std::string label_; + + std::vector payloads_; + + std:
[GitHub] nifi-minifi-cpp pull request #134: MINIFI-339: Add C2 base allowing for 1 pr...
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/134#discussion_r137857666 --- Diff: libminifi/include/ResourceClaim.h --- @@ -55,23 +58,20 @@ class ResourceClaim : public std::enable_shared_from_this { ResourceClaim(const std::string path, std::shared_ptr<core::StreamManager> claim_manager, bool deleted = false); // Destructor - virtual ~ResourceClaim() { + ~ResourceClaim() { --- End diff -- Why was `virtual` removed from the destructor? ---
[GitHub] nifi-minifi-cpp issue #133: Merge Content processor
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/133 Please review and provide feedback. ---
[GitHub] nifi-minifi-cpp pull request #133: Merge Content processor
GitHub user benqiu2016 opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/133 Merge Content processor Thank you for submitting a contribution to Apache NiFi - MiNiFi C++. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with MINIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file? - [ ] If applicable, have you updated the NOTICE file? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/benqiu2016/nifi-minifi-cpp merge_content Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/133.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #133 commit f68df5cac543e799dba91447e86413ffe4292a40 Author: Bin Qiu <benqiu2...@gmail.com> Date: 2017-08-24T17:04:32Z Merge Content processor --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #123: MINIFI-363: Set format macro declaration to avoi...
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/123 @phrocker the CI build is failing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #117: MINIFI-338: Convert processor threads to use thr...
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/117 [config.txt](https://github.com/apache/nifi-minifi-cpp/files/1166193/config.txt) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #117: MINIFI-338: Convert processor threads to use thr...
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/117 @phrocker the normal flow that I run is attached: one GetFile connected to an RPG. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #117: MINIFI-338: Convert processor threads to use thr...
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/117 Overall looks good. Maybe some optimization is possible for the queue. Please run some long-duration tests to make sure it does not break master, because it is a big change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #117: MINIFI-338: Convert processor threads to ...
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/117#discussion_r128773288 --- Diff: libminifi/include/utils/ThreadPool.h --- @@ -246,15 +349,67 @@ void ThreadPool::startWorkers() { template void ThreadPool::run_tasks() { auto waitperiod = std::chrono::milliseconds(1) * 100; + uint64_t wait_decay_ = 0; while (running_.load()) { +// if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible +// we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially +// wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should +// be more likely to run. This is intentional. +if (wait_decay_ > 1000) { + std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_)); +} Worker task; if (!worker_queue_.try_dequeue(task)) { + std::unique_lock lock(worker_queue_mutex_); tasks_available_.wait_for(lock, waitperiod); continue; } -task.run(); +else { + + std::unique_lock lock(worker_queue_mutex_); + if (!task_status_[task.getIdentifier()]) { +continue; + } +} + +bool wait_to_run = false; +if (task.getTimeSlice() > 1) { + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto ms = std::chrono::duration_cast(now); + if (task.getTimeSlice() > ms.count()) { +wait_to_run = true; + } +} +// if we have to wait we re-queue the worker. +if (wait_to_run) { + { +std::unique_lock lock(worker_queue_mutex_); +if (!task_status_[task.getIdentifier()]) { + continue; +} + } + worker_queue_.enqueue(std::move(task)); --- End diff -- OK. it is possible to sort the queue or somehow to make it such that the head of the queue is the first to expire. 
In this case, we can avoid enqueue/dequeue for all the items in the queues. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #117: MINIFI-338: Convert processor threads to ...
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/117#discussion_r128621371 --- Diff: libminifi/include/utils/ThreadPool.h --- @@ -246,15 +349,67 @@ void ThreadPool::startWorkers() { template void ThreadPool::run_tasks() { auto waitperiod = std::chrono::milliseconds(1) * 100; + uint64_t wait_decay_ = 0; while (running_.load()) { +// if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible +// we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially +// wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should +// be more likely to run. This is intentional. +if (wait_decay_ > 1000) { + std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_)); --- End diff -- we increase wait_decay if there is not task to run. so the wait_decay may become a very large number if we do not have task to run for a long time. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #117: MINIFI-338: Convert processor threads to ...
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/117#discussion_r128620813 --- Diff: libminifi/include/utils/ThreadPool.h --- @@ -246,15 +349,67 @@ void ThreadPool::startWorkers() { template void ThreadPool::run_tasks() { auto waitperiod = std::chrono::milliseconds(1) * 100; + uint64_t wait_decay_ = 0; while (running_.load()) { +// if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible +// we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially +// wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should +// be more likely to run. This is intentional. +if (wait_decay_ > 1000) { + std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_)); +} Worker task; if (!worker_queue_.try_dequeue(task)) { + std::unique_lock lock(worker_queue_mutex_); tasks_available_.wait_for(lock, waitperiod); continue; } -task.run(); +else { + + std::unique_lock lock(worker_queue_mutex_); + if (!task_status_[task.getIdentifier()]) { +continue; + } +} + +bool wait_to_run = false; +if (task.getTimeSlice() > 1) { + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto ms = std::chrono::duration_cast(now); + if (task.getTimeSlice() > ms.count()) { +wait_to_run = true; + } +} +// if we have to wait we re-queue the worker. +if (wait_to_run) { + { +std::unique_lock lock(worker_queue_mutex_); +if (!task_status_[task.getIdentifier()]) { + continue; +} + } + worker_queue_.enqueue(std::move(task)); --- End diff -- do we need to enqueue to head? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #110: MINIFI-249: Update prov repo to better abstract ...
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/110 @phrocker looks good. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #119: MINIFI-70: enhance site2site port negotiation
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/119 @phrocker it passes now after I retriggered it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #119: MINIFI-70: enhance site2site port negotiation
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/119 @phrocker it runs OK on my Mac. It runs OK with Xcode 7.3 and fails with 8.3 in Travis. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #114: site2site port negotiation
Github user benqiu2016 closed the pull request at: https://github.com/apache/nifi-minifi-cpp/pull/114 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #114: site2site port negotiation
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/114 @phrocker used https://github.com/apache/nifi-minifi-cpp/pull/119 which has a single merge. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #119: MINIFI-70: enhance site2site port negotia...
GitHub user benqiu2016 opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/119 MINIFI-70: enhance site2site port negotiation Thank you for submitting a contribution to Apache NiFi - MiNiFi C++. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with MINIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file? - [ ] If applicable, have you updated the NOTICE file? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/benqiu2016/nifi-minifi-cpp port_negotiation_merge Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/119.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #119 commit 7bf30a97907b9622addd11aaf8194f72cbbc47a4 Author: Bin Qiu <benqiu2...@gmail.com> Date: 2017-07-18T17:24:35Z MINIFI-70: enhance site2site port negotiation --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #114: site2site port negotiation
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/114 Thanks for the review. Can we merge the PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #114: site2site port negotiation
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/114 @phrocker @kevdoran use lock, extract token code, set default port. we need to support /nifi-api/controller which is common to 1.x and 0.x. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #114: site2site port negotiation
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/114 @phrocker if we use a stack-based lock guard, then in order to control how long the lock is held, I need to add a scope for the lock guard inside {}. It makes the code less readable. Anyway, I can change my code to use that if you prefer. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #114: site2site port negotiation
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/114#discussion_r126240211 --- Diff: libminifi/src/RemoteProcessorGroupPort.cpp --- @@ -20,6 +20,9 @@ #include "../include/RemoteProcessorGroupPort.h" +#include +#include +#include --- End diff -- For now, I think we are tightly coupling curl with our code, as is done elsewhere (InvokeHttp, etc.). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #114: site2site port negotiation
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/114#discussion_r126239506 --- Diff: libminifi/include/utils/HTTPUtils.h --- @@ -88,6 +90,40 @@ struct HTTPRequestResponse { }; +static void parse_url(std::string , std::string , int , std::string ) { --- End diff -- for now, we can document the same in the README.md to specify the URL format if it is OK with you. As for the access control, i looked at the doc, it looks like we need to use token, etc for a secure cluster. I will need to ask around. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #114: site2site port negotiation
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/114#discussion_r126211327 --- Diff: libminifi/src/RemoteProcessorGroupPort.cpp --- @@ -150,6 +194,87 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, core::Pr return; } +void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { + if (this->host_.empty() || this->port_ == -1 || this->protocol_.empty()) + return; + + std::string fullUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/controller/"; + + this->site2site_port_ = -1; + CURL *http_session = curl_easy_init(); + + curl_easy_setopt(http_session, CURLOPT_URL, fullUrl.c_str()); + + utils::HTTPRequestResponse content; + curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, + ::HTTPRequestResponse::recieve_write); + + curl_easy_setopt(http_session, CURLOPT_WRITEDATA, + static_cast<void*>()); + + CURLcode res = curl_easy_perform(http_session); + + if (res == CURLE_OK) { +std::string response_body(content.data.begin(), content.data.end()); +int64_t http_code = 0; +curl_easy_getinfo(http_session, CURLINFO_RESPONSE_CODE, _code); +char *content_type; +/* ask for the content-type */ +curl_easy_getinfo(http_session, CURLINFO_CONTENT_TYPE, _type); + +bool isSuccess = ((int32_t) (http_code / 100)) == 2 +&& res != CURLE_ABORTED_BY_CALLBACK; +bool body_empty = IsNullOrEmpty(content.data); + +if (isSuccess && !body_empty) { + std::string controller = std::move(response_body); + logger_->log_debug("controller config %s", controller.c_str()); + Json::Value value; + Json::Reader reader; + bool parsingSuccessful = reader.parse(controller, value); + if (parsingSuccessful && !value.empty()) { +Json::Value controllerValue = value["controller"]; +if (!controllerValue.empty()) { + Json::Value port = controllerValue["remoteSiteListeningPort"]; + if (!port.empty()) +this->site2site_port_ = port.asInt(); + Json::Value secure = controllerValue["siteToSiteSecure"]; + 
if (!secure.empty()) +this->site2site_secure_ = secure.asBool(); +} +logger_->log_info("process group remote site2site port %d, is secure %d", site2site_port_, site2site_secure_); + } +} else { + logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo"); +} + } else { +logger_->log_error( +"ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() failed %s\n", +curl_easy_strerror(res)); + } + curl_easy_cleanup(http_session); +} + +void RemoteProcessorGroupPort::refreshPeerList() { + refreshRemoteSite2SiteInfo(); + if (site2site_port_ == -1) +return; + + this->site2site_peer_status_list_.clear(); --- End diff -- refreshRemoteSite2SiteInfo can fail. if it is failed. it will set the this->site2site_port_ = -1; if that's the case, we do not need to clear the list. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #114: site2site port negotiation
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/114#discussion_r126210180 --- Diff: libminifi/src/RemoteProcessorGroupPort.cpp --- @@ -150,6 +194,87 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, core::Pr return; } +void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { + if (this->host_.empty() || this->port_ == -1 || this->protocol_.empty()) + return; + + std::string fullUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/controller/"; --- End diff -- the /nifi-api/controller return it return the site2site port and we can use that to do the site2site negotiation with master to find the peer info { "revision": { "clientId": "d40fb824-070b-4547-9b1c-f50f5ba0a677" }, "controller": { "id": "fe4a3a42-53b6-4af1-a80d-6fdfe60de97f", "name": "NiFi Flow", "comments": "", "runningCount": 3, "stoppedCount": 2, "invalidCount": 0, "disabledCount": 0, "inputPortCount": 1, "outputPortCount": 1, "remoteSiteListeningPort": 10001, "siteToSiteSecure": false, "instanceId": "9d841c51-ab00-422e-811c-53c6dc2f8e59", "inputPorts": [ { "id": "471deef6-2a6e-4a7d-912a-81cc17e3a204", "name": " From Node A", "comments": "", "state": "RUNNING" } ], "outputPorts": [ { "id": "75f88005-0a87-4fef-8320-6219cdbcf18b", "name": "To A", "comments": "", "state": "STOPPED" } ] } } --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #114: site2site port negotiation
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/114#discussion_r126209183 --- Diff: libminifi/include/Site2SiteClientProtocol.h --- @@ -387,6 +387,15 @@ class DataPacket { }; +/** + * Site2Site Peer + */ + typedef struct Site2SitePeerStatus { + std::string host_; + int port_; + bool isSecure_; + } Site2SitePeerStatus; --- End diff -- yes, there is a flowFileCount, i want to reduce the complexity of looking at flowFileCount and doing round robin. If we need to use flowFileCount, it means that we need refresh the peer status periodically and even that, we can only catch a snapshot of the flowFileCount based on the refreshing interval, it is not accurate. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #114: site2site port negotiation
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/114#discussion_r126208386 --- Diff: libminifi/include/utils/HTTPUtils.h --- @@ -88,6 +90,40 @@ struct HTTPRequestResponse { }; +static void parse_url(std::string , std::string , int , std::string ) { --- End diff -- For now it is designed to support IPv4. If we need an RFC-compliant URL parser, we can certainly use an open-source compliant URL parser. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #114: site2site port negotiation
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/114 @phrocker please review --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #114: site2site port negotiation
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/114 @phrocker please review. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #114: site2site port negotiation
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/114#discussion_r123530192 --- Diff: libminifi/src/core/yaml/YamlConfiguration.cpp --- @@ -314,10 +314,8 @@ void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor auto schedulingStrategyStr = node["scheduling strategy"].as(); checkRequiredField(, "scheduling period", CONFIG_YAML_PROVENANCE_REPORT_KEY); auto schedulingPeriodStr = node["scheduling period"].as(); - checkRequiredField(, "host", CONFIG_YAML_PROVENANCE_REPORT_KEY); - auto hostStr = node["host"].as(); - checkRequiredField(, "port", CONFIG_YAML_PROVENANCE_REPORT_KEY); - auto portStr = node["port"].as(); + checkRequiredField(, "url", CONFIG_YAML_PROVENANCE_REPORT_KEY); --- End diff -- i want to avoid confusion so that we have a consistent way to config the same. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #114: site2site port negotiation
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/114#discussion_r123529970 --- Diff: libminifi/src/Site2SiteClientProtocol.cpp --- @@ -344,7 +427,8 @@ int Site2SiteClientProtocol::readRequestType(RequestType ) { return -1; } -int Site2SiteClientProtocol::readRespond(RespondCode , std::string ) { +int Site2SiteClientProtocol::readRespond(RespondCode , --- End diff -- i will revert that. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #114: site2site port negotiation
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/114#discussion_r123529541 --- Diff: libminifi/src/Site2SiteClientProtocol.cpp --- @@ -319,6 +344,64 @@ void Site2SiteClientProtocol::tearDown() { _peerState = IDLE; } +bool Site2SiteClientProtocol::getPeerList(std::vector ) { + if (establish() && handShake()) { +int status = this->writeRequestType(REQUEST_PEER_LIST); + +if (status <= 0) { + tearDown(); + return false; +} + +uint32_t number; +status = peer_->read(number); + +if (status <= 0) { + tearDown(); + return false; +} + +for (int i = 0; i < number; i++) { --- End diff -- it is config by the NiFI cluster server to let client evenly distribute the node cross peers. MiNiFI java is doing the similar thing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #114: site2site port negotiation
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/114#discussion_r123529125 --- Diff: libminifi/src/RemoteProcessorGroupPort.cpp --- @@ -150,6 +147,87 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, core::Pr return; } +void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { + if (this->host_.empty() || this->port_ == -1 || this->protocol_.empty()) + return; + + std::string fullUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/controller/"; + + this->site2site_port_ = -1; + CURL *http_session = curl_easy_init(); + + curl_easy_setopt(http_session, CURLOPT_URL, fullUrl.c_str()); + + utils::HTTPRequestResponse content; + curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, + ::HTTPRequestResponse::recieve_write); + + curl_easy_setopt(http_session, CURLOPT_WRITEDATA, + static_cast<void*>()); + + CURLcode res = curl_easy_perform(http_session); + + if (res == CURLE_OK) { +std::string response_body(content.data.begin(), content.data.end()); +int64_t http_code = 0; +curl_easy_getinfo(http_session, CURLINFO_RESPONSE_CODE, _code); +char *content_type; +/* ask for the content-type */ +curl_easy_getinfo(http_session, CURLINFO_CONTENT_TYPE, _type); + +bool isSuccess = ((int32_t) (http_code / 100)) == 2 +&& res != CURLE_ABORTED_BY_CALLBACK; +bool body_empty = IsNullOrEmpty(content.data); + +if (isSuccess && !body_empty) { + std::string controller = std::move(response_body); + logger_->log_debug("controller config %s", controller.c_str()); + Json::Value value; + Json::Reader reader; + bool parsingSuccessful = reader.parse(controller, value); + if (parsingSuccessful && !value.empty()) { +Json::Value controllerValue = value["controller"]; +if (!controllerValue.empty()) { + Json::Value port = controllerValue["remoteSiteListeningPort"]; + if (!port.empty()) +this->site2site_port_ = port.asInt(); + Json::Value secure = controllerValue["siteToSiteSecure"]; + 
if (!secure.empty()) +this->site2site_secure_ = secure.asBool(); +} +logger_->log_info("process group remote site2site port %d, is secure %d", site2site_port_, site2site_secure_); + } +} else { + logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo"); +} + } else { +logger_->log_error( +"ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() failed %s\n", +curl_easy_strerror(res)); + } + curl_easy_cleanup(http_session); +} + +void RemoteProcessorGroupPort::refreshPeerList() { + refreshRemoteSite2SiteInfo(); + if (site2site_port_ == -1) +return; + + this->site2site_peer_status_list_.clear(); + + std::unique_ptr < Site2SiteClientProtocol> protocol; + protocol = std::unique_ptr < Site2SiteClientProtocol + > (new Site2SiteClientProtocol(nullptr)); --- End diff -- refresh is done once, i would like to avoid hanging a protocol around. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #114: site2site port negotiation
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/114#discussion_r123528501 --- Diff: libminifi/src/Site2SiteClientProtocol.cpp --- @@ -278,26 +301,28 @@ bool Site2SiteClientProtocol::handShake() { } switch (code) { -case PROPERTIES_OK: - logger_->log_info("Site2Site HandShake Completed"); - _peerState = HANDSHAKED; - return true; -case PORT_NOT_IN_VALID_STATE: -case UNKNOWN_PORT: -case PORTS_DESTINATION_FULL: - logger_->log_error("Site2Site HandShake Failed because destination port is either invalid or full"); - ret = -1; - /* - peer_->yield(); - tearDown(); */ - return false; -default: - logger_->log_info("HandShake Failed because of unknown respond code %d", code); - ret = -1; - /* - peer_->yield(); - tearDown(); */ - return false; + case PROPERTIES_OK: +logger_->log_info("Site2Site HandShake Completed"); +_peerState = HANDSHAKED; +return true; + case PORT_NOT_IN_VALID_STATE: + case UNKNOWN_PORT: + case PORTS_DESTINATION_FULL: +logger_->log_error( +"Site2Site HandShake Failed because destination port is either invalid or full"); +ret = -1; +/* --- End diff -- will remove that. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #114: site2site port negotiation
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/114#discussion_r123528403 --- Diff: libminifi/src/Site2SiteClientProtocol.cpp --- @@ -38,7 +39,8 @@ namespace minifi { bool Site2SiteClientProtocol::establish() { if (_peerState != IDLE) { -logger_->log_error("Site2Site peer state is not idle while try to establish"); +logger_->log_error( --- End diff -- OK, i will change the format, i want to reduce the line length to make it easy to read --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #114: site2site port negotiation
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/114#discussion_r123527545 --- Diff: libminifi/src/RemoteProcessorGroupPort.cpp --- @@ -76,27 +89,20 @@ void RemoteProcessorGroupPort::returnProtocol(std::unique_ptr properties; - properties.insert(hostName); - properties.insert(port); properties.insert(portUUID); setSupportedProperties(properties); // Set the supported relationships std::set relationships; relationships.insert(relation); setSupportedRelationships(relationships); + curl_global_init(CURL_GLOBAL_DEFAULT); --- End diff -- OK. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #114: site2site port negotiation
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/114#discussion_r123527427 --- Diff: libminifi/src/RemoteProcessorGroupPort.cpp --- @@ -54,19 +57,29 @@ bool create = true) { std::unique_ptr nextProtocol = nullptr; if (!available_protocols_.try_dequeue(nextProtocol)) { if (create) { + refreshPeerList(); // create - nextProtocol = std::unique_ptr(new Site2SiteClientProtocol(nullptr)); - nextProtocol->setPortId(protocol_uuid_); - std::unique_ptr str = std::unique_ptr(stream_factory_->createSocket(host_, port_)); - std::unique_ptr peer_ = std::unique_ptr(new Site2SitePeer(std::move(str), host_, port_)); - nextProtocol->setPeer(std::move(peer_)); + for (auto peer : site2site_peer_status_list_) { +std::unique_ptr protocol = nullptr; +protocol = std::unique_ptr < Site2SiteClientProtocol +> (new Site2SiteClientProtocol(nullptr)); +protocol->setPortId(protocol_uuid_); +std::unique_ptr str = +std::unique_ptr < org::apache::nifi::minifi::io::DataStream +> (stream_factory_->createSocket(peer.host_, peer.port_)); +std::unique_ptr peer_ = std::unique_ptr < Site2SitePeer +> (new Site2SitePeer(std::move(str), peer.host_, peer.port_)); +protocol->setPeer(std::move(peer_)); +returnProtocol(std::move(protocol)); + } } +available_protocols_.try_dequeue(nextProtocol); } return std::move(nextProtocol); } void RemoteProcessorGroupPort::returnProtocol(std::unique_ptr return_protocol) { - if (available_protocols_.size_approx() >= max_concurrent_tasks_) { + if (available_protocols_.size_approx() >= (max_concurrent_tasks_ * site2site_peer_status_list_.size())) { --- End diff -- i need to round robin the peer even for a single thread to distribute the load cross all peers. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #114: site2site port negotiation
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/114#discussion_r123526843 --- Diff: libminifi/src/RemoteProcessorGroupPort.cpp --- @@ -54,19 +57,29 @@ bool create = true) { std::unique_ptr nextProtocol = nullptr; if (!available_protocols_.try_dequeue(nextProtocol)) { if (create) { + refreshPeerList(); // create - nextProtocol = std::unique_ptr(new Site2SiteClientProtocol(nullptr)); - nextProtocol->setPortId(protocol_uuid_); - std::unique_ptr str = std::unique_ptr(stream_factory_->createSocket(host_, port_)); - std::unique_ptr peer_ = std::unique_ptr(new Site2SitePeer(std::move(str), host_, port_)); - nextProtocol->setPeer(std::move(peer_)); + for (auto peer : site2site_peer_status_list_) { --- End diff -- i want to prepopulate to reduce the time to establish the socket/negotiation. Also we need to round robin them. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #114: site2site port negotiation
GitHub user benqiu2016 opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/114 site2site port negotiation Thank you for submitting a contribution to Apache NiFi - MiNiFi C++. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with MINIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file? - [ ] If applicable, have you updated the NOTICE file? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/benqiu2016/nifi-minifi-cpp port_negotiation Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/114.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #114 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #110: MINIFI-249: Update prov repo to better abstract ...
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/110 It is a very large commit. Overall it looks good. A couple of overall comments: 1) For the volatile repo, have we run some tests overnight and monitored the memory to see whether we have a memory leak? 2) Have we considered using a RAM FS to store the content instead of writing a volatile content repo? In that case, the provenance and flowfile repos could be on the RAM FS also. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #110: MINIFI-249: Update prov repo to better ab...
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/110#discussion_r123119696 --- Diff: libminifi/include/core/repository/AtomicRepoEntries.h --- @@ -0,0 +1,501 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ref_count_hip. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_ATOMICREPOENTRIES_H_ +#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_ATOMICREPOENTRIES_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static uint16_t accounting_size = sizeof(std::vector) + sizeof(std::string) + sizeof(size_t); + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace repository { + +/** + * Purpose: Repo value represents an item that will support a move operation within an AtomicEntry + * + * Justification: Since AtomicEntry is a static entry that does not move or change, the underlying + * RepoValue can be changed to support atomic operations. + */ +template +class RepoValue { + public: + + explicit RepoValue() { + } + + /** + * Constructor that populates the item allowing for a custom key comparator. + * @param key key for this repo value. 
+ * @param ptr buffer + * @param size size buffer + * @param comparator custom comparator. + */ + explicit RepoValue(T key, const uint8_t *ptr, size_t size, std::function<bool(T, T)> comparator = nullptr) + : key_(key), +comparator_(comparator) { +if (nullptr == ptr) { + size = 0; +} +buffer_.resize(size); +if (size > 0) { + std::memcpy(buffer_.data(), ptr, size); +} + } + + /** + * RepoValue that moves the other object into this. + */ + explicit RepoValue(RepoValue &) +noexcept : key_(std::move(other.key_)), + buffer_(std::move(other.buffer_)), + comparator_(std::move(other.comparator_)) { + } + + ~RepoValue() + { + } + + T () { +return key_; + } + + /** + * Sets the key, relacing the custom comparator if needed. + */ + void setKey(const T key, std::function<bool(T,T)> comparator = nullptr) { +key_ = key; +comparator_ = comparator; + } + + /** + * Determines if the key is the same using the custom comparator + * @param other object to compare against + * @return result of the comparison + */ + inline bool isEqual(RepoValue *other) + { +return comparator_ == nullptr ? key_ == other->key_ : comparator_(key_,other->key_); + } + + /** + * Determines if the key is the same using the custom comparator + * @param other object to compare against + * @return result of the comparison + */ + inline bool isKey(T other) + { +return comparator_ == nullptr ? key_ == other : comparator_(key_,other); + } + + /** + * Clears the buffer. + */ + void clearBuffer() { +buffer_.resize(0); +buffer_.clear(); + } + + /** + * Return the size of the memory within the key + * buffer, the size of timestamp, and the general + * system word size + */ + uint64_t size() { +return buffer_.size(); + } + + size_t getBufferSize() { +return buffer_.size(); + } + + const uint8_t *getBuffer() + { +return buffer_.data(); + } + + /** + * Places the contents of buffer into str + * @param strnig into which we are placing the memory contained in buffer. +
[GitHub] nifi-minifi-cpp pull request #110: MINIFI-249: Update prov repo to better ab...
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/110#discussion_r123118428 --- Diff: libminifi/include/core/repository/VolatileRepository.h --- @@ -331,22 +132,240 @@ class VolatileRepository : public core::Repository, public std::enable_shared_fr else return false; } - /** - * Purges the volatile repository. - */ - void purge(); - private: std::map<std::string, std::shared_ptr> connectionMap; - - std::atomic current_size_; + // current size of the volatile repo. + std::atomic current_size_; + // current index. std::atomic current_index_; - std::vector<AtomicEntry*> value_vector_; + // value vector. --- End diff -- I thought you were using a map to store the key and resource claim; why do we need the value_vector_ here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #110: MINIFI-249: Update prov repo to better ab...
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/110#discussion_r123110689 --- Diff: libminifi/include/core/FlowFile.h --- @@ -224,6 +224,10 @@ class FlowFile { void setStoredToRepository(bool storedInRepository) { stored = storedInRepository; +if (!stored && nullptr != claim_) +{ + claim_->decreaseFlowFileRecordOwnedCount(); --- End diff -- Why do we need to call decreaseFlowFileRecordOwnedCount here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #107: MINIFI-262: Configuration listener
Github user benqiu2016 closed the pull request at: https://github.com/apache/nifi-minifi-cpp/pull/107 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #107: MINIFI-262: Configuration listener
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/107 @phrocker See https://github.com/apache/nifi-minifi-cpp/pull/112 for a single commit; closing this one for now. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #112: MINIFI-262: Configuration Listener
GitHub user benqiu2016 opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/112 MINIFI-262: Configuration Listener Thank you for submitting a contribution to Apache NiFi - MiNiFi C++. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with MINIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file? - [ ] If applicable, have you updated the NOTICE file? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/benqiu2016/nifi-minifi-cpp configuration_listener_merge Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/112.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #112 commit 1924ce57bf42dc4c88b924aa05f14524a67aa4c4 Author: Bin Qiu <benqiu2...@gmail.com> Date: 2017-06-07T16:26:11Z MINIFI-262: Configuration Listener --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #107: MINIFI-262: Configuration listener
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/107 @phrocker add test case, use condition variable, please review. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #107: MINIFI-262: Configuration listener
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/107#discussion_r120533380 --- Diff: libminifi/src/FlowController.cpp --- @@ -163,6 +174,31 @@ FlowController::~FlowController() { provenance_repo_ = nullptr; } +bool FlowController::applyConfiguration(std::string ) { --- End diff -- add test case with a civet web server to host the config --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #107: MINIFI-262: Configuration listener
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/107 @phrocker please review. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #107: Configuration listener
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/107#discussion_r120017101 --- Diff: libminifi/src/FlowController.cpp --- @@ -175,7 +211,7 @@ void FlowController::stop(bool force) { this->flow_file_repo_->stop(); this->provenance_repo_->stop(); // Wait for sometime for thread stop -std::this_thread::sleep_for(std::chrono::milliseconds(1000)); +std::this_thread::sleep_for(std::chrono::milliseconds(3000)); --- End diff -- It was the max of all the thread sleep intervals; for example, the repo sleep is 2 seconds. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #107: Configuration listener
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/107#discussion_r120017104 --- Diff: libminifi/src/core/yaml/YamlConfiguration.cpp --- @@ -32,16 +32,25 @@ namespace core { core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml( YAML::Node rootFlowNode) { uuid_t uuid; + int64_t version = 0; checkRequiredField(, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); std::string flowName = rootFlowNode["name"].as(); std::string id = getOrGenerateId(); uuid_parse(id.c_str(), uuid); - logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id, - flowName); + if (rootFlowNode["version"]) { +std::string value = rootFlowNode["version"].as(); +if (core::Property::StringToInt(value, version)) { + logger_->log_debug("parseRootProcessorGroup: version => [%d]", version); +} + } + + logger_->log_debug( --- End diff -- done. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #107: Configuration listener
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/107#discussion_r120017069 --- Diff: libminifi/src/FlowController.cpp --- @@ -163,6 +174,31 @@ FlowController::~FlowController() { provenance_repo_ = nullptr; } +bool FlowController::applyConfiguration(std::string ) { --- End diff -- the FlowConfiguration does not have Flow Controller state, during reApply the config, somehow the FlowController need to expose a function so that listener can call to reapply the config. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #107: Configuration listener
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/107#discussion_r120017020 --- Diff: libminifi/src/ConfigurationListener.cpp --- @@ -0,0 +1,130 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "ConfigurationListener.h" +#include "FlowController.h" +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +void ConfigurationListener::start() { + if (running_) +return; + + pull_interval_ = 60 * 1000; + std::string value; + // grab the value for configuration + if (configure_->get(Configure::nifi_configuration_listener_pull_interval, + value)) { +core::TimeUnit unit; +if (core::Property::StringToTime(value, pull_interval_, unit) +&& core::Property::ConvertTimeUnitToMS(pull_interval_, unit, +pull_interval_)) { + logger_->log_info("Configuration Listener pull interval: [%d] ms", + pull_interval_); +} + } + + std::string clientAuthStr; + if (configure_->get(Configure::nifi_configuration_listener_need_ClientAuth, clientAuthStr)) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, this->need_client_certificate_); + } + + if (configure_->get( + Configure::nifi_configuration_listener_client_ca_certificate, + this->ca_certificate_)) { +logger_->log_info("Configuration Listener CA certificates: [%s]", +this->ca_certificate_.c_str()); + } + + if (this->need_client_certificate_) { +std::string passphrase_file; + +if (!(configure_->get( +Configure::nifi_configuration_listener_client_certificate, this->certificate_) +&& configure_->get(Configure::nifi_configuration_listener_private_key, +this->private_key_))) { + logger_->log_error( + "Certificate and Private Key PEM file not configured for configuration listener, error: %s.", + std::strerror(errno)); +} + +if (configure_->get( +Configure::nifi_configuration_listener_client_pass_phrase, +passphrase_file)) { + // load the passphase from file + std::ifstream file(passphrase_file.c_str(), std::ifstream::in); + if (file.good()) { +this->passphrase_.assign((std::istreambuf_iterator(file)), +std::istreambuf_iterator()); +file.close(); + } +} + +logger_->log_info("Configuration Listener certificate: [%s], private 
key: [%s], passphrase file: [%s]", +this->certificate_.c_str(), this->private_key_.c_str(), passphrase_file.c_str()); + } + + thread_ = std::thread(::threadExecutor, this); + thread_.detach(); + running_ = true; + logger_->log_info("%s ConfigurationListener Thread Start", type_.c_str()); +} + +void ConfigurationListener::stop() { + if (!running_) +return; + running_ = false; + if (thread_.joinable()) +thread_.join(); + logger_->log_info("%s ConfigurationListener Thread Stop", type_.c_str()); +} + +void ConfigurationListener::run() { + int64_t interval = 0; + while (running_) { +std::this_thread::sleep_for(std::chrono::milliseconds(100)); --- End diff -- it is used to check whether the running is still set. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #107: Configuration listener
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/107#discussion_r120017002 --- Diff: libminifi/include/core/repository/FlowFileRepository.h --- @@ -36,7 +36,7 @@ namespace repository { #define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository" #define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M #define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (60) // 10 minute -#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2500) // 2500 msec +#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2000) // 2000 msec --- End diff -- I changed some timeout values to be more in sync with the FlowController stop wait time. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #107: Configuration listener
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/107#discussion_r120017010 --- Diff: libminifi/src/ConfigurationListener.cpp --- @@ -0,0 +1,130 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "ConfigurationListener.h" +#include "FlowController.h" +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +void ConfigurationListener::start() { + if (running_) +return; + + pull_interval_ = 60 * 1000; + std::string value; + // grab the value for configuration + if (configure_->get(Configure::nifi_configuration_listener_pull_interval, + value)) { +core::TimeUnit unit; +if (core::Property::StringToTime(value, pull_interval_, unit) +&& core::Property::ConvertTimeUnitToMS(pull_interval_, unit, +pull_interval_)) { + logger_->log_info("Configuration Listener pull interval: [%d] ms", + pull_interval_); +} + } + + std::string clientAuthStr; + if (configure_->get(Configure::nifi_configuration_listener_need_ClientAuth, clientAuthStr)) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, this->need_client_certificate_); + } + + if (configure_->get( + Configure::nifi_configuration_listener_client_ca_certificate, + this->ca_certificate_)) { +logger_->log_info("Configuration Listener CA certificates: [%s]", --- End diff -- done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #107: Configuration listener
GitHub user benqiu2016 opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/107 Configuration listener Thank you for submitting a contribution to Apache NiFi - MiNiFi C++. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with MINIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file? - [ ] If applicable, have you updated the NOTICE file? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/benqiu2016/nifi-minifi-cpp configuration_listener Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/107.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #107 commit 9a4cc44671d1f492646f507c4ceb9fd63910dba8 Author: Bin Qiu <benqiu2...@gmail.com> Date: 2017-05-15T15:59:58Z Configuration Listener commit 0b221b9e63b54f4d5e46ad677dd8f09b374b5220 Author: Bin Qiu <benqiu2...@gmail.com> Date: 2017-05-20T20:44:40Z More configuration listener commit ad7a6e70f07afa60567eb9293ae22c3fc5e4411c Author: Bin Qiu <benqiu2...@gmail.com> Date: 2017-05-20T21:50:01Z More configuration Listener commit 1b8ffbd906f6edda9c104b7358c7af61df679b4a Author: Bin Qiu <benqiu2...@gmail.com> Date: 2017-05-21T15:57:47Z more listener commit 11bdd1e5d689935d78e56338cefa3e9398003831 Author: Bin Qiu <benqiu2...@gmail.com> Date: 2017-05-26T05:37:49Z More client auth for config listener commit e768a4c0d19aceb25784fec24ebc844fb77f449b Author: Bin Qiu <benqiu2...@gmail.com> Date: 2017-05-26T05:41:13Z More client auth for config listener --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #93: MINIFI-301 Removing extraneous third party resour...
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/93 +1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #89: MINIFI-293 Remove thirdparty dependencies no long...
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/89 +1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #90: MINIFI-294 Required vs. optional fields in...
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/90#discussion_r114691577 --- Diff: libminifi/src/core/yaml/YamlConfiguration.cpp --- @@ -417,29 +405,27 @@ void YamlConfiguration::parseConnectionYaml( // Configure connection source -auto rawRelationship = connectionNode["source relationship name"] -.as(); +checkRequiredField(, "source relationship name", CONFIG_YAML_CONNECTIONS_KEY); +auto rawRelationship = connectionNode["source relationship name"].as(); core::Relationship relationship(rawRelationship, ""); -logger_->log_debug( -"parseConnection: relationship => [%s]", rawRelationship); +logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship); if (connection) { connection->setRelationship(relationship); } uuid_t srcUUID; -std::string connectionSrcProcName = connectionNode["source name"] -.as(); + if (connectionNode["source id"]) { - std::string connectionSrcProcId = connectionNode["source id"] - .as(); + std::string connectionSrcProcId = connectionNode["source id"].as(); uuid_parse(connectionSrcProcId.c_str(), srcUUID); } else { // if we don't have a source id, try harder to resolve the source processor. // config schema v2 will make this unnecessary + checkRequiredField(, "source name", CONFIG_YAML_CONNECTIONS_KEY); --- End diff -- does it mean that if process UUID is not config, we need to try to find processor by name? in this case, we do not need to do uuid_parse and parent->findProcessor is by name instead of UUID. please double check with Aldrin. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #90: MINIFI-294 Required vs. optional fields in...
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/90#discussion_r114679611 --- Diff: libminifi/src/core/yaml/YamlConfiguration.cpp --- @@ -450,27 +436,24 @@ void YamlConfiguration::parseConnectionYaml( } else { // we ran out of ways to discover the source processor logger_->log_error( - "Could not locate a source with name %s to create a connection", - connectionSrcProcName); + "Could not locate a source with name %s to create a connection", connectionSrcProcName); throw std::invalid_argument( - "Could not locate a source with name " + - connectionSrcProcName + " to create a connection "); + "Could not locate a source with name " + connectionSrcProcName + " to create a connection "); } } } connection->setSourceUUID(srcUUID); // Configure connection destination uuid_t destUUID; -std::string connectionDestProcName = connectionNode["destination name"] -.as(); if (connectionNode["destination id"]) { - std::string connectionDestProcId = connectionNode["destination id"] - .as(); + std::string connectionDestProcId = connectionNode["destination id"].as(); uuid_parse(connectionDestProcId.c_str(), destUUID); } else { // we use the same logic as above for resolving the source processor // for looking up the destination processor in absence of a processor id + checkRequiredField(, "destination name", CONFIG_YAML_CONNECTIONS_KEY); --- End diff -- same as above. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #90: MINIFI-294 Required vs. optional fields in...
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/90#discussion_r114679460 --- Diff: libminifi/src/core/yaml/YamlConfiguration.cpp --- @@ -417,29 +405,27 @@ void YamlConfiguration::parseConnectionYaml( // Configure connection source -auto rawRelationship = connectionNode["source relationship name"] -.as(); +checkRequiredField(, "source relationship name", CONFIG_YAML_CONNECTIONS_KEY); +auto rawRelationship = connectionNode["source relationship name"].as(); core::Relationship relationship(rawRelationship, ""); -logger_->log_debug( -"parseConnection: relationship => [%s]", rawRelationship); +logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship); if (connection) { connection->setRelationship(relationship); } uuid_t srcUUID; -std::string connectionSrcProcName = connectionNode["source name"] -.as(); + if (connectionNode["source id"]) { - std::string connectionSrcProcId = connectionNode["source id"] - .as(); + std::string connectionSrcProcId = connectionNode["source id"].as(); uuid_parse(connectionSrcProcId.c_str(), srcUUID); } else { // if we don't have a source id, try harder to resolve the source processor. // config schema v2 will make this unnecessary + checkRequiredField(, "source name", CONFIG_YAML_CONNECTIONS_KEY); --- End diff -- i thought we only use source id instead of source name. source id is the process UUID for the connection source --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #82: MINIFI-273 Update README to include doxygen depen...
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/82 +1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #81: MINIFI-269: Add Site2Site Test case
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/81 @apiri fixed the case where the provenance report is not defined. Thanks for the review. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #81: MINIFI-269: Add Site2Site Test case
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/81 @apiri refactored based on your comments above. Thanks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #81: MINIFI-269: Add Site2Site Test case
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/81#discussion_r113841544 --- Diff: libminifi/test/unit/SerializationTests.cpp --- @@ -16,130 +16,237 @@ * limitations under the License. */ - #include "io/BaseStream.h" #include "Site2SitePeer.h" #include "Site2SiteClientProtocol.h" #include +#include "core/logging/LogAppenders.h" +#include "core/logging/BaseLogger.h" +#include "SiteToSiteHelper.h" #include #include #include #include "../TestBase.h" #define FMT_DEFAULT fmt_lower - using namespace org::apache::nifi::minifi::io; -TEST_CASE("TestSetPortId", "[S2S1]"){ - - - std::unique_ptr peer = std::unique_ptr( new minifi::Site2SitePeer(std::unique_ptr(new DataStream()),"fake_host",65433)); - - minifi::Site2SiteClientProtocol protocol(std::move(peer)); - +TEST_CASE("TestSetPortId", "[S2S1]") { - std::string uuid_str = "c56a4180-65aa-42ec-a945-5fd21dec0538"; + std::unique_ptr peer = + std::unique_ptr < minifi::Site2SitePeer + > (new minifi::Site2SitePeer( + std::unique_ptr < DataStream > (new DataStream()), "fake_host", + 65433)); - uuid_t fakeUUID; + minifi::Site2SiteClientProtocol protocol(std::move(peer)); - uuid_parse(uuid_str.c_str(),fakeUUID); + std::string uuid_str = "c56a4180-65aa-42ec-a945-5fd21dec0538"; - protocol.setPortId(fakeUUID); + uuid_t fakeUUID; - REQUIRE( uuid_str == protocol.getPortId() ); + uuid_parse(uuid_str.c_str(), fakeUUID); + protocol.setPortId(fakeUUID); + REQUIRE(uuid_str == protocol.getPortId()); } -TEST_CASE("TestSetPortIdUppercase", "[S2S2]"){ - +TEST_CASE("TestSetPortIdUppercase", "[S2S2]") { - std::unique_ptr peer = std::unique_ptr( new minifi::Site2SitePeer(std::unique_ptr(new DataStream()),"fake_host",65433)); + std::unique_ptr peer = + std::unique_ptr < minifi::Site2SitePeer + > (new minifi::Site2SitePeer( + std::unique_ptr < DataStream > (new DataStream()), "fake_host", + 65433)); minifi::Site2SiteClientProtocol protocol(std::move(peer)); + std::string uuid_str = 
"C56A4180-65AA-42EC-A945-5FD21DEC0538"; - std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538"; - - uuid_t fakeUUID; - - uuid_parse(uuid_str.c_str(),fakeUUID); - - protocol.setPortId(fakeUUID); + uuid_t fakeUUID; - REQUIRE( uuid_str != protocol.getPortId() ); + uuid_parse(uuid_str.c_str(), fakeUUID); - std::transform(uuid_str.begin(),uuid_str.end(),uuid_str.begin(),::tolower); + protocol.setPortId(fakeUUID); - REQUIRE( uuid_str == protocol.getPortId() ); + REQUIRE(uuid_str != protocol.getPortId()); + std::transform(uuid_str.begin(), uuid_str.end(), uuid_str.begin(), ::tolower); + REQUIRE(uuid_str == protocol.getPortId()); } - -TEST_CASE("TestWriteUTF", "[MINIFI193]"){ +TEST_CASE("TestWriteUTF", "[MINIFI193]") { DataStream baseStream; Serializable ser; std::string stringOne = "helo world"; // yes, this has a typo. std::string verifyString; - ser.writeUTF(stringOne,,false); - + ser.writeUTF(stringOne, , false); - ser.readUTF(verifyString,,false); + ser.readUTF(verifyString, , false); REQUIRE(verifyString == stringOne); +} +TEST_CASE("TestWriteUTF2", "[MINIFI193]") { + DataStream baseStream; -} + Serializable ser; + std::string stringOne = "hel\xa1o world"; + REQUIRE(11 == stringOne.length()); + std::string verifyString; + ser.writeUTF(stringOne, , false); + ser.readUTF(verifyString, , false); + + REQUIRE(verifyString == stringOne); +} -TEST_CASE("TestWriteUTF2", "[MINIFI193]"){ +TEST_CASE("TestWriteUTF3", "[MINIFI193]") { DataStream baseStream; Serializable ser; - std::string stringOne = "hel\xa1o world"; - REQUIRE(11 == stringOne.length()); + std::string stringOne = "\xe4\xbd\xa0\xe5\xa5\xbd\xe4\xb8\x96\xe7\x95\x8c"; + REQUIRE(12 == stringOne.length()); std::string verifyString; - ser.writeUTF(stringOn
[GitHub] nifi-minifi-cpp pull request #81: MINIFI-269: Add Site2Site Test case
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/81#discussion_r113841446 --- Diff: libminifi/test/unit/SiteToSiteHelper.h --- @@ -0,0 +1,156 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_TEST_UNIT_PROVENANCETESTHELPER_H_ --- End diff -- will do --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #81: MINIFI-269: Add Site2Site Test case
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/81#discussion_r113841440 --- Diff: libminifi/src/core/yaml/YamlConfiguration.cpp --- @@ -339,7 +339,7 @@ void YamlConfiguration::parseProvenanceReportingYaml( return; } - if (!reportNode || !(reportNode->IsSequence())) { --- End diff -- @apiri it is not a sequence for provenance report like processors. Provenance Reporting: scheduling strategy: TIMER_DRIVEN <=we do not have - scheduling period: 1 sec port: 10001 host: localhost port uuid: 471deef6-2a6e-4a7d-912a-81cc17e3a204 batch size: 100 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #81: MINIFI-269: Add Site2Site Test case
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/81 @apiri please review and merge. Thanks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #81: MINIFI-269: Add Site2Site Test case
GitHub user benqiu2016 opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/81 MINIFI-269: Add Site2Site Test case Thank you for submitting a contribution to Apache NiFi - MiNiFi C++. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with MINIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file? - [ ] If applicable, have you updated the NOTICE file? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/benqiu2016/nifi-minifi-cpp site2site_test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/81.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #81 commit a110587f382cd252815c0b4069d7012abdb72f1d Author: Bin Qiu <benqiu2...@gmail.com> Date: 2017-04-24T22:07:25Z MINIFI-269: Add Site2Site Test case --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #74: Minifi 227
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/74 @apiri Thanks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #74: Minifi 227
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/74#discussion_r111843708 --- Diff: libminifi/include/core/Processor.h --- @@ -239,12 +242,21 @@ class Processor : public Connectable, public ConfigurableComponent, // Trigger the Processor even if the incoming connection is empty std::atomic _triggerWhenEmpty; - private: + //! obtainSite2SiteProtocol for use + std::shared_ptr obtainSite2SiteProtocol(std::string host, uint16_t sport, uuid_t portId); --- End diff -- Opened MINIFI-269 to support the site2site server protocol. Opened MINIFI-270 for the refactor of the above. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #75: MINIFI-257 Expanding the build matrix to include ...
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/75 +1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #77: MINIFI-264 Remove libxml2 from travis build as it...
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/77 +1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #74: Minifi 227
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/74 @apiri I addressed your review comments above. Adding a new test case that exercises full site2site is not as easy as we thought: we need to support the handshake, CRC, and two-phase commit to make sure it works end to end. Right now we only support the S2S client in CPP; we could add an S2S server in CPP. If we add that, we can run both the S2S client and server in the CPP test suite. We can address that via a different JIRA. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #74: Minifi 227
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/74 @phrocker it looks like the travis for MAC is OK. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp issue #74: Minifi 227
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/74 my MAC build linter is OKW12612:build binqiu$ make linter /Users/binqiu/report/nifi-minifi-cpp/libminifi/include//Connection.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/ConfigurableComponent.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/ConfigurationFactory.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/Connectable.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/Core.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/FlowConfiguration.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/FlowFile.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/logging/BaseLogger.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/logging/LogAppenders.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/logging/Logger.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/ProcessContext.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/ProcessGroup.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/P 
rocessor.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/ProcessorConfig.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/ProcessorNode.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/ProcessSession.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/ProcessSessionFactory.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/Property.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/Relationship.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/repository/FlowFileRepository.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/Repository.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/RepositoryFactory.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/Scheduling.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//core/yaml/YamlConfiguration.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//EventDrivenSchedulingAgent.h,/Users/binqiu/report/nifi- minifi-cpp/libminifi/include//Exception.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//FlowController.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//FlowControlProtocol.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//FlowFileRecord.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//io/BaseStream.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//io/ClientSocket.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//io/CRCStream.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//io/DataStream.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//io/EndianCheck.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//io/Serializable.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//io/Sockets.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//io/StreamFactory.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//io/tls/TLSSocket.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//io/validation.h,/Use 
rs/binqiu/report/nifi-minifi-cpp/libminifi/include//processors/AppendHostInfo.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//processors/ExecuteProcess.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//processors/GenerateFlowFile.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//processors/GetFile.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//processors/ListenHTTP.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//processors/ListenSyslog.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//processors/LogAttribute.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//processors/PutFile.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//processors/TailFile.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//properties/Configure.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//provenance/Provenance.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//provenance/ProvenanceRepository.h,/Users/binqiu/report/n ifi-minifi-cpp/libminifi/include//provenance/ProvenanceTaskReport.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//RemoteProcessorGroupPort.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//ResourceClaim.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//SchedulingAgent.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//Site2SiteClientProtocol.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//Site2SitePeer.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//ThreadedSchedulingAgent.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//TimerDrivenSchedulingAgent.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//utils/FailurePolicy.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include//utils/StringUtils.h,/Users/binqiu/report/nifi-minifi-cpp/libminifi/include
[GitHub] nifi-minifi-cpp issue #74: Minifi 227
Github user benqiu2016 commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/74 @phrocker [ 97%] Building C object thirdparty/civetweb-1.9.1/src/CMakeFiles/c-executable.dir/main.c.o Linking C executable civetweb /usr/include/x86_64-linux-gnu/bits/stdio2.h: In function ‘send_file_data’: /usr/include/x86_64-linux-gnu/bits/stdio2.h:290:2: error: call to ‘__fread_chk_warn’ declared with attribute warning: fread called with bigger size * nmemb than length of destination buffer [-Werror] return __fread_chk (__ptr, __bos0 (__ptr), __size, __n, __stream); ^ lto1: all warnings being treated as errors lto-wrapper: /usr/bin/gcc returned 1 exit status /usr/bin/ld: lto-wrapper failed collect2: error: ld returned 1 exit status make[2]: *** [thirdparty/civetweb-1.9.1/src/civetweb] Error 1 make[1]: *** [thirdparty/civetweb-1.9.1/src/CMakeFiles/c-executable.dir/all] Error 2 make: *** [all] Error 2 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #74: Minifi 227
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/74#discussion_r110006513 --- Diff: libminifi/src/Site2SiteClientProtocol.cpp --- @@ -1240,6 +1257,88 @@ void Site2SiteClientProtocol::transferFlowFiles( return; } +void Site2SiteClientProtocol::transferBytes(core::ProcessContext *context, core::ProcessSession *session, uint8_t *payload, int length, --- End diff -- OK. will change to pass std::string. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #74: Minifi 227
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/74#discussion_r110004694 --- Diff: libminifi/src/Site2SiteClientProtocol.cpp --- @@ -1240,6 +1257,88 @@ void Site2SiteClientProtocol::transferFlowFiles( return; } +void Site2SiteClientProtocol::transferBytes(core::ProcessContext *context, core::ProcessSession *session, uint8_t *payload, int length, --- End diff -- How can I std::move from a std::string to a std::vector without a copy? http://stackoverflow.com/questions/10445042/stdmove-between-stdstring-and-stdvectorunsigned-char Passing the raw bytes looks OK to me and makes the code much simpler in this case than converting from one type to another. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #74: Minifi 227
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/74#discussion_r110004494 --- Diff: libminifi/src/provenance/ProvenanceTaskReport.cpp --- @@ -0,0 +1,221 @@ +/** + * @file ProvenanceTaskReport.cpp + * ProvenanceTaskReport class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "provenance/ProvenanceTaskReport.h" +#include "../include/io/StreamFactory.h" +#include "io/ClientSocket.h" +#include "utils/TimeUtil.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "provenance/Provenance.h" +#include "FlowController.h" + +#include "json/json.h" +#include "json/writer.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace provenance{ + +const std::string ProvenanceTaskReport::ProcessorName("ProvenanceTaskReport"); +core::Property ProvenanceTaskReport::hostName("Host Name", "Remote Host Name.", "localhost"); +core::Property ProvenanceTaskReport::port("Port", "Remote Port", ""); +core::Property ProvenanceTaskReport::batchSize("Batch Size", "Specifies how many records to send in a single batch, at most.", "100"); +core::Relationship ProvenanceTaskReport::relation; + +void ProvenanceTaskReport::initialize() +{ + //! Set the supported properties + std::set properties; + properties.insert(hostName); + properties.insert(port); + properties.insert(batchSize); + setSupportedProperties(properties); + //! Set the supported relationships + std::set relationships; + relationships.insert(relation); + setSupportedRelationships(relationships); +} + --- End diff -- what i mean is that they are different processor and it make sense to make the code path differently. Certainly we can extract that returnProtocol and getNextProtocol to a parent class that both ProvenanceTaskReport and RemoteProcessGroup can inherit from if it is what you mean by duplicate code. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #74: Minifi 227
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/74#discussion_r109958322 --- Diff: libminifi/src/provenance/ProvenanceRepository.cpp --- @@ -36,6 +36,7 @@ void ProvenanceRepository::run() { uint64_t curTime = getTimeMillis(); uint64_t size = repoSize(); if (size >= purgeThreshold) { + std::lock_guard lock(mutex_); --- End diff -- OK --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #74: Minifi 227
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/74#discussion_r109956092 --- Diff: libminifi/src/Site2SiteClientProtocol.cpp --- @@ -1240,6 +1257,88 @@ void Site2SiteClientProtocol::transferFlowFiles( return; } +void Site2SiteClientProtocol::transferBytes(core::ProcessContext *context, core::ProcessSession *session, uint8_t *payload, int length, --- End diff -- Can we move a byte array to vector? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #74: Minifi 227
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/74#discussion_r109955536 --- Diff: libminifi/src/Site2SiteClientProtocol.cpp --- @@ -682,6 +682,7 @@ bool Site2SiteClientProtocol::receive(std::string transactionID, bool Site2SiteClientProtocol::send( std::string transactionID, DataPacket *packet, std::shared_ptr flowFile, + uint8_t *payload, int length, --- End diff -- OK. will change that. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #74: Minifi 227
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/74#discussion_r109954861 --- Diff: thirdparty/jsoncpp/devtools/batchbuild.py --- @@ -0,0 +1,278 @@ +from __future__ import print_function --- End diff -- OK. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #74: Minifi 227
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/74#discussion_r109954793 --- Diff: libminifi/src/provenance/ProvenanceTaskReport.cpp --- @@ -0,0 +1,221 @@ +/** + * @file ProvenanceTaskReport.cpp + * ProvenanceTaskReport class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "provenance/ProvenanceTaskReport.h" +#include "../include/io/StreamFactory.h" +#include "io/ClientSocket.h" +#include "utils/TimeUtil.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "provenance/Provenance.h" +#include "FlowController.h" + +#include "json/json.h" +#include "json/writer.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace provenance{ + +const std::string ProvenanceTaskReport::ProcessorName("ProvenanceTaskReport"); +core::Property ProvenanceTaskReport::hostName("Host Name", "Remote Host Name.", "localhost"); +core::Property ProvenanceTaskReport::port("Port", "Remote Port", ""); +core::Property ProvenanceTaskReport::batchSize("Batch Size", "Specifies how many records to send in a single batch, at most.", "100"); +core::Relationship ProvenanceTaskReport::relation; + +void ProvenanceTaskReport::initialize() +{ + //! Set the supported properties + std::set properties; + properties.insert(hostName); + properties.insert(port); + properties.insert(batchSize); + setSupportedProperties(properties); + //! 
Set the supported relationships + std::set relationships; + relationships.insert(relation); + setSupportedRelationships(relationships); +} + +std::unique_ptr ProvenanceTaskReport::getNextProtocol() +{ + std::lock_guard protocol_lock_(protocol_mutex_); + if (available_protocols_.empty()) + return nullptr; + std::unique_ptr return_pointer = std::move(available_protocols_.top()); + available_protocols_.pop(); + return std::move(return_pointer); +} + +void ProvenanceTaskReport::returnProtocol( + std::unique_ptr return_protocol) +{ + std::lock_guard protocol_lock_(protocol_mutex_); + available_protocols_.push(std::move(return_protocol)); +} + +void ProvenanceTaskReport::onTrigger(core::ProcessContext *context, core::ProcessSession *session) +{ + std::string value; + int64_t lvalue; + + std::unique_ptr protocol_ = getNextProtocol(); + + if (protocol_ == nullptr) + { + protocol_ = std::unique_ptr( + new Site2SiteClientProtocol(0)); + protocol_->setPortId(protocol_uuid_); + + std::string host = ""; + uint16_t sport = 0; + + if (context->getProperty(hostName.getName(), value)) { + host = value; + } + if (context->getProperty(port.getName(), value) + && core::Property::StringToInt(value, lvalue)) { + sport = (uint16_t) lvalue; + } + std::unique_ptr str = + std::unique_ptr( + org::apache::nifi::minifi::io::StreamFactory::getInstance() + ->createSocket(host, sport)); + + std::unique_ptr peer_ = std::unique_ptr( + new Site2SitePeer(std::move(str), host, sport)); + + protocol_->setPeer(std::move(peer_)); + } + + if (!protocol_->bootstrap()) + { + // bootstrap the client protocol if needeed + context->yield(); + std::shared_ptr processo
[GitHub] nifi-minifi-cpp pull request #74: Minifi 227
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/74#discussion_r109954528 --- Diff: libminifi/src/provenance/ProvenanceTaskReport.cpp --- @@ -0,0 +1,221 @@ +/** + * @file ProvenanceTaskReport.cpp + * ProvenanceTaskReport class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "provenance/ProvenanceTaskReport.h" +#include "../include/io/StreamFactory.h" +#include "io/ClientSocket.h" +#include "utils/TimeUtil.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "provenance/Provenance.h" +#include "FlowController.h" + +#include "json/json.h" +#include "json/writer.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace provenance{ + +const std::string ProvenanceTaskReport::ProcessorName("ProvenanceTaskReport"); +core::Property ProvenanceTaskReport::hostName("Host Name", "Remote Host Name.", "localhost"); +core::Property ProvenanceTaskReport::port("Port", "Remote Port", ""); +core::Property ProvenanceTaskReport::batchSize("Batch Size", "Specifies how many records to send in a single batch, at most.", "100"); +core::Relationship ProvenanceTaskReport::relation; + +void ProvenanceTaskReport::initialize() +{ + //! Set the supported properties + std::set properties; + properties.insert(hostName); + properties.insert(port); + properties.insert(batchSize); + setSupportedProperties(properties); + //! Set the supported relationships + std::set relationships; + relationships.insert(relation); + setSupportedRelationships(relationships); +} + --- End diff -- right now, we propose the Provenance Report property the same as RemoteProcess Group as hostName and Port, it may change. Also the way s2s protocol negotiation may change. For example. Remote Process Group may support round robin between different NiFi server instead of hardcode the hostName and port in the processor property. The decouple of these two provide more flexible and it make sense because they behave differently. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #74: Minifi 227
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/74#discussion_r109761443 --- Diff: libminifi/src/Site2SiteClientProtocol.cpp --- @@ -682,6 +682,7 @@ bool Site2SiteClientProtocol::receive(std::string transactionID, bool Site2SiteClientProtocol::send( std::string transactionID, DataPacket *packet, std::shared_ptr flowFile, + uint8_t *payload, int length, --- End diff -- send can take either a flowfile or raw bytes; DataPacket is more like a holder for the meta info such as the transaction, etc. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #74: Minifi 227
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/74#discussion_r109760197 --- Diff: libminifi/src/Site2SiteClientProtocol.cpp --- @@ -1240,6 +1257,88 @@ void Site2SiteClientProtocol::transferFlowFiles( return; } +void Site2SiteClientProtocol::transferBytes(core::ProcessContext *context, core::ProcessSession *session, uint8_t *payload, int length, --- End diff -- But then we need to copy the payload into a vector, which is overhead. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #74: Minifi 227
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/74#discussion_r109759205 --- Diff: thirdparty/jsoncpp/devtools/batchbuild.py --- @@ -0,0 +1,278 @@ +from __future__ import print_function --- End diff -- The test source code is there because it came with the jsoncpp drop, but I disabled building and running it in the MiNiFi build via the jsoncpp CMake configuration. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #74: Minifi 227
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/74#discussion_r109758846 --- Diff: libminifi/src/provenance/ProvenanceRepository.cpp --- @@ -36,6 +36,7 @@ void ProvenanceRepository::run() { uint64_t curTime = getTimeMillis(); uint64_t size = repoSize(); if (size >= purgeThreshold) { + std::lock_guard lock(mutex_); --- End diff -- Yes, LevelDB is thread safe, but while we purge records in the ProvenanceRepository task, I do not want the ProvenanceReportTask to pick up those records and ship them to NiFi. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #74: Minifi 227
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/74#discussion_r109758278 --- Diff: libminifi/src/provenance/ProvenanceTaskReport.cpp --- @@ -0,0 +1,221 @@ +/** + * @file ProvenanceTaskReport.cpp + * ProvenanceTaskReport class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "provenance/ProvenanceTaskReport.h" +#include "../include/io/StreamFactory.h" +#include "io/ClientSocket.h" +#include "utils/TimeUtil.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "provenance/Provenance.h" +#include "FlowController.h" + +#include "json/json.h" +#include "json/writer.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace provenance{ + +const std::string ProvenanceTaskReport::ProcessorName("ProvenanceTaskReport"); +core::Property ProvenanceTaskReport::hostName("Host Name", "Remote Host Name.", "localhost"); +core::Property ProvenanceTaskReport::port("Port", "Remote Port", ""); +core::Property ProvenanceTaskReport::batchSize("Batch Size", "Specifies how many records to send in a single batch, at most.", "100"); +core::Relationship ProvenanceTaskReport::relation; + +void ProvenanceTaskReport::initialize() +{ + //! Set the supported properties + std::set properties; + properties.insert(hostName); + properties.insert(port); + properties.insert(batchSize); + setSupportedProperties(properties); + //! 
Set the supported relationships + std::set relationships; + relationships.insert(relation); + setSupportedRelationships(relationships); +} + +std::unique_ptr ProvenanceTaskReport::getNextProtocol() +{ + std::lock_guard protocol_lock_(protocol_mutex_); + if (available_protocols_.empty()) + return nullptr; + std::unique_ptr return_pointer = std::move(available_protocols_.top()); + available_protocols_.pop(); + return std::move(return_pointer); +} + +void ProvenanceTaskReport::returnProtocol( + std::unique_ptr return_protocol) +{ + std::lock_guard protocol_lock_(protocol_mutex_); + available_protocols_.push(std::move(return_protocol)); +} + +void ProvenanceTaskReport::onTrigger(core::ProcessContext *context, core::ProcessSession *session) +{ + std::string value; + int64_t lvalue; + + std::unique_ptr protocol_ = getNextProtocol(); + + if (protocol_ == nullptr) + { + protocol_ = std::unique_ptr( + new Site2SiteClientProtocol(0)); + protocol_->setPortId(protocol_uuid_); + + std::string host = ""; + uint16_t sport = 0; + + if (context->getProperty(hostName.getName(), value)) { + host = value; + } + if (context->getProperty(port.getName(), value) + && core::Property::StringToInt(value, lvalue)) { + sport = (uint16_t) lvalue; + } + std::unique_ptr str = + std::unique_ptr( + org::apache::nifi::minifi::io::StreamFactory::getInstance() + ->createSocket(host, sport)); + + std::unique_ptr peer_ = std::unique_ptr( + new Site2SitePeer(std::move(str), host, sport)); + + protocol_->setPeer(std::move(peer_)); + } + + if (!protocol_->bootstrap()) + { + // bootstrap the client protocol if needeed + context->yield(); + std::shared_ptr processo
[GitHub] nifi-minifi-cpp pull request #74: Minifi 227
Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/74#discussion_r109757748 --- Diff: libminifi/src/provenance/ProvenanceTaskReport.cpp --- @@ -0,0 +1,221 @@ +/** + * @file ProvenanceTaskReport.cpp + * ProvenanceTaskReport class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "provenance/ProvenanceTaskReport.h" +#include "../include/io/StreamFactory.h" +#include "io/ClientSocket.h" +#include "utils/TimeUtil.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "provenance/Provenance.h" +#include "FlowController.h" + +#include "json/json.h" +#include "json/writer.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace provenance{ + +const std::string ProvenanceTaskReport::ProcessorName("ProvenanceTaskReport"); +core::Property ProvenanceTaskReport::hostName("Host Name", "Remote Host Name.", "localhost"); +core::Property ProvenanceTaskReport::port("Port", "Remote Port", ""); +core::Property ProvenanceTaskReport::batchSize("Batch Size", "Specifies how many records to send in a single batch, at most.", "100"); +core::Relationship ProvenanceTaskReport::relation; + +void ProvenanceTaskReport::initialize() +{ + //! Set the supported properties + std::set properties; + properties.insert(hostName); + properties.insert(port); + properties.insert(batchSize); + setSupportedProperties(properties); + //! Set the supported relationships + std::set relationships; + relationships.insert(relation); + setSupportedRelationships(relationships); +} + --- End diff -- we may want to add specific S2S protocol logic for shipping provenance report data. It make the code more flexible and decoupled. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi-cpp pull request #74: Minifi 227
GitHub user benqiu2016 opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/74 Minifi 227 Thank you for submitting a contribution to Apache NiFi - MiNiFi C++. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with MINIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file? - [ ] If applicable, have you updated the NOTICE file? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/benqiu2016/nifi-minifi-cpp MINIFI-227 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/74.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #74 commit 1e1823ab583e0d19dec7a8e36d3aba1fbe5f806e Author: Bin Qiu <benqiu2...@gmail.com> Date: 2017-03-31T23:04:56Z MINIFI-227: Provenance report commit 8cc6906ff060b68585abef925cf13d0d0f376857 Author: Bin Qiu <benqiu2...@gmail.com> Date: 2017-04-01T00:53:06Z MINIFI-227: Provenance report commit d639efa280a330fdd6d4764175efe8745cd810ce Author: Bin Qiu <benqiu2...@gmail.com> Date: 2017-04-04T15:56:01Z MINIFI-227: Provenance report --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---