lordgamez commented on code in PR #1966:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1966#discussion_r2253639189
##########
libminifi/include/sitetosite/SiteToSiteClient.h:
##########
@@ -19,246 +19,182 @@
#pragma once
#include <algorithm>
-#include <array>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
+#include <optional>
#include "Peer.h"
#include "SiteToSite.h"
#include "core/ProcessSession.h"
#include "core/ProcessContext.h"
-#include "core/Connectable.h"
-#include "utils/gsl.h"
-namespace org::apache::nifi::minifi::sitetosite {
+namespace org::apache::nifi::minifi {
-/**
- * Represents a piece of data that is to be sent to or that was received from a
- * NiFi instance.
- */
-class DataPacket {
+namespace test {
+class SiteToSiteClientTestAccessor;
+} // namespace test
+
+namespace sitetosite {
+
+struct DataPacket {
public:
- DataPacket(std::shared_ptr<core::logging::Logger> logger,
std::shared_ptr<Transaction> transaction, std::map<std::string, std::string>
attributes, const std::string &payload)
- : _attributes{std::move(attributes)},
- transaction_{std::move(transaction)},
- payload_{payload},
- logger_reference_{std::move(logger)} {
+ DataPacket(std::shared_ptr<Transaction> transaction, const std::string
&payload)
+ : transaction{std::move(transaction)},
+ payload{payload} {
+ }
+ DataPacket(std::shared_ptr<Transaction> transaction, std::map<std::string,
std::string> attributes, const std::string &payload)
+ : attributes{std::move(attributes)},
+ transaction{std::move(transaction)},
+ payload{payload} {
}
- std::map<std::string, std::string> _attributes;
- uint64_t _size{0};
- std::shared_ptr<Transaction> transaction_;
- const std::string & payload_;
- std::shared_ptr<core::logging::Logger> logger_reference_;
+ std::map<std::string, std::string> attributes;
+ uint64_t size{0};
+ std::shared_ptr<Transaction> transaction;
+ const std::string& payload;
+};
+
+struct SiteToSiteResponse {
+ ResponseCode code = ResponseCode::UNRECOGNIZED_RESPONSE_CODE;
+ std::string message;
};
-class SiteToSiteClient : public core::ConnectableImpl {
+class SiteToSiteClient {
public:
- SiteToSiteClient()
- : core::ConnectableImpl("SitetoSiteClient") {
+ explicit SiteToSiteClient(std::unique_ptr<SiteToSitePeer> peer)
+ : peer_(std::move(peer)) {
+ gsl_Assert(peer_);
}
- ~SiteToSiteClient() override = default;
+ SiteToSiteClient(const SiteToSiteClient&) = delete;
+ SiteToSiteClient(SiteToSiteClient&&) = delete;
+ SiteToSiteClient& operator=(const SiteToSiteClient&) = delete;
+ SiteToSiteClient& operator=(SiteToSiteClient&&) = delete;
- void setSSLContextService(const
std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
- ssl_context_service_ = context_service;
- }
+ virtual ~SiteToSiteClient() = default;
- /**
- * Creates a transaction using the transaction ID and the direction
- * @param transactionID transaction identifier
- * @param direction direction of transfer
- */
- virtual std::shared_ptr<Transaction> createTransaction(TransferDirection
direction) = 0;
+ virtual std::optional<std::vector<PeerStatus>> getPeerList() = 0;
+ virtual bool transmitPayload(core::ProcessContext& context, const
std::string &payload, const std::map<std::string, std::string>& attributes) = 0;
- /**
- * Transfers flow files
- * @param direction transfer direction
- * @param context process context
- * @param session process session
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
- virtual bool transfer(TransferDirection direction, core::ProcessContext&
context, core::ProcessSession& session) {
-#ifndef WIN32
- if (__builtin_expect(direction == SEND, 1)) {
+ bool transfer(TransferDirection direction, core::ProcessContext& context,
core::ProcessSession& session) {
+ if (direction == TransferDirection::SEND) {
return transferFlowFiles(context, session);
} else {
return receiveFlowFiles(context, session);
}
-#else
- if (direction == SEND) {
- return transferFlowFiles(context, session);
- } else {
- return receiveFlowFiles(context, session);
- }
-#endif
}
- /**
- * Transfers flow files to server
- * @param context process context
- * @param session process session
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
- virtual bool transferFlowFiles(core::ProcessContext& context,
core::ProcessSession& session);
-
- /**
- * Receive flow files from server
- * @param context process context
- * @param session process session
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
-
- // Confirm the data that was sent or received by comparing CRC32's of the
data sent and the data received.
- // Receive flow files for the process session
- bool receiveFlowFiles(core::ProcessContext& context, core::ProcessSession&
session);
-
- // Receive the data packet from the transaction
- // Return false when any error occurs
- bool receive(const utils::Identifier &transactionID, DataPacket *packet,
bool &eof);
- /**
- * Transfers raw data and attributes to server
- * @param context process context
- * @param session process session
- * @param payload data to transmit
- * @param attributes
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
- virtual bool transmitPayload(core::ProcessContext& context,
core::ProcessSession& session, const std::string &payload,
- std::map<std::string, std::string> attributes)
= 0;
-
- void setPortId(utils::Identifier &id) {
+ void setPortId(const utils::Identifier& id) {
port_id_ = id;
}
- /**
- * Sets the idle timeout.
- */
void setIdleTimeout(std::chrono::milliseconds timeout) {
idle_timeout_ = timeout;
}
- /**
- * Sets the base peer for this interface.
- */
- virtual void setPeer(std::unique_ptr<SiteToSitePeer> peer) {
- peer_ = std::move(peer);
- }
-
- /**
- * Provides a reference to the port identifier
- * @returns port identifier
- */
- utils::Identifier getPortId() const {
+ [[nodiscard]] utils::Identifier getPortId() const {
return port_id_;
}
- /**
- * Obtains the peer list and places them into the provided vector
- * @param peers peer vector.
- * @return true if successful, false otherwise
- */
- virtual bool getPeerList(std::vector<PeerStatus> &peers) = 0;
-
- /**
- * Establishes the interface.
- * @return true if successful, false otherwise
- */
- virtual bool establish() = 0;
-
- const std::shared_ptr<core::logging::Logger> &getLogger() {
+ [[nodiscard]] const std::shared_ptr<core::logging::Logger> &getLogger() {
return logger_;
}
- void yield() override {
+ void setSSLContextService(const
std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
+ ssl_context_service_ = context_service;
}
- /**
- * Determines if we are connected and operating
- */
- bool isRunning() const override {
- return running_;
+ void setUseCompression(bool use_compression) {
+ use_compression_ = use_compression;
}
- /**
- * Determines if work is available by this connectable
- * @return boolean if work is available.
- */
- bool isWorkAvailable() override {
- return true;
+ void setBatchSize(uint64_t size) {
+ batch_size_ = size;
}
- virtual bool bootstrap() {
- return true;
+ void setBatchCount(uint64_t count) {
+ batch_count_ = count;
}
- // Return -1 when any error occurs
- virtual int16_t send(const utils::Identifier& transactionID, DataPacket*
packet, const std::shared_ptr<core::FlowFile>& flowFile, core::ProcessSession*
session);
+ void setBatchDuration(std::chrono::milliseconds duration) {
+ batch_duration_ = duration;
+ }
- protected:
- // Cancel the transaction
- virtual void cancel(const utils::Identifier &transactionID);
- // Complete the transaction
- virtual bool complete(core::ProcessContext& context, const utils::Identifier
&transactionID);
- // Error the transaction
- virtual void error(const utils::Identifier &transactionID);
+ virtual void setTimeout(std::chrono::milliseconds timeout) {
+ timeout_ = timeout;
+ }
- virtual bool confirm(const utils::Identifier &transactionID);
- // deleteTransaction
- virtual void deleteTransaction(const utils::Identifier &transactionID);
+ protected:
+ friend class test::SiteToSiteClientTestAccessor;
+ virtual bool bootstrap() = 0;
+ virtual bool establish() = 0;
+ virtual std::shared_ptr<Transaction> createTransaction(TransferDirection
direction) = 0;
virtual void tearDown() = 0;
- // read Respond
- virtual int readResponse(const std::shared_ptr<Transaction> &transaction,
RespondCode &code, std::string &message);
- // write respond
- virtual int writeResponse(const std::shared_ptr<Transaction> &transaction,
RespondCode code, const std::string& message);
- // getRespondCodeContext
- virtual RespondCodeContext *getRespondCodeContext(RespondCode code) {
- for (auto & i : SiteToSiteRequest::respondCodeContext) {
- if (i.code == code) {
- return &i;
- }
- }
- return nullptr;
- }
+ virtual void deleteTransaction(const utils::Identifier &transaction_id);
+ virtual std::optional<SiteToSiteResponse> readResponse(const
std::shared_ptr<Transaction> &transaction);
+ virtual bool writeResponse(const std::shared_ptr<Transaction> &transaction,
const SiteToSiteResponse& response);
- // Peer State
- PeerState peer_state_{PeerState::IDLE};
+ bool initializeSend(const std::shared_ptr<Transaction>& transaction);
+ bool writeAttributesInSendTransaction(const std::shared_ptr<Transaction>&
transaction, const std::map<std::string, std::string>& attributes);
+ void finalizeSendTransaction(const std::shared_ptr<Transaction>&
transaction, uint64_t sent_bytes);
+ bool sendPacket(const DataPacket& packet);
+ bool sendFlowFile(const std::shared_ptr<Transaction>& transaction,
core::FlowFile& flow_file, core::ProcessSession& session);
- // portId
- utils::Identifier port_id_;
+ void cancel(const utils::Identifier &transaction_id);
+ bool complete(core::ProcessContext& context, const utils::Identifier
&transaction_id);
+ void error(const utils::Identifier &transaction_id);
+ bool confirm(const utils::Identifier &transaction_id);
- // idleTimeout
- std::chrono::milliseconds idle_timeout_{15000};
+ void handleTransactionError(const std::shared_ptr<Transaction>& transaction,
core::ProcessContext& context, const std::exception& exception);
- // Peer Connection
+ PeerState peer_state_{PeerState::IDLE};
+ utils::Identifier port_id_;
+ std::chrono::milliseconds idle_timeout_{15000};
std::unique_ptr<SiteToSitePeer> peer_;
-
- std::atomic<bool> running_{false};
-
- // transaction map
std::map<utils::Identifier, std::shared_ptr<Transaction>>
known_transactions_;
+ std::chrono::nanoseconds batch_send_nanos_{5s};
- // BATCH_SEND_NANOS
- std::chrono::nanoseconds _batchSendNanos = std::chrono::seconds(5);
-
- /***
- * versioning
- */
- uint32_t _supportedVersion[5] = {5, 4, 3, 2, 1};
- int _currentVersionIndex{0};
- uint32_t _currentVersion{_supportedVersion[_currentVersionIndex]};
- uint32_t _supportedCodecVersion[1] = {1};
- int _currentCodecVersionIndex{0};
- uint32_t
_currentCodecVersion{_supportedCodecVersion[_currentCodecVersionIndex]};
+ const std::vector<uint32_t> supported_version_ = {5, 4, 3, 2, 1};
Review Comment:
Updated in
https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/76e948709325fdbef2805bd8d563094696d394fb
##########
libminifi/include/sitetosite/SiteToSite.h:
##########
@@ -23,35 +23,28 @@
#include "minifi-cpp/controllers/SSLContextService.h"
#include "Peer.h"
-#include "core/Property.h"
-#include "properties/Configure.h"
#include "io/CRCStream.h"
#include "utils/Id.h"
-#include "http/BaseHTTPClient.h"
-#include "utils/Export.h"
namespace org::apache::nifi::minifi::sitetosite {
-#if defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wunused-variable"
-#endif
-
-// Resource Negotiated Status Code
-#define RESOURCE_OK 20
-#define DIFFERENT_RESOURCE_VERSION 21
-#define NEGOTIATED_ABORT 255
-// ! Max attributes
-#define MAX_NUM_ATTRIBUTES 25000
-
-// Respond Code Sequence Pattern
-static const uint8_t CODE_SEQUENCE_VALUE_1 = (uint8_t) 'R';
-static const uint8_t CODE_SEQUENCE_VALUE_2 = (uint8_t) 'C';
+
+enum class ResourceNegotiationStatusCode : uint8_t {
+ RESOURCE_OK = 20,
+ DIFFERENT_RESOURCE_VERSION = 21,
+ NEGOTIATED_ABORT = 255
+};
+
+static constexpr uint32_t MAX_NUM_ATTRIBUTES = 25000;
+
+// Response Code Sequence Pattern
+static constexpr uint8_t CODE_SEQUENCE_VALUE_1 = static_cast<uint8_t>('R');
+static constexpr uint8_t CODE_SEQUENCE_VALUE_2 = static_cast<uint8_t>('C');
Review Comment:
Updated in
https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/76e948709325fdbef2805bd8d563094696d394fb
##########
libminifi/include/sitetosite/SiteToSiteClient.h:
##########
@@ -19,246 +19,182 @@
#pragma once
#include <algorithm>
-#include <array>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
+#include <optional>
#include "Peer.h"
#include "SiteToSite.h"
#include "core/ProcessSession.h"
#include "core/ProcessContext.h"
-#include "core/Connectable.h"
-#include "utils/gsl.h"
-namespace org::apache::nifi::minifi::sitetosite {
+namespace org::apache::nifi::minifi {
-/**
- * Represents a piece of data that is to be sent to or that was received from a
- * NiFi instance.
- */
-class DataPacket {
+namespace test {
+class SiteToSiteClientTestAccessor;
+} // namespace test
+
+namespace sitetosite {
+
+struct DataPacket {
public:
- DataPacket(std::shared_ptr<core::logging::Logger> logger,
std::shared_ptr<Transaction> transaction, std::map<std::string, std::string>
attributes, const std::string &payload)
- : _attributes{std::move(attributes)},
- transaction_{std::move(transaction)},
- payload_{payload},
- logger_reference_{std::move(logger)} {
+ DataPacket(std::shared_ptr<Transaction> transaction, const std::string
&payload)
+ : transaction{std::move(transaction)},
+ payload{payload} {
+ }
+ DataPacket(std::shared_ptr<Transaction> transaction, std::map<std::string,
std::string> attributes, const std::string &payload)
+ : attributes{std::move(attributes)},
+ transaction{std::move(transaction)},
+ payload{payload} {
}
- std::map<std::string, std::string> _attributes;
- uint64_t _size{0};
- std::shared_ptr<Transaction> transaction_;
- const std::string & payload_;
- std::shared_ptr<core::logging::Logger> logger_reference_;
+ std::map<std::string, std::string> attributes;
+ uint64_t size{0};
Review Comment:
I think the `size` member is not used anymore, so I removed it in
https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/76e948709325fdbef2805bd8d563094696d394fb
##########
libminifi/include/sitetosite/SiteToSiteClient.h:
##########
@@ -19,246 +19,182 @@
#pragma once
#include <algorithm>
-#include <array>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
+#include <optional>
#include "Peer.h"
#include "SiteToSite.h"
#include "core/ProcessSession.h"
#include "core/ProcessContext.h"
-#include "core/Connectable.h"
-#include "utils/gsl.h"
-namespace org::apache::nifi::minifi::sitetosite {
+namespace org::apache::nifi::minifi {
-/**
- * Represents a piece of data that is to be sent to or that was received from a
- * NiFi instance.
- */
-class DataPacket {
+namespace test {
+class SiteToSiteClientTestAccessor;
+} // namespace test
+
+namespace sitetosite {
+
+struct DataPacket {
public:
- DataPacket(std::shared_ptr<core::logging::Logger> logger,
std::shared_ptr<Transaction> transaction, std::map<std::string, std::string>
attributes, const std::string &payload)
- : _attributes{std::move(attributes)},
- transaction_{std::move(transaction)},
- payload_{payload},
- logger_reference_{std::move(logger)} {
+ DataPacket(std::shared_ptr<Transaction> transaction, const std::string
&payload)
+ : transaction{std::move(transaction)},
+ payload{payload} {
+ }
+ DataPacket(std::shared_ptr<Transaction> transaction, std::map<std::string,
std::string> attributes, const std::string &payload)
+ : attributes{std::move(attributes)},
+ transaction{std::move(transaction)},
+ payload{payload} {
}
- std::map<std::string, std::string> _attributes;
- uint64_t _size{0};
- std::shared_ptr<Transaction> transaction_;
- const std::string & payload_;
- std::shared_ptr<core::logging::Logger> logger_reference_;
+ std::map<std::string, std::string> attributes;
+ uint64_t size{0};
+ std::shared_ptr<Transaction> transaction;
+ const std::string& payload;
+};
+
+struct SiteToSiteResponse {
+ ResponseCode code = ResponseCode::UNRECOGNIZED_RESPONSE_CODE;
+ std::string message;
};
-class SiteToSiteClient : public core::ConnectableImpl {
+class SiteToSiteClient {
public:
- SiteToSiteClient()
- : core::ConnectableImpl("SitetoSiteClient") {
+ explicit SiteToSiteClient(std::unique_ptr<SiteToSitePeer> peer)
+ : peer_(std::move(peer)) {
+ gsl_Assert(peer_);
Review Comment:
Updated in
https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/96757448fff14b52dd92982bdbcd0f0a9baed4f6
##########
libminifi/include/sitetosite/SiteToSiteClient.h:
##########
@@ -19,246 +19,182 @@
#pragma once
#include <algorithm>
-#include <array>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
+#include <optional>
#include "Peer.h"
#include "SiteToSite.h"
#include "core/ProcessSession.h"
#include "core/ProcessContext.h"
-#include "core/Connectable.h"
-#include "utils/gsl.h"
-namespace org::apache::nifi::minifi::sitetosite {
+namespace org::apache::nifi::minifi {
-/**
- * Represents a piece of data that is to be sent to or that was received from a
- * NiFi instance.
- */
-class DataPacket {
+namespace test {
+class SiteToSiteClientTestAccessor;
+} // namespace test
+
+namespace sitetosite {
+
+struct DataPacket {
public:
- DataPacket(std::shared_ptr<core::logging::Logger> logger,
std::shared_ptr<Transaction> transaction, std::map<std::string, std::string>
attributes, const std::string &payload)
- : _attributes{std::move(attributes)},
- transaction_{std::move(transaction)},
- payload_{payload},
- logger_reference_{std::move(logger)} {
+ DataPacket(std::shared_ptr<Transaction> transaction, const std::string
&payload)
+ : transaction{std::move(transaction)},
+ payload{payload} {
+ }
+ DataPacket(std::shared_ptr<Transaction> transaction, std::map<std::string,
std::string> attributes, const std::string &payload)
+ : attributes{std::move(attributes)},
+ transaction{std::move(transaction)},
+ payload{payload} {
}
- std::map<std::string, std::string> _attributes;
- uint64_t _size{0};
- std::shared_ptr<Transaction> transaction_;
- const std::string & payload_;
- std::shared_ptr<core::logging::Logger> logger_reference_;
+ std::map<std::string, std::string> attributes;
+ uint64_t size{0};
+ std::shared_ptr<Transaction> transaction;
+ const std::string& payload;
+};
+
+struct SiteToSiteResponse {
+ ResponseCode code = ResponseCode::UNRECOGNIZED_RESPONSE_CODE;
+ std::string message;
};
-class SiteToSiteClient : public core::ConnectableImpl {
+class SiteToSiteClient {
public:
- SiteToSiteClient()
- : core::ConnectableImpl("SitetoSiteClient") {
+ explicit SiteToSiteClient(std::unique_ptr<SiteToSitePeer> peer)
+ : peer_(std::move(peer)) {
+ gsl_Assert(peer_);
}
- ~SiteToSiteClient() override = default;
+ SiteToSiteClient(const SiteToSiteClient&) = delete;
+ SiteToSiteClient(SiteToSiteClient&&) = delete;
+ SiteToSiteClient& operator=(const SiteToSiteClient&) = delete;
+ SiteToSiteClient& operator=(SiteToSiteClient&&) = delete;
- void setSSLContextService(const
std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
- ssl_context_service_ = context_service;
- }
+ virtual ~SiteToSiteClient() = default;
- /**
- * Creates a transaction using the transaction ID and the direction
- * @param transactionID transaction identifier
- * @param direction direction of transfer
- */
- virtual std::shared_ptr<Transaction> createTransaction(TransferDirection
direction) = 0;
+ virtual std::optional<std::vector<PeerStatus>> getPeerList() = 0;
+ virtual bool transmitPayload(core::ProcessContext& context, const
std::string &payload, const std::map<std::string, std::string>& attributes) = 0;
- /**
- * Transfers flow files
- * @param direction transfer direction
- * @param context process context
- * @param session process session
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
- virtual bool transfer(TransferDirection direction, core::ProcessContext&
context, core::ProcessSession& session) {
-#ifndef WIN32
- if (__builtin_expect(direction == SEND, 1)) {
+ bool transfer(TransferDirection direction, core::ProcessContext& context,
core::ProcessSession& session) {
+ if (direction == TransferDirection::SEND) {
return transferFlowFiles(context, session);
} else {
return receiveFlowFiles(context, session);
}
-#else
- if (direction == SEND) {
- return transferFlowFiles(context, session);
- } else {
- return receiveFlowFiles(context, session);
- }
-#endif
}
- /**
- * Transfers flow files to server
- * @param context process context
- * @param session process session
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
- virtual bool transferFlowFiles(core::ProcessContext& context,
core::ProcessSession& session);
-
- /**
- * Receive flow files from server
- * @param context process context
- * @param session process session
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
-
- // Confirm the data that was sent or received by comparing CRC32's of the
data sent and the data received.
- // Receive flow files for the process session
- bool receiveFlowFiles(core::ProcessContext& context, core::ProcessSession&
session);
-
- // Receive the data packet from the transaction
- // Return false when any error occurs
- bool receive(const utils::Identifier &transactionID, DataPacket *packet,
bool &eof);
- /**
- * Transfers raw data and attributes to server
- * @param context process context
- * @param session process session
- * @param payload data to transmit
- * @param attributes
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
- virtual bool transmitPayload(core::ProcessContext& context,
core::ProcessSession& session, const std::string &payload,
- std::map<std::string, std::string> attributes)
= 0;
-
- void setPortId(utils::Identifier &id) {
+ void setPortId(const utils::Identifier& id) {
port_id_ = id;
}
- /**
- * Sets the idle timeout.
- */
void setIdleTimeout(std::chrono::milliseconds timeout) {
idle_timeout_ = timeout;
}
- /**
- * Sets the base peer for this interface.
- */
- virtual void setPeer(std::unique_ptr<SiteToSitePeer> peer) {
- peer_ = std::move(peer);
- }
-
- /**
- * Provides a reference to the port identifier
- * @returns port identifier
- */
- utils::Identifier getPortId() const {
+ [[nodiscard]] utils::Identifier getPortId() const {
return port_id_;
}
- /**
- * Obtains the peer list and places them into the provided vector
- * @param peers peer vector.
- * @return true if successful, false otherwise
- */
- virtual bool getPeerList(std::vector<PeerStatus> &peers) = 0;
-
- /**
- * Establishes the interface.
- * @return true if successful, false otherwise
- */
- virtual bool establish() = 0;
-
- const std::shared_ptr<core::logging::Logger> &getLogger() {
+ [[nodiscard]] const std::shared_ptr<core::logging::Logger> &getLogger() {
return logger_;
}
- void yield() override {
+ void setSSLContextService(const
std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
+ ssl_context_service_ = context_service;
}
- /**
- * Determines if we are connected and operating
- */
- bool isRunning() const override {
- return running_;
+ void setUseCompression(bool use_compression) {
+ use_compression_ = use_compression;
}
- /**
- * Determines if work is available by this connectable
- * @return boolean if work is available.
- */
- bool isWorkAvailable() override {
- return true;
+ void setBatchSize(uint64_t size) {
+ batch_size_ = size;
}
- virtual bool bootstrap() {
- return true;
+ void setBatchCount(uint64_t count) {
+ batch_count_ = count;
}
- // Return -1 when any error occurs
- virtual int16_t send(const utils::Identifier& transactionID, DataPacket*
packet, const std::shared_ptr<core::FlowFile>& flowFile, core::ProcessSession*
session);
+ void setBatchDuration(std::chrono::milliseconds duration) {
+ batch_duration_ = duration;
+ }
- protected:
- // Cancel the transaction
- virtual void cancel(const utils::Identifier &transactionID);
- // Complete the transaction
- virtual bool complete(core::ProcessContext& context, const utils::Identifier
&transactionID);
- // Error the transaction
- virtual void error(const utils::Identifier &transactionID);
+ virtual void setTimeout(std::chrono::milliseconds timeout) {
+ timeout_ = timeout;
+ }
- virtual bool confirm(const utils::Identifier &transactionID);
- // deleteTransaction
- virtual void deleteTransaction(const utils::Identifier &transactionID);
+ protected:
+ friend class test::SiteToSiteClientTestAccessor;
+ virtual bool bootstrap() = 0;
+ virtual bool establish() = 0;
+ virtual std::shared_ptr<Transaction> createTransaction(TransferDirection
direction) = 0;
virtual void tearDown() = 0;
- // read Respond
- virtual int readResponse(const std::shared_ptr<Transaction> &transaction,
RespondCode &code, std::string &message);
- // write respond
- virtual int writeResponse(const std::shared_ptr<Transaction> &transaction,
RespondCode code, const std::string& message);
- // getRespondCodeContext
- virtual RespondCodeContext *getRespondCodeContext(RespondCode code) {
- for (auto & i : SiteToSiteRequest::respondCodeContext) {
- if (i.code == code) {
- return &i;
- }
- }
- return nullptr;
- }
+ virtual void deleteTransaction(const utils::Identifier &transaction_id);
+ virtual std::optional<SiteToSiteResponse> readResponse(const
std::shared_ptr<Transaction> &transaction);
+ virtual bool writeResponse(const std::shared_ptr<Transaction> &transaction,
const SiteToSiteResponse& response);
- // Peer State
- PeerState peer_state_{PeerState::IDLE};
+ bool initializeSend(const std::shared_ptr<Transaction>& transaction);
+ bool writeAttributesInSendTransaction(const std::shared_ptr<Transaction>&
transaction, const std::map<std::string, std::string>& attributes);
+ void finalizeSendTransaction(const std::shared_ptr<Transaction>&
transaction, uint64_t sent_bytes);
+ bool sendPacket(const DataPacket& packet);
+ bool sendFlowFile(const std::shared_ptr<Transaction>& transaction,
core::FlowFile& flow_file, core::ProcessSession& session);
- // portId
- utils::Identifier port_id_;
+ void cancel(const utils::Identifier &transaction_id);
+ bool complete(core::ProcessContext& context, const utils::Identifier
&transaction_id);
+ void error(const utils::Identifier &transaction_id);
+ bool confirm(const utils::Identifier &transaction_id);
- // idleTimeout
- std::chrono::milliseconds idle_timeout_{15000};
+ void handleTransactionError(const std::shared_ptr<Transaction>& transaction,
core::ProcessContext& context, const std::exception& exception);
- // Peer Connection
+ PeerState peer_state_{PeerState::IDLE};
+ utils::Identifier port_id_;
+ std::chrono::milliseconds idle_timeout_{15000};
Review Comment:
Updated in
https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/76e948709325fdbef2805bd8d563094696d394fb
##########
libminifi/include/sitetosite/SiteToSite.h:
##########
@@ -205,140 +147,184 @@ typedef enum {
ABORT = 250,
UNRECOGNIZED_RESPONSE_CODE = 254,
END_OF_STREAM = 255
-}RespondCode;
-
-// Respond Code Class
-typedef struct {
- RespondCode code;
- const char *description;
- bool hasDescription;
-} RespondCodeContext;
-
-
+};
-// Request Type Str
-class SiteToSiteRequest {
- public:
- MINIFIAPI static const char *RequestTypeStr[MAX_REQUEST_TYPE];
- MINIFIAPI static RespondCodeContext respondCodeContext[21];
+struct ResponseCodeContext {
+ ResponseCode code = ResponseCode::UNRECOGNIZED_RESPONSE_CODE;
+ const std::string_view description;
+ bool has_description = false;
};
+static constexpr std::array<ResponseCodeContext, 21> respond_code_contexts = {{
Review Comment:
Updated in
https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/76e948709325fdbef2805bd8d563094696d394fb
##########
libminifi/src/sitetosite/SiteToSiteFactory.cpp:
##########
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "sitetosite/SiteToSiteFactory.h"
+
+#include <utility>
+#include <memory>
+
+#include "sitetosite/RawSiteToSiteClient.h"
+#include "sitetosite/HttpSiteToSiteClient.h"
+#include "utils/net/AsioSocketUtils.h"
+#include "core/ClassLoader.h"
+
+namespace org::apache::nifi::minifi::sitetosite {
+
+namespace {
+std::unique_ptr<SiteToSitePeer> createStreamingPeer(const
SiteToSiteClientConfiguration &client_configuration) {
+ utils::net::SocketData socket_data{client_configuration.getHost(),
client_configuration.getPort(), client_configuration.getSecurityContext()};
+ auto connection =
std::make_unique<utils::net::AsioSocketConnection>(socket_data);
+ return std::make_unique<SiteToSitePeer>(std::move(connection),
client_configuration.getHost(), client_configuration.getPort(),
client_configuration.getInterface());
+}
+
+void setCommonConfigurationOptions(SiteToSiteClient& client, const
SiteToSiteClientConfiguration &client_configuration) {
+ client.setSSLContextService(client_configuration.getSecurityContext());
+ client.setUseCompression(client_configuration.getUseCompression());
+ if (client_configuration.getBatchCount()) {
+ client.setBatchCount(client_configuration.getBatchCount().value());
+ }
+ if (client_configuration.getBatchSize()) {
+ client.setBatchSize(client_configuration.getBatchSize().value());
+ }
+ if (client_configuration.getBatchDuration()) {
+ client.setBatchDuration(client_configuration.getBatchDuration().value());
+ }
+ if (client_configuration.getTimeout()) {
+ client.setTimeout(client_configuration.getTimeout().value());
+ }
+}
+
+std::unique_ptr<SiteToSiteClient> createRawSocketSiteToSiteClient(const
SiteToSiteClientConfiguration &client_configuration) {
+ auto raw_site_to_site_client =
std::make_unique<RawSiteToSiteClient>(createStreamingPeer(client_configuration));
+ raw_site_to_site_client->setPortId(client_configuration.getPortId());
+ setCommonConfigurationOptions(*raw_site_to_site_client,
client_configuration);
+ return raw_site_to_site_client;
+}
+
+std::unique_ptr<SiteToSiteClient> createHttpSiteToSiteClient(const
SiteToSiteClientConfiguration &client_configuration) {
+ auto peer = std::make_unique<SiteToSitePeer>(client_configuration.getHost(),
client_configuration.getPort(), client_configuration.getInterface());
+ peer->setHTTPProxy(client_configuration.getHTTPProxy());
+
+ auto http_site_to_site_client =
std::make_unique<HttpSiteToSiteClient>(std::move(peer));
+ http_site_to_site_client->setPortId(client_configuration.getPortId());
+
http_site_to_site_client->setIdleTimeout(client_configuration.getIdleTimeout());
+ setCommonConfigurationOptions(*http_site_to_site_client,
client_configuration);
+ return http_site_to_site_client;
+}
+} // namespace
+
+std::unique_ptr<SiteToSiteClient> createClient(const
SiteToSiteClientConfiguration &client_configuration) {
+ switch (client_configuration.getClientType()) {
+ case ClientType::RAW:
+ return createRawSocketSiteToSiteClient(client_configuration);
+ case ClientType::HTTP:
+ return createHttpSiteToSiteClient(client_configuration);
+ }
+ return nullptr;
Review Comment:
Added std::terminate in
https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/96757448fff14b52dd92982bdbcd0f0a9baed4f6
##########
libminifi/test/libtest/integration/HTTPHandlers.cpp:
##########
@@ -93,6 +95,25 @@ TransactionResponder::TransactionResponder(std::string
base_url, std::string por
}
bool TransactionResponder::handlePost(CivetServer* /*server*/, struct
mg_connection *conn) {
+ auto req_info = mg_get_request_info(conn);
+ std::unordered_map<std::string, std::string> expected_headers {
Review Comment:
Changed `expected_headers` to `static const` for now in
https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/76e948709325fdbef2805bd8d563094696d394fb
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]