Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 76f06675c -> 9f98a407c
MINIFI-325: Set stream_factory_ from FlowConfiguration constructor This closes #101. Signed-off-by: Aldrin Piri <ald...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/9f98a407 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/9f98a407 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/9f98a407 Branch: refs/heads/master Commit: 9f98a407c348645ac234a910b19c8c468d24f230 Parents: 76f0667 Author: Marc Parisi <phroc...@apache.org> Authored: Sat May 20 16:41:25 2017 -0400 Committer: Aldrin Piri <ald...@apache.org> Committed: Mon May 22 12:48:12 2017 -0400 ---------------------------------------------------------------------- libminifi/include/core/FlowConfiguration.h | 3 +- .../integration/ProvenanceReportingTest.cpp | 115 +++++++++++++++++++ .../test/resources/TestProvenanceReporting.yml | 72 ++++++++++++ 3 files changed, 189 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f98a407/libminifi/include/core/FlowConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index 2e704b5..b918aac 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -65,7 +65,8 @@ class FlowConfiguration : public CoreComponent { const std::string path) : CoreComponent(core::getClassName<FlowConfiguration>()), flow_file_repo_(flow_file_repo), - config_path_(path) { + config_path_(path), + stream_factory_(stream_factory) { controller_services_ = std::make_shared< core::controller::ControllerServiceMap>(); service_provider_ = std::make_shared< http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f98a407/libminifi/test/integration/ProvenanceReportingTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp new file mode 100644 index 0000000..ff5d563 --- /dev/null +++ b/libminifi/test/integration/ProvenanceReportingTest.cpp @@ -0,0 +1,115 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/stat.h> +#include <cassert> +#include <chrono> +#include <fstream> +#include <utility> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/LogAppenders.h" +#include "core/logging/BaseLogger.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "../unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" + + +void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(2)); +} + +int main(int argc, char **argv) { + std::string test_file_location; + if (argc > 1) { + test_file_location = argv[1]; + } + mkdir("/tmp/aljs39/", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + + mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + std::ostringstream oss; + std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr< + logging::BaseLogger>( + new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss, + 0)); + std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger(); + logger->updateLogger(std::move(outputLogger)); + logger->setLogLevel("debug"); + + std::shared_ptr<minifi::Configure> configuration = std::make_shared< + minifi::Configure>(); + + std::shared_ptr<core::Repository> test_repo = + std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared< + TestFlowRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, + test_file_location); + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared< + minifi::io::StreamFactory>(configuration); + + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr< + core::YamlConfiguration>( + new core::YamlConfiguration(test_repo, test_repo, stream_factory, + configuration, test_file_location)); + std::shared_ptr<TestRepository> repo = + std::static_pointer_cast<TestRepository>(test_repo); + + std::shared_ptr<minifi::FlowController> controller = std::make_shared< + minifi::FlowController>(test_repo, test_flow_repo, configuration, + std::move(yaml_ptr), + DEFAULT_ROOT_GROUP_NAME, + true); + + core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, + configuration, test_file_location); + + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot( + test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>( + ptr.get()); + ptr.release(); + std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = + std::make_shared<org::apache::nifi::minifi::io::SocketContext>( + std::make_shared<minifi::Configure>()); + org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", + 10001, 1); + + controller->load(); + controller->start(); + waitToVerifyProcessor(); + + controller->waitUnload(60000); + std::string logs = oss.str(); + assert(logs.find("Add processor SiteToSiteProvenanceReportingTask into process group MiNiFi Flow") != std::string::npos); + + + rmdir("./content_repository"); + rmdir("/tmp/aljs39/"); + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f98a407/libminifi/test/resources/TestProvenanceReporting.yml ---------------------------------------------------------------------- diff --git a/libminifi/test/resources/TestProvenanceReporting.yml b/libminifi/test/resources/TestProvenanceReporting.yml new file mode 100644 index 0000000..5b906a5 --- /dev/null +++ b/libminifi/test/resources/TestProvenanceReporting.yml @@ -0,0 +1,72 @@ +Flow Controller: + name: MiNiFi Flow + id: 2438e3c8-015a-1000-79ca-83af40ec1990 + +Processors: + - name: invoke + id: 2438e3c8-015a-1000-79ca-83af40ec1991 + class: org.apache.nifi.processors.standard.GetFile + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: + Properties: + Input Directory: /tmp/aljs39 + Keep Source File: false + +Connections: + - name: TransferFilesToRPG + id: 1438e3c8-015a-1000-79ca-83af40ec1997 + source name: MockProcessorIto + source id: 1438e3c8-015a-1000-79ca-83af40ec1991 + source relationship name: success + destination name: NiFi Flow + destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a204 + destination relationship name: success + max work queue size: 0 + max work queue data size: 1 MB + flowfile expiration: 60 sec + +Controller Services: + - name: MockItLikeIts1995 + id: 2438e3c8-015a-1000-79ca-83af40ec1991 + class: MockControllerService + Properties: + Linked Services: + - value: ID + - name: ID + id: 2438e3c8-015a-1000-79ca-83af40ec1992 + class: MockControllerService + Properties: + Linked Services: + - name: MockItLikeItsWrong + id: 2438e3c8-015a-1000-79ca-83af40ec1993 + class: MockControllerSer + Properties: + Linked Services: + - value: ID + +Remote Processing Groups: + - name: NiFi Flow + id: 471deef6-2a6e-4a7d-912a-81cc17e3a208 + url: http://localhost:8080/nifi + timeout: 30 secs + yield period: 10 sec + Input Ports: + - id: 471deef6-2a6e-4a7d-912a-81cc17e3a204 + name: From Node A + max concurrent tasks: 1 + Properties: + Port: 10001 + Host Name: localhost + +Provenance Reporting: + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + port: 10001 + host: localhost + port uuid: 471deef6-2a6e-4a7d-912a-81cc17e3a204 + batch size: 100 \ No newline at end of file