MINIFI-227: Provenance report (linter)

Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/770932a0
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/770932a0
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/770932a0

Branch: refs/heads/MINIFI-227
Commit: 770932a03862175c7cd0556d06b9a2edd0c600dc
Parents: ca5bc5a
Author: Bin Qiu <benqiu2...@gmail.com>
Authored: Thu Apr 6 15:57:26 2017 -0700
Committer: Bin Qiu <benqiu2...@gmail.com>
Committed: Thu Apr 6 15:57:26 2017 -0700

----------------------------------------------------------------------
 .../include/provenance/ProvenanceTaskReport.h   |   2 +-
 libminifi/src/RemoteProcessorGroupPort.cpp      |  30 +-
 libminifi/src/Site2SiteClientProtocol.cpp       | 496 +++++++++----------
 libminifi/src/core/Processor.cpp                | 142 +++---
 .../src/provenance/ProvenanceTaskReport.cpp     | 276 +++++------
 5 files changed, 466 insertions(+), 480 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/770932a0/libminifi/include/provenance/ProvenanceTaskReport.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/ProvenanceTaskReport.h 
b/libminifi/include/provenance/ProvenanceTaskReport.h
index 1d9d507..4123593 100644
--- a/libminifi/include/provenance/ProvenanceTaskReport.h
+++ b/libminifi/include/provenance/ProvenanceTaskReport.h
@@ -52,7 +52,7 @@ public:
 
        }
        //! Processor Name
-       static const std::string ProcessorName;
+       static constexpr char const* ProcessorName = "ProvenanceTaskReport";
        //! Supported Properties
        static core::Property hostName;
        static core::Property port;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/770932a0/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp 
b/libminifi/src/RemoteProcessorGroupPort.cpp
index 9e40824..8ee3680 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -44,10 +44,10 @@ namespace minifi {
 const std::string RemoteProcessorGroupPort::ProcessorName(
     "RemoteProcessorGroupPort");
 core::Property RemoteProcessorGroupPort::hostName("Host Name",
-                                                  "Remote Host Name.",
-                                                  "localhost");
+    "Remote Host Name.", "localhost");
 core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999");
