fgerlits commented on code in PR #1966:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1966#discussion_r2205045738
##########
libminifi/src/sitetosite/RawSiteToSiteClient.cpp:
##########
@@ -0,0 +1,431 @@
+/**
+ * Site2SiteProtocol class implementation
+ *
Review Comment:
This header comment ("Site2SiteProtocol class implementation") should be
updated or deleted.
##########
docker/test/integration/features/s2s.feature:
##########
@@ -25,73 +25,88 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S
protocol
Scenario: A MiNiFi instance produces and transfers data to a NiFi instance
via s2s
Given a GetFile processor with the "Input Directory" property set to
"/tmp/input"
And a file with the content "test" is present in "/tmp/input"
- And a RemoteProcessGroup node opened on
"http://nifi-${feature_id}:8080/nifi"
- And the "success" relationship of the GetFile processor is connected to
the input port on the RemoteProcessGroup
+ And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on
"http://nifi-${feature_id}:8080/nifi"
+ And an input port with name "to_nifi" is created on the RemoteProcessGroup
named "RemoteProcessGroup"
+ And the "success" relationship of the GetFile processor is connected to
the to_nifi
- And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi"
+ And a NiFi flow is receiving data in an input port named "from-minifi"
with the id of the port named "to_nifi" from the RemoteProcessGroup named
"RemoteProcessGroup"
Review Comment:
I understand this is already quite long, but I think
```suggestion
And a NiFi flow is receiving data from the RemoteProcessGroup named
"RemoteProcessGroup" in an input port named "from-minifi" which has the same id
as the port named "to_nifi"
```
would be easier to understand, even though it is even longer.
##########
docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py:
##########
@@ -134,8 +179,8 @@ def serialize_node(self, connectable, root, visited):
for proc in conn_procs:
root['connections'].append({
'name': str(uuid.uuid4()),
- 'source': {'id': str(connectable.uuid)},
- 'destination': {'id': str(proc.uuid) if not
isinstance(proc, InputPort) else str(proc.instance_id)}
+ 'source': {'id': str(connectable.uuid) if not
isinstance(connectable, InputPort) and not isinstance(connectable, OutputPort)
else str(connectable.instance_id)},
+ 'destination': {'id': str(proc.uuid) if not
isinstance(proc, InputPort) and not isinstance(proc, OutputPort) else
str(proc.instance_id)}
Review Comment:
These conditional id expressions are getting too complex; they could be
simplified, e.g. by adding an `id_for_connection` method to `Connectable` and
overriding it in `InputPort` and `OutputPort`.
##########
libminifi/include/sitetosite/SiteToSiteClient.h:
##########
@@ -19,246 +19,182 @@
#pragma once
#include <algorithm>
-#include <array>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
+#include <optional>
#include "Peer.h"
#include "SiteToSite.h"
#include "core/ProcessSession.h"
#include "core/ProcessContext.h"
-#include "core/Connectable.h"
-#include "utils/gsl.h"
-namespace org::apache::nifi::minifi::sitetosite {
+namespace org::apache::nifi::minifi {
-/**
- * Represents a piece of data that is to be sent to or that was received from a
- * NiFi instance.
- */
-class DataPacket {
+namespace test {
+class SiteToSiteClientTestAccessor;
+} // namespace test
+
+namespace sitetosite {
+
+struct DataPacket {
public:
- DataPacket(std::shared_ptr<core::logging::Logger> logger,
std::shared_ptr<Transaction> transaction, std::map<std::string, std::string>
attributes, const std::string &payload)
- : _attributes{std::move(attributes)},
- transaction_{std::move(transaction)},
- payload_{payload},
- logger_reference_{std::move(logger)} {
+ DataPacket(std::shared_ptr<Transaction> transaction, const std::string
&payload)
+ : transaction{std::move(transaction)},
+ payload{payload} {
+ }
+ DataPacket(std::shared_ptr<Transaction> transaction, std::map<std::string,
std::string> attributes, const std::string &payload)
+ : attributes{std::move(attributes)},
+ transaction{std::move(transaction)},
+ payload{payload} {
}
- std::map<std::string, std::string> _attributes;
- uint64_t _size{0};
- std::shared_ptr<Transaction> transaction_;
- const std::string & payload_;
- std::shared_ptr<core::logging::Logger> logger_reference_;
+ std::map<std::string, std::string> attributes;
+ uint64_t size{0};
+ std::shared_ptr<Transaction> transaction;
+ const std::string& payload;
+};
+
+struct SiteToSiteResponse {
+ ResponseCode code = ResponseCode::UNRECOGNIZED_RESPONSE_CODE;
+ std::string message;
};
-class SiteToSiteClient : public core::ConnectableImpl {
+class SiteToSiteClient {
public:
- SiteToSiteClient()
- : core::ConnectableImpl("SitetoSiteClient") {
+ explicit SiteToSiteClient(std::unique_ptr<SiteToSitePeer> peer)
+ : peer_(std::move(peer)) {
+ gsl_Assert(peer_);
}
- ~SiteToSiteClient() override = default;
+ SiteToSiteClient(const SiteToSiteClient&) = delete;
+ SiteToSiteClient(SiteToSiteClient&&) = delete;
+ SiteToSiteClient& operator=(const SiteToSiteClient&) = delete;
+ SiteToSiteClient& operator=(SiteToSiteClient&&) = delete;
- void setSSLContextService(const
std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
- ssl_context_service_ = context_service;
- }
+ virtual ~SiteToSiteClient() = default;
- /**
- * Creates a transaction using the transaction ID and the direction
- * @param transactionID transaction identifier
- * @param direction direction of transfer
- */
- virtual std::shared_ptr<Transaction> createTransaction(TransferDirection
direction) = 0;
+ virtual std::optional<std::vector<PeerStatus>> getPeerList() = 0;
+ virtual bool transmitPayload(core::ProcessContext& context, const
std::string &payload, const std::map<std::string, std::string>& attributes) = 0;
- /**
- * Transfers flow files
- * @param direction transfer direction
- * @param context process context
- * @param session process session
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
- virtual bool transfer(TransferDirection direction, core::ProcessContext&
context, core::ProcessSession& session) {
-#ifndef WIN32
- if (__builtin_expect(direction == SEND, 1)) {
+ bool transfer(TransferDirection direction, core::ProcessContext& context,
core::ProcessSession& session) {
+ if (direction == TransferDirection::SEND) {
return transferFlowFiles(context, session);
} else {
return receiveFlowFiles(context, session);
}
-#else
- if (direction == SEND) {
- return transferFlowFiles(context, session);
- } else {
- return receiveFlowFiles(context, session);
- }
-#endif
}
- /**
- * Transfers flow files to server
- * @param context process context
- * @param session process session
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
- virtual bool transferFlowFiles(core::ProcessContext& context,
core::ProcessSession& session);
-
- /**
- * Receive flow files from server
- * @param context process context
- * @param session process session
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
-
- // Confirm the data that was sent or received by comparing CRC32's of the
data sent and the data received.
- // Receive flow files for the process session
- bool receiveFlowFiles(core::ProcessContext& context, core::ProcessSession&
session);
-
- // Receive the data packet from the transaction
- // Return false when any error occurs
- bool receive(const utils::Identifier &transactionID, DataPacket *packet,
bool &eof);
- /**
- * Transfers raw data and attributes to server
- * @param context process context
- * @param session process session
- * @param payload data to transmit
- * @param attributes
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
- virtual bool transmitPayload(core::ProcessContext& context,
core::ProcessSession& session, const std::string &payload,
- std::map<std::string, std::string> attributes)
= 0;
-
- void setPortId(utils::Identifier &id) {
+ void setPortId(const utils::Identifier& id) {
port_id_ = id;
}
- /**
- * Sets the idle timeout.
- */
void setIdleTimeout(std::chrono::milliseconds timeout) {
idle_timeout_ = timeout;
}
- /**
- * Sets the base peer for this interface.
- */
- virtual void setPeer(std::unique_ptr<SiteToSitePeer> peer) {
- peer_ = std::move(peer);
- }
-
- /**
- * Provides a reference to the port identifier
- * @returns port identifier
- */
- utils::Identifier getPortId() const {
+ [[nodiscard]] utils::Identifier getPortId() const {
return port_id_;
}
- /**
- * Obtains the peer list and places them into the provided vector
- * @param peers peer vector.
- * @return true if successful, false otherwise
- */
- virtual bool getPeerList(std::vector<PeerStatus> &peers) = 0;
-
- /**
- * Establishes the interface.
- * @return true if successful, false otherwise
- */
- virtual bool establish() = 0;
-
- const std::shared_ptr<core::logging::Logger> &getLogger() {
+ [[nodiscard]] const std::shared_ptr<core::logging::Logger> &getLogger() {
return logger_;
}
- void yield() override {
+ void setSSLContextService(const
std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
+ ssl_context_service_ = context_service;
}
- /**
- * Determines if we are connected and operating
- */
- bool isRunning() const override {
- return running_;
+ void setUseCompression(bool use_compression) {
+ use_compression_ = use_compression;
}
- /**
- * Determines if work is available by this connectable
- * @return boolean if work is available.
- */
- bool isWorkAvailable() override {
- return true;
+ void setBatchSize(uint64_t size) {
+ batch_size_ = size;
}
- virtual bool bootstrap() {
- return true;
+ void setBatchCount(uint64_t count) {
+ batch_count_ = count;
}
- // Return -1 when any error occurs
- virtual int16_t send(const utils::Identifier& transactionID, DataPacket*
packet, const std::shared_ptr<core::FlowFile>& flowFile, core::ProcessSession*
session);
+ void setBatchDuration(std::chrono::milliseconds duration) {
+ batch_duration_ = duration;
+ }
- protected:
- // Cancel the transaction
- virtual void cancel(const utils::Identifier &transactionID);
- // Complete the transaction
- virtual bool complete(core::ProcessContext& context, const utils::Identifier
&transactionID);
- // Error the transaction
- virtual void error(const utils::Identifier &transactionID);
+ virtual void setTimeout(std::chrono::milliseconds timeout) {
+ timeout_ = timeout;
+ }
- virtual bool confirm(const utils::Identifier &transactionID);
- // deleteTransaction
- virtual void deleteTransaction(const utils::Identifier &transactionID);
+ protected:
+ friend class test::SiteToSiteClientTestAccessor;
+ virtual bool bootstrap() = 0;
+ virtual bool establish() = 0;
+ virtual std::shared_ptr<Transaction> createTransaction(TransferDirection
direction) = 0;
virtual void tearDown() = 0;
- // read Respond
- virtual int readResponse(const std::shared_ptr<Transaction> &transaction,
RespondCode &code, std::string &message);
- // write respond
- virtual int writeResponse(const std::shared_ptr<Transaction> &transaction,
RespondCode code, const std::string& message);
- // getRespondCodeContext
- virtual RespondCodeContext *getRespondCodeContext(RespondCode code) {
- for (auto & i : SiteToSiteRequest::respondCodeContext) {
- if (i.code == code) {
- return &i;
- }
- }
- return nullptr;
- }
+ virtual void deleteTransaction(const utils::Identifier &transaction_id);
+ virtual std::optional<SiteToSiteResponse> readResponse(const
std::shared_ptr<Transaction> &transaction);
+ virtual bool writeResponse(const std::shared_ptr<Transaction> &transaction,
const SiteToSiteResponse& response);
- // Peer State
- PeerState peer_state_{PeerState::IDLE};
+ bool initializeSend(const std::shared_ptr<Transaction>& transaction);
+ bool writeAttributesInSendTransaction(const std::shared_ptr<Transaction>&
transaction, const std::map<std::string, std::string>& attributes);
+ void finalizeSendTransaction(const std::shared_ptr<Transaction>&
transaction, uint64_t sent_bytes);
+ bool sendPacket(const DataPacket& packet);
+ bool sendFlowFile(const std::shared_ptr<Transaction>& transaction,
core::FlowFile& flow_file, core::ProcessSession& session);
- // portId
- utils::Identifier port_id_;
+ void cancel(const utils::Identifier &transaction_id);
+ bool complete(core::ProcessContext& context, const utils::Identifier
&transaction_id);
+ void error(const utils::Identifier &transaction_id);
+ bool confirm(const utils::Identifier &transaction_id);
- // idleTimeout
- std::chrono::milliseconds idle_timeout_{15000};
+ void handleTransactionError(const std::shared_ptr<Transaction>& transaction,
core::ProcessContext& context, const std::exception& exception);
- // Peer Connection
+ PeerState peer_state_{PeerState::IDLE};
+ utils::Identifier port_id_;
+ std::chrono::milliseconds idle_timeout_{15000};
Review Comment:
we could write
```suggestion
std::chrono::milliseconds idle_timeout_{15s};
```
since the chrono literals are already in scope in this header (via `Peer.h`).
##########
libminifi/src/sitetosite/HttpSiteToSiteClient.cpp:
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "sitetosite/HttpSiteToSiteClient.h"
+
+#include <chrono>
+#include <map>
+#include <string>
+#include <memory>
+#include <thread>
+#include <iostream>
+#include <vector>
+#include <optional>
+
+#include "io/CRCStream.h"
+#include "sitetosite/Peer.h"
+#include "io/validation.h"
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "Exception.h"
+
+#include "rapidjson/document.h"
+#include "rapidjson/error/en.h"
+
+#undef DELETE // macro on windows
+
+namespace org::apache::nifi::minifi::sitetosite {
+
+namespace {
+std::optional<utils::Identifier> parseTransactionId(const std::string &uri) {
+ return
utils::Identifier::parse(utils::string::partAfterLastOccurrenceOf(uri, '/'));
+}
+
+std::optional<std::vector<PeerStatus>> parsePeerStatuses(const
std::shared_ptr<core::logging::Logger> &logger, const std::string &entity,
const utils::Identifier &id) {
+ try {
+ rapidjson::Document root;
+ rapidjson::ParseResult ok = root.Parse(entity.c_str());
+ if (!ok) {
+ std::stringstream ss;
+ ss << "Failed to parse archive lens stack from JSON string with reason: "
+ << rapidjson::GetParseError_En(ok.Code())
+ << " at offset " << ok.Offset();
+
+ throw Exception(ExceptionType::GENERAL_EXCEPTION, ss.str());
+ }
+
+ std::vector<PeerStatus> peer_statuses;
+ if (!root.HasMember("peers") || !root["peers"].IsArray() ||
root["peers"].Size() <= 0) {
+ logger->log_debug("Peers is either not a member or is empty. String to
analyze: {}", entity);
+ return peer_statuses;
Review Comment:
Is it intentional that we return an empty vector here? In other error cases,
we return a `nullopt`.
##########
libminifi/src/sitetosite/RawSiteToSiteClient.cpp:
##########
@@ -0,0 +1,431 @@
+/**
+ * Site2SiteProtocol class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <chrono>
+#include <utility>
+#include <map>
+#include <string>
+#include <memory>
+#include <vector>
+
+#include "sitetosite/RawSiteToSiteClient.h"
+#include "io/CRCStream.h"
+#include "sitetosite/Peer.h"
+#include "utils/gsl.h"
+#include "utils/Enum.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::sitetosite {
+
+namespace {
+bool negotiateVersion(SiteToSitePeer& peer, const std::string& resource_name,
const std::string& negotiation_type, uint32_t& current_version, uint32_t&
current_version_index,
+ const std::vector<uint32_t>& supported_versions, const
std::shared_ptr<core::logging::Logger>& logger) {
+ if (const auto ret = peer.write(resource_name); ret == 0 ||
io::isError(ret)) {
+ logger->log_debug("result of writing {} resource name is {}",
negotiation_type, ret);
+ return false;
+ }
+
+ if (const auto ret = peer.write(current_version); ret == 0 ||
io::isError(ret)) {
+ logger->log_debug("result of {} version is {}", negotiation_type, ret);
+ return false;
+ }
+
+ uint8_t status_code_byte = 0;
+ if (const auto ret = peer.read(status_code_byte); ret == 0 ||
io::isError(ret)) {
+ logger->log_debug("result of writing {} version status code {}",
negotiation_type, ret);
+ return false;
+ }
+
+ auto status_code =
magic_enum::enum_cast<ResourceNegotiationStatusCode>(status_code_byte);
+ if (!status_code) {
+ logger->log_error("Negotiate {} response unknown code {}",
negotiation_type, status_code_byte);
+ return false;
+ }
+
+ switch (*status_code) {
+ case ResourceNegotiationStatusCode::RESOURCE_OK: {
+ logger->log_debug("Site2Site {} Negotiate version OK", negotiation_type);
+ return true;
+ }
+ case ResourceNegotiationStatusCode::DIFFERENT_RESOURCE_VERSION: {
+ uint32_t server_version = 0;
+ if (const auto ret = peer.read(server_version); ret == 0 ||
io::isError(ret)) {
+ return false;
+ }
+
+ logger->log_info("Site2Site Server Response asked for a different
protocol version ", server_version);
+
+ for (uint32_t i = (current_version_index + 1); i <
supported_versions.size(); i++) {
+ if (server_version >= supported_versions.at(i)) {
+ current_version = supported_versions.at(i);
+ current_version_index = i;
+ return negotiateVersion(peer, resource_name, negotiation_type,
current_version, current_version_index, supported_versions, logger);
+ }
+ }
+ logger->log_error("Site2Site Negotiate {} failed to find a common
version with server", negotiation_type);
+ return false;
+ }
+ case ResourceNegotiationStatusCode::NEGOTIATED_ABORT: {
+ logger->log_error("Site2Site Negotiate {} response ABORT",
negotiation_type);
+ return false;
+ }
+ default: {
+ logger->log_error("Negotiate {} response unhandled code {}",
negotiation_type, magic_enum::enum_name(*status_code));
+ return false;
+ }
+ }
+}
+} // namespace
+
+bool RawSiteToSiteClient::establish() {
+ if (peer_state_ != PeerState::IDLE) {
+ logger_->log_error("Site2Site peer state is not idle while try to
establish");
+ return false;
+ }
+
+ if (auto ret = peer_->open(); !ret) {
+ logger_->log_error("Site2Site peer socket open failed");
+ return false;
+ }
+
+ if (auto ret = initiateResourceNegotiation(); !ret) {
+ logger_->log_error("Site2Site Protocol Version Negotiation failed");
+ return false;
+ }
+
+ logger_->log_debug("Site2Site socket established");
+ peer_state_ = PeerState::ESTABLISHED;
+
+ return true;
+}
+
+bool RawSiteToSiteClient::initiateResourceNegotiation() {
+ if (peer_state_ != PeerState::IDLE) {
+ logger_->log_error("Site2Site peer state is not idle while
initiateResourceNegotiation");
+ return false;
+ }
+
+ logger_->log_debug("Negotiate protocol version with destination port {}
current version {}", port_id_.to_string(), current_version_);
+ return negotiateVersion(*peer_, std::string{PROTOCOL_RESOURCE_NAME},
"protocol", current_version_, current_version_index_, supported_version_,
logger_);
+}
+
+bool RawSiteToSiteClient::initiateCodecResourceNegotiation() {
+ if (peer_state_ != PeerState::HANDSHAKED) {
+ logger_->log_error("Site2Site peer state is not handshaked while
initiateCodecResourceNegotiation");
+ return false;
+ }
+
+ logger_->log_trace("Negotiate Codec version with destination port {} current
version {}", port_id_.to_string(), current_codec_version_);
+ return negotiateVersion(*peer_, std::string{CODEC_RESOURCE_NAME}, "codec",
current_codec_version_, current_codec_version_index_, supported_codec_version_,
logger_);
+}
+
+bool RawSiteToSiteClient::handShake() {
+ if (peer_state_ != PeerState::ESTABLISHED) {
+ logger_->log_error("Site2Site peer state is not established while
handshake");
+ return false;
+ }
+ logger_->log_debug("Site2Site Protocol Perform hand shake with destination
port {}", port_id_.to_string());
+ comms_identifier_ = utils::IdGenerator::getIdGenerator()->generate();
+
+ if (const auto ret = peer_->write(comms_identifier_); ret == 0 ||
io::isError(ret)) {
+ logger_->log_error("Failed to write comms identifier {}", ret);
+ return false;
+ }
+
+ std::map<std::string, std::string> properties;
+ // TODO(lordgamez): send use_compression_ boolean value when compression
support is added
+ properties[std::string(magic_enum::enum_name(HandshakeProperty::GZIP))] =
"false";
+
properties[std::string(magic_enum::enum_name(HandshakeProperty::PORT_IDENTIFIER))]
= port_id_.to_string();
+
properties[std::string(magic_enum::enum_name(HandshakeProperty::REQUEST_EXPIRATION_MILLIS))]
= std::to_string(timeout_.load().count());
+ if (current_version_ >= 5) {
+ if (batch_count_ > 0) {
+
properties[std::string(magic_enum::enum_name(HandshakeProperty::BATCH_COUNT))]
= std::to_string(batch_count_);
+ }
+ if (batch_size_ > 0) {
+
properties[std::string(magic_enum::enum_name(HandshakeProperty::BATCH_SIZE))] =
std::to_string(batch_size_);
+ }
+ if (batch_duration_.load() > 0ms) {
+
properties[std::string(magic_enum::enum_name(HandshakeProperty::BATCH_DURATION))]
= std::to_string(batch_duration_.load().count());
+ }
+ }
+
+ if (current_version_ >= 3) {
+ if (const auto ret = peer_->write(peer_->getURL()); ret == 0 ||
io::isError(ret)) {
+ logger_->log_error("Failed to write peer URL {}", ret);
+ return false;
+ }
+ }
+
+ if (const auto ret = peer_->write(gsl::narrow<uint32_t>(properties.size()));
ret == 0 || io::isError(ret)) {
+ logger_->log_error("Failed to write properties size {}", ret);
+ return false;
+ }
+
+ for (const auto& property : properties) {
+ if (const auto ret = peer_->write(property.first); ret == 0 ||
io::isError(ret)) {
+ logger_->log_error("Failed to write property key {}", ret);
+ return false;
+ }
+
+ if (const auto ret = peer_->write(property.second); ret == 0 ||
io::isError(ret)) {
+ logger_->log_error("Failed to write property value {}", ret);
+ return false;
+ }
+ logger_->log_debug("Site2Site Protocol Send handshake properties {} {}",
property.first, property.second);
+ }
+
+ const auto response = readResponse(nullptr);
+ if (!response) {
+ return false;
+ }
+
+ auto logPortStateError = [this](const std::string& error) {
+ logger_->log_error("Site2Site HandShake Failed because destination port,
{}, is {}", port_id_.to_string(), error);
+ };
+
+ switch (response->code) {
+ case ResponseCode::PROPERTIES_OK:
+ logger_->log_debug("Site2Site HandShake Completed");
+ peer_state_ = PeerState::HANDSHAKED;
+ return true;
+ case ResponseCode::PORT_NOT_IN_VALID_STATE:
+ logPortStateError("in invalid state");
+ return false;
+ case ResponseCode::UNKNOWN_PORT:
+ logPortStateError("an unknown port");
+ return false;
+ case ResponseCode::PORTS_DESTINATION_FULL:
+ logPortStateError("full");
+ return false;
+ case ResponseCode::UNAUTHORIZED:
+ logger_->log_error("Site2Site HandShake on port {} failed:
UNAUTHORIZED", port_id_.to_string());
+ return false;
+ default:
+ logger_->log_error("Site2Site HandShake on port {} failed: unknown
response code {}", port_id_.to_string(),
magic_enum::enum_underlying(response->code));
+ return false;
+ }
+}
+
+void RawSiteToSiteClient::tearDown() {
+ if (magic_enum::enum_underlying(peer_state_) >=
magic_enum::enum_underlying(PeerState::ESTABLISHED)) {
+ logger_->log_trace("Site2Site Protocol tearDown");
+ writeRequestType(RequestType::SHUTDOWN);
+ }
+
+ known_transactions_.clear();
+ peer_->close();
+ peer_state_ = PeerState::IDLE;
+}
+
+std::optional<std::vector<PeerStatus>> RawSiteToSiteClient::getPeerList() {
+ if (!establish() || !handShake()) {
+ tearDown();
+ return std::nullopt;
+ }
+
+ if (writeRequestType(RequestType::REQUEST_PEER_LIST) <= 0) {
+ tearDown();
+ return std::nullopt;
+ }
+
+ uint32_t number_of_peers = 0;
+ std::vector<PeerStatus> peers;
+ if (const auto ret = peer_->read(number_of_peers); ret == 0 ||
io::isError(ret)) {
+ tearDown();
+ return std::nullopt;
+ }
+
+ for (uint32_t i = 0; i < number_of_peers; i++) {
+ std::string host;
+ if (const auto ret = peer_->read(host); ret == 0 || io::isError(ret)) {
+ tearDown();
+ return std::nullopt;
+ }
+
+ uint32_t port = 0;
+ if (const auto ret = peer_->read(port); ret == 0 || io::isError(ret)) {
+ tearDown();
+ return std::nullopt;
+ }
+
+ uint8_t secure = 0;
+ if (const auto ret = peer_->read(secure); ret == 0 || io::isError(ret)) {
+ tearDown();
+ return std::nullopt;
+ }
+
+ uint32_t count = 0;
+ if (const auto ret = peer_->read(count); ret == 0 || io::isError(ret)) {
+ tearDown();
+ return std::nullopt;
+ }
+
+ peers.push_back(PeerStatus(port_id_, host, gsl::narrow<uint16_t>(port),
count, true));
+ logger_->log_trace("Site2Site Peer host {} port {} Secure {}", host, port,
std::to_string(secure));
Review Comment:
`secure` is no longer used in `PeerStatus` (only in the log); I can't see it
added back in #1974, either -- is this OK?
##########
libminifi/test/unit/SiteToSiteTests.cpp:
##########
@@ -0,0 +1,343 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "io/BaseStream.h"
+#include "sitetosite/Peer.h"
+#include "sitetosite/RawSiteToSiteClient.h"
+#include "unit/TestBase.h"
+#include "unit/Catch.h"
+#include "unit/SiteToSiteHelper.h"
+#include "unit/DummyProcessor.h"
+#include "unit/ProvenanceTestHelper.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class SiteToSiteClientTestAccessor {
+ public:
+ static bool bootstrap(sitetosite::RawSiteToSiteClient& client) {
+ return client.bootstrap();
+ }
+
+ static std::shared_ptr<sitetosite::Transaction>
createTransaction(sitetosite::RawSiteToSiteClient& client,
sitetosite::TransferDirection direction) {
+ return client.createTransaction(direction);
+ }
+
+ static bool sendFlowFile(sitetosite::RawSiteToSiteClient& client, const
std::shared_ptr<sitetosite::Transaction>& transaction, core::FlowFile&
flow_file, core::ProcessSession& session) {
+ return client.sendFlowFile(transaction, flow_file, session);
+ }
+
+ static bool sendPacket(sitetosite::RawSiteToSiteClient& client, const
sitetosite::DataPacket& packet) {
+ return client.sendPacket(packet);
+ }
+
+ static std::pair<uint64_t, uint64_t>
readFlowFiles(sitetosite::RawSiteToSiteClient& client, const
std::shared_ptr<sitetosite::Transaction>& transaction, core::ProcessSession&
session) {
+ return client.readFlowFiles(transaction, session);
+ }
+};
+
+void initializeMockBootstrapResponses(const
std::unique_ptr<SiteToSiteResponder>& collector) {
Review Comment:
the parameter type of this could be `SiteToSiteResponder&`, too
##########
libminifi/include/sitetosite/Peer.h:
##########
@@ -17,88 +17,29 @@
*/
#pragma once
-#include <errno.h>
-#include <stdio.h>
-
#include <atomic>
-#include <map>
#include <memory>
-#include <mutex>
#include <string>
#include <utility>
+#include <array>
#include "core/logging/LoggerFactory.h"
-#include "core/Property.h"
-#include "io/BaseStream.h"
-#include "io/BufferStream.h"
-#include "properties/Configure.h"
#include "http/BaseHTTPClient.h"
-#include "utils/TimeUtil.h"
+#include "io/BaseStream.h"
#include "io/NetworkPrioritizer.h"
-namespace org::apache::nifi::minifi::sitetosite {
-
-class Peer {
- public:
- explicit Peer(const utils::Identifier &port_id, const std::string &host,
uint16_t port, bool secure = false)
- : host_(host),
- port_(port),
- secure_(secure) {
- port_id_ = port_id;
- }
-
- explicit Peer(const std::string &host, uint16_t port, bool secure = false)
- : host_(host),
- port_(port),
- secure_(secure) {
- }
-
- explicit Peer(const Peer &other)
- : host_(other.host_),
- port_(other.port_),
- secure_(other.secure_) {
- port_id_ = other.port_id_;
- }
-
- explicit Peer(Peer &&other)
- : host_(std::move(other.host_)),
- port_(std::move(other.port_)),
- secure_(std::move(other.secure_)) {
- port_id_ = other.port_id_;
- }
-
- uint16_t getPort() const {
- return port_;
- }
-
- const std::string &getHost() const {
- return host_;
- }
+using namespace std::literals::chrono_literals;
- bool isSecure() const {
- return secure_;
- }
-
- utils::Identifier getPortId() const {
- return port_id_;
- }
-
- protected:
- std::string host_;
-
- uint16_t port_;
-
- utils::Identifier port_id_;
-
- // secore comms
+namespace org::apache::nifi::minifi::sitetosite {
- bool secure_;
-};
+static constexpr std::array<char, 4> MAGIC_BYTES = { 'N', 'i', 'F', 'i' };
Review Comment:
in a header, on namespace level, this should be
```suggestion
inline constexpr std::array<char, 4> MAGIC_BYTES = { 'N', 'i', 'F', 'i' };
```
##########
extensions/standard-processors/tests/unit/FlowJsonTests.cpp:
##########
@@ -1786,4 +1808,69 @@ TEST_CASE("Parameter context names cannot conflict with
parameter provider gener
"with no parameter provider or generated by other parameter provider");
}
+TEST_CASE("Output port can also be used in RPG") {
+ ConfigurationTestController test_controller;
+
+ core::flow::AdaptiveConfiguration config(test_controller.getContext());
+
+ static const std::string CONFIG_JSON =
+ R"(
+{
+ "rootGroup": {
+ "name": "MiNiFi Flow",
+ "processors": [{
+ "identifier": "00000000-0000-0000-0000-000000000001",
+ "name": "PutFile",
+ "type": "org.apache.nifi.processors.standard.PutFile",
+ "schedulingStrategy": "Event_DRIVEN",
+ "autoTerminatedRelationships": ["success"],
+ "properties": {}
+ }],
+ "connections": [{
+ "identifier": "00000000-0000-0000-0000-000000000008",
+ "name": "S2SToRPG",
+ "source": {
+ "id": "00000000-0000-0000-0000-000000000003",
+ "name": "AmazingOutputPort"
+ },
+ "destination": {
+ "id": "00000000-0000-0000-0000-000000000001",
+ "name": "PutFile"
+ },
+ "selectedRelationships": [""]
+ }],
+ "remoteProcessGroups": [{
+ "name": "NiFi Flow",
+ "targetUri": "https://localhost:8090/nifi",
+ "communicationsTimeout": "19 sec",
+ "inputPorts": [{
Review Comment:
This key should be `outputPorts`, since the test case is about using an output
port in the RPG.
##########
libminifi/include/sitetosite/SiteToSiteClient.h:
##########
@@ -19,246 +19,182 @@
#pragma once
#include <algorithm>
-#include <array>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
+#include <optional>
#include "Peer.h"
#include "SiteToSite.h"
#include "core/ProcessSession.h"
#include "core/ProcessContext.h"
-#include "core/Connectable.h"
-#include "utils/gsl.h"
-namespace org::apache::nifi::minifi::sitetosite {
+namespace org::apache::nifi::minifi {
-/**
- * Represents a piece of data that is to be sent to or that was received from a
- * NiFi instance.
- */
-class DataPacket {
+namespace test {
+class SiteToSiteClientTestAccessor;
+} // namespace test
+
+namespace sitetosite {
+
+struct DataPacket {
public:
- DataPacket(std::shared_ptr<core::logging::Logger> logger,
std::shared_ptr<Transaction> transaction, std::map<std::string, std::string>
attributes, const std::string &payload)
- : _attributes{std::move(attributes)},
- transaction_{std::move(transaction)},
- payload_{payload},
- logger_reference_{std::move(logger)} {
+ DataPacket(std::shared_ptr<Transaction> transaction, const std::string
&payload)
+ : transaction{std::move(transaction)},
+ payload{payload} {
+ }
+ DataPacket(std::shared_ptr<Transaction> transaction, std::map<std::string,
std::string> attributes, const std::string &payload)
+ : attributes{std::move(attributes)},
+ transaction{std::move(transaction)},
+ payload{payload} {
}
- std::map<std::string, std::string> _attributes;
- uint64_t _size{0};
- std::shared_ptr<Transaction> transaction_;
- const std::string & payload_;
- std::shared_ptr<core::logging::Logger> logger_reference_;
+ std::map<std::string, std::string> attributes;
+ uint64_t size{0};
Review Comment:
is this `size` field used anywhere?
##########
libminifi/src/sitetosite/SiteToSiteFactory.cpp:
##########
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "sitetosite/SiteToSiteFactory.h"
+
+#include <utility>
+#include <memory>
+
+#include "sitetosite/RawSiteToSiteClient.h"
+#include "sitetosite/HttpSiteToSiteClient.h"
+#include "utils/net/AsioSocketUtils.h"
+#include "core/ClassLoader.h"
+
+namespace org::apache::nifi::minifi::sitetosite {
+
+namespace {
+std::unique_ptr<SiteToSitePeer> createStreamingPeer(const
SiteToSiteClientConfiguration &client_configuration) {
+ utils::net::SocketData socket_data{client_configuration.getHost(),
client_configuration.getPort(), client_configuration.getSecurityContext()};
+ auto connection =
std::make_unique<utils::net::AsioSocketConnection>(socket_data);
+ return std::make_unique<SiteToSitePeer>(std::move(connection),
client_configuration.getHost(), client_configuration.getPort(),
client_configuration.getInterface());
+}
+
+void setCommonConfigurationOptions(SiteToSiteClient& client, const
SiteToSiteClientConfiguration &client_configuration) {
+ client.setSSLContextService(client_configuration.getSecurityContext());
+ client.setUseCompression(client_configuration.getUseCompression());
+ if (client_configuration.getBatchCount()) {
+ client.setBatchCount(client_configuration.getBatchCount().value());
+ }
+ if (client_configuration.getBatchSize()) {
+ client.setBatchSize(client_configuration.getBatchSize().value());
+ }
+ if (client_configuration.getBatchDuration()) {
+ client.setBatchDuration(client_configuration.getBatchDuration().value());
+ }
+ if (client_configuration.getTimeout()) {
+ client.setTimeout(client_configuration.getTimeout().value());
+ }
+}
+
+std::unique_ptr<SiteToSiteClient> createRawSocketSiteToSiteClient(const
SiteToSiteClientConfiguration &client_configuration) {
+ auto raw_site_to_site_client =
std::make_unique<RawSiteToSiteClient>(createStreamingPeer(client_configuration));
+ raw_site_to_site_client->setPortId(client_configuration.getPortId());
+ setCommonConfigurationOptions(*raw_site_to_site_client,
client_configuration);
+ return raw_site_to_site_client;
+}
+
+std::unique_ptr<SiteToSiteClient> createHttpSiteToSiteClient(const
SiteToSiteClientConfiguration &client_configuration) {
+ auto peer = std::make_unique<SiteToSitePeer>(client_configuration.getHost(),
client_configuration.getPort(), client_configuration.getInterface());
+ peer->setHTTPProxy(client_configuration.getHTTPProxy());
+
+ auto http_site_to_site_client =
std::make_unique<HttpSiteToSiteClient>(std::move(peer));
+ http_site_to_site_client->setPortId(client_configuration.getPortId());
+
http_site_to_site_client->setIdleTimeout(client_configuration.getIdleTimeout());
+ setCommonConfigurationOptions(*http_site_to_site_client,
client_configuration);
+ return http_site_to_site_client;
+}
+} // namespace
+
+std::unique_ptr<SiteToSiteClient> createClient(const
SiteToSiteClientConfiguration &client_configuration) {
+ switch (client_configuration.getClientType()) {
+ case ClientType::RAW:
+ return createRawSocketSiteToSiteClient(client_configuration);
+ case ClientType::HTTP:
+ return createHttpSiteToSiteClient(client_configuration);
+ }
+ return nullptr;
Review Comment:
It would be better to terminate here; could this be a `gsl_Assert(false)` or
something similar?
##########
libminifi/include/sitetosite/SiteToSite.h:
##########
@@ -23,35 +23,28 @@
#include "minifi-cpp/controllers/SSLContextService.h"
#include "Peer.h"
-#include "core/Property.h"
-#include "properties/Configure.h"
#include "io/CRCStream.h"
#include "utils/Id.h"
-#include "http/BaseHTTPClient.h"
-#include "utils/Export.h"
namespace org::apache::nifi::minifi::sitetosite {
-#if defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wunused-variable"
-#endif
-
-// Resource Negotiated Status Code
-#define RESOURCE_OK 20
-#define DIFFERENT_RESOURCE_VERSION 21
-#define NEGOTIATED_ABORT 255
-// ! Max attributes
-#define MAX_NUM_ATTRIBUTES 25000
-
-// Respond Code Sequence Pattern
-static const uint8_t CODE_SEQUENCE_VALUE_1 = (uint8_t) 'R';
-static const uint8_t CODE_SEQUENCE_VALUE_2 = (uint8_t) 'C';
+
+enum class ResourceNegotiationStatusCode : uint8_t {
+ RESOURCE_OK = 20,
+ DIFFERENT_RESOURCE_VERSION = 21,
+ NEGOTIATED_ABORT = 255
+};
+
+static constexpr uint32_t MAX_NUM_ATTRIBUTES = 25000;
+
+// Response Code Sequence Pattern
+static constexpr uint8_t CODE_SEQUENCE_VALUE_1 = static_cast<uint8_t>('R');
+static constexpr uint8_t CODE_SEQUENCE_VALUE_2 = static_cast<uint8_t>('C');
Review Comment:
these should be `inline constexpr`, too
##########
SITE_TO_SITE.md:
##########
@@ -0,0 +1,262 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+## Table of Contents
+- [Site-to-Site Overview](#site-to-site-overview)
+- [Site-to-Site Configuration](#site-to-site-configuration)
+ - [Site-to-Site Configuration on NiFi
side](#site-to-site-configuration-on-nifi-side)
+ - [Site-to-Site Configuration on MiNiFi C++
side](#site-to-site-configuration-on-minifi-c-side)
+- [Additional examples](#additional-examples)
+
+## Site-to-Site Overview
+
+Site-to-Site protocol allows data to be transferred between MiNiFi C++ and
NiFi instances. MiNiFi C++ can send or receive data from NiFi using remote
process groups. This is useful for scenarios where you want to send data from
MiNiFi C++ to NiFi or vice versa. Site-to-Site protocol support raw TCP and
HTTP protocols.
+
+At the moment site-to-site protocol is only supported between MiNiFi C++ and
NiFi instances, it cannot be used to transfer data between multiple MiNiFi C++
instances. It is recommended to use processors like InvokeHTTP and ListenHTTP
to transfer data between MiNiFi C++ instances.
+
+## Site-to-Site Configuration
+
+### Site-to-Site Configuration on NiFi side
+
+On NiFi side, site-to-site protocol is configured by creating input and output
ports. The input port is used to receive data from MiNiFi C++ and the output
port is used to send data to MiNiFi C++. The input and output ports can be
created in the NiFi UI by dragging and dropping the input and output port icons
onto the canvas.
+
+To use the input or output port of the NiFi flow in the MiNiFi C++ flow, the
instance id of the port should be used. The instance id can be found in the
NiFi UI by clicking on the input or output port and looking at the operation
panel. It can be copied from that panel, or from the port "instanceIdentifier"
field from configuration json file in the NiFi conf directory.
+
+### Site-to-Site Configuration on MiNiFi C++ side
+
+Site-to-Site protocol is configured on the MiNiFi C++ side by using remote
process groups in the configuration. The remote process group represents the
NiFi endpoint and uses the instance ids of the ports created on the NiFi side.
The remote process group can be configured to use either raw TCP or HTTP
protocol.
+
+Here is a yaml example of how to configure site-to-site protocol in MiNiFi C++
where the MiNiFi C++ instance is sending data to NiFi using raw socket protocol:
+
+```yaml
+MiNiFi Config Version: 3
+Flow Controller:
+ name: Simple GenerateFlowFile to RPG
+Processors:
+ - id: b0c04f28-0158-1000-0000-000000000000
+ name: GenerateFlowFile
+ class: org.apache.nifi.processors.standard.GenerateFlowFile
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 5 sec
+ auto-terminated relationships list: []
+ Properties:
+ Data Format: Text
+ Unique FlowFiles: false
+ Custom Text: Custom text
+Connections:
+ - id: b0c0c3cc-0158-1000-0000-000000000000
+ name: GenerateFlowFile/succes/nifi-inputport
+ source id: b0c04f28-0158-1000-0000-000000000000
+ destination id: de7cc09a-0196-1000-2c63-ee6b4319ffb6
+ source relationship name: success
+Remote Process Groups:
+ - id: b0c09ff0-0158-1000-0000-000000000000
+ name: "RPG"
+ url: http://localhost:8080/nifi
+ timeout: 20 sec
+ yield period: 10 sec
+ transport protocol: RAW
+ Input Ports:
+ - id: de7cc09a-0196-1000-2c63-ee6b4319ffb6 # this is the instance id of
the input port created in NiFi
+ name: nifi-inputport
+ max concurrent tasks: 1
+ use compression: false # currently not supported and ignored in
MiNiFi C++
+ batch size:
+ size: 10 MB
+ count: 10
+ duration: 30 sec
+ Output Ports: []
+```
+
+Here is another example in NiFi style json format how to configure
site-to-site protocol in MiNiFi C++ where the MiNiFi C++ instance is receiving
data from NiFi using the HTTP protocol:
+
+```json
Review Comment:
Here in the readme, I think it would be better to have YAML configurations
for both cases, because YAML is more readable, and it would be easier to
compare the two cases (input vs output port).
I think it's enough to include JSON versions of these configurations in the
examples directory, and mention them in this readme (as you already do).
##########
libminifi/include/sitetosite/SiteToSiteClient.h:
##########
@@ -19,246 +19,182 @@
#pragma once
#include <algorithm>
-#include <array>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
+#include <optional>
#include "Peer.h"
#include "SiteToSite.h"
#include "core/ProcessSession.h"
#include "core/ProcessContext.h"
-#include "core/Connectable.h"
-#include "utils/gsl.h"
-namespace org::apache::nifi::minifi::sitetosite {
+namespace org::apache::nifi::minifi {
-/**
- * Represents a piece of data that is to be sent to or that was received from a
- * NiFi instance.
- */
-class DataPacket {
+namespace test {
+class SiteToSiteClientTestAccessor;
+} // namespace test
+
+namespace sitetosite {
+
+struct DataPacket {
public:
- DataPacket(std::shared_ptr<core::logging::Logger> logger,
std::shared_ptr<Transaction> transaction, std::map<std::string, std::string>
attributes, const std::string &payload)
- : _attributes{std::move(attributes)},
- transaction_{std::move(transaction)},
- payload_{payload},
- logger_reference_{std::move(logger)} {
+ DataPacket(std::shared_ptr<Transaction> transaction, const std::string
&payload)
+ : transaction{std::move(transaction)},
+ payload{payload} {
+ }
+ DataPacket(std::shared_ptr<Transaction> transaction, std::map<std::string,
std::string> attributes, const std::string &payload)
+ : attributes{std::move(attributes)},
+ transaction{std::move(transaction)},
+ payload{payload} {
}
- std::map<std::string, std::string> _attributes;
- uint64_t _size{0};
- std::shared_ptr<Transaction> transaction_;
- const std::string & payload_;
- std::shared_ptr<core::logging::Logger> logger_reference_;
+ std::map<std::string, std::string> attributes;
+ uint64_t size{0};
+ std::shared_ptr<Transaction> transaction;
+ const std::string& payload;
+};
+
+struct SiteToSiteResponse {
+ ResponseCode code = ResponseCode::UNRECOGNIZED_RESPONSE_CODE;
+ std::string message;
};
-class SiteToSiteClient : public core::ConnectableImpl {
+class SiteToSiteClient {
public:
- SiteToSiteClient()
- : core::ConnectableImpl("SitetoSiteClient") {
+ explicit SiteToSiteClient(std::unique_ptr<SiteToSitePeer> peer)
+ : peer_(std::move(peer)) {
+ gsl_Assert(peer_);
Review Comment:
Can we change the type to `gsl::not_null` (and get rid of the assert)?
##########
SITE_TO_SITE.md:
##########
@@ -0,0 +1,262 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+## Table of Contents
+- [Site-to-Site Overview](#site-to-site-overview)
+- [Site-to-Site Configuration](#site-to-site-configuration)
+ - [Site-to-Site Configuration on NiFi
side](#site-to-site-configuration-on-nifi-side)
+ - [Site-to-Site Configuration on MiNiFi C++
side](#site-to-site-configuration-on-minifi-c-side)
+- [Additional examples](#additional-examples)
+
+## Site-to-Site Overview
+
+Site-to-Site protocol allows data to be transferred between MiNiFi C++ and
NiFi instances. MiNiFi C++ can send or receive data from NiFi using remote
process groups. This is useful for scenarios where you want to send data from
MiNiFi C++ to NiFi or vice versa. Site-to-Site protocol support raw TCP and
HTTP protocols.
+
+At the moment site-to-site protocol is only supported between MiNiFi C++ and
NiFi instances, it cannot be used to transfer data between multiple MiNiFi C++
instances. It is recommended to use processors like InvokeHTTP and ListenHTTP
to transfer data between MiNiFi C++ instances.
+
+## Site-to-Site Configuration
+
+### Site-to-Site Configuration on NiFi side
+
+On NiFi side, site-to-site protocol is configured by creating input and output
ports. The input port is used to receive data from MiNiFi C++ and the output
port is used to send data to MiNiFi C++. The input and output ports can be
created in the NiFi UI by dragging and dropping the input and output port icons
onto the canvas.
+
+To use the input or output port of the NiFi flow in the MiNiFi C++ flow, the
instance id of the port should be used. The instance id can be found in the
NiFi UI by clicking on the input or output port and looking at the operation
panel. It can be copied from that panel, or from the port "instanceIdentifier"
field from configuration json file in the NiFi conf directory.
+
+### Site-to-Site Configuration on MiNiFi C++ side
+
+Site-to-Site protocol is configured on the MiNiFi C++ side by using remote
process groups in the configuration. The remote process group represents the
NiFi endpoint and uses the instance ids of the ports created on the NiFi side.
The remote process group can be configured to use either raw TCP or HTTP
protocol.
+
+Here is a yaml example of how to configure site-to-site protocol in MiNiFi C++
where the MiNiFi C++ instance is sending data to NiFi using raw socket protocol:
+
+```yaml
+MiNiFi Config Version: 3
+Flow Controller:
+ name: Simple GenerateFlowFile to RPG
+Processors:
+ - id: b0c04f28-0158-1000-0000-000000000000
+ name: GenerateFlowFile
+ class: org.apache.nifi.processors.standard.GenerateFlowFile
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 5 sec
+ auto-terminated relationships list: []
+ Properties:
+ Data Format: Text
+ Unique FlowFiles: false
+ Custom Text: Custom text
+Connections:
+ - id: b0c0c3cc-0158-1000-0000-000000000000
+ name: GenerateFlowFile/succes/nifi-inputport
+ source id: b0c04f28-0158-1000-0000-000000000000
+ destination id: de7cc09a-0196-1000-2c63-ee6b4319ffb6
+ source relationship name: success
+Remote Process Groups:
+ - id: b0c09ff0-0158-1000-0000-000000000000
+ name: "RPG"
+ url: http://localhost:8080/nifi
+ timeout: 20 sec
+ yield period: 10 sec
+ transport protocol: RAW
+ Input Ports:
+ - id: de7cc09a-0196-1000-2c63-ee6b4319ffb6 # this is the instance id of
the input port created in NiFi
+ name: nifi-inputport
+ max concurrent tasks: 1
+ use compression: false # currently not supported and ignored in
MiNiFi C++
+ batch size:
+ size: 10 MB
+ count: 10
+ duration: 30 sec
+ Output Ports: []
+```
+
+Here is another example in NiFi style json format how to configure
site-to-site protocol in MiNiFi C++ where the MiNiFi C++ instance is receiving
data from NiFi using the HTTP protocol:
+
+```json
+{
+ "encodingVersion": {
+ "majorVersion": 2,
+ "minorVersion": 0
+ },
+ "maxTimerDrivenThreadCount": 1,
+ "maxEventDrivenThreadCount": 1,
+ "parameterContexts": [],
+ "rootGroup": {
+ "identifier": "c5bceca3-9c20-4068-bf2d-425e14026471",
+ "instanceIdentifier": "3cb4b3ce-7cd8-4ab7-a6bf-d4640ac5db43",
+ "name": "root",
+ "position": {
+ "x": 0.0,
+ "y": 0.0
+ },
+ "processGroups": [],
+ "remoteProcessGroups": [
+ {
+ "identifier": "327b446a-0043-48d1-8bb4-df65ba1afa60",
+ "instanceIdentifier": "2ed47dca-38f5-476d-9c37-5ea0a5072f1e",
+ "name": "https://localhost:8443/nifi",
+ "position": {
+ "x": 235.0,
+ "y": 71.00000762939453
+ },
+ "targetUri": "https://localhost:8443/nifi",
+ "targetUris": "https://localhost:8443/nifi",
+ "communicationsTimeout": "30 secs",
+ "yieldDuration": "10 sec",
+ "transportProtocol": "HTTP",
+ "inputPorts": [],
+ "outputPorts": [
+ {
+ "identifier": "de7cc09a-0196-1000-2c63-ee6b4319ffb6",
+ "instanceIdentifier":
"de7cc09a-0196-1000-2c63-ee6b4319ffb6",
+ "name": "nifi-outputport",
+ "remoteGroupId":
"327b446a-0043-48d1-8bb4-df65ba1afa60",
+ "componentType": "REMOTE_OUTPUT_PORT",
+ "targetId": "de7cc09a-0196-1000-2c63-ee6b4319ffb6",
+ "groupIdentifier":
"c5bceca3-9c20-4068-bf2d-425e14026471"
+ }
+ ],
+ "componentType": "REMOTE_PROCESS_GROUP",
+ "groupIdentifier": "c5bceca3-9c20-4068-bf2d-425e14026471"
+ }
+ ],
+ "processors": [
+ {
+ "identifier": "f29a2667-7c86-4b22-a5d3-a23ee88f3c66",
+ "instanceIdentifier": "7511c14c-9923-43ef-90b4-ac3e05b1a9fa",
+ "name": "PutFile",
+ "comments": "",
+ "position": {
+ "x": 1042.0,
+ "y": 90.5
+ },
+ "type": "org.apache.nifi.minifi.processors.PutFile",
+ "bundle": {
+ "group": "org.apache.nifi.minifi",
+ "artifact": "minifi-standard-processors",
+ "version": "1.0.0"
+ },
+ "properties": {
+ "Create Missing Directories": "true",
+ "Maximum File Count": "-1",
+ "Directory": ".",
+ "Conflict Resolution Strategy": "fail"
+ },
+ "propertyDescriptors": {
+ "Permissions": {
+ "name": "Permissions",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "Create Missing Directories": {
+ "name": "Create Missing Directories",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "Maximum File Count": {
+ "name": "Maximum File Count",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "Directory Permissions": {
+ "name": "Directory Permissions",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "Directory": {
+ "name": "Directory",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "Conflict Resolution Strategy": {
+ "name": "Conflict Resolution Strategy",
+ "identifiesControllerService": false,
+ "sensitive": false
+ }
+ },
+ "style": {},
+ "schedulingPeriod": "1000 ms",
+ "schedulingStrategy": "TIMER_DRIVEN",
+ "executionNode": "ALL",
+ "penaltyDuration": "30000 ms",
+ "yieldDuration": "1000 ms",
+ "bulletinLevel": "WARN",
+ "runDurationMillis": 0,
+ "concurrentlySchedulableTaskCount": 1,
+ "autoTerminatedRelationships": [
+ "success",
+ "failure"
+ ],
+ "componentType": "PROCESSOR",
+ "groupIdentifier": "c5bceca3-9c20-4068-bf2d-425e14026471"
+ }
+ ],
+ "inputPorts": [],
+ "outputPorts": [],
+ "connections": [
+ {
+ "identifier": "bab1ce73-e9e5-4a9a-a990-ee9c65668d8c",
+ "instanceIdentifier": "9526b397-190a-4fe3-bf0f-bd7dfc2dfafc",
+ "name": "nifi-outputport/undefined/PutFile",
+ "position": {
+ "x": 0.0,
+ "y": 0.0
+ },
+ "source": {
+ "id": "de7cc09a-0196-1000-2c63-ee6b4319ffb6",
+ "type": "REMOTE_OUTPUT_PORT",
+ "groupId": "327b446a-0043-48d1-8bb4-df65ba1afa60",
+ "name": "nifi-outputport"
+ },
+ "destination": {
+ "id": "f29a2667-7c86-4b22-a5d3-a23ee88f3c66",
+ "type": "PROCESSOR",
+ "groupId": "c5bceca3-9c20-4068-bf2d-425e14026471",
+ "name": "PutFile",
+ "instanceIdentifier":
"7511c14c-9923-43ef-90b4-ac3e05b1a9fa"
+ },
+ "labelIndex": 1,
+ "zIndex": 0,
+ "selectedRelationships": [
+ "undefined"
+ ],
+ "backPressureObjectThreshold": 2000,
+ "backPressureDataSizeThreshold": "100 MB",
+ "flowFileExpiration": "0 seconds",
+ "prioritizers": [],
+ "bends": [],
+ "componentType": "CONNECTION",
+ "groupIdentifier": "c5bceca3-9c20-4068-bf2d-425e14026471"
+ }
+ ],
+ "labels": [],
+ "funnels": [],
+ "controllerServices": [],
+ "variables": {},
+ "componentType": "PROCESS_GROUP"
+ }
+}
+```
+
+Notes on the configuration:
+
+- In the MiNiFi C++ configuration, in yaml configuration the remote input and
output ports' `id` field, and in json configuration the ports' `identifier`,
`instanceIdentifier`, and `targetId` fields should be set to the instance id of
the input and output ports created in NiFi
(`de7cc09a-0196-1000-2c63-ee6b4319ffb6` in the examples).
+- Connections from the remote output port to the processor should use the
`undefined` relationship
+- `useCompression` can be set, but it is currently not supported in MiNiFi C++
so it will be set to false in the site-to-site messages
+- the `targetUri` and `targetUris` field in the remote process group should be
set to the NiFi instance's URL, this can also use comma separated list of URLs
if the remote process group is configured to use multiple NiFi nodes
Review Comment:
```suggestion
- the `url` field (`targetUri` or `targetUris` in JSON) in the remote
process group should be set to the NiFi instance's URL, this can also use comma
separated list of URLs if the remote process group is configured to use
multiple NiFi nodes
```
##########
libminifi/include/sitetosite/Peer.h:
##########
@@ -109,175 +50,109 @@ class PeerStatus {
PeerStatus& operator=(const PeerStatus &other) = default;
PeerStatus& operator=(PeerStatus &&other) = default;
- const std::shared_ptr<Peer> &getPeer() const {
- return peer_;
+ const utils::Identifier &getPortId() const {
+ return port_id_;
+ }
+
+ const std::string &getHost() const {
+ return host_;
+ }
+
+ [[nodiscard]] uint16_t getPort() const {
+ return port_;
}
- uint32_t getFlowFileCount() {
+ [[nodiscard]] uint32_t getFlowFileCount() const {
return flow_file_count_;
}
- bool getQueryForPeers() {
+ [[nodiscard]] bool getQueryForPeers() const {
return query_for_peers_;
}
protected:
- std::shared_ptr<Peer> peer_;
+ utils::Identifier port_id_;
+ std::string host_;
+ uint16_t port_;
uint32_t flow_file_count_;
bool query_for_peers_;
};
-static const char MAGIC_BYTES[] = { 'N', 'i', 'F', 'i' };
-
-// Site2SitePeer Class
-class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStreamImpl {
+class SiteToSitePeer : public io::BaseStreamImpl {
public:
- SiteToSitePeer()
- : stream_(nullptr),
- host_(""),
- port_(-1) {
- }
- /*
- * Create a new site2site peer
- */
- explicit
SiteToSitePeer(std::unique_ptr<org::apache::nifi::minifi::io::BaseStream>
injected_socket,
- const std::string host,
- uint16_t port,
- const std::string &ifc)
+ SiteToSitePeer(std::unique_ptr<io::BaseStream> injected_socket, const
std::string& host, uint16_t port, const std::string& ifc)
: SiteToSitePeer(host, port, ifc) {
stream_ = std::move(injected_socket);
}
- explicit SiteToSitePeer(const std::string &host, uint16_t port, const
std::string &ifc)
- : stream_(nullptr),
- host_(host),
+ SiteToSitePeer(const std::string &host, uint16_t port, const std::string
&ifc)
+ : host_(host),
port_(port),
- timeout_(std::chrono::seconds(30)),
- logger_(core::logging::LoggerFactory<SiteToSitePeer>::getLogger()) {
- url_ = "nifi://" + host_ + ":" + std::to_string(port_);
- yield_expiration_ = std::chrono::system_clock::time_point();
- timeout_ = std::chrono::seconds(30);
- local_network_interface_ = io::NetworkInterface(ifc, nullptr);
+ url_("nifi://" + host_ + ":" + std::to_string(port_)),
+ local_network_interface_(io::NetworkInterface(ifc, nullptr)) {
+ timeout_.store(30s);
Review Comment:
`timeout_` is already initialized below, by the default member initializer
##########
libminifi/test/libtest/integration/HTTPHandlers.cpp:
##########
@@ -93,6 +95,25 @@ TransactionResponder::TransactionResponder(std::string
base_url, std::string por
}
bool TransactionResponder::handlePost(CivetServer* /*server*/, struct
mg_connection *conn) {
+ auto req_info = mg_get_request_info(conn);
+ std::unordered_map<std::string, std::string> expected_headers {
Review Comment:
This could be massaged into a `static constexpr`, but I'm not sure we want
to bother in a test class. But at least it should be `static const`.
##########
libminifi/include/sitetosite/SiteToSite.h:
##########
@@ -205,140 +147,184 @@ typedef enum {
ABORT = 250,
UNRECOGNIZED_RESPONSE_CODE = 254,
END_OF_STREAM = 255
-}RespondCode;
-
-// Respond Code Class
-typedef struct {
- RespondCode code;
- const char *description;
- bool hasDescription;
-} RespondCodeContext;
-
-
+};
-// Request Type Str
-class SiteToSiteRequest {
- public:
- MINIFIAPI static const char *RequestTypeStr[MAX_REQUEST_TYPE];
- MINIFIAPI static RespondCodeContext respondCodeContext[21];
+struct ResponseCodeContext {
+ ResponseCode code = ResponseCode::UNRECOGNIZED_RESPONSE_CODE;
+ const std::string_view description;
+ bool has_description = false;
};
+static constexpr std::array<ResponseCodeContext, 21> respond_code_contexts = {{
Review Comment:
`response_code_contexts` would be a better name
(also this should be `inline constexpr`)
##########
SITE_TO_SITE.md:
##########
@@ -0,0 +1,262 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+## Table of Contents
+- [Site-to-Site Overview](#site-to-site-overview)
+- [Site-to-Site Configuration](#site-to-site-configuration)
+ - [Site-to-Site Configuration on NiFi
side](#site-to-site-configuration-on-nifi-side)
+ - [Site-to-Site Configuration on MiNiFi C++
side](#site-to-site-configuration-on-minifi-c-side)
+- [Additional examples](#additional-examples)
+
+## Site-to-Site Overview
+
+Site-to-Site protocol allows data to be transferred between MiNiFi C++ and
NiFi instances. MiNiFi C++ can send or receive data from NiFi using remote
process groups. This is useful for scenarios where you want to send data from
MiNiFi C++ to NiFi or vice versa. Site-to-Site protocol support raw TCP and
HTTP protocols.
+
+At the moment site-to-site protocol is only supported between MiNiFi C++ and
NiFi instances, it cannot be used to transfer data between multiple MiNiFi C++
instances. It is recommended to use processors like InvokeHTTP and ListenHTTP
to transfer data between MiNiFi C++ instances.
+
+## Site-to-Site Configuration
+
+### Site-to-Site Configuration on NiFi side
+
+On NiFi side, site-to-site protocol is configured by creating input and output
ports. The input port is used to receive data from MiNiFi C++ and the output
port is used to send data to MiNiFi C++. The input and output ports can be
created in the NiFi UI by dragging and dropping the input and output port icons
onto the canvas.
+
+To use the input or output port of the NiFi flow in the MiNiFi C++ flow, the
instance id of the port should be used. The instance id can be found in the
NiFi UI by clicking on the input or output port and looking at the operation
panel. It can be copied from that panel, or from the port "instanceIdentifier"
field from configuration json file in the NiFi conf directory.
+
+### Site-to-Site Configuration on MiNiFi C++ side
+
+Site-to-Site protocol is configured on the MiNiFi C++ side by using remote
process groups in the configuration. The remote process group represents the
NiFi endpoint and uses the instance ids of the ports created on the NiFi side.
The remote process group can be configured to use either raw TCP or HTTP
protocol.
+
+Here is a yaml example of how to configure site-to-site protocol in MiNiFi C++
where the MiNiFi C++ instance is sending data to NiFi using raw socket protocol:
+
+```yaml
+MiNiFi Config Version: 3
+Flow Controller:
+ name: Simple GenerateFlowFile to RPG
+Processors:
+ - id: b0c04f28-0158-1000-0000-000000000000
+ name: GenerateFlowFile
+ class: org.apache.nifi.processors.standard.GenerateFlowFile
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 5 sec
+ auto-terminated relationships list: []
+ Properties:
+ Data Format: Text
+ Unique FlowFiles: false
+ Custom Text: Custom text
+Connections:
+ - id: b0c0c3cc-0158-1000-0000-000000000000
+ name: GenerateFlowFile/succes/nifi-inputport
+ source id: b0c04f28-0158-1000-0000-000000000000
+ destination id: de7cc09a-0196-1000-2c63-ee6b4319ffb6
+ source relationship name: success
+Remote Process Groups:
+ - id: b0c09ff0-0158-1000-0000-000000000000
+ name: "RPG"
+ url: http://localhost:8080/nifi
+ timeout: 20 sec
+ yield period: 10 sec
+ transport protocol: RAW
+ Input Ports:
+ - id: de7cc09a-0196-1000-2c63-ee6b4319ffb6 # this is the instance id of
the input port created in NiFi
+ name: nifi-inputport
+ max concurrent tasks: 1
+ use compression: false # currently not supported and ignored in
MiNiFi C++
+ batch size:
+ size: 10 MB
+ count: 10
+ duration: 30 sec
+ Output Ports: []
+```
+
+Here is another example in NiFi style json format how to configure
site-to-site protocol in MiNiFi C++ where the MiNiFi C++ instance is receiving
data from NiFi using the HTTP protocol:
+
+```json
+{
+ "encodingVersion": {
+ "majorVersion": 2,
+ "minorVersion": 0
+ },
+ "maxTimerDrivenThreadCount": 1,
+ "maxEventDrivenThreadCount": 1,
+ "parameterContexts": [],
+ "rootGroup": {
+ "identifier": "c5bceca3-9c20-4068-bf2d-425e14026471",
+ "instanceIdentifier": "3cb4b3ce-7cd8-4ab7-a6bf-d4640ac5db43",
+ "name": "root",
+ "position": {
+ "x": 0.0,
+ "y": 0.0
+ },
+ "processGroups": [],
+ "remoteProcessGroups": [
+ {
+ "identifier": "327b446a-0043-48d1-8bb4-df65ba1afa60",
+ "instanceIdentifier": "2ed47dca-38f5-476d-9c37-5ea0a5072f1e",
+ "name": "https://localhost:8443/nifi",
+ "position": {
+ "x": 235.0,
+ "y": 71.00000762939453
+ },
+ "targetUri": "https://localhost:8443/nifi",
+ "targetUris": "https://localhost:8443/nifi",
Review Comment:
We shouldn't have both `targetUri` and `targetUris` in the example, since
minifi will only use the one it finds first in the config.
##########
libminifi/include/sitetosite/SiteToSiteClient.h:
##########
@@ -19,246 +19,182 @@
#pragma once
#include <algorithm>
-#include <array>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
+#include <optional>
#include "Peer.h"
#include "SiteToSite.h"
#include "core/ProcessSession.h"
#include "core/ProcessContext.h"
-#include "core/Connectable.h"
-#include "utils/gsl.h"
-namespace org::apache::nifi::minifi::sitetosite {
+namespace org::apache::nifi::minifi {
-/**
- * Represents a piece of data that is to be sent to or that was received from a
- * NiFi instance.
- */
-class DataPacket {
+namespace test {
+class SiteToSiteClientTestAccessor;
+} // namespace test
+
+namespace sitetosite {
+
+struct DataPacket {
public:
- DataPacket(std::shared_ptr<core::logging::Logger> logger,
std::shared_ptr<Transaction> transaction, std::map<std::string, std::string>
attributes, const std::string &payload)
- : _attributes{std::move(attributes)},
- transaction_{std::move(transaction)},
- payload_{payload},
- logger_reference_{std::move(logger)} {
+ DataPacket(std::shared_ptr<Transaction> transaction, const std::string
&payload)
+ : transaction{std::move(transaction)},
+ payload{payload} {
+ }
+ DataPacket(std::shared_ptr<Transaction> transaction, std::map<std::string,
std::string> attributes, const std::string &payload)
+ : attributes{std::move(attributes)},
+ transaction{std::move(transaction)},
+ payload{payload} {
}
- std::map<std::string, std::string> _attributes;
- uint64_t _size{0};
- std::shared_ptr<Transaction> transaction_;
- const std::string & payload_;
- std::shared_ptr<core::logging::Logger> logger_reference_;
+ std::map<std::string, std::string> attributes;
+ uint64_t size{0};
+ std::shared_ptr<Transaction> transaction;
+ const std::string& payload;
+};
+
+struct SiteToSiteResponse {
+ ResponseCode code = ResponseCode::UNRECOGNIZED_RESPONSE_CODE;
+ std::string message;
};
-class SiteToSiteClient : public core::ConnectableImpl {
+class SiteToSiteClient {
public:
- SiteToSiteClient()
- : core::ConnectableImpl("SitetoSiteClient") {
+ explicit SiteToSiteClient(std::unique_ptr<SiteToSitePeer> peer)
+ : peer_(std::move(peer)) {
+ gsl_Assert(peer_);
}
- ~SiteToSiteClient() override = default;
+ SiteToSiteClient(const SiteToSiteClient&) = delete;
+ SiteToSiteClient(SiteToSiteClient&&) = delete;
+ SiteToSiteClient& operator=(const SiteToSiteClient&) = delete;
+ SiteToSiteClient& operator=(SiteToSiteClient&&) = delete;
- void setSSLContextService(const
std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
- ssl_context_service_ = context_service;
- }
+ virtual ~SiteToSiteClient() = default;
- /**
- * Creates a transaction using the transaction ID and the direction
- * @param transactionID transaction identifier
- * @param direction direction of transfer
- */
- virtual std::shared_ptr<Transaction> createTransaction(TransferDirection
direction) = 0;
+ virtual std::optional<std::vector<PeerStatus>> getPeerList() = 0;
+ virtual bool transmitPayload(core::ProcessContext& context, const
std::string &payload, const std::map<std::string, std::string>& attributes) = 0;
- /**
- * Transfers flow files
- * @param direction transfer direction
- * @param context process context
- * @param session process session
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
- virtual bool transfer(TransferDirection direction, core::ProcessContext&
context, core::ProcessSession& session) {
-#ifndef WIN32
- if (__builtin_expect(direction == SEND, 1)) {
+ bool transfer(TransferDirection direction, core::ProcessContext& context,
core::ProcessSession& session) {
+ if (direction == TransferDirection::SEND) {
return transferFlowFiles(context, session);
} else {
return receiveFlowFiles(context, session);
}
-#else
- if (direction == SEND) {
- return transferFlowFiles(context, session);
- } else {
- return receiveFlowFiles(context, session);
- }
-#endif
}
- /**
- * Transfers flow files to server
- * @param context process context
- * @param session process session
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
- virtual bool transferFlowFiles(core::ProcessContext& context,
core::ProcessSession& session);
-
- /**
- * Receive flow files from server
- * @param context process context
- * @param session process session
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
-
- // Confirm the data that was sent or received by comparing CRC32's of the
data sent and the data received.
- // Receive flow files for the process session
- bool receiveFlowFiles(core::ProcessContext& context, core::ProcessSession&
session);
-
- // Receive the data packet from the transaction
- // Return false when any error occurs
- bool receive(const utils::Identifier &transactionID, DataPacket *packet,
bool &eof);
- /**
- * Transfers raw data and attributes to server
- * @param context process context
- * @param session process session
- * @param payload data to transmit
- * @param attributes
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
- virtual bool transmitPayload(core::ProcessContext& context,
core::ProcessSession& session, const std::string &payload,
- std::map<std::string, std::string> attributes)
= 0;
-
- void setPortId(utils::Identifier &id) {
+ void setPortId(const utils::Identifier& id) {
port_id_ = id;
}
- /**
- * Sets the idle timeout.
- */
void setIdleTimeout(std::chrono::milliseconds timeout) {
idle_timeout_ = timeout;
}
- /**
- * Sets the base peer for this interface.
- */
- virtual void setPeer(std::unique_ptr<SiteToSitePeer> peer) {
- peer_ = std::move(peer);
- }
-
- /**
- * Provides a reference to the port identifier
- * @returns port identifier
- */
- utils::Identifier getPortId() const {
+ [[nodiscard]] utils::Identifier getPortId() const {
return port_id_;
}
- /**
- * Obtains the peer list and places them into the provided vector
- * @param peers peer vector.
- * @return true if successful, false otherwise
- */
- virtual bool getPeerList(std::vector<PeerStatus> &peers) = 0;
-
- /**
- * Establishes the interface.
- * @return true if successful, false otherwise
- */
- virtual bool establish() = 0;
-
- const std::shared_ptr<core::logging::Logger> &getLogger() {
+ [[nodiscard]] const std::shared_ptr<core::logging::Logger> &getLogger() {
return logger_;
}
- void yield() override {
+ void setSSLContextService(const
std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
+ ssl_context_service_ = context_service;
}
- /**
- * Determines if we are connected and operating
- */
- bool isRunning() const override {
- return running_;
+ void setUseCompression(bool use_compression) {
+ use_compression_ = use_compression;
}
- /**
- * Determines if work is available by this connectable
- * @return boolean if work is available.
- */
- bool isWorkAvailable() override {
- return true;
+ void setBatchSize(uint64_t size) {
+ batch_size_ = size;
}
- virtual bool bootstrap() {
- return true;
+ void setBatchCount(uint64_t count) {
+ batch_count_ = count;
}
- // Return -1 when any error occurs
- virtual int16_t send(const utils::Identifier& transactionID, DataPacket*
packet, const std::shared_ptr<core::FlowFile>& flowFile, core::ProcessSession*
session);
+ void setBatchDuration(std::chrono::milliseconds duration) {
+ batch_duration_ = duration;
+ }
- protected:
- // Cancel the transaction
- virtual void cancel(const utils::Identifier &transactionID);
- // Complete the transaction
- virtual bool complete(core::ProcessContext& context, const utils::Identifier
&transactionID);
- // Error the transaction
- virtual void error(const utils::Identifier &transactionID);
+ virtual void setTimeout(std::chrono::milliseconds timeout) {
+ timeout_ = timeout;
+ }
- virtual bool confirm(const utils::Identifier &transactionID);
- // deleteTransaction
- virtual void deleteTransaction(const utils::Identifier &transactionID);
+ protected:
+ friend class test::SiteToSiteClientTestAccessor;
+ virtual bool bootstrap() = 0;
+ virtual bool establish() = 0;
+ virtual std::shared_ptr<Transaction> createTransaction(TransferDirection
direction) = 0;
virtual void tearDown() = 0;
- // read Respond
- virtual int readResponse(const std::shared_ptr<Transaction> &transaction,
RespondCode &code, std::string &message);
- // write respond
- virtual int writeResponse(const std::shared_ptr<Transaction> &transaction,
RespondCode code, const std::string& message);
- // getRespondCodeContext
- virtual RespondCodeContext *getRespondCodeContext(RespondCode code) {
- for (auto & i : SiteToSiteRequest::respondCodeContext) {
- if (i.code == code) {
- return &i;
- }
- }
- return nullptr;
- }
+ virtual void deleteTransaction(const utils::Identifier &transaction_id);
+ virtual std::optional<SiteToSiteResponse> readResponse(const
std::shared_ptr<Transaction> &transaction);
+ virtual bool writeResponse(const std::shared_ptr<Transaction> &transaction,
const SiteToSiteResponse& response);
- // Peer State
- PeerState peer_state_{PeerState::IDLE};
+ bool initializeSend(const std::shared_ptr<Transaction>& transaction);
+ bool writeAttributesInSendTransaction(const std::shared_ptr<Transaction>&
transaction, const std::map<std::string, std::string>& attributes);
+ void finalizeSendTransaction(const std::shared_ptr<Transaction>&
transaction, uint64_t sent_bytes);
+ bool sendPacket(const DataPacket& packet);
+ bool sendFlowFile(const std::shared_ptr<Transaction>& transaction,
core::FlowFile& flow_file, core::ProcessSession& session);
- // portId
- utils::Identifier port_id_;
+ void cancel(const utils::Identifier &transaction_id);
+ bool complete(core::ProcessContext& context, const utils::Identifier
&transaction_id);
+ void error(const utils::Identifier &transaction_id);
+ bool confirm(const utils::Identifier &transaction_id);
- // idleTimeout
- std::chrono::milliseconds idle_timeout_{15000};
+ void handleTransactionError(const std::shared_ptr<Transaction>& transaction,
core::ProcessContext& context, const std::exception& exception);
- // Peer Connection
+ PeerState peer_state_{PeerState::IDLE};
+ utils::Identifier port_id_;
+ std::chrono::milliseconds idle_timeout_{15000};
std::unique_ptr<SiteToSitePeer> peer_;
-
- std::atomic<bool> running_{false};
-
- // transaction map
std::map<utils::Identifier, std::shared_ptr<Transaction>>
known_transactions_;
+ std::chrono::nanoseconds batch_send_nanos_{5s};
- // BATCH_SEND_NANOS
- std::chrono::nanoseconds _batchSendNanos = std::chrono::seconds(5);
-
- /***
- * versioning
- */
- uint32_t _supportedVersion[5] = {5, 4, 3, 2, 1};
- int _currentVersionIndex{0};
- uint32_t _currentVersion{_supportedVersion[_currentVersionIndex]};
- uint32_t _supportedCodecVersion[1] = {1};
- int _currentCodecVersionIndex{0};
- uint32_t
_currentCodecVersion{_supportedCodecVersion[_currentCodecVersionIndex]};
+ const std::vector<uint32_t> supported_version_ = {5, 4, 3, 2, 1};
Review Comment:
nitpicking, but `supported_versions_` and `supported_codec_versions_` would
be better names
##########
libminifi/test/libtest/unit/DummyProcessor.h:
##########
@@ -41,7 +41,8 @@ class DummyProcessor : public minifi::core::ProcessorImpl {
.isSensitive(true)
.build();
static constexpr auto Properties = std::array<core::PropertyReference,
2>{SimpleProperty, SensitiveProperty};
- static constexpr auto Relationships =
std::array<core::RelationshipDefinition, 0>{};
+ static constexpr core::RelationshipDefinition Success{"success", "Success
relationship"};
+ static constexpr auto Relationships =
std::array<core::RelationshipDefinition, 1>{Success};
Review Comment:
no type conversion is needed here, so
```suggestion
static constexpr auto Relationships = std::array{Success};
```
would work
##########
libminifi/src/RemoteProcessGroupPort.cpp:
##########
@@ -0,0 +1,379 @@
+/**
+ * @file RemoteProcessGroupPort.cpp
+ * RemoteProcessGroupPort class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RemoteProcessGroupPort.h"
+
+#include <memory>
+#include <iostream>
+#include <vector>
+#include <string>
+#include <utility>
+#include <cinttypes>
+
+#include "sitetosite/Peer.h"
+#include "Exception.h"
+#include "sitetosite/SiteToSiteFactory.h"
+
+#include "rapidjson/document.h"
+
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Processor.h"
+#include "http/BaseHTTPClient.h"
+#include "controllers/SSLContextService.h"
+#include "utils/net/DNS.h"
+
+#undef GetObject // windows.h #defines GetObject = GetObjectA or GetObjectW,
which conflicts with rapidjson
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi {
+
+namespace {
+std::string buildFullSiteToSiteUrl(const RPG& nifi) {
+ std::stringstream full_url;
+ full_url << nifi.protocol << nifi.host;
+ // don't append port if it is 0 ( undefined )
+ if (nifi.port > 0) {
+ full_url << ":" << std::to_string(nifi.port);
+ }
+ full_url << "/nifi-api/site-to-site";
+ return full_url.str();
+}
+} // namespace
+
+const char *RemoteProcessGroupPort::RPG_SSL_CONTEXT_SERVICE_NAME =
"RemoteProcessGroupPortSSLContextService";
+
+void RemoteProcessGroupPort::setURL(const std::string& val) {
+ auto urls = utils::string::split(val, ",");
+ for (const auto& url : urls) {
+ http::URL parsed_url{utils::string::trim(url)};
+ if (parsed_url.isValid()) {
+ logger_->log_debug("Parsed RPG URL '{}' -> '{}'", url,
parsed_url.hostPort());
+ nifi_instances_.push_back({parsed_url.host(), parsed_url.port(),
parsed_url.protocol()});
+ } else {
+ logger_->log_error("Could not parse RPG URL '{}'", url);
+ }
+ }
+}
+
+std::unique_ptr<sitetosite::SiteToSiteClient>
RemoteProcessGroupPort::initializeProtocol(sitetosite::SiteToSiteClientConfiguration&
config) const {
+ config.setSecurityContext(ssl_service_);
+ config.setHTTPProxy(proxy_);
+ config.setIdleTimeout(idle_timeout_);
+ config.setUseCompression(use_compression_);
+ config.setBatchCount(batch_count_);
+ config.setBatchSize(batch_size_);
+ config.setBatchDuration(batch_duration_);
+ config.setTimeout(timeout_);
+
+ return sitetosite::createClient(config);
+}
+
+std::unique_ptr<sitetosite::SiteToSiteClient>
RemoteProcessGroupPort::getNextProtocol() {
+ std::unique_ptr<sitetosite::SiteToSiteClient> next_protocol = nullptr;
+ if (!available_protocols_.try_dequeue(next_protocol)) {
+ if (peer_index_ >= 0) {
+ std::lock_guard<std::mutex> lock(peer_mutex_);
+ logger_->log_debug("Creating client from peer {}", peer_index_.load());
+ auto& peer_status = peers_[peer_index_];
+ sitetosite::SiteToSiteClientConfiguration
config(peer_status.getPortId(), peer_status.getHost(), peer_status.getPort(),
local_network_interface_, client_type_);
+ peer_index_++;
+ if (peer_index_ >= static_cast<int>(peers_.size())) {
+ peer_index_ = 0;
+ }
+ next_protocol = initializeProtocol(config);
+ } else {
+ logger_->log_debug("Refreshing the peer list since there are none
configured.");
+ refreshPeerList();
+ }
+ }
+ logger_->log_debug("Obtained protocol from available_protocols_");
+ return next_protocol;
+}
+
+void
RemoteProcessGroupPort::returnProtocol(std::unique_ptr<sitetosite::SiteToSiteClient>
return_protocol) {
+ auto count = peers_.size();
+ if (max_concurrent_tasks_ > count)
+ count = max_concurrent_tasks_;
+ if (available_protocols_.size_approx() >= count) {
+ logger_->log_debug("not enqueueing protocol {}", getUUIDStr());
+ // let the memory be freed
+ return;
+ }
+ logger_->log_debug("enqueueing protocol {}, have a total of {}",
getUUIDStr(), available_protocols_.size_approx());
+ available_protocols_.enqueue(std::move(return_protocol));
+}
+
+void RemoteProcessGroupPort::initialize() {
+ setSupportedProperties(Properties);
+ setSupportedRelationships(Relationships);
+
+ logger_->log_trace("Finished initialization");
+}
+
+void RemoteProcessGroupPort::onSchedule(core::ProcessContext& context,
core::ProcessSessionFactory&) {
+ if (auto protocol_uuid = context.getProperty(portUUID)) {
+ protocol_uuid_ = *protocol_uuid;
+ }
+
+ auto context_name = context.getProperty(SSLContext);
+ if (!context_name || IsNullOrEmpty(*context_name)) {
+ context_name = RPG_SSL_CONTEXT_SERVICE_NAME;
+ }
+
+ std::shared_ptr<core::controller::ControllerService> service =
context.getControllerService(*context_name, getUUID());
+ if (nullptr != service) {
+ ssl_service_ =
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(service);
+ } else {
+ std::string secureStr;
+ if (configure_->get(Configure::nifi_remote_input_secure, secureStr) &&
utils::string::toBool(secureStr).value_or(false)) {
+ ssl_service_ =
std::make_shared<minifi::controllers::SSLContextServiceImpl>(RPG_SSL_CONTEXT_SERVICE_NAME,
configure_);
+ ssl_service_->onEnable();
+ }
+ }
+
+ idle_timeout_ = context.getProperty(idleTimeout) |
utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) |
utils::orThrow("RemoteProcessGroupPort::idleTimeout is a required Property");
+
+ std::lock_guard<std::mutex> lock(peer_mutex_);
+ if (!nifi_instances_.empty()) {
+ refreshPeerList();
+ if (!peers_.empty())
+ peer_index_ = 0;
+ }
+ // populate the site2site protocol for load balancing between them
+ if (!peers_.empty()) {
+ auto count = peers_.size();
+ if (max_concurrent_tasks_ > count)
+ count = max_concurrent_tasks_;
+ for (uint32_t i = 0; i < count; i++) {
+ auto peer_status = peers_[peer_index_];
+ sitetosite::SiteToSiteClientConfiguration
config(peer_status.getPortId(), peer_status.getHost(), peer_status.getPort(),
local_network_interface_, client_type_);
+ peer_index_++;
+ if (peer_index_ >= static_cast<int>(peers_.size())) {
+ peer_index_ = 0;
+ }
+ logger_->log_trace("Creating client");
+ auto next_protocol = initializeProtocol(config);
+ logger_->log_trace("Created client, moving into available protocols");
+ returnProtocol(std::move(next_protocol));
+ }
+ } else {
+ // we don't have any peers
+ logger_->log_error("No peers selected during scheduling");
+ }
+}
+
+void RemoteProcessGroupPort::notifyStop() {
+ transmitting_ = false;
+ RPGLatch count(false); // we're just a monitor
+ // we use the latch
+ while (count.getCount() > 0) {
+ }
Review Comment:
This doesn't seem to do anything. First, we create the `RPGLatch` object
with a `count` member which is a pointer to a static atomic integer which is
initialized to 0 and its value is never changed. Next, we check if `*count > 0`
(it isn't), and finally we continue on our way.
It used to do something, because there was an `RPGLatch count;` line in
`RemoteProcessorGroupPort::onTrigger()`. This was supposed to increment the
atomic integer, but it was broken because the constructor and destructor
incremented and decremented the pointer instead of the atomic integer it points
to.
I think we should fix the `RPGLatch` class (or replace it with something
from the standard library if there is a replacement) and put the counter
increment back into `onTrigger()`. Or if it is no longer needed, then we
should remove the `RPGLatch` class.
##########
libminifi/include/RemoteProcessGroupPort.h:
##########
@@ -155,102 +157,90 @@ class RemoteProcessorGroupPort : public
core::ProcessorImpl {
local_network_interface_ = ifc;
}
- std::string getInterface() {
- return local_network_interface_;
- }
+ void setURL(const std::string& val);
- /**
- * Sets the url. Supports a CSV
- */
- void setURL(std::string val) {
- auto urls = utils::string::split(val, ",");
- for (const auto& url : urls) {
- http::URL parsed_url{utils::string::trim(url)};
- if (parsed_url.isValid()) {
- logger_->log_debug("Parsed RPG URL '{}' -> '{}'", url,
parsed_url.hostPort());
- nifi_instances_.push_back({parsed_url.host(), parsed_url.port(),
parsed_url.protocol()});
- } else {
- logger_->log_error("Could not parse RPG URL '{}'", url);
- }
- }
- }
-
- std::vector<RPG> getInstances() const {
+ [[nodiscard]] std::vector<RPG> getInstances() const {
return nifi_instances_;
}
void setHTTPProxy(const http::HTTPProxy &proxy) {
- this->proxy_ = proxy;
+ proxy_ = proxy;
}
- http::HTTPProxy getHTTPProxy() {
- return this->proxy_;
- }
- // refresh remoteSite2SiteInfo via nifi rest api
- std::pair<std::string, int> refreshRemoteSite2SiteInfo();
- // refresh site2site peer list
- void refreshPeerList();
+ [[nodiscard]] http::HTTPProxy getHTTPProxy() const {
+ return proxy_;
+ }
void notifyStop() override;
void enableHTTP() {
- client_type_ = sitetosite::HTTP;
+ client_type_ = sitetosite::ClientType::HTTP;
}
- protected:
- /**
- * Non static in case anything is loaded when this object is re-scheduled
- */
- bool is_http_disabled() {
- auto ptr =
core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient",
"HTTPClient");
- if (ptr != nullptr) {
- delete ptr;
- return false;
- } else {
- return true;
- }
+ void setUseCompression(bool use_compression) {
+ use_compression_ = use_compression;
+ }
+
+ [[nodiscard]] bool getUseCompression() const {
+ return use_compression_;
+ }
+
+ void setBatchCount(uint64_t count) {
+ batch_count_ = count;
+ }
+
+ [[nodiscard]] std::optional<uint64_t> getBatchCount() const {
+ return batch_count_;
}
- std::unique_ptr<sitetosite::SiteToSiteClient> getNextProtocol(bool create);
+ void setBatchSize(uint64_t size) {
+ batch_size_ = size;
+ }
+
+ [[nodiscard]] std::optional<uint64_t> getBatchSize() const {
+ return batch_size_;
+ }
+
+ void setBatchDuration(std::chrono::milliseconds duration) {
+ batch_duration_ = duration;
+ }
+
+ [[nodiscard]] std::optional<std::chrono::milliseconds> getBatchDuration()
const {
+ return batch_duration_;
+ }
+
+ protected:
+ std::optional<std::pair<std::string, uint16_t>>
refreshRemoteSiteToSiteInfo();
+ void refreshPeerList();
+ std::unique_ptr<sitetosite::SiteToSiteClient> getNextProtocol();
void returnProtocol(std::unique_ptr<sitetosite::SiteToSiteClient> protocol);
moodycamel::ConcurrentQueue<std::unique_ptr<sitetosite::SiteToSiteClient>>
available_protocols_;
-
std::shared_ptr<Configure> configure_;
- // Transaction Direction
sitetosite::TransferDirection direction_;
- // Transmitting
std::atomic<bool> transmitting_;
- // timeout
- uint64_t timeout_;
- // local network interface
+ std::optional<std::chrono::milliseconds> timeout_;
std::string local_network_interface_;
-
utils::Identifier protocol_uuid_;
-
- std::chrono::milliseconds idle_timeout_ = std::chrono::seconds(15);
-
- // rest API end point info
- std::vector<struct RPG> nifi_instances_;
-
- // http proxy
+ std::chrono::milliseconds idle_timeout_ = 15s;
+ std::vector<RPG> nifi_instances_;
http::HTTPProxy proxy_;
-
- bool bypass_rest_api_;
-
- sitetosite::CLIENT_TYPE client_type_;
-
- // Remote Site2Site Info
- bool site2site_secure_;
+ sitetosite::ClientType client_type_;
std::vector<sitetosite::PeerStatus> peers_;
- std::atomic<int> peer_index_;
+ std::atomic<int64_t> peer_index_;
std::mutex peer_mutex_;
Review Comment:
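Do we need both the `std::atomic` wrapper on `peer_index_` and `peer_mutex_`? If every access to `peer_index_` already happens under `peer_mutex_`, a plain integer would be enough.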
##########
libminifi/src/sitetosite/HttpSiteToSiteClient.cpp:
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "sitetosite/HttpSiteToSiteClient.h"
+
+#include <chrono>
+#include <map>
+#include <string>
+#include <memory>
+#include <thread>
+#include <iostream>
+#include <vector>
+#include <optional>
+
+#include "io/CRCStream.h"
+#include "sitetosite/Peer.h"
+#include "io/validation.h"
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "Exception.h"
+
+#include "rapidjson/document.h"
+#include "rapidjson/error/en.h"
+
+#undef DELETE // macro on windows
+
+namespace org::apache::nifi::minifi::sitetosite {
+
+namespace {
+std::optional<utils::Identifier> parseTransactionId(const std::string &uri) {
+ return
utils::Identifier::parse(utils::string::partAfterLastOccurrenceOf(uri, '/'));
+}
+
+std::optional<std::vector<PeerStatus>> parsePeerStatuses(const
std::shared_ptr<core::logging::Logger> &logger, const std::string &entity,
const utils::Identifier &id) {
+ try {
+ rapidjson::Document root;
+ rapidjson::ParseResult ok = root.Parse(entity.c_str());
+ if (!ok) {
+ std::stringstream ss;
+ ss << "Failed to parse archive lens stack from JSON string with reason: "
+ << rapidjson::GetParseError_En(ok.Code())
+ << " at offset " << ok.Offset();
+
+ throw Exception(ExceptionType::GENERAL_EXCEPTION, ss.str());
+ }
+
+ std::vector<PeerStatus> peer_statuses;
+ if (!root.HasMember("peers") || !root["peers"].IsArray() ||
root["peers"].Size() <= 0) {
+ logger->log_debug("Peers is either not a member or is empty. String to
analyze: {}", entity);
+ return peer_statuses;
+ }
+
+ for (const auto &peer : root["peers"].GetArray()) {
+ std::string hostname;
+ int port = 0;
+ int flow_file_count = 0;
+
+ if (peer.HasMember("hostname") && peer["hostname"].IsString() &&
+ peer.HasMember("port") && peer["port"].IsNumber()) {
+ hostname = peer["hostname"].GetString();
+ port = peer["port"].GetInt();
+ }
+
+ if (peer.HasMember("flowFileCount")) {
+ if (peer["flowFileCount"].IsNumber()) {
+ flow_file_count = gsl::narrow<int>(peer["flowFileCount"].GetInt64());
+ } else {
+ logger->log_debug("Could not parse flowFileCount, so we're going to
continue without it");
+ }
+ }
+
+ // host name and port are required.
+ if (!IsNullOrEmpty(hostname) && port > 0) {
+ PeerStatus status(id, hostname, port, flow_file_count, true);
+ peer_statuses.push_back(std::move(status));
+ } else {
+ logger->log_debug("hostname empty or port is zero. hostname: {}, port:
{}", hostname, port);
+ }
+ }
+ return peer_statuses;
+ } catch (const Exception &exception) {
+ logger->log_debug("Caught Exception {}", exception.what());
+ return std::nullopt;
+ }
+}
+} // namespace
+
+std::shared_ptr<Transaction>
HttpSiteToSiteClient::createTransaction(TransferDirection direction) {
+ std::string dir_str = direction == TransferDirection::SEND ? "input-ports" :
"output-ports";
+ std::stringstream uri;
+ uri << getBaseURI() << "data-transfer/" << dir_str << "/" <<
getPortId().to_string() << "/transactions";
+ auto client = createHttpClient(uri.str(), http::HttpRequestMethod::POST);
+ setSiteToSiteHeaders(*client);
+ client->setConnectionTimeout(std::chrono::milliseconds(5000));
+ client->setContentType("application/json");
+ client->setRequestHeader("Accept", "application/json");
+ client->setRequestHeader("Transfer-Encoding", "chunked");
+ client->setPostFields("");
+ client->submit();
+
+ if (auto http_stream = dynamic_cast<http::HttpStream*>(peer_->getStream())) {
+ logger_->log_debug("Closing {}", http_stream->getClientRef()->getURL());
+ }
+
+ if (client->getResponseCode() != 201) {
+ peer_->setStream(nullptr);
+ logger_->log_debug("Could not create transaction, received {}",
client->getResponseCode());
+ return nullptr;
+ }
+ // parse the headers
+ auto intent_name = client->getHeaderValue("x-location-uri-intent");
+ if (!utils::string::equalsIgnoreCase(intent_name, "transaction-url")) {
+ logger_->log_debug("Could not create transaction, intent is {}",
intent_name);
+ return nullptr;
+ }
+
+ auto url = client->getHeaderValue("Location");
+ if (IsNullOrEmpty(url)) {
+ logger_->log_debug("Location is empty");
+ return nullptr;
+ }
+
+ org::apache::nifi::minifi::io::CRCStream<SiteToSitePeer>
crcstream(gsl::make_not_null(peer_.get()));
+ auto transaction = std::make_shared<HttpTransaction>(direction,
std::move(crcstream));
+ transaction->initialize(this, url);
+ auto transaction_id = parseTransactionId(url);
+ if (!transaction_id) {
+ logger_->log_debug("Transaction ID is empty");
+ return nullptr;
+ }
+ transaction->setTransactionId(transaction_id.value());
+ std::shared_ptr<minifi::http::HTTPClient> transaction_client;
+ if (transaction->getDirection() == TransferDirection::SEND) {
+ transaction_client = openConnectionForSending(transaction);
+ } else {
+ transaction_client = openConnectionForReceive(transaction);
+ transaction->setDataAvailable(true);
+ // 201 tells us that data is available. 200 would mean that nothing is
available.
+ }
+ gsl_Assert(transaction_client);
+
+ setSiteToSiteHeaders(*transaction_client);
+ peer_->setStream(std::make_unique<http::HttpStream>(transaction_client));
+ logger_->log_debug("Created transaction id -{}-",
transaction->getUUID().to_string());
+ known_transactions_[transaction->getUUID()] = transaction;
+ return transaction;
+}
+
+std::optional<SiteToSiteResponse>
HttpSiteToSiteClient::readResponseForReceiveTransfer(const
std::shared_ptr<Transaction>& transaction) {
+ SiteToSiteResponse response;
+ if (current_code_ == ResponseCode::FINISH_TRANSACTION) {
+ return response;
+ }
+
+ if (transaction->getState() == TransactionState::TRANSACTION_STARTED ||
transaction->getState() == TransactionState::DATA_EXCHANGED) {
+ if (current_code_ == ResponseCode::CONFIRM_TRANSACTION &&
transaction->getState() == TransactionState::DATA_EXCHANGED) {
+ auto stream = dynamic_cast<http::HttpStream*>(peer_->getStream());
+ if (!stream->isFinished()) {
+ logger_->log_debug("confirm read for {}, but not finished ",
transaction->getUUIDStr());
+ if (stream->waitForDataAvailable()) {
+ response.code = ResponseCode::CONTINUE_TRANSACTION;
+ return response;
Review Comment:
This style:
```suggestion
return SiteToSiteResponse{.code =
ResponseCode::CONTINUE_TRANSACTION};
```
would be more readable for me. As it is, with a `response` object created at
the start and returned at various points, it's not clear whether the object has
been modified earlier in the function.
##########
libminifi/src/sitetosite/HttpSiteToSiteClient.cpp:
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "sitetosite/HttpSiteToSiteClient.h"
+
+#include <chrono>
+#include <map>
+#include <string>
+#include <memory>
+#include <thread>
+#include <iostream>
+#include <vector>
+#include <optional>
+
+#include "io/CRCStream.h"
+#include "sitetosite/Peer.h"
+#include "io/validation.h"
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "Exception.h"
+
+#include "rapidjson/document.h"
+#include "rapidjson/error/en.h"
+
+#undef DELETE // macro on windows
+
+namespace org::apache::nifi::minifi::sitetosite {
+
+namespace {
+std::optional<utils::Identifier> parseTransactionId(const std::string &uri) {
+ return
utils::Identifier::parse(utils::string::partAfterLastOccurrenceOf(uri, '/'));
+}
+
+std::optional<std::vector<PeerStatus>> parsePeerStatuses(const
std::shared_ptr<core::logging::Logger> &logger, const std::string &entity,
const utils::Identifier &id) {
+ try {
+ rapidjson::Document root;
+ rapidjson::ParseResult ok = root.Parse(entity.c_str());
+ if (!ok) {
+ std::stringstream ss;
+ ss << "Failed to parse archive lens stack from JSON string with reason: "
Review Comment:
"archive lens stack"? -- seems to be a (very old) copy-paste error
##########
libminifi/test/unit/SiteToSiteTests.cpp:
##########
@@ -0,0 +1,343 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "io/BaseStream.h"
+#include "sitetosite/Peer.h"
+#include "sitetosite/RawSiteToSiteClient.h"
+#include "unit/TestBase.h"
+#include "unit/Catch.h"
+#include "unit/SiteToSiteHelper.h"
+#include "unit/DummyProcessor.h"
+#include "unit/ProvenanceTestHelper.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class SiteToSiteClientTestAccessor {
+ public:
+ static bool bootstrap(sitetosite::RawSiteToSiteClient& client) {
+ return client.bootstrap();
+ }
+
+ static std::shared_ptr<sitetosite::Transaction>
createTransaction(sitetosite::RawSiteToSiteClient& client,
sitetosite::TransferDirection direction) {
+ return client.createTransaction(direction);
+ }
+
+ static bool sendFlowFile(sitetosite::RawSiteToSiteClient& client, const
std::shared_ptr<sitetosite::Transaction>& transaction, core::FlowFile&
flow_file, core::ProcessSession& session) {
+ return client.sendFlowFile(transaction, flow_file, session);
+ }
+
+ static bool sendPacket(sitetosite::RawSiteToSiteClient& client, const
sitetosite::DataPacket& packet) {
+ return client.sendPacket(packet);
+ }
+
+ static std::pair<uint64_t, uint64_t>
readFlowFiles(sitetosite::RawSiteToSiteClient& client, const
std::shared_ptr<sitetosite::Transaction>& transaction, core::ProcessSession&
session) {
+ return client.readFlowFiles(transaction, session);
+ }
+};
+
+void initializeMockBootstrapResponses(const
std::unique_ptr<SiteToSiteResponder>& collector) {
+ const char resource_ok_code =
magic_enum::enum_underlying(sitetosite::ResourceNegotiationStatusCode::RESOURCE_OK);
+ std::string resp_code;
+ resp_code.insert(resp_code.begin(), resource_ok_code);
+ collector->push_response(resp_code);
+
+ // Handshake response code
+ collector->push_response("R");
+ collector->push_response("C");
+ const char resource_code_properties_ok =
magic_enum::enum_underlying(sitetosite::ResponseCode::PROPERTIES_OK);
+ resp_code = resource_code_properties_ok;
+ collector->push_response(resp_code);
+
+ // Codec Negotiation
+ resp_code = resource_ok_code;
+ collector->push_response(resp_code);
+}
+
+void verifyBootstrapMessages(sitetosite::RawSiteToSiteClient& protocol,
SiteToSiteResponder& collector) {
+ protocol.setUseCompression(false);
+ protocol.setBatchDuration(std::chrono::milliseconds(100));
+ protocol.setBatchCount(5);
+ protocol.setTimeout(std::chrono::milliseconds(20000));
+
+ minifi::utils::Identifier fake_uuid =
minifi::utils::Identifier::parse("C56A4180-65AA-42EC-A945-5FD21DEC0538").value();
+ protocol.setPortId(fake_uuid);
+
+ REQUIRE(true == SiteToSiteClientTestAccessor::bootstrap(protocol));
+
+ REQUIRE(collector.get_next_client_response() == "NiFi");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "SocketFlowFileProtocol");
+ collector.get_next_client_response();
+ collector.get_next_client_response();
+ collector.get_next_client_response();
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "nifi://fake_host:65433");
+ collector.get_next_client_response();
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "BATCH_COUNT");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "5");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "BATCH_DURATION");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "100");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "GZIP");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "false");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "PORT_IDENTIFIER");
+ collector.get_next_client_response();
+
REQUIRE(minifi::utils::string::equalsIgnoreCase(collector.get_next_client_response(),
"c56a4180-65aa-42ec-a945-5fd21dec0538"));
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "REQUEST_EXPIRATION_MILLIS");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "20000");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "NEGOTIATE_FLOWFILE_CODEC");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "StandardFlowFileCodec");
+ collector.get_next_client_response(); // codec version
+}
+
+void verifySendResponses(SiteToSiteResponder& collector, const
std::vector<std::string>& expected_responses) {
+ for (const auto& expected_response : expected_responses) {
+ if (expected_response.empty()) {
+ collector.get_next_client_response();
+ continue;
+ }
+ CHECK(expected_response == collector.get_next_client_response());
+ }
+}
+
+TEST_CASE("TestSetPortId", "[S2S]") {
+ auto peer =
std::make_unique<sitetosite::SiteToSitePeer>(std::make_unique<org::apache::nifi::minifi::io::BufferStream>(),
"fake_host", 65433, "");
+ sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+ auto fake_uuid =
minifi::utils::Identifier::parse("c56a4180-65aa-42ec-a945-5fd21dec0538").value();
+ protocol.setPortId(fake_uuid);
+ REQUIRE(fake_uuid == protocol.getPortId());
+}
+
+TEST_CASE("TestSiteToSiteVerifySend using data packet", "[S2S]") {
+ auto collector = std::make_unique<SiteToSiteResponder>();
+ auto collector_ptr = collector.get();
+
+ initializeMockBootstrapResponses(collector);
+
+ auto peer =
std::make_unique<sitetosite::SiteToSitePeer>(std::move(collector), "fake_host",
65433, "");
+ sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+
+ std::vector<std::string> expected_responses;
+ std::string payload = "Test MiNiFi payload";
+
+ expected_responses.push_back(""); // attribute count 0
+ expected_responses.push_back(""); // payload length
+ expected_responses.push_back(payload);
+
+ verifyBootstrapMessages(protocol, *collector_ptr);
+
+ // start to send the stuff
+ auto transaction = SiteToSiteClientTestAccessor::createTransaction(protocol,
sitetosite::TransferDirection::SEND);
+ REQUIRE(transaction);
+ collector_ptr->get_next_client_response();
+ REQUIRE(collector_ptr->get_next_client_response() == "SEND_FLOWFILES");
+ std::map<std::string, std::string> attributes;
+ sitetosite::DataPacket packet(transaction, attributes, payload);
+ REQUIRE(SiteToSiteClientTestAccessor::sendPacket(protocol, packet));
+ verifySendResponses(*collector_ptr, expected_responses);
+ REQUIRE(transaction->getCRC() == 4000670133);
+}
+
+TEST_CASE("TestSiteToSiteVerifySend using flowfile data", "[S2S]") {
+ auto collector = std::make_unique<SiteToSiteResponder>();
+ auto collector_ptr = collector.get();
+
+ initializeMockBootstrapResponses(collector);
+
+ auto peer =
std::make_unique<sitetosite::SiteToSitePeer>(std::move(collector), "fake_host",
65433, "");
+ sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+
+ std::vector<std::string> expected_responses;
+ std::string payload = "Test MiNiFi payload";
+
+ expected_responses.push_back(""); // attribute count
+ expected_responses.push_back(""); // attribute key length
+ expected_responses.push_back("filename");
+ expected_responses.push_back(""); // attribute value length
+ expected_responses.push_back("myfile");
+ expected_responses.push_back(""); // attribute key length
+ expected_responses.push_back("flow.id");
+ expected_responses.push_back(""); // attribute value length
+ expected_responses.push_back("test");
+ expected_responses.push_back(""); // payload length
+ expected_responses.push_back(payload);
+
+ protocol.setBatchDuration(std::chrono::milliseconds(100));
+ protocol.setBatchCount(5);
+ protocol.setTimeout(std::chrono::milliseconds(20000));
+
+ auto fake_uuid =
minifi::utils::Identifier::parse("C56A4180-65AA-42EC-A945-5FD21DEC0538").value();
+ protocol.setPortId(fake_uuid);
+
+ verifyBootstrapMessages(protocol, *collector_ptr);
+
+ // start to send the stuff
+ auto transaction = SiteToSiteClientTestAccessor::createTransaction(protocol,
sitetosite::TransferDirection::SEND);
+ REQUIRE(transaction);
+ collector_ptr->get_next_client_response();
+ REQUIRE(collector_ptr->get_next_client_response() == "SEND_FLOWFILES");
+
+ TestController test_controller_;
+ TestController::PlanConfig plan_config_;
+ std::shared_ptr<TestPlan> test_plan =
test_controller_.createPlan(plan_config_);
+ test_plan->addProcessor("DummyProcessor", "dummyProcessor");
+ std::shared_ptr<minifi::core::ProcessContext> context = [&] {
test_plan->runNextProcessor(); return test_plan->getCurrentContext(); }();
+ std::unique_ptr<minifi::core::ProcessSession> session =
std::make_unique<core::ProcessSessionImpl>(context);
+
+ auto flow_file = session->create();
+ session->write(flow_file, [&](const std::shared_ptr<io::OutputStream>&
output_stream) {
+ std::span<const std::byte> span{reinterpret_cast<const
std::byte*>(payload.data()), payload.size()};
+ output_stream->write(span);
+ return payload.size();
+ });
+ flow_file->updateAttribute("filename", "myfile");
+ flow_file->updateAttribute("flow.id", "test");
+ session->transfer(flow_file, DummyProcessor::Success);
+ session->commit();
+
+ std::map<std::string, std::string> attributes;
+ REQUIRE(SiteToSiteClientTestAccessor::sendFlowFile(protocol, transaction,
*flow_file, *session));
+ verifySendResponses(*collector_ptr, expected_responses);
+ REQUIRE(transaction->getCRC() == 2886786428);
+}
+
+TEST_CASE("TestSiteToSiteVerifyNegotiationFail", "[S2S]") {
+ auto collector = std::make_unique<SiteToSiteResponder>();
+
+ const char negotiated_abort_code =
magic_enum::enum_underlying(sitetosite::ResourceNegotiationStatusCode::NEGOTIATED_ABORT);
+ std::string resp_code;
+ resp_code.insert(resp_code.begin(), negotiated_abort_code);
+ collector->push_response(resp_code);
+ collector->push_response(resp_code);
+
+ auto peer =
std::make_unique<sitetosite::SiteToSitePeer>(std::move(collector), "fake_host",
65433, "");
+ sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+
+ std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
+ minifi::utils::Identifier fake_uuid;
+ fake_uuid = uuid_str;
+ protocol.setPortId(fake_uuid);
+
+ REQUIRE_FALSE(SiteToSiteClientTestAccessor::bootstrap(protocol));
+}
+
+void iniitalizeMockRemoteClientReceiveDataResponses(SiteToSiteResponder&
collector) {
+ collector.push_response("R");
+ collector.push_response("C");
+ auto addResponseCode = [&collector](sitetosite::ResponseCode code) {
+ std::string resp_code;
+ resp_code.insert(resp_code.begin(), magic_enum::enum_underlying(code));
+ collector.push_response(resp_code);
+ };
+ addResponseCode(sitetosite::ResponseCode::MORE_DATA);
Review Comment:
I would rewrite this to
```suggestion
collector.push_response(std::string{static_cast<char>(magic_enum::enum_underlying(sitetosite::ResponseCode::MORE_DATA))});
```
because why use 6 lines of code where 1 will do
##########
libminifi/test/unit/SiteToSiteTests.cpp:
##########
@@ -0,0 +1,343 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "io/BaseStream.h"
+#include "sitetosite/Peer.h"
+#include "sitetosite/RawSiteToSiteClient.h"
+#include "unit/TestBase.h"
+#include "unit/Catch.h"
+#include "unit/SiteToSiteHelper.h"
+#include "unit/DummyProcessor.h"
+#include "unit/ProvenanceTestHelper.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class SiteToSiteClientTestAccessor {
+ public:
+ static bool bootstrap(sitetosite::RawSiteToSiteClient& client) {
+ return client.bootstrap();
+ }
+
+ static std::shared_ptr<sitetosite::Transaction>
createTransaction(sitetosite::RawSiteToSiteClient& client,
sitetosite::TransferDirection direction) {
+ return client.createTransaction(direction);
+ }
+
+ static bool sendFlowFile(sitetosite::RawSiteToSiteClient& client, const
std::shared_ptr<sitetosite::Transaction>& transaction, core::FlowFile&
flow_file, core::ProcessSession& session) {
+ return client.sendFlowFile(transaction, flow_file, session);
+ }
+
+ static bool sendPacket(sitetosite::RawSiteToSiteClient& client, const
sitetosite::DataPacket& packet) {
+ return client.sendPacket(packet);
+ }
+
+ static std::pair<uint64_t, uint64_t>
readFlowFiles(sitetosite::RawSiteToSiteClient& client, const
std::shared_ptr<sitetosite::Transaction>& transaction, core::ProcessSession&
session) {
+ return client.readFlowFiles(transaction, session);
+ }
+};
+
+void initializeMockBootstrapResponses(const
std::unique_ptr<SiteToSiteResponder>& collector) {
+ const char resource_ok_code =
magic_enum::enum_underlying(sitetosite::ResourceNegotiationStatusCode::RESOURCE_OK);
+ std::string resp_code;
+ resp_code.insert(resp_code.begin(), resource_ok_code);
+ collector->push_response(resp_code);
+
+ // Handshake response code
+ collector->push_response("R");
+ collector->push_response("C");
+ const char resource_code_properties_ok =
magic_enum::enum_underlying(sitetosite::ResponseCode::PROPERTIES_OK);
+ resp_code = resource_code_properties_ok;
+ collector->push_response(resp_code);
+
+ // Codec Negotiation
+ resp_code = resource_ok_code;
+ collector->push_response(resp_code);
+}
+
+void verifyBootstrapMessages(sitetosite::RawSiteToSiteClient& protocol,
SiteToSiteResponder& collector) {
+ protocol.setUseCompression(false);
+ protocol.setBatchDuration(std::chrono::milliseconds(100));
+ protocol.setBatchCount(5);
+ protocol.setTimeout(std::chrono::milliseconds(20000));
+
+ minifi::utils::Identifier fake_uuid =
minifi::utils::Identifier::parse("C56A4180-65AA-42EC-A945-5FD21DEC0538").value();
+ protocol.setPortId(fake_uuid);
+
+ REQUIRE(true == SiteToSiteClientTestAccessor::bootstrap(protocol));
+
+ REQUIRE(collector.get_next_client_response() == "NiFi");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "SocketFlowFileProtocol");
+ collector.get_next_client_response();
+ collector.get_next_client_response();
+ collector.get_next_client_response();
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "nifi://fake_host:65433");
+ collector.get_next_client_response();
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "BATCH_COUNT");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "5");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "BATCH_DURATION");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "100");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "GZIP");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "false");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "PORT_IDENTIFIER");
+ collector.get_next_client_response();
+
REQUIRE(minifi::utils::string::equalsIgnoreCase(collector.get_next_client_response(),
"c56a4180-65aa-42ec-a945-5fd21dec0538"));
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "REQUEST_EXPIRATION_MILLIS");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "20000");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "NEGOTIATE_FLOWFILE_CODEC");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "StandardFlowFileCodec");
+ collector.get_next_client_response(); // codec version
+}
+
+void verifySendResponses(SiteToSiteResponder& collector, const
std::vector<std::string>& expected_responses) {
+ for (const auto& expected_response : expected_responses) {
+ if (expected_response.empty()) {
+ collector.get_next_client_response();
+ continue;
+ }
+ CHECK(expected_response == collector.get_next_client_response());
+ }
+}
+
+TEST_CASE("TestSetPortId", "[S2S]") {
+ auto peer =
std::make_unique<sitetosite::SiteToSitePeer>(std::make_unique<org::apache::nifi::minifi::io::BufferStream>(),
"fake_host", 65433, "");
+ sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+ auto fake_uuid =
minifi::utils::Identifier::parse("c56a4180-65aa-42ec-a945-5fd21dec0538").value();
+ protocol.setPortId(fake_uuid);
+ REQUIRE(fake_uuid == protocol.getPortId());
+}
+
+TEST_CASE("TestSiteToSiteVerifySend using data packet", "[S2S]") {
+ auto collector = std::make_unique<SiteToSiteResponder>();
+ auto collector_ptr = collector.get();
+
+ initializeMockBootstrapResponses(collector);
+
+ auto peer =
std::make_unique<sitetosite::SiteToSitePeer>(std::move(collector), "fake_host",
65433, "");
+ sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+
+ std::vector<std::string> expected_responses;
+ std::string payload = "Test MiNiFi payload";
+
+ expected_responses.push_back(""); // attribute count 0
+ expected_responses.push_back(""); // payload length
+ expected_responses.push_back(payload);
+
+ verifyBootstrapMessages(protocol, *collector_ptr);
+
+ // start to send the stuff
+ auto transaction = SiteToSiteClientTestAccessor::createTransaction(protocol,
sitetosite::TransferDirection::SEND);
+ REQUIRE(transaction);
+ collector_ptr->get_next_client_response();
+ REQUIRE(collector_ptr->get_next_client_response() == "SEND_FLOWFILES");
+ std::map<std::string, std::string> attributes;
+ sitetosite::DataPacket packet(transaction, attributes, payload);
+ REQUIRE(SiteToSiteClientTestAccessor::sendPacket(protocol, packet));
+ verifySendResponses(*collector_ptr, expected_responses);
+ REQUIRE(transaction->getCRC() == 4000670133);
+}
+
+TEST_CASE("TestSiteToSiteVerifySend using flowfile data", "[S2S]") {
+ auto collector = std::make_unique<SiteToSiteResponder>();
+ auto collector_ptr = collector.get();
+
+ initializeMockBootstrapResponses(collector);
+
+ auto peer =
std::make_unique<sitetosite::SiteToSitePeer>(std::move(collector), "fake_host",
65433, "");
+ sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+
+ std::vector<std::string> expected_responses;
+ std::string payload = "Test MiNiFi payload";
+
+ expected_responses.push_back(""); // attribute count
+ expected_responses.push_back(""); // attribute key length
+ expected_responses.push_back("filename");
+ expected_responses.push_back(""); // attribute value length
+ expected_responses.push_back("myfile");
+ expected_responses.push_back(""); // attribute key length
+ expected_responses.push_back("flow.id");
+ expected_responses.push_back(""); // attribute value length
+ expected_responses.push_back("test");
+ expected_responses.push_back(""); // payload length
+ expected_responses.push_back(payload);
+
+ protocol.setBatchDuration(std::chrono::milliseconds(100));
+ protocol.setBatchCount(5);
+ protocol.setTimeout(std::chrono::milliseconds(20000));
+
+ auto fake_uuid =
minifi::utils::Identifier::parse("C56A4180-65AA-42EC-A945-5FD21DEC0538").value();
+ protocol.setPortId(fake_uuid);
+
+ verifyBootstrapMessages(protocol, *collector_ptr);
+
+ // start to send the stuff
+ auto transaction = SiteToSiteClientTestAccessor::createTransaction(protocol,
sitetosite::TransferDirection::SEND);
+ REQUIRE(transaction);
+ collector_ptr->get_next_client_response();
+ REQUIRE(collector_ptr->get_next_client_response() == "SEND_FLOWFILES");
+
+ TestController test_controller_;
+ TestController::PlanConfig plan_config_;
+ std::shared_ptr<TestPlan> test_plan =
test_controller_.createPlan(plan_config_);
+ test_plan->addProcessor("DummyProcessor", "dummyProcessor");
+ std::shared_ptr<minifi::core::ProcessContext> context = [&] {
test_plan->runNextProcessor(); return test_plan->getCurrentContext(); }();
+ std::unique_ptr<minifi::core::ProcessSession> session =
std::make_unique<core::ProcessSessionImpl>(context);
+
+ auto flow_file = session->create();
+ session->write(flow_file, [&](const std::shared_ptr<io::OutputStream>&
output_stream) {
+ std::span<const std::byte> span{reinterpret_cast<const
std::byte*>(payload.data()), payload.size()};
+ output_stream->write(span);
+ return payload.size();
+ });
+ flow_file->updateAttribute("filename", "myfile");
+ flow_file->updateAttribute("flow.id", "test");
+ session->transfer(flow_file, DummyProcessor::Success);
+ session->commit();
+
+ std::map<std::string, std::string> attributes;
+ REQUIRE(SiteToSiteClientTestAccessor::sendFlowFile(protocol, transaction,
*flow_file, *session));
+ verifySendResponses(*collector_ptr, expected_responses);
+ REQUIRE(transaction->getCRC() == 2886786428);
+}
+
+TEST_CASE("TestSiteToSiteVerifyNegotiationFail", "[S2S]") {
+ auto collector = std::make_unique<SiteToSiteResponder>();
+
+ const char negotiated_abort_code =
magic_enum::enum_underlying(sitetosite::ResourceNegotiationStatusCode::NEGOTIATED_ABORT);
+ std::string resp_code;
+ resp_code.insert(resp_code.begin(), negotiated_abort_code);
+ collector->push_response(resp_code);
+ collector->push_response(resp_code);
+
+ auto peer =
std::make_unique<sitetosite::SiteToSitePeer>(std::move(collector), "fake_host",
65433, "");
+ sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+
+ std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
+ minifi::utils::Identifier fake_uuid;
+ fake_uuid = uuid_str;
+ protocol.setPortId(fake_uuid);
+
+ REQUIRE_FALSE(SiteToSiteClientTestAccessor::bootstrap(protocol));
+}
+
+void iniitalizeMockRemoteClientReceiveDataResponses(SiteToSiteResponder&
collector) {
Review Comment:
typo:
```suggestion
void initializeMockRemoteClientReceiveDataResponses(SiteToSiteResponder&
collector) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]