http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/integration/C2UpdateTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/C2UpdateTest.cpp b/libminifi/test/integration/C2UpdateTest.cpp new file mode 100644 index 0000000..689c383 --- /dev/null +++ b/libminifi/test/integration/C2UpdateTest.cpp @@ -0,0 +1,185 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "../TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "../include/core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "../unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "c2/C2Agent.h" +#include "CivetServer.h" +#include <cstring> +#include "../../include/c2/protocols/RESTSender.h" + +void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(10)); +} + +static std::vector<std::string> responses; + +class ConfigHandler : public CivetHandler { + public: + bool handlePost(CivetServer *server, struct mg_connection *conn) { + if (responses.size() > 0) { + std::string top_str = responses.back(); + responses.pop_back(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + top_str.length()); + mg_printf(conn, "%s", top_str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::ifstream myfile(test_file_location_.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); + myfile.close(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + str.length()); + mg_printf(conn, "%s", str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + std::string test_file_location_; +}; + +int main(int argc, char **argv) { + mg_init_library(0); + LogTestController::getInstance().setInfo<minifi::FlowController>(); + LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); + LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); + + const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 }; + std::vector<std::string> cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + + CivetServer server(cpp_options); + ConfigHandler h_ex; + server.addHandler("/update", h_ex); + std::string key_dir, test_file_location; + if (argc > 1) { + h_ex.test_file_location_ = test_file_location = argv[1]; + key_dir = argv[2]; + } + std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"update\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"configuration\"" + "}]}"; + + responses.push_back(heartbeat_response); + + std::ifstream myfile(test_file_location.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); + myfile.close(); + std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"update\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:9090/update\"}}]}"; + responses.push_back(response); + } + + std::shared_ptr<minifi::Configure> configuration = std::make_shared< + minifi::Configure>(); + + configuration->set("c2.rest.url", + "http://localhost:9090/update"); + mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + + std::shared_ptr<core::Repository> test_repo = + std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared< + TestFlowRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, + test_file_location); + + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared + <minifi::io::StreamFactory>(configuration); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr + <core::YamlConfiguration + >(new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, + configuration, + test_file_location)); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast + <TestRepository>(test_repo); + + std::shared_ptr<minifi::FlowController> controller = + std::make_shared<minifi::FlowController + >(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, + configuration, + test_file_location); + + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot( + test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup + >(ptr.get()); + ptr.release(); + + controller->load(); + controller->start(); + waitToVerifyProcessor(); + + controller->waitUnload(60000); + std::string logs = LogTestController::getInstance().log_output.str(); + assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos); + LogTestController::getInstance().reset(); + rmdir("./content_repository"); + + return 0; +}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/integration/C2VerifyHeartbeatAndStop.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/C2VerifyHeartbeatAndStop.cpp b/libminifi/test/integration/C2VerifyHeartbeatAndStop.cpp new file mode 100644 index 0000000..705506f --- /dev/null +++ b/libminifi/test/integration/C2VerifyHeartbeatAndStop.cpp @@ -0,0 +1,153 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "../TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "../include/core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "../unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "../TestServer.h" +#include "c2/C2Agent.h" +#include "c2/protocols/RESTReceiver.h" + +#include "IntegrationBase.h" + +class Responder : public CivetHandler { + public: + explicit Responder(bool isSecure) + : isSecure(isSecure) { + } + bool handlePost(CivetServer *server, struct mg_connection *conn) { + std::string resp = + "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\" }, " + "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\" } ]}"; + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + resp.length()); + mg_printf(conn, "%s", resp.c_str()); + return true; + } + + protected: + bool isSecure; +}; + +class VerifyC2Heartbeat : public IntegrationBase { + public: + explicit VerifyC2Heartbeat(bool isSecure) + : isSecure(isSecure) { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setTrace<minifi::c2::C2Agent>(); + LogTestController::getInstance().setDebug<LogTestController>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTProtocol>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>(); + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + } + + void cleanup() { + LogTestController::getInstance().reset(); + unlink(ss.str().c_str()); + } + + void runAssertions() { + assert(LogTestController::getInstance().contains("Received Ack from Server") == true); + + assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component invoke") == true); + + assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController") == true); + } + + void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) { + std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke"); + assert(proc != nullptr); + + std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc); + + assert(inv != nullptr); + std::string url = ""; + inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); + + configuration->set("c2.rest.url", "http://localhost:8888/api/heartbeat"); + configuration->set("c2.agent.heartbeat.period", "1000"); + configuration->set("c2.rest.url.ack", "http://localhost:8888/api/heartbeat"); + } + + protected: + bool isSecure; + char *dir; + std::stringstream ss; + TestController testController; +}; + +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + url = "http://localhost:8888/api/heartbeat"; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + } + + bool isSecure = false; + if (url.find("https") != std::string::npos) { + isSecure = true; + } + + VerifyC2Heartbeat harness(isSecure); + + harness.setKeyDir(key_dir); + + Responder responder(isSecure); + + harness.setUrl(url, &responder); + + harness.run(test_file_location); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/integration/C2VerifyServeResults.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/C2VerifyServeResults.cpp b/libminifi/test/integration/C2VerifyServeResults.cpp new file mode 100644 index 0000000..169a5e1 --- /dev/null +++ b/libminifi/test/integration/C2VerifyServeResults.cpp @@ -0,0 +1,131 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "../TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "../include/core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "../unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "../TestServer.h" +#include "c2/C2Agent.h" +#include "c2/protocols/RESTReceiver.h" + +#include "IntegrationBase.h" + +class VerifyC2Server : public IntegrationBase { + public: + explicit VerifyC2Server(bool isSecure) + : isSecure(isSecure) { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setDebug<processors::InvokeHTTP>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>(); + LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); + LogTestController::getInstance().setDebug<processors::LogAttribute>(); + LogTestController::getInstance().setDebug<minifi::core::ProcessSession>(); + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + } + + void cleanup() { + unlink(ss.str().c_str()); + } + + void runAssertions() { + assert(LogTestController::getInstance().contains("Import offset 0") == true); + + assert(LogTestController::getInstance().contains("Outputting success and response") == true); + } + + void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) { + std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke"); + assert(proc != nullptr); + + std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc); + + assert(inv != nullptr); + std::string url = ""; + inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); + + std::cout << "url is " << url << std::endl; + + std::string port, scheme, path; + parse_http_components(url, port, scheme, path); + std::cout << "path is " << path << std::endl; + configuration->set("c2.agent.heartbeat.reporter.classes", "RESTReceiver"); + configuration->set("c2.rest.listener.port", port); + configuration->set("c2.agent.heartbeat.period", "10"); + configuration->set("c2.rest.listener.heartbeat.rooturi", path); + } + + protected: + bool isSecure; + char *dir; + std::stringstream ss; + TestController testController; +}; + +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + } + + bool isSecure = false; + if (url.find("https") != std::string::npos) { + isSecure = true; + } + + VerifyC2Server harness(isSecure); + + harness.setKeyDir(key_dir); + + harness.run(test_file_location); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/integration/ControllerServiceIntegrationTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/ControllerServiceIntegrationTests.cpp b/libminifi/test/integration/ControllerServiceIntegrationTests.cpp index 15720eb..7a4ee35 100644 --- a/libminifi/test/integration/ControllerServiceIntegrationTests.cpp +++ b/libminifi/test/integration/ControllerServiceIntegrationTests.cpp @@ -16,6 +16,7 @@ * limitations under the License. */ +#undef NDEBUG #include <cassert> #include <chrono> #include <fstream> @@ -47,7 +48,6 @@ std::shared_ptr<core::controller::StandardControllerServiceNode> newCsNode(std:: std::shared_ptr<core::controller::ControllerService> service = std::make_shared<MockControllerService>(); std::shared_ptr<core::controller::StandardControllerServiceNode> testNode = std::make_shared<core::controller::StandardControllerServiceNode>(service, provider, id, std::make_shared<minifi::Configure>()); - return testNode; } @@ -109,16 +109,21 @@ int main(int argc, char **argv) { std::shared_ptr<core::controller::ControllerServiceNode> notexistNode = pg->findControllerService("MockItLikeItsWrong"); assert(notexistNode == nullptr); - controller->load(); - controller->start(); - std::shared_ptr<core::controller::ControllerServiceNode> ssl_client_cont = controller->getControllerServiceNode("SSLClientServiceTest"); - ssl_client_cont->enable(); - assert(ssl_client_cont != nullptr); - assert(ssl_client_cont->getControllerServiceImplementation() != nullptr); - std::shared_ptr<minifi::controllers::SSLContextService> ssl_client = std::static_pointer_cast<minifi::controllers::SSLContextService>(ssl_client_cont->getControllerServiceImplementation()); + std::shared_ptr<core::controller::ControllerServiceNode> ssl_client_cont = nullptr; + std::shared_ptr<minifi::controllers::SSLContextService> ssl_client = nullptr; + { + std::lock_guard<std::mutex> lock(control_mutex); + controller->load(); + controller->start(); + ssl_client_cont = controller->getControllerServiceNode("SSLClientServiceTest"); + ssl_client_cont->enable(); + assert(ssl_client_cont != nullptr); + assert(ssl_client_cont->getControllerServiceImplementation() != nullptr); + ssl_client = std::static_pointer_cast<minifi::controllers::SSLContextService>(ssl_client_cont->getControllerServiceImplementation()); + } assert(ssl_client->getCACertificate().length() > 0); - + std::cout << "Disabling ID" << std::endl; // now let's disable one of the controller services. std::shared_ptr<core::controller::ControllerServiceNode> cs_id = controller->getControllerServiceNode("ID"); assert(cs_id != nullptr); @@ -128,6 +133,7 @@ int main(int argc, char **argv) { disabled = true; waitToVerifyProcessor(); } + std::cout << "Disabled ID" << std::endl; { std::lock_guard<std::mutex> lock(control_mutex); controller->enableControllerService(cs_id); @@ -135,20 +141,23 @@ int main(int argc, char **argv) { waitToVerifyProcessor(); } std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995"); + std::cout << "Disabling MockItLikeIts1995" << std::endl; assert(cs_id->enabled()); - { +{ std::lock_guard<std::mutex> lock(control_mutex); controller->disableReferencingServices(mock_cont); disabled = true; waitToVerifyProcessor(); } - assert(cs_id->enabled() == false); - { +std::cout << "Disabled MockItLikeIts1995" << std::endl; + assert(cs_id->enabled() == false); +{ std::lock_guard<std::mutex> lock(control_mutex); controller->enableReferencingServices(mock_cont); disabled = false; waitToVerifyProcessor(); } +std::cout << "Enabled ref for MockItLikeIts1995" << std::endl; assert(cs_id->enabled() == true); controller->waitUnload(60000); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/integration/GetFileNoData.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/GetFileNoData.cpp b/libminifi/test/integration/GetFileNoData.cpp new file mode 100644 index 0000000..500d90f --- /dev/null +++ b/libminifi/test/integration/GetFileNoData.cpp @@ -0,0 +1,184 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "../TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "../include/core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "../unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "c2/C2Agent.h" +#include "CivetServer.h" +#include <cstring> +#include "../../include/c2/protocols/RESTSender.h" + +void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(10)); +} + +static std::vector<std::string> responses; + +class ConfigHandler : public CivetHandler { + public: + bool handlePost(CivetServer *server, struct mg_connection *conn) { + if (responses.size() > 0) { + std::string top_str = responses.back(); + responses.pop_back(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + top_str.length()); + mg_printf(conn, "%s", top_str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::ifstream myfile(test_file_location_.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); + myfile.close(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + str.length()); + mg_printf(conn, "%s", str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + std::string test_file_location_; +}; + +int main(int argc, char **argv) { + mg_init_library(0); + LogTestController::getInstance().setInfo<minifi::FlowController>(); + LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); + LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); + + const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 }; + std::vector<std::string> cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + + CivetServer server(cpp_options); + ConfigHandler h_ex; + server.addHandler("/update", h_ex); + std::string key_dir, test_file_location; + if (argc > 1) { + h_ex.test_file_location_ = test_file_location = argv[1]; + key_dir = argv[2]; + } + std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"update\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"configuration\"" + "}]}"; + + responses.push_back(heartbeat_response); + + std::ifstream myfile(test_file_location.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); + myfile.close(); + std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"update\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:9090/update\"}}]}"; + responses.push_back(response); + } + + std::shared_ptr<minifi::Configure> configuration = std::make_shared< + minifi::Configure>(); + + configuration->set("c2.rest.url", + "http://localhost:9090/update"); + mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + + std::shared_ptr<core::Repository> test_repo = + std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared< + TestFlowRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, + test_file_location); + + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared + <minifi::io::StreamFactory>(configuration); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr + <core::YamlConfiguration + >(new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, + configuration, + test_file_location)); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast + <TestRepository>(test_repo); + + std::shared_ptr<minifi::FlowController> controller = + std::make_shared<minifi::FlowController + >(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, + configuration, + test_file_location); + + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot( + test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup + >(ptr.get()); + ptr.release(); + + controller->load(); + controller->start(); + waitToVerifyProcessor(); + + controller->waitUnload(60000); + std::string logs = LogTestController::getInstance().log_output.str(); + assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos); + LogTestController::getInstance().reset(); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/integration/HttpConfigurationListenerTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/HttpConfigurationListenerTest.cpp b/libminifi/test/integration/HttpConfigurationListenerTest.cpp deleted file mode 100644 index b559f41..0000000 --- a/libminifi/test/integration/HttpConfigurationListenerTest.cpp +++ /dev/null @@ -1,131 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <sys/stat.h> -#include <cassert> -#include <utility> -#include <chrono> -#include <fstream> -#include <memory> -#include <string> -#include <thread> -#include <type_traits> -#include <vector> -#include <iostream> -#include <sstream> -#include "../TestBase.h" -#include "utils/StringUtils.h" -#include "core/Core.h" -#include "../include/core/logging/Logger.h" -#include "core/ProcessGroup.h" -#include "core/yaml/YamlConfiguration.h" -#include "HttpConfigurationListener.h" -#include "FlowController.h" -#include "properties/Configure.h" -#include "../unit/ProvenanceTestHelper.h" -#include "io/StreamFactory.h" -#include "CivetServer.h" -#include <cstring> - -void waitToVerifyProcessor() { - std::this_thread::sleep_for(std::chrono::seconds(10)); -} - -class ConfigHandler : public CivetHandler { - public: - bool handleGet(CivetServer *server, struct mg_connection *conn) { - std::ifstream myfile(test_file_location_.c_str()); - - if (myfile.is_open()) { - std::stringstream buffer; - buffer << myfile.rdbuf(); - std::string str = buffer.str(); - myfile.close(); - mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " - "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - str.length()); - mg_printf(conn, "%s", str.c_str()); - } else { - mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); - } - - return true; - } - std::string test_file_location_; -}; - -int main(int argc, char **argv) { - LogTestController::getInstance().setInfo<minifi::ConfigurationListener>(); - LogTestController::getInstance().setInfo<minifi::FlowController>(); - LogTestController::getInstance().setInfo<minifi::HttpConfigurationListener>(); - - const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 }; - std::vector<std::string> cpp_options; - for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { - cpp_options.push_back(options[i]); - } - - CivetServer server(cpp_options); - ConfigHandler h_ex; - server.addHandler("/config", h_ex); - LogTestController::getInstance().setDebug<minifi::ConfigurationListener>(); - std::string key_dir, test_file_location; - if (argc > 1) { - h_ex.test_file_location_ = test_file_location = argv[1]; - key_dir = argv[2]; - } - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - configuration->set(minifi::Configure::nifi_default_directory, key_dir); - configuration->set(minifi::Configure::nifi_configuration_listener_type, "http"); - configuration->set(minifi::Configure::nifi_configuration_listener_pull_interval, "1 sec"); - configuration->set(minifi::Configure::nifi_configuration_listener_http_url, "http://localhost:9090/config"); - mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - - std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); - std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); - - configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); - - std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared < minifi::io::StreamFactory > (configuration); - - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr < core::YamlConfiguration - > (new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); - std::shared_ptr<TestRepository> repo = std::static_pointer_cast < TestRepository > (test_repo); - - std::shared_ptr<minifi::FlowController> controller = std::make_shared < minifi::FlowController - > (test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true); - - core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); - - std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); - std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr < core::ProcessGroup > (ptr.get()); - ptr.release(); - - controller->load(); - controller->start(); - waitToVerifyProcessor(); - - controller->waitUnload(60000); - std::string logs = LogTestController::getInstance().log_output.str(); - assert(logs.find("HttpConfigurationListener -- curl successful to http://localhost:9090/config") != std::string::npos); - assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos); - LogTestController::getInstance().reset(); - rmdir("./content_repository"); - return 0; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/integration/HttpGetIntegrationTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/HttpGetIntegrationTest.cpp b/libminifi/test/integration/HttpGetIntegrationTest.cpp index a235759..f915812 100644 --- a/libminifi/test/integration/HttpGetIntegrationTest.cpp +++ b/libminifi/test/integration/HttpGetIntegrationTest.cpp @@ -16,7 +16,9 @@ * limitations under the License. */ +#define CURLOPT_SSL_VERIFYPEER_DISABLE 1 #include <sys/stat.h> +#undef NDEBUG #include <cassert> #include <utility> #include <chrono> @@ -26,6 +28,7 @@ #include <thread> #include <type_traits> #include <vector> +#include "utils/HTTPClient.h" #include "../TestServer.h" #include "../TestBase.h" #include "utils/StringUtils.h" @@ -52,11 +55,24 @@ int ssl_enable(void *ssl_context, void *user_data) { return 0; } +class HttpResponder : public CivetHandler { + public: + bool handleGet(CivetServer *server, struct mg_connection *conn) { + static const std::string site2site_rest_resp = "hi this is a get test"; + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + site2site_rest_resp.length()); + mg_printf(conn, "%s", site2site_rest_resp.c_str()); + return true; + } +}; + int main(int argc, char **argv) { init_webserver(); LogTestController::getInstance().setDebug<core::Processor>(); LogTestController::getInstance().setDebug<core::ProcessSession>(); - LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>(); + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setDebug<minifi::controllers::SSLContextService>(); LogTestController::getInstance().setDebug<minifi::processors::InvokeHTTP>(); LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); std::string key_dir, test_file_location; @@ -101,7 +117,10 @@ int main(int argc, char **argv) { std::string url = ""; inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); ptr.release(); + HttpResponder h_ex; std::string port, scheme, path; + CivetServer *server = nullptr; + parse_http_components(url, port, scheme, path); struct mg_callbacks callback; if (url.find("localhost") != std::string::npos) { @@ -110,11 +129,11 @@ int main(int argc, char **argv) { cert = key_dir + "nifi-cert.pem"; memset(&callback, 0, sizeof(callback)); callback.init_ssl = ssl_enable; + port +="s"; callback.log_message = log_message; - std::cout << cert << std::endl; - start_webserver(port, path, "hi this is a get test", &callback, cert); + server = start_webserver(port, path, &h_ex, &callback, cert, cert); } else { - start_webserver(port, path, "hi this is a get test"); + server = start_webserver(port, path, &h_ex); } } controller->load(); @@ -122,28 +141,18 @@ int main(int argc, char **argv) { waitToVerifyProcessor(); controller->waitUnload(60000); - if (url.find("localhost") != std::string::npos) { - stop_webserver(); + if (url.find("localhost") == std::string::npos) { + stop_webserver(server); + exit(1); } std::string logs = LogTestController::getInstance().log_output.str(); assert(logs.find("key:filename value:") != std::string::npos); assert(logs.find("key:invokehttp.request.url value:" + url) != std::string::npos); assert(logs.find("key:invokehttp.status.code value:200") != std::string::npos); - std::string stringtofind = "Resource Claim created ./content_repository/"; - size_t loc = logs.find(stringtofind); - while (loc > 0) { - std::string id = logs.substr(loc + stringtofind.size(), 36); - - loc = logs.find(stringtofind, loc + 1); - std::string path = "content_repository/" + id; - unlink(path.c_str()); - - if (loc == std::string::npos) - break; - } LogTestController::getInstance().reset(); rmdir("./content_repository"); + stop_webserver(server); return 0; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/integration/HttpPostIntegrationTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/HttpPostIntegrationTest.cpp b/libminifi/test/integration/HttpPostIntegrationTest.cpp index 9a46574..2109b86 100644 --- a/libminifi/test/integration/HttpPostIntegrationTest.cpp +++ b/libminifi/test/integration/HttpPostIntegrationTest.cpp @@ -17,18 +17,21 @@ */ #include <sys/stat.h> +#undef NDEBUG #include <cassert> +#include <utility> #include <chrono> #include <fstream> -#include <utility> #include <memory> #include <string> #include <thread> #include <type_traits> #include <vector> +#include <iostream> +#include <sstream> +#include "../TestBase.h" #include "utils/StringUtils.h" #include "core/Core.h" -#include "../TestServer.h" #include "../include/core/logging/Logger.h" #include "core/ProcessGroup.h" #include "core/yaml/YamlConfiguration.h" @@ -36,82 +39,72 @@ #include "properties/Configure.h" #include "../unit/ProvenanceTestHelper.h" #include "io/StreamFactory.h" -#include "../TestBase.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "../TestServer.h" -void waitToVerifyProcessor() { - std::this_thread::sleep_for(std::chrono::seconds(2)); -} +#include "IntegrationBase.h" -int main(int argc, char **argv) { - init_webserver(); - LogTestController::getInstance().setDebug<processors::InvokeHTTP>(); - LogTestController::getInstance().setDebug<minifi::core::ProcessSession>(); - std::string test_file_location; - if (argc > 1) { - test_file_location = argv[1]; +class HttpTestHarness : public IntegrationBase { + public: + HttpTestHarness() { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); } - mkdir("/tmp/aljr39/", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - std::ofstream myfile; - myfile.open("/tmp/aljr39/example.txt"); - myfile.close(); - mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); - std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); - - configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); - std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - content_repo->initialize(configuration); - std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( - new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); - std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); - - std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, - true); + void testSetup() { + LogTestController::getInstance().setDebug<minifi::FlowController>(); + LogTestController::getInstance().setDebug<core::ProcessGroup>(); + LogTestController::getInstance().setDebug<minifi::SchedulingAgent>(); + LogTestController::getInstance().setDebug<core::ProcessContext>(); + LogTestController::getInstance().setDebug<processors::InvokeHTTP>(); + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP::WriteCallback>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP::Handler>(); + LogTestController::getInstance().setDebug<processors::LogAttribute>(); + LogTestController::getInstance().setDebug<core::Processor>(); + LogTestController::getInstance().setDebug<minifi::ThreadedSchedulingAgent>(); + LogTestController::getInstance().setDebug<minifi::TimerDrivenSchedulingAgent>(); + LogTestController::getInstance().setDebug<minifi::core::ProcessSession>(); + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + configuration->set("nifi.flow.engine.threads", "8"); + } - core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); + void cleanup() { + unlink(ss.str().c_str()); + } - std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); - std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); - std::shared_ptr<core::Processor> proc = ptr->findProcessor("OhJeez"); - assert(proc != nullptr); + void runAssertions() { + assert(LogTestController::getInstance().contains("curl performed") == true); + assert(LogTestController::getInstance().contains("Size:1024 Offset:0") == true); + assert(LogTestController::getInstance().contains("Size:0 Offset:0") == false); + } - std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc); + protected: + char *dir; + std::stringstream ss; + TestController testController; +}; - assert(inv != nullptr); - std::string url = ""; - inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); - ptr.release(); - std::string port, scheme, path; - parse_http_components(url, port, scheme, path); - start_webserver(port, path, "hi this is a post test"); - controller->load(); - controller->start(); - waitToVerifyProcessor(); +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + } - controller->waitUnload(60000); - std::string logs = LogTestController::getInstance().log_output.str(); - // stop webserver - stop_webserver(); - assert(LogTestController::getInstance().contains("curl performed") == true); - assert(LogTestController::getInstance().contains("Import offset 0 length 22") == true); + HttpTestHarness harness; - std::string stringtofind = "Resource Claim created ./content_repository/"; + harness.setKeyDir(key_dir); - size_t loc = logs.find(stringtofind); - while (loc > 0 && loc != std::string::npos) { - std::string id = logs.substr(loc + stringtofind.size(), 36); - loc = logs.find(stringtofind, loc + 1); - std::string path = "content_repository/" + id; - unlink(path.c_str()); - if (loc == std::string::npos) - break; - } + harness.run(test_file_location); - rmdir("./content_repository"); - LogTestController::getInstance().reset(); return 0; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/integration/IntegrationBase.h ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h new file mode 100644 index 0000000..a854508 --- /dev/null +++ b/libminifi/test/integration/IntegrationBase.h @@ -0,0 +1,177 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_TEST_INTEGRATION_INTEGRATIONBASE_H_ +#define LIBMINIFI_TEST_INTEGRATION_INTEGRATIONBASE_H_ + +#include "../TestServer.h" +#include "../include/core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "../unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" + +int log_message(const struct mg_connection *conn, const char *message) { + puts(message); + return 1; +} + +int ssl_enable(void *ssl_context, void *user_data) { + struct ssl_ctx_st *ctx = (struct ssl_ctx_st *) ssl_context; + return 0; +} + +void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(3)); +} + +class IntegrationBase { + public: + IntegrationBase(); + + void setUrl(std::string url, CivetHandler *handler); + + virtual ~IntegrationBase(); + + void run(std::string test_file_location); + + void setKeyDir(const std::string key_dir) { + this->key_dir = key_dir; + configureSecurity(); + } + + virtual void testSetup() = 0; + + virtual void cleanup() = 0; + + virtual void runAssertions() = 0; + + protected: + + virtual void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) { + + } + + void configureSecurity(); + CivetServer *server; + std::shared_ptr<minifi::Configure> configuration; + std::string port, scheme, path; + std::string key_dir; +}; + +IntegrationBase::IntegrationBase() + : server(nullptr), + configuration(std::make_shared< + minifi::Configure>()) +{ + mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); +} + +IntegrationBase::~IntegrationBase() +{ + stop_webserver(server); + rmdir("./content_repository"); +} + +void IntegrationBase::configureSecurity() { + if (!key_dir.empty()) { + configuration->set(minifi::Configure::nifi_security_client_certificate, key_dir + "cn.crt.pem"); + configuration->set(minifi::Configure::nifi_security_client_private_key, key_dir + "cn.ckey.pem"); + configuration->set(minifi::Configure::nifi_security_client_pass_phrase, key_dir + "cn.pass"); + configuration->set(minifi::Configure::nifi_security_client_ca_certificate, key_dir + "nifi-cert.pem"); + configuration->set(minifi::Configure::nifi_default_directory, key_dir); + } +} + +void IntegrationBase::run(std::string test_file_location) { + testSetup(); + + std::shared_ptr<core::Repository> test_repo = + std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared< + TestFlowRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, + test_file_location); + + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + content_repo->initialize(configuration); + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared + <minifi::io::StreamFactory>(configuration); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr + <core::YamlConfiguration + >(new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, + configuration, + test_file_location)); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, + configuration, + test_file_location); + + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot( + test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup + >(ptr.get()); + + queryRootProcessGroup(pg); + + ptr.release(); + + std::shared_ptr<TestRepository> repo = std::static_pointer_cast + <TestRepository>(test_repo); + + std::shared_ptr<minifi::FlowController> controller = + std::make_shared<minifi::FlowController + >(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true); + + controller->load(); + controller->start(); + waitToVerifyProcessor(); + + controller->waitUnload(60000); + + runAssertions(); + + cleanup(); +} + +void IntegrationBase::setUrl(std::string url, CivetHandler *handler) { + + parse_http_components(url, port, scheme, path); + struct mg_callbacks callback; + if (url.find("localhost") != std::string::npos) { + if (scheme == "https" && !key_dir.empty()) { + std::string cert = ""; + cert = key_dir + "nifi-cert.pem"; + memset(&callback, 0, sizeof(callback)); + callback.init_ssl = ssl_enable; + port += "s"; + callback.log_message = log_message; + server = start_webserver(port, path, handler, &callback, cert, cert); + } else { + server = start_webserver(port, path, handler); + } + } +} + +#endif /* LIBMINIFI_TEST_INTEGRATION_INTEGRATIONBASE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/integration/ProvenanceReportingTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp index a6dc377..5845767 100644 --- a/libminifi/test/integration/ProvenanceReportingTest.cpp +++ b/libminifi/test/integration/ProvenanceReportingTest.cpp @@ -17,6 +17,7 @@ */ #include <sys/stat.h> +#undef NDEBUG #include <cassert> #include <chrono> #include <fstream> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/integration/Site2SiteRestTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/Site2SiteRestTest.cpp b/libminifi/test/integration/Site2SiteRestTest.cpp deleted file mode 100644 index 0e454af..0000000 --- a/libminifi/test/integration/Site2SiteRestTest.cpp +++ /dev/null @@ -1,148 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <sys/stat.h> -#include <cassert> -#include <utility> -#include <chrono> -#include <fstream> -#include <memory> -#include <string> -#include <thread> -#include <type_traits> -#include <vector> -#include <iostream> -#include <sstream> -#include "../TestBase.h" -#include "utils/StringUtils.h" -#include "core/Core.h" -#include "../include/core/logging/Logger.h" -#include "core/ProcessGroup.h" -#include "core/yaml/YamlConfiguration.h" -#include "FlowController.h" -#include "properties/Configure.h" -#include "../unit/ProvenanceTestHelper.h" -#include "io/StreamFactory.h" -#include "CivetServer.h" -#include "RemoteProcessorGroupPort.h" - -void waitToVerifyProcessor() { - std::this_thread::sleep_for(std::chrono::seconds(10)); -} - -class ConfigHandler : public CivetHandler { - public: - bool handleGet(CivetServer *server, struct mg_connection *conn) { - static const std::string site2site_rest_resp = "{" - "\"revision\": {" - "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\"" - "}," - "\"controller\": {" - "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\"," - "\"name\": \"NiFi Flow\"," - "\"remoteSiteListeningPort\": 10001," - "\"siteToSiteSecure\": false" - "}}"; - mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " - "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - site2site_rest_resp.length()); - mg_printf(conn, "%s", site2site_rest_resp.c_str()); - return true; - } -}; - -int main(int argc, char **argv) { - LogTestController::getInstance().setInfo<minifi::RemoteProcessorGroupPort>(); - LogTestController::getInstance().setInfo<minifi::FlowController>(); - - const char *options[] = { "document_root", ".", "listening_ports", "8082", 0 }; - std::vector<std::string> cpp_options; - for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { - cpp_options.push_back(options[i]); - } - - mkdir("/tmp/site2siteGetFile/", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - std::fstream file; - std::stringstream ss; - ss << "/tmp/site2siteGetFile/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << "tempFile"; - file.close(); - - CivetServer server(cpp_options); - ConfigHandler h_ex; - server.addHandler("/nifi-api/controller", h_ex); - LogTestController::getInstance().setDebug<minifi::RemoteProcessorGroupPort>(); - - std::string key_dir, test_file_location; - if (argc > 1) { - test_file_location = argv[1]; - key_dir = argv[2]; - } - - std::shared_ptr<minifi::Configure> configuration = std::make_shared< - minifi::Configure>(); - configuration->set(minifi::Configure::nifi_default_directory, key_dir); - mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - - std::shared_ptr<core::Repository> test_repo = - std::make_shared<TestRepository>(); - std::shared_ptr<core::Repository> test_flow_repo = std::make_shared< - TestFlowRepository>(); - - configuration->set(minifi::Configure::nifi_flow_configuration_file, - test_file_location); - - std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared - <minifi::io::StreamFactory>(configuration); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - - content_repo->initialize(configuration); - - std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( - new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); - std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); - - std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), - content_repo, - DEFAULT_ROOT_GROUP_NAME, - true); - - core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, - configuration, - test_file_location); - - std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot( - test_file_location); - std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup - >(ptr.get()); - ptr.release(); - - controller->load(); - controller->start(); - waitToVerifyProcessor(); - - controller->waitUnload(60000); - std::string logs = LogTestController::getInstance().log_output.str(); - assert(logs.find("process group remote site2site port 10001, is secure 0") != std::string::npos); - LogTestController::getInstance().reset(); - unlink(ss.str().c_str()); - rmdir("/tmp/site2siteGetFile/"); - rmdir("./content_repository"); - return 0; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/integration/SiteToSiteRestTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/SiteToSiteRestTest.cpp b/libminifi/test/integration/SiteToSiteRestTest.cpp new file mode 100644 index 0000000..7532d5c --- /dev/null +++ b/libminifi/test/integration/SiteToSiteRestTest.cpp @@ -0,0 +1,144 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define CURLOPT_SSL_VERIFYPEER_DISABLE 1 +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "../TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "../include/core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "../unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "../TestServer.h" + +#include "IntegrationBase.h" + +class Responder : public CivetHandler { + public: + explicit Responder(bool isSecure) + : isSecure(isSecure) { + } + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::string site2site_rest_resp = "{" + "\"revision\": {" + "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\"" + "}," + "\"controller\": {" + "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\"," + "\"name\": \"NiFi Flow\"," + "\"remoteSiteListeningPort\": 10001," + "\"siteToSiteSecure\": "; + site2site_rest_resp += (isSecure ? "true" : "false"); + site2site_rest_resp += "}}"; + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + site2site_rest_resp.length()); + mg_printf(conn, "%s", site2site_rest_resp.c_str()); + return true; + } + + protected: + bool isSecure; +}; + +class SiteToSiteTestHarness : public IntegrationBase { + public: + explicit SiteToSiteTestHarness(bool isSecure) + : isSecure(isSecure) { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug<minifi::RemoteProcessorGroupPort>(); + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setTrace<minifi::controllers::SSLContextService>(); + LogTestController::getInstance().setInfo<minifi::FlowController>(); + LogTestController::getInstance().setDebug<core::ConfigurableComponent>(); + + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + } + + void cleanup() { + unlink(ss.str().c_str()); + } + + void runAssertions() { + if (isSecure) { + assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 1") == true); + } else { + assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 0") == true); + } + } + + protected: + bool isSecure; + char *dir; + std::stringstream ss; + TestController testController; +}; + +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + url = argv[3]; + } + + bool isSecure = false; + if (url.find("https") != std::string::npos) { + isSecure = true; + } + + SiteToSiteTestHarness harness(isSecure); + + Responder responder(isSecure); + + harness.setKeyDir(key_dir); + + harness.setUrl(url, &responder); + + harness.run(test_file_location); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/integration/TestExecuteProcess.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/TestExecuteProcess.cpp b/libminifi/test/integration/TestExecuteProcess.cpp index 5506c32..5f2c6e2 100644 --- a/libminifi/test/integration/TestExecuteProcess.cpp +++ b/libminifi/test/integration/TestExecuteProcess.cpp @@ -16,6 +16,7 @@ * limitations under the License. */ +#undef NDEBUG #include <cassert> #include <chrono> #include <string> @@ -79,19 +80,19 @@ int main(int argc, char **argv) { std::vector<std::thread> processor_workers; - core::ProcessorNode node2(processor); + std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(processor); std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; std::shared_ptr<core::ProcessContext> contextset = std::make_shared<core::ProcessContext>(node2, controller_services_provider, test_repo, test_repo); - core::ProcessSessionFactory factory(contextset.get()); + core::ProcessSessionFactory factory(contextset); processor->onSchedule(contextset.get(), &factory); for (int i = 0; i < 1; i++) { processor_workers.push_back(std::thread([processor, test_repo, &is_ready]() { - core::ProcessorNode node(processor); + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, test_repo, test_repo); context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::Command, "sleep 0.5"); - std::shared_ptr<core::ProcessSession> session = std::make_shared<core::ProcessSession>(context.get()); + std::shared_ptr<core::ProcessSession> session = std::make_shared<core::ProcessSession>(context); while (!is_ready.load(std::memory_order_relaxed)) { } processor->onTrigger(context.get(), session.get()); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/integration/ThreadPoolAdjust.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/ThreadPoolAdjust.cpp b/libminifi/test/integration/ThreadPoolAdjust.cpp new file mode 100644 index 0000000..a74460d --- /dev/null +++ b/libminifi/test/integration/ThreadPoolAdjust.cpp @@ -0,0 +1,109 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "../TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "../include/core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "../unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "../TestServer.h" +#include "IntegrationBase.h" + +class HttpTestHarness : public IntegrationBase { + public: + HttpTestHarness() { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug<minifi::FlowController>(); + LogTestController::getInstance().setDebug<core::ProcessGroup>(); + LogTestController::getInstance().setDebug<minifi::SchedulingAgent>(); + LogTestController::getInstance().setDebug<core::ProcessContext>(); + LogTestController::getInstance().setDebug<processors::InvokeHTTP>(); + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP::WriteCallback>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP::Handler>(); + LogTestController::getInstance().setDebug<processors::LogAttribute>(); + LogTestController::getInstance().setDebug<core::Processor>(); + LogTestController::getInstance().setDebug<minifi::ThreadedSchedulingAgent>(); + LogTestController::getInstance().setDebug<minifi::TimerDrivenSchedulingAgent>(); + LogTestController::getInstance().setDebug<minifi::core::ProcessSession>(); + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + configuration->set("nifi.flow.engine.threads", "1"); + } + + void cleanup() { + unlink(ss.str().c_str()); + } + + void runAssertions() { + assert(LogTestController::getInstance().contains("curl performed") == true); + assert(LogTestController::getInstance().contains("Size:1024 Offset:0") == true); + assert(LogTestController::getInstance().contains("Size:0 Offset:0") == false); + } + + protected: + char *dir; + std::stringstream ss; + TestController testController; +}; + +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + } + + HttpTestHarness harness; + + harness.setKeyDir(key_dir); + + harness.run(test_file_location); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/resources/TestHTTPGet.yml ---------------------------------------------------------------------- diff --git a/libminifi/test/resources/TestHTTPGet.yml b/libminifi/test/resources/TestHTTPGet.yml index 58f95d9..83de7ac 100644 --- a/libminifi/test/resources/TestHTTPGet.yml +++ b/libminifi/test/resources/TestHTTPGet.yml @@ -33,7 +33,7 @@ Processors: Properties: HTTP Method: GET Remote URL: http://localhost:10003/geturl - - name: OhJeez + - name: LogAttribute id: 2438e3c8-015a-1000-79ca-83af40ec1992 class: org.apache.nifi.processors.standard.LogAttribute max concurrent tasks: 1 @@ -53,16 +53,16 @@ Connections: source name: invoke source id: 2438e3c8-015a-1000-79ca-83af40ec1991 source relationship name: success - destination name: OhJeez + destination name: LogAttribute destination id: 2438e3c8-015a-1000-79ca-83af40ec1992 max work queue size: 0 max work queue data size: 1 MB flowfile expiration: 60 sec - name: TransferFilesToRPG2 id: 2438e3c8-015a-1000-79ca-83af40ec1917 - source name: OhJeez + source name: LogAttribute source id: 2438e3c8-015a-1000-79ca-83af40ec1992 - destination name: OhJeez + destination name: LogAttribute destination id: 2438e3c8-015a-1000-79ca-83af40ec1992 source relationship name: success max work queue size: 0 @@ -70,4 +70,4 @@ Connections: flowfile expiration: 60 sec Remote Processing Groups: - \ No newline at end of file + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/resources/TestHTTPGetSecure.yml ---------------------------------------------------------------------- diff --git a/libminifi/test/resources/TestHTTPGetSecure.yml b/libminifi/test/resources/TestHTTPGetSecure.yml index 9d19632..1ac82bd 100644 --- a/libminifi/test/resources/TestHTTPGetSecure.yml +++ b/libminifi/test/resources/TestHTTPGetSecure.yml @@ -33,8 +33,9 @@ Processors: Properties: SSL Context Service: SSLContextService HTTP Method: GET - Remote URL: https://raw.githubusercontent.com/apache/nifi-minifi-cpp/master/docs/minifi-logo.png - - name: OhJeez + Remote URL: https://localhost:10003/geturl + Disable Peer Verification: true + - name: LogAttribute id: 2438e3c8-015a-1000-79ca-83af40ec1992 class: org.apache.nifi.processors.standard.LogAttribute max concurrent tasks: 1 @@ -54,16 +55,16 @@ Connections: source name: invoke source id: 2438e3c8-015a-1000-79ca-83af40ec1991 source relationship name: success - destination name: OhJeez + destination name: LogAttribute destination id: 2438e3c8-015a-1000-79ca-83af40ec1992 max work queue size: 0 max work queue data size: 1 MB flowfile expiration: 60 sec - name: TransferFilesToRPG2 id: 2438e3c8-015a-1000-79ca-83af40ec1917 - source name: OhJeez + source name: LogAttribute source id: 2438e3c8-015a-1000-79ca-83af40ec1992 - destination name: OhJeez + destination name: LogAttribute destination id: 2438e3c8-015a-1000-79ca-83af40ec1992 source relationship name: success max work queue size: 0 @@ -85,4 +86,4 @@ Controller Services: - value: nifi-cert.pem Remote Processing Groups: - \ No newline at end of file + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/resources/TestHTTPPost.yml ---------------------------------------------------------------------- diff --git a/libminifi/test/resources/TestHTTPPost.yml b/libminifi/test/resources/TestHTTPPost.yml index c76069a..32e4f42 100644 --- a/libminifi/test/resources/TestHTTPPost.yml +++ b/libminifi/test/resources/TestHTTPPost.yml @@ -20,9 +20,9 @@ Flow Controller: name: MiNiFi Flow id: 2438e3c8-015a-1000-79ca-83af40ec1990 Processors: - - name: invoke + - name: generate id: 2438e3c8-015a-1000-79ca-83af40ec1991 - class: org.apache.nifi.processors.standard.GetFile + class: org.apache.nifi.processors.standard.GenerateFlowFile max concurrent tasks: 1 scheduling strategy: TIMER_DRIVEN scheduling period: 1 sec @@ -31,10 +31,20 @@ Processors: run duration nanos: 0 auto-terminated relationships list: Properties: - Input Directory: /tmp/aljr39 - Keep Source File: false - - - name: OhJeez + - name: listen + id: 2438e3c8-015a-1000-79ca-83af40ec1994 + class: org.apache.nifi.processors.standard.ListenHTTP + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: + Properties: + Base Path: urlofchampions + Listening Port: 10004 + - name: Invoke id: 2438e3c8-015a-1000-79ca-83af40ec1992 class: org.apache.nifi.processors.standard.InvokeHTTP max concurrent tasks: 1 @@ -43,11 +53,12 @@ Processors: penalization period: 30 sec yield period: 1 sec run duration nanos: 0 - auto-terminated relationships list: response + auto-terminated relationships list: + - success Properties: HTTP Method: POST - Remote URL: http://localhost:10003/urlofchampions - + Content-type: text/html + Remote URL: http://localhost:10004/urlofchampions - name: Loggit id: 2438e3c8-015a-1000-79ca-83af40ec1993 class: org.apache.nifi.processors.standard.LogAttribute @@ -57,31 +68,37 @@ Processors: penalization period: 30 sec yield period: 1 sec run duration nanos: 0 - auto-terminated relationships list: response + auto-terminated relationships list: success Properties: - LogLevel: info + LogLevel: debug -Connections: - - name: TransferFilesToRPG +Connections: + - name: Generate/Loggit + id: 2438e3c8-015a-1000-79ca-83af40ec1919 + source id: 2438e3c8-015a-1000-79ca-83af40ec1991 + destination id: 2438e3c8-015a-1000-79ca-83af40ec1993 + source relationship name: success + destination relationship name: success + max work queue size: 0 + max work queue data size: 1 MB + flowfile expiration: 60 sec + - name: GenerateFlowFile/Invoke id: 2438e3c8-015a-1000-79ca-83af40ec1997 source name: invoke source id: 2438e3c8-015a-1000-79ca-83af40ec1991 source relationship name: success - destination name: OhJeez + destination name: LogAttribute destination id: 2438e3c8-015a-1000-79ca-83af40ec1992 max work queue size: 0 max work queue data size: 1 MB flowfile expiration: 60 sec - - name: TransferFilesToRPG2 - id: 2438e3c8-015a-1000-79ca-83af40ec1917 - source name: OhJeez - source id: 2438e3c8-015a-1000-79ca-83af40ec1992 - destination name: OhJeez + - name: Listen/Loggit + id: 2438e3c8-015a-1000-79ca-83af40ec1918 + source id: 2438e3c8-015a-1000-79ca-83af40ec1994 destination id: 2438e3c8-015a-1000-79ca-83af40ec1993 source relationship name: success max work queue size: 0 max work queue data size: 1 MB flowfile expiration: 60 sec - Remote Processing Groups: - \ No newline at end of file + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml ---------------------------------------------------------------------- diff --git a/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml b/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml new file mode 100644 index 0000000..110783c --- /dev/null +++ b/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml @@ -0,0 +1,97 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +Flow Controller: + name: MiNiFi Flow + id: 2438e3c8-015a-1000-79ca-83af40ec1990 +Processors: + - name: generate + id: 2438e3c8-015a-1000-79ca-83af40ec1991 + class: org.apache.nifi.processors.standard.GenerateFlowFile + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: + Properties: + - name: listen + id: 2438e3c8-015a-1000-79ca-83af40ec1994 + class: org.apache.nifi.processors.standard.ListenHTTP + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: + Properties: + Base Path: urlofchampions + Listening Port: 10004 + - name: Invoke + id: 2438e3c8-015a-1000-79ca-83af40ec1992 + class: org.apache.nifi.processors.standard.InvokeHTTP + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: + - success + Properties: + HTTP Method: POST + Use Chunked Encoding: true + Content-type: text/html + Remote URL: http://localhost:10004/urlofchampions + - name: Loggit + id: 2438e3c8-015a-1000-79ca-83af40ec1993 + class: org.apache.nifi.processors.standard.LogAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: + - success + Properties: + LogLevel: debug + +Connections: + - name: GenerateFlowFile/Invoke + id: 2438e3c8-015a-1000-79ca-83af40ec1997 + source name: invoke + source id: 2438e3c8-015a-1000-79ca-83af40ec1991 + source relationship name: success + destination name: LogAttribute + destination id: 2438e3c8-015a-1000-79ca-83af40ec1992 + max work queue size: 0 + max work queue data size: 1 MB + flowfile expiration: 60 sec + - name: Listen/Loggit + id: 2438e3c8-015a-1000-79ca-83af40ec1918 + source id: 2438e3c8-015a-1000-79ca-83af40ec1994 + destination id: 2438e3c8-015a-1000-79ca-83af40ec1993 + source relationship name: success + max work queue size: 0 + max work queue data size: 1 MB + flowfile expiration: 60 sec +Remote Processing Groups: + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/resources/TestSite2SiteRestSecure.yml ---------------------------------------------------------------------- diff --git a/libminifi/test/resources/TestSite2SiteRestSecure.yml b/libminifi/test/resources/TestSite2SiteRestSecure.yml new file mode 100644 index 0000000..fd530ae --- /dev/null +++ b/libminifi/test/resources/TestSite2SiteRestSecure.yml @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the \"License\"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an \"AS IS\" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +Flow Controller: + id: 471deef6-2a6e-4a7d-912a-81cc17e3a205 + name: MiNiFi Flow + +Processors: + - name: GetFile + id: 471deef6-2a6e-4a7d-912a-81cc17e3a206 + class: org.apache.nifi.processors.standard.GetFile + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + penalization period: 30 sec + yield period: 10 sec + run duration nanos: 0 + auto-terminated relationships list: + Properties: + Input Directory: /tmp/site2siteGetFile + Keep Source File: true + +Connections: + - name: GenerateFlowFileS2S + id: 471deef6-2a6e-4a7d-912a-81cc17e3a207 + source id: 471deef6-2a6e-4a7d-912a-81cc17e3a206 + source relationship name: success + destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a204 + max work queue size: 0 + max work queue data size: 1 MB + flowfile expiration: 60 sec + queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer + +Remote Processing Groups: + - name: NiFi Flow + id: 471deef6-2a6e-4a7d-912a-81cc17e3a208 + url: https://localhost:8082/nifi + timeout: 30 secs + yield period: 10 sec + Input Ports: + - id: 471deef6-2a6e-4a7d-912a-81cc17e3a204 + name: From Node A + max concurrent tasks: 1 + use compression: false + Properties: # Deviates from spec and will later be removed when this is autonegotiated + Port UUID: 471deef6-2a6e-4a7d-912a-81cc17e3a204 http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/resources/cn.crt.key.pem ---------------------------------------------------------------------- diff --git a/libminifi/test/resources/cn.crt.key.pem b/libminifi/test/resources/cn.crt.key.pem new file mode 100644 index 0000000..9f10139 --- /dev/null +++ b/libminifi/test/resources/cn.crt.key.pem @@ -0,0 +1,52 @@ +Bag Attributes + friendlyName: nifi-key + localKeyID: 73 E6 90 32 31 08 F5 87 C2 CE 8D 17 10 32 05 F2 95 6A 9E 9C +subject=/OU=NIFI/CN=test +issuer=/OU=NIFI/CN=localhost +-----BEGIN CERTIFICATE----- +MIIDQTCCAimgAwIBAgIKAVpnU2lhAAAAADANBgkqhkiG9w0BAQsFADAjMQ0wCwYD +VQQLDAROSUZJMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMTcwMjIyMTkzNjQ0WhcN +MjAwMjIyMTkzNjQ0WjAeMQ0wCwYDVQQLDAROSUZJMQ0wCwYDVQQDDAR0ZXN0MIIB +IjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAn9zCw5/EDWKe/jZWIou8aohC +0wDhe8azYAORb45kEEqB2Q1pJJDHIIoNgDukXhrcPWFz2FxdZ56so7kkrjkN507t +11dJn+wuFwfVot0us35NvOclt3rHNy38nid4gV4+F01mXksrNWCnvTNSmp4XIWlR +HxFkiZyKVTRxyo5U3fr2zmtm80LtopoBEf5Lr29WYZrjgA+A8iDMKvVlgmpriau4 +4Vie9L3DIot4eAO7eN3o5UG+oYE7RQ6BdXSo9+D/tbma0rZvS9WHw3Qin7gDygaG +KbwWEFuwhSYO+lcsywsB5hT3uxGTRnskafE29/vvzmsygMlNoRXk5NaRlnf/NQID +AQABo3wwejAdBgNVHQ4EFgQUc+aQMjEI9YfCzo0XEDIF8pVqnpwwHwYDVR0jBBgw +FoAUtz2ps5YYKDRnj1ixtUA8NOjmo2IwDgYDVR0PAQH/BAQDAgP4MAkGA1UdEwQC +MAAwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMBMA0GCSqGSIb3DQEBCwUA +A4IBAQBbpGba97qG7eXIQm3hiMKsuGpMNoybKxUlazOEeqG5QemMxSDOy2TDMBgM +2oQU3RImrNMLbfWCxnQ1biWclis9exR5IkUintb8hEE8Vbq/nTLCi7IvpEg+2oDc +kvSs8UTrI17EQAzh/vc9ljbgtvJD0L8khO8oPPSEpweD+vk/ZragQJOzeA7ohYMM +kJDJ/a9HNNe6nPx+NOCWQH2hJS8Ok2z57hBgy1oym/xJoT9z3vfjQcHtNK4qLNlx +P+o2mWNC2ZYNe84ghUfMD+btlnA3MrPEEOJHXOoE9UgaeNZXjwuF6N1qI2RCWSVW +IU4uwXjW1icAEYZbCK9f2l5cK+Vu +-----END CERTIFICATE----- +-----BEGIN RSA PRIVATE KEY----- +MIIEowIBAAKCAQEAn9zCw5/EDWKe/jZWIou8aohC0wDhe8azYAORb45kEEqB2Q1p +JJDHIIoNgDukXhrcPWFz2FxdZ56so7kkrjkN507t11dJn+wuFwfVot0us35NvOcl +t3rHNy38nid4gV4+F01mXksrNWCnvTNSmp4XIWlRHxFkiZyKVTRxyo5U3fr2zmtm +80LtopoBEf5Lr29WYZrjgA+A8iDMKvVlgmpriau44Vie9L3DIot4eAO7eN3o5UG+ +oYE7RQ6BdXSo9+D/tbma0rZvS9WHw3Qin7gDygaGKbwWEFuwhSYO+lcsywsB5hT3 +uxGTRnskafE29/vvzmsygMlNoRXk5NaRlnf/NQIDAQABAoIBAD0Z6tNMiGm5TR3K +9OclU7HAPRJbHtPpYXwMQRVLblGPuMuWRZ61O3bjbT+bDDrvt0g6rEWBIC5PxpqZ +VDzHD9peH6gT888NPZyN4Qjr00LFvr659n+HjQc5K3njijqgI8lSSxM1VeGD/Pji +2erBDAQ4NcpTIoM6cuEYN3n9JtDk1B/nJ4vWD+1Ak/sS91hmaIy/1hOUofgaqp4W +8+pnur/lEn4d9yCfEbDOfkfep49si8kDw8SdBH6uFpr1s71E2q9PfGv5WN4LGxc+ +fX3zHHnxFRB9XRivo42bIxCRyf/U/GNeD0ldlXzmfUkVKw/V1CLioRKP+KaJ7aT6 +FYZE1GECgYEA6Wk4t7kkNGszWiFJpGlUmB50CmmH0zdb+qeiBhxuAAgcPveglXJX +qNAJOxDY29nSQqhtuRe/vhQEk88axNVfTIHBFF+29P8ODQPkQsOn5pyCeii+MTmJ +R1Kq8/5F08CPCJ2n/Rfd1xYKsiB6QJDeRCxbUHuqlOwwkuASNnHVPI0CgYEAr1Vd +SXToxlNOesCuvfK+IV+7qanAag49MSoHKYuzii4YhaS/9yC8HGNUNU4ZOr3gyOCo +rlPGsJx+7HGAIylg7M+w0xrAOlwcBIfD6DHSoFSCbpxKQGOdav5PpKvKmXDbnCeq +hvlJRSRepC9raZePRqHMeO+y0SCMCW/3kCvBZ0kCgYBPVEXEm5YkgrLHVTz0NiBc +Oy1xRIC3Vr9aJ5M3xH3WYOtIs1zOrjgA1gPtIUKzJgsCQMBaAipW7fLIUcFlWKe1 +FvyKciV6ix05azMvRdRQ2qy13BcYH9f6d6xGlQsidvWmFf88U9o/+evUspkaTKJe +RT0b04wcZVH0+Vb3TiUNGQKBgQCBmpxGfEDEuTe9zUhd6F1a1iUmGPC+dKAPA34f +28BvV/uQbmPoDS5/3XyVmpFTtp++XTLTCeJJEqVPqeaKF0AqTKvsoE2P1h5vytup +e4qyMUXo9WZ9iiUGvc2pb8NnAOGU4E1RUFw029mJi/PeLcYe8+LtUmHf7CsXnshk +rbNc4QKBgFAIoRZonaHDU4AYXx/6Xem2EneJ3tXyxMZ8WEjNQKaWmA5rkGbbSijk +0EaJ3bMexlyp1qgv86f4LrfLbZLdqTDSzhqlTV0x0b6D7MRBLuzcQUqsKmDmxkxq +CkJUqnNhVWM0SMG8cy9K1WMBecdot8I2OH8L00uwKkkM4pQ1En+o +-----END RSA PRIVATE KEY-----