lordgamez commented on code in PR #1966:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1966#discussion_r2253639649
##########
libminifi/test/libtest/unit/DummyProcessor.h:
##########
@@ -41,7 +41,8 @@ class DummyProcessor : public minifi::core::ProcessorImpl {
.isSensitive(true)
.build();
static constexpr auto Properties = std::array<core::PropertyReference,
2>{SimpleProperty, SensitiveProperty};
- static constexpr auto Relationships =
std::array<core::RelationshipDefinition, 0>{};
+ static constexpr core::RelationshipDefinition Success{"success", "Success
relationship"};
+ static constexpr auto Relationships =
std::array<core::RelationshipDefinition, 1>{Success};
Review Comment:
Updated in
https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/76e948709325fdbef2805bd8d563094696d394fb
##########
libminifi/include/RemoteProcessGroupPort.h:
##########
@@ -155,102 +157,90 @@ class RemoteProcessorGroupPort : public
core::ProcessorImpl {
local_network_interface_ = ifc;
}
- std::string getInterface() {
- return local_network_interface_;
- }
+ void setURL(const std::string& val);
- /**
- * Sets the url. Supports a CSV
- */
- void setURL(std::string val) {
- auto urls = utils::string::split(val, ",");
- for (const auto& url : urls) {
- http::URL parsed_url{utils::string::trim(url)};
- if (parsed_url.isValid()) {
- logger_->log_debug("Parsed RPG URL '{}' -> '{}'", url,
parsed_url.hostPort());
- nifi_instances_.push_back({parsed_url.host(), parsed_url.port(),
parsed_url.protocol()});
- } else {
- logger_->log_error("Could not parse RPG URL '{}'", url);
- }
- }
- }
-
- std::vector<RPG> getInstances() const {
+ [[nodiscard]] std::vector<RPG> getInstances() const {
return nifi_instances_;
}
void setHTTPProxy(const http::HTTPProxy &proxy) {
- this->proxy_ = proxy;
+ proxy_ = proxy;
}
- http::HTTPProxy getHTTPProxy() {
- return this->proxy_;
- }
- // refresh remoteSite2SiteInfo via nifi rest api
- std::pair<std::string, int> refreshRemoteSite2SiteInfo();
- // refresh site2site peer list
- void refreshPeerList();
+ [[nodiscard]] http::HTTPProxy getHTTPProxy() const {
+ return proxy_;
+ }
void notifyStop() override;
void enableHTTP() {
- client_type_ = sitetosite::HTTP;
+ client_type_ = sitetosite::ClientType::HTTP;
}
- protected:
- /**
- * Non static in case anything is loaded when this object is re-scheduled
- */
- bool is_http_disabled() {
- auto ptr =
core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient",
"HTTPClient");
- if (ptr != nullptr) {
- delete ptr;
- return false;
- } else {
- return true;
- }
+ void setUseCompression(bool use_compression) {
+ use_compression_ = use_compression;
+ }
+
+ [[nodiscard]] bool getUseCompression() const {
+ return use_compression_;
+ }
+
+ void setBatchCount(uint64_t count) {
+ batch_count_ = count;
+ }
+
+ [[nodiscard]] std::optional<uint64_t> getBatchCount() const {
+ return batch_count_;
}
- std::unique_ptr<sitetosite::SiteToSiteClient> getNextProtocol(bool create);
+ void setBatchSize(uint64_t size) {
+ batch_size_ = size;
+ }
+
+ [[nodiscard]] std::optional<uint64_t> getBatchSize() const {
+ return batch_size_;
+ }
+
+ void setBatchDuration(std::chrono::milliseconds duration) {
+ batch_duration_ = duration;
+ }
+
+ [[nodiscard]] std::optional<std::chrono::milliseconds> getBatchDuration()
const {
+ return batch_duration_;
+ }
+
+ protected:
+ std::optional<std::pair<std::string, uint16_t>>
refreshRemoteSiteToSiteInfo();
+ void refreshPeerList();
+ std::unique_ptr<sitetosite::SiteToSiteClient> getNextProtocol();
void returnProtocol(std::unique_ptr<sitetosite::SiteToSiteClient> protocol);
moodycamel::ConcurrentQueue<std::unique_ptr<sitetosite::SiteToSiteClient>>
available_protocols_;
-
std::shared_ptr<Configure> configure_;
- // Transaction Direction
sitetosite::TransferDirection direction_;
- // Transmitting
std::atomic<bool> transmitting_;
- // timeout
- uint64_t timeout_;
- // local network interface
+ std::optional<std::chrono::milliseconds> timeout_;
std::string local_network_interface_;
-
utils::Identifier protocol_uuid_;
-
- std::chrono::milliseconds idle_timeout_ = std::chrono::seconds(15);
-
- // rest API end point info
- std::vector<struct RPG> nifi_instances_;
-
- // http proxy
+ std::chrono::milliseconds idle_timeout_ = 15s;
+ std::vector<RPG> nifi_instances_;
http::HTTPProxy proxy_;
-
- bool bypass_rest_api_;
-
- sitetosite::CLIENT_TYPE client_type_;
-
- // Remote Site2Site Info
- bool site2site_secure_;
+ sitetosite::ClientType client_type_;
std::vector<sitetosite::PeerStatus> peers_;
- std::atomic<int> peer_index_;
+ std::atomic<int64_t> peer_index_;
std::mutex peer_mutex_;
Review Comment:
I don't think we need the atomic, but we do need an additional block guarded by
a mutex. Updated in
https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/807bd4b12eccb288119d2cab9f78287f78d5ba2d
##########
libminifi/src/RemoteProcessGroupPort.cpp:
##########
@@ -0,0 +1,379 @@
+/**
+ * @file RemoteProcessGroupPort.cpp
+ * RemoteProcessGroupPort class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RemoteProcessGroupPort.h"
+
+#include <memory>
+#include <iostream>
+#include <vector>
+#include <string>
+#include <utility>
+#include <cinttypes>
+
+#include "sitetosite/Peer.h"
+#include "Exception.h"
+#include "sitetosite/SiteToSiteFactory.h"
+
+#include "rapidjson/document.h"
+
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Processor.h"
+#include "http/BaseHTTPClient.h"
+#include "controllers/SSLContextService.h"
+#include "utils/net/DNS.h"
+
+#undef GetObject // windows.h #defines GetObject = GetObjectA or GetObjectW,
which conflicts with rapidjson
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi {
+
+namespace {
+std::string buildFullSiteToSiteUrl(const RPG& nifi) {
+ std::stringstream full_url;
+ full_url << nifi.protocol << nifi.host;
+ // don't append port if it is 0 ( undefined )
+ if (nifi.port > 0) {
+ full_url << ":" << std::to_string(nifi.port);
+ }
+ full_url << "/nifi-api/site-to-site";
+ return full_url.str();
+}
+} // namespace
+
+const char *RemoteProcessGroupPort::RPG_SSL_CONTEXT_SERVICE_NAME =
"RemoteProcessGroupPortSSLContextService";
+
+void RemoteProcessGroupPort::setURL(const std::string& val) {
+ auto urls = utils::string::split(val, ",");
+ for (const auto& url : urls) {
+ http::URL parsed_url{utils::string::trim(url)};
+ if (parsed_url.isValid()) {
+ logger_->log_debug("Parsed RPG URL '{}' -> '{}'", url,
parsed_url.hostPort());
+ nifi_instances_.push_back({parsed_url.host(), parsed_url.port(),
parsed_url.protocol()});
+ } else {
+ logger_->log_error("Could not parse RPG URL '{}'", url);
+ }
+ }
+}
+
+std::unique_ptr<sitetosite::SiteToSiteClient>
RemoteProcessGroupPort::initializeProtocol(sitetosite::SiteToSiteClientConfiguration&
config) const {
+ config.setSecurityContext(ssl_service_);
+ config.setHTTPProxy(proxy_);
+ config.setIdleTimeout(idle_timeout_);
+ config.setUseCompression(use_compression_);
+ config.setBatchCount(batch_count_);
+ config.setBatchSize(batch_size_);
+ config.setBatchDuration(batch_duration_);
+ config.setTimeout(timeout_);
+
+ return sitetosite::createClient(config);
+}
+
+std::unique_ptr<sitetosite::SiteToSiteClient>
RemoteProcessGroupPort::getNextProtocol() {
+ std::unique_ptr<sitetosite::SiteToSiteClient> next_protocol = nullptr;
+ if (!available_protocols_.try_dequeue(next_protocol)) {
+ if (peer_index_ >= 0) {
+ std::lock_guard<std::mutex> lock(peer_mutex_);
+ logger_->log_debug("Creating client from peer {}", peer_index_.load());
+ auto& peer_status = peers_[peer_index_];
+ sitetosite::SiteToSiteClientConfiguration
config(peer_status.getPortId(), peer_status.getHost(), peer_status.getPort(),
local_network_interface_, client_type_);
+ peer_index_++;
+ if (peer_index_ >= static_cast<int>(peers_.size())) {
+ peer_index_ = 0;
+ }
+ next_protocol = initializeProtocol(config);
+ } else {
+ logger_->log_debug("Refreshing the peer list since there are none
configured.");
+ refreshPeerList();
+ }
+ }
+ logger_->log_debug("Obtained protocol from available_protocols_");
+ return next_protocol;
+}
+
+void
RemoteProcessGroupPort::returnProtocol(std::unique_ptr<sitetosite::SiteToSiteClient>
return_protocol) {
+ auto count = peers_.size();
+ if (max_concurrent_tasks_ > count)
+ count = max_concurrent_tasks_;
+ if (available_protocols_.size_approx() >= count) {
+ logger_->log_debug("not enqueueing protocol {}", getUUIDStr());
+ // let the memory be freed
+ return;
+ }
+ logger_->log_debug("enqueueing protocol {}, have a total of {}",
getUUIDStr(), available_protocols_.size_approx());
+ available_protocols_.enqueue(std::move(return_protocol));
+}
+
+void RemoteProcessGroupPort::initialize() {
+ setSupportedProperties(Properties);
+ setSupportedRelationships(Relationships);
+
+ logger_->log_trace("Finished initialization");
+}
+
+void RemoteProcessGroupPort::onSchedule(core::ProcessContext& context,
core::ProcessSessionFactory&) {
+ if (auto protocol_uuid = context.getProperty(portUUID)) {
+ protocol_uuid_ = *protocol_uuid;
+ }
+
+ auto context_name = context.getProperty(SSLContext);
+ if (!context_name || IsNullOrEmpty(*context_name)) {
+ context_name = RPG_SSL_CONTEXT_SERVICE_NAME;
+ }
+
+ std::shared_ptr<core::controller::ControllerService> service =
context.getControllerService(*context_name, getUUID());
+ if (nullptr != service) {
+ ssl_service_ =
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(service);
+ } else {
+ std::string secureStr;
+ if (configure_->get(Configure::nifi_remote_input_secure, secureStr) &&
utils::string::toBool(secureStr).value_or(false)) {
+ ssl_service_ =
std::make_shared<minifi::controllers::SSLContextServiceImpl>(RPG_SSL_CONTEXT_SERVICE_NAME,
configure_);
+ ssl_service_->onEnable();
+ }
+ }
+
+ idle_timeout_ = context.getProperty(idleTimeout) |
utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) |
utils::orThrow("RemoteProcessGroupPort::idleTimeout is a required Property");
+
+ std::lock_guard<std::mutex> lock(peer_mutex_);
+ if (!nifi_instances_.empty()) {
+ refreshPeerList();
+ if (!peers_.empty())
+ peer_index_ = 0;
+ }
+ // populate the site2site protocol for load balancing between them
+ if (!peers_.empty()) {
+ auto count = peers_.size();
+ if (max_concurrent_tasks_ > count)
+ count = max_concurrent_tasks_;
+ for (uint32_t i = 0; i < count; i++) {
+ auto peer_status = peers_[peer_index_];
+ sitetosite::SiteToSiteClientConfiguration
config(peer_status.getPortId(), peer_status.getHost(), peer_status.getPort(),
local_network_interface_, client_type_);
+ peer_index_++;
+ if (peer_index_ >= static_cast<int>(peers_.size())) {
+ peer_index_ = 0;
+ }
+ logger_->log_trace("Creating client");
+ auto next_protocol = initializeProtocol(config);
+ logger_->log_trace("Created client, moving into available protocols");
+ returnProtocol(std::move(next_protocol));
+ }
+ } else {
+ // we don't have any peers
+ logger_->log_error("No peers selected during scheduling");
+ }
+}
+
+void RemoteProcessGroupPort::notifyStop() {
+ transmitting_ = false;
+ RPGLatch count(false); // we're just a monitor
+ // we use the latch
+ while (count.getCount() > 0) {
+ }
Review Comment:
You are right; at the moment I don't see a use for the RPGLatch, so I think
it can be removed. I removed it in
https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/0582224996e0e59fc2f95892ee12d96a76c24535
##########
libminifi/src/sitetosite/HttpSiteToSiteClient.cpp:
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "sitetosite/HttpSiteToSiteClient.h"
+
+#include <chrono>
+#include <map>
+#include <string>
+#include <memory>
+#include <thread>
+#include <iostream>
+#include <vector>
+#include <optional>
+
+#include "io/CRCStream.h"
+#include "sitetosite/Peer.h"
+#include "io/validation.h"
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "Exception.h"
+
+#include "rapidjson/document.h"
+#include "rapidjson/error/en.h"
+
+#undef DELETE // macro on windows
+
+namespace org::apache::nifi::minifi::sitetosite {
+
+namespace {
+std::optional<utils::Identifier> parseTransactionId(const std::string &uri) {
+ return
utils::Identifier::parse(utils::string::partAfterLastOccurrenceOf(uri, '/'));
+}
+
+std::optional<std::vector<PeerStatus>> parsePeerStatuses(const
std::shared_ptr<core::logging::Logger> &logger, const std::string &entity,
const utils::Identifier &id) {
+ try {
+ rapidjson::Document root;
+ rapidjson::ParseResult ok = root.Parse(entity.c_str());
+ if (!ok) {
+ std::stringstream ss;
+ ss << "Failed to parse archive lens stack from JSON string with reason: "
Review Comment:
Updated in
https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/76e948709325fdbef2805bd8d563094696d394fb
##########
libminifi/src/sitetosite/HttpSiteToSiteClient.cpp:
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "sitetosite/HttpSiteToSiteClient.h"
+
+#include <chrono>
+#include <map>
+#include <string>
+#include <memory>
+#include <thread>
+#include <iostream>
+#include <vector>
+#include <optional>
+
+#include "io/CRCStream.h"
+#include "sitetosite/Peer.h"
+#include "io/validation.h"
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "Exception.h"
+
+#include "rapidjson/document.h"
+#include "rapidjson/error/en.h"
+
+#undef DELETE // macro on windows
+
+namespace org::apache::nifi::minifi::sitetosite {
+
+namespace {
+std::optional<utils::Identifier> parseTransactionId(const std::string &uri) {
+ return
utils::Identifier::parse(utils::string::partAfterLastOccurrenceOf(uri, '/'));
+}
+
+std::optional<std::vector<PeerStatus>> parsePeerStatuses(const
std::shared_ptr<core::logging::Logger> &logger, const std::string &entity,
const utils::Identifier &id) {
+ try {
+ rapidjson::Document root;
+ rapidjson::ParseResult ok = root.Parse(entity.c_str());
+ if (!ok) {
+ std::stringstream ss;
+ ss << "Failed to parse archive lens stack from JSON string with reason: "
+ << rapidjson::GetParseError_En(ok.Code())
+ << " at offset " << ok.Offset();
+
+ throw Exception(ExceptionType::GENERAL_EXCEPTION, ss.str());
+ }
+
+ std::vector<PeerStatus> peer_statuses;
+ if (!root.HasMember("peers") || !root["peers"].IsArray() ||
root["peers"].Size() <= 0) {
+ logger->log_debug("Peers is either not a member or is empty. String to
analyze: {}", entity);
+ return peer_statuses;
+ }
+
+ for (const auto &peer : root["peers"].GetArray()) {
+ std::string hostname;
+ int port = 0;
+ int flow_file_count = 0;
+
+ if (peer.HasMember("hostname") && peer["hostname"].IsString() &&
+ peer.HasMember("port") && peer["port"].IsNumber()) {
+ hostname = peer["hostname"].GetString();
+ port = peer["port"].GetInt();
+ }
+
+ if (peer.HasMember("flowFileCount")) {
+ if (peer["flowFileCount"].IsNumber()) {
+ flow_file_count = gsl::narrow<int>(peer["flowFileCount"].GetInt64());
+ } else {
+ logger->log_debug("Could not parse flowFileCount, so we're going to
continue without it");
+ }
+ }
+
+ // host name and port are required.
+ if (!IsNullOrEmpty(hostname) && port > 0) {
+ PeerStatus status(id, hostname, port, flow_file_count, true);
+ peer_statuses.push_back(std::move(status));
+ } else {
+ logger->log_debug("hostname empty or port is zero. hostname: {}, port:
{}", hostname, port);
+ }
+ }
+ return peer_statuses;
+ } catch (const Exception &exception) {
+ logger->log_debug("Caught Exception {}", exception.what());
+ return std::nullopt;
+ }
+}
+} // namespace
+
+std::shared_ptr<Transaction>
HttpSiteToSiteClient::createTransaction(TransferDirection direction) {
+ std::string dir_str = direction == TransferDirection::SEND ? "input-ports" :
"output-ports";
+ std::stringstream uri;
+ uri << getBaseURI() << "data-transfer/" << dir_str << "/" <<
getPortId().to_string() << "/transactions";
+ auto client = createHttpClient(uri.str(), http::HttpRequestMethod::POST);
+ setSiteToSiteHeaders(*client);
+ client->setConnectionTimeout(std::chrono::milliseconds(5000));
+ client->setContentType("application/json");
+ client->setRequestHeader("Accept", "application/json");
+ client->setRequestHeader("Transfer-Encoding", "chunked");
+ client->setPostFields("");
+ client->submit();
+
+ if (auto http_stream = dynamic_cast<http::HttpStream*>(peer_->getStream())) {
+ logger_->log_debug("Closing {}", http_stream->getClientRef()->getURL());
+ }
+
+ if (client->getResponseCode() != 201) {
+ peer_->setStream(nullptr);
+ logger_->log_debug("Could not create transaction, received {}",
client->getResponseCode());
+ return nullptr;
+ }
+ // parse the headers
+ auto intent_name = client->getHeaderValue("x-location-uri-intent");
+ if (!utils::string::equalsIgnoreCase(intent_name, "transaction-url")) {
+ logger_->log_debug("Could not create transaction, intent is {}",
intent_name);
+ return nullptr;
+ }
+
+ auto url = client->getHeaderValue("Location");
+ if (IsNullOrEmpty(url)) {
+ logger_->log_debug("Location is empty");
+ return nullptr;
+ }
+
+ org::apache::nifi::minifi::io::CRCStream<SiteToSitePeer>
crcstream(gsl::make_not_null(peer_.get()));
+ auto transaction = std::make_shared<HttpTransaction>(direction,
std::move(crcstream));
+ transaction->initialize(this, url);
+ auto transaction_id = parseTransactionId(url);
+ if (!transaction_id) {
+ logger_->log_debug("Transaction ID is empty");
+ return nullptr;
+ }
+ transaction->setTransactionId(transaction_id.value());
+ std::shared_ptr<minifi::http::HTTPClient> transaction_client;
+ if (transaction->getDirection() == TransferDirection::SEND) {
+ transaction_client = openConnectionForSending(transaction);
+ } else {
+ transaction_client = openConnectionForReceive(transaction);
+ transaction->setDataAvailable(true);
+ // 201 tells us that data is available. 200 would mean that nothing is
available.
+ }
+ gsl_Assert(transaction_client);
+
+ setSiteToSiteHeaders(*transaction_client);
+ peer_->setStream(std::make_unique<http::HttpStream>(transaction_client));
+ logger_->log_debug("Created transaction id -{}-",
transaction->getUUID().to_string());
+ known_transactions_[transaction->getUUID()] = transaction;
+ return transaction;
+}
+
+std::optional<SiteToSiteResponse>
HttpSiteToSiteClient::readResponseForReceiveTransfer(const
std::shared_ptr<Transaction>& transaction) {
+ SiteToSiteResponse response;
+ if (current_code_ == ResponseCode::FINISH_TRANSACTION) {
+ return response;
+ }
+
+ if (transaction->getState() == TransactionState::TRANSACTION_STARTED ||
transaction->getState() == TransactionState::DATA_EXCHANGED) {
+ if (current_code_ == ResponseCode::CONFIRM_TRANSACTION &&
transaction->getState() == TransactionState::DATA_EXCHANGED) {
+ auto stream = dynamic_cast<http::HttpStream*>(peer_->getStream());
+ if (!stream->isFinished()) {
+ logger_->log_debug("confirm read for {}, but not finished ",
transaction->getUUIDStr());
+ if (stream->waitForDataAvailable()) {
+ response.code = ResponseCode::CONTINUE_TRANSACTION;
+ return response;
Review Comment:
Updated in
https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/ad00aeac318bd66a106110a27f2b50acbd27c1e9
##########
libminifi/test/unit/SiteToSiteTests.cpp:
##########
@@ -0,0 +1,343 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "io/BaseStream.h"
+#include "sitetosite/Peer.h"
+#include "sitetosite/RawSiteToSiteClient.h"
+#include "unit/TestBase.h"
+#include "unit/Catch.h"
+#include "unit/SiteToSiteHelper.h"
+#include "unit/DummyProcessor.h"
+#include "unit/ProvenanceTestHelper.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class SiteToSiteClientTestAccessor {
+ public:
+ static bool bootstrap(sitetosite::RawSiteToSiteClient& client) {
+ return client.bootstrap();
+ }
+
+ static std::shared_ptr<sitetosite::Transaction>
createTransaction(sitetosite::RawSiteToSiteClient& client,
sitetosite::TransferDirection direction) {
+ return client.createTransaction(direction);
+ }
+
+ static bool sendFlowFile(sitetosite::RawSiteToSiteClient& client, const
std::shared_ptr<sitetosite::Transaction>& transaction, core::FlowFile&
flow_file, core::ProcessSession& session) {
+ return client.sendFlowFile(transaction, flow_file, session);
+ }
+
+ static bool sendPacket(sitetosite::RawSiteToSiteClient& client, const
sitetosite::DataPacket& packet) {
+ return client.sendPacket(packet);
+ }
+
+ static std::pair<uint64_t, uint64_t>
readFlowFiles(sitetosite::RawSiteToSiteClient& client, const
std::shared_ptr<sitetosite::Transaction>& transaction, core::ProcessSession&
session) {
+ return client.readFlowFiles(transaction, session);
+ }
+};
+
+void initializeMockBootstrapResponses(const
std::unique_ptr<SiteToSiteResponder>& collector) {
+ const char resource_ok_code =
magic_enum::enum_underlying(sitetosite::ResourceNegotiationStatusCode::RESOURCE_OK);
+ std::string resp_code;
+ resp_code.insert(resp_code.begin(), resource_ok_code);
+ collector->push_response(resp_code);
+
+ // Handshake response code
+ collector->push_response("R");
+ collector->push_response("C");
+ const char resource_code_properties_ok =
magic_enum::enum_underlying(sitetosite::ResponseCode::PROPERTIES_OK);
+ resp_code = resource_code_properties_ok;
+ collector->push_response(resp_code);
+
+ // Codec Negotiation
+ resp_code = resource_ok_code;
+ collector->push_response(resp_code);
+}
+
+void verifyBootstrapMessages(sitetosite::RawSiteToSiteClient& protocol,
SiteToSiteResponder& collector) {
+ protocol.setUseCompression(false);
+ protocol.setBatchDuration(std::chrono::milliseconds(100));
+ protocol.setBatchCount(5);
+ protocol.setTimeout(std::chrono::milliseconds(20000));
+
+ minifi::utils::Identifier fake_uuid =
minifi::utils::Identifier::parse("C56A4180-65AA-42EC-A945-5FD21DEC0538").value();
+ protocol.setPortId(fake_uuid);
+
+ REQUIRE(true == SiteToSiteClientTestAccessor::bootstrap(protocol));
+
+ REQUIRE(collector.get_next_client_response() == "NiFi");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "SocketFlowFileProtocol");
+ collector.get_next_client_response();
+ collector.get_next_client_response();
+ collector.get_next_client_response();
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "nifi://fake_host:65433");
+ collector.get_next_client_response();
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "BATCH_COUNT");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "5");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "BATCH_DURATION");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "100");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "GZIP");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "false");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "PORT_IDENTIFIER");
+ collector.get_next_client_response();
+
REQUIRE(minifi::utils::string::equalsIgnoreCase(collector.get_next_client_response(),
"c56a4180-65aa-42ec-a945-5fd21dec0538"));
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "REQUEST_EXPIRATION_MILLIS");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "20000");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "NEGOTIATE_FLOWFILE_CODEC");
+ collector.get_next_client_response();
+ REQUIRE(collector.get_next_client_response() == "StandardFlowFileCodec");
+ collector.get_next_client_response(); // codec version
+}
+
+void verifySendResponses(SiteToSiteResponder& collector, const
std::vector<std::string>& expected_responses) {
+ for (const auto& expected_response : expected_responses) {
+ if (expected_response.empty()) {
+ collector.get_next_client_response();
+ continue;
+ }
+ CHECK(expected_response == collector.get_next_client_response());
+ }
+}
+
+TEST_CASE("TestSetPortId", "[S2S]") {
+ auto peer =
std::make_unique<sitetosite::SiteToSitePeer>(std::make_unique<org::apache::nifi::minifi::io::BufferStream>(),
"fake_host", 65433, "");
+ sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+ auto fake_uuid =
minifi::utils::Identifier::parse("c56a4180-65aa-42ec-a945-5fd21dec0538").value();
+ protocol.setPortId(fake_uuid);
+ REQUIRE(fake_uuid == protocol.getPortId());
+}
+
+TEST_CASE("TestSiteToSiteVerifySend using data packet", "[S2S]") {
+ auto collector = std::make_unique<SiteToSiteResponder>();
+ auto collector_ptr = collector.get();
+
+ initializeMockBootstrapResponses(collector);
+
+ auto peer =
std::make_unique<sitetosite::SiteToSitePeer>(std::move(collector), "fake_host",
65433, "");
+ sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+
+ std::vector<std::string> expected_responses;
+ std::string payload = "Test MiNiFi payload";
+
+ expected_responses.push_back(""); // attribute count 0
+ expected_responses.push_back(""); // payload length
+ expected_responses.push_back(payload);
+
+ verifyBootstrapMessages(protocol, *collector_ptr);
+
+ // start to send the stuff
+ auto transaction = SiteToSiteClientTestAccessor::createTransaction(protocol,
sitetosite::TransferDirection::SEND);
+ REQUIRE(transaction);
+ collector_ptr->get_next_client_response();
+ REQUIRE(collector_ptr->get_next_client_response() == "SEND_FLOWFILES");
+ std::map<std::string, std::string> attributes;
+ sitetosite::DataPacket packet(transaction, attributes, payload);
+ REQUIRE(SiteToSiteClientTestAccessor::sendPacket(protocol, packet));
+ verifySendResponses(*collector_ptr, expected_responses);
+ REQUIRE(transaction->getCRC() == 4000670133);
+}
+
+TEST_CASE("TestSiteToSiteVerifySend using flowfile data", "[S2S]") {
+ auto collector = std::make_unique<SiteToSiteResponder>();
+ auto collector_ptr = collector.get();
+
+ initializeMockBootstrapResponses(collector);
+
+ auto peer =
std::make_unique<sitetosite::SiteToSitePeer>(std::move(collector), "fake_host",
65433, "");
+ sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+
+ std::vector<std::string> expected_responses;
+ std::string payload = "Test MiNiFi payload";
+
+ expected_responses.push_back(""); // attribute count
+ expected_responses.push_back(""); // attribute key length
+ expected_responses.push_back("filename");
+ expected_responses.push_back(""); // attribute value length
+ expected_responses.push_back("myfile");
+ expected_responses.push_back(""); // attribute key length
+ expected_responses.push_back("flow.id");
+ expected_responses.push_back(""); // attribute value length
+ expected_responses.push_back("test");
+ expected_responses.push_back(""); // payload length
+ expected_responses.push_back(payload);
+
+ protocol.setBatchDuration(std::chrono::milliseconds(100));
+ protocol.setBatchCount(5);
+ protocol.setTimeout(std::chrono::milliseconds(20000));
+
+ auto fake_uuid =
minifi::utils::Identifier::parse("C56A4180-65AA-42EC-A945-5FD21DEC0538").value();
+ protocol.setPortId(fake_uuid);
+
+ verifyBootstrapMessages(protocol, *collector_ptr);
+
+ // start to send the stuff
+ auto transaction = SiteToSiteClientTestAccessor::createTransaction(protocol,
sitetosite::TransferDirection::SEND);
+ REQUIRE(transaction);
+ collector_ptr->get_next_client_response();
+ REQUIRE(collector_ptr->get_next_client_response() == "SEND_FLOWFILES");
+
+ TestController test_controller_;
+ TestController::PlanConfig plan_config_;
+ std::shared_ptr<TestPlan> test_plan =
test_controller_.createPlan(plan_config_);
+ test_plan->addProcessor("DummyProcessor", "dummyProcessor");
+ std::shared_ptr<minifi::core::ProcessContext> context = [&] {
test_plan->runNextProcessor(); return test_plan->getCurrentContext(); }();
+ std::unique_ptr<minifi::core::ProcessSession> session =
std::make_unique<core::ProcessSessionImpl>(context);
+
+ auto flow_file = session->create();
+ session->write(flow_file, [&](const std::shared_ptr<io::OutputStream>&
output_stream) {
+ std::span<const std::byte> span{reinterpret_cast<const
std::byte*>(payload.data()), payload.size()};
+ output_stream->write(span);
+ return payload.size();
+ });
+ flow_file->updateAttribute("filename", "myfile");
+ flow_file->updateAttribute("flow.id", "test");
+ session->transfer(flow_file, DummyProcessor::Success);
+ session->commit();
+
+ std::map<std::string, std::string> attributes;
+ REQUIRE(SiteToSiteClientTestAccessor::sendFlowFile(protocol, transaction,
*flow_file, *session));
+ verifySendResponses(*collector_ptr, expected_responses);
+ REQUIRE(transaction->getCRC() == 2886786428);
+}
+
+TEST_CASE("TestSiteToSiteVerifyNegotiationFail", "[S2S]") {
+ auto collector = std::make_unique<SiteToSiteResponder>();
+
+ const char negotiated_abort_code =
magic_enum::enum_underlying(sitetosite::ResourceNegotiationStatusCode::NEGOTIATED_ABORT);
+ std::string resp_code;
+ resp_code.insert(resp_code.begin(), negotiated_abort_code);
+ collector->push_response(resp_code);
+ collector->push_response(resp_code);
+
+ auto peer =
std::make_unique<sitetosite::SiteToSitePeer>(std::move(collector), "fake_host",
65433, "");
+ sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+
+ std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
+ minifi::utils::Identifier fake_uuid;
+ fake_uuid = uuid_str;
+ protocol.setPortId(fake_uuid);
+
+ REQUIRE_FALSE(SiteToSiteClientTestAccessor::bootstrap(protocol));
+}
+
+void iniitalizeMockRemoteClientReceiveDataResponses(SiteToSiteResponder&
collector) {
Review Comment:
Fixed in
https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/76e948709325fdbef2805bd8d563094696d394fb
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]