MINIFI-227: Provenance report (linter)
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/770932a0 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/770932a0 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/770932a0 Branch: refs/heads/MINIFI-227 Commit: 770932a03862175c7cd0556d06b9a2edd0c600dc Parents: ca5bc5a Author: Bin Qiu <benqiu2...@gmail.com> Authored: Thu Apr 6 15:57:26 2017 -0700 Committer: Bin Qiu <benqiu2...@gmail.com> Committed: Thu Apr 6 15:57:26 2017 -0700 ---------------------------------------------------------------------- .../include/provenance/ProvenanceTaskReport.h | 2 +- libminifi/src/RemoteProcessorGroupPort.cpp | 30 +- libminifi/src/Site2SiteClientProtocol.cpp | 496 +++++++++---------- libminifi/src/core/Processor.cpp | 142 +++--- .../src/provenance/ProvenanceTaskReport.cpp | 276 +++++------ 5 files changed, 466 insertions(+), 480 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/770932a0/libminifi/include/provenance/ProvenanceTaskReport.h ---------------------------------------------------------------------- diff --git a/libminifi/include/provenance/ProvenanceTaskReport.h b/libminifi/include/provenance/ProvenanceTaskReport.h index 1d9d507..4123593 100644 --- a/libminifi/include/provenance/ProvenanceTaskReport.h +++ b/libminifi/include/provenance/ProvenanceTaskReport.h @@ -52,7 +52,7 @@ public: } //! Processor Name - static const std::string ProcessorName; + static constexpr char const* ProcessorName = "ProvenanceTaskReport"; //! Supported Properties static core::Property hostName; static core::Property port; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/770932a0/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 9e40824..8ee3680 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -44,10 +44,10 @@ namespace minifi { const std::string RemoteProcessorGroupPort::ProcessorName( "RemoteProcessorGroupPort"); core::Property RemoteProcessorGroupPort::hostName("Host Name", - "Remote Host Name.", - "localhost"); + "Remote Host Name.", "localhost"); core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999"); -core::Property RemoteProcessorGroupPort::portUUID("Port UUID", "Specifies remote NiFi Port UUID.", ""); +core::Property RemoteProcessorGroupPort::portUUID("Port UUID", + "Specifies remote NiFi Port UUID.", ""); core::Relationship RemoteProcessorGroupPort::relation; void RemoteProcessorGroupPort::initialize() { @@ -64,7 +64,7 @@ void RemoteProcessorGroupPort::initialize() { } void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, - core::ProcessSession *session) { + core::ProcessSession *session) { if (!transmitting_) return; @@ -74,31 +74,31 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, uint16_t sport = 0; if (context->getProperty(hostName.getName(), value)) { - host = value; + host = value; } if (context->getProperty(port.getName(), value) - && core::Property::StringToInt(value, lvalue)) { - sport = (uint16_t) lvalue; + && core::Property::StringToInt(value, lvalue)) { + sport = (uint16_t) lvalue; } if (context->getProperty(portUUID.getName(), value)) { - uuid_parse(value.c_str(), protocol_uuid_); + uuid_parse(value.c_str(), protocol_uuid_); } - std::shared_ptr<Site2SiteClientProtocol> protocol_ = this->obtainSite2SiteProtocol(host, sport, protocol_uuid_); + std::shared_ptr<Site2SiteClientProtocol> protocol_ = + this->obtainSite2SiteProtocol(host, sport, protocol_uuid_); - if (!protocol_) - { + if (!protocol_) { context->yield(); - return; + return; } if (!protocol_->bootstrap()) { // bootstrap the client protocol if needeed context->yield(); - std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>( - context->getProcessorNode().getProcessor()); + std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor + > (context->getProcessorNode().getProcessor()); logger_->log_error("Site2Site bootstrap failed yield period %d peer ", - processor->getYieldPeriodMsec()); + processor->getYieldPeriodMsec()); returnSite2SiteProtocol(protocol_); return; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/770932a0/libminifi/src/Site2SiteClientProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Site2SiteClientProtocol.cpp b/libminifi/src/Site2SiteClientProtocol.cpp index 9b4d307..fbde8a0 100644 --- a/libminifi/src/Site2SiteClientProtocol.cpp +++ b/libminifi/src/Site2SiteClientProtocol.cpp @@ -106,39 +106,39 @@ bool Site2SiteClientProtocol::initiateResourceNegotiation() { } logger_->log_info("status code is %i", statusCode); switch (statusCode) { - case RESOURCE_OK: - logger_->log_info("Site2Site Protocol Negotiate protocol version OK"); - return true; - case DIFFERENT_RESOURCE_VERSION: - uint32_t serverVersion; - ret = peer_->read(serverVersion); - if (ret <= 0) { - // tearDown(); - return false; - } - logger_->log_info( - "Site2Site Server Response asked for a different protocol version %d", - serverVersion); - for (unsigned int i = (_currentVersionIndex + 1); - i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) { - if (serverVersion >= _supportedVersion[i]) { - _currentVersion = _supportedVersion[i]; - _currentVersionIndex = i; - return initiateResourceNegotiation(); - } - } - ret = -1; - // tearDown(); - return false; - case NEGOTIATED_ABORT: - logger_->log_info("Site2Site Negotiate protocol response ABORT"); - ret = -1; + case RESOURCE_OK: + logger_->log_info("Site2Site Protocol Negotiate protocol version OK"); + return true; + case DIFFERENT_RESOURCE_VERSION: + uint32_t serverVersion; + ret = peer_->read(serverVersion); + if (ret <= 0) { // tearDown(); return false; - default: - logger_->log_info("Negotiate protocol response unknown code %d", - statusCode); - return true; + } + logger_->log_info( + "Site2Site Server Response asked for a different protocol version %d", + serverVersion); + for (unsigned int i = (_currentVersionIndex + 1); + i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) { + if (serverVersion >= _supportedVersion[i]) { + _currentVersion = _supportedVersion[i]; + _currentVersionIndex = i; + return initiateResourceNegotiation(); + } + } + ret = -1; + // tearDown(); + return false; + case NEGOTIATED_ABORT: + logger_->log_info("Site2Site Negotiate protocol response ABORT"); + ret = -1; + // tearDown(); + return false; + default: + logger_->log_info("Negotiate protocol response unknown code %d", + statusCode); + return true; } return true; @@ -181,38 +181,38 @@ bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() { } switch (statusCode) { - case RESOURCE_OK: - logger_->log_info("Site2Site Codec Negotiate version OK"); - return true; - case DIFFERENT_RESOURCE_VERSION: - uint32_t serverVersion; - ret = peer_->read(serverVersion); - if (ret <= 0) { - // tearDown(); - return false; - } - logger_->log_info( - "Site2Site Server Response asked for a different codec version %d", - serverVersion); - for (unsigned int i = (_currentCodecVersionIndex + 1); - i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) { - if (serverVersion >= _supportedCodecVersion[i]) { - _currentCodecVersion = _supportedCodecVersion[i]; - _currentCodecVersionIndex = i; - return initiateCodecResourceNegotiation(); - } - } - ret = -1; - // tearDown(); - return false; - case NEGOTIATED_ABORT: - logger_->log_info("Site2Site Codec Negotiate response ABORT"); - ret = -1; + case RESOURCE_OK: + logger_->log_info("Site2Site Codec Negotiate version OK"); + return true; + case DIFFERENT_RESOURCE_VERSION: + uint32_t serverVersion; + ret = peer_->read(serverVersion); + if (ret <= 0) { // tearDown(); return false; - default: - logger_->log_info("Negotiate Codec response unknown code %d", statusCode); - return true; + } + logger_->log_info( + "Site2Site Server Response asked for a different codec version %d", + serverVersion); + for (unsigned int i = (_currentCodecVersionIndex + 1); + i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) { + if (serverVersion >= _supportedCodecVersion[i]) { + _currentCodecVersion = _supportedCodecVersion[i]; + _currentCodecVersionIndex = i; + return initiateCodecResourceNegotiation(); + } + } + ret = -1; + // tearDown(); + return false; + case NEGOTIATED_ABORT: + logger_->log_info("Site2Site Codec Negotiate response ABORT"); + ret = -1; + // tearDown(); + return false; + default: + logger_->log_info("Negotiate Codec response unknown code %d", statusCode); + return true; } return true; @@ -241,7 +241,7 @@ bool Site2SiteClientProtocol::handShake() { return false; } - std::map<std::string, std::string> properties; + std::map < std::string, std::string > properties; properties[HandShakePropertyStr[GZIP]] = "false"; properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr; properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string( @@ -286,7 +286,7 @@ bool Site2SiteClientProtocol::handShake() { return false; } logger_->log_info("Site2Site Protocol Send handshake properties %s %s", - it->first.c_str(), it->second.c_str()); + it->first.c_str(), it->second.c_str()); } RespondCode code; @@ -300,28 +300,28 @@ bool Site2SiteClientProtocol::handShake() { } switch (code) { - case PROPERTIES_OK: - logger_->log_info("Site2Site HandShake Completed"); - _peerState = HANDSHAKED; - return true; - case PORT_NOT_IN_VALID_STATE: - case UNKNOWN_PORT: - case PORTS_DESTINATION_FULL: - logger_->log_error( - "Site2Site HandShake Failed because destination port is either invalid or full"); - ret = -1; - /* - peer_->yield(); - tearDown(); */ - return false; - default: - logger_->log_info("HandShake Failed because of unknown respond code %d", - code); - ret = -1; - /* - peer_->yield(); - tearDown(); */ - return false; + case PROPERTIES_OK: + logger_->log_info("Site2Site HandShake Completed"); + _peerState = HANDSHAKED; + return true; + case PORT_NOT_IN_VALID_STATE: + case UNKNOWN_PORT: + case PORTS_DESTINATION_FULL: + logger_->log_error( + "Site2Site HandShake Failed because destination port is either invalid or full"); + ret = -1; + /* + peer_->yield(); + tearDown(); */ + return false; + default: + logger_->log_info("HandShake Failed because of unknown respond code %d", + code); + ret = -1; + /* + peer_->yield(); + tearDown(); */ + return false; } return false; @@ -369,7 +369,7 @@ int Site2SiteClientProtocol::readRequestType(RequestType &type) { } int Site2SiteClientProtocol::readRespond(RespondCode &code, - std::string &message) { + std::string &message) { uint8_t firstByte; int ret = peer_->read(firstByte); @@ -408,7 +408,7 @@ int Site2SiteClientProtocol::readRespond(RespondCode &code, } int Site2SiteClientProtocol::writeRespond(RespondCode code, - std::string message) { + std::string message) { RespondCodeContext *resCode = this->getRespondCodeContext(code); if (resCode == NULL) { @@ -525,31 +525,31 @@ Transaction* Site2SiteClientProtocol::createTransaction( org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream( peer_.get()); switch (code) { - case MORE_DATA: - dataAvailable = true; - logger_->log_info("Site2Site peer indicates that data is available"); - transaction = new Transaction(direction, crcstream); - _transactionMap[transaction->getUUIDStr()] = transaction; - transactionID = transaction->getUUIDStr(); - transaction->setDataAvailable(dataAvailable); - logger_->log_info("Site2Site create transaction %s", - transaction->getUUIDStr().c_str()); - return transaction; - case NO_MORE_DATA: - dataAvailable = false; - logger_->log_info("Site2Site peer indicates that no data is available"); - transaction = new Transaction(direction, crcstream); - _transactionMap[transaction->getUUIDStr()] = transaction; - transactionID = transaction->getUUIDStr(); - transaction->setDataAvailable(dataAvailable); - logger_->log_info("Site2Site create transaction %s", - transaction->getUUIDStr().c_str()); - return transaction; - default: - logger_->log_info( - "Site2Site got unexpected response %d when asking for data", code); - // tearDown(); - return NULL; + case MORE_DATA: + dataAvailable = true; + logger_->log_info("Site2Site peer indicates that data is available"); + transaction = new Transaction(direction, crcstream); + _transactionMap[transaction->getUUIDStr()] = transaction; + transactionID = transaction->getUUIDStr(); + transaction->setDataAvailable(dataAvailable); + logger_->log_info("Site2Site create transaction %s", + transaction->getUUIDStr().c_str()); + return transaction; + case NO_MORE_DATA: + dataAvailable = false; + logger_->log_info("Site2Site peer indicates that no data is available"); + transaction = new Transaction(direction, crcstream); + _transactionMap[transaction->getUUIDStr()] = transaction; + transactionID = transaction->getUUIDStr(); + transaction->setDataAvailable(dataAvailable); + logger_->log_info("Site2Site create transaction %s", + transaction->getUUIDStr().c_str()); + return transaction; + default: + logger_->log_info( + "Site2Site got unexpected response %d when asking for data", code); + // tearDown(); + return NULL; } } else { ret = writeRequestType(SEND_FLOWFILES); @@ -564,14 +564,14 @@ Transaction* Site2SiteClientProtocol::createTransaction( _transactionMap[transaction->getUUIDStr()] = transaction; transactionID = transaction->getUUIDStr(); logger_->log_info("Site2Site create transaction %s", - transaction->getUUIDStr().c_str()); + transaction->getUUIDStr().c_str()); return transaction; } } } bool Site2SiteClientProtocol::receive(std::string transactionID, - DataPacket *packet, bool &eof) { + DataPacket *packet, bool &eof) { int ret; Transaction *transaction = NULL; @@ -602,7 +602,7 @@ bool Site2SiteClientProtocol::receive(std::string transactionID, if (transaction->getDirection() != RECEIVE) { logger_->log_info("Site2Site transaction %s direction is wrong", - transactionID.c_str()); + transactionID.c_str()); return false; } @@ -687,9 +687,8 @@ bool Site2SiteClientProtocol::receive(std::string transactionID, } bool Site2SiteClientProtocol::send(std::string transactionID, - DataPacket *packet, - std::shared_ptr<FlowFileRecord> flowFile, - core::ProcessSession *session) { + DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile, + core::ProcessSession *session) { int ret; Transaction *transaction = NULL; @@ -720,7 +719,7 @@ bool Site2SiteClientProtocol::send(std::string transactionID, if (transaction->getDirection() != SEND) { logger_->log_info("Site2Site transaction %s direction is wrong", - transactionID.c_str()); + transactionID.c_str()); return false; } @@ -751,38 +750,38 @@ bool Site2SiteClientProtocol::send(std::string transactionID, return false; } logger_->log_info("Site2Site transaction %s send attribute key %s value %s", - transactionID.c_str(), itAttribute->first.c_str(), - itAttribute->second.c_str()); + transactionID.c_str(), itAttribute->first.c_str(), + itAttribute->second.c_str()); } uint64_t len = 0; if (flowFile) { - len = flowFile->getSize() ; - ret = transaction->getStream().write(len); - if (ret != 8) { - return false; - } - if (flowFile->getSize()) { - Site2SiteClientProtocol::ReadCallback callback(packet); - session->read(flowFile, &callback); - if (flowFile->getSize() != packet->_size) { - return false; - } - } - } - else if (packet->payload_.length() > 0) { - len = packet->payload_.length(); - - ret = transaction->getStream().write(len); - if (ret != 8) { - return false; - } - - ret = transaction->getStream().writeData((uint8_t *) (packet->payload_.c_str()), len); - if (ret != len) { - return false; - } - packet->_size += len; + len = flowFile->getSize(); + ret = transaction->getStream().write(len); + if (ret != 8) { + return false; + } + if (flowFile->getSize()) { + Site2SiteClientProtocol::ReadCallback callback(packet); + session->read(flowFile, &callback); + if (flowFile->getSize() != packet->_size) { + return false; + } + } + } else if (packet->payload_.length() > 0) { + len = packet->payload_.length(); + + ret = transaction->getStream().write(len); + if (ret != 8) { + return false; + } + + ret = transaction->getStream().writeData( + reinterpret_cast<uint8_t *> (const_cast<char*> (packet->payload_.c_str())), len); + if (ret != len) { + return false; + } + packet->_size += len; } transaction->_transfers++; @@ -796,7 +795,7 @@ bool Site2SiteClientProtocol::send(std::string transactionID, } void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, - core::ProcessSession *session) { + core::ProcessSession *session) { uint64_t bytes = 0; int transfers = 0; Transaction *transaction = NULL; @@ -809,7 +808,7 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, context->yield(); tearDown(); throw Exception(SITE2SITE_EXCEPTION, - "Can not establish handshake with peer"); + "Can not establish handshake with peer"); return; } @@ -826,7 +825,7 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, try { while (true) { - std::map<std::string, std::string> empty; + std::map < std::string, std::string > empty; uint64_t startTime = getTimeMillis(); std::string payload; DataPacket packet(this, transaction, empty, payload); @@ -840,8 +839,8 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, // transaction done break; } - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast< - FlowFileRecord>(session->create()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast + < FlowFileRecord > (session->create()); if (!flowFile) { throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed"); @@ -870,8 +869,7 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host=" + peer_->getHostName(); session->getProvenanceReporter()->receive(flowFile, transitUri, - sourceIdentifier, details, - endTime - startTime); + sourceIdentifier, details, endTime - startTime); session->transfer(flowFile, relation); // receive the transfer for the flow record bytes += packet._size; @@ -958,7 +956,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) { int64_t crcValue = transaction->getCRC(); std::string crc = std::to_string(crcValue); logger_->log_info("Site2Site Send confirm with CRC %d to transaction %s", - transaction->getCRC(), transactionID.c_str()); + transaction->getCRC(), transactionID.c_str()); ret = writeRespond(CONFIRM_TRANSACTION, crc); if (ret <= 0) return false; @@ -970,24 +968,24 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) { if (code == CONFIRM_TRANSACTION) { logger_->log_info("Site2Site transaction %s peer confirm transaction", - transactionID.c_str()); + transactionID.c_str()); transaction->_state = TRANSACTION_CONFIRMED; return true; } else if (code == BAD_CHECKSUM) { logger_->log_info("Site2Site transaction %s peer indicate bad checksum", - transactionID.c_str()); + transactionID.c_str()); /* transaction->_state = TRANSACTION_CONFIRMED; return true; */ return false; } else { logger_->log_info("Site2Site transaction %s peer unknown respond code %d", - transactionID.c_str(), code); + transactionID.c_str(), code); return false; } } else { logger_->log_info("Site2Site Send FINISH TRANSACTION for transaction %s", - transactionID.c_str()); + transactionID.c_str()); ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION"); if (ret <= 0) return false; @@ -1007,7 +1005,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) { std::string crc = std::to_string(crcValue); if (message == crc) { logger_->log_info("Site2Site transaction %s CRC matched", - transactionID.c_str()); + transactionID.c_str()); ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); if (ret <= 0) return false; @@ -1015,7 +1013,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) { return true; } else { logger_->log_info("Site2Site transaction %s CRC not matched %s", - transactionID.c_str(), crc.c_str()); + transactionID.c_str(), crc.c_str()); ret = writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM"); /* ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); @@ -1033,7 +1031,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) { return true; } else { logger_->log_info("Site2Site transaction %s peer unknown respond code %d", - transactionID.c_str(), code); + transactionID.c_str(), code); return false; } return false; @@ -1082,7 +1080,7 @@ void Site2SiteClientProtocol::deleteTransaction(std::string transactionID) { } logger_->log_info("Site2Site delete transaction %s", - transaction->getUUIDStr().c_str()); + transaction->getUUIDStr().c_str()); delete transaction; _transactionMap.erase(transactionID); } @@ -1136,7 +1134,7 @@ bool Site2SiteClientProtocol::complete(std::string transactionID) { return true; } else { logger_->log_info("Site2Site transaction %s send finished", - transactionID.c_str()); + transactionID.c_str()); ret = this->writeRespond(TRANSACTION_FINISHED, "Finished"); if (ret <= 0) { return false; @@ -1157,21 +1155,21 @@ bool Site2SiteClientProtocol::complete(std::string transactionID) { if (code == TRANSACTION_FINISHED) { logger_->log_info("Site2Site transaction %s peer finished transaction", - transactionID.c_str()); + transactionID.c_str()); transaction->_state = TRANSACTION_COMPLETED; return true; } else { logger_->log_info("Site2Site transaction %s peer unknown respond code %d", - transactionID.c_str(), code); + transactionID.c_str(), code); return false; } } } void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, - core::ProcessSession *session) { - std::shared_ptr<FlowFileRecord> flow = - std::static_pointer_cast<FlowFileRecord>(session->get()); + core::ProcessSession *session) { + std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast + < FlowFileRecord > (session->get()); Transaction *transaction = NULL; @@ -1186,7 +1184,7 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, context->yield(); tearDown(); throw Exception(SITE2SITE_EXCEPTION, - "Can not establish handshake with peer"); + "Can not establish handshake with peer"); return; } @@ -1215,20 +1213,20 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, return; } logger_->log_info("Site2Site transaction %s send flow record %s", - transactionID.c_str(), flow->getUUIDStr().c_str()); + transactionID.c_str(), flow->getUUIDStr().c_str()); uint64_t endTime = getTimeMillis(); std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr(); std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" + peer_->getHostName(); session->getProvenanceReporter()->send(flow, transitUri, details, - endTime - startTime, false); + endTime - startTime, false); session->remove(flow); uint64_t transferNanos = getTimeNano() - startSendingNanos; if (transferNanos > _batchSendNanos) break; - flow = std::static_pointer_cast<FlowFileRecord>(session->get()); + flow = std::static_pointer_cast < FlowFileRecord > (session->get()); if (!flow) { continueTransaction = false; @@ -1268,86 +1266,78 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, return; } -void Site2SiteClientProtocol::transferString(core::ProcessContext *context, core::ProcessSession *session, std::string &payload, - std::map<std::string, std::string> attributes) -{ - Transaction *transaction = NULL; - - if (payload.length() <= 0) - return; - - if (_peerState != READY) - { - bootstrap(); - } - - if (_peerState != READY) - { - context->yield(); - tearDown(); - throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); - return; - } - - // Create the transaction - std::string transactionID; - transaction = createTransaction(transactionID, SEND); - - if (transaction == NULL) - { - context->yield(); - tearDown(); - throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); - return; - } - - try - { - DataPacket packet(this, transaction, attributes, payload); - - if (!send(transactionID, &packet, nullptr, session)) - { - throw Exception(SITE2SITE_EXCEPTION, "Send Failed"); - return; - } - logger_->log_info("Site2Site transaction %s send bytes length %d", - transactionID.c_str(), payload.length()); - - if (!confirm(transactionID)) - { - throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed"); - return; - } - if (!complete(transactionID)) - { - throw Exception(SITE2SITE_EXCEPTION, "Complete Failed"); - return; - } - logger_->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", - transactionID.c_str(), transaction->_transfers, transaction->_bytes); - } - catch (std::exception &exception) - { - if (transaction) - deleteTransaction(transactionID); - context->yield(); - tearDown(); - logger_->log_debug("Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - if (transaction) - deleteTransaction(transactionID); - context->yield(); - tearDown(); - logger_->log_debug("Caught Exception during Site2SiteClientProtocol::transferBytes"); - throw; - } - - deleteTransaction(transactionID); - - return; +void Site2SiteClientProtocol::transferString(core::ProcessContext *context, + core::ProcessSession *session, std::string &payload, + std::map<std::string, std::string> attributes) { + Transaction *transaction = NULL; + + if (payload.length() <= 0) + return; + + if (_peerState != READY) { + bootstrap(); + } + + if (_peerState != READY) { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, + "Can not establish handshake with peer"); + return; + } + + // Create the transaction + std::string transactionID; + transaction = createTransaction(transactionID, SEND); + + if (transaction == NULL) { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); + return; + } + + try { + DataPacket packet(this, transaction, attributes, payload); + + if (!send(transactionID, &packet, nullptr, session)) { + throw Exception(SITE2SITE_EXCEPTION, "Send Failed"); + return; + } + logger_->log_info("Site2Site transaction %s send bytes length %d", + transactionID.c_str(), payload.length()); + + if (!confirm(transactionID)) { + throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed"); + return; + } + if (!complete(transactionID)) { + throw Exception(SITE2SITE_EXCEPTION, "Complete Failed"); + return; + } + logger_->log_info( + "Site2Site transaction %s successfully send flow record %d, content bytes %d", + transactionID.c_str(), transaction->_transfers, transaction->_bytes); + } catch (std::exception &exception) { + if (transaction) + deleteTransaction(transactionID); + context->yield(); + tearDown(); + logger_->log_debug("Caught Exception %s", exception.what()); + throw; + } catch (...) { + if (transaction) + deleteTransaction(transactionID); + context->yield(); + tearDown(); + logger_->log_debug( + "Caught Exception during Site2SiteClientProtocol::transferBytes"); + throw; + } + + deleteTransaction(transactionID); + + return; } } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/770932a0/libminifi/src/core/Processor.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index ef1cd70..74636f7 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -29,6 +29,7 @@ #include <thread> #include <memory> #include <functional> +#include <utility> #include "Connection.h" #include "core/Connectable.h" #include "core/ProcessContext.h" @@ -42,9 +43,8 @@ namespace nifi { namespace minifi { namespace core { -Processor::Processor(std::string name, uuid_t uuid) - : Connectable(name, uuid), - ConfigurableComponent(logging::Logger::getLogger()) { +Processor::Processor(std::string name, uuid_t uuid) : + Connectable(name, uuid), ConfigurableComponent(logging::Logger::getLogger()) { has_work_.store(false); // Setup the default values state_ = DISABLED; @@ -62,7 +62,7 @@ Processor::Processor(std::string name, uuid_t uuid) incoming_connections_Iter = this->_incomingConnections.begin(); logger_ = logging::Logger::getLogger(); logger_->log_info("Processor %s created UUID %s", name_.c_str(), - uuidStr_.c_str()); + uuidStr_.c_str()); } bool Processor::isRunning() { @@ -78,12 +78,12 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) { if (isRunning()) { logger_->log_info("Can not add connection while the process %s is running", - name_.c_str()); + name_.c_str()); return false; } - std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>( - conn); - std::lock_guard<std::mutex> lock(mutex_); + std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection + > (conn); + std::lock_guard < std::mutex > lock(mutex_); uuid_t srcUUID; uuid_t destUUID; @@ -116,7 +116,8 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) { auto &&it = _outGoingConnections.find(relationship); if (it != _outGoingConnections.end()) { // We already has connection for this relationship - std::set<std::shared_ptr<Connectable>> existedConnection = it->second; + std::set < std::shared_ptr < Connectable >> existedConnection = + it->second; if (existedConnection.find(connection) == existedConnection.end()) { // We do not have the same connection for this relationship yet existedConnection.insert(connection); @@ -129,7 +130,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) { } } else { // We do not have any outgoing connection for this relationship yet - std::set<std::shared_ptr<Connectable>> newConnection; + std::set < std::shared_ptr < Connectable >> newConnection; newConnection.insert(connection); connection->setSource(shared_from_this()); _outGoingConnections[relationship] = newConnection; @@ -151,13 +152,13 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) { return; } - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard < std::mutex > lock(mutex_); uuid_t srcUUID; uuid_t destUUID; - std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>( - conn); + std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection + > (conn); connection->getSourceUUID(srcUUID); connection->getDestinationUUID(destUUID); @@ -193,65 +194,65 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) { } } -std::shared_ptr<Site2SiteClientProtocol> Processor::obtainSite2SiteProtocol - (std::string host, uint16_t sport, uuid_t portId) { - std::lock_guard<std::mutex> lock(mutex_); - - if (!protocols_created_) { - for (int i = 0; i < this->max_concurrent_tasks_; i++) { - // create the protocol pool based on max threads allowed - std::shared_ptr<Site2SiteClientProtocol> protocol ( - new Site2SiteClientProtocol(0)); - protocols_created_ = true; - protocol->setPortId(portId); - std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = - std::unique_ptr<org::apache::nifi::minifi::io::DataStream>( - org::apache::nifi::minifi::io::StreamFactory::getInstance() - ->createSocket(host, sport)); - std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>( - new Site2SitePeer(std::move(str), host, sport)); - protocol->setPeer(std::move(peer_)); - available_protocols_.push(protocol); - } - } - if (!available_protocols_.empty()) { - std::shared_ptr<Site2SiteClientProtocol> return_pointer = available_protocols_.top(); - available_protocols_.pop(); - return return_pointer; - } - else { - // create the protocol on demand if we exceed the pool - std::shared_ptr<Site2SiteClientProtocol> protocol ( - new Site2SiteClientProtocol(0)); - protocol->setPortId(portId); - std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = - std::unique_ptr<org::apache::nifi::minifi::io::DataStream>( - org::apache::nifi::minifi::io::StreamFactory::getInstance() - ->createSocket(host, sport)); - std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>( - new Site2SitePeer(std::move(str), host, sport)); - protocol->setPeer(std::move(peer_)); - return protocol; - } +std::shared_ptr<Site2SiteClientProtocol> Processor::obtainSite2SiteProtocol( + std::string host, uint16_t sport, uuid_t portId) { + std::lock_guard < std::mutex > lock(mutex_); + + if (!protocols_created_) { + for (int i = 0; i < this->max_concurrent_tasks_; i++) { + // create the protocol pool based on max threads allowed + std::shared_ptr<Site2SiteClientProtocol> protocol( + new Site2SiteClientProtocol(0)); + protocols_created_ = true; + protocol->setPortId(portId); + std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = + std::unique_ptr < org::apache::nifi::minifi::io::DataStream + > (org::apache::nifi::minifi::io::StreamFactory::getInstance()->createSocket( + host, sport)); + std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer + > (new Site2SitePeer(std::move(str), host, sport)); + protocol->setPeer(std::move(peer_)); + available_protocols_.push(protocol); + } + } + if (!available_protocols_.empty()) { + std::shared_ptr<Site2SiteClientProtocol> return_pointer = + available_protocols_.top(); + available_protocols_.pop(); + return return_pointer; + } else { + // create the protocol on demand if we exceed the pool + std::shared_ptr<Site2SiteClientProtocol> protocol( + new Site2SiteClientProtocol(0)); + protocol->setPortId(portId); + std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = + std::unique_ptr < org::apache::nifi::minifi::io::DataStream + > (org::apache::nifi::minifi::io::StreamFactory::getInstance()->createSocket( + host, sport)); + std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer + > (new Site2SitePeer(std::move(str), host, sport)); + protocol->setPeer(std::move(peer_)); + return protocol; + } } -void Processor::returnSite2SiteProtocol(std::shared_ptr<Site2SiteClientProtocol> protocol) -{ - std::lock_guard<std::mutex> lock(mutex_); - if (protocol && available_protocols_.size() < max_concurrent_tasks_) { - available_protocols_.push(protocol); - } +void Processor::returnSite2SiteProtocol( + std::shared_ptr<Site2SiteClientProtocol> protocol) { + std::lock_guard < std::mutex > lock(mutex_); + if (protocol && available_protocols_.size() < max_concurrent_tasks_) { + available_protocols_.push(protocol); + } } bool Processor::flowFilesQueued() { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard < std::mutex > lock(mutex_); if (_incomingConnections.size() == 0) return false; for (auto &&conn : _incomingConnections) { - std::shared_ptr<Connection> connection = - std::static_pointer_cast<Connection>(conn); + std::shared_ptr<Connection> connection = std::static_pointer_cast + < Connection > (conn); if (connection->getQueueSize() > 0) return true; } @@ -260,14 +261,15 @@ bool Processor::flowFilesQueued() { } bool Processor::flowFilesOutGoingFull() { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard < std::mutex > lock(mutex_); for (auto &&connection : _outGoingConnections) { // We already has connection for this relationship - std::set<std::shared_ptr<Connectable>> existedConnection = connection.second; + std::set < std::shared_ptr < Connectable >> existedConnection = + connection.second; for (const auto conn : existedConnection) { - std::shared_ptr<Connection> connection = std::static_pointer_cast< - Connection>(conn); + std::shared_ptr < Connection > connection = std::static_pointer_cast + < Connection > (conn); if (connection->isFull()) return true; } @@ -277,7 +279,7 @@ bool Processor::flowFilesOutGoingFull() { } void Processor::onTrigger(ProcessContext *context, - ProcessSessionFactory *sessionFactory) { + ProcessSessionFactory *sessionFactory) { auto session = sessionFactory->createSession(); try { @@ -301,8 +303,8 @@ bool Processor::isWorkAvailable() { try { for (const auto &conn : _incomingConnections) { - std::shared_ptr<Connection> connection = std::static_pointer_cast< - Connection>(conn); + std::shared_ptr<Connection> connection = std::static_pointer_cast + < Connection > (conn); if (connection->getQueueSize() > 0) { hasWork = true; break; @@ -311,7 +313,7 @@ bool Processor::isWorkAvailable() { } catch (...) { logger_->log_error( "Caught an exception while checking if work is available;" - " unless it was positively determined that work is available, assuming NO work is available!"); + " unless it was positively determined that work is available, assuming NO work is available!"); } return hasWork; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/770932a0/libminifi/src/provenance/ProvenanceTaskReport.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/provenance/ProvenanceTaskReport.cpp b/libminifi/src/provenance/ProvenanceTaskReport.cpp index 4f16e4b..1908bb4 100644 --- a/libminifi/src/provenance/ProvenanceTaskReport.cpp +++ b/libminifi/src/provenance/ProvenanceTaskReport.cpp @@ -21,10 +21,9 @@ #include <queue> #include <map> #include <set> -#include <sys/time.h> -#include <time.h> +#include <string> +#include <memory> #include <sstream> -#include <string.h> #include <iostream> #include "provenance/ProvenanceTaskReport.h" @@ -43,151 +42,146 @@ namespace org { namespace apache { namespace nifi { namespace minifi { -namespace provenance{ +namespace provenance { -const std::string ProvenanceTaskReport::ProcessorName("ProvenanceTaskReport"); -core::Property ProvenanceTaskReport::hostName("Host Name", "Remote Host Name.", "localhost"); +core::Property ProvenanceTaskReport::hostName("Host Name", "Remote Host Name.", + "localhost"); core::Property ProvenanceTaskReport::port("Port", "Remote Port", "9999"); -core::Property ProvenanceTaskReport::batchSize("Batch Size", "Specifies how many records to send in a single batch, at most.", "100"); -core::Property ProvenanceTaskReport::portUUID("Port UUID", "Specifies remote NiFi Port UUID.", ""); +core::Property ProvenanceTaskReport::batchSize("Batch Size", + "Specifies how many records to send in a single batch, at most.", "100"); +core::Property ProvenanceTaskReport::portUUID("Port UUID", + "Specifies remote NiFi Port UUID.", ""); core::Relationship ProvenanceTaskReport::relation; const char *ProvenanceTaskReport::ProvenanceAppStr = "MiNiFi Flow"; -void ProvenanceTaskReport::initialize() -{ - //! Set the supported properties - std::set<core::Property> properties; - properties.insert(hostName); - properties.insert(port); - properties.insert(batchSize); - properties.insert(portUUID); - setSupportedProperties(properties); - //! Set the supported relationships - std::set<core::Relationship> relationships; - relationships.insert(relation); - setSupportedRelationships(relationships); +void ProvenanceTaskReport::initialize() { + //! Set the supported properties + std::set<core::Property> properties; + properties.insert(hostName); + properties.insert(port); + properties.insert(batchSize); + properties.insert(portUUID); + setSupportedProperties(properties); + //! Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(relation); + setSupportedRelationships(relationships); } -void ProvenanceTaskReport::onTrigger(core::ProcessContext *context, core::ProcessSession *session) -{ - std::string value; - int64_t lvalue; - std::string host = ""; - uint16_t sport = 0; - - if (context->getProperty(hostName.getName(), value)) { - host = value; - } - if (context->getProperty(port.getName(), value) - && core::Property::StringToInt(value, lvalue)) { - sport = (uint16_t) lvalue; - } - if (context->getProperty(portUUID.getName(), value)) { - uuid_parse(value.c_str(), protocol_uuid_); - } - - std::shared_ptr<Site2SiteClientProtocol> protocol_ = this->obtainSite2SiteProtocol(host, sport, protocol_uuid_); - - if (!protocol_) - { - context->yield(); - return; - } - - if (!protocol_->bootstrap()) - { - // bootstrap the client protocol if needeed - context->yield(); - std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>( - context->getProcessorNode().getProcessor()); - logger_->log_error("Site2Site bootstrap failed yield period %d peer ", - processor->getYieldPeriodMsec()); - returnSite2SiteProtocol(protocol_); - return; - } - - int64_t batch = 100; - - if (context->getProperty(batchSize.getName(), value) && core::Property::StringToInt(value, lvalue)) - { - batch = lvalue; - } - - std::vector<std::shared_ptr<ProvenanceEventRecord>> records; - std::shared_ptr<ProvenanceRepository> repo = std::static_pointer_cast<ProvenanceRepository> (context->getProvenanceRepository()); - - repo->getProvenanceRecord(records, batch); - - if (records.size() <= 0) - { - returnSite2SiteProtocol(protocol_); - return; - } - - Json::Value array; - for (auto record : records) - { - Json::Value recordJson; - Json::Value updatedAttributesJson; - Json::Value parentUuidJson; - Json::Value childUuidJson; - recordJson["eventId"] = record->getEventId().c_str(); - recordJson["eventType"] = ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()]; - recordJson["timestampMillis"] = record->getEventTime(); - recordJson["durationMillis"] = record->getEventDuration(); - recordJson["lineageStart"] = record->getlineageStartDate(); - recordJson["details"] = record->getDetails().c_str(); - recordJson["componentId"] = record->getComponentId().c_str(); - recordJson["componentType"] = record->getComponentType().c_str(); - recordJson["entityId"] = record->getFlowFileUuid().c_str(); - recordJson["entityType"] = "org.apache.nifi.flowfile.FlowFile"; - recordJson["entitySize"] = record->getFileSize(); - recordJson["entityOffset"] = record->getFileOffset(); - - for (auto attr : record->getAttributes()) - { - updatedAttributesJson[attr.first] = attr.second; - } - recordJson["updatedAttributes"] = updatedAttributesJson; - - for (auto parentUUID : record->getParentUuids()) - { - parentUuidJson.append(parentUUID.c_str()); - } - recordJson["parentIds"] = parentUuidJson; - - for (auto childUUID : record->getChildrenUuids()) - { - childUuidJson.append(childUUID.c_str()); - } - recordJson["childIds"] = childUuidJson; - recordJson["transitUri"] = record->getTransitUri().c_str(); - recordJson["remoteIdentifier"] = record->getSourceSystemFlowFileIdentifier().c_str(); - recordJson["alternateIdentifier"] = record->getAlternateIdentifierUri().c_str(); - recordJson["application"] = ProvenanceAppStr; - array.append(recordJson); - } - - Json::StyledWriter writer; - std::string jsonStr = writer.write(array); - - try - { - std::map<std::string, std::string> attributes; - protocol_->transferString(context, session, jsonStr, attributes); - } - catch (...) - { - // if transfer bytes failed, return instead of purge the provenance records - returnSite2SiteProtocol(protocol_); - return; - } - - // we transfer the record, purge the record from DB - repo->purgeProvenanceRecord(records); - - returnSite2SiteProtocol(protocol_); - +void ProvenanceTaskReport::onTrigger(core::ProcessContext *context, + core::ProcessSession *session) { + std::string value; + int64_t lvalue; + std::string host = ""; + uint16_t sport = 0; + + if (context->getProperty(hostName.getName(), value)) { + host = value; + } + if (context->getProperty(port.getName(), value) + && core::Property::StringToInt(value, lvalue)) { + sport = (uint16_t) lvalue; + } + if (context->getProperty(portUUID.getName(), value)) { + uuid_parse(value.c_str(), protocol_uuid_); + } + + std::shared_ptr<Site2SiteClientProtocol> protocol_ = + this->obtainSite2SiteProtocol(host, sport, protocol_uuid_); + + if (!protocol_) { + context->yield(); + return; + } + + if (!protocol_->bootstrap()) { + // bootstrap the client protocol if needeed + context->yield(); + std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor + > (context->getProcessorNode().getProcessor()); + logger_->log_error("Site2Site bootstrap failed yield period %d peer ", + processor->getYieldPeriodMsec()); + returnSite2SiteProtocol(protocol_); + return; + } + + int64_t batch = 100; + + if (context->getProperty(batchSize.getName(), value) + && core::Property::StringToInt(value, lvalue)) { + batch = lvalue; + } + + std::vector < std::shared_ptr < ProvenanceEventRecord >> records; + std::shared_ptr<ProvenanceRepository> repo = std::static_pointer_cast + < ProvenanceRepository > (context->getProvenanceRepository()); + + repo->getProvenanceRecord(records, batch); + + if (records.size() <= 0) { + returnSite2SiteProtocol(protocol_); + return; + } + + Json::Value array; + for (auto record : records) { + Json::Value recordJson; + Json::Value updatedAttributesJson; + Json::Value parentUuidJson; + Json::Value childUuidJson; + recordJson["eventId"] = record->getEventId().c_str(); + recordJson["eventType"] = + ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()]; + recordJson["timestampMillis"] = record->getEventTime(); + recordJson["durationMillis"] = record->getEventDuration(); + recordJson["lineageStart"] = record->getlineageStartDate(); + recordJson["details"] = record->getDetails().c_str(); + recordJson["componentId"] = record->getComponentId().c_str(); + recordJson["componentType"] = record->getComponentType().c_str(); + recordJson["entityId"] = record->getFlowFileUuid().c_str(); + recordJson["entityType"] = "org.apache.nifi.flowfile.FlowFile"; + recordJson["entitySize"] = record->getFileSize(); + recordJson["entityOffset"] = record->getFileOffset(); + + for (auto attr : record->getAttributes()) { + updatedAttributesJson[attr.first] = attr.second; + } + recordJson["updatedAttributes"] = updatedAttributesJson; + + for (auto parentUUID : record->getParentUuids()) { + parentUuidJson.append(parentUUID.c_str()); + } + recordJson["parentIds"] = parentUuidJson; + + for (auto childUUID : record->getChildrenUuids()) { + childUuidJson.append(childUUID.c_str()); + } + recordJson["childIds"] = childUuidJson; + recordJson["transitUri"] = record->getTransitUri().c_str(); + recordJson["remoteIdentifier"] = + record->getSourceSystemFlowFileIdentifier().c_str(); + recordJson["alternateIdentifier"] = + record->getAlternateIdentifierUri().c_str(); + recordJson["application"] = ProvenanceAppStr; + array.append(recordJson); + } + + Json::StyledWriter writer; + std::string jsonStr = writer.write(array); + + try { + std::map < std::string, std::string > attributes; + protocol_->transferString(context, session, jsonStr, attributes); + } catch (...) { + // if transfer bytes failed, return instead of purge the provenance records + returnSite2SiteProtocol(protocol_); + return; + } + + // we transfer the record, purge the record from DB + repo->purgeProvenanceRecord(records); + + returnSite2SiteProtocol(protocol_); } } /* namespace provenance */