lordgamez commented on code in PR #1966:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1966#discussion_r2253638739


##########
SITE_TO_SITE.md:
##########
@@ -0,0 +1,262 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+## Table of Contents
+- [Site-to-Site Overview](#site-to-site-overview)
+- [Site-to-Site Configuration](#site-to-site-configuration)
+  - [Site-to-Site Configuration on NiFi side](#site-to-site-configuration-on-nifi-side)
+  - [Site-to-Site Configuration on MiNiFi C++ side](#site-to-site-configuration-on-minifi-c-side)
+- [Additional examples](#additional-examples)
+
+## Site-to-Site Overview
+
+Site-to-Site protocol allows data to be transferred between MiNiFi C++ and NiFi instances. MiNiFi C++ can send or receive data from NiFi using remote process groups. This is useful for scenarios where you want to send data from MiNiFi C++ to NiFi or vice versa. Site-to-Site protocol support raw TCP and HTTP protocols.
+
+At the moment site-to-site protocol is only supported between MiNiFi C++ and NiFi instances, it cannot be used to transfer data between multiple MiNiFi C++ instances. It is recommended to use processors like InvokeHTTP and ListenHTTP to transfer data between MiNiFi C++ instances.
+
+## Site-to-Site Configuration
+
+### Site-to-Site Configuration on NiFi side
+
+On NiFi side, site-to-site protocol is configured by creating input and output ports. The input port is used to receive data from MiNiFi C++ and the output port is used to send data to MiNiFi C++. The input and output ports can be created in the NiFi UI by dragging and dropping the input and output port icons onto the canvas.
+
+To use the input or output port of the NiFi flow in the MiNiFi C++ flow, the instance id of the port should be used. The instance id can be found in the NiFi UI by clicking on the input or output port and looking at the operation panel. It can be copied from that panel, or from the port "instanceIdentifier" field from configuration json file in the NiFi conf directory.
+
+### Site-to-Site Configuration on MiNiFi C++ side
+
+Site-to-Site protocol is configured on the MiNiFi C++ side by using remote process groups in the configuration. The remote process group represents the NiFi endpoint and uses the instance ids of the ports created on the NiFi side. The remote process group can be configured to use either raw TCP or HTTP protocol.
+
+Here is a yaml example of how to configure site-to-site protocol in MiNiFi C++ where the MiNiFi C++ instance is sending data to NiFi using raw socket protocol:
+
+```yaml
+MiNiFi Config Version: 3
+Flow Controller:
+  name: Simple GenerateFlowFile to RPG
+Processors:
+  - id: b0c04f28-0158-1000-0000-000000000000
+    name: GenerateFlowFile
+    class: org.apache.nifi.processors.standard.GenerateFlowFile
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 5 sec
+    auto-terminated relationships list: []
+    Properties:
+      Data Format: Text
+      Unique FlowFiles: false
+      Custom Text: Custom text
+Connections:
+  - id: b0c0c3cc-0158-1000-0000-000000000000
+    name: GenerateFlowFile/succes/nifi-inputport
+    source id: b0c04f28-0158-1000-0000-000000000000
+    destination id: de7cc09a-0196-1000-2c63-ee6b4319ffb6
+    source relationship name: success
+Remote Process Groups:
+  - id: b0c09ff0-0158-1000-0000-000000000000
+    name: "RPG"
+    url: http://localhost:8080/nifi
+    timeout: 20 sec
+    yield period: 10 sec
+    transport protocol: RAW
+    Input Ports:
+      - id: de7cc09a-0196-1000-2c63-ee6b4319ffb6 # this is the instance id of the input port created in NiFi
+        name: nifi-inputport
+        max concurrent tasks: 1
+        use compression: false # currently not supported and ignored in MiNiFi C++
+        batch size:
+          size: 10 MB
+          count: 10
+          duration: 30 sec
+    Output Ports: []
+```
+
+Here is another example in NiFi style json format how to configure site-to-site protocol in MiNiFi C++ where the MiNiFi C++ instance is receiving data from NiFi using the HTTP protocol:
+
+```json

Review Comment:
   Replaced with yaml in https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/76e948709325fdbef2805bd8d563094696d394fb



##########
SITE_TO_SITE.md:
##########
@@ -0,0 +1,262 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+## Table of Contents
+- [Site-to-Site Overview](#site-to-site-overview)
+- [Site-to-Site Configuration](#site-to-site-configuration)
+  - [Site-to-Site Configuration on NiFi side](#site-to-site-configuration-on-nifi-side)
+  - [Site-to-Site Configuration on MiNiFi C++ side](#site-to-site-configuration-on-minifi-c-side)
+- [Additional examples](#additional-examples)
+
+## Site-to-Site Overview
+
+Site-to-Site protocol allows data to be transferred between MiNiFi C++ and NiFi instances. MiNiFi C++ can send or receive data from NiFi using remote process groups. This is useful for scenarios where you want to send data from MiNiFi C++ to NiFi or vice versa. Site-to-Site protocol support raw TCP and HTTP protocols.
+
+At the moment site-to-site protocol is only supported between MiNiFi C++ and NiFi instances, it cannot be used to transfer data between multiple MiNiFi C++ instances. It is recommended to use processors like InvokeHTTP and ListenHTTP to transfer data between MiNiFi C++ instances.
+
+## Site-to-Site Configuration
+
+### Site-to-Site Configuration on NiFi side
+
+On NiFi side, site-to-site protocol is configured by creating input and output ports. The input port is used to receive data from MiNiFi C++ and the output port is used to send data to MiNiFi C++. The input and output ports can be created in the NiFi UI by dragging and dropping the input and output port icons onto the canvas.
+
+To use the input or output port of the NiFi flow in the MiNiFi C++ flow, the instance id of the port should be used. The instance id can be found in the NiFi UI by clicking on the input or output port and looking at the operation panel. It can be copied from that panel, or from the port "instanceIdentifier" field from configuration json file in the NiFi conf directory.
+
+### Site-to-Site Configuration on MiNiFi C++ side
+
+Site-to-Site protocol is configured on the MiNiFi C++ side by using remote process groups in the configuration. The remote process group represents the NiFi endpoint and uses the instance ids of the ports created on the NiFi side. The remote process group can be configured to use either raw TCP or HTTP protocol.
+
+Here is a yaml example of how to configure site-to-site protocol in MiNiFi C++ where the MiNiFi C++ instance is sending data to NiFi using raw socket protocol:
+
+```yaml
+MiNiFi Config Version: 3
+Flow Controller:
+  name: Simple GenerateFlowFile to RPG
+Processors:
+  - id: b0c04f28-0158-1000-0000-000000000000
+    name: GenerateFlowFile
+    class: org.apache.nifi.processors.standard.GenerateFlowFile
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 5 sec
+    auto-terminated relationships list: []
+    Properties:
+      Data Format: Text
+      Unique FlowFiles: false
+      Custom Text: Custom text
+Connections:
+  - id: b0c0c3cc-0158-1000-0000-000000000000
+    name: GenerateFlowFile/succes/nifi-inputport
+    source id: b0c04f28-0158-1000-0000-000000000000
+    destination id: de7cc09a-0196-1000-2c63-ee6b4319ffb6
+    source relationship name: success
+Remote Process Groups:
+  - id: b0c09ff0-0158-1000-0000-000000000000
+    name: "RPG"
+    url: http://localhost:8080/nifi
+    timeout: 20 sec
+    yield period: 10 sec
+    transport protocol: RAW
+    Input Ports:
+      - id: de7cc09a-0196-1000-2c63-ee6b4319ffb6 # this is the instance id of the input port created in NiFi
+        name: nifi-inputport
+        max concurrent tasks: 1
+        use compression: false # currently not supported and ignored in MiNiFi C++
+        batch size:
+          size: 10 MB
+          count: 10
+          duration: 30 sec
+    Output Ports: []
+```
+
+Here is another example in NiFi style json format how to configure site-to-site protocol in MiNiFi C++ where the MiNiFi C++ instance is receiving data from NiFi using the HTTP protocol:
+
+```json
+{
+  "encodingVersion": {
+    "majorVersion": 2,
+    "minorVersion": 0
+  },
+  "maxTimerDrivenThreadCount": 1,
+  "maxEventDrivenThreadCount": 1,
+  "parameterContexts": [],
+  "rootGroup": {
+    "identifier": "c5bceca3-9c20-4068-bf2d-425e14026471",
+    "instanceIdentifier": "3cb4b3ce-7cd8-4ab7-a6bf-d4640ac5db43",
+    "name": "root",
+    "position": {
+      "x": 0.0,
+      "y": 0.0
+    },
+    "processGroups": [],
+    "remoteProcessGroups": [
+      {
+        "identifier": "327b446a-0043-48d1-8bb4-df65ba1afa60",
+        "instanceIdentifier": "2ed47dca-38f5-476d-9c37-5ea0a5072f1e",
+        "name": "https://localhost:8443/nifi",
+        "position": {
+          "x": 235.0,
+          "y": 71.00000762939453
+        },
+        "targetUri": "https://localhost:8443/nifi",
+        "targetUris": "https://localhost:8443/nifi",

Review Comment:
   Updated in https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/76e948709325fdbef2805bd8d563094696d394fb



##########
docker/test/integration/features/s2s.feature:
##########
@@ -25,73 +25,88 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol
   Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a file with the content "test" is present in "/tmp/input"
-    And a RemoteProcessGroup node opened on "http://nifi-${feature_id}:8080/nifi"
-    And the "success" relationship of the GetFile processor is connected to the input port on the RemoteProcessGroup
+    And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi"
+    And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
+    And the "success" relationship of the GetFile processor is connected to the to_nifi
-    And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi"
+    And a NiFi flow is receiving data in an input port named "from-minifi" with the id of the port named "to_nifi" from the RemoteProcessGroup named "RemoteProcessGroup"

Review Comment:
   Updated in https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/76e948709325fdbef2805bd8d563094696d394fb



##########
docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py:
##########
@@ -134,8 +179,8 @@ def serialize_node(self, connectable, root, visited):
             for proc in conn_procs:
                 root['connections'].append({
                     'name': str(uuid.uuid4()),
-                    'source': {'id': str(connectable.uuid)},
-                    'destination': {'id': str(proc.uuid) if not isinstance(proc, InputPort) else str(proc.instance_id)}
+                    'source': {'id': str(connectable.uuid) if not isinstance(connectable, InputPort) and not isinstance(connectable, OutputPort) else str(connectable.instance_id)},
+                    'destination': {'id': str(proc.uuid) if not isinstance(proc, InputPort) and not isinstance(proc, OutputPort) else str(proc.instance_id)}

Review Comment:
   Good idea, updated in https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/76e948709325fdbef2805bd8d563094696d394fb



##########
extensions/standard-processors/tests/unit/FlowJsonTests.cpp:
##########
@@ -1786,4 +1808,69 @@ TEST_CASE("Parameter context names cannot conflict with parameter provider gener
       "with no parameter provider or generated by other parameter provider");
 }
+TEST_CASE("Output port can also be used in RPG") {
+  ConfigurationTestController test_controller;
+
+  core::flow::AdaptiveConfiguration config(test_controller.getContext());
+
+  static const std::string CONFIG_JSON =
+    R"(
+{
+  "rootGroup": {
+    "name": "MiNiFi Flow",
+    "processors": [{
+      "identifier": "00000000-0000-0000-0000-000000000001",
+      "name": "PutFile",
+      "type": "org.apache.nifi.processors.standard.PutFile",
+      "schedulingStrategy": "Event_DRIVEN",
+      "autoTerminatedRelationships": ["success"],
+      "properties": {}
+    }],
+    "connections": [{
+      "identifier": "00000000-0000-0000-0000-000000000008",
+      "name": "S2SToRPG",
+      "source": {
+        "id": "00000000-0000-0000-0000-000000000003",
+        "name": "AmazingOutputPort"
+      },
+      "destination": {
+        "id": "00000000-0000-0000-0000-000000000001",
+        "name": "PutFile"
+      },
+      "selectedRelationships": [""]
+    }],
+    "remoteProcessGroups": [{
+      "name": "NiFi Flow",
+      "targetUri": "https://localhost:8090/nifi",
+      "communicationsTimeout": "19 sec",
+      "inputPorts": [{

Review Comment:
   Updated in https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/76e948709325fdbef2805bd8d563094696d394fb



##########
libminifi/include/sitetosite/Peer.h:
##########
@@ -109,175 +50,109 @@ class PeerStatus {
   PeerStatus& operator=(const PeerStatus &other) = default;
   PeerStatus& operator=(PeerStatus &&other) = default;
-  const std::shared_ptr<Peer> &getPeer() const {
-    return peer_;
+  const utils::Identifier &getPortId() const {
+    return port_id_;
+  }
+
+  const std::string &getHost() const {
+    return host_;
+  }
+
+  [[nodiscard]] uint16_t getPort() const {
+    return port_;
   }
-  uint32_t getFlowFileCount() {
+  [[nodiscard]] uint32_t getFlowFileCount() const {
     return flow_file_count_;
   }
-  bool getQueryForPeers() {
+  [[nodiscard]] bool getQueryForPeers() const {
     return query_for_peers_;
   }
  protected:
-  std::shared_ptr<Peer> peer_;
+  utils::Identifier port_id_;
+  std::string host_;
+  uint16_t port_;
   uint32_t flow_file_count_;
   bool query_for_peers_;
 };
-static const char MAGIC_BYTES[] = { 'N', 'i', 'F', 'i' };
-
-// Site2SitePeer Class
-class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStreamImpl {
+class SiteToSitePeer : public io::BaseStreamImpl {
  public:
-  SiteToSitePeer()
-      : stream_(nullptr),
-        host_(""),
-        port_(-1) {
-  }
-  /*
-   * Create a new site2site peer
-   */
-  explicit SiteToSitePeer(std::unique_ptr<org::apache::nifi::minifi::io::BaseStream> injected_socket,
-                          const std::string host,
-                          uint16_t port,
-                          const std::string &ifc)
+  SiteToSitePeer(std::unique_ptr<io::BaseStream> injected_socket, const std::string& host, uint16_t port, const std::string& ifc)
       : SiteToSitePeer(host, port, ifc) {
     stream_ = std::move(injected_socket);
   }
-  explicit SiteToSitePeer(const std::string &host, uint16_t port, const std::string &ifc)
-      : stream_(nullptr),
-        host_(host),
+  SiteToSitePeer(const std::string &host, uint16_t port, const std::string &ifc)
+      : host_(host),
         port_(port),
-        timeout_(std::chrono::seconds(30)),
-        logger_(core::logging::LoggerFactory<SiteToSitePeer>::getLogger()) {
-    url_ = "nifi://" + host_ + ":" + std::to_string(port_);
-    yield_expiration_ = std::chrono::system_clock::time_point();
-    timeout_ = std::chrono::seconds(30);
-    local_network_interface_ = io::NetworkInterface(ifc, nullptr);
+        url_("nifi://" + host_ + ":" + std::to_string(port_)),
+        local_network_interface_(io::NetworkInterface(ifc, nullptr)) {
+    timeout_.store(30s);

Review Comment:
   Removed in https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/76e948709325fdbef2805bd8d563094696d394fb



##########
libminifi/src/sitetosite/RawSiteToSiteClient.cpp:
##########
@@ -0,0 +1,431 @@
+/**
+ * Site2SiteProtocol class implementation
+ *

Review Comment:
   Deleted in https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/76e948709325fdbef2805bd8d563094696d394fb
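
For reference, the id selection in the `Minifi_flow_json_serializer.py` hunk can be read as one rule applied to both ends of a connection: remote ports are referenced by their NiFi-side instance id, every other node by its own uuid. Below is a minimal sketch of that rule. The `Processor`, `InputPort` and `OutputPort` classes are simplified stand-ins, and `endpoint_id` and `serialize_connection` are hypothetical helper names, not the test framework's actual definitions.

```python
import uuid


class Processor:
    """Stand-in for a regular flow node, identified by its own uuid."""

    def __init__(self):
        self.uuid = uuid.uuid4()


class _RemotePort:
    """Stand-in base: a remote port also carries the NiFi-side instance id."""

    def __init__(self, instance_id):
        self.uuid = uuid.uuid4()
        self.instance_id = instance_id


class InputPort(_RemotePort):
    pass


class OutputPort(_RemotePort):
    pass


def endpoint_id(node):
    """Return the id a connection should reference for this node (hypothetical helper)."""
    if isinstance(node, (InputPort, OutputPort)):
        return str(node.instance_id)
    return str(node.uuid)


def serialize_connection(source, destination):
    """Build a connection entry shaped like the one in the quoted hunk."""
    return {
        'name': str(uuid.uuid4()),
        'source': {'id': endpoint_id(source)},
        'destination': {'id': endpoint_id(destination)},
    }


# Usage: an output port feeding a processor, as in the new FlowJsonTests case.
port = OutputPort("00000000-0000-0000-0000-000000000003")
put_file = Processor()
connection = serialize_connection(port, put_file)
```

Passing a tuple to `isinstance` gives the same result as the two chained `not isinstance(...)` checks in the hunk, so source and destination share one code path.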
