MINIFI-227: Provenance report (rebase)
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/ca5bc5a0 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/ca5bc5a0 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/ca5bc5a0 Branch: refs/heads/MINIFI-227 Commit: ca5bc5a0223a73e7d542ec0bd108e7aa4e60da33 Parents: 4636d1e bcabd44 Author: Bin Qiu <benqiu2...@gmail.com> Authored: Thu Apr 6 10:57:33 2017 -0700 Committer: Bin Qiu <benqiu2...@gmail.com> Committed: Thu Apr 6 10:57:33 2017 -0700 ---------------------------------------------------------------------- .travis.yml | 2 +- CMakeLists.txt | 15 + libminifi/include/Connection.h | 2 +- libminifi/include/Exception.h | 93 ++-- libminifi/include/FlowControlProtocol.h | 11 +- libminifi/include/FlowController.h | 5 +- libminifi/include/RemoteProcessorGroupPort.h | 21 +- libminifi/include/ResourceClaim.h | 109 ++--- libminifi/include/SchedulingAgent.h | 20 +- libminifi/include/Site2SiteClientProtocol.h | 6 +- libminifi/include/Site2SitePeer.h | 2 - libminifi/include/core/ConfigurableComponent.h | 16 +- libminifi/include/core/ConfigurationFactory.h | 24 +- libminifi/include/core/Connectable.h | 2 +- libminifi/include/core/Core.h | 178 +++++++ libminifi/include/core/FlowConfiguration.h | 4 +- libminifi/include/core/FlowFile.h | 9 +- libminifi/include/core/ProcessSession.h | 28 +- libminifi/include/core/Processor.h | 9 +- libminifi/include/core/ProcessorConfig.h | 6 +- libminifi/include/core/Repository.h | 19 +- libminifi/include/core/RepositoryFactory.h | 9 +- libminifi/include/core/core.h | 180 ------- .../core/repository/FlowFileRepository.h | 116 +++-- libminifi/include/io/BaseStream.h | 1 - libminifi/include/io/ClientSocket.h | 14 +- libminifi/include/io/StreamFactory.h | 157 +++--- libminifi/include/io/tls/TLSSocket.h | 17 +- libminifi/include/io/validation.h | 10 +- libminifi/include/processors/AppendHostInfo.h | 9 +- libminifi/include/processors/ExecuteProcess.h | 9 +- libminifi/include/processors/GenerateFlowFile.h | 9 +- libminifi/include/processors/GetFile.h | 94 ++-- libminifi/include/processors/ListenHTTP.h | 20 +- libminifi/include/processors/ListenSyslog.h | 15 +- libminifi/include/processors/LogAttribute.h | 9 +- libminifi/include/processors/PutFile.h | 35 +- .../include/processors/RealTimeDataCollector.h | 145 ------ libminifi/include/processors/TailFile.h | 25 +- libminifi/include/properties/Configure.h | 2 +- libminifi/include/provenance/Provenance.h | 1 - .../include/provenance/ProvenanceRepository.h | 4 +- libminifi/include/utils/FailurePolicy.h | 35 +- libminifi/include/utils/StringUtils.h | 182 +++---- libminifi/include/utils/ThreadPool.h | 287 +++++++++++ libminifi/include/utils/TimeUtil.h | 32 +- libminifi/src/Configure.cpp | 253 +++++----- libminifi/src/Connection.cpp | 10 +- libminifi/src/EventDrivenSchedulingAgent.cpp | 4 +- libminifi/src/FlowControlProtocol.cpp | 46 +- libminifi/src/FlowController.cpp | 31 +- libminifi/src/FlowFileRecord.cpp | 54 +-- libminifi/src/RemoteProcessorGroupPort.cpp | 15 +- libminifi/src/ResourceClaim.cpp | 3 +- libminifi/src/SchedulingAgent.cpp | 16 +- libminifi/src/Site2SiteClientProtocol.cpp | 57 ++- libminifi/src/Site2SitePeer.cpp | 31 +- libminifi/src/ThreadedSchedulingAgent.cpp | 42 +- libminifi/src/TimerDrivenSchedulingAgent.cpp | 4 +- libminifi/src/core/ConfigurableComponent.cpp | 31 +- libminifi/src/core/ConfigurationFactory.cpp | 72 +-- libminifi/src/core/Connectable.cpp | 35 +- libminifi/src/core/Core.cpp | 34 +- libminifi/src/core/FlowConfiguration.cpp | 14 +- libminifi/src/core/FlowFile.cpp | 223 +++++++++ libminifi/src/core/ProcessGroup.cpp | 26 +- libminifi/src/core/ProcessSession.cpp | 124 ++++- libminifi/src/core/ProcessSessionFactory.cpp | 10 +- libminifi/src/core/Processor.cpp | 34 +- libminifi/src/core/ProcessorNode.cpp | 24 +- libminifi/src/core/Property.cpp | 5 +- libminifi/src/core/Record.cpp | 225 --------- libminifi/src/core/Repository.cpp | 4 +- libminifi/src/core/RepositoryFactory.cpp | 98 ++-- libminifi/src/core/logging/BaseLogger.cpp | 7 +- libminifi/src/core/logging/LogAppenders.cpp | 13 +- libminifi/src/core/logging/Logger.cpp | 1 + .../src/core/repository/FlowFileRepository.cpp | 76 +-- libminifi/src/core/yaml/YamlConfiguration.cpp | 20 +- libminifi/src/io/BaseStream.cpp | 36 +- libminifi/src/io/CRCStream.cpp | 2 - libminifi/src/io/ClientSocket.cpp | 82 +--- libminifi/src/io/DataStream.cpp | 150 +++--- libminifi/src/io/EndianCheck.cpp | 1 - libminifi/src/io/Serializable.cpp | 24 +- libminifi/src/io/StreamFactory.cpp | 8 +- libminifi/src/io/tls/TLSSocket.cpp | 359 +++++++------- libminifi/src/processors/AppendHostInfo.cpp | 41 +- libminifi/src/processors/ExecuteProcess.cpp | 34 +- libminifi/src/processors/GenerateFlowFile.cpp | 192 ++++---- libminifi/src/processors/GetFile.cpp | 137 +++--- libminifi/src/processors/ListenHTTP.cpp | 76 ++- libminifi/src/processors/ListenSyslog.cpp | 55 +-- libminifi/src/processors/LogAttribute.cpp | 31 +- libminifi/src/processors/PutFile.cpp | 69 ++- .../src/processors/RealTimeDataCollector.cpp | 480 ------------------- libminifi/src/processors/TailFile.cpp | 78 +-- libminifi/src/provenance/Provenance.cpp | 38 +- .../src/provenance/ProvenanceRepository.cpp | 26 +- libminifi/test/TestBase.h | 4 +- libminifi/test/TestExecuteProcess.cpp | 131 +++++ libminifi/test/nodefs/NoLevelDB.cpp | 2 +- libminifi/test/nodefs/NoYamlConfiguration.cpp | 2 +- libminifi/test/unit/ProcessorTests.cpp | 142 +++++- libminifi/test/unit/ProvenanceTestHelper.h | 2 +- libminifi/test/unit/ProvenanceTests.cpp | 2 +- libminifi/test/unit/RepoTests.cpp | 2 +- main/MiNiFiMain.cpp | 2 +- thirdparty/google-styleguide/cpplint.py | 6 - thirdparty/google-styleguide/run_linter.sh | 7 +- 110 files changed, 2865 insertions(+), 2996 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/CMakeLists.txt ---------------------------------------------------------------------- diff --cc CMakeLists.txt index 00ba999,b84706d..2ae7332 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@@ -136,8 -134,23 +136,23 @@@ enable_testing(test target_include_directories(tests PRIVATE BEFORE "libminifi/include/utils") target_include_directories(tests PRIVATE BEFORE "libminifi/include/processors") target_include_directories(tests PRIVATE BEFORE "libminifi/include/provenance") - target_link_libraries(tests ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp) + target_link_libraries(tests ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp jsoncpp_lib_static) add_test(NAME LibMinifiTests COMMAND tests) + + file(GLOB LIBMINIFI_TEST_EXECUTE_PROCESS "libminifi/test/TestExecuteProcess.cpp") + add_executable(testExecuteProcess ${LIBMINIFI_TEST_EXECUTE_PROCESS} ${SPD_SOURCES}) + target_include_directories(testExecuteProcess PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include") + target_include_directories(testExecuteProcess PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS}) + target_include_directories(testExecuteProcess PRIVATE BEFORE "include") + target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/") + target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/core") + target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/core/repository") + target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/io") + target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/utils") + target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/processors") + target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/provenance") + target_link_libraries(testExecuteProcess ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp) + add_test(NAME ExecuteProcess COMMAND testExecuteProcess) # Create a custom build target called "docker" that will invoke DockerBuild.sh and create the NiFi-MiNiFi-CPP Docker image add_custom_target( http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/include/RemoteProcessorGroupPort.h ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/include/Site2SiteClientProtocol.h ---------------------------------------------------------------------- diff --cc libminifi/include/Site2SiteClientProtocol.h index 1ab6f4f,78673d8..67fd444 --- a/libminifi/include/Site2SiteClientProtocol.h +++ b/libminifi/include/Site2SiteClientProtocol.h @@@ -533,16 -529,11 +532,15 @@@ class Site2SiteClientProtocol // Error the transaction void error(std::string transactionID); // Receive flow files for the process session - void receiveFlowFiles( - core::ProcessContext *context, - core::ProcessSession *session); + void receiveFlowFiles(core::ProcessContext *context, + core::ProcessSession *session); // Transfer flow files for the process session - void transferFlowFiles(core::ProcessContext *context, - core::ProcessSession *session); + void transferFlowFiles( + core::ProcessContext *context, + core::ProcessSession *session); + //! Transfer string for the process session + void transferString(core::ProcessContext *context, core::ProcessSession *session, std::string &payload, + std::map<std::string, std::string> attributes); // deleteTransaction void deleteTransaction(std::string transactionID); // Nest Callback Class for write stream http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/include/core/FlowConfiguration.h ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/include/core/Processor.h ---------------------------------------------------------------------- diff --cc libminifi/include/core/Processor.h index 69fc3f3,2b540ec..e945fa4 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@@ -255,16 -245,9 +253,13 @@@ private std::mutex mutex_; // Yield Expiration std::atomic<uint64_t> yield_expiration_; + + // Site2Site Protocols + std::stack<std::shared_ptr<Site2SiteClientProtocol>> available_protocols_; + std::atomic<bool> protocols_created_; - // Check all incoming connections for work bool isWorkAvailable(); - // Logger - std::shared_ptr<logging::Logger> logger_; // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer Processor(const Processor &parent); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/include/provenance/Provenance.h ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/include/provenance/ProvenanceRepository.h ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/src/FlowFileRecord.cpp ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --cc libminifi/src/RemoteProcessorGroupPort.cpp index 849a2da,33f0cb2..9e40824 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@@ -46,11 -47,26 +47,10 @@@ core::Property RemoteProcessorGroupPort "Remote Host Name.", "localhost"); core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999"); +core::Property RemoteProcessorGroupPort::portUUID("Port UUID", "Specifies remote NiFi Port UUID.", ""); core::Relationship RemoteProcessorGroupPort::relation; -std::unique_ptr<Site2SiteClientProtocol> RemoteProcessorGroupPort::getNextProtocol() { - std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_); - if (available_protocols_.empty()) - return nullptr; - - std::unique_ptr<Site2SiteClientProtocol> return_pointer = std::move( - available_protocols_.top()); - available_protocols_.pop(); - return std::move(return_pointer); -} - -void RemoteProcessorGroupPort::returnProtocol( - std::unique_ptr<Site2SiteClientProtocol> return_protocol) { - std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_); - available_protocols_.push(std::move(return_protocol)); -} - void RemoteProcessorGroupPort::initialize() { - // Set the supported properties std::set<core::Property> properties; properties.insert(hostName); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/src/Site2SiteClientProtocol.cpp ---------------------------------------------------------------------- diff --cc libminifi/src/Site2SiteClientProtocol.cpp index 208432c,52a0a02..9b4d307 --- a/libminifi/src/Site2SiteClientProtocol.cpp +++ b/libminifi/src/Site2SiteClientProtocol.cpp @@@ -1258,89 -1250,6 +1268,88 @@@ void Site2SiteClientProtocol::transferF return; } +void Site2SiteClientProtocol::transferString(core::ProcessContext *context, core::ProcessSession *session, std::string &payload, + std::map<std::string, std::string> attributes) +{ + Transaction *transaction = NULL; + + if (payload.length() <= 0) + return; + + if (_peerState != READY) + { + bootstrap(); + } + + if (_peerState != READY) + { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); + return; + } + + // Create the transaction + std::string transactionID; + transaction = createTransaction(transactionID, SEND); + + if (transaction == NULL) + { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); + return; + } + + try + { + DataPacket packet(this, transaction, attributes, payload); + + if (!send(transactionID, &packet, nullptr, session)) + { + throw Exception(SITE2SITE_EXCEPTION, "Send Failed"); + return; + } + logger_->log_info("Site2Site transaction %s send bytes length %d", + transactionID.c_str(), payload.length()); + + if (!confirm(transactionID)) + { + throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed"); + return; + } + if (!complete(transactionID)) + { + throw Exception(SITE2SITE_EXCEPTION, "Complete Failed"); + return; + } + logger_->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", + transactionID.c_str(), transaction->_transfers, transaction->_bytes); + } + catch (std::exception &exception) + { + if (transaction) + deleteTransaction(transactionID); + context->yield(); + tearDown(); + logger_->log_debug("Caught Exception %s", exception.what()); + throw; + } + catch (...) + { + if (transaction) + deleteTransaction(transactionID); + context->yield(); + tearDown(); + logger_->log_debug("Caught Exception during Site2SiteClientProtocol::transferBytes"); + throw; + } + + deleteTransaction(transactionID); + + return; +} + - } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/src/core/FlowConfiguration.cpp ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/src/core/Processor.cpp ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/src/provenance/Provenance.cpp ----------------------------------------------------------------------