Repository: nifi-minifi-cpp Updated Branches: refs/heads/master f5832facf -> 9f161a27e
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/resources/nifi-cert-key.pem ---------------------------------------------------------------------- diff --git a/libminifi/test/resources/nifi-cert-key.pem b/libminifi/test/resources/nifi-cert-key.pem new file mode 100644 index 0000000..0c3b7da --- /dev/null +++ b/libminifi/test/resources/nifi-cert-key.pem @@ -0,0 +1,47 @@ +-----BEGIN CERTIFICATE----- +MIIDSTCCAjGgAwIBAgIKAVpnU2gBAAAAADANBgkqhkiG9w0BAQsFADAjMQ0wCwYD +VQQLDAROSUZJMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMTcwMjIyMTkzNjQzWhcN +MjAwMjIyMTkzNjQzWjAjMQ0wCwYDVQQLDAROSUZJMRIwEAYDVQQDDAlsb2NhbGhv +c3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDAIXpNyG57u1HroE+D +zzpY5g0+BLA1qrUolBK4ZwQcrbCUAS5h+JjP0tuMUtlEtFjYIX2YgZvrzdvKue84 +xyrpr5h1cNo4dmeQoFKwj/FPNJxdO/cQpK55nSwy4PsvTL/YKzaV/yF4FQRSqY17 +Xkn3K8RDUaknSPuIIKd4piON7MVaCVFK//2y6+pULEjGviP2Bc4ODJTfB3AjGRBQ +EleTga0OnTdHFzHoKHRUHCNwaRgeFywYdHODp1yvv3m0t85A0D9WWN7T5G0f8pT9 +ZeDOX2cq3EPtdTBnKGgSXM1x6E3RaYoOACYBKdy/Lv5hjMpTr0fXhD7af9ql2irH ++RNBAgMBAAGjfzB9MA4GA1UdDwEB/wQEAwIB/jAMBgNVHRMEBTADAQH/MB0GA1Ud +DgQWBBS3PamzlhgoNGePWLG1QDw06OajYjAfBgNVHSMEGDAWgBS3PamzlhgoNGeP +WLG1QDw06OajYjAdBgNVHSUEFjAUBggrBgEFBQcDAgYIKwYBBQUHAwEwDQYJKoZI +hvcNAQELBQADggEBAKcXKG4rPnEuYS/AJfHs5RlGsg2h/VnbCT50A4OsUNcv5KSe +WFeP2ctp793MsIsUKA3FgGi+23aaUTtlAclQQs/xlBiDq6XmkED/DjLVzt5dTHrl +gS6aFtDHffMn5tLaUKOkNETOG/Fnc+XPAENB8GyqBd2Q/1L0SWSHXSTecX+aA9P6 +lvrRtWOqyGHiRoaRE5+VUjyO+0ToEgj9E+3rV8JL66BT7SWQusLGqbX1OoANCMTj +BRYeqB0g0PrXU+6chh6StpNSnYzkQdoxLUIDYYZx2XGsbkjDh/k6ni6bgJEKEOCu +T3Z2tyvGpc+PjLRXW/WyXCpg/xfr3+GSVKI6ark= +-----END CERTIFICATE----- +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAwCF6Tchue7tR66BPg886WOYNPgSwNaq1KJQSuGcEHK2wlAEu +YfiYz9LbjFLZRLRY2CF9mIGb683byrnvOMcq6a+YdXDaOHZnkKBSsI/xTzScXTv3 +EKSueZ0sMuD7L0y/2Cs2lf8heBUEUqmNe15J9yvEQ1GpJ0j7iCCneKYjjezFWglR +Sv/9suvqVCxIxr4j9gXODgyU3wdwIxkQUBJXk4GtDp03Rxcx6Ch0VBwjcGkYHhcs +GHRzg6dcr795tLfOQNA/Vlje0+RtH/KU/WXgzl9nKtxD7XUwZyhoElzNcehN0WmK +DgAmASncvy7+YYzKU69H14Q+2n/apdoqx/kTQQIDAQABAoIBAQCz7eY69+y4BXo3 +nz84Ipby8CcQoJVg/QiBAwLxHNCWBvdp9B069PQvFLo1FNWSaQ8XAW48p4yc7YHb +vftRgfwnMyIlQdWrsP9WSz6FSZhkY9HX4rODK6aWD+J3l4jFCCxVxkpteKwgaBZP +T6hHE8tTJfK8VLqEJu4g0uvjqjt7ydJT69lThdyf3VE0v6ZeSjsya5qqw+9RK+uC +q5T/8FxeFZgpfR6UXXnoLAmAkfcMZNIBo6cOJWi/BQHjZdpCOVXUBtu0/lC8bffa +4/ESaxRS8kOp+WEb64pT7u6F7yhD/kve6ZnJj/SX1EvN+RzB3zoVG42WUs/+/SwN +dU1ERz+tAoGBAPbgZPDnWuKxW7Cam/Aqmvux624C1lNfhfXEGURhyc+wHWjjhWRe +2vEPJOVxG5pN/FAo+lFoGiLe3QsLRLPlQrGfT/92W28QEcRrRSutjRZOL3wKezQA +DkAPU9HX3lACR5yQD6+a0HHgMr1MqeNFPi9MPPjywGywTyWzHd4WQqvTAoGBAMc7 +J4fpr5uPVq9mKemK67i7meJ8AxjjU7oNe8EN+2XfCYcQUmgIo+dLzV9+DTrYkoTz +iqjA6Ph2DNs6YHI/JNwsdSbAz6KVDteimt3t+uyNpiMGuyLmfOgpYEMJcHp+q6I6 +7PGKVS4c5iPFiYuIo23Is9ZMxOVQp76+UOy09rwbAoGBAOM5Za7VQjGkTGAf7ab/ +j+ZZu/dlZR8XrJSoCRmHZ9hgoLEJuJzJMXruFWeY028SmEivbrW+u0+dEJY5qOJr +ARe7KkZXCZEPmUrP8Lpi4pjFHa9tdjhGVNdhRCTAKz442vCfJ9DZDUHCuPDCvxsP +gEzIPtZjl/hxzmdElRj0JClBAoGAaXmfzAyjs6+HLQThW4r4kKyBI66T1TFEulM5 +GVPVrHEQEjlJ51nrrCAtckjBqE3QBCMLXZwDusaEt+uH8/QKB6Zhv0qEooZXfUHQ +y32aQnIbap+9oxRzPFXraJIuwisdop2fo6Cgx/D0xitmTkDghNaknue1tdGlfQ40 +uZx0o9ECgYBeKeNbMnWoO46ZOrhaz8On+fIY7xtboV2bALy7lvUbWd9B41ntqYUm +NHlYXDDU+Izs5wnNJnNnx4vECuUzYbpeY82dvMewlQwfl5aiyKrjo7VxLm//2U/K +hlID6DU5wi9O+TAQ319DhxT7Ja+AQxO/OFS/mfrtwJEevxXqJLu55Q== +-----END RSA PRIVATE KEY----- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/unit/C2MetricsTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/C2MetricsTests.cpp b/libminifi/test/unit/C2MetricsTests.cpp new file mode 100644 index 0000000..75e6cc2 --- /dev/null +++ b/libminifi/test/unit/C2MetricsTests.cpp @@ -0,0 +1,230 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file +#include <uuid/uuid.h> +#include <memory> +#include "../TestBase.h" +#include "io/ClientSocket.h" +#include "core/Processor.h" +#include "core/ClassLoader.h" +#include "core/yaml/YamlConfiguration.h" +#include "core/state/metrics/ProcessMetrics.h" +#include "core/state/metrics/RepositoryMetrics.h" +#include "core/state/metrics/QueueMetrics.h" +#include "core/state/metrics/SystemMetrics.h" + +TEST_CASE("TestProcessMetrics", "[c2m1]") { + minifi::state::metrics::ProcessMetrics metrics; + + REQUIRE("ProcessMetrics" == metrics.getName()); + + REQUIRE(2 == metrics.serialize().size()); + + REQUIRE("MemoryMetrics" == metrics.serialize().at(0).name); + REQUIRE("CpuMetrics" == metrics.serialize().at(1).name); +} + +TEST_CASE("TestSystemMetrics", "[c2m5]") { + minifi::state::metrics::SystemInformation metrics; + + REQUIRE("SystemInformation" == metrics.getName()); + + REQUIRE(3 == metrics.serialize().size()); + + REQUIRE("vcores" == metrics.serialize().at(0).name); + REQUIRE("physicalmem" == metrics.serialize().at(1).name); + REQUIRE("machinearch" == metrics.serialize().at(2).name); +} + +TEST_CASE("QueueMetricsTestNoConnections", "[c2m2]") { + minifi::state::metrics::QueueMetrics metrics; + + REQUIRE("QueueMetrics" == metrics.getName()); + + REQUIRE(0 == metrics.serialize().size()); +} + +TEST_CASE("QueueMetricsTestConnections", "[c2m3]") { + minifi::state::metrics::QueueMetrics metrics; + + REQUIRE("QueueMetrics" == metrics.getName()); + + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + + content_repo->initialize(configuration); + + std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>(); + + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "testconnection"); + + metrics.addConnection(connection); + + connection->setMaxQueueDataSize(1024); + connection->setMaxQueueSize(1024); + + REQUIRE(1 == metrics.serialize().size()); + + minifi::state::metrics::MetricResponse resp = metrics.serialize().at(0); + + REQUIRE("testconnection" == resp.name); + + REQUIRE(4 == resp.children.size()); + + minifi::state::metrics::MetricResponse datasize = resp.children.at(0); + + REQUIRE("datasize" == datasize.name); + REQUIRE("0" == datasize.value); + + minifi::state::metrics::MetricResponse datasizemax = resp.children.at(1); + + REQUIRE("datasizemax" == datasizemax.name); + REQUIRE("1024" == datasizemax.value); + + minifi::state::metrics::MetricResponse queued = resp.children.at(2); + + REQUIRE("queued" == queued.name); + REQUIRE("0" == queued.value); + + minifi::state::metrics::MetricResponse queuedmax = resp.children.at(3); + + REQUIRE("queuedmax" == queuedmax.name); + REQUIRE("1024" == queuedmax.value); +} + +TEST_CASE("RepositorymetricsNoRepo", "[c2m4]") { + minifi::state::metrics::RepositoryMetrics metrics; + + REQUIRE("RepositoryMetrics" == metrics.getName()); + + REQUIRE(0 == metrics.serialize().size()); +} + +TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { + minifi::state::metrics::RepositoryMetrics metrics; + + REQUIRE("RepositoryMetrics" == metrics.getName()); + + std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); + + metrics.addRepository(repo); + { + REQUIRE(1 == metrics.serialize().size()); + + minifi::state::metrics::MetricResponse resp = metrics.serialize().at(0); + + REQUIRE("repo_name" == resp.name); + + REQUIRE(3 == resp.children.size()); + + minifi::state::metrics::MetricResponse running = resp.children.at(0); + + REQUIRE("running" == running.name); + REQUIRE("0" == running.value); + + minifi::state::metrics::MetricResponse full = resp.children.at(1); + + REQUIRE("full" == full.name); + REQUIRE("0" == full.value); + + minifi::state::metrics::MetricResponse size = resp.children.at(2); + + REQUIRE("size" == size.name); + REQUIRE("0" == size.value); + } + + repo->start(); + { + REQUIRE(1 == metrics.serialize().size()); + + minifi::state::metrics::MetricResponse resp = metrics.serialize().at(0); + + REQUIRE("repo_name" == resp.name); + + REQUIRE(3 == resp.children.size()); + + minifi::state::metrics::MetricResponse running = resp.children.at(0); + + REQUIRE("running" == running.name); + REQUIRE("1" == running.value); + + minifi::state::metrics::MetricResponse full = resp.children.at(1); + + REQUIRE("full" == full.name); + REQUIRE("0" == full.value); + + minifi::state::metrics::MetricResponse size = resp.children.at(2); + + REQUIRE("size" == size.name); + REQUIRE("0" == size.value); + } + + repo->setFull(); + + { + REQUIRE(1 == metrics.serialize().size()); + + minifi::state::metrics::MetricResponse resp = metrics.serialize().at(0); + + REQUIRE("repo_name" == resp.name); + + REQUIRE(3 == resp.children.size()); + + minifi::state::metrics::MetricResponse running = resp.children.at(0); + + REQUIRE("running" == running.name); + REQUIRE("1" == running.value); + + minifi::state::metrics::MetricResponse full = resp.children.at(1); + + REQUIRE("full" == full.name); + REQUIRE("1" == full.value); + + minifi::state::metrics::MetricResponse size = resp.children.at(2); + + REQUIRE("size" == size.name); + REQUIRE("0" == size.value); + } + + repo->stop(); + + { + REQUIRE(1 == metrics.serialize().size()); + + minifi::state::metrics::MetricResponse resp = metrics.serialize().at(0); + + REQUIRE("repo_name" == resp.name); + + REQUIRE(3 == resp.children.size()); + + minifi::state::metrics::MetricResponse running = resp.children.at(0); + + REQUIRE("running" == running.name); + REQUIRE("0" == running.value); + + minifi::state::metrics::MetricResponse full = resp.children.at(1); + + REQUIRE("full" == full.name); + REQUIRE("1" == full.value); + + minifi::state::metrics::MetricResponse size = resp.children.at(2); + + REQUIRE("size" == size.name); + REQUIRE("0" == size.value); + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/unit/GetTCPTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/GetTCPTests.cpp b/libminifi/test/unit/GetTCPTests.cpp new file mode 100644 index 0000000..67f42a4 --- /dev/null +++ b/libminifi/test/unit/GetTCPTests.cpp @@ -0,0 +1,419 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file +#include <uuid/uuid.h> +#include <utility> +#include <memory> +#include <string> +#include <vector> +#include <set> +#include <fstream> +#include "../unit/ProvenanceTestHelper.h" +#include "../TestBase.h" +#include "Scheduling.h" +#include "processors/ListenHTTP.h" +#include "processors/LogAttribute.h" +#include "processors/GetTCP.h" +#include "core/Core.h" +#include "core/FlowFile.h" +#include "core/Processor.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessorNode.h" +#include "core/reporting/SiteToSiteProvenanceReportingTask.h" + +TEST_CASE("GetTCPWithoutEOM", "[GetTCP1]") { + TestController testController; + std::vector<uint8_t> buffer; + for (auto c : "Hello World\nHello Warld\nGoodByte Cruel world") { + buffer.push_back(c); + } + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + + content_repo->initialize(std::make_shared<minifi::Configure>()); + + std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>()); + std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>()); + org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9184, 1); + + REQUIRE(-1 != server.initialize()); + + LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); + LogTestController::getInstance().setDebug<minifi::processors::GetTCP>(); + LogTestController::getInstance().setTrace<minifi::io::Socket>(); + + std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>(); + + std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetTCP>("gettcpexample"); + + std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + + processor->setStreamFactory(stream_factory); + processor->initialize(); + + uuid_t processoruuid; + REQUIRE(true == processor->getUUID(processoruuid)); + + uuid_t logattribute_uuid; + REQUIRE(true == logAttribute->getUUID(logattribute_uuid)); + + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "gettcpexampleConnection"); + connection->setRelationship(core::Relationship("success", "description")); + + std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute"); + connection2->setRelationship(core::Relationship("success", "description")); + + // link the connections so that we can test results at the end for this + connection->setSource(processor); + + // link the connections so that we can test results at the end for this + connection->setDestination(logAttribute); + + connection2->setSource(logAttribute); + + connection2->setSourceUUID(logattribute_uuid); + connection->setSourceUUID(processoruuid); + connection->setDestinationUUID(logattribute_uuid); + + processor->addConnection(connection); + logAttribute->addConnection(connection); + logAttribute->addConnection(connection2); + + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); + std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(logAttribute); + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; + std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo); + context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList, "localhost:9184"); + context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval, "100 msec"); + auto session = std::make_shared<core::ProcessSession>(context); + auto session2 = std::make_shared<core::ProcessSession>(context2); + + REQUIRE(processor->getName() == "gettcpexample"); + + std::shared_ptr<core::FlowFile> record; + processor->setScheduledState(core::ScheduledState::RUNNING); + + std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); + processor->onSchedule(context, factory); + processor->onTrigger(context, session); + server.writeData(buffer, buffer.size()); + std::this_thread::sleep_for(std::chrono::seconds(2)); + + logAttribute->incrementActiveTasks(); + logAttribute->setScheduledState(core::ScheduledState::RUNNING); + std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2); + logAttribute->onSchedule(context2, factory2); + logAttribute->onTrigger(context2, session2); + + provenance::ProvenanceReporter *reporter = session->getProvenanceReporter(); + std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); + record = session->get(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 0); + + processor->incrementActiveTasks(); + processor->setScheduledState(core::ScheduledState::RUNNING); + processor->onTrigger(context, session); + reporter = session->getProvenanceReporter(); + + records = reporter->getEvents(); + session->commit(); + + logAttribute->incrementActiveTasks(); + logAttribute->setScheduledState(core::ScheduledState::RUNNING); + logAttribute->onTrigger(context2, session2); + + records = reporter->getEvents(); + + REQUIRE(true == LogTestController::getInstance().contains("Size:45 Offset:0")); + + LogTestController::getInstance().reset(); +} + +TEST_CASE("GetTCPWithOEM", "[GetTCP2]") { + std::vector<uint8_t> buffer; + for (auto c : "Hello World\nHello Warld\nGoodByte Cruel world") { + buffer.push_back(c); + } + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + + content_repo->initialize(std::make_shared<minifi::Configure>()); + + std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>()); + std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>()); + + TestController testController; + + org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9183, 1); + + REQUIRE(-1 != server.initialize()); + + LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); + LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository >(); + LogTestController::getInstance().setDebug<minifi::processors::GetTCP>(); + LogTestController::getInstance().setTrace<minifi::io::Socket>(); + + std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>(); + + std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetTCP>("gettcpexample"); + + std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + + processor->setStreamFactory(stream_factory); + processor->initialize(); + + uuid_t processoruuid; + REQUIRE(true == processor->getUUID(processoruuid)); + + uuid_t logattribute_uuid; + REQUIRE(true == logAttribute->getUUID(logattribute_uuid)); + + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "gettcpexampleConnection"); + connection->setRelationship(core::Relationship("partial", "description")); + + std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute"); + connection2->setRelationship(core::Relationship("partial", "description")); + + // link the connections so that we can test results at the end for this + connection->setSource(processor); + + // link the connections so that we can test results at the end for this + connection->setDestination(logAttribute); + + connection2->setSource(logAttribute); + + connection2->setSourceUUID(logattribute_uuid); + connection->setSourceUUID(processoruuid); + connection->setDestinationUUID(logattribute_uuid); + + processor->addConnection(connection); + logAttribute->addConnection(connection); + logAttribute->addConnection(connection2); + + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); + std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(logAttribute); + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; + std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo); + context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList, "localhost:9183"); + context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval, "100 msec"); + // we're using new lines above + context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndOfMessageByte, "10"); + auto session = std::make_shared<core::ProcessSession>(context); + auto session2 = std::make_shared<core::ProcessSession>(context2); + + + REQUIRE(processor->getName() == "gettcpexample"); + + std::shared_ptr<core::FlowFile> record; + processor->setScheduledState(core::ScheduledState::RUNNING); + + std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); + processor->onSchedule(context, factory); + processor->onTrigger(context, session); + server.writeData(buffer, buffer.size()); + std::this_thread::sleep_for(std::chrono::seconds(2)); + + logAttribute->incrementActiveTasks(); + logAttribute->setScheduledState(core::ScheduledState::RUNNING); + std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2); + logAttribute->onSchedule(context2, factory2); + logAttribute->onTrigger(context2, session2); + + provenance::ProvenanceReporter *reporter = session->getProvenanceReporter(); + std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); + record = session->get(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 0); + + processor->incrementActiveTasks(); + processor->setScheduledState(core::ScheduledState::RUNNING); + processor->onTrigger(context, session); + reporter = session->getProvenanceReporter(); + + records = reporter->getEvents(); + session->commit(); + + logAttribute->incrementActiveTasks(); + logAttribute->setScheduledState(core::ScheduledState::RUNNING); + logAttribute->onTrigger(context2, session2); + + records = reporter->getEvents(); + + logAttribute->incrementActiveTasks(); + logAttribute->setScheduledState(core::ScheduledState::RUNNING); + logAttribute->onTrigger(context2, session2); + + records = reporter->getEvents(); + + REQUIRE(true == LogTestController::getInstance().contains("Size:11 Offset:0")); + REQUIRE(true == LogTestController::getInstance().contains("Size:12 Offset:0")); + REQUIRE(true == LogTestController::getInstance().contains("Size:22 Offset:0")); + + LogTestController::getInstance().reset(); +} + +TEST_CASE("GetTCPWithOnlyOEM", "[GetTCP3]") { + std::vector<uint8_t> buffer; + for (auto c : "\n") { + buffer.push_back(c); + } + + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + + content_repo->initialize(std::make_shared<minifi::Configure>()); + + std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>()); + std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>()); + + TestController testController; + + LogTestController::getInstance().setDebug<minifi::io::Socket>(); + + org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9183, 1); + + REQUIRE(-1 != server.initialize()); + + LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); + + LogTestController::getInstance().setDebug<minifi::processors::GetTCP>(); + + std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>(); + + std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetTCP>("gettcpexample"); + + std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + + processor->setStreamFactory(stream_factory); + processor->initialize(); + + uuid_t processoruuid; + REQUIRE(true == processor->getUUID(processoruuid)); + + uuid_t logattribute_uuid; + REQUIRE(true == logAttribute->getUUID(logattribute_uuid)); + + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "gettcpexampleConnection"); + connection->setRelationship(core::Relationship("success", "description")); + + std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute"); + connection2->setRelationship(core::Relationship("success", "description")); + + // link the connections so that we can test results at the end for this + connection->setSource(processor); + + // link the connections so that we can test results at the end for this + connection->setDestination(logAttribute); + + connection2->setSource(logAttribute); + + connection2->setSourceUUID(logattribute_uuid); + connection->setSourceUUID(processoruuid); + connection->setDestinationUUID(logattribute_uuid); + + processor->addConnection(connection); + logAttribute->addConnection(connection); + logAttribute->addConnection(connection2); + + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); + std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(logAttribute); + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; + std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo); + context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList, "localhost:9183"); + context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval, "100 msec"); + // we're using new lines above + context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndOfMessageByte, "10"); + auto session = std::make_shared<core::ProcessSession>(context); + auto session2 = std::make_shared<core::ProcessSession>(context2); + + + REQUIRE(processor->getName() == "gettcpexample"); + + std::shared_ptr<core::FlowFile> record; + processor->setScheduledState(core::ScheduledState::RUNNING); + + std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); + processor->onSchedule(context, factory); + processor->onTrigger(context, session); + server.writeData(buffer, buffer.size()); + std::this_thread::sleep_for(std::chrono::seconds(2)); + + logAttribute->incrementActiveTasks(); + logAttribute->setScheduledState(core::ScheduledState::RUNNING); + std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2); + logAttribute->onSchedule(context2, factory2); + logAttribute->onTrigger(context2, session2); + + provenance::ProvenanceReporter *reporter = session->getProvenanceReporter(); + std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); + record = session->get(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 0); + + processor->incrementActiveTasks(); + processor->setScheduledState(core::ScheduledState::RUNNING); + processor->onTrigger(context, session); + reporter = session->getProvenanceReporter(); + + records = reporter->getEvents(); + session->commit(); + + logAttribute->incrementActiveTasks(); + logAttribute->setScheduledState(core::ScheduledState::RUNNING); + logAttribute->onTrigger(context2, session2); + + records = reporter->getEvents(); + + logAttribute->incrementActiveTasks(); + logAttribute->setScheduledState(core::ScheduledState::RUNNING); + logAttribute->onTrigger(context2, session2); + + records = reporter->getEvents(); + + REQUIRE(true == LogTestController::getInstance().contains("Size:2 Offset:0")); + LogTestController::getInstance().reset(); +} + +TEST_CASE("GetTCPEmptyNoConnect", "[GetTCP3]") { + TestController testController; + LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); + LogTestController::getInstance().setDebug<minifi::processors::GetTCP>(); + LogTestController::getInstance().setTrace<minifi::io::Socket>(); + + std::shared_ptr<TestPlan> plan = testController.createPlan(); + std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetTCP", "gettcpexample"); + + plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true); + + plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetTCP::EndpointList.getName(), "localhost:9183"); + plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval.getName(), "100 msec"); + // we're using new lines above + plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetTCP::EndOfMessageByte.getName(), "10"); + + testController.runSession(plan, false); + std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords(); + std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 0); + + REQUIRE(true == LogTestController::getInstance().contains("Could not create socket during initialization for localhost:9183")); + LogTestController::getInstance().reset(); +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/unit/InvokeHTTPTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/InvokeHTTPTests.cpp b/libminifi/test/unit/InvokeHTTPTests.cpp index 2ef3c17..5e415a4 100644 --- a/libminifi/test/unit/InvokeHTTPTests.cpp +++ b/libminifi/test/unit/InvokeHTTPTests.cpp @@ -83,60 +83,60 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") { invokehttp->addConnection(connection); invokehttp->addConnection(connection2); - core::ProcessorNode node(listenhttp); - core::ProcessorNode node2(invokehttp); + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp); + std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp); std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo); - core::ProcessContext context2(node2, controller_services_provider, repo, repo, content_repo); - context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8686"); - context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest"); + std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo); + context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8686"); + context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest"); - context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST"); - context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8686/testytesttest"); - core::ProcessSession session(&context); - core::ProcessSession session2(&context2); + context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST"); + context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8686/testytesttest"); + auto session = std::make_shared<core::ProcessSession>(context); + auto session2 = std::make_shared<core::ProcessSession>(context2); REQUIRE(listenhttp->getName() == "listenhttp"); - core::ProcessSessionFactory factory(&context); + std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); std::shared_ptr<core::FlowFile> record; listenhttp->setScheduledState(core::ScheduledState::RUNNING); - listenhttp->onSchedule(&context, &factory); - listenhttp->onTrigger(&context, &session); + listenhttp->onSchedule(context, factory); + listenhttp->onTrigger(context, session); invokehttp->incrementActiveTasks(); invokehttp->setScheduledState(core::ScheduledState::RUNNING); - core::ProcessSessionFactory factory2(&context2); - invokehttp->onSchedule(&context2, &factory2); - invokehttp->onTrigger(&context2, &session2); + std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2); + invokehttp->onSchedule(context2, factory2); + invokehttp->onTrigger(context2, session2); - provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); + provenance::ProvenanceReporter *reporter = session->getProvenanceReporter(); std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); - record = session.get(); + record = session->get(); REQUIRE(record == nullptr); REQUIRE(records.size() == 0); listenhttp->incrementActiveTasks(); listenhttp->setScheduledState(core::ScheduledState::RUNNING); - listenhttp->onTrigger(&context, &session); + listenhttp->onTrigger(context, session); - reporter = session.getProvenanceReporter(); + reporter = session->getProvenanceReporter(); records = reporter->getEvents(); - session.commit(); + session->commit(); invokehttp->incrementActiveTasks(); invokehttp->setScheduledState(core::ScheduledState::RUNNING); - invokehttp->onTrigger(&context2, &session2); + invokehttp->onTrigger(context2, session2); - session2.commit(); + session2->commit(); records = reporter->getEvents(); for (provenance::ProvenanceEventRecord *provEventRecord : records) { REQUIRE(provEventRecord->getComponentType() == listenhttp->getName()); } - std::shared_ptr<core::FlowFile> ffr = session2.get(); + std::shared_ptr<core::FlowFile> ffr = session2->get(); REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST")); LogTestController::getInstance().reset(); } @@ -204,22 +204,23 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") { invokehttp->addConnection(connection); invokehttp->addConnection(connection2); - core::ProcessorNode node(invokehttp); - core::ProcessorNode node2(listenhttp); + + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp); + std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp); std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo); - core::ProcessContext context2(node2, controller_services_provider, repo, repo, content_repo); - context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8680"); - context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest"); + std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo); + context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8680"); + context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest"); - context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST"); - context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8680/testytesttest"); - core::ProcessSession session(&context); - core::ProcessSession session2(&context2); + context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST"); + context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8680/testytesttest"); + auto session = std::make_shared<core::ProcessSession>(context); + auto session2 = std::make_shared<core::ProcessSession>(context2); REQUIRE(listenhttp->getName() == "listenhttp"); - core::ProcessSessionFactory factory(&context); + std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); std::shared_ptr<core::FlowFile> record; @@ -228,45 +229,45 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") { std::map<std::string, std::string> attributes; attributes["testy"] = "test"; std::shared_ptr<minifi::FlowFileRecord> flow = std::make_shared<minifi::FlowFileRecord>(repo, content_repo, attributes); - session2.write(flow, &callback); + session2->write(flow, &callback); invokehttp->incrementActiveTasks(); invokehttp->setScheduledState(core::ScheduledState::RUNNING); - core::ProcessSessionFactory factory2(&context2); - invokehttp->onSchedule(&context2, &factory2); - invokehttp->onTrigger(&context2, &session2); + std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2); + invokehttp->onSchedule(context2, factory2); + invokehttp->onTrigger(context2, session2); listenhttp->incrementActiveTasks(); listenhttp->setScheduledState(core::ScheduledState::RUNNING); - listenhttp->onSchedule(&context, &factory); - listenhttp->onTrigger(&context, &session); + listenhttp->onSchedule(context, factory); + listenhttp->onTrigger(context, session); - provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); + provenance::ProvenanceReporter *reporter = session->getProvenanceReporter(); std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); - record = session.get(); + record = session->get(); REQUIRE(record == nullptr); REQUIRE(records.size() == 0); listenhttp->incrementActiveTasks(); listenhttp->setScheduledState(core::ScheduledState::RUNNING); - listenhttp->onTrigger(&context, &session); + listenhttp->onTrigger(context, session); - reporter = session.getProvenanceReporter(); + reporter = session->getProvenanceReporter(); records = reporter->getEvents(); - session.commit(); + session->commit(); invokehttp->incrementActiveTasks(); invokehttp->setScheduledState(core::ScheduledState::RUNNING); - invokehttp->onTrigger(&context2, &session2); + invokehttp->onTrigger(context2, session2); - session2.commit(); + session2->commit(); records = reporter->getEvents(); for (provenance::ProvenanceEventRecord *provEventRecord : records) { REQUIRE(provEventRecord->getComponentType() == listenhttp->getName()); } - std::shared_ptr<core::FlowFile> ffr = session2.get(); + std::shared_ptr<core::FlowFile> ffr = session2->get(); REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST")); LogTestController::getInstance().reset(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/unit/LoggerTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/LoggerTests.cpp b/libminifi/test/unit/LoggerTests.cpp index c7509f5..9daf33e 100644 --- a/libminifi/test/unit/LoggerTests.cpp +++ b/libminifi/test/unit/LoggerTests.cpp @@ -19,9 +19,11 @@ #include <utility> #include <string> #include <memory> +#include <vector> #include <ctime> #include "../TestBase.h" + TEST_CASE("Test log Levels", "[ttl1]") { LogTestController::getInstance().setTrace<logging::Logger>(); std::shared_ptr<logging::Logger> logger = logging::LoggerFactory<logging::Logger>::getLogger(); @@ -71,3 +73,7 @@ TEST_CASE("Test log Levels change", "[ttl5]") { REQUIRE(false == LogTestController::getInstance().contains("[org::apache::nifi::minifi::core::logging::Logger] [error] hello world")); LogTestController::getInstance().reset(); } + +TEST_CASE("Test Demangle template", "[ttl6]") { + std::cout << core::getClassName<std::vector<int>>() << std::endl; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/unit/ProcessorTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp index 7f34ba4..25376c9 100644 --- a/libminifi/test/unit/ProcessorTests.cpp +++ b/libminifi/test/unit/ProcessorTests.cpp @@ -43,8 +43,9 @@ TEST_CASE("Test Creation of GetFile", "[getfileCreate]") { REQUIRE(processor->getName() == "processorname"); } -TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { +TEST_CASE("Test GetFileMultiple", "[getfileCreate3]") { TestController testController; + LogTestController::getInstance().setDebug<minifi::processors::GetFile>(); std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); @@ -71,27 +72,30 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { processor->addConnection(connection); REQUIRE(dir != NULL); - core::ProcessorNode node(processor); + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - core::ProcessContext context(node, controller_services_provider, test_repo, test_repo); - core::ProcessSessionFactory factory(&context); - context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, dir); + auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + + context->setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, dir); // replicate 10 threads processor->setScheduledState(core::ScheduledState::RUNNING); - processor->onSchedule(&context, &factory); + + auto factory = std::make_shared<core::ProcessSessionFactory>(context); + + processor->onSchedule(context, factory); int prev = 0; for (int i = 0; i < 10; i++) { - core::ProcessSession session(&context); + auto session = std::make_shared<core::ProcessSession>(context); REQUIRE(processor->getName() == "getfileCreate2"); std::shared_ptr<core::FlowFile> record; - processor->onTrigger(&context, &session); + processor->onTrigger(context, session); - provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); + provenance::ProvenanceReporter *reporter = session->getProvenanceReporter(); std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); - record = session.get(); + record = session->get(); REQUIRE(record == nullptr); REQUIRE(records.size() == 0); @@ -104,9 +108,9 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { processor->incrementActiveTasks(); processor->setScheduledState(core::ScheduledState::RUNNING); - processor->onTrigger(&context, &session); + processor->onTrigger(context, session); unlink(ss.str().c_str()); - reporter = session.getProvenanceReporter(); + reporter = session->getProvenanceReporter(); REQUIRE(processor->getName() == "getfileCreate2"); @@ -115,15 +119,16 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { for (provenance::ProvenanceEventRecord *provEventRecord : records) { REQUIRE(provEventRecord->getComponentType() == processor->getName()); } - session.commit(); - std::shared_ptr<core::FlowFile> ffr = session.get(); + session->commit(); + std::shared_ptr<core::FlowFile> ffr = session->get(); - REQUIRE((repo->getRepoMap().size() % 2) == 0); - REQUIRE(repo->getRepoMap().size() == (prev + 2)); - prev += 2; + std::cout << repo->getRepoMap().size() << std::endl; + REQUIRE(repo->getRepoMap().size() == (prev + 1)); + prev++; } } + TEST_CASE("LogAttributeTest", "[getfileCreate3]") { TestController testController; LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); @@ -169,6 +174,7 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") { TEST_CASE("Test Find file", "[getfileCreate3]") { TestController testController; + LogTestController::getInstance().setDebug<minifi::provenance::ProvenanceReporter>(); std::shared_ptr<TestPlan> plan = testController.createPlan(); std::shared_ptr<core::Processor> processor = plan->addProcessor("GetFile", "getfileCreate2"); std::shared_ptr<core::Processor> processorReport = std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>( http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/unit/PropertyTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/PropertyTests.cpp b/libminifi/test/unit/PropertyTests.cpp index af85579..eeb74ef 100644 --- a/libminifi/test/unit/PropertyTests.cpp +++ b/libminifi/test/unit/PropertyTests.cpp @@ -19,6 +19,7 @@ #include "../../include/core/Property.h" #include <string> #include "utils/StringUtils.h" +#include "core/Property.h" #include "../TestBase.h" TEST_CASE("Test Boolean Conversion", "[testboolConversion]") { @@ -33,6 +34,18 @@ TEST_CASE("Test Boolean Conversion", "[testboolConversion]") { REQUIRE(false == org::apache::nifi::minifi::utils::StringUtils::StringToBool("false", b)); } + +TEST_CASE("Test Is it Time", "[testTime]") { + core::TimeUnit unit; + int64_t max_partition_millis_; + + REQUIRE(true == org::apache::nifi::minifi::core::Property::StringToTime("1 SEC", max_partition_millis_, unit)); + REQUIRE(true == org::apache::nifi::minifi::core::Property::StringToTime("1 sec", max_partition_millis_, unit)); + + REQUIRE(true == org::apache::nifi::minifi::core::Property::StringToTime("1 s", max_partition_millis_, unit)); + REQUIRE(true == org::apache::nifi::minifi::core::Property::StringToTime("1 S", max_partition_millis_, unit)); +} + TEST_CASE("Test Trimmer Right", "[testTrims]") { std::string test = "a quick brown fox jumped over the road\t\n"; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/unit/ProvenanceTestHelper.h ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 1b39700..2583a09 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -47,6 +47,18 @@ class TestRepository : public core::Repository { return true; } + void start() { + running_ = true; + } + + void stop() { + running_ = false; + } + + void setFull() { + repo_full_ = true; + } + // Destructor virtual ~TestRepository() { @@ -183,7 +195,7 @@ class TestFlowRepository : public core::repository::FlowFileRepository { } } } - + void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) { } @@ -198,7 +210,7 @@ class TestFlowController : public minifi::FlowController { public: TestFlowController(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::ContentRepository> content_repo) - : minifi::FlowController(repo, flow_file_repo,std::make_shared<minifi::Configure>(), nullptr, std::make_shared<core::repository::VolatileContentRepository>(), "", true) { + : minifi::FlowController(repo, flow_file_repo, std::make_shared<minifi::Configure>(), nullptr, std::make_shared<core::repository::VolatileContentRepository>(), "", true) { } ~TestFlowController() { @@ -207,18 +219,23 @@ class TestFlowController : public minifi::FlowController { } - bool start() { + int16_t start() { running_.store(true); - return true; + return 0; } - void stop(bool force) { + int16_t stop(bool force, uint64_t timeToWait = 0) { running_.store(false); + return 0; } void waitUnload(const uint64_t timeToWaitMs) { stop(true); } + int16_t pause() { + return -1; + } + void unload() { stop(true); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/unit/RepoTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/RepoTests.cpp b/libminifi/test/unit/RepoTests.cpp index 3b18310..6ee690b 100644 --- a/libminifi/test/unit/RepoTests.cpp +++ b/libminifi/test/unit/RepoTests.cpp @@ -19,10 +19,12 @@ #include "../TestBase.h" #include <memory> #include <string> +#include <map> #include "ProvenanceTestHelper.h" #include "provenance/Provenance.h" #include "FlowFileRecord.h" #include "core/Core.h" +#include "core/repository/FlowFileRepository.h" #include "../../include/core/repository/AtomicRepoEntries.h" #include "properties/Configure.h" @@ -106,3 +108,55 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") { REQUIRE(true == record2.getAttribute("keyB", value)); REQUIRE("" == value); } + +TEST_CASE("Test Delete Content ", "[TestFFR4]") { + TestController testController; + char format[] = "/tmp/testRepo.XXXXXX"; + LogTestController::getInstance().setDebug<core::ContentRepository>(); + LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>(); + LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>(); + + char *dir = testController.createTempDirectory(format); + + std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0, 1); + + std::map<std::string, std::string> attributes; + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>(); + + repository->initialize(std::make_shared<minifi::Configure>()); + + repository->loadComponent(content_repo); + + std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ss.str(), content_repo); + + minifi::FlowFileRecord record(repository, content_repo, attributes, claim); + + record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd"); + + record.addAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd"); + + REQUIRE(true == record.Serialize()); + + claim->decreaseFlowFileRecordOwnedCount(); + + claim->decreaseFlowFileRecordOwnedCount(); + + repository->Delete(record.getUUIDStr()); + + repository->flush(); + + repository->stop(); + + std::ifstream fileopen(ss.str()); + REQUIRE(false == fileopen.good()); + + LogTestController::getInstance().reset(); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/unit/SchedulingAgentTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/SchedulingAgentTests.cpp b/libminifi/test/unit/SchedulingAgentTests.cpp new file mode 100644 index 0000000..898636a --- /dev/null +++ b/libminifi/test/unit/SchedulingAgentTests.cpp @@ -0,0 +1,36 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file +#include <memory> +#include <string> +#include <vector> +#include "io/CRCStream.h" +#include "io/DataStream.h" +#include "../TestBase.h" +#include "processors/GetFile.h" +#include "processors/LogAttribute.h" +#include "SchedulingAgent.h" +#include "TimerDrivenSchedulingAgent.h" + + +TEST_CASE("TestTDAgent", "[test1]") { + std::shared_ptr<core::Processor> procA = std::make_shared<minifi::processors::GetFile>("getFile"); + std::shared_ptr<core::Processor> procB = std::make_shared<minifi::processors::LogAttribute>("logAttribute"); + // agent.run() +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/unit/Site2SiteTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/Site2SiteTests.cpp b/libminifi/test/unit/Site2SiteTests.cpp index 2eef532..c17ffb3 100644 --- a/libminifi/test/unit/Site2SiteTests.cpp +++ b/libminifi/test/unit/Site2SiteTests.cpp @@ -146,7 +146,7 @@ TEST_CASE("TestSiteToSiteVerifySend", "[S2S3]") { REQUIRE(collector->get_next_client_response() == "SEND_FLOWFILES"); std::map<std::string, std::string> attributes; minifi::DataPacket packet(&protocol, transaction, attributes, payload); - REQUIRE(protocol.send(transactionID, &packet, nullptr, nullptr) == true); + REQUIRE(protocol.send(transactionID, &packet, nullptr, nullptr) == 0); collector->get_next_client_response(); collector->get_next_client_response(); std::string rx_payload = collector->get_next_client_response(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/unit/TailFileTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/TailFileTests.cpp b/libminifi/test/unit/TailFileTests.cpp index eb33f8c..a7baec9 100644 --- a/libminifi/test/unit/TailFileTests.cpp +++ b/libminifi/test/unit/TailFileTests.cpp @@ -42,6 +42,80 @@ static const char *TMP_FILE = "/tmp/minifi-tmpfile.txt"; static const char *STATE_FILE = "/tmp/minifi-state-file.txt"; TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") { + // Create and write to the test file + std::ofstream tmpfile; + tmpfile.open(TMP_FILE); + tmpfile << NEWLINE_FILE; + tmpfile.close(); + + TestController testController; + LogTestController::getInstance().setDebug<minifi::processors::TailFile>(); + LogTestController::getInstance().setDebug<core::ProcessSession>(); + LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); + + std::shared_ptr<TestPlan> plan = testController.createPlan(); + std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc"); + + plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true); + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), TMP_FILE); + plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE); + plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n"); + + testController.runSession(plan, false); + std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords(); + std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 4); + + LogTestController::getInstance().reset(); + + // Delete the test and state file. + std::remove(TMP_FILE); + std::remove(STATE_FILE); +} + +TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") { + // Create and write to the test file + std::ofstream tmpfile; + tmpfile.open(TMP_FILE); + tmpfile << NEWLINE_FILE; + tmpfile.close(); + + TestController testController; + LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); + + std::shared_ptr<TestPlan> plan = testController.createPlan(); + std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc"); + + plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true); + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), TMP_FILE); + plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE); + + + testController.runSession(plan, false); + std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords(); + std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 2); + + testController.runSession(plan, false); + + LogTestController::getInstance().reset(); + + // Delete the test and state file. + std::remove(TMP_FILE); + std::remove(STATE_FILE); +} +/* +TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") { try { // Create and write to the test file std::ofstream tmpfile; @@ -76,7 +150,7 @@ TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") { processor->addConnection(connection); - core::ProcessorNode node(processor); + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo); @@ -111,6 +185,7 @@ TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") { std::remove(STATE_FILE); } + TEST_CASE("TailFileWithoutDelimiter", "[tailfiletest2]") { try { // Create and write to the test file @@ -143,7 +218,7 @@ TEST_CASE("TailFileWithoutDelimiter", "[tailfiletest2]") { processor->addConnection(connection); - core::ProcessorNode node(processor); + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo); @@ -176,3 +251,4 @@ TEST_CASE("TailFileWithoutDelimiter", "[tailfiletest2]") { std::remove(TMP_FILE); std::remove(STATE_FILE); } +*/ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/unit/ThreadPoolTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp index 670958a..cb9c536 100644 --- a/libminifi/test/unit/ThreadPoolTests.cpp +++ b/libminifi/test/unit/ThreadPoolTests.cpp @@ -19,6 +19,7 @@ #define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file #include <utility> #include <future> +#include <memory> #include "../TestBase.h" #include "utils/ThreadPool.h" @@ -26,6 +27,41 @@ bool function() { return true; } +class WorkerNumberExecutions : public utils::AfterExecute<int> { + public: + explicit WorkerNumberExecutions(int tasks) + : tasks(tasks), + runs(0) { + } + + explicit WorkerNumberExecutions(WorkerNumberExecutions && other) + : runs(std::move(other.runs)), + tasks(std::move(other.tasks)) { + } + + ~WorkerNumberExecutions() { + } + + virtual bool isFinished(const int &result) { + if (result > 0 && ++runs < tasks) { + return false; + } else { + return true; + } + } + virtual bool isCancelled(const int &result) { + return false; + } + + int getRuns() { + return runs; + } + + protected: + int runs; + int tasks; +}; + TEST_CASE("ThreadPoolTest1", "[TPT1]") { utils::ThreadPool<bool> pool(5); std::function<bool()> f_ex = function; @@ -36,3 +72,22 @@ TEST_CASE("ThreadPoolTest1", "[TPT1]") { fut.wait(); REQUIRE(true == fut.get()); } + +std::atomic<int> counter; + +int counterFunction() { + return ++counter; +} + +TEST_CASE("ThreadPoolTest2", "[TPT2]") { + counter = 0; + utils::ThreadPool<int> pool(5); + std::function<int()> f_ex = counterFunction; + std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new WorkerNumberExecutions(20)); + utils::Worker<int> functor(f_ex, "id", std::move(after_execute)); + pool.start(); + std::future<int> fut; + REQUIRE(true == pool.execute(std::move(functor), fut)); + fut.wait(); + REQUIRE(20 == fut.get()); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/unit/YamlConfigurationTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/YamlConfigurationTests.cpp b/libminifi/test/unit/YamlConfigurationTests.cpp index fc11d04..5c50de1 100644 --- a/libminifi/test/unit/YamlConfigurationTests.cpp +++ b/libminifi/test/unit/YamlConfigurationTests.cpp @@ -154,7 +154,7 @@ TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") { std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap; rootFlowConfig->getConnections(connectionMap); - REQUIRE(1 == connectionMap.size()); + REQUIRE(2 == connectionMap.size()); // This is a map of UUID->Connection, and we don't know UUID, so just going to loop over it for (auto it : connectionMap) { REQUIRE(it.second); @@ -188,4 +188,3 @@ TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") { REQUIRE_THROWS_AS(yamlConfig->getRoot(configYamlStream), std::invalid_argument); } } - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/main/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt index 8c1a290..3fe3f0d 100644 --- a/main/CMakeLists.txt +++ b/main/CMakeLists.txt @@ -23,7 +23,7 @@ IF(POLICY CMP0048) CMAKE_POLICY(SET CMP0048 OLD) ENDIF(POLICY CMP0048) -include_directories(../libminifi/include ../libminifi/include/core/yaml ../libminifi/include/core ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/jsoncpp/include ../thirdparty/leveldb-1.18/include ../thirdparty/) +include_directories(../libminifi/include ../libminifi/include/c2 ../libminifi/include/c2/protocols/ ../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../libminifi/include/core/yaml ../libminifi/include/core ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/jsoncpp/include ../thirdparty/leveldb-1.18/include ../thirdparty/) find_package(Boost REQUIRED) include_directories(${Boost_INCLUDE_DIRS}) @@ -45,10 +45,13 @@ include_directories(${OPENSSL_INCLUDE_DIR}) # Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl, jsoncpp and leveldb target_link_libraries(minifiexe minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES}) +set_property(TARGET minifiexe PROPERTY INTERPROCEDURAL_OPTIMIZATION True) set_target_properties(minifiexe PROPERTIES OUTPUT_NAME minifi) + + install(TARGETS minifiexe RUNTIME DESTINATION bin - COMPONENT bin) + COMPONENT bin) \ No newline at end of file