MINIFI-227: Provenance report (rebase)

Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/ca5bc5a0
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/ca5bc5a0
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/ca5bc5a0

Branch: refs/heads/MINIFI-227
Commit: ca5bc5a0223a73e7d542ec0bd108e7aa4e60da33
Parents: 4636d1e bcabd44
Author: Bin Qiu <benqiu2...@gmail.com>
Authored: Thu Apr 6 10:57:33 2017 -0700
Committer: Bin Qiu <benqiu2...@gmail.com>
Committed: Thu Apr 6 10:57:33 2017 -0700

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 CMakeLists.txt                                  |  15 +
 libminifi/include/Connection.h                  |   2 +-
 libminifi/include/Exception.h                   |  93 ++--
 libminifi/include/FlowControlProtocol.h         |  11 +-
 libminifi/include/FlowController.h              |   5 +-
 libminifi/include/RemoteProcessorGroupPort.h    |  21 +-
 libminifi/include/ResourceClaim.h               | 109 ++---
 libminifi/include/SchedulingAgent.h             |  20 +-
 libminifi/include/Site2SiteClientProtocol.h     |   6 +-
 libminifi/include/Site2SitePeer.h               |   2 -
 libminifi/include/core/ConfigurableComponent.h  |  16 +-
 libminifi/include/core/ConfigurationFactory.h   |  24 +-
 libminifi/include/core/Connectable.h            |   2 +-
 libminifi/include/core/Core.h                   | 178 +++++++
 libminifi/include/core/FlowConfiguration.h      |   4 +-
 libminifi/include/core/FlowFile.h               |   9 +-
 libminifi/include/core/ProcessSession.h         |  28 +-
 libminifi/include/core/Processor.h              |   9 +-
 libminifi/include/core/ProcessorConfig.h        |   6 +-
 libminifi/include/core/Repository.h             |  19 +-
 libminifi/include/core/RepositoryFactory.h      |   9 +-
 libminifi/include/core/core.h                   | 180 -------
 .../core/repository/FlowFileRepository.h        | 116 +++--
 libminifi/include/io/BaseStream.h               |   1 -
 libminifi/include/io/ClientSocket.h             |  14 +-
 libminifi/include/io/StreamFactory.h            | 157 +++---
 libminifi/include/io/tls/TLSSocket.h            |  17 +-
 libminifi/include/io/validation.h               |  10 +-
 libminifi/include/processors/AppendHostInfo.h   |   9 +-
 libminifi/include/processors/ExecuteProcess.h   |   9 +-
 libminifi/include/processors/GenerateFlowFile.h |   9 +-
 libminifi/include/processors/GetFile.h          |  94 ++--
 libminifi/include/processors/ListenHTTP.h       |  20 +-
 libminifi/include/processors/ListenSyslog.h     |  15 +-
 libminifi/include/processors/LogAttribute.h     |   9 +-
 libminifi/include/processors/PutFile.h          |  35 +-
 .../include/processors/RealTimeDataCollector.h  | 145 ------
 libminifi/include/processors/TailFile.h         |  25 +-
 libminifi/include/properties/Configure.h        |   2 +-
 libminifi/include/provenance/Provenance.h       |   1 -
 .../include/provenance/ProvenanceRepository.h   |   4 +-
 libminifi/include/utils/FailurePolicy.h         |  35 +-
 libminifi/include/utils/StringUtils.h           | 182 +++----
 libminifi/include/utils/ThreadPool.h            | 287 +++++++++++
 libminifi/include/utils/TimeUtil.h              |  32 +-
 libminifi/src/Configure.cpp                     | 253 +++++-----
 libminifi/src/Connection.cpp                    |  10 +-
 libminifi/src/EventDrivenSchedulingAgent.cpp    |   4 +-
 libminifi/src/FlowControlProtocol.cpp           |  46 +-
 libminifi/src/FlowController.cpp                |  31 +-
 libminifi/src/FlowFileRecord.cpp                |  54 +--
 libminifi/src/RemoteProcessorGroupPort.cpp      |  15 +-
 libminifi/src/ResourceClaim.cpp                 |   3 +-
 libminifi/src/SchedulingAgent.cpp               |  16 +-
 libminifi/src/Site2SiteClientProtocol.cpp       |  57 ++-
 libminifi/src/Site2SitePeer.cpp                 |  31 +-
 libminifi/src/ThreadedSchedulingAgent.cpp       |  42 +-
 libminifi/src/TimerDrivenSchedulingAgent.cpp    |   4 +-
 libminifi/src/core/ConfigurableComponent.cpp    |  31 +-
 libminifi/src/core/ConfigurationFactory.cpp     |  72 +--
 libminifi/src/core/Connectable.cpp              |  35 +-
 libminifi/src/core/Core.cpp                     |  34 +-
 libminifi/src/core/FlowConfiguration.cpp        |  14 +-
 libminifi/src/core/FlowFile.cpp                 | 223 +++++++++
 libminifi/src/core/ProcessGroup.cpp             |  26 +-
 libminifi/src/core/ProcessSession.cpp           | 124 ++++-
 libminifi/src/core/ProcessSessionFactory.cpp    |  10 +-
 libminifi/src/core/Processor.cpp                |  34 +-
 libminifi/src/core/ProcessorNode.cpp            |  24 +-
 libminifi/src/core/Property.cpp                 |   5 +-
 libminifi/src/core/Record.cpp                   | 225 ---------
 libminifi/src/core/Repository.cpp               |   4 +-
 libminifi/src/core/RepositoryFactory.cpp        |  98 ++--
 libminifi/src/core/logging/BaseLogger.cpp       |   7 +-
 libminifi/src/core/logging/LogAppenders.cpp     |  13 +-
 libminifi/src/core/logging/Logger.cpp           |   1 +
 .../src/core/repository/FlowFileRepository.cpp  |  76 +--
 libminifi/src/core/yaml/YamlConfiguration.cpp   |  20 +-
 libminifi/src/io/BaseStream.cpp                 |  36 +-
 libminifi/src/io/CRCStream.cpp                  |   2 -
 libminifi/src/io/ClientSocket.cpp               |  82 +---
 libminifi/src/io/DataStream.cpp                 | 150 +++---
 libminifi/src/io/EndianCheck.cpp                |   1 -
 libminifi/src/io/Serializable.cpp               |  24 +-
 libminifi/src/io/StreamFactory.cpp              |   8 +-
 libminifi/src/io/tls/TLSSocket.cpp              | 359 +++++++-------
 libminifi/src/processors/AppendHostInfo.cpp     |  41 +-
 libminifi/src/processors/ExecuteProcess.cpp     |  34 +-
 libminifi/src/processors/GenerateFlowFile.cpp   | 192 ++++----
 libminifi/src/processors/GetFile.cpp            | 137 +++---
 libminifi/src/processors/ListenHTTP.cpp         |  76 ++-
 libminifi/src/processors/ListenSyslog.cpp       |  55 +--
 libminifi/src/processors/LogAttribute.cpp       |  31 +-
 libminifi/src/processors/PutFile.cpp            |  69 ++-
 .../src/processors/RealTimeDataCollector.cpp    | 480 -------------------
 libminifi/src/processors/TailFile.cpp           |  78 +--
 libminifi/src/provenance/Provenance.cpp         |  38 +-
 .../src/provenance/ProvenanceRepository.cpp     |  26 +-
 libminifi/test/TestBase.h                       |   4 +-
 libminifi/test/TestExecuteProcess.cpp           | 131 +++++
 libminifi/test/nodefs/NoLevelDB.cpp             |   2 +-
 libminifi/test/nodefs/NoYamlConfiguration.cpp   |   2 +-
 libminifi/test/unit/ProcessorTests.cpp          | 142 +++++-
 libminifi/test/unit/ProvenanceTestHelper.h      |   2 +-
 libminifi/test/unit/ProvenanceTests.cpp         |   2 +-
 libminifi/test/unit/RepoTests.cpp               |   2 +-
 main/MiNiFiMain.cpp                             |   2 +-
 thirdparty/google-styleguide/cpplint.py         |   6 -
 thirdparty/google-styleguide/run_linter.sh      |   7 +-
 110 files changed, 2865 insertions(+), 2996 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/CMakeLists.txt
