fgerlits commented on code in PR #1966:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1966#discussion_r2205045738


##########
libminifi/src/sitetosite/RawSiteToSiteClient.cpp:
##########
@@ -0,0 +1,431 @@
+/**
+ * Site2SiteProtocol class implementation
+ *

Review Comment:
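   This header comment is outdated (the file now contains the `RawSiteToSiteClient` implementation, not a "Site2SiteProtocol class"), so it should be updated or deleted.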



##########
docker/test/integration/features/s2s.feature:
##########
@@ -25,73 +25,88 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S 
protocol
   Scenario: A MiNiFi instance produces and transfers data to a NiFi instance 
via s2s
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a file with the content "test" is present in "/tmp/input"
-    And a RemoteProcessGroup node opened on 
"http://nifi-${feature_id}:8080/nifi";
-    And the "success" relationship of the GetFile processor is connected to 
the input port on the RemoteProcessGroup
+    And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on 
"http://nifi-${feature_id}:8080/nifi";
+    And an input port with name "to_nifi" is created on the RemoteProcessGroup 
named "RemoteProcessGroup"
+    And the "success" relationship of the GetFile processor is connected to 
the to_nifi
 
-    And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi"
+    And a NiFi flow is receiving data in an input port named "from-minifi" 
with the id of the port named "to_nifi" from the RemoteProcessGroup named 
"RemoteProcessGroup"

Review Comment:
   I understand this is already quite long, but I think
   ```suggestion
       And a NiFi flow is receiving data from the RemoteProcessGroup named 
"RemoteProcessGroup" in an input port named "from-minifi" which has the same id 
as the port named "to_nifi"
   ```
   would be easier to understand, even though it is even longer.



##########
docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py:
##########
@@ -134,8 +179,8 @@ def serialize_node(self, connectable, root, visited):
             for proc in conn_procs:
                 root['connections'].append({
                     'name': str(uuid.uuid4()),
-                    'source': {'id': str(connectable.uuid)},
-                    'destination': {'id': str(proc.uuid) if not 
isinstance(proc, InputPort) else str(proc.instance_id)}
+                    'source': {'id': str(connectable.uuid) if not 
isinstance(connectable, InputPort) and not isinstance(connectable, OutputPort) 
else str(connectable.instance_id)},
+                    'destination': {'id': str(proc.uuid) if not 
isinstance(proc, InputPort) and not isinstance(proc, OutputPort) else 
str(proc.instance_id)}

Review Comment:
   This is getting too complex; it could be simplified, e.g. by adding an 
`id_for_connection` method to `Connectable` and overriding it in `InputPort` 
and `OutputPort`.



##########
libminifi/include/sitetosite/SiteToSiteClient.h:
##########
@@ -19,246 +19,182 @@
 #pragma once
 
 #include <algorithm>
-#include <array>
 #include <map>
 #include <memory>
 #include <string>
 #include <utility>
 #include <vector>
+#include <optional>
 
 #include "Peer.h"
 #include "SiteToSite.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessContext.h"
-#include "core/Connectable.h"
-#include "utils/gsl.h"
 
-namespace org::apache::nifi::minifi::sitetosite {
+namespace org::apache::nifi::minifi {
 
-/**
- * Represents a piece of data that is to be sent to or that was received from a
- * NiFi instance.
- */
-class DataPacket {
+namespace test {
+class SiteToSiteClientTestAccessor;
+}  // namespace test
+
+namespace sitetosite {
+
+struct DataPacket {
  public:
-  DataPacket(std::shared_ptr<core::logging::Logger> logger, 
std::shared_ptr<Transaction> transaction, std::map<std::string, std::string> 
attributes, const std::string &payload)
-      : _attributes{std::move(attributes)},
-        transaction_{std::move(transaction)},
-        payload_{payload},
-        logger_reference_{std::move(logger)} {
+  DataPacket(std::shared_ptr<Transaction> transaction, const std::string 
&payload)
+      : transaction{std::move(transaction)},
+        payload{payload} {
+  }
+  DataPacket(std::shared_ptr<Transaction> transaction, std::map<std::string, 
std::string> attributes, const std::string &payload)
+      : attributes{std::move(attributes)},
+        transaction{std::move(transaction)},
+        payload{payload} {
   }
-  std::map<std::string, std::string> _attributes;
-  uint64_t _size{0};
-  std::shared_ptr<Transaction> transaction_;
-  const std::string & payload_;
-  std::shared_ptr<core::logging::Logger> logger_reference_;
+  std::map<std::string, std::string> attributes;
+  uint64_t size{0};
+  std::shared_ptr<Transaction> transaction;
+  const std::string& payload;
+};
+
+struct SiteToSiteResponse {
+  ResponseCode code = ResponseCode::UNRECOGNIZED_RESPONSE_CODE;
+  std::string message;
 };
 
-class SiteToSiteClient : public core::ConnectableImpl {
+class SiteToSiteClient {
  public:
-  SiteToSiteClient()
-      : core::ConnectableImpl("SitetoSiteClient") {
+  explicit SiteToSiteClient(std::unique_ptr<SiteToSitePeer> peer)
+      : peer_(std::move(peer)) {
+    gsl_Assert(peer_);
   }
 
-  ~SiteToSiteClient() override = default;
+  SiteToSiteClient(const SiteToSiteClient&) = delete;
+  SiteToSiteClient(SiteToSiteClient&&) = delete;
+  SiteToSiteClient& operator=(const SiteToSiteClient&) = delete;
+  SiteToSiteClient& operator=(SiteToSiteClient&&) = delete;
 
-  void setSSLContextService(const 
std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
-    ssl_context_service_ = context_service;
-  }
+  virtual ~SiteToSiteClient() = default;
 
-  /**
-   * Creates a transaction using the transaction ID and the direction
-   * @param transactionID transaction identifier
-   * @param direction direction of transfer
-   */
-  virtual std::shared_ptr<Transaction> createTransaction(TransferDirection 
direction) = 0;
+  virtual std::optional<std::vector<PeerStatus>> getPeerList() = 0;
+  virtual bool transmitPayload(core::ProcessContext& context, const 
std::string &payload, const std::map<std::string, std::string>& attributes) = 0;
 
-  /**
-   * Transfers flow files
-   * @param direction transfer direction
-   * @param context process context
-   * @param session process session
-   * @returns true if the process succeeded, failure OR exception thrown 
otherwise
-   */
-  virtual bool transfer(TransferDirection direction, core::ProcessContext& 
context, core::ProcessSession& session) {
-#ifndef WIN32
-    if (__builtin_expect(direction == SEND, 1)) {
+  bool transfer(TransferDirection direction, core::ProcessContext& context, 
core::ProcessSession& session) {
+    if (direction == TransferDirection::SEND) {
       return transferFlowFiles(context, session);
     } else {
       return receiveFlowFiles(context, session);
     }
-#else
-    if (direction == SEND) {
-      return transferFlowFiles(context, session);
-    } else {
-      return receiveFlowFiles(context, session);
-    }
-#endif
   }
 
-  /**
-   * Transfers flow files to server
-   * @param context process context
-   * @param session process session
-   * @returns true if the process succeeded, failure OR exception thrown 
otherwise
-   */
-  virtual bool transferFlowFiles(core::ProcessContext& context, 
core::ProcessSession& session);
-
-  /**
-   * Receive flow files from server
-   * @param context process context
-   * @param session process session
-   * @returns true if the process succeeded, failure OR exception thrown 
otherwise
-   */
-
-  // Confirm the data that was sent or received by comparing CRC32's of the 
data sent and the data received.
-  // Receive flow files for the process session
-  bool receiveFlowFiles(core::ProcessContext& context, core::ProcessSession& 
session);
-
-  // Receive the data packet from the transaction
-  // Return false when any error occurs
-  bool receive(const utils::Identifier &transactionID, DataPacket *packet, 
bool &eof);
-  /**
-   * Transfers raw data and attributes  to server
-   * @param context process context
-   * @param session process session
-   * @param payload data to transmit
-   * @param attributes
-   * @returns true if the process succeeded, failure OR exception thrown 
otherwise
-   */
-  virtual bool transmitPayload(core::ProcessContext& context, 
core::ProcessSession& session, const std::string &payload,
-                               std::map<std::string, std::string> attributes) 
= 0;
-
-  void setPortId(utils::Identifier &id) {
+  void setPortId(const utils::Identifier& id) {
     port_id_ = id;
   }
 
-  /**
-   * Sets the idle timeout.
-   */
   void setIdleTimeout(std::chrono::milliseconds timeout) {
      idle_timeout_ = timeout;
   }
 
-  /**
-   * Sets the base peer for this interface.
-   */
-  virtual void setPeer(std::unique_ptr<SiteToSitePeer> peer) {
-    peer_ = std::move(peer);
-  }
-
-  /**
-   * Provides a reference to the port identifier
-   * @returns port identifier
-   */
-  utils::Identifier getPortId() const {
+  [[nodiscard]] utils::Identifier getPortId() const {
     return port_id_;
   }
 
-  /**
-   * Obtains the peer list and places them into the provided vector
-   * @param peers peer vector.
-   * @return true if successful, false otherwise
-   */
-  virtual bool getPeerList(std::vector<PeerStatus> &peers) = 0;
-
-  /**
-   * Establishes the interface.
-   * @return true if successful, false otherwise
-   */
-  virtual bool establish() = 0;
-
-  const std::shared_ptr<core::logging::Logger> &getLogger() {
+  [[nodiscard]] const std::shared_ptr<core::logging::Logger> &getLogger() {
     return logger_;
   }
 
-  void yield() override {
+  void setSSLContextService(const 
std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
+    ssl_context_service_ = context_service;
   }
 
-  /**
-   * Determines if we are connected and operating
-   */
-  bool isRunning() const override {
-    return running_;
+  void setUseCompression(bool use_compression) {
+    use_compression_ = use_compression;
   }
 
-  /**
-   * Determines if work is available by this connectable
-   * @return boolean if work is available.
-   */
-  bool isWorkAvailable() override {
-    return true;
+  void setBatchSize(uint64_t size) {
+    batch_size_ = size;
   }
 
-  virtual bool bootstrap() {
-    return true;
+  void setBatchCount(uint64_t count) {
+    batch_count_ = count;
   }
 
-  // Return -1 when any error occurs
-  virtual int16_t send(const utils::Identifier& transactionID, DataPacket* 
packet, const std::shared_ptr<core::FlowFile>& flowFile, core::ProcessSession* 
session);
+  void setBatchDuration(std::chrono::milliseconds duration) {
+    batch_duration_ = duration;
+  }
 
- protected:
-  // Cancel the transaction
-  virtual void cancel(const utils::Identifier &transactionID);
-  // Complete the transaction
-  virtual bool complete(core::ProcessContext& context, const utils::Identifier 
&transactionID);
-  // Error the transaction
-  virtual void error(const utils::Identifier &transactionID);
+  virtual void setTimeout(std::chrono::milliseconds timeout) {
+    timeout_ = timeout;
+  }
 
-  virtual bool confirm(const utils::Identifier &transactionID);
-  // deleteTransaction
-  virtual void deleteTransaction(const utils::Identifier &transactionID);
+ protected:
+  friend class test::SiteToSiteClientTestAccessor;
 
+  virtual bool bootstrap() = 0;
+  virtual bool establish() = 0;
+  virtual std::shared_ptr<Transaction> createTransaction(TransferDirection 
direction) = 0;
   virtual void tearDown() = 0;
 
-  // read Respond
-  virtual int readResponse(const std::shared_ptr<Transaction> &transaction, 
RespondCode &code, std::string &message);
-  // write respond
-  virtual int writeResponse(const std::shared_ptr<Transaction> &transaction, 
RespondCode code, const std::string& message);
-  // getRespondCodeContext
-  virtual RespondCodeContext *getRespondCodeContext(RespondCode code) {
-    for (auto & i : SiteToSiteRequest::respondCodeContext) {
-      if (i.code == code) {
-        return &i;
-      }
-    }
-    return nullptr;
-  }
+  virtual void deleteTransaction(const utils::Identifier &transaction_id);
+  virtual std::optional<SiteToSiteResponse> readResponse(const 
std::shared_ptr<Transaction> &transaction);
+  virtual bool writeResponse(const std::shared_ptr<Transaction> &transaction, 
const SiteToSiteResponse& response);
 
-  // Peer State
-  PeerState peer_state_{PeerState::IDLE};
+  bool initializeSend(const std::shared_ptr<Transaction>& transaction);
+  bool writeAttributesInSendTransaction(const std::shared_ptr<Transaction>& 
transaction, const std::map<std::string, std::string>& attributes);
+  void finalizeSendTransaction(const std::shared_ptr<Transaction>& 
transaction, uint64_t sent_bytes);
+  bool sendPacket(const DataPacket& packet);
+  bool sendFlowFile(const std::shared_ptr<Transaction>& transaction, 
core::FlowFile& flow_file, core::ProcessSession& session);
 
-  // portId
-  utils::Identifier port_id_;
+  void cancel(const utils::Identifier &transaction_id);
+  bool complete(core::ProcessContext& context, const utils::Identifier 
&transaction_id);
+  void error(const utils::Identifier &transaction_id);
+  bool confirm(const utils::Identifier &transaction_id);
 
-  // idleTimeout
-  std::chrono::milliseconds idle_timeout_{15000};
+  void handleTransactionError(const std::shared_ptr<Transaction>& transaction, 
core::ProcessContext& context, const std::exception& exception);
 
-  // Peer Connection
+  PeerState peer_state_{PeerState::IDLE};
+  utils::Identifier port_id_;
+  std::chrono::milliseconds idle_timeout_{15000};

Review Comment:
   we could write
   ```suggestion
     std::chrono::milliseconds idle_timeout_{15s};
   ```
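   since the chrono literals are already in scope here (`Peer.h`, which this header includes, has `using namespace std::literals::chrono_literals;`).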



##########
libminifi/src/sitetosite/HttpSiteToSiteClient.cpp:
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "sitetosite/HttpSiteToSiteClient.h"
+
+#include <chrono>
+#include <map>
+#include <string>
+#include <memory>
+#include <thread>
+#include <iostream>
+#include <vector>
+#include <optional>
+
+#include "io/CRCStream.h"
+#include "sitetosite/Peer.h"
+#include "io/validation.h"
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "Exception.h"
+
+#include "rapidjson/document.h"
+#include "rapidjson/error/en.h"
+
+#undef DELETE  // macro on windows
+
+namespace org::apache::nifi::minifi::sitetosite {
+
+namespace {
+std::optional<utils::Identifier> parseTransactionId(const std::string &uri) {
+  return 
utils::Identifier::parse(utils::string::partAfterLastOccurrenceOf(uri, '/'));
+}
+
+std::optional<std::vector<PeerStatus>> parsePeerStatuses(const 
std::shared_ptr<core::logging::Logger> &logger, const std::string &entity, 
const utils::Identifier &id) {
+  try {
+    rapidjson::Document root;
+    rapidjson::ParseResult ok = root.Parse(entity.c_str());
+    if (!ok) {
+      std::stringstream ss;
+      ss << "Failed to parse archive lens stack from JSON string with reason: "
+          << rapidjson::GetParseError_En(ok.Code())
+          << " at offset " << ok.Offset();
+
+      throw Exception(ExceptionType::GENERAL_EXCEPTION, ss.str());
+    }
+
+    std::vector<PeerStatus> peer_statuses;
+    if (!root.HasMember("peers") || !root["peers"].IsArray() || 
root["peers"].Size() <= 0) {
+      logger->log_debug("Peers is either not a member or is empty. String to 
analyze: {}", entity);
+      return peer_statuses;

Review Comment:
   Is it intentional that we return an empty vector here? In the other error cases, 
we return `std::nullopt`.



##########
libminifi/src/sitetosite/RawSiteToSiteClient.cpp:
##########
@@ -0,0 +1,431 @@
+/**
+ * Site2SiteProtocol class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <chrono>
+#include <utility>
+#include <map>
+#include <string>
+#include <memory>
+#include <vector>
+
+#include "sitetosite/RawSiteToSiteClient.h"
+#include "io/CRCStream.h"
+#include "sitetosite/Peer.h"
+#include "utils/gsl.h"
+#include "utils/Enum.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::sitetosite {
+
+namespace {
+bool negotiateVersion(SiteToSitePeer& peer, const std::string& resource_name, 
const std::string& negotiation_type, uint32_t& current_version, uint32_t& 
current_version_index,
+    const std::vector<uint32_t>& supported_versions, const 
std::shared_ptr<core::logging::Logger>& logger) {
+  if (const auto ret = peer.write(resource_name); ret == 0 || 
io::isError(ret)) {
+    logger->log_debug("result of writing {} resource name is {}", 
negotiation_type, ret);
+    return false;
+  }
+
+  if (const auto ret = peer.write(current_version); ret == 0 || 
io::isError(ret)) {
+    logger->log_debug("result of {} version is {}", negotiation_type, ret);
+    return false;
+  }
+
+  uint8_t status_code_byte = 0;
+  if (const auto ret = peer.read(status_code_byte); ret == 0 || 
io::isError(ret)) {
+    logger->log_debug("result of writing {} version status code  {}", 
negotiation_type, ret);
+    return false;
+  }
+
+  auto status_code = 
magic_enum::enum_cast<ResourceNegotiationStatusCode>(status_code_byte);
+  if (!status_code) {
+    logger->log_error("Negotiate {} response unknown code {}", 
negotiation_type, status_code_byte);
+    return false;
+  }
+
+  switch (*status_code) {
+    case ResourceNegotiationStatusCode::RESOURCE_OK: {
+      logger->log_debug("Site2Site {} Negotiate version OK", negotiation_type);
+      return true;
+    }
+    case ResourceNegotiationStatusCode::DIFFERENT_RESOURCE_VERSION: {
+      uint32_t server_version = 0;
+      if (const auto ret = peer.read(server_version); ret == 0 || 
io::isError(ret)) {
+        return false;
+      }
+
+      logger->log_info("Site2Site Server Response asked for a different 
protocol version ", server_version);
+
+      for (uint32_t i = (current_version_index + 1); i < 
supported_versions.size(); i++) {
+        if (server_version >= supported_versions.at(i)) {
+          current_version = supported_versions.at(i);
+          current_version_index = i;
+          return negotiateVersion(peer, resource_name, negotiation_type, 
current_version, current_version_index, supported_versions, logger);
+        }
+      }
+      logger->log_error("Site2Site Negotiate {} failed to find a common 
version with server", negotiation_type);
+      return false;
+    }
+    case ResourceNegotiationStatusCode::NEGOTIATED_ABORT: {
+      logger->log_error("Site2Site Negotiate {} response ABORT", 
negotiation_type);
+      return false;
+    }
+    default: {
+      logger->log_error("Negotiate {} response unhandled code {}", 
negotiation_type, magic_enum::enum_name(*status_code));
+      return false;
+    }
+  }
+}
+}  // namespace
+
+bool RawSiteToSiteClient::establish() {
+  if (peer_state_ != PeerState::IDLE) {
+    logger_->log_error("Site2Site peer state is not idle while try to 
establish");
+    return false;
+  }
+
+  if (auto ret = peer_->open(); !ret) {
+    logger_->log_error("Site2Site peer socket open failed");
+    return false;
+  }
+
+  if (auto ret = initiateResourceNegotiation(); !ret) {
+    logger_->log_error("Site2Site Protocol Version Negotiation failed");
+    return false;
+  }
+
+  logger_->log_debug("Site2Site socket established");
+  peer_state_ = PeerState::ESTABLISHED;
+
+  return true;
+}
+
+bool RawSiteToSiteClient::initiateResourceNegotiation() {
+  if (peer_state_ != PeerState::IDLE) {
+    logger_->log_error("Site2Site peer state is not idle while 
initiateResourceNegotiation");
+    return false;
+  }
+
+  logger_->log_debug("Negotiate protocol version with destination port {} 
current version {}", port_id_.to_string(), current_version_);
+  return negotiateVersion(*peer_, std::string{PROTOCOL_RESOURCE_NAME}, 
"protocol", current_version_, current_version_index_, supported_version_, 
logger_);
+}
+
+bool RawSiteToSiteClient::initiateCodecResourceNegotiation() {
+  if (peer_state_ != PeerState::HANDSHAKED) {
+    logger_->log_error("Site2Site peer state is not handshaked while 
initiateCodecResourceNegotiation");
+    return false;
+  }
+
+  logger_->log_trace("Negotiate Codec version with destination port {} current 
version {}", port_id_.to_string(), current_codec_version_);
+  return negotiateVersion(*peer_, std::string{CODEC_RESOURCE_NAME}, "codec", 
current_codec_version_, current_codec_version_index_, supported_codec_version_, 
logger_);
+}
+
+bool RawSiteToSiteClient::handShake() {
+  if (peer_state_ != PeerState::ESTABLISHED) {
+    logger_->log_error("Site2Site peer state is not established while 
handshake");
+    return false;
+  }
+  logger_->log_debug("Site2Site Protocol Perform hand shake with destination 
port {}", port_id_.to_string());
+  comms_identifier_ = utils::IdGenerator::getIdGenerator()->generate();
+
+  if (const auto ret = peer_->write(comms_identifier_); ret == 0 || 
io::isError(ret)) {
+    logger_->log_error("Failed to write comms identifier {}", ret);
+    return false;
+  }
+
+  std::map<std::string, std::string> properties;
+  // TODO(lordgamez): send use_compression_ boolean value when compression 
support is added
+  properties[std::string(magic_enum::enum_name(HandshakeProperty::GZIP))] = 
"false";
+  
properties[std::string(magic_enum::enum_name(HandshakeProperty::PORT_IDENTIFIER))]
 = port_id_.to_string();
+  
properties[std::string(magic_enum::enum_name(HandshakeProperty::REQUEST_EXPIRATION_MILLIS))]
 = std::to_string(timeout_.load().count());
+  if (current_version_ >= 5) {
+    if (batch_count_ > 0) {
+      
properties[std::string(magic_enum::enum_name(HandshakeProperty::BATCH_COUNT))] 
= std::to_string(batch_count_);
+    }
+    if (batch_size_ > 0) {
+      
properties[std::string(magic_enum::enum_name(HandshakeProperty::BATCH_SIZE))] = 
std::to_string(batch_size_);
+    }
+    if (batch_duration_.load() > 0ms) {
+      
properties[std::string(magic_enum::enum_name(HandshakeProperty::BATCH_DURATION))]
 = std::to_string(batch_duration_.load().count());
+    }
+  }
+
+  if (current_version_ >= 3) {
+    if (const auto ret = peer_->write(peer_->getURL()); ret == 0 || 
io::isError(ret)) {
+      logger_->log_error("Failed to write peer URL {}", ret);
+      return false;
+    }
+  }
+
+  if (const auto ret = peer_->write(gsl::narrow<uint32_t>(properties.size())); 
ret == 0 || io::isError(ret)) {
+    logger_->log_error("Failed to write properties size {}", ret);
+    return false;
+  }
+
+  for (const auto& property : properties) {
+    if (const auto ret = peer_->write(property.first); ret == 0 || 
io::isError(ret)) {
+      logger_->log_error("Failed to write property key {}", ret);
+      return false;
+    }
+
+    if (const auto ret = peer_->write(property.second); ret == 0 || 
io::isError(ret)) {
+      logger_->log_error("Failed to write property value {}", ret);
+      return false;
+    }
+    logger_->log_debug("Site2Site Protocol Send handshake properties {} {}", 
property.first, property.second);
+  }
+
+  const auto response = readResponse(nullptr);
+  if (!response) {
+    return false;
+  }
+
+  auto logPortStateError = [this](const std::string& error) {
+    logger_->log_error("Site2Site HandShake Failed because destination port, 
{}, is {}", port_id_.to_string(), error);
+  };
+
+  switch (response->code) {
+    case ResponseCode::PROPERTIES_OK:
+      logger_->log_debug("Site2Site HandShake Completed");
+      peer_state_ = PeerState::HANDSHAKED;
+      return true;
+    case ResponseCode::PORT_NOT_IN_VALID_STATE:
+      logPortStateError("in invalid state");
+      return false;
+    case ResponseCode::UNKNOWN_PORT:
+      logPortStateError("an unknown port");
+      return false;
+    case ResponseCode::PORTS_DESTINATION_FULL:
+      logPortStateError("full");
+      return false;
+    case ResponseCode::UNAUTHORIZED:
+      logger_->log_error("Site2Site HandShake on port {} failed: 
UNAUTHORIZED", port_id_.to_string());
+      return false;
+    default:
+      logger_->log_error("Site2Site HandShake on port {} failed: unknown 
response code {}", port_id_.to_string(), 
magic_enum::enum_underlying(response->code));
+      return false;
+  }
+}
+
+void RawSiteToSiteClient::tearDown() {
+  if (magic_enum::enum_underlying(peer_state_) >= 
magic_enum::enum_underlying(PeerState::ESTABLISHED)) {
+    logger_->log_trace("Site2Site Protocol tearDown");
+    writeRequestType(RequestType::SHUTDOWN);
+  }
+
+  known_transactions_.clear();
+  peer_->close();
+  peer_state_ = PeerState::IDLE;
+}
+
+std::optional<std::vector<PeerStatus>> RawSiteToSiteClient::getPeerList() {
+  if (!establish() || !handShake()) {
+    tearDown();
+    return std::nullopt;
+  }
+
+  if (writeRequestType(RequestType::REQUEST_PEER_LIST) <= 0) {
+    tearDown();
+    return std::nullopt;
+  }
+
+  uint32_t number_of_peers = 0;
+  std::vector<PeerStatus> peers;
+  if (const auto ret = peer_->read(number_of_peers); ret == 0 || 
io::isError(ret)) {
+    tearDown();
+    return std::nullopt;
+  }
+
+  for (uint32_t i = 0; i < number_of_peers; i++) {
+    std::string host;
+    if (const auto ret = peer_->read(host); ret == 0 || io::isError(ret)) {
+      tearDown();
+      return std::nullopt;
+    }
+
+    uint32_t port = 0;
+    if (const auto ret = peer_->read(port); ret == 0 || io::isError(ret)) {
+      tearDown();
+      return std::nullopt;
+    }
+
+    uint8_t secure = 0;
+    if (const auto ret = peer_->read(secure); ret == 0 || io::isError(ret)) {
+      tearDown();
+      return std::nullopt;
+    }
+
+    uint32_t count = 0;
+    if (const auto ret = peer_->read(count); ret == 0 || io::isError(ret)) {
+      tearDown();
+      return std::nullopt;
+    }
+
+    peers.push_back(PeerStatus(port_id_, host, gsl::narrow<uint16_t>(port), 
count, true));
+    logger_->log_trace("Site2Site Peer host {} port {} Secure {}", host, port, 
std::to_string(secure));

Review Comment:
   `secure` is no longer passed to `PeerStatus` (it is only used in the log), and I 
can't see it being added back in #1974 either. Is this OK?



##########
libminifi/test/unit/SiteToSiteTests.cpp:
##########
@@ -0,0 +1,343 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "io/BaseStream.h"
+#include "sitetosite/Peer.h"
+#include "sitetosite/RawSiteToSiteClient.h"
+#include "unit/TestBase.h"
+#include "unit/Catch.h"
+#include "unit/SiteToSiteHelper.h"
+#include "unit/DummyProcessor.h"
+#include "unit/ProvenanceTestHelper.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class SiteToSiteClientTestAccessor {
+ public:
+  static bool bootstrap(sitetosite::RawSiteToSiteClient& client) {
+    return client.bootstrap();
+  }
+
+  static std::shared_ptr<sitetosite::Transaction> 
createTransaction(sitetosite::RawSiteToSiteClient& client, 
sitetosite::TransferDirection direction) {
+    return client.createTransaction(direction);
+  }
+
+  static bool sendFlowFile(sitetosite::RawSiteToSiteClient& client, const 
std::shared_ptr<sitetosite::Transaction>& transaction, core::FlowFile& 
flow_file, core::ProcessSession& session) {
+    return client.sendFlowFile(transaction, flow_file, session);
+  }
+
+  static bool sendPacket(sitetosite::RawSiteToSiteClient& client, const 
sitetosite::DataPacket& packet) {
+    return client.sendPacket(packet);
+  }
+
+  static std::pair<uint64_t, uint64_t> 
readFlowFiles(sitetosite::RawSiteToSiteClient& client, const 
std::shared_ptr<sitetosite::Transaction>& transaction, core::ProcessSession& 
session) {
+    return client.readFlowFiles(transaction, session);
+  }
+};
+
+void initializeMockBootstrapResponses(const 
std::unique_ptr<SiteToSiteResponder>& collector) {

Review Comment:
   The parameter type of this could be `SiteToSiteResponder&` instead of `const std::unique_ptr<SiteToSiteResponder>&`, too.



##########
libminifi/include/sitetosite/Peer.h:
##########
@@ -17,88 +17,29 @@
  */
 #pragma once
 
-#include <errno.h>
-#include <stdio.h>
-
 #include <atomic>
-#include <map>
 #include <memory>
-#include <mutex>
 #include <string>
 #include <utility>
+#include <array>
 
 #include "core/logging/LoggerFactory.h"
-#include "core/Property.h"
-#include "io/BaseStream.h"
-#include "io/BufferStream.h"
-#include "properties/Configure.h"
 #include "http/BaseHTTPClient.h"
-#include "utils/TimeUtil.h"
+#include "io/BaseStream.h"
 #include "io/NetworkPrioritizer.h"
 
-namespace org::apache::nifi::minifi::sitetosite {
-
-class Peer {
- public:
-  explicit Peer(const utils::Identifier &port_id, const std::string &host, 
uint16_t port, bool secure = false)
-      : host_(host),
-        port_(port),
-        secure_(secure) {
-    port_id_ = port_id;
-  }
-
-  explicit Peer(const std::string &host, uint16_t port, bool secure = false)
-      : host_(host),
-        port_(port),
-        secure_(secure) {
-  }
-
-  explicit Peer(const Peer &other)
-      : host_(other.host_),
-        port_(other.port_),
-        secure_(other.secure_) {
-    port_id_ = other.port_id_;
-  }
-
-  explicit Peer(Peer &&other)
-      : host_(std::move(other.host_)),
-        port_(std::move(other.port_)),
-        secure_(std::move(other.secure_)) {
-    port_id_ = other.port_id_;
-  }
-
-  uint16_t getPort() const {
-    return port_;
-  }
-
-  const std::string &getHost() const {
-    return host_;
-  }
+using namespace std::literals::chrono_literals;
 
-  bool isSecure() const {
-    return secure_;
-  }
-
-  utils::Identifier getPortId() const {
-    return port_id_;
-  }
-
- protected:
-  std::string host_;
-
-  uint16_t port_;
-
-  utils::Identifier port_id_;
-
-  // secore comms
+namespace org::apache::nifi::minifi::sitetosite {
 
-  bool secure_;
-};
+static constexpr std::array<char, 4> MAGIC_BYTES = { 'N', 'i', 'F', 'i' };

Review Comment:
   In a header at namespace scope, `static` gives every translation unit that includes it its own copy of the array; this should be
   ```suggestion
   inline constexpr std::array<char, 4> MAGIC_BYTES = { 'N', 'i', 'F', 'i' };
   ```



##########
extensions/standard-processors/tests/unit/FlowJsonTests.cpp:
##########
@@ -1786,4 +1808,69 @@ TEST_CASE("Parameter context names cannot conflict with 
parameter provider gener
     "with no parameter provider or generated by other parameter provider");
 }
 
+TEST_CASE("Output port can also be used in RPG") {
+  ConfigurationTestController test_controller;
+
+  core::flow::AdaptiveConfiguration config(test_controller.getContext());
+
+  static const std::string CONFIG_JSON =
+      R"(
+{
+  "rootGroup": {
+    "name": "MiNiFi Flow",
+    "processors": [{
+      "identifier": "00000000-0000-0000-0000-000000000001",
+      "name": "PutFile",
+      "type": "org.apache.nifi.processors.standard.PutFile",
+      "schedulingStrategy": "Event_DRIVEN",
+      "autoTerminatedRelationships": ["success"],
+      "properties": {}
+    }],
+    "connections": [{
+     "identifier": "00000000-0000-0000-0000-000000000008",
+      "name": "S2SToRPG",
+      "source": {
+        "id": "00000000-0000-0000-0000-000000000003",
+        "name": "AmazingOutputPort"
+      },
+      "destination": {
+        "id": "00000000-0000-0000-0000-000000000001",
+        "name": "PutFile"
+      },
+      "selectedRelationships": [""]
+    }],
+    "remoteProcessGroups": [{
+      "name": "NiFi Flow",
+      "targetUri": "https://localhost:8090/nifi";,
+      "communicationsTimeout": "19 sec",
+      "inputPorts": [{

Review Comment:
   this should be `outputPorts`



##########
libminifi/include/sitetosite/SiteToSiteClient.h:
##########
@@ -19,246 +19,182 @@
 #pragma once
 
 #include <algorithm>
-#include <array>
 #include <map>
 #include <memory>
 #include <string>
 #include <utility>
 #include <vector>
+#include <optional>
 
 #include "Peer.h"
 #include "SiteToSite.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessContext.h"
-#include "core/Connectable.h"
-#include "utils/gsl.h"
 
-namespace org::apache::nifi::minifi::sitetosite {
+namespace org::apache::nifi::minifi {
 
-/**
- * Represents a piece of data that is to be sent to or that was received from a
- * NiFi instance.
- */
-class DataPacket {
+namespace test {
+class SiteToSiteClientTestAccessor;
+}  // namespace test
+
+namespace sitetosite {
+
+struct DataPacket {
  public:
-  DataPacket(std::shared_ptr<core::logging::Logger> logger, 
std::shared_ptr<Transaction> transaction, std::map<std::string, std::string> 
attributes, const std::string &payload)
-      : _attributes{std::move(attributes)},
-        transaction_{std::move(transaction)},
-        payload_{payload},
-        logger_reference_{std::move(logger)} {
+  DataPacket(std::shared_ptr<Transaction> transaction, const std::string 
&payload)
+      : transaction{std::move(transaction)},
+        payload{payload} {
+  }
+  DataPacket(std::shared_ptr<Transaction> transaction, std::map<std::string, 
std::string> attributes, const std::string &payload)
+      : attributes{std::move(attributes)},
+        transaction{std::move(transaction)},
+        payload{payload} {
   }
-  std::map<std::string, std::string> _attributes;
-  uint64_t _size{0};
-  std::shared_ptr<Transaction> transaction_;
-  const std::string & payload_;
-  std::shared_ptr<core::logging::Logger> logger_reference_;
+  std::map<std::string, std::string> attributes;
+  uint64_t size{0};

Review Comment:
   is this `size` field used anywhere?



##########
libminifi/src/sitetosite/SiteToSiteFactory.cpp:
##########
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "sitetosite/SiteToSiteFactory.h"
+
+#include <utility>
+#include <memory>
+
+#include "sitetosite/RawSiteToSiteClient.h"
+#include "sitetosite/HttpSiteToSiteClient.h"
+#include "utils/net/AsioSocketUtils.h"
+#include "core/ClassLoader.h"
+
+namespace org::apache::nifi::minifi::sitetosite {
+
+namespace {
+std::unique_ptr<SiteToSitePeer> createStreamingPeer(const 
SiteToSiteClientConfiguration &client_configuration) {
+  utils::net::SocketData socket_data{client_configuration.getHost(), 
client_configuration.getPort(), client_configuration.getSecurityContext()};
+  auto connection = 
std::make_unique<utils::net::AsioSocketConnection>(socket_data);
+  return std::make_unique<SiteToSitePeer>(std::move(connection), 
client_configuration.getHost(), client_configuration.getPort(), 
client_configuration.getInterface());
+}
+
+void setCommonConfigurationOptions(SiteToSiteClient& client, const 
SiteToSiteClientConfiguration &client_configuration) {
+  client.setSSLContextService(client_configuration.getSecurityContext());
+  client.setUseCompression(client_configuration.getUseCompression());
+  if (client_configuration.getBatchCount()) {
+    client.setBatchCount(client_configuration.getBatchCount().value());
+  }
+  if (client_configuration.getBatchSize()) {
+    client.setBatchSize(client_configuration.getBatchSize().value());
+  }
+  if (client_configuration.getBatchDuration()) {
+    client.setBatchDuration(client_configuration.getBatchDuration().value());
+  }
+  if (client_configuration.getTimeout()) {
+    client.setTimeout(client_configuration.getTimeout().value());
+  }
+}
+
+std::unique_ptr<SiteToSiteClient> createRawSocketSiteToSiteClient(const 
SiteToSiteClientConfiguration &client_configuration) {
+  auto raw_site_to_site_client = 
std::make_unique<RawSiteToSiteClient>(createStreamingPeer(client_configuration));
+  raw_site_to_site_client->setPortId(client_configuration.getPortId());
+  setCommonConfigurationOptions(*raw_site_to_site_client, 
client_configuration);
+  return raw_site_to_site_client;
+}
+
+std::unique_ptr<SiteToSiteClient> createHttpSiteToSiteClient(const 
SiteToSiteClientConfiguration &client_configuration) {
+  auto peer = std::make_unique<SiteToSitePeer>(client_configuration.getHost(), 
client_configuration.getPort(), client_configuration.getInterface());
+  peer->setHTTPProxy(client_configuration.getHTTPProxy());
+
+  auto http_site_to_site_client = 
std::make_unique<HttpSiteToSiteClient>(std::move(peer));
+  http_site_to_site_client->setPortId(client_configuration.getPortId());
+  
http_site_to_site_client->setIdleTimeout(client_configuration.getIdleTimeout());
+  setCommonConfigurationOptions(*http_site_to_site_client, 
client_configuration);
+  return http_site_to_site_client;
+}
+}  // namespace
+
+std::unique_ptr<SiteToSiteClient> createClient(const 
SiteToSiteClientConfiguration &client_configuration) {
+  switch (client_configuration.getClientType()) {
+    case ClientType::RAW:
+      return createRawSocketSiteToSiteClient(client_configuration);
+    case ClientType::HTTP:
+      return createHttpSiteToSiteClient(client_configuration);
+  }
+  return nullptr;

Review Comment:
   It would be better to terminate here; could this be a `gsl_Assert(false)` or 
something similar?



##########
libminifi/include/sitetosite/SiteToSite.h:
##########
@@ -23,35 +23,28 @@
 
 #include "minifi-cpp/controllers/SSLContextService.h"
 #include "Peer.h"
-#include "core/Property.h"
-#include "properties/Configure.h"
 #include "io/CRCStream.h"
 #include "utils/Id.h"
-#include "http/BaseHTTPClient.h"
-#include "utils/Export.h"
 
 namespace org::apache::nifi::minifi::sitetosite {
-#if defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wunused-variable"
-#endif
-
-// Resource Negotiated Status Code
-#define RESOURCE_OK 20
-#define DIFFERENT_RESOURCE_VERSION 21
-#define NEGOTIATED_ABORT 255
-// ! Max attributes
-#define MAX_NUM_ATTRIBUTES 25000
-
-// Respond Code Sequence Pattern
-static const uint8_t CODE_SEQUENCE_VALUE_1 = (uint8_t) 'R';
-static const uint8_t CODE_SEQUENCE_VALUE_2 = (uint8_t) 'C';
+
+enum class ResourceNegotiationStatusCode : uint8_t {
+  RESOURCE_OK = 20,
+  DIFFERENT_RESOURCE_VERSION = 21,
+  NEGOTIATED_ABORT = 255
+};
+
+static constexpr uint32_t MAX_NUM_ATTRIBUTES = 25000;
+
+// Response Code Sequence Pattern
+static constexpr uint8_t CODE_SEQUENCE_VALUE_1 = static_cast<uint8_t>('R');
+static constexpr uint8_t CODE_SEQUENCE_VALUE_2 = static_cast<uint8_t>('C');

Review Comment:
   these should be `inline constexpr`, too



##########
SITE_TO_SITE.md:
##########
@@ -0,0 +1,262 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+## Table of Contents
+- [Site-to-Site Overview](#site-to-site-overview)
+- [Site-to-Site Configuration](#site-to-site-configuration)
+  - [Site-to-Site Configuration on NiFi 
side](#site-to-site-configuration-on-nifi-side)
+  - [Site-to-Site Configuration on MiNiFi C++ 
side](#site-to-site-configuration-on-minifi-c-side)
+- [Additional examples](#additional-examples)
+
+## Site-to-Site Overview
+
+The Site-to-Site protocol allows data to be transferred between MiNiFi C++ and 
NiFi instances. MiNiFi C++ can send data to or receive data from NiFi using 
remote process groups. This is useful for scenarios where you want to send data 
from MiNiFi C++ to NiFi or vice versa. The Site-to-Site protocol supports both 
raw TCP and HTTP as transport protocols.
+
+At the moment, the site-to-site protocol is only supported between MiNiFi C++ 
and NiFi instances; it cannot be used to transfer data between multiple MiNiFi 
C++ instances. To transfer data between MiNiFi C++ instances, it is recommended 
to use processors like InvokeHTTP and ListenHTTP instead.
+
+## Site-to-Site Configuration
+
+### Site-to-Site Configuration on NiFi side
+
+On the NiFi side, the site-to-site protocol is configured by creating input and 
output ports. The input port is used to receive data from MiNiFi C++ and the 
output port is used to send data to MiNiFi C++. The input and output ports can 
be created in the NiFi UI by dragging and dropping the input and output port 
icons onto the canvas.
+
+To use an input or output port of the NiFi flow in the MiNiFi C++ flow, the 
instance id of the port should be used. The instance id can be found in the 
NiFi UI by clicking on the input or output port and looking at the operation 
panel. It can be copied from that panel, or from the port's "instanceIdentifier" 
field in the configuration JSON file in the NiFi conf directory.
+
+### Site-to-Site Configuration on MiNiFi C++ side
+
+The Site-to-Site protocol is configured on the MiNiFi C++ side by using remote 
process groups in the configuration. A remote process group represents the 
NiFi endpoint and uses the instance ids of the ports created on the NiFi side. 
The remote process group can be configured to use either the raw TCP or the 
HTTP protocol.
+
+Here is a YAML example of how to configure the site-to-site protocol in MiNiFi 
C++ where the MiNiFi C++ instance is sending data to NiFi using the raw socket protocol:
+
+```yaml
+MiNiFi Config Version: 3
+Flow Controller:
+  name: Simple GenerateFlowFile to RPG
+Processors:
+  - id: b0c04f28-0158-1000-0000-000000000000
+    name: GenerateFlowFile
+    class: org.apache.nifi.processors.standard.GenerateFlowFile
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 5 sec
+    auto-terminated relationships list: []
+    Properties:
+      Data Format: Text
+      Unique FlowFiles: false
+      Custom Text: Custom text
+Connections:
+  - id: b0c0c3cc-0158-1000-0000-000000000000
+    name: GenerateFlowFile/succes/nifi-inputport
+    source id: b0c04f28-0158-1000-0000-000000000000
+    destination id: de7cc09a-0196-1000-2c63-ee6b4319ffb6
+    source relationship name: success
+Remote Process Groups:
+  - id: b0c09ff0-0158-1000-0000-000000000000
+    name: "RPG"
+    url: http://localhost:8080/nifi
+    timeout: 20 sec
+    yield period: 10 sec
+    transport protocol: RAW
+    Input Ports:
+      - id: de7cc09a-0196-1000-2c63-ee6b4319ffb6  # this is the instance id of 
the input port created in NiFi
+        name: nifi-inputport
+        max concurrent tasks: 1
+        use compression: false  # currently not supported and ignored in 
MiNiFi C++
+        batch size:
+          size: 10 MB
+          count: 10
+          duration: 30 sec
+    Output Ports: []
+```
+
+Here is another example, in NiFi-style JSON format, of how to configure the 
site-to-site protocol in MiNiFi C++ where the MiNiFi C++ instance is receiving 
data from NiFi using the HTTP protocol:
+
+```json

Review Comment:
   Here in the readme, I think it would be better to have YAML configurations 
for both cases, because YAML is more readable, and it would be easier to 
compare the two cases (input vs output port).
   
   I think it's enough to include JSON versions of these configurations in the 
examples directory, and mention them in this readme (as you already do).



##########
libminifi/include/sitetosite/SiteToSiteClient.h:
##########
@@ -19,246 +19,182 @@
 #pragma once
 
 #include <algorithm>
-#include <array>
 #include <map>
 #include <memory>
 #include <string>
 #include <utility>
 #include <vector>
+#include <optional>
 
 #include "Peer.h"
 #include "SiteToSite.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessContext.h"
-#include "core/Connectable.h"
-#include "utils/gsl.h"
 
-namespace org::apache::nifi::minifi::sitetosite {
+namespace org::apache::nifi::minifi {
 
-/**
- * Represents a piece of data that is to be sent to or that was received from a
- * NiFi instance.
- */
-class DataPacket {
+namespace test {
+class SiteToSiteClientTestAccessor;
+}  // namespace test
+
+namespace sitetosite {
+
+struct DataPacket {
  public:
-  DataPacket(std::shared_ptr<core::logging::Logger> logger, 
std::shared_ptr<Transaction> transaction, std::map<std::string, std::string> 
attributes, const std::string &payload)
-      : _attributes{std::move(attributes)},
-        transaction_{std::move(transaction)},
-        payload_{payload},
-        logger_reference_{std::move(logger)} {
+  DataPacket(std::shared_ptr<Transaction> transaction, const std::string 
&payload)
+      : transaction{std::move(transaction)},
+        payload{payload} {
+  }
+  DataPacket(std::shared_ptr<Transaction> transaction, std::map<std::string, 
std::string> attributes, const std::string &payload)
+      : attributes{std::move(attributes)},
+        transaction{std::move(transaction)},
+        payload{payload} {
   }
-  std::map<std::string, std::string> _attributes;
-  uint64_t _size{0};
-  std::shared_ptr<Transaction> transaction_;
-  const std::string & payload_;
-  std::shared_ptr<core::logging::Logger> logger_reference_;
+  std::map<std::string, std::string> attributes;
+  uint64_t size{0};
+  std::shared_ptr<Transaction> transaction;
+  const std::string& payload;
+};
+
+struct SiteToSiteResponse {
+  ResponseCode code = ResponseCode::UNRECOGNIZED_RESPONSE_CODE;
+  std::string message;
 };
 
-class SiteToSiteClient : public core::ConnectableImpl {
+class SiteToSiteClient {
  public:
-  SiteToSiteClient()
-      : core::ConnectableImpl("SitetoSiteClient") {
+  explicit SiteToSiteClient(std::unique_ptr<SiteToSitePeer> peer)
+      : peer_(std::move(peer)) {
+    gsl_Assert(peer_);

Review Comment:
   Can we change the type to `gsl::not_null` (and get rid of the assert)?



##########
SITE_TO_SITE.md:
##########
@@ -0,0 +1,262 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+## Table of Contents
+- [Site-to-Site Overview](#site-to-site-overview)
+- [Site-to-Site Configuration](#site-to-site-configuration)
+  - [Site-to-Site Configuration on NiFi 
side](#site-to-site-configuration-on-nifi-side)
+  - [Site-to-Site Configuration on MiNiFi C++ 
side](#site-to-site-configuration-on-minifi-c-side)
+- [Additional examples](#additional-examples)
+
+## Site-to-Site Overview
+
+The Site-to-Site protocol allows data to be transferred between MiNiFi C++ and 
NiFi instances. MiNiFi C++ can send data to or receive data from NiFi using 
remote process groups. This is useful for scenarios where you want to send data 
from MiNiFi C++ to NiFi or vice versa. The Site-to-Site protocol supports both 
raw TCP and HTTP as transport protocols.
+
+At the moment, the site-to-site protocol is only supported between MiNiFi C++ 
and NiFi instances; it cannot be used to transfer data between multiple MiNiFi 
C++ instances. To transfer data between MiNiFi C++ instances, it is recommended 
to use processors like InvokeHTTP and ListenHTTP instead.
+
+## Site-to-Site Configuration
+
+### Site-to-Site Configuration on NiFi side
+
+On the NiFi side, the site-to-site protocol is configured by creating input and 
output ports. The input port is used to receive data from MiNiFi C++ and the 
output port is used to send data to MiNiFi C++. The input and output ports can 
be created in the NiFi UI by dragging and dropping the input and output port 
icons onto the canvas.
+
+To use an input or output port of the NiFi flow in the MiNiFi C++ flow, the 
instance id of the port should be used. The instance id can be found in the 
NiFi UI by clicking on the input or output port and looking at the operation 
panel. It can be copied from that panel, or from the port's "instanceIdentifier" 
field in the configuration JSON file in the NiFi conf directory.
+
+### Site-to-Site Configuration on MiNiFi C++ side
+
+The Site-to-Site protocol is configured on the MiNiFi C++ side by using remote 
process groups in the configuration. A remote process group represents the 
NiFi endpoint and uses the instance ids of the ports created on the NiFi side. 
The remote process group can be configured to use either the raw TCP or the 
HTTP protocol.
+
+Here is a YAML example of how to configure the site-to-site protocol in MiNiFi 
C++ where the MiNiFi C++ instance is sending data to NiFi using the raw socket protocol:
+
+```yaml
+MiNiFi Config Version: 3
+Flow Controller:
+  name: Simple GenerateFlowFile to RPG
+Processors:
+  - id: b0c04f28-0158-1000-0000-000000000000
+    name: GenerateFlowFile
+    class: org.apache.nifi.processors.standard.GenerateFlowFile
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 5 sec
+    auto-terminated relationships list: []
+    Properties:
+      Data Format: Text
+      Unique FlowFiles: false
+      Custom Text: Custom text
+Connections:
+  - id: b0c0c3cc-0158-1000-0000-000000000000
+    name: GenerateFlowFile/succes/nifi-inputport
+    source id: b0c04f28-0158-1000-0000-000000000000
+    destination id: de7cc09a-0196-1000-2c63-ee6b4319ffb6
+    source relationship name: success
+Remote Process Groups:
+  - id: b0c09ff0-0158-1000-0000-000000000000
+    name: "RPG"
+    url: http://localhost:8080/nifi
+    timeout: 20 sec
+    yield period: 10 sec
+    transport protocol: RAW
+    Input Ports:
+      - id: de7cc09a-0196-1000-2c63-ee6b4319ffb6  # this is the instance id of 
the input port created in NiFi
+        name: nifi-inputport
+        max concurrent tasks: 1
+        use compression: false  # currently not supported and ignored in 
MiNiFi C++
+        batch size:
+          size: 10 MB
+          count: 10
+          duration: 30 sec
+    Output Ports: []
+```
+
+Here is another example, in NiFi-style JSON format, of how to configure the 
site-to-site protocol in MiNiFi C++ where the MiNiFi C++ instance is receiving 
data from NiFi using the HTTP protocol:
+
+```json
+{
+    "encodingVersion": {
+        "majorVersion": 2,
+        "minorVersion": 0
+    },
+    "maxTimerDrivenThreadCount": 1,
+    "maxEventDrivenThreadCount": 1,
+    "parameterContexts": [],
+    "rootGroup": {
+        "identifier": "c5bceca3-9c20-4068-bf2d-425e14026471",
+        "instanceIdentifier": "3cb4b3ce-7cd8-4ab7-a6bf-d4640ac5db43",
+        "name": "root",
+        "position": {
+            "x": 0.0,
+            "y": 0.0
+        },
+        "processGroups": [],
+        "remoteProcessGroups": [
+            {
+                "identifier": "327b446a-0043-48d1-8bb4-df65ba1afa60",
+                "instanceIdentifier": "2ed47dca-38f5-476d-9c37-5ea0a5072f1e",
+                "name": "https://localhost:8443/nifi";,
+                "position": {
+                    "x": 235.0,
+                    "y": 71.00000762939453
+                },
+                "targetUri": "https://localhost:8443/nifi";,
+                "targetUris": "https://localhost:8443/nifi";,
+                "communicationsTimeout": "30 secs",
+                "yieldDuration": "10 sec",
+                "transportProtocol": "HTTP",
+                "inputPorts": [],
+                "outputPorts": [
+                    {
+                        "identifier": "de7cc09a-0196-1000-2c63-ee6b4319ffb6",
+                        "instanceIdentifier": 
"de7cc09a-0196-1000-2c63-ee6b4319ffb6",
+                        "name": "nifi-outputport",
+                        "remoteGroupId": 
"327b446a-0043-48d1-8bb4-df65ba1afa60",
+                        "componentType": "REMOTE_OUTPUT_PORT",
+                        "targetId": "de7cc09a-0196-1000-2c63-ee6b4319ffb6",
+                        "groupIdentifier": 
"c5bceca3-9c20-4068-bf2d-425e14026471"
+                    }
+                ],
+                "componentType": "REMOTE_PROCESS_GROUP",
+                "groupIdentifier": "c5bceca3-9c20-4068-bf2d-425e14026471"
+            }
+        ],
+        "processors": [
+            {
+                "identifier": "f29a2667-7c86-4b22-a5d3-a23ee88f3c66",
+                "instanceIdentifier": "7511c14c-9923-43ef-90b4-ac3e05b1a9fa",
+                "name": "PutFile",
+                "comments": "",
+                "position": {
+                    "x": 1042.0,
+                    "y": 90.5
+                },
+                "type": "org.apache.nifi.minifi.processors.PutFile",
+                "bundle": {
+                    "group": "org.apache.nifi.minifi",
+                    "artifact": "minifi-standard-processors",
+                    "version": "1.0.0"
+                },
+                "properties": {
+                    "Create Missing Directories": "true",
+                    "Maximum File Count": "-1",
+                    "Directory": ".",
+                    "Conflict Resolution Strategy": "fail"
+                },
+                "propertyDescriptors": {
+                    "Permissions": {
+                        "name": "Permissions",
+                        "identifiesControllerService": false,
+                        "sensitive": false
+                    },
+                    "Create Missing Directories": {
+                        "name": "Create Missing Directories",
+                        "identifiesControllerService": false,
+                        "sensitive": false
+                    },
+                    "Maximum File Count": {
+                        "name": "Maximum File Count",
+                        "identifiesControllerService": false,
+                        "sensitive": false
+                    },
+                    "Directory Permissions": {
+                        "name": "Directory Permissions",
+                        "identifiesControllerService": false,
+                        "sensitive": false
+                    },
+                    "Directory": {
+                        "name": "Directory",
+                        "identifiesControllerService": false,
+                        "sensitive": false
+                    },
+                    "Conflict Resolution Strategy": {
+                        "name": "Conflict Resolution Strategy",
+                        "identifiesControllerService": false,
+                        "sensitive": false
+                    }
+                },
+                "style": {},
+                "schedulingPeriod": "1000 ms",
+                "schedulingStrategy": "TIMER_DRIVEN",
+                "executionNode": "ALL",
+                "penaltyDuration": "30000 ms",
+                "yieldDuration": "1000 ms",
+                "bulletinLevel": "WARN",
+                "runDurationMillis": 0,
+                "concurrentlySchedulableTaskCount": 1,
+                "autoTerminatedRelationships": [
+                    "success",
+                    "failure"
+                ],
+                "componentType": "PROCESSOR",
+                "groupIdentifier": "c5bceca3-9c20-4068-bf2d-425e14026471"
+            }
+        ],
+        "inputPorts": [],
+        "outputPorts": [],
+        "connections": [
+            {
+                "identifier": "bab1ce73-e9e5-4a9a-a990-ee9c65668d8c",
+                "instanceIdentifier": "9526b397-190a-4fe3-bf0f-bd7dfc2dfafc",
+                "name": "nifi-outputport/undefined/PutFile",
+                "position": {
+                    "x": 0.0,
+                    "y": 0.0
+                },
+                "source": {
+                    "id": "de7cc09a-0196-1000-2c63-ee6b4319ffb6",
+                    "type": "REMOTE_OUTPUT_PORT",
+                    "groupId": "327b446a-0043-48d1-8bb4-df65ba1afa60",
+                    "name": "nifi-outputport"
+                },
+                "destination": {
+                    "id": "f29a2667-7c86-4b22-a5d3-a23ee88f3c66",
+                    "type": "PROCESSOR",
+                    "groupId": "c5bceca3-9c20-4068-bf2d-425e14026471",
+                    "name": "PutFile",
+                    "instanceIdentifier": 
"7511c14c-9923-43ef-90b4-ac3e05b1a9fa"
+                },
+                "labelIndex": 1,
+                "zIndex": 0,
+                "selectedRelationships": [
+                    "undefined"
+                ],
+                "backPressureObjectThreshold": 2000,
+                "backPressureDataSizeThreshold": "100 MB",
+                "flowFileExpiration": "0 seconds",
+                "prioritizers": [],
+                "bends": [],
+                "componentType": "CONNECTION",
+                "groupIdentifier": "c5bceca3-9c20-4068-bf2d-425e14026471"
+            }
+        ],
+        "labels": [],
+        "funnels": [],
+        "controllerServices": [],
+        "variables": {},
+        "componentType": "PROCESS_GROUP"
+    }
+}
+```
+
+Notes on the configuration:
+
+- In the MiNiFi C++ configuration, the remote input and output ports' `id` field 
(in YAML configuration), or the ports' `identifier`, `instanceIdentifier`, and 
`targetId` fields (in JSON configuration), should be set to the instance id of 
the input and output ports created in NiFi 
(`de7cc09a-0196-1000-2c63-ee6b4319ffb6` in the examples).
+- Connections from the remote output port to the processor should use the 
`undefined` relationship.
+- `useCompression` can be set, but it is currently not supported in MiNiFi C++, 
so it will be set to false in the site-to-site messages.
+- the `targetUri` and `targetUris` field in the remote process group should be 
set to the NiFi instance's URL, this can also use comma separated list of URLs 
if the remote process group is configured to use multiple NiFi nodes

Review Comment:
   ```suggestion
   - the `url` field (`targetUri` or `targetUris` in JSON) in the remote 
process group should be set to the NiFi instance's URL; this can also be a 
comma-separated list of URLs if the remote process group is configured to use 
multiple NiFi nodes
   ```



##########
libminifi/include/sitetosite/Peer.h:
##########
@@ -109,175 +50,109 @@ class PeerStatus {
   PeerStatus& operator=(const PeerStatus &other) = default;
   PeerStatus& operator=(PeerStatus &&other) = default;
 
-  const std::shared_ptr<Peer> &getPeer() const {
-    return peer_;
+  const utils::Identifier &getPortId() const {
+    return port_id_;
+  }
+
+  const std::string &getHost() const {
+    return host_;
+  }
+
+  [[nodiscard]] uint16_t getPort() const {
+    return port_;
   }
 
-  uint32_t getFlowFileCount() {
+  [[nodiscard]] uint32_t getFlowFileCount() const {
     return flow_file_count_;
   }
 
-  bool getQueryForPeers() {
+  [[nodiscard]] bool getQueryForPeers() const {
     return query_for_peers_;
   }
 
  protected:
-  std::shared_ptr<Peer> peer_;
+  utils::Identifier port_id_;
+  std::string host_;
+  uint16_t port_;
   uint32_t flow_file_count_;
   bool query_for_peers_;
 };
 
-static const char MAGIC_BYTES[] = { 'N', 'i', 'F', 'i' };
-
-// Site2SitePeer Class
-class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStreamImpl {
+class SiteToSitePeer : public io::BaseStreamImpl {
  public:
-  SiteToSitePeer()
-      : stream_(nullptr),
-        host_(""),
-        port_(-1) {
-  }
-  /*
-   * Create a new site2site peer
-   */
-  explicit 
SiteToSitePeer(std::unique_ptr<org::apache::nifi::minifi::io::BaseStream> 
injected_socket,
-                          const std::string host,
-                          uint16_t port,
-                          const std::string &ifc)
+  SiteToSitePeer(std::unique_ptr<io::BaseStream> injected_socket, const 
std::string& host, uint16_t port, const std::string& ifc)
       : SiteToSitePeer(host, port, ifc) {
     stream_ = std::move(injected_socket);
   }
 
-  explicit SiteToSitePeer(const std::string &host, uint16_t port, const 
std::string &ifc)
-      : stream_(nullptr),
-        host_(host),
+  SiteToSitePeer(const std::string &host, uint16_t port, const std::string 
&ifc)
+      : host_(host),
         port_(port),
-        timeout_(std::chrono::seconds(30)),
-        logger_(core::logging::LoggerFactory<SiteToSitePeer>::getLogger()) {
-    url_ = "nifi://" + host_ + ":" + std::to_string(port_);
-    yield_expiration_ = std::chrono::system_clock::time_point();
-    timeout_ = std::chrono::seconds(30);
-    local_network_interface_ = io::NetworkInterface(ifc, nullptr);
+        url_("nifi://" + host_ + ":" + std::to_string(port_)),
+        local_network_interface_(io::NetworkInterface(ifc, nullptr)) {
+    timeout_.store(30s);

Review Comment:
   `timeout_` is already initialized below, by the default member initializer



##########
libminifi/test/libtest/integration/HTTPHandlers.cpp:
##########
@@ -93,6 +95,25 @@ TransactionResponder::TransactionResponder(std::string 
base_url, std::string por
 }
 
 bool TransactionResponder::handlePost(CivetServer* /*server*/, struct 
mg_connection *conn) {
+  auto req_info = mg_get_request_info(conn);
+  std::unordered_map<std::string, std::string> expected_headers {

Review Comment:
   This could be massaged into a `static constexpr`, but I'm not sure we want 
to bother in a test class. But at least it should be `static const`.



##########
libminifi/include/sitetosite/SiteToSite.h:
##########
@@ -205,140 +147,184 @@ typedef enum {
   ABORT = 250,
   UNRECOGNIZED_RESPONSE_CODE = 254,
   END_OF_STREAM = 255
-}RespondCode;
-
-// Respond Code Class
-typedef struct {
-  RespondCode code;
-  const char *description;
-  bool hasDescription;
-} RespondCodeContext;
-
-
+};
 
-// Request Type Str
-class SiteToSiteRequest {
- public:
-  MINIFIAPI static const char *RequestTypeStr[MAX_REQUEST_TYPE];
-  MINIFIAPI static RespondCodeContext respondCodeContext[21];
+struct ResponseCodeContext {
+  ResponseCode code = ResponseCode::UNRECOGNIZED_RESPONSE_CODE;
+  const std::string_view description;
+  bool has_description = false;
 };
 
+static constexpr std::array<ResponseCodeContext, 21> respond_code_contexts = {{

Review Comment:
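   `response_code_contexts` would be a better name
   
   (also, this should be `inline constexpr` rather than `static constexpr`: a namespace-scope `static` variable in a header gives every translation unit its own copy)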



##########
SITE_TO_SITE.md:
##########
@@ -0,0 +1,262 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+## Table of Contents
+- [Site-to-Site Overview](#site-to-site-overview)
+- [Site-to-Site Configuration](#site-to-site-configuration)
+  - [Site-to-Site Configuration on NiFi 
side](#site-to-site-configuration-on-nifi-side)
+  - [Site-to-Site Configuration on MiNiFi C++ 
side](#site-to-site-configuration-on-minifi-c-side)
+- [Additional examples](#additional-examples)
+
+## Site-to-Site Overview
+
+The Site-to-Site protocol allows data to be transferred between MiNiFi C++ and 
NiFi instances. MiNiFi C++ can send data to or receive data from NiFi using 
remote process groups. This is useful for scenarios where you want to send data 
from MiNiFi C++ to NiFi or vice versa. The Site-to-Site protocol supports raw TCP 
and HTTP transports.
+
+At the moment, the site-to-site protocol is only supported between MiNiFi C++ 
and NiFi instances; it cannot be used to transfer data between MiNiFi C++ 
instances. To transfer data between MiNiFi C++ instances, use processors such as 
InvokeHTTP and ListenHTTP instead.
+
+## Site-to-Site Configuration
+
+### Site-to-Site Configuration on NiFi side
+
+On the NiFi side, the site-to-site protocol is configured by creating input and 
output ports. The input port is used to receive data from MiNiFi C++ and the 
output port is used to send data to MiNiFi C++. The input and output ports can be 
created in the NiFi UI by dragging and dropping the input and output port icons 
onto the canvas.
+
+To use an input or output port of the NiFi flow in the MiNiFi C++ flow, use the 
instance id of the port. The instance id can be found in the NiFi UI by clicking 
on the input or output port and looking at the operation panel. It can be copied 
from that panel, or from the port's "instanceIdentifier" field in the 
configuration JSON file in the NiFi conf directory.
+
+### Site-to-Site Configuration on MiNiFi C++ side
+
+Site-to-Site protocol is configured on the MiNiFi C++ side by using remote 
process groups in the configuration. The remote process group represents the 
NiFi endpoint and uses the instance ids of the ports created on the NiFi side. 
The remote process group can be configured to use either raw TCP or HTTP 
protocol.
+
+Here is a YAML example of how to configure the site-to-site protocol in MiNiFi 
C++, where the MiNiFi C++ instance sends data to NiFi using the raw socket protocol:
+
+```yaml
+MiNiFi Config Version: 3
+Flow Controller:
+  name: Simple GenerateFlowFile to RPG
+Processors:
+  - id: b0c04f28-0158-1000-0000-000000000000
+    name: GenerateFlowFile
+    class: org.apache.nifi.processors.standard.GenerateFlowFile
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 5 sec
+    auto-terminated relationships list: []
+    Properties:
+      Data Format: Text
+      Unique FlowFiles: false
+      Custom Text: Custom text
+Connections:
+  - id: b0c0c3cc-0158-1000-0000-000000000000
+    name: GenerateFlowFile/succes/nifi-inputport
+    source id: b0c04f28-0158-1000-0000-000000000000
+    destination id: de7cc09a-0196-1000-2c63-ee6b4319ffb6
+    source relationship name: success
+Remote Process Groups:
+  - id: b0c09ff0-0158-1000-0000-000000000000
+    name: "RPG"
+    url: http://localhost:8080/nifi
+    timeout: 20 sec
+    yield period: 10 sec
+    transport protocol: RAW
+    Input Ports:
+      - id: de7cc09a-0196-1000-2c63-ee6b4319ffb6  # this is the instance id of 
the input port created in NiFi
+        name: nifi-inputport
+        max concurrent tasks: 1
+        use compression: false  # currently not supported and ignored in 
MiNiFi C++
+        batch size:
+          size: 10 MB
+          count: 10
+          duration: 30 sec
+    Output Ports: []
+```
+
+Here is another example, in NiFi-style JSON format, of how to configure the 
site-to-site protocol in MiNiFi C++, where the MiNiFi C++ instance receives data 
from NiFi using the HTTP protocol:
+
+```json
+{
+    "encodingVersion": {
+        "majorVersion": 2,
+        "minorVersion": 0
+    },
+    "maxTimerDrivenThreadCount": 1,
+    "maxEventDrivenThreadCount": 1,
+    "parameterContexts": [],
+    "rootGroup": {
+        "identifier": "c5bceca3-9c20-4068-bf2d-425e14026471",
+        "instanceIdentifier": "3cb4b3ce-7cd8-4ab7-a6bf-d4640ac5db43",
+        "name": "root",
+        "position": {
+            "x": 0.0,
+            "y": 0.0
+        },
+        "processGroups": [],
+        "remoteProcessGroups": [
+            {
+                "identifier": "327b446a-0043-48d1-8bb4-df65ba1afa60",
+                "instanceIdentifier": "2ed47dca-38f5-476d-9c37-5ea0a5072f1e",
+                "name": "https://localhost:8443/nifi",
+                "position": {
+                    "x": 235.0,
+                    "y": 71.00000762939453
+                },
+                "targetUri": "https://localhost:8443/nifi",
+                "targetUris": "https://localhost:8443/nifi",

Review Comment:
   We shouldn't have both `targetUri` and `targetUris` in the example, since 
MiNiFi only uses whichever of the two it finds first in the config.



##########
libminifi/include/sitetosite/SiteToSiteClient.h:
##########
@@ -19,246 +19,182 @@
 #pragma once
 
 #include <algorithm>
-#include <array>
 #include <map>
 #include <memory>
 #include <string>
 #include <utility>
 #include <vector>
+#include <optional>
 
 #include "Peer.h"
 #include "SiteToSite.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessContext.h"
-#include "core/Connectable.h"
-#include "utils/gsl.h"
 
-namespace org::apache::nifi::minifi::sitetosite {
+namespace org::apache::nifi::minifi {
 
-/**
- * Represents a piece of data that is to be sent to or that was received from a
- * NiFi instance.
- */
-class DataPacket {
+namespace test {
+class SiteToSiteClientTestAccessor;
+}  // namespace test
+
+namespace sitetosite {
+
+struct DataPacket {
  public:
-  DataPacket(std::shared_ptr<core::logging::Logger> logger, 
std::shared_ptr<Transaction> transaction, std::map<std::string, std::string> 
attributes, const std::string &payload)
-      : _attributes{std::move(attributes)},
-        transaction_{std::move(transaction)},
-        payload_{payload},
-        logger_reference_{std::move(logger)} {
+  DataPacket(std::shared_ptr<Transaction> transaction, const std::string 
&payload)
+      : transaction{std::move(transaction)},
+        payload{payload} {
+  }
+  DataPacket(std::shared_ptr<Transaction> transaction, std::map<std::string, 
std::string> attributes, const std::string &payload)
+      : attributes{std::move(attributes)},
+        transaction{std::move(transaction)},
+        payload{payload} {
   }
-  std::map<std::string, std::string> _attributes;
-  uint64_t _size{0};
-  std::shared_ptr<Transaction> transaction_;
-  const std::string & payload_;
-  std::shared_ptr<core::logging::Logger> logger_reference_;
+  std::map<std::string, std::string> attributes;
+  uint64_t size{0};
+  std::shared_ptr<Transaction> transaction;
+  const std::string& payload;
+};
+
+struct SiteToSiteResponse {
+  ResponseCode code = ResponseCode::UNRECOGNIZED_RESPONSE_CODE;
+  std::string message;
 };
 
-class SiteToSiteClient : public core::ConnectableImpl {
+class SiteToSiteClient {
  public:
-  SiteToSiteClient()
-      : core::ConnectableImpl("SitetoSiteClient") {
+  explicit SiteToSiteClient(std::unique_ptr<SiteToSitePeer> peer)
+      : peer_(std::move(peer)) {
+    gsl_Assert(peer_);
   }
 
-  ~SiteToSiteClient() override = default;
+  SiteToSiteClient(const SiteToSiteClient&) = delete;
+  SiteToSiteClient(SiteToSiteClient&&) = delete;
+  SiteToSiteClient& operator=(const SiteToSiteClient&) = delete;
+  SiteToSiteClient& operator=(SiteToSiteClient&&) = delete;
 
-  void setSSLContextService(const 
std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
-    ssl_context_service_ = context_service;
-  }
+  virtual ~SiteToSiteClient() = default;
 
-  /**
-   * Creates a transaction using the transaction ID and the direction
-   * @param transactionID transaction identifier
-   * @param direction direction of transfer
-   */
-  virtual std::shared_ptr<Transaction> createTransaction(TransferDirection 
direction) = 0;
+  virtual std::optional<std::vector<PeerStatus>> getPeerList() = 0;
+  virtual bool transmitPayload(core::ProcessContext& context, const 
std::string &payload, const std::map<std::string, std::string>& attributes) = 0;
 
-  /**
-   * Transfers flow files
-   * @param direction transfer direction
-   * @param context process context
-   * @param session process session
-   * @returns true if the process succeeded, failure OR exception thrown 
otherwise
-   */
-  virtual bool transfer(TransferDirection direction, core::ProcessContext& 
context, core::ProcessSession& session) {
-#ifndef WIN32
-    if (__builtin_expect(direction == SEND, 1)) {
+  bool transfer(TransferDirection direction, core::ProcessContext& context, 
core::ProcessSession& session) {
+    if (direction == TransferDirection::SEND) {
       return transferFlowFiles(context, session);
     } else {
       return receiveFlowFiles(context, session);
     }
-#else
-    if (direction == SEND) {
-      return transferFlowFiles(context, session);
-    } else {
-      return receiveFlowFiles(context, session);
-    }
-#endif
   }
 
-  /**
-   * Transfers flow files to server
-   * @param context process context
-   * @param session process session
-   * @returns true if the process succeeded, failure OR exception thrown 
otherwise
-   */
-  virtual bool transferFlowFiles(core::ProcessContext& context, 
core::ProcessSession& session);
-
-  /**
-   * Receive flow files from server
-   * @param context process context
-   * @param session process session
-   * @returns true if the process succeeded, failure OR exception thrown 
otherwise
-   */
-
-  // Confirm the data that was sent or received by comparing CRC32's of the 
data sent and the data received.
-  // Receive flow files for the process session
-  bool receiveFlowFiles(core::ProcessContext& context, core::ProcessSession& 
session);
-
-  // Receive the data packet from the transaction
-  // Return false when any error occurs
-  bool receive(const utils::Identifier &transactionID, DataPacket *packet, 
bool &eof);
-  /**
-   * Transfers raw data and attributes  to server
-   * @param context process context
-   * @param session process session
-   * @param payload data to transmit
-   * @param attributes
-   * @returns true if the process succeeded, failure OR exception thrown 
otherwise
-   */
-  virtual bool transmitPayload(core::ProcessContext& context, 
core::ProcessSession& session, const std::string &payload,
-                               std::map<std::string, std::string> attributes) 
= 0;
-
-  void setPortId(utils::Identifier &id) {
+  void setPortId(const utils::Identifier& id) {
     port_id_ = id;
   }
 
-  /**
-   * Sets the idle timeout.
-   */
   void setIdleTimeout(std::chrono::milliseconds timeout) {
      idle_timeout_ = timeout;
   }
 
-  /**
-   * Sets the base peer for this interface.
-   */
-  virtual void setPeer(std::unique_ptr<SiteToSitePeer> peer) {
-    peer_ = std::move(peer);
-  }
-
-  /**
-   * Provides a reference to the port identifier
-   * @returns port identifier
-   */
-  utils::Identifier getPortId() const {
+  [[nodiscard]] utils::Identifier getPortId() const {
     return port_id_;
   }
 
-  /**
-   * Obtains the peer list and places them into the provided vector
-   * @param peers peer vector.
-   * @return true if successful, false otherwise
-   */
-  virtual bool getPeerList(std::vector<PeerStatus> &peers) = 0;
-
-  /**
-   * Establishes the interface.
-   * @return true if successful, false otherwise
-   */
-  virtual bool establish() = 0;
-
-  const std::shared_ptr<core::logging::Logger> &getLogger() {
+  [[nodiscard]] const std::shared_ptr<core::logging::Logger> &getLogger() {
     return logger_;
   }
 
-  void yield() override {
+  void setSSLContextService(const 
std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
+    ssl_context_service_ = context_service;
   }
 
-  /**
-   * Determines if we are connected and operating
-   */
-  bool isRunning() const override {
-    return running_;
+  void setUseCompression(bool use_compression) {
+    use_compression_ = use_compression;
   }
 
-  /**
-   * Determines if work is available by this connectable
-   * @return boolean if work is available.
-   */
-  bool isWorkAvailable() override {
-    return true;
+  void setBatchSize(uint64_t size) {
+    batch_size_ = size;
   }
 
-  virtual bool bootstrap() {
-    return true;
+  void setBatchCount(uint64_t count) {
+    batch_count_ = count;
   }
 
-  // Return -1 when any error occurs
-  virtual int16_t send(const utils::Identifier& transactionID, DataPacket* 
packet, const std::shared_ptr<core::FlowFile>& flowFile, core::ProcessSession* 
session);
+  void setBatchDuration(std::chrono::milliseconds duration) {
+    batch_duration_ = duration;
+  }
 
- protected:
-  // Cancel the transaction
-  virtual void cancel(const utils::Identifier &transactionID);
-  // Complete the transaction
-  virtual bool complete(core::ProcessContext& context, const utils::Identifier 
&transactionID);
-  // Error the transaction
-  virtual void error(const utils::Identifier &transactionID);
+  virtual void setTimeout(std::chrono::milliseconds timeout) {
+    timeout_ = timeout;
+  }
 
-  virtual bool confirm(const utils::Identifier &transactionID);
-  // deleteTransaction
-  virtual void deleteTransaction(const utils::Identifier &transactionID);
+ protected:
+  friend class test::SiteToSiteClientTestAccessor;
 
+  virtual bool bootstrap() = 0;
+  virtual bool establish() = 0;
+  virtual std::shared_ptr<Transaction> createTransaction(TransferDirection 
direction) = 0;
   virtual void tearDown() = 0;
 
-  // read Respond
-  virtual int readResponse(const std::shared_ptr<Transaction> &transaction, 
RespondCode &code, std::string &message);
-  // write respond
-  virtual int writeResponse(const std::shared_ptr<Transaction> &transaction, 
RespondCode code, const std::string& message);
-  // getRespondCodeContext
-  virtual RespondCodeContext *getRespondCodeContext(RespondCode code) {
-    for (auto & i : SiteToSiteRequest::respondCodeContext) {
-      if (i.code == code) {
-        return &i;
-      }
-    }
-    return nullptr;
-  }
+  virtual void deleteTransaction(const utils::Identifier &transaction_id);
+  virtual std::optional<SiteToSiteResponse> readResponse(const 
std::shared_ptr<Transaction> &transaction);
+  virtual bool writeResponse(const std::shared_ptr<Transaction> &transaction, 
const SiteToSiteResponse& response);
 
-  // Peer State
-  PeerState peer_state_{PeerState::IDLE};
+  bool initializeSend(const std::shared_ptr<Transaction>& transaction);
+  bool writeAttributesInSendTransaction(const std::shared_ptr<Transaction>& 
transaction, const std::map<std::string, std::string>& attributes);
+  void finalizeSendTransaction(const std::shared_ptr<Transaction>& 
transaction, uint64_t sent_bytes);
+  bool sendPacket(const DataPacket& packet);
+  bool sendFlowFile(const std::shared_ptr<Transaction>& transaction, 
core::FlowFile& flow_file, core::ProcessSession& session);
 
-  // portId
-  utils::Identifier port_id_;
+  void cancel(const utils::Identifier &transaction_id);
+  bool complete(core::ProcessContext& context, const utils::Identifier 
&transaction_id);
+  void error(const utils::Identifier &transaction_id);
+  bool confirm(const utils::Identifier &transaction_id);
 
-  // idleTimeout
-  std::chrono::milliseconds idle_timeout_{15000};
+  void handleTransactionError(const std::shared_ptr<Transaction>& transaction, 
core::ProcessContext& context, const std::exception& exception);
 
-  // Peer Connection
+  PeerState peer_state_{PeerState::IDLE};
+  utils::Identifier port_id_;
+  std::chrono::milliseconds idle_timeout_{15000};
   std::unique_ptr<SiteToSitePeer> peer_;
-
-  std::atomic<bool> running_{false};
-
-  // transaction map
   std::map<utils::Identifier, std::shared_ptr<Transaction>> 
known_transactions_;
+  std::chrono::nanoseconds batch_send_nanos_{5s};
 
-  // BATCH_SEND_NANOS
-  std::chrono::nanoseconds _batchSendNanos = std::chrono::seconds(5);
-
-  /***
-   * versioning
-   */
-  uint32_t _supportedVersion[5] = {5, 4, 3, 2, 1};
-  int _currentVersionIndex{0};
-  uint32_t _currentVersion{_supportedVersion[_currentVersionIndex]};
-  uint32_t _supportedCodecVersion[1] = {1};
-  int _currentCodecVersionIndex{0};
-  uint32_t 
_currentCodecVersion{_supportedCodecVersion[_currentCodecVersionIndex]};
+  const std::vector<uint32_t> supported_version_ = {5, 4, 3, 2, 1};

Review Comment:
   nitpicking, but `supported_versions_` and `supported_codec_versions_` would 
be better names



##########
libminifi/test/libtest/unit/DummyProcessor.h:
##########
@@ -41,7 +41,8 @@ class DummyProcessor : public minifi::core::ProcessorImpl {
       .isSensitive(true)
       .build();
   static constexpr auto Properties = std::array<core::PropertyReference, 
2>{SimpleProperty, SensitiveProperty};
-  static constexpr auto Relationships = 
std::array<core::RelationshipDefinition, 0>{};
+  static constexpr core::RelationshipDefinition Success{"success", "Success 
relationship"};
+  static constexpr auto Relationships = 
std::array<core::RelationshipDefinition, 1>{Success};

Review Comment:
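   No type conversion is needed here (`Success` is already a `core::RelationshipDefinition`, so class template argument deduction yields the same `std::array` type), so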
   ```suggestion
     static constexpr auto Relationships = std::array{Success};
   ```
   would work



##########
libminifi/src/RemoteProcessGroupPort.cpp:
##########
@@ -0,0 +1,379 @@
+/**
+ * @file RemoteProcessGroupPort.cpp
+ * RemoteProcessGroupPort class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RemoteProcessGroupPort.h"
+
+#include <memory>
+#include <iostream>
+#include <vector>
+#include <string>
+#include <utility>
+#include <cinttypes>
+
+#include "sitetosite/Peer.h"
+#include "Exception.h"
+#include "sitetosite/SiteToSiteFactory.h"
+
+#include "rapidjson/document.h"
+
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Processor.h"
+#include "http/BaseHTTPClient.h"
+#include "controllers/SSLContextService.h"
+#include "utils/net/DNS.h"
+
+#undef GetObject  // windows.h #defines GetObject = GetObjectA or GetObjectW, 
which conflicts with rapidjson
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi {
+
+namespace {
+std::string buildFullSiteToSiteUrl(const RPG& nifi) {
+  std::stringstream full_url;
+  full_url << nifi.protocol << nifi.host;
+  // don't append port if it is 0 ( undefined )
+  if (nifi.port > 0) {
+    full_url << ":" << std::to_string(nifi.port);
+  }
+  full_url << "/nifi-api/site-to-site";
+  return full_url.str();
+}
+}  // namespace
+
+const char *RemoteProcessGroupPort::RPG_SSL_CONTEXT_SERVICE_NAME = 
"RemoteProcessGroupPortSSLContextService";
+
+void RemoteProcessGroupPort::setURL(const std::string& val) {
+  auto urls = utils::string::split(val, ",");
+  for (const auto& url : urls) {
+    http::URL parsed_url{utils::string::trim(url)};
+    if (parsed_url.isValid()) {
+      logger_->log_debug("Parsed RPG URL '{}' -> '{}'", url, 
parsed_url.hostPort());
+      nifi_instances_.push_back({parsed_url.host(), parsed_url.port(), 
parsed_url.protocol()});
+    } else {
+      logger_->log_error("Could not parse RPG URL '{}'", url);
+    }
+  }
+}
+
+std::unique_ptr<sitetosite::SiteToSiteClient> 
RemoteProcessGroupPort::initializeProtocol(sitetosite::SiteToSiteClientConfiguration&
 config) const {
+  config.setSecurityContext(ssl_service_);
+  config.setHTTPProxy(proxy_);
+  config.setIdleTimeout(idle_timeout_);
+  config.setUseCompression(use_compression_);
+  config.setBatchCount(batch_count_);
+  config.setBatchSize(batch_size_);
+  config.setBatchDuration(batch_duration_);
+  config.setTimeout(timeout_);
+
+  return sitetosite::createClient(config);
+}
+
+std::unique_ptr<sitetosite::SiteToSiteClient> 
RemoteProcessGroupPort::getNextProtocol() {
+  std::unique_ptr<sitetosite::SiteToSiteClient> next_protocol = nullptr;
+  if (!available_protocols_.try_dequeue(next_protocol)) {
+    if (peer_index_ >= 0) {
+      std::lock_guard<std::mutex> lock(peer_mutex_);
+      logger_->log_debug("Creating client from peer {}", peer_index_.load());
+      auto& peer_status = peers_[peer_index_];
+      sitetosite::SiteToSiteClientConfiguration 
config(peer_status.getPortId(), peer_status.getHost(), peer_status.getPort(), 
local_network_interface_, client_type_);
+      peer_index_++;
+      if (peer_index_ >= static_cast<int>(peers_.size())) {
+        peer_index_ = 0;
+      }
+      next_protocol = initializeProtocol(config);
+    } else {
+      logger_->log_debug("Refreshing the peer list since there are none 
configured.");
+      refreshPeerList();
+    }
+  }
+  logger_->log_debug("Obtained protocol from available_protocols_");
+  return next_protocol;
+}
+
+void 
RemoteProcessGroupPort::returnProtocol(std::unique_ptr<sitetosite::SiteToSiteClient>
 return_protocol) {
+  auto count = peers_.size();
+  if (max_concurrent_tasks_ > count)
+    count = max_concurrent_tasks_;
+  if (available_protocols_.size_approx() >= count) {
+    logger_->log_debug("not enqueueing protocol {}", getUUIDStr());
+    // let the memory be freed
+    return;
+  }
+  logger_->log_debug("enqueueing protocol {}, have a total of {}", 
getUUIDStr(), available_protocols_.size_approx());
+  available_protocols_.enqueue(std::move(return_protocol));
+}
+
+void RemoteProcessGroupPort::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+
+  logger_->log_trace("Finished initialization");
+}
+
+void RemoteProcessGroupPort::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  if (auto protocol_uuid = context.getProperty(portUUID)) {
+    protocol_uuid_ = *protocol_uuid;
+  }
+
+  auto context_name = context.getProperty(SSLContext);
+  if (!context_name || IsNullOrEmpty(*context_name)) {
+    context_name = RPG_SSL_CONTEXT_SERVICE_NAME;
+  }
+
+  std::shared_ptr<core::controller::ControllerService> service = 
context.getControllerService(*context_name, getUUID());
+  if (nullptr != service) {
+    ssl_service_ = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(service);
+  } else {
+    std::string secureStr;
+    if (configure_->get(Configure::nifi_remote_input_secure, secureStr) && 
utils::string::toBool(secureStr).value_or(false)) {
+      ssl_service_ = 
std::make_shared<minifi::controllers::SSLContextServiceImpl>(RPG_SSL_CONTEXT_SERVICE_NAME,
 configure_);
+      ssl_service_->onEnable();
+    }
+  }
+
+  idle_timeout_ = context.getProperty(idleTimeout) | 
utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) | 
utils::orThrow("RemoteProcessGroupPort::idleTimeout is a required Property");
+
+  std::lock_guard<std::mutex> lock(peer_mutex_);
+  if (!nifi_instances_.empty()) {
+    refreshPeerList();
+    if (!peers_.empty())
+      peer_index_ = 0;
+  }
+  // populate the site2site protocol for load balancing between them
+  if (!peers_.empty()) {
+    auto count = peers_.size();
+    if (max_concurrent_tasks_ > count)
+      count = max_concurrent_tasks_;
+    for (uint32_t i = 0; i < count; i++) {
+      auto peer_status = peers_[peer_index_];
+      sitetosite::SiteToSiteClientConfiguration 
config(peer_status.getPortId(), peer_status.getHost(), peer_status.getPort(), 
local_network_interface_, client_type_);
+      peer_index_++;
+      if (peer_index_ >= static_cast<int>(peers_.size())) {
+        peer_index_ = 0;
+      }
+      logger_->log_trace("Creating client");
+      auto next_protocol = initializeProtocol(config);
+      logger_->log_trace("Created client, moving into available protocols");
+      returnProtocol(std::move(next_protocol));
+    }
+  } else {
+    // we don't have any peers
+    logger_->log_error("No peers selected during scheduling");
+  }
+}
+
+void RemoteProcessGroupPort::notifyStop() {
+  transmitting_ = false;
+  RPGLatch count(false);  // we're just a monitor
+  // we use the latch
+  while (count.getCount() > 0) {
+  }

Review Comment:
   This doesn't seem to do anything. First, we create the `RPGLatch` object 
with a `count` member which is a pointer to a static atomic integer which is 
initialized to 0 and its value is never changed. Next, we check if `*count > 0` 
(it isn't), and finally we continue on our way.
   
   It used to do something, because there was an `RPGLatch count;` line in 
`RemoteProcessorGroupPort::onTrigger()`. This was supposed to increment the 
atomic integer, but it was broken because the constructor and destructor 
incremented and decremented the pointer instead of the atomic integer it points 
to.
   
   I think we should fix the `RPGLatch` class (or replace it with something 
from the standard library if there is a replacement) and put the counter 
increment back into `onTrigger()`.  Or if it is no longer needed, then we 
should remove the `RPGLatch` class.



##########
libminifi/include/RemoteProcessGroupPort.h:
##########
@@ -155,102 +157,90 @@ class RemoteProcessorGroupPort : public 
core::ProcessorImpl {
     local_network_interface_ = ifc;
   }
 
-  std::string getInterface() {
-    return local_network_interface_;
-  }
+  void setURL(const std::string& val);
 
-  /**
-   * Sets the url. Supports a CSV
-   */
-  void setURL(std::string val) {
-    auto urls = utils::string::split(val, ",");
-    for (const auto& url : urls) {
-      http::URL parsed_url{utils::string::trim(url)};
-      if (parsed_url.isValid()) {
-        logger_->log_debug("Parsed RPG URL '{}' -> '{}'", url, 
parsed_url.hostPort());
-        nifi_instances_.push_back({parsed_url.host(), parsed_url.port(), 
parsed_url.protocol()});
-      } else {
-        logger_->log_error("Could not parse RPG URL '{}'", url);
-      }
-    }
-  }
-
-  std::vector<RPG> getInstances() const {
+  [[nodiscard]] std::vector<RPG> getInstances() const {
     return nifi_instances_;
   }
 
   void setHTTPProxy(const http::HTTPProxy &proxy) {
-    this->proxy_ = proxy;
+    proxy_ = proxy;
   }
-  http::HTTPProxy getHTTPProxy() {
-    return this->proxy_;
-  }
-  // refresh remoteSite2SiteInfo via nifi rest api
-  std::pair<std::string, int> refreshRemoteSite2SiteInfo();
 
-  // refresh site2site peer list
-  void refreshPeerList();
+  [[nodiscard]] http::HTTPProxy getHTTPProxy() const {
+    return proxy_;
+  }
 
   void notifyStop() override;
 
   void enableHTTP() {
-    client_type_ = sitetosite::HTTP;
+    client_type_ = sitetosite::ClientType::HTTP;
   }
 
- protected:
-  /**
-   * Non static in case anything is loaded when this object is re-scheduled
-   */
-  bool is_http_disabled() {
-    auto ptr = 
core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", 
"HTTPClient");
-    if (ptr != nullptr) {
-      delete ptr;
-      return false;
-    } else {
-      return true;
-    }
+  void setUseCompression(bool use_compression) {
+    use_compression_ = use_compression;
+  }
+
+  [[nodiscard]] bool getUseCompression() const {
+    return use_compression_;
+  }
+
+  void setBatchCount(uint64_t count) {
+    batch_count_ = count;
+  }
+
+  [[nodiscard]] std::optional<uint64_t> getBatchCount() const {
+    return batch_count_;
   }
 
-  std::unique_ptr<sitetosite::SiteToSiteClient> getNextProtocol(bool create);
+  void setBatchSize(uint64_t size) {
+    batch_size_ = size;
+  }
+
+  [[nodiscard]] std::optional<uint64_t> getBatchSize() const {
+    return batch_size_;
+  }
+
+  void setBatchDuration(std::chrono::milliseconds duration) {
+    batch_duration_ = duration;
+  }
+
+  [[nodiscard]] std::optional<std::chrono::milliseconds> getBatchDuration() 
const {
+    return batch_duration_;
+  }
+
+ protected:
+  std::optional<std::pair<std::string, uint16_t>> 
refreshRemoteSiteToSiteInfo();
+  void refreshPeerList();
+  std::unique_ptr<sitetosite::SiteToSiteClient> getNextProtocol();
   void returnProtocol(std::unique_ptr<sitetosite::SiteToSiteClient> protocol);
 
   moodycamel::ConcurrentQueue<std::unique_ptr<sitetosite::SiteToSiteClient>> 
available_protocols_;
-
   std::shared_ptr<Configure> configure_;
-  // Transaction Direction
   sitetosite::TransferDirection direction_;
-  // Transmitting
   std::atomic<bool> transmitting_;
-  // timeout
-  uint64_t timeout_;
-  // local network interface
+  std::optional<std::chrono::milliseconds> timeout_;
   std::string local_network_interface_;
-
   utils::Identifier protocol_uuid_;
-
-  std::chrono::milliseconds idle_timeout_ = std::chrono::seconds(15);
-
-  // rest API end point info
-  std::vector<struct RPG> nifi_instances_;
-
-  // http proxy
+  std::chrono::milliseconds idle_timeout_ = 15s;
+  std::vector<RPG> nifi_instances_;
   http::HTTPProxy proxy_;
-
-  bool bypass_rest_api_;
-
-  sitetosite::CLIENT_TYPE client_type_;
-
-  // Remote Site2Site Info
-  bool site2site_secure_;
+  sitetosite::ClientType client_type_;
   std::vector<sitetosite::PeerStatus> peers_;
-  std::atomic<int> peer_index_;
+  std::atomic<int64_t> peer_index_;
   std::mutex peer_mutex_;

Review Comment:
   Do we need both the `std::atomic` for `peer_index_` and `peer_mutex_`?



##########
libminifi/src/sitetosite/HttpSiteToSiteClient.cpp:
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "sitetosite/HttpSiteToSiteClient.h"
+
+#include <chrono>
+#include <map>
+#include <string>
+#include <memory>
+#include <thread>
+#include <iostream>
+#include <vector>
+#include <optional>
+
+#include "io/CRCStream.h"
+#include "sitetosite/Peer.h"
+#include "io/validation.h"
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "Exception.h"
+
+#include "rapidjson/document.h"
+#include "rapidjson/error/en.h"
+
+#undef DELETE  // macro on windows
+
+namespace org::apache::nifi::minifi::sitetosite {
+
+namespace {
+std::optional<utils::Identifier> parseTransactionId(const std::string &uri) {
+  return 
utils::Identifier::parse(utils::string::partAfterLastOccurrenceOf(uri, '/'));
+}
+
+std::optional<std::vector<PeerStatus>> parsePeerStatuses(const 
std::shared_ptr<core::logging::Logger> &logger, const std::string &entity, 
const utils::Identifier &id) {
+  try {
+    rapidjson::Document root;
+    rapidjson::ParseResult ok = root.Parse(entity.c_str());
+    if (!ok) {
+      std::stringstream ss;
+      ss << "Failed to parse archive lens stack from JSON string with reason: "
+          << rapidjson::GetParseError_En(ok.Code())
+          << " at offset " << ok.Offset();
+
+      throw Exception(ExceptionType::GENERAL_EXCEPTION, ss.str());
+    }
+
+    std::vector<PeerStatus> peer_statuses;
+    if (!root.HasMember("peers") || !root["peers"].IsArray() || 
root["peers"].Size() <= 0) {
+      logger->log_debug("Peers is either not a member or is empty. String to 
analyze: {}", entity);
+      return peer_statuses;
+    }
+
+    for (const auto &peer : root["peers"].GetArray()) {
+      std::string hostname;
+      int port = 0;
+      int flow_file_count = 0;
+
+      if (peer.HasMember("hostname") && peer["hostname"].IsString() &&
+          peer.HasMember("port") && peer["port"].IsNumber()) {
+        hostname = peer["hostname"].GetString();
+        port = peer["port"].GetInt();
+      }
+
+      if (peer.HasMember("flowFileCount")) {
+        if (peer["flowFileCount"].IsNumber()) {
+          flow_file_count = gsl::narrow<int>(peer["flowFileCount"].GetInt64());
+        } else {
+          logger->log_debug("Could not parse flowFileCount, so we're going to 
continue without it");
+        }
+      }
+
+      // host name and port are required.
+      if (!IsNullOrEmpty(hostname) && port > 0) {
+        PeerStatus status(id, hostname, port, flow_file_count, true);
+        peer_statuses.push_back(std::move(status));
+      } else {
+        logger->log_debug("hostname empty or port is zero. hostname: {}, port: 
{}", hostname, port);
+      }
+    }
+    return peer_statuses;
+  } catch (const Exception &exception) {
+    logger->log_debug("Caught Exception {}", exception.what());
+    return std::nullopt;
+  }
+}
+}  // namespace
+
+std::shared_ptr<Transaction> 
HttpSiteToSiteClient::createTransaction(TransferDirection direction) {
+  std::string dir_str = direction == TransferDirection::SEND ? "input-ports" : 
"output-ports";
+  std::stringstream uri;
+  uri << getBaseURI() << "data-transfer/" << dir_str << "/" << 
getPortId().to_string() << "/transactions";
+  auto client = createHttpClient(uri.str(), http::HttpRequestMethod::POST);
+  setSiteToSiteHeaders(*client);
+  client->setConnectionTimeout(std::chrono::milliseconds(5000));
+  client->setContentType("application/json");
+  client->setRequestHeader("Accept", "application/json");
+  client->setRequestHeader("Transfer-Encoding", "chunked");
+  client->setPostFields("");
+  client->submit();
+
+  if (auto http_stream = dynamic_cast<http::HttpStream*>(peer_->getStream())) {
+    logger_->log_debug("Closing {}", http_stream->getClientRef()->getURL());
+  }
+
+  if (client->getResponseCode() != 201) {
+    peer_->setStream(nullptr);
+    logger_->log_debug("Could not create transaction, received {}", 
client->getResponseCode());
+    return nullptr;
+  }
+  // parse the headers
+  auto intent_name = client->getHeaderValue("x-location-uri-intent");
+  if (!utils::string::equalsIgnoreCase(intent_name, "transaction-url")) {
+    logger_->log_debug("Could not create transaction, intent is {}", 
intent_name);
+    return nullptr;
+  }
+
+  auto url = client->getHeaderValue("Location");
+  if (IsNullOrEmpty(url)) {
+    logger_->log_debug("Location is empty");
+    return nullptr;
+  }
+
+  org::apache::nifi::minifi::io::CRCStream<SiteToSitePeer> 
crcstream(gsl::make_not_null(peer_.get()));
+  auto transaction = std::make_shared<HttpTransaction>(direction, 
std::move(crcstream));
+  transaction->initialize(this, url);
+  auto transaction_id = parseTransactionId(url);
+  if (!transaction_id) {
+    logger_->log_debug("Transaction ID is empty");
+    return nullptr;
+  }
+  transaction->setTransactionId(transaction_id.value());
+  std::shared_ptr<minifi::http::HTTPClient> transaction_client;
+  if (transaction->getDirection() == TransferDirection::SEND) {
+    transaction_client = openConnectionForSending(transaction);
+  } else {
+    transaction_client = openConnectionForReceive(transaction);
+    transaction->setDataAvailable(true);
+    // 201 tells us that data is available. 200 would mean that nothing is 
available.
+  }
+  gsl_Assert(transaction_client);
+
+  setSiteToSiteHeaders(*transaction_client);
+  peer_->setStream(std::make_unique<http::HttpStream>(transaction_client));
+  logger_->log_debug("Created transaction id -{}-", 
transaction->getUUID().to_string());
+  known_transactions_[transaction->getUUID()] = transaction;
+  return transaction;
+}
+
+std::optional<SiteToSiteResponse> 
HttpSiteToSiteClient::readResponseForReceiveTransfer(const 
std::shared_ptr<Transaction>& transaction) {
+  SiteToSiteResponse response;
+  if (current_code_ == ResponseCode::FINISH_TRANSACTION) {
+    return response;
+  }
+
+  if (transaction->getState() == TransactionState::TRANSACTION_STARTED || 
transaction->getState() == TransactionState::DATA_EXCHANGED) {
+    if (current_code_ == ResponseCode::CONFIRM_TRANSACTION && 
transaction->getState() == TransactionState::DATA_EXCHANGED) {
+      auto stream = dynamic_cast<http::HttpStream*>(peer_->getStream());
+      if (!stream->isFinished()) {
+        logger_->log_debug("confirm read for {}, but not finished ", 
transaction->getUUIDStr());
+        if (stream->waitForDataAvailable()) {
+          response.code = ResponseCode::CONTINUE_TRANSACTION;
+          return response;

Review Comment:
   This style:
   ```suggestion
             return SiteToSiteResponse{.code = ResponseCode::CONTINUE_TRANSACTION};
   ```
   would be more readable to me. As it is, with a single `response` object created at
the start of the function and returned from several places, it is not clear whether the object has
been modified earlier in the function.



##########
libminifi/src/sitetosite/HttpSiteToSiteClient.cpp:
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "sitetosite/HttpSiteToSiteClient.h"
+
+#include <chrono>
+#include <map>
+#include <string>
+#include <memory>
+#include <thread>
+#include <iostream>
+#include <vector>
+#include <optional>
+
+#include "io/CRCStream.h"
+#include "sitetosite/Peer.h"
+#include "io/validation.h"
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "Exception.h"
+
+#include "rapidjson/document.h"
+#include "rapidjson/error/en.h"
+
+#undef DELETE  // macro on windows
+
+namespace org::apache::nifi::minifi::sitetosite {
+
+namespace {
+std::optional<utils::Identifier> parseTransactionId(const std::string &uri) {
+  return 
utils::Identifier::parse(utils::string::partAfterLastOccurrenceOf(uri, '/'));
+}
+
+std::optional<std::vector<PeerStatus>> parsePeerStatuses(const 
std::shared_ptr<core::logging::Logger> &logger, const std::string &entity, 
const utils::Identifier &id) {
+  try {
+    rapidjson::Document root;
+    rapidjson::ParseResult ok = root.Parse(entity.c_str());
+    if (!ok) {
+      std::stringstream ss;
+      ss << "Failed to parse archive lens stack from JSON string with reason: "

Review Comment:
   "archive lens stack"? -- this seems to be a (very old) copy-paste error; the message should say that parsing the peer list JSON failed.



##########
libminifi/test/unit/SiteToSiteTests.cpp:
##########
@@ -0,0 +1,343 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "io/BaseStream.h"
+#include "sitetosite/Peer.h"
+#include "sitetosite/RawSiteToSiteClient.h"
+#include "unit/TestBase.h"
+#include "unit/Catch.h"
+#include "unit/SiteToSiteHelper.h"
+#include "unit/DummyProcessor.h"
+#include "unit/ProvenanceTestHelper.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class SiteToSiteClientTestAccessor {
+ public:
+  static bool bootstrap(sitetosite::RawSiteToSiteClient& client) {
+    return client.bootstrap();
+  }
+
+  static std::shared_ptr<sitetosite::Transaction> 
createTransaction(sitetosite::RawSiteToSiteClient& client, 
sitetosite::TransferDirection direction) {
+    return client.createTransaction(direction);
+  }
+
+  static bool sendFlowFile(sitetosite::RawSiteToSiteClient& client, const 
std::shared_ptr<sitetosite::Transaction>& transaction, core::FlowFile& 
flow_file, core::ProcessSession& session) {
+    return client.sendFlowFile(transaction, flow_file, session);
+  }
+
+  static bool sendPacket(sitetosite::RawSiteToSiteClient& client, const 
sitetosite::DataPacket& packet) {
+    return client.sendPacket(packet);
+  }
+
+  static std::pair<uint64_t, uint64_t> 
readFlowFiles(sitetosite::RawSiteToSiteClient& client, const 
std::shared_ptr<sitetosite::Transaction>& transaction, core::ProcessSession& 
session) {
+    return client.readFlowFiles(transaction, session);
+  }
+};
+
+void initializeMockBootstrapResponses(const 
std::unique_ptr<SiteToSiteResponder>& collector) {
+  const char resource_ok_code = 
magic_enum::enum_underlying(sitetosite::ResourceNegotiationStatusCode::RESOURCE_OK);
+  std::string resp_code;
+  resp_code.insert(resp_code.begin(), resource_ok_code);
+  collector->push_response(resp_code);
+
+  // Handshake response code
+  collector->push_response("R");
+  collector->push_response("C");
+  const char resource_code_properties_ok = 
magic_enum::enum_underlying(sitetosite::ResponseCode::PROPERTIES_OK);
+  resp_code = resource_code_properties_ok;
+  collector->push_response(resp_code);
+
+  // Codec Negotiation
+  resp_code = resource_ok_code;
+  collector->push_response(resp_code);
+}
+
+void verifyBootstrapMessages(sitetosite::RawSiteToSiteClient& protocol, 
SiteToSiteResponder& collector) {
+  protocol.setUseCompression(false);
+  protocol.setBatchDuration(std::chrono::milliseconds(100));
+  protocol.setBatchCount(5);
+  protocol.setTimeout(std::chrono::milliseconds(20000));
+
+  minifi::utils::Identifier fake_uuid = 
minifi::utils::Identifier::parse("C56A4180-65AA-42EC-A945-5FD21DEC0538").value();
+  protocol.setPortId(fake_uuid);
+
+  REQUIRE(true == SiteToSiteClientTestAccessor::bootstrap(protocol));
+
+  REQUIRE(collector.get_next_client_response() == "NiFi");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "SocketFlowFileProtocol");
+  collector.get_next_client_response();
+  collector.get_next_client_response();
+  collector.get_next_client_response();
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "nifi://fake_host:65433");
+  collector.get_next_client_response();
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "BATCH_COUNT");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "5");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "BATCH_DURATION");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "100");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "GZIP");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "false");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "PORT_IDENTIFIER");
+  collector.get_next_client_response();
+  
REQUIRE(minifi::utils::string::equalsIgnoreCase(collector.get_next_client_response(),
 "c56a4180-65aa-42ec-a945-5fd21dec0538"));
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "REQUEST_EXPIRATION_MILLIS");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "20000");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "NEGOTIATE_FLOWFILE_CODEC");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "StandardFlowFileCodec");
+  collector.get_next_client_response();  // codec version
+}
+
+void verifySendResponses(SiteToSiteResponder& collector, const 
std::vector<std::string>& expected_responses) {
+  for (const auto& expected_response : expected_responses) {
+    if (expected_response.empty()) {
+      collector.get_next_client_response();
+      continue;
+    }
+    CHECK(expected_response == collector.get_next_client_response());
+  }
+}
+
+TEST_CASE("TestSetPortId", "[S2S]") {
+  auto peer = 
std::make_unique<sitetosite::SiteToSitePeer>(std::make_unique<org::apache::nifi::minifi::io::BufferStream>(),
 "fake_host", 65433, "");
+  sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+  auto fake_uuid = 
minifi::utils::Identifier::parse("c56a4180-65aa-42ec-a945-5fd21dec0538").value();
+  protocol.setPortId(fake_uuid);
+  REQUIRE(fake_uuid == protocol.getPortId());
+}
+
+TEST_CASE("TestSiteToSiteVerifySend using data packet", "[S2S]") {
+  auto collector = std::make_unique<SiteToSiteResponder>();
+  auto collector_ptr = collector.get();
+
+  initializeMockBootstrapResponses(collector);
+
+  auto peer = 
std::make_unique<sitetosite::SiteToSitePeer>(std::move(collector), "fake_host", 
65433, "");
+  sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+
+  std::vector<std::string> expected_responses;
+  std::string payload = "Test MiNiFi payload";
+
+  expected_responses.push_back("");  // attribute count 0
+  expected_responses.push_back("");  // payload length
+  expected_responses.push_back(payload);
+
+  verifyBootstrapMessages(protocol, *collector_ptr);
+
+  // start to send the stuff
+  auto transaction = SiteToSiteClientTestAccessor::createTransaction(protocol, 
sitetosite::TransferDirection::SEND);
+  REQUIRE(transaction);
+  collector_ptr->get_next_client_response();
+  REQUIRE(collector_ptr->get_next_client_response() == "SEND_FLOWFILES");
+  std::map<std::string, std::string> attributes;
+  sitetosite::DataPacket packet(transaction, attributes, payload);
+  REQUIRE(SiteToSiteClientTestAccessor::sendPacket(protocol, packet));
+  verifySendResponses(*collector_ptr, expected_responses);
+  REQUIRE(transaction->getCRC() == 4000670133);
+}
+
+TEST_CASE("TestSiteToSiteVerifySend using flowfile data", "[S2S]") {
+  auto collector = std::make_unique<SiteToSiteResponder>();
+  auto collector_ptr = collector.get();
+
+  initializeMockBootstrapResponses(collector);
+
+  auto peer = 
std::make_unique<sitetosite::SiteToSitePeer>(std::move(collector), "fake_host", 
65433, "");
+  sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+
+  std::vector<std::string> expected_responses;
+  std::string payload = "Test MiNiFi payload";
+
+  expected_responses.push_back("");  // attribute count
+  expected_responses.push_back("");  // attribute key length
+  expected_responses.push_back("filename");
+  expected_responses.push_back("");  // attribute value length
+  expected_responses.push_back("myfile");
+  expected_responses.push_back("");  // attribute key length
+  expected_responses.push_back("flow.id");
+  expected_responses.push_back("");  // attribute value length
+  expected_responses.push_back("test");
+  expected_responses.push_back("");  // payload length
+  expected_responses.push_back(payload);
+
+  protocol.setBatchDuration(std::chrono::milliseconds(100));
+  protocol.setBatchCount(5);
+  protocol.setTimeout(std::chrono::milliseconds(20000));
+
+  auto fake_uuid = 
minifi::utils::Identifier::parse("C56A4180-65AA-42EC-A945-5FD21DEC0538").value();
+  protocol.setPortId(fake_uuid);
+
+  verifyBootstrapMessages(protocol, *collector_ptr);
+
+  // start to send the stuff
+  auto transaction = SiteToSiteClientTestAccessor::createTransaction(protocol, 
sitetosite::TransferDirection::SEND);
+  REQUIRE(transaction);
+  collector_ptr->get_next_client_response();
+  REQUIRE(collector_ptr->get_next_client_response() == "SEND_FLOWFILES");
+
+  TestController test_controller_;
+  TestController::PlanConfig plan_config_;
+  std::shared_ptr<TestPlan> test_plan = 
test_controller_.createPlan(plan_config_);
+  test_plan->addProcessor("DummyProcessor", "dummyProcessor");
+  std::shared_ptr<minifi::core::ProcessContext> context = [&] { 
test_plan->runNextProcessor(); return test_plan->getCurrentContext(); }();
+  std::unique_ptr<minifi::core::ProcessSession> session = 
std::make_unique<core::ProcessSessionImpl>(context);
+
+  auto flow_file = session->create();
+  session->write(flow_file, [&](const std::shared_ptr<io::OutputStream>& 
output_stream) {
+    std::span<const std::byte> span{reinterpret_cast<const 
std::byte*>(payload.data()), payload.size()};
+    output_stream->write(span);
+    return payload.size();
+  });
+  flow_file->updateAttribute("filename", "myfile");
+  flow_file->updateAttribute("flow.id", "test");
+  session->transfer(flow_file, DummyProcessor::Success);
+  session->commit();
+
+  std::map<std::string, std::string> attributes;
+  REQUIRE(SiteToSiteClientTestAccessor::sendFlowFile(protocol, transaction, 
*flow_file, *session));
+  verifySendResponses(*collector_ptr, expected_responses);
+  REQUIRE(transaction->getCRC() == 2886786428);
+}
+
+TEST_CASE("TestSiteToSiteVerifyNegotiationFail", "[S2S]") {
+  auto collector = std::make_unique<SiteToSiteResponder>();
+
+  const char negotiated_abort_code = 
magic_enum::enum_underlying(sitetosite::ResourceNegotiationStatusCode::NEGOTIATED_ABORT);
+  std::string resp_code;
+  resp_code.insert(resp_code.begin(), negotiated_abort_code);
+  collector->push_response(resp_code);
+  collector->push_response(resp_code);
+
+  auto peer = 
std::make_unique<sitetosite::SiteToSitePeer>(std::move(collector), "fake_host", 
65433, "");
+  sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+
+  std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
+  minifi::utils::Identifier fake_uuid;
+  fake_uuid = uuid_str;
+  protocol.setPortId(fake_uuid);
+
+  REQUIRE_FALSE(SiteToSiteClientTestAccessor::bootstrap(protocol));
+}
+
+void iniitalizeMockRemoteClientReceiveDataResponses(SiteToSiteResponder& 
collector) {
+  collector.push_response("R");
+  collector.push_response("C");
+  auto addResponseCode = [&collector](sitetosite::ResponseCode code) {
+    std::string resp_code;
+    resp_code.insert(resp_code.begin(), magic_enum::enum_underlying(code));
+    collector.push_response(resp_code);
+  };
+  addResponseCode(sitetosite::ResponseCode::MORE_DATA);

Review Comment:
   I would rewrite this to
   ```suggestion
       
collector.push_response(std::string{static_cast<char>(magic_enum::enum_underlying(sitetosite::ResponseCode::MORE_DATA))});
   ```
   because why use six lines of code where one will do?



##########
libminifi/test/unit/SiteToSiteTests.cpp:
##########
@@ -0,0 +1,343 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "io/BaseStream.h"
+#include "sitetosite/Peer.h"
+#include "sitetosite/RawSiteToSiteClient.h"
+#include "unit/TestBase.h"
+#include "unit/Catch.h"
+#include "unit/SiteToSiteHelper.h"
+#include "unit/DummyProcessor.h"
+#include "unit/ProvenanceTestHelper.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class SiteToSiteClientTestAccessor {
+ public:
+  static bool bootstrap(sitetosite::RawSiteToSiteClient& client) {
+    return client.bootstrap();
+  }
+
+  static std::shared_ptr<sitetosite::Transaction> 
createTransaction(sitetosite::RawSiteToSiteClient& client, 
sitetosite::TransferDirection direction) {
+    return client.createTransaction(direction);
+  }
+
+  static bool sendFlowFile(sitetosite::RawSiteToSiteClient& client, const 
std::shared_ptr<sitetosite::Transaction>& transaction, core::FlowFile& 
flow_file, core::ProcessSession& session) {
+    return client.sendFlowFile(transaction, flow_file, session);
+  }
+
+  static bool sendPacket(sitetosite::RawSiteToSiteClient& client, const 
sitetosite::DataPacket& packet) {
+    return client.sendPacket(packet);
+  }
+
+  static std::pair<uint64_t, uint64_t> 
readFlowFiles(sitetosite::RawSiteToSiteClient& client, const 
std::shared_ptr<sitetosite::Transaction>& transaction, core::ProcessSession& 
session) {
+    return client.readFlowFiles(transaction, session);
+  }
+};
+
+void initializeMockBootstrapResponses(const 
std::unique_ptr<SiteToSiteResponder>& collector) {
+  const char resource_ok_code = 
magic_enum::enum_underlying(sitetosite::ResourceNegotiationStatusCode::RESOURCE_OK);
+  std::string resp_code;
+  resp_code.insert(resp_code.begin(), resource_ok_code);
+  collector->push_response(resp_code);
+
+  // Handshake response code
+  collector->push_response("R");
+  collector->push_response("C");
+  const char resource_code_properties_ok = 
magic_enum::enum_underlying(sitetosite::ResponseCode::PROPERTIES_OK);
+  resp_code = resource_code_properties_ok;
+  collector->push_response(resp_code);
+
+  // Codec Negotiation
+  resp_code = resource_ok_code;
+  collector->push_response(resp_code);
+}
+
+void verifyBootstrapMessages(sitetosite::RawSiteToSiteClient& protocol, 
SiteToSiteResponder& collector) {
+  protocol.setUseCompression(false);
+  protocol.setBatchDuration(std::chrono::milliseconds(100));
+  protocol.setBatchCount(5);
+  protocol.setTimeout(std::chrono::milliseconds(20000));
+
+  minifi::utils::Identifier fake_uuid = 
minifi::utils::Identifier::parse("C56A4180-65AA-42EC-A945-5FD21DEC0538").value();
+  protocol.setPortId(fake_uuid);
+
+  REQUIRE(true == SiteToSiteClientTestAccessor::bootstrap(protocol));
+
+  REQUIRE(collector.get_next_client_response() == "NiFi");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "SocketFlowFileProtocol");
+  collector.get_next_client_response();
+  collector.get_next_client_response();
+  collector.get_next_client_response();
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "nifi://fake_host:65433");
+  collector.get_next_client_response();
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "BATCH_COUNT");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "5");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "BATCH_DURATION");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "100");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "GZIP");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "false");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "PORT_IDENTIFIER");
+  collector.get_next_client_response();
+  
REQUIRE(minifi::utils::string::equalsIgnoreCase(collector.get_next_client_response(),
 "c56a4180-65aa-42ec-a945-5fd21dec0538"));
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "REQUEST_EXPIRATION_MILLIS");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "20000");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "NEGOTIATE_FLOWFILE_CODEC");
+  collector.get_next_client_response();
+  REQUIRE(collector.get_next_client_response() == "StandardFlowFileCodec");
+  collector.get_next_client_response();  // codec version
+}
+
+void verifySendResponses(SiteToSiteResponder& collector, const 
std::vector<std::string>& expected_responses) {
+  for (const auto& expected_response : expected_responses) {
+    if (expected_response.empty()) {
+      collector.get_next_client_response();
+      continue;
+    }
+    CHECK(expected_response == collector.get_next_client_response());
+  }
+}
+
+TEST_CASE("TestSetPortId", "[S2S]") {
+  auto peer = 
std::make_unique<sitetosite::SiteToSitePeer>(std::make_unique<org::apache::nifi::minifi::io::BufferStream>(),
 "fake_host", 65433, "");
+  sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+  auto fake_uuid = 
minifi::utils::Identifier::parse("c56a4180-65aa-42ec-a945-5fd21dec0538").value();
+  protocol.setPortId(fake_uuid);
+  REQUIRE(fake_uuid == protocol.getPortId());
+}
+
+TEST_CASE("TestSiteToSiteVerifySend using data packet", "[S2S]") {
+  auto collector = std::make_unique<SiteToSiteResponder>();
+  auto collector_ptr = collector.get();
+
+  initializeMockBootstrapResponses(collector);
+
+  auto peer = 
std::make_unique<sitetosite::SiteToSitePeer>(std::move(collector), "fake_host", 
65433, "");
+  sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+
+  std::vector<std::string> expected_responses;
+  std::string payload = "Test MiNiFi payload";
+
+  expected_responses.push_back("");  // attribute count 0
+  expected_responses.push_back("");  // payload length
+  expected_responses.push_back(payload);
+
+  verifyBootstrapMessages(protocol, *collector_ptr);
+
+  // start to send the stuff
+  auto transaction = SiteToSiteClientTestAccessor::createTransaction(protocol, 
sitetosite::TransferDirection::SEND);
+  REQUIRE(transaction);
+  collector_ptr->get_next_client_response();
+  REQUIRE(collector_ptr->get_next_client_response() == "SEND_FLOWFILES");
+  std::map<std::string, std::string> attributes;
+  sitetosite::DataPacket packet(transaction, attributes, payload);
+  REQUIRE(SiteToSiteClientTestAccessor::sendPacket(protocol, packet));
+  verifySendResponses(*collector_ptr, expected_responses);
+  REQUIRE(transaction->getCRC() == 4000670133);
+}
+
+TEST_CASE("TestSiteToSiteVerifySend using flowfile data", "[S2S]") {
+  auto collector = std::make_unique<SiteToSiteResponder>();
+  auto collector_ptr = collector.get();
+
+  initializeMockBootstrapResponses(collector);
+
+  auto peer = 
std::make_unique<sitetosite::SiteToSitePeer>(std::move(collector), "fake_host", 
65433, "");
+  sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+
+  std::vector<std::string> expected_responses;
+  std::string payload = "Test MiNiFi payload";
+
+  expected_responses.push_back("");  // attribute count
+  expected_responses.push_back("");  // attribute key length
+  expected_responses.push_back("filename");
+  expected_responses.push_back("");  // attribute value length
+  expected_responses.push_back("myfile");
+  expected_responses.push_back("");  // attribute key length
+  expected_responses.push_back("flow.id");
+  expected_responses.push_back("");  // attribute value length
+  expected_responses.push_back("test");
+  expected_responses.push_back("");  // payload length
+  expected_responses.push_back(payload);
+
+  protocol.setBatchDuration(std::chrono::milliseconds(100));
+  protocol.setBatchCount(5);
+  protocol.setTimeout(std::chrono::milliseconds(20000));
+
+  auto fake_uuid = 
minifi::utils::Identifier::parse("C56A4180-65AA-42EC-A945-5FD21DEC0538").value();
+  protocol.setPortId(fake_uuid);
+
+  verifyBootstrapMessages(protocol, *collector_ptr);
+
+  // start to send the stuff
+  auto transaction = SiteToSiteClientTestAccessor::createTransaction(protocol, 
sitetosite::TransferDirection::SEND);
+  REQUIRE(transaction);
+  collector_ptr->get_next_client_response();
+  REQUIRE(collector_ptr->get_next_client_response() == "SEND_FLOWFILES");
+
+  TestController test_controller_;
+  TestController::PlanConfig plan_config_;
+  std::shared_ptr<TestPlan> test_plan = 
test_controller_.createPlan(plan_config_);
+  test_plan->addProcessor("DummyProcessor", "dummyProcessor");
+  std::shared_ptr<minifi::core::ProcessContext> context = [&] { 
test_plan->runNextProcessor(); return test_plan->getCurrentContext(); }();
+  std::unique_ptr<minifi::core::ProcessSession> session = 
std::make_unique<core::ProcessSessionImpl>(context);
+
+  auto flow_file = session->create();
+  session->write(flow_file, [&](const std::shared_ptr<io::OutputStream>& 
output_stream) {
+    std::span<const std::byte> span{reinterpret_cast<const 
std::byte*>(payload.data()), payload.size()};
+    output_stream->write(span);
+    return payload.size();
+  });
+  flow_file->updateAttribute("filename", "myfile");
+  flow_file->updateAttribute("flow.id", "test");
+  session->transfer(flow_file, DummyProcessor::Success);
+  session->commit();
+
+  std::map<std::string, std::string> attributes;
+  REQUIRE(SiteToSiteClientTestAccessor::sendFlowFile(protocol, transaction, 
*flow_file, *session));
+  verifySendResponses(*collector_ptr, expected_responses);
+  REQUIRE(transaction->getCRC() == 2886786428);
+}
+
+TEST_CASE("TestSiteToSiteVerifyNegotiationFail", "[S2S]") {
+  auto collector = std::make_unique<SiteToSiteResponder>();
+
+  const char negotiated_abort_code = 
magic_enum::enum_underlying(sitetosite::ResourceNegotiationStatusCode::NEGOTIATED_ABORT);
+  std::string resp_code;
+  resp_code.insert(resp_code.begin(), negotiated_abort_code);
+  collector->push_response(resp_code);
+  collector->push_response(resp_code);
+
+  auto peer = 
std::make_unique<sitetosite::SiteToSitePeer>(std::move(collector), "fake_host", 
65433, "");
+  sitetosite::RawSiteToSiteClient protocol(std::move(peer));
+
+  std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
+  minifi::utils::Identifier fake_uuid;
+  fake_uuid = uuid_str;
+  protocol.setPortId(fake_uuid);
+
+  REQUIRE_FALSE(SiteToSiteClientTestAccessor::bootstrap(protocol));
+}
+
+void iniitalizeMockRemoteClientReceiveDataResponses(SiteToSiteResponder& 
collector) {

Review Comment:
   typo:
   ```suggestion
   void initializeMockRemoteClientReceiveDataResponses(SiteToSiteResponder& 
collector) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to