-core::Property RemoteProcessorGroupPort::portUUID("Port UUID", "Specifies 
remote NiFi Port UUID.", "");
+core::Property RemoteProcessorGroupPort::portUUID("Port UUID",
+    "Specifies remote NiFi Port UUID.", "");
 core::Relationship RemoteProcessorGroupPort::relation;
 
 void RemoteProcessorGroupPort::initialize() {
@@ -64,7 +64,7 @@ void RemoteProcessorGroupPort::initialize() {
 }
 
 void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context,
-                                         core::ProcessSession *session) {
+    core::ProcessSession *session) {
   if (!transmitting_)
     return;
 
@@ -74,31 +74,31 @@ void 
RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context,
   uint16_t sport = 0;
 
   if (context->getProperty(hostName.getName(), value)) {
-       host = value;
+    host = value;
   }
   if (context->getProperty(port.getName(), value)
-           && core::Property::StringToInt(value, lvalue)) {
-       sport = (uint16_t) lvalue;
+      && core::Property::StringToInt(value, lvalue)) {
+    sport = (uint16_t) lvalue;
   }
   if (context->getProperty(portUUID.getName(), value)) {
-       uuid_parse(value.c_str(), protocol_uuid_);
+    uuid_parse(value.c_str(), protocol_uuid_);
   }
 
-  std::shared_ptr<Site2SiteClientProtocol> protocol_ = 
this->obtainSite2SiteProtocol(host, sport, protocol_uuid_);
+  std::shared_ptr<Site2SiteClientProtocol> protocol_ =
+      this->obtainSite2SiteProtocol(host, sport, protocol_uuid_);
 
-  if (!protocol_)
-  {
+  if (!protocol_) {
     context->yield();
-       return;
+    return;
   }
 
   if (!protocol_->bootstrap()) {
     // bootstrap the client protocol if needeed
     context->yield();
-    std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(
-        context->getProcessorNode().getProcessor());
+    std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor
+        > (context->getProcessorNode().getProcessor());
     logger_->log_error("Site2Site bootstrap failed yield period %d peer ",
-                       processor->getYieldPeriodMsec());
+        processor->getYieldPeriodMsec());
     returnSite2SiteProtocol(protocol_);
     return;
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/770932a0/libminifi/src/Site2SiteClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SiteClientProtocol.cpp 
b/libminifi/src/Site2SiteClientProtocol.cpp
index 9b4d307..fbde8a0 100644
--- a/libminifi/src/Site2SiteClientProtocol.cpp
+++ b/libminifi/src/Site2SiteClientProtocol.cpp
@@ -106,39 +106,39 @@ bool 
Site2SiteClientProtocol::initiateResourceNegotiation() {
   }
   logger_->log_info("status code is %i", statusCode);
   switch (statusCode) {
-    case RESOURCE_OK:
-      logger_->log_info("Site2Site Protocol Negotiate protocol version OK");
-      return true;
-    case DIFFERENT_RESOURCE_VERSION:
-      uint32_t serverVersion;
-      ret = peer_->read(serverVersion);
-      if (ret <= 0) {
-        // tearDown();
-        return false;
-      }
-      logger_->log_info(
-          "Site2Site Server Response asked for a different protocol version 
%d",
-          serverVersion);
-      for (unsigned int i = (_currentVersionIndex + 1);
-          i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) {
-        if (serverVersion >= _supportedVersion[i]) {
-          _currentVersion = _supportedVersion[i];
-          _currentVersionIndex = i;
-          return initiateResourceNegotiation();
-        }
-      }
-      ret = -1;
-      // tearDown();
-      return false;
-    case NEGOTIATED_ABORT:
-      logger_->log_info("Site2Site Negotiate protocol response ABORT");
-      ret = -1;
+  case RESOURCE_OK:
+    logger_->log_info("Site2Site Protocol Negotiate protocol version OK");
+    return true;
+  case DIFFERENT_RESOURCE_VERSION:
+    uint32_t serverVersion;
+    ret = peer_->read(serverVersion);
+    if (ret <= 0) {
       // tearDown();
       return false;
-    default:
-      logger_->log_info("Negotiate protocol response unknown code %d",
-                        statusCode);
-      return true;
+    }
+    logger_->log_info(
+        "Site2Site Server Response asked for a different protocol version %d",
+        serverVersion);
+    for (unsigned int i = (_currentVersionIndex + 1);
+        i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) {
+      if (serverVersion >= _supportedVersion[i]) {
+        _currentVersion = _supportedVersion[i];
+        _currentVersionIndex = i;
+        return initiateResourceNegotiation();
+      }
+    }
+    ret = -1;
+    // tearDown();
+    return false;
+  case NEGOTIATED_ABORT:
+    logger_->log_info("Site2Site Negotiate protocol response ABORT");
+    ret = -1;
+    // tearDown();
+    return false;
+  default:
+    logger_->log_info("Negotiate protocol response unknown code %d",
+        statusCode);
+    return true;
   }
 
   return true;
@@ -181,38 +181,38 @@ bool 
Site2SiteClientProtocol::initiateCodecResourceNegotiation() {
   }
 
   switch (statusCode) {
-    case RESOURCE_OK:
-      logger_->log_info("Site2Site Codec Negotiate version OK");
-      return true;
-    case DIFFERENT_RESOURCE_VERSION:
-      uint32_t serverVersion;
-      ret = peer_->read(serverVersion);
-      if (ret <= 0) {
-        // tearDown();
-        return false;
-      }
-      logger_->log_info(
-          "Site2Site Server Response asked for a different codec version %d",
-          serverVersion);
-      for (unsigned int i = (_currentCodecVersionIndex + 1);
-          i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) {
-        if (serverVersion >= _supportedCodecVersion[i]) {
-          _currentCodecVersion = _supportedCodecVersion[i];
-          _currentCodecVersionIndex = i;
-          return initiateCodecResourceNegotiation();
-        }
-      }
-      ret = -1;
-      // tearDown();
-      return false;
-    case NEGOTIATED_ABORT:
-      logger_->log_info("Site2Site Codec Negotiate response ABORT");
-      ret = -1;
+  case RESOURCE_OK:
+    logger_->log_info("Site2Site Codec Negotiate version OK");
+    return true;
+  case DIFFERENT_RESOURCE_VERSION:
+    uint32_t serverVersion;
+    ret = peer_->read(serverVersion);
+    if (ret <= 0) {
       // tearDown();
       return false;
-    default:
-      logger_->log_info("Negotiate Codec response unknown code %d", 
statusCode);
-      return true;
+    }
+    logger_->log_info(
+        "Site2Site Server Response asked for a different codec version %d",
+        serverVersion);
+    for (unsigned int i = (_currentCodecVersionIndex + 1);
+        i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) {
+      if (serverVersion >= _supportedCodecVersion[i]) {
+        _currentCodecVersion = _supportedCodecVersion[i];
+        _currentCodecVersionIndex = i;
+        return initiateCodecResourceNegotiation();
+      }
+    }
+    ret = -1;
+    // tearDown();
+    return false;
+  case NEGOTIATED_ABORT:
+    logger_->log_info("Site2Site Codec Negotiate response ABORT");
+    ret = -1;
+    // tearDown();
+    return false;
+  default:
+    logger_->log_info("Negotiate Codec response unknown code %d", statusCode);
+    return true;
   }
 
   return true;
@@ -241,7 +241,7 @@ bool Site2SiteClientProtocol::handShake() {
     return false;
   }
 
-  std::map<std::string, std::string> properties;
+  std::map < std::string, std::string > properties;
   properties[HandShakePropertyStr[GZIP]] = "false";
   properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr;
   properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(
@@ -286,7 +286,7 @@ bool Site2SiteClientProtocol::handShake() {
       return false;
     }
     logger_->log_info("Site2Site Protocol Send handshake properties %s %s",
-                      it->first.c_str(), it->second.c_str());
+        it->first.c_str(), it->second.c_str());
   }
 
   RespondCode code;
@@ -300,28 +300,28 @@ bool Site2SiteClientProtocol::handShake() {
   }
 
   switch (code) {
-    case PROPERTIES_OK:
-      logger_->log_info("Site2Site HandShake Completed");
-      _peerState = HANDSHAKED;
-      return true;
-    case PORT_NOT_IN_VALID_STATE:
-    case UNKNOWN_PORT:
-    case PORTS_DESTINATION_FULL:
-      logger_->log_error(
-          "Site2Site HandShake Failed because destination port is either 
invalid or full");
-      ret = -1;
-      /*
-       peer_->yield();
-       tearDown(); */
-      return false;
-    default:
-      logger_->log_info("HandShake Failed because of unknown respond code %d",
-                        code);
-      ret = -1;
-      /*
-       peer_->yield();
-       tearDown(); */
-      return false;
+  case PROPERTIES_OK:
+    logger_->log_info("Site2Site HandShake Completed");
+    _peerState = HANDSHAKED;
+    return true;
+  case PORT_NOT_IN_VALID_STATE:
+  case UNKNOWN_PORT:
+  case PORTS_DESTINATION_FULL:
+    logger_->log_error(
+        "Site2Site HandShake Failed because destination port is either invalid 
or full");
+    ret = -1;
+    /*
+     peer_->yield();
+     tearDown(); */
+    return false;
+  default:
+    logger_->log_info("HandShake Failed because of unknown respond code %d",
+        code);
+    ret = -1;
+    /*
+     peer_->yield();
+     tearDown(); */
+    return false;
   }
 
   return false;
@@ -369,7 +369,7 @@ int Site2SiteClientProtocol::readRequestType(RequestType 
&type) {
 }
 
 int Site2SiteClientProtocol::readRespond(RespondCode &code,
-                                         std::string &message) {
+    std::string &message) {
   uint8_t firstByte;
 
   int ret = peer_->read(firstByte);
@@ -408,7 +408,7 @@ int Site2SiteClientProtocol::readRespond(RespondCode &code,
 }
 
 int Site2SiteClientProtocol::writeRespond(RespondCode code,
-                                          std::string message) {
+    std::string message) {
   RespondCodeContext *resCode = this->getRespondCodeContext(code);
 
   if (resCode == NULL) {
@@ -525,31 +525,31 @@ Transaction* Site2SiteClientProtocol::createTransaction(
     org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(
         peer_.get());
     switch (code) {
-      case MORE_DATA:
-        dataAvailable = true;
-        logger_->log_info("Site2Site peer indicates that data is available");
-        transaction = new Transaction(direction, crcstream);
-        _transactionMap[transaction->getUUIDStr()] = transaction;
-        transactionID = transaction->getUUIDStr();
-        transaction->setDataAvailable(dataAvailable);
-        logger_->log_info("Site2Site create transaction %s",
-                          transaction->getUUIDStr().c_str());
-        return transaction;
-      case NO_MORE_DATA:
-        dataAvailable = false;
-        logger_->log_info("Site2Site peer indicates that no data is 
available");
-        transaction = new Transaction(direction, crcstream);
-        _transactionMap[transaction->getUUIDStr()] = transaction;
-        transactionID = transaction->getUUIDStr();
-        transaction->setDataAvailable(dataAvailable);
-        logger_->log_info("Site2Site create transaction %s",
-                          transaction->getUUIDStr().c_str());
-        return transaction;
-      default:
-        logger_->log_info(
-            "Site2Site got unexpected response %d when asking for data", code);
-        // tearDown();
-        return NULL;
+    case MORE_DATA:
+      dataAvailable = true;
+      logger_->log_info("Site2Site peer indicates that data is available");
+      transaction = new Transaction(direction, crcstream);
+      _transactionMap[transaction->getUUIDStr()] = transaction;
+      transactionID = transaction->getUUIDStr();
+      transaction->setDataAvailable(dataAvailable);
+      logger_->log_info("Site2Site create transaction %s",
+          transaction->getUUIDStr().c_str());
+      return transaction;
+    case NO_MORE_DATA:
+      dataAvailable = false;
+      logger_->log_info("Site2Site peer indicates that no data is available");
+      transaction = new Transaction(direction, crcstream);
+      _transactionMap[transaction->getUUIDStr()] = transaction;
+      transactionID = transaction->getUUIDStr();
+      transaction->setDataAvailable(dataAvailable);
+      logger_->log_info("Site2Site create transaction %s",
+          transaction->getUUIDStr().c_str());
+      return transaction;
+    default:
+      logger_->log_info(
+          "Site2Site got unexpected response %d when asking for data", code);
+      // tearDown();
+      return NULL;
     }
   } else {
     ret = writeRequestType(SEND_FLOWFILES);
@@ -564,14 +564,14 @@ Transaction* Site2SiteClientProtocol::createTransaction(
       _transactionMap[transaction->getUUIDStr()] = transaction;
       transactionID = transaction->getUUIDStr();
       logger_->log_info("Site2Site create transaction %s",
-                        transaction->getUUIDStr().c_str());
+          transaction->getUUIDStr().c_str());
       return transaction;
     }
   }
 }
 
 bool Site2SiteClientProtocol::receive(std::string transactionID,
-                                      DataPacket *packet, bool &eof) {
+    DataPacket *packet, bool &eof) {
   int ret;
   Transaction *transaction = NULL;
 
@@ -602,7 +602,7 @@ bool Site2SiteClientProtocol::receive(std::string 
transactionID,
 
   if (transaction->getDirection() != RECEIVE) {
     logger_->log_info("Site2Site transaction %s direction is wrong",
-                      transactionID.c_str());
+        transactionID.c_str());
     return false;
   }
 
@@ -687,9 +687,8 @@ bool Site2SiteClientProtocol::receive(std::string 
transactionID,
 }
 
 bool Site2SiteClientProtocol::send(std::string transactionID,
-                                   DataPacket *packet,
-                                   std::shared_ptr<FlowFileRecord> flowFile,
-                                   core::ProcessSession *session) {
+    DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile,
+    core::ProcessSession *session) {
   int ret;
   Transaction *transaction = NULL;
 
@@ -720,7 +719,7 @@ bool Site2SiteClientProtocol::send(std::string 
transactionID,
 
   if (transaction->getDirection() != SEND) {
     logger_->log_info("Site2Site transaction %s direction is wrong",
-                      transactionID.c_str());
+        transactionID.c_str());
     return false;
   }
 
@@ -751,38 +750,38 @@ bool Site2SiteClientProtocol::send(std::string 
transactionID,
       return false;
     }
     logger_->log_info("Site2Site transaction %s send attribute key %s value 
%s",
-                      transactionID.c_str(), itAttribute->first.c_str(),
-                      itAttribute->second.c_str());
+        transactionID.c_str(), itAttribute->first.c_str(),
+        itAttribute->second.c_str());
   }
 
   uint64_t len = 0;
   if (flowFile) {
-       len = flowFile->getSize() ;
-       ret = transaction->getStream().write(len);
-       if (ret != 8) {
-         return false;
-       }
-       if (flowFile->getSize()) {
-         Site2SiteClientProtocol::ReadCallback callback(packet);
-         session->read(flowFile, &callback);
-         if (flowFile->getSize() != packet->_size) {
-                 return false;
-         }
-       }
-  }
-  else if (packet->payload_.length() > 0) {
-       len = packet->payload_.length();
-
-       ret = transaction->getStream().write(len);
-       if (ret != 8) {
-         return false;
-       }
-
-       ret = transaction->getStream().writeData((uint8_t *) 
(packet->payload_.c_str()), len);
-       if (ret != len) {
-         return false;
-       }
-       packet->_size += len;
+    len = flowFile->getSize();
+    ret = transaction->getStream().write(len);
+    if (ret != 8) {
+      return false;
+    }
+    if (flowFile->getSize()) {
+      Site2SiteClientProtocol::ReadCallback callback(packet);
+      session->read(flowFile, &callback);
+      if (flowFile->getSize() != packet->_size) {
+        return false;
+      }
+    }
+  } else if (packet->payload_.length() > 0) {
+    len = packet->payload_.length();
+
+    ret = transaction->getStream().write(len);
+    if (ret != 8) {
+      return false;
+    }
+
+    ret = transaction->getStream().writeData(
+        reinterpret_cast<uint8_t *> (const_cast<char*> 
(packet->payload_.c_str())), len);
+    if (ret != len) {
+      return false;
+    }
+    packet->_size += len;
   }
 
   transaction->_transfers++;
@@ -796,7 +795,7 @@ bool Site2SiteClientProtocol::send(std::string 
transactionID,
 }
 
 void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
-                                               core::ProcessSession *session) {
+    core::ProcessSession *session) {
   uint64_t bytes = 0;
   int transfers = 0;
   Transaction *transaction = NULL;
@@ -809,7 +808,7 @@ void 
Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
     context->yield();
     tearDown();
     throw Exception(SITE2SITE_EXCEPTION,
-                    "Can not establish handshake with peer");
+        "Can not establish handshake with peer");
     return;
   }
 
@@ -826,7 +825,7 @@ void 
Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
 
   try {
     while (true) {
-      std::map<std::string, std::string> empty;
+      std::map < std::string, std::string > empty;
       uint64_t startTime = getTimeMillis();
       std::string payload;
       DataPacket packet(this, transaction, empty, payload);
@@ -840,8 +839,8 @@ void 
Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
         // transaction done
         break;
       }
-      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
-          FlowFileRecord>(session->create());
+      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast
+          < FlowFileRecord > (session->create());
 
       if (!flowFile) {
         throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed");
@@ -870,8 +869,7 @@ void 
Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
       std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host="
           + peer_->getHostName();
       session->getProvenanceReporter()->receive(flowFile, transitUri,
-                                                sourceIdentifier, details,
-                                                endTime - startTime);
+          sourceIdentifier, details, endTime - startTime);
       session->transfer(flowFile, relation);
       // receive the transfer for the flow record
       bytes += packet._size;
@@ -958,7 +956,7 @@ bool Site2SiteClientProtocol::confirm(std::string 
transactionID) {
     int64_t crcValue = transaction->getCRC();
     std::string crc = std::to_string(crcValue);
     logger_->log_info("Site2Site Send confirm with CRC %d to transaction %s",
-                      transaction->getCRC(), transactionID.c_str());
+        transaction->getCRC(), transactionID.c_str());
     ret = writeRespond(CONFIRM_TRANSACTION, crc);
     if (ret <= 0)
       return false;
@@ -970,24 +968,24 @@ bool Site2SiteClientProtocol::confirm(std::string 
transactionID) {
 
     if (code == CONFIRM_TRANSACTION) {
       logger_->log_info("Site2Site transaction %s peer confirm transaction",
-                        transactionID.c_str());
+          transactionID.c_str());
       transaction->_state = TRANSACTION_CONFIRMED;
       return true;
     } else if (code == BAD_CHECKSUM) {
       logger_->log_info("Site2Site transaction %s peer indicate bad checksum",
-                        transactionID.c_str());
+          transactionID.c_str());
       /*
        transaction->_state = TRANSACTION_CONFIRMED;
        return true; */
       return false;
     } else {
       logger_->log_info("Site2Site transaction %s peer unknown respond code 
%d",
-                        transactionID.c_str(), code);
+          transactionID.c_str(), code);
       return false;
     }
   } else {
     logger_->log_info("Site2Site Send FINISH TRANSACTION for transaction %s",
-                      transactionID.c_str());
+        transactionID.c_str());
     ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION");
     if (ret <= 0)
       return false;
@@ -1007,7 +1005,7 @@ bool Site2SiteClientProtocol::confirm(std::string 
transactionID) {
         std::string crc = std::to_string(crcValue);
         if (message == crc) {
           logger_->log_info("Site2Site transaction %s CRC matched",
-                            transactionID.c_str());
+              transactionID.c_str());
           ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
           if (ret <= 0)
             return false;
@@ -1015,7 +1013,7 @@ bool Site2SiteClientProtocol::confirm(std::string 
transactionID) {
           return true;
         } else {
           logger_->log_info("Site2Site transaction %s CRC not matched %s",
-                            transactionID.c_str(), crc.c_str());
+              transactionID.c_str(), crc.c_str());
           ret = writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM");
           /*
            ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
@@ -1033,7 +1031,7 @@ bool Site2SiteClientProtocol::confirm(std::string 
transactionID) {
       return true;
     } else {
       logger_->log_info("Site2Site transaction %s peer unknown respond code 
%d",
-                        transactionID.c_str(), code);
+          transactionID.c_str(), code);
       return false;
     }
     return false;
@@ -1082,7 +1080,7 @@ void 
Site2SiteClientProtocol::deleteTransaction(std::string transactionID) {
   }
 
   logger_->log_info("Site2Site delete transaction %s",
-                    transaction->getUUIDStr().c_str());
+      transaction->getUUIDStr().c_str());
   delete transaction;
   _transactionMap.erase(transactionID);
 }
@@ -1136,7 +1134,7 @@ bool Site2SiteClientProtocol::complete(std::string 
transactionID) {
       return true;
     } else {
       logger_->log_info("Site2Site transaction %s send finished",
-                        transactionID.c_str());
+          transactionID.c_str());
       ret = this->writeRespond(TRANSACTION_FINISHED, "Finished");
       if (ret <= 0) {
         return false;
@@ -1157,21 +1155,21 @@ bool Site2SiteClientProtocol::complete(std::string 
transactionID) {
 
     if (code == TRANSACTION_FINISHED) {
       logger_->log_info("Site2Site transaction %s peer finished transaction",
-                        transactionID.c_str());
+          transactionID.c_str());
       transaction->_state = TRANSACTION_COMPLETED;
       return true;
     } else {
       logger_->log_info("Site2Site transaction %s peer unknown respond code 
%d",
-                        transactionID.c_str(), code);
+          transactionID.c_str(), code);
       return false;
     }
   }
 }
 
 void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
-                                                core::ProcessSession *session) 
{
-  std::shared_ptr<FlowFileRecord> flow =
-      std::static_pointer_cast<FlowFileRecord>(session->get());
+    core::ProcessSession *session) {
+  std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast
+      < FlowFileRecord > (session->get());
 
   Transaction *transaction = NULL;
 
@@ -1186,7 +1184,7 @@ void 
Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
     context->yield();
     tearDown();
     throw Exception(SITE2SITE_EXCEPTION,
-                    "Can not establish handshake with peer");
+        "Can not establish handshake with peer");
     return;
   }
 
@@ -1215,20 +1213,20 @@ void 
Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
         return;
       }
       logger_->log_info("Site2Site transaction %s send flow record %s",
-                        transactionID.c_str(), flow->getUUIDStr().c_str());
+          transactionID.c_str(), flow->getUUIDStr().c_str());
       uint64_t endTime = getTimeMillis();
       std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
       std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host="
           + peer_->getHostName();
       session->getProvenanceReporter()->send(flow, transitUri, details,
-                                             endTime - startTime, false);
+          endTime - startTime, false);
       session->remove(flow);
 
       uint64_t transferNanos = getTimeNano() - startSendingNanos;
       if (transferNanos > _batchSendNanos)
         break;
 
-      flow = std::static_pointer_cast<FlowFileRecord>(session->get());
+      flow = std::static_pointer_cast < FlowFileRecord > (session->get());
 
       if (!flow) {
         continueTransaction = false;
@@ -1268,86 +1266,78 @@ void 
Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
   return;
 }
 
-void Site2SiteClientProtocol::transferString(core::ProcessContext *context, 
core::ProcessSession *session, std::string &payload,
-               std::map<std::string, std::string> attributes)
-{
-       Transaction *transaction = NULL;
-
-       if (payload.length() <= 0)
-               return;
-
-       if (_peerState != READY)
-       {
-               bootstrap();
-       }
-
-       if (_peerState != READY)
-       {
-               context->yield();
-               tearDown();
-               throw Exception(SITE2SITE_EXCEPTION, "Can not establish 
handshake with peer");
-               return;
-       }
-
-       // Create the transaction
-       std::string transactionID;
-       transaction = createTransaction(transactionID, SEND);
-
-       if (transaction == NULL)
-       {
-               context->yield();
-               tearDown();
-               throw Exception(SITE2SITE_EXCEPTION, "Can not create 
transaction");
-               return;
-       }
-
-       try
-       {
-               DataPacket packet(this, transaction, attributes, payload);
-
-               if (!send(transactionID, &packet, nullptr, session))
-               {
-                       throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
-                       return;
-               }
-               logger_->log_info("Site2Site transaction %s send bytes length 
%d",
-                                                       transactionID.c_str(), 
payload.length());
-
-               if (!confirm(transactionID))
-               {
-                       throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed");
-                       return;
-               }
-               if (!complete(transactionID))
-               {
-                       throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
-                       return;
-               }
-               logger_->log_info("Site2Site transaction %s successfully send 
flow record %d, content bytes %d",
-                               transactionID.c_str(), transaction->_transfers, 
transaction->_bytes);
-       }
-       catch (std::exception &exception)
-       {
-               if (transaction)
-                       deleteTransaction(transactionID);
-               context->yield();
-               tearDown();
-               logger_->log_debug("Caught Exception %s", exception.what());
-               throw;
-       }
-       catch (...)
-       {
-               if (transaction)
-                       deleteTransaction(transactionID);
-               context->yield();
-               tearDown();
-               logger_->log_debug("Caught Exception during 
Site2SiteClientProtocol::transferBytes");
-               throw;
-       }
-
-       deleteTransaction(transactionID);
-
-       return;
+void Site2SiteClientProtocol::transferString(core::ProcessContext *context,
+    core::ProcessSession *session, std::string &payload,
+    std::map<std::string, std::string> attributes) {
+  Transaction *transaction = NULL;
+
+  if (payload.length() <= 0)
+    return;
+
+  if (_peerState != READY) {
+    bootstrap();
+  }
+
+  if (_peerState != READY) {
+    context->yield();
+    tearDown();
+    throw Exception(SITE2SITE_EXCEPTION,
+        "Can not establish handshake with peer");
+    return;
+  }
+
+  // Create the transaction
+  std::string transactionID;
+  transaction = createTransaction(transactionID, SEND);
+
+  if (transaction == NULL) {
+    context->yield();
+    tearDown();
+    throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
+    return;
+  }
+
+  try {
+    DataPacket packet(this, transaction, attributes, payload);
+
+    if (!send(transactionID, &packet, nullptr, session)) {
+      throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
+      return;
+    }
+    logger_->log_info("Site2Site transaction %s send bytes length %d",
+        transactionID.c_str(), payload.length());
+
+    if (!confirm(transactionID)) {
+      throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed");
+      return;
+    }
+    if (!complete(transactionID)) {
+      throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
+      return;
+    }
+    logger_->log_info(
+        "Site2Site transaction %s successfully send flow record %d, content 
bytes %d",
+        transactionID.c_str(), transaction->_transfers, transaction->_bytes);
+  } catch (std::exception &exception) {
+    if (transaction)
+      deleteTransaction(transactionID);
+    context->yield();
+    tearDown();
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    if (transaction)
+      deleteTransaction(transactionID);
+    context->yield();
+    tearDown();
+    logger_->log_debug(
+        "Caught Exception during Site2SiteClientProtocol::transferBytes");
+    throw;
+  }
+
+  deleteTransaction(transactionID);
+
+  return;
 }
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/770932a0/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index ef1cd70..74636f7 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -29,6 +29,7 @@
 #include <thread>
 #include <memory>
 #include <functional>
+#include <utility>
 #include "Connection.h"
 #include "core/Connectable.h"
 #include "core/ProcessContext.h"
@@ -42,9 +43,8 @@ namespace nifi {
 namespace minifi {
 namespace core {
 
-Processor::Processor(std::string name, uuid_t uuid)
-    : Connectable(name, uuid),
-      ConfigurableComponent(logging::Logger::getLogger()) {
+Processor::Processor(std::string name, uuid_t uuid) :
+    Connectable(name, uuid), 
ConfigurableComponent(logging::Logger::getLogger()) {
   has_work_.store(false);
   // Setup the default values
   state_ = DISABLED;
@@ -62,7 +62,7 @@ Processor::Processor(std::string name, uuid_t uuid)
   incoming_connections_Iter = this->_incomingConnections.begin();
   logger_ = logging::Logger::getLogger();
   logger_->log_info("Processor %s created UUID %s", name_.c_str(),
-                    uuidStr_.c_str());
+      uuidStr_.c_str());
 }
 
 bool Processor::isRunning() {
@@ -78,12 +78,12 @@ bool Processor::addConnection(std::shared_ptr<Connectable> 
conn) {
 
   if (isRunning()) {
     logger_->log_info("Can not add connection while the process %s is running",
-                      name_.c_str());
+        name_.c_str());
     return false;
   }
-  std::shared_ptr<Connection> connection = 
std::static_pointer_cast<Connection>(
-      conn);
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::shared_ptr<Connection> connection = std::static_pointer_cast < 
Connection
+      > (conn);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   uuid_t srcUUID;
   uuid_t destUUID;
@@ -116,7 +116,8 @@ bool Processor::addConnection(std::shared_ptr<Connectable> 
conn) {
     auto &&it = _outGoingConnections.find(relationship);
     if (it != _outGoingConnections.end()) {
       // We already has connection for this relationship
-      std::set<std::shared_ptr<Connectable>> existedConnection = it->second;
+      std::set < std::shared_ptr < Connectable >> existedConnection =
+          it->second;
       if (existedConnection.find(connection) == existedConnection.end()) {
         // We do not have the same connection for this relationship yet
         existedConnection.insert(connection);
@@ -129,7 +130,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> 
conn) {
       }
     } else {
       // We do not have any outgoing connection for this relationship yet
-      std::set<std::shared_ptr<Connectable>> newConnection;
+      std::set < std::shared_ptr < Connectable >> newConnection;
       newConnection.insert(connection);
       connection->setSource(shared_from_this());
       _outGoingConnections[relationship] = newConnection;
@@ -151,13 +152,13 @@ void 
Processor::removeConnection(std::shared_ptr<Connectable> conn) {
     return;
   }
 
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   uuid_t srcUUID;
   uuid_t destUUID;
 
-  std::shared_ptr<Connection> connection = 
std::static_pointer_cast<Connection>(
-      conn);
+  std::shared_ptr<Connection> connection = std::static_pointer_cast < 
Connection
+      > (conn);
 
   connection->getSourceUUID(srcUUID);
   connection->getDestinationUUID(destUUID);
@@ -193,65 +194,65 @@ void 
Processor::removeConnection(std::shared_ptr<Connectable> conn) {
   }
 }
 
-std::shared_ptr<Site2SiteClientProtocol> Processor::obtainSite2SiteProtocol
-               (std::string host, uint16_t sport, uuid_t portId) {
-       std::lock_guard<std::mutex> lock(mutex_);
-
-       if (!protocols_created_) {
-               for (int i = 0; i < this->max_concurrent_tasks_; i++) {
-                       // create the protocol pool based on max threads allowed
-                       std::shared_ptr<Site2SiteClientProtocol> protocol (
-                                       new Site2SiteClientProtocol(0));
-                       protocols_created_ = true;
-                       protocol->setPortId(portId);
-                       
std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str =
-                                       
std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(
-                                           
org::apache::nifi::minifi::io::StreamFactory::getInstance()
-                                               ->createSocket(host, sport));
-                       std::unique_ptr<Site2SitePeer> peer_ = 
std::unique_ptr<Site2SitePeer>(
-                                       new Site2SitePeer(std::move(str), host, 
sport));
-                       protocol->setPeer(std::move(peer_));
-                       available_protocols_.push(protocol);
-               }
-       }
-       if (!available_protocols_.empty()) {
-               std::shared_ptr<Site2SiteClientProtocol> return_pointer = 
available_protocols_.top();
-               available_protocols_.pop();
-               return return_pointer;
-       }
-       else {
-               // create the protocol on demand if we exceed the pool
-               std::shared_ptr<Site2SiteClientProtocol> protocol (
-                                                       new 
Site2SiteClientProtocol(0));
-               protocol->setPortId(portId);
-               std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str =
-                                                       
std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(
-                                                           
org::apache::nifi::minifi::io::StreamFactory::getInstance()
-                                                               
->createSocket(host, sport));
-               std::unique_ptr<Site2SitePeer> peer_ = 
std::unique_ptr<Site2SitePeer>(
-                                                       new 
Site2SitePeer(std::move(str), host, sport));
-               protocol->setPeer(std::move(peer_));
-               return protocol;
-       }
+std::shared_ptr<Site2SiteClientProtocol> Processor::obtainSite2SiteProtocol(
+    std::string host, uint16_t sport, uuid_t portId) {
+  std::lock_guard < std::mutex > lock(mutex_);
+
+  if (!protocols_created_) {
+    for (int i = 0; i < this->max_concurrent_tasks_; i++) {
+      // create the protocol pool based on max threads allowed
+      std::shared_ptr<Site2SiteClientProtocol> protocol(
+          new Site2SiteClientProtocol(0));
+      protocols_created_ = true;
+      protocol->setPortId(portId);
+      std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str =
+          std::unique_ptr < org::apache::nifi::minifi::io::DataStream
+              > 
(org::apache::nifi::minifi::io::StreamFactory::getInstance()->createSocket(
+                  host, sport));
+      std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer
+          > (new Site2SitePeer(std::move(str), host, sport));
+      protocol->setPeer(std::move(peer_));
+      available_protocols_.push(protocol);
+    }
+  }
+  if (!available_protocols_.empty()) {
+    std::shared_ptr<Site2SiteClientProtocol> return_pointer =
+        available_protocols_.top();
+    available_protocols_.pop();
+    return return_pointer;
+  } else {
+    // create the protocol on demand if we exceed the pool
+    std::shared_ptr<Site2SiteClientProtocol> protocol(
+        new Site2SiteClientProtocol(0));
+    protocol->setPortId(portId);
+    std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str =
+        std::unique_ptr < org::apache::nifi::minifi::io::DataStream
+            > 
(org::apache::nifi::minifi::io::StreamFactory::getInstance()->createSocket(
+                host, sport));
+    std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer
+        > (new Site2SitePeer(std::move(str), host, sport));
+    protocol->setPeer(std::move(peer_));
+    return protocol;
+  }
 }
 
-void 
Processor::returnSite2SiteProtocol(std::shared_ptr<Site2SiteClientProtocol> 
protocol)
-{
-       std::lock_guard<std::mutex> lock(mutex_);
-       if (protocol && available_protocols_.size() < max_concurrent_tasks_) {
-               available_protocols_.push(protocol);
-       }
+void Processor::returnSite2SiteProtocol(
+    std::shared_ptr<Site2SiteClientProtocol> protocol) {
+  std::lock_guard < std::mutex > lock(mutex_);
+  if (protocol && available_protocols_.size() < max_concurrent_tasks_) {
+    available_protocols_.push(protocol);
+  }
 }
 
 bool Processor::flowFilesQueued() {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   if (_incomingConnections.size() == 0)
     return false;
 
   for (auto &&conn : _incomingConnections) {
-    std::shared_ptr<Connection> connection =
-        std::static_pointer_cast<Connection>(conn);
+    std::shared_ptr<Connection> connection = std::static_pointer_cast
+        < Connection > (conn);
     if (connection->getQueueSize() > 0)
       return true;
   }
@@ -260,14 +261,15 @@ bool Processor::flowFilesQueued() {
 }
 
 bool Processor::flowFilesOutGoingFull() {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   for (auto &&connection : _outGoingConnections) {
     // We already has connection for this relationship
-    std::set<std::shared_ptr<Connectable>> existedConnection = 
connection.second;
+    std::set < std::shared_ptr < Connectable >> existedConnection =
+        connection.second;
     for (const auto conn : existedConnection) {
-      std::shared_ptr<Connection> connection = std::static_pointer_cast<
-          Connection>(conn);
+      std::shared_ptr < Connection > connection = std::static_pointer_cast
+          < Connection > (conn);
       if (connection->isFull())
         return true;
     }
@@ -277,7 +279,7 @@ bool Processor::flowFilesOutGoingFull() {
 }
 
 void Processor::onTrigger(ProcessContext *context,
-                          ProcessSessionFactory *sessionFactory) {
+    ProcessSessionFactory *sessionFactory) {
   auto session = sessionFactory->createSession();
 
   try {
@@ -301,8 +303,8 @@ bool Processor::isWorkAvailable() {
 
   try {
     for (const auto &conn : _incomingConnections) {
-      std::shared_ptr<Connection> connection = std::static_pointer_cast<
-          Connection>(conn);
+      std::shared_ptr<Connection> connection = std::static_pointer_cast
+          < Connection > (conn);
       if (connection->getQueueSize() > 0) {
         hasWork = true;
         break;
@@ -311,7 +313,7 @@ bool Processor::isWorkAvailable() {
   } catch (...) {
     logger_->log_error(
         "Caught an exception while checking if work is available;"
-        " unless it was positively determined that work is available, assuming 
NO work is available!");
+            " unless it was positively determined that work is available, 
assuming NO work is available!");
   }
 
   return hasWork;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/770932a0/libminifi/src/provenance/ProvenanceTaskReport.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/ProvenanceTaskReport.cpp 
b/libminifi/src/provenance/ProvenanceTaskReport.cpp
index 4f16e4b..1908bb4 100644
--- a/libminifi/src/provenance/ProvenanceTaskReport.cpp
+++ b/libminifi/src/provenance/ProvenanceTaskReport.cpp
@@ -21,10 +21,9 @@
 #include <queue>
 #include <map>
 #include <set>
-#include <sys/time.h>
-#include <time.h>
+#include <string>
+#include <memory>
 #include <sstream>
-#include <string.h>
 #include <iostream>
 
 #include "provenance/ProvenanceTaskReport.h"
@@ -43,151 +42,146 @@ namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
-namespace provenance{
+namespace provenance {
 
-const std::string ProvenanceTaskReport::ProcessorName("ProvenanceTaskReport");
-core::Property ProvenanceTaskReport::hostName("Host Name", "Remote Host 
Name.", "localhost");
+core::Property ProvenanceTaskReport::hostName("Host Name", "Remote Host Name.",
+    "localhost");
 core::Property ProvenanceTaskReport::port("Port", "Remote Port", "9999");
-core::Property ProvenanceTaskReport::batchSize("Batch Size", "Specifies how 
many records to send in a single batch, at most.", "100");
-core::Property ProvenanceTaskReport::portUUID("Port UUID", "Specifies remote 
NiFi Port UUID.", "");
+core::Property ProvenanceTaskReport::batchSize("Batch Size",
+    "Specifies how many records to send in a single batch, at most.", "100");
+core::Property ProvenanceTaskReport::portUUID("Port UUID",
+    "Specifies remote NiFi Port UUID.", "");
 core::Relationship ProvenanceTaskReport::relation;
 const char *ProvenanceTaskReport::ProvenanceAppStr = "MiNiFi Flow";
 
-void ProvenanceTaskReport::initialize()
-{
-       //! Set the supported properties
-       std::set<core::Property> properties;
-       properties.insert(hostName);
-       properties.insert(port);
-       properties.insert(batchSize);
-       properties.insert(portUUID);
-       setSupportedProperties(properties);
-       //! Set the supported relationships
-       std::set<core::Relationship> relationships;
-       relationships.insert(relation);
-       setSupportedRelationships(relationships);
+void ProvenanceTaskReport::initialize() {
+  //! Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(hostName);
+  properties.insert(port);
+  properties.insert(batchSize);
+  properties.insert(portUUID);
+  setSupportedProperties(properties);
+  //! Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(relation);
+  setSupportedRelationships(relationships);
 }
 
-void ProvenanceTaskReport::onTrigger(core::ProcessContext *context, 
core::ProcessSession *session)
-{
-       std::string value;
-       int64_t lvalue;
-       std::string host = "";
-       uint16_t sport = 0;
-
-       if (context->getProperty(hostName.getName(), value)) {
-         host = value;
-       }
-       if (context->getProperty(port.getName(), value)
-           && core::Property::StringToInt(value, lvalue)) {
-         sport = (uint16_t) lvalue;
-       }
-       if (context->getProperty(portUUID.getName(), value)) {
-         uuid_parse(value.c_str(), protocol_uuid_);
-       }
-
-       std::shared_ptr<Site2SiteClientProtocol> protocol_ = 
this->obtainSite2SiteProtocol(host, sport, protocol_uuid_);
-       
-       if (!protocol_)
-       {
-               context->yield();
-               return;
-       }
-
-       if (!protocol_->bootstrap())
-       {
-           // bootstrap the client protocol if needeed
-           context->yield();
-           std::shared_ptr<Processor> processor = 
std::static_pointer_cast<Processor>(
-               context->getProcessorNode().getProcessor());
-           logger_->log_error("Site2Site bootstrap failed yield period %d peer 
",
-                              processor->getYieldPeriodMsec());
-           returnSite2SiteProtocol(protocol_);
-           return;
-       }
-
-       int64_t batch = 100;
-
-       if (context->getProperty(batchSize.getName(), value) && 
core::Property::StringToInt(value, lvalue))
-       {
-               batch = lvalue;
-       }
-       
-       std::vector<std::shared_ptr<ProvenanceEventRecord>> records;
-       std::shared_ptr<ProvenanceRepository> repo = 
std::static_pointer_cast<ProvenanceRepository> 
(context->getProvenanceRepository());
-
-       repo->getProvenanceRecord(records, batch);
-
-       if (records.size() <= 0)
-       {
-               returnSite2SiteProtocol(protocol_);
-               return;
-       }
-
-       Json::Value array;
-       for (auto record : records)
-       {
-               Json::Value recordJson;
-               Json::Value updatedAttributesJson;
-               Json::Value parentUuidJson;
-               Json::Value childUuidJson;
-               recordJson["eventId"] = record->getEventId().c_str();
-               recordJson["eventType"] = 
ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()];
-               recordJson["timestampMillis"] = record->getEventTime();
-               recordJson["durationMillis"] = record->getEventDuration();
-               recordJson["lineageStart"] = record->getlineageStartDate();
-               recordJson["details"] = record->getDetails().c_str();
-               recordJson["componentId"] = record->getComponentId().c_str();
-               recordJson["componentType"] = 
record->getComponentType().c_str();
-               recordJson["entityId"] = record->getFlowFileUuid().c_str();
-               recordJson["entityType"] = "org.apache.nifi.flowfile.FlowFile";
-               recordJson["entitySize"] = record->getFileSize();
-               recordJson["entityOffset"] = record->getFileOffset();
-
-               for (auto attr : record->getAttributes())
-               {
-                       updatedAttributesJson[attr.first] = attr.second;
-               }
-               recordJson["updatedAttributes"] = updatedAttributesJson;
-
-               for (auto parentUUID : record->getParentUuids())
-               {
-                       parentUuidJson.append(parentUUID.c_str());
-               }
-               recordJson["parentIds"] = parentUuidJson;
-
-               for (auto childUUID : record->getChildrenUuids())
-               {
-                       childUuidJson.append(childUUID.c_str());
-               }
-               recordJson["childIds"] = childUuidJson;
-               recordJson["transitUri"] = record->getTransitUri().c_str();
-               recordJson["remoteIdentifier"] = 
record->getSourceSystemFlowFileIdentifier().c_str();
-               recordJson["alternateIdentifier"] = 
record->getAlternateIdentifierUri().c_str();
-               recordJson["application"] = ProvenanceAppStr;
-               array.append(recordJson);
-       }
-
-       Json::StyledWriter writer;
-       std::string jsonStr = writer.write(array);
-
-       try
-       {
-               std::map<std::string, std::string> attributes;
-               protocol_->transferString(context, session, jsonStr, 
attributes);
-       }
-       catch (...)
-       {
-               // if transfer bytes failed, return instead of purge the 
provenance records
-               returnSite2SiteProtocol(protocol_);
-               return;
-       }
-
-       // we transfer the record, purge the record from DB
-       repo->purgeProvenanceRecord(records);
-
-       returnSite2SiteProtocol(protocol_);
-
+void ProvenanceTaskReport::onTrigger(core::ProcessContext *context,
+    core::ProcessSession *session) {
+  std::string value;
+  int64_t lvalue;
+  std::string host = "";
+  uint16_t sport = 0;
+
+  if (context->getProperty(hostName.getName(), value)) {
+    host = value;
+  }
+  if (context->getProperty(port.getName(), value)
+      && core::Property::StringToInt(value, lvalue)) {
+    sport = (uint16_t) lvalue;
+  }
+  if (context->getProperty(portUUID.getName(), value)) {
+    uuid_parse(value.c_str(), protocol_uuid_);
+  }
+
+  std::shared_ptr<Site2SiteClientProtocol> protocol_ =
+      this->obtainSite2SiteProtocol(host, sport, protocol_uuid_);
+
+  if (!protocol_) {
+    context->yield();
+    return;
+  }
+
+  if (!protocol_->bootstrap()) {
+    // bootstrap the client protocol if needeed
+    context->yield();
+    std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor
+        > (context->getProcessorNode().getProcessor());
+    logger_->log_error("Site2Site bootstrap failed yield period %d peer ",
+        processor->getYieldPeriodMsec());
+    returnSite2SiteProtocol(protocol_);
+    return;
+  }
+
+  int64_t batch = 100;
+
+  if (context->getProperty(batchSize.getName(), value)
+      && core::Property::StringToInt(value, lvalue)) {
+    batch = lvalue;
+  }
+
+  std::vector < std::shared_ptr < ProvenanceEventRecord >> records;
+  std::shared_ptr<ProvenanceRepository> repo = std::static_pointer_cast
+      < ProvenanceRepository > (context->getProvenanceRepository());
+
+  repo->getProvenanceRecord(records, batch);
+
+  if (records.size() <= 0) {
+    returnSite2SiteProtocol(protocol_);
+    return;
+  }
+
+  Json::Value array;
+  for (auto record : records) {
+    Json::Value recordJson;
+    Json::Value updatedAttributesJson;
+    Json::Value parentUuidJson;
+    Json::Value childUuidJson;
+    recordJson["eventId"] = record->getEventId().c_str();
+    recordJson["eventType"] =
+        ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()];
+    recordJson["timestampMillis"] = record->getEventTime();
+    recordJson["durationMillis"] = record->getEventDuration();
+    recordJson["lineageStart"] = record->getlineageStartDate();
+    recordJson["details"] = record->getDetails().c_str();
+    recordJson["componentId"] = record->getComponentId().c_str();
+    recordJson["componentType"] = record->getComponentType().c_str();
+    recordJson["entityId"] = record->getFlowFileUuid().c_str();
+    recordJson["entityType"] = "org.apache.nifi.flowfile.FlowFile";
+    recordJson["entitySize"] = record->getFileSize();
+    recordJson["entityOffset"] = record->getFileOffset();
+
+    for (auto attr : record->getAttributes()) {
+      updatedAttributesJson[attr.first] = attr.second;
+    }
+    recordJson["updatedAttributes"] = updatedAttributesJson;
+
+    for (auto parentUUID : record->getParentUuids()) {
+      parentUuidJson.append(parentUUID.c_str());
+    }
+    recordJson["parentIds"] = parentUuidJson;
+
+    for (auto childUUID : record->getChildrenUuids()) {
+      childUuidJson.append(childUUID.c_str());
+    }
+    recordJson["childIds"] = childUuidJson;
+    recordJson["transitUri"] = record->getTransitUri().c_str();
+    recordJson["remoteIdentifier"] =
+        record->getSourceSystemFlowFileIdentifier().c_str();
+    recordJson["alternateIdentifier"] =
+        record->getAlternateIdentifierUri().c_str();
+    recordJson["application"] = ProvenanceAppStr;
+    array.append(recordJson);
+  }
+
+  Json::StyledWriter writer;
+  std::string jsonStr = writer.write(array);
+
+  try {
+    std::map < std::string, std::string > attributes;
+    protocol_->transferString(context, session, jsonStr, attributes);
+  } catch (...) {
+    // if transfer bytes failed, return instead of purge the provenance records
+    returnSite2SiteProtocol(protocol_);
+    return;
+  }
+
+  // we transfer the record, purge the record from DB
+  repo->purgeProvenanceRecord(records);
+
+  returnSite2SiteProtocol(protocol_);
 }
 
 } /* namespace provenance */

Reply via email to