----------------------------------------------------------------------
diff --cc CMakeLists.txt
index 00ba999,b84706d..2ae7332
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@@ -136,8 -134,23 +136,23 @@@ enable_testing(test
      target_include_directories(tests PRIVATE BEFORE "libminifi/include/utils")
      target_include_directories(tests PRIVATE BEFORE 
"libminifi/include/processors")
      target_include_directories(tests PRIVATE BEFORE 
"libminifi/include/provenance")
 -    target_link_libraries(tests ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} 
${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library 
civetweb-cpp)
 +    target_link_libraries(tests ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} 
${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library 
civetweb-cpp jsoncpp_lib_static)
      add_test(NAME LibMinifiTests COMMAND tests)
+     
+     file(GLOB LIBMINIFI_TEST_EXECUTE_PROCESS 
"libminifi/test/TestExecuteProcess.cpp")
+     add_executable(testExecuteProcess ${LIBMINIFI_TEST_EXECUTE_PROCESS} 
${SPD_SOURCES})
+     target_include_directories(testExecuteProcess PRIVATE BEFORE 
"thirdparty/yaml-cpp-yaml-cpp-0.5.3/include")
+     target_include_directories(testExecuteProcess PRIVATE BEFORE 
${LEVELDB_INCLUDE_DIRS})
+     target_include_directories(testExecuteProcess PRIVATE BEFORE "include")
+     target_include_directories(testExecuteProcess PRIVATE BEFORE 
"libminifi/include/")
+     target_include_directories(testExecuteProcess PRIVATE BEFORE 
"libminifi/include/core")
+     target_include_directories(testExecuteProcess PRIVATE BEFORE 
"libminifi/include/core/repository")
+     target_include_directories(testExecuteProcess PRIVATE BEFORE 
"libminifi/include/io")
+     target_include_directories(testExecuteProcess PRIVATE BEFORE 
"libminifi/include/utils")
+     target_include_directories(testExecuteProcess PRIVATE BEFORE 
"libminifi/include/processors")
+     target_include_directories(testExecuteProcess PRIVATE BEFORE 
"libminifi/include/provenance")
+     target_link_libraries(testExecuteProcess ${CMAKE_THREAD_LIBS_INIT} 
${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp 
c-library civetweb-cpp)
+     add_test(NAME ExecuteProcess COMMAND testExecuteProcess)
  
  # Create a custom build target called "docker" that will invoke 
DockerBuild.sh and create the NiFi-MiNiFi-CPP Docker image
  add_custom_target(

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/include/RemoteProcessorGroupPort.h
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/include/Site2SiteClientProtocol.h
----------------------------------------------------------------------
diff --cc libminifi/include/Site2SiteClientProtocol.h
index 1ab6f4f,78673d8..67fd444
--- a/libminifi/include/Site2SiteClientProtocol.h
+++ b/libminifi/include/Site2SiteClientProtocol.h
@@@ -533,16 -529,11 +532,15 @@@ class Site2SiteClientProtocol 
    // Error the transaction
    void error(std::string transactionID);
    // Receive flow files for the process session
-   void receiveFlowFiles(
-       core::ProcessContext *context,
-       core::ProcessSession *session);
+   void receiveFlowFiles(core::ProcessContext *context,
+                         core::ProcessSession *session);
    // Transfer flow files for the process session
 -  void transferFlowFiles(core::ProcessContext *context,
 -                         core::ProcessSession *session);
 +  void transferFlowFiles(
 +      core::ProcessContext *context,
 +      core::ProcessSession *session);
 +  //! Transfer string for the process session
 +  void transferString(core::ProcessContext *context, core::ProcessSession 
*session, std::string &payload,
 +      std::map<std::string, std::string> attributes);
    // deleteTransaction
    void deleteTransaction(std::string transactionID);
    // Nest Callback Class for write stream

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/include/core/Processor.h
----------------------------------------------------------------------
diff --cc libminifi/include/core/Processor.h
index 69fc3f3,2b540ec..e945fa4
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@@ -255,16 -245,9 +253,13 @@@ private
    std::mutex mutex_;
    // Yield Expiration
    std::atomic<uint64_t> yield_expiration_;
 +  
 +  // Site2Site Protocols
 +  std::stack<std::shared_ptr<Site2SiteClientProtocol>> available_protocols_;
 +  std::atomic<bool> protocols_created_;
  
- 
    // Check all incoming connections for work
    bool isWorkAvailable();
-   // Logger
-   std::shared_ptr<logging::Logger> logger_;
    // Prevent default copy constructor and assignment operation
    // Only support pass by reference or pointer
    Processor(const Processor &parent);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/include/provenance/Provenance.h
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/include/provenance/ProvenanceRepository.h
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --cc libminifi/src/RemoteProcessorGroupPort.cpp
index 849a2da,33f0cb2..9e40824
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@@ -46,11 -47,26 +47,10 @@@ core::Property RemoteProcessorGroupPort
                                                    "Remote Host Name.",
                                                    "localhost");
  core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999");
 +core::Property RemoteProcessorGroupPort::portUUID("Port UUID", "Specifies 
remote NiFi Port UUID.", "");
  core::Relationship RemoteProcessorGroupPort::relation;
  
 -std::unique_ptr<Site2SiteClientProtocol> 
RemoteProcessorGroupPort::getNextProtocol() {
 -  std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_);
 -  if (available_protocols_.empty())
 -    return nullptr;
 -
 -  std::unique_ptr<Site2SiteClientProtocol> return_pointer = std::move(
 -      available_protocols_.top());
 -  available_protocols_.pop();
 -  return std::move(return_pointer);
 -}
 -
 -void RemoteProcessorGroupPort::returnProtocol(
 -    std::unique_ptr<Site2SiteClientProtocol> return_protocol) {
 -  std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_);
 -  available_protocols_.push(std::move(return_protocol));
 -}
 -
  void RemoteProcessorGroupPort::initialize() {
- 
    // Set the supported properties
    std::set<core::Property> properties;
    properties.insert(hostName);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/src/Site2SiteClientProtocol.cpp
----------------------------------------------------------------------
diff --cc libminifi/src/Site2SiteClientProtocol.cpp
index 208432c,52a0a02..9b4d307
--- a/libminifi/src/Site2SiteClientProtocol.cpp
+++ b/libminifi/src/Site2SiteClientProtocol.cpp
@@@ -1258,89 -1250,6 +1268,88 @@@ void Site2SiteClientProtocol::transferF
    return;
  }
  
 +void Site2SiteClientProtocol::transferString(core::ProcessContext *context, 
core::ProcessSession *session, std::string &payload,
 +              std::map<std::string, std::string> attributes)
 +{
 +      Transaction *transaction = NULL;
 +
 +      if (payload.length() <= 0)
 +              return;
 +
 +      if (_peerState != READY)
 +      {
 +              bootstrap();
 +      }
 +
 +      if (_peerState != READY)
 +      {
 +              context->yield();
 +              tearDown();
 +              throw Exception(SITE2SITE_EXCEPTION, "Can not establish 
handshake with peer");
 +              return;
 +      }
 +
 +      // Create the transaction
 +      std::string transactionID;
 +      transaction = createTransaction(transactionID, SEND);
 +
 +      if (transaction == NULL)
 +      {
 +              context->yield();
 +              tearDown();
 +              throw Exception(SITE2SITE_EXCEPTION, "Can not create 
transaction");
 +              return;
 +      }
 +
 +      try
 +      {
 +              DataPacket packet(this, transaction, attributes, payload);
 +
 +              if (!send(transactionID, &packet, nullptr, session))
 +              {
 +                      throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
 +                      return;
 +              }
 +              logger_->log_info("Site2Site transaction %s send bytes length 
%d",
 +                                                      transactionID.c_str(), 
payload.length());
 +
 +              if (!confirm(transactionID))
 +              {
 +                      throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed");
 +                      return;
 +              }
 +              if (!complete(transactionID))
 +              {
 +                      throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
 +                      return;
 +              }
 +              logger_->log_info("Site2Site transaction %s successfully send 
flow record %d, content bytes %d",
 +                              transactionID.c_str(), transaction->_transfers, 
transaction->_bytes);
 +      }
 +      catch (std::exception &exception)
 +      {
 +              if (transaction)
 +                      deleteTransaction(transactionID);
 +              context->yield();
 +              tearDown();
 +              logger_->log_debug("Caught Exception %s", exception.what());
 +              throw;
 +      }
 +      catch (...)
 +      {
 +              if (transaction)
 +                      deleteTransaction(transactionID);
 +              context->yield();
 +              tearDown();
 +              logger_->log_debug("Caught Exception during 
Site2SiteClientProtocol::transferBytes");
 +              throw;
 +      }
 +
 +      deleteTransaction(transactionID);
 +
 +      return;
 +}
 +
- 
  } /* namespace minifi */
  } /* namespace nifi */
  } /* namespace apache */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ca5bc5a0/libminifi/src/provenance/Provenance.cpp
----------------------------------------------------------------------

Reply via email to