MINIFICPP-601: Add scheduling information This closes #392.
Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/2ecb997c Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/2ecb997c Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/2ecb997c Branch: refs/heads/master Commit: 2ecb997c72a5b6d80fb73d1eaf67b1e30cbf5c31 Parents: 7ac309b Author: Marc Parisi <[email protected]> Authored: Tue Aug 21 20:22:57 2018 -0400 Committer: Aldrin Piri <[email protected]> Committed: Tue Aug 28 12:59:54 2018 -0400 ---------------------------------------------------------------------- libminifi/include/core/Processor.h | 9 +- libminifi/include/core/ProcessorConfig.h | 8 ++ .../include/core/state/nodes/AgentInformation.h | 58 ++++++++ libminifi/src/core/Processor.cpp | 5 +- libminifi/src/core/yaml/YamlConfiguration.cpp | 6 +- libminifi/test/unit/ManifestTests.cpp | 143 +++++++++++++++++++ .../unit/PropertyValidationAgentInfoTests.cpp | 120 ---------------- 7 files changed, 218 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/include/core/Processor.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index 18fe2a0..6426f4c 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -53,9 +53,6 @@ namespace core { // Minimum scheduling period in Nano Second #define MINIMUM_SCHEDULING_NANOS 30000 -// Default yield period in second -#define DEFAULT_YIELD_PERIOD_SECONDS 1 - // Default penalization period in second #define DEFAULT_PENALIZATION_PERIOD_SECONDS 30 @@ -107,11 +104,11 @@ class __attribute__((visibility("default"))) Processor : public Connectable, pub } // Set Processor Run Duration in Nano Second void setRunDurationNano(uint64_t period) { - run_durantion_nano_ = period; + run_duration_nano_ = period; } // Get Processor Run Duration in Nano Second uint64_t getRunDurationNano(void) { - return (run_durantion_nano_); + return (run_duration_nano_); } // Set Processor yield period in MilliSecond void setYieldPeriodMsec(uint64_t period) { @@ -258,7 +255,7 @@ class __attribute__((visibility("default"))) Processor : public Connectable, pub // SchedulePeriod in Nano Seconds std::atomic<uint64_t> scheduling_period_nano_; // Run Duration in Nano Seconds - std::atomic<uint64_t> run_durantion_nano_; + std::atomic<uint64_t> run_duration_nano_; // Yield Period in Milliseconds std::atomic<uint64_t> yield_period_msec_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/include/core/ProcessorConfig.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessorConfig.h b/libminifi/include/core/ProcessorConfig.h index c1d563e..aa8e18c 100644 --- a/libminifi/include/core/ProcessorConfig.h +++ b/libminifi/include/core/ProcessorConfig.h @@ -26,6 +26,14 @@ namespace nifi { namespace minifi { namespace core { + +#define DEFAULT_SCHEDULING_STRATEGY "TIMER_DRIVEN" +#define DEFAULT_SCHEDULING_PERIOD "1 sec" +#define DEFAULT_RUN_DURATION 0 +#define DEFAULT_MAX_CONCURRENT_TASKS 1 +// Default yield period in second +#define DEFAULT_YIELD_PERIOD_SECONDS 1 + struct ProcessorConfig { std::string id; std::string name; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/include/core/state/nodes/AgentInformation.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h index baafbbe..48a451c 100644 --- a/libminifi/include/core/state/nodes/AgentInformation.h +++ b/libminifi/include/core/state/nodes/AgentInformation.h @@ -48,6 +48,7 @@ #include "agent/build_description.h" #include "core/ClassLoader.h" #include "../nodes/StateMonitor.h" +#include "core/ProcessorConfig.h" namespace org { namespace apache { @@ -284,6 +285,57 @@ class Bundles : public DeviceInformation { }; +class SchedulingDefaults : public DeviceInformation { + public: + SchedulingDefaults(std::string name, uuid_t uuid) + : DeviceInformation(name, uuid) { + } + + SchedulingDefaults(const std::string &name) + : DeviceInformation(name, 0) { + } + + std::string getName() const { + return "schedulingDefaults"; + } + + std::vector<SerializedResponseNode> serialize() { + std::vector<SerializedResponseNode> serialized; + + SerializedResponseNode schedulingDefaults; + schedulingDefaults.name = "schedulingDefaults"; + + SerializedResponseNode defaultSchedulingStrategy; + defaultSchedulingStrategy.name = "defaultSchedulingStrategy"; + defaultSchedulingStrategy.value = DEFAULT_SCHEDULING_STRATEGY; + + schedulingDefaults.children.push_back(defaultSchedulingStrategy); + + SerializedResponseNode defaultSchedulingPeriod; + defaultSchedulingPeriod.name = "defaultSchedulingPeriod"; + defaultSchedulingPeriod.value = DEFAULT_SCHEDULING_PERIOD; + + schedulingDefaults.children.push_back(defaultSchedulingPeriod); + + SerializedResponseNode defaultRunDuration; + defaultRunDuration.name = "defaultRunDurationNanos"; + defaultRunDuration.value = DEFAULT_RUN_DURATION; + + schedulingDefaults.children.push_back(defaultRunDuration); + + SerializedResponseNode defaultMaxConcurrentTasks; + defaultMaxConcurrentTasks.name = "defaultMaxConcurrentTasks"; + defaultMaxConcurrentTasks.value = DEFAULT_MAX_CONCURRENT_TASKS; + + schedulingDefaults.children.push_back(defaultMaxConcurrentTasks); + + serialized.push_back(schedulingDefaults); + + return serialized; + } + +}; + /** * Justification and Purpose: Provides available extensions for the agent information block. */ @@ -491,6 +543,12 @@ class AgentManifest : public DeviceInformation { serialized.push_back(bundle); } + SchedulingDefaults defaults("schedulingDefaults", nullptr); + + for (auto defaultNode : defaults.serialize()) { + serialized.push_back(defaultNode); + } + return serialized; } }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/src/core/Processor.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index 7adf718..d49bd55 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -31,6 +31,7 @@ #include <functional> #include <utility> #include "Connection.h" +#include "core/ProcessorConfig.h" #include "core/Connectable.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" @@ -55,10 +56,10 @@ Processor::Processor(std::string name, uuid_t uuid) loss_tolerant_ = false; _triggerWhenEmpty = false; scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS; - run_durantion_nano_ = 0; + run_duration_nano_ = DEFAULT_RUN_DURATION; yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000; _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000; - max_concurrent_tasks_ = 1; + max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS; active_tasks_ = 0; yield_expiration_ = 0; incoming_connections_Iter = this->_incomingConnections.begin(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/src/core/yaml/YamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index 902bd89..c850afc 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -125,14 +125,14 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core:: auto strategyNode = getOptionalField(&procNode, "scheduling strategy", - YAML::Node("EVENT_DRIVEN"), + YAML::Node(DEFAULT_SCHEDULING_STRATEGY), CONFIG_YAML_PROCESSORS_KEY); procCfg.schedulingStrategy = strategyNode.as<std::string>(); logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy); auto periodNode = getOptionalField(&procNode, "scheduling period", - YAML::Node("1 sec"), + YAML::Node(DEFAULT_SCHEDULING_PERIOD), CONFIG_YAML_PROCESSORS_KEY); procCfg.schedulingPeriod = periodNode.as<std::string>(); logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod); @@ -153,7 +153,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core:: } if (procNode["run duration nanos"]) { - procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>(); + procCfg.runDurationNanos = procNode["run duration nanos"].as<std::string>(); logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/test/unit/ManifestTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ManifestTests.cpp b/libminifi/test/unit/ManifestTests.cpp new file mode 100644 index 0000000..09e278a --- /dev/null +++ b/libminifi/test/unit/ManifestTests.cpp @@ -0,0 +1,143 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <memory> + +#include "../../include/core/Processor.h" +#include "../../include/core/state/nodes/AgentInformation.h" +#include "../TestBase.h" +#include "io/ClientSocket.h" +#include "core/Processor.h" +#include "core/ClassLoader.h" + +TEST_CASE("Test Required", "[required]") { + minifi::state::response::ComponentManifest manifest("minifi-system"); + auto serialized = manifest.serialize(); + REQUIRE(serialized.size() > 0); + const auto &resp = serialized[0]; + REQUIRE(resp.children.size() > 0); + const auto &processors = resp.children[0]; + REQUIRE(processors.children.size() > 0); + const auto &proc_0 = processors.children[0]; + REQUIRE(proc_0.children.size() > 0); + const auto &prop_descriptors = proc_0.children[0]; + REQUIRE(prop_descriptors.children.size() > 0); + const auto &prop_0 = prop_descriptors.children[0]; + REQUIRE(prop_0.children.size() >= 3); + const auto &prop_0_required = prop_0.children[2]; + REQUIRE("required" == prop_0_required.name); + REQUIRE(!std::dynamic_pointer_cast<minifi::state::response::BoolValue>(prop_0_required.value.getValue())->getValue()); +} + +TEST_CASE("Test Valid Regex", "[validRegex]") { + minifi::state::response::ComponentManifest manifest("minifi-system"); + auto serialized = manifest.serialize(); + REQUIRE(serialized.size() > 0); + const auto &resp = serialized[0]; + REQUIRE(resp.children.size() > 0); + const auto &processors = resp.children[0]; + REQUIRE(processors.children.size() > 0); + const auto &proc_0 = processors.children[0]; + REQUIRE(proc_0.children.size() > 0); + const auto &prop_descriptors = proc_0.children[0]; + REQUIRE(prop_descriptors.children.size() > 0); + const auto &prop_0 = prop_descriptors.children[0]; + REQUIRE(prop_0.children.size() >= 3); + const auto &df = prop_0.children[3]; + REQUIRE("defaultValue" == df.name); + const auto &prop_0_valid_regex = prop_0.children[4]; + REQUIRE("validRegex" == prop_0_valid_regex.name); +} + +TEST_CASE("Test Relationships", "[rel1]") { + minifi::state::response::ComponentManifest manifest("minifi-system"); + auto serialized = manifest.serialize(); + REQUIRE(serialized.size() > 0); + const auto &resp = serialized[0]; + REQUIRE(resp.children.size() > 0); + const auto &processors = resp.children[0]; + REQUIRE(processors.children.size() > 0); + minifi::state::response::SerializedResponseNode proc_0; + for (const auto &node : processors.children) { + if ("org::apache::nifi::minifi::processors::PutFile" == node.name) { + proc_0 = node; + } + } + REQUIRE(proc_0.children.size() > 0); + const auto &relationships = proc_0.children[1]; + REQUIRE("supportedRelationships" == relationships.name); + // this is because they are now nested + REQUIRE("supportedRelationships" == relationships.children[0].name); + REQUIRE("name" == relationships.children[0].children[0].name); + REQUIRE("failure" == relationships.children[0].children[0].value.to_string()); + REQUIRE("description" == relationships.children[0].children[1].name); + + REQUIRE("success" == relationships.children[1].children[0].value.to_string()); + REQUIRE("description" == relationships.children[1].children[1].name); +} + +TEST_CASE("Test Dependent", "[dependent]") { + minifi::state::response::ComponentManifest manifest("minifi-system"); + auto serialized = manifest.serialize(); + REQUIRE(serialized.size() > 0); + const auto &resp = serialized[0]; + REQUIRE(resp.children.size() > 0); + const auto &processors = resp.children[0]; + REQUIRE(processors.children.size() > 0); + minifi::state::response::SerializedResponseNode proc_0; + for (const auto &node : processors.children) { + if ("org::apache::nifi::minifi::processors::PutFile" == node.name) { + proc_0 = node; + } + } + REQUIRE(proc_0.children.size() > 0); + const auto &prop_descriptors = proc_0.children[0]; + REQUIRE(prop_descriptors.children.size() > 0); + const auto &prop_0 = prop_descriptors.children[1]; + REQUIRE(prop_0.children.size() >= 3); + REQUIRE("defaultValue" == prop_0.children[3].name); + REQUIRE("validRegex" == prop_0.children[4].name); + const auto &prop_0_dependent_0 = prop_descriptors.children[2]; + REQUIRE("Directory" == prop_0_dependent_0.name); +} + +TEST_CASE("Test Scheduling Defaults", "[schedDef]") { + minifi::state::response::AgentManifest manifest("minifi-system"); + auto serialized = manifest.serialize(); + REQUIRE(serialized.size() > 0); + minifi::state::response::SerializedResponseNode proc_0; + for (const auto &node : serialized) { + if ("schedulingDefaults" == node.name) { + proc_0 = node; + } + } + REQUIRE(proc_0.children.size() == 4); + for (const auto &child : proc_0.children) { + if ("defaultMaxConcurrentTasks" == child.name) { + REQUIRE("1" == child.value.to_string()); + } else if ("defaultRunDurationNanos" == child.name) { + REQUIRE("0" == child.value.to_string()); + } else if ("defaultSchedulingPeriod" == child.name) { + REQUIRE("1 sec" == child.value.to_string()); + } else if ("defaultSchedulingStrategy" == child.name) { + REQUIRE("TIMER_DRIVEN" == child.value.to_string()); + } else { + FAIL("UNKNOWQN NODE"); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/test/unit/PropertyValidationAgentInfoTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/PropertyValidationAgentInfoTests.cpp b/libminifi/test/unit/PropertyValidationAgentInfoTests.cpp deleted file mode 100644 index 31f9b7b..0000000 --- a/libminifi/test/unit/PropertyValidationAgentInfoTests.cpp +++ /dev/null @@ -1,120 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <memory> - -#include "../../include/core/Processor.h" -#include "../../include/core/state/nodes/AgentInformation.h" -#include "../TestBase.h" -#include "io/ClientSocket.h" -#include "core/Processor.h" -#include "core/ClassLoader.h" - -TEST_CASE("Test Required", "[required]") { - minifi::state::response::ComponentManifest manifest("minifi-system"); - auto serialized = manifest.serialize(); - REQUIRE(serialized.size() > 0); - const auto &resp = serialized[0]; - REQUIRE(resp.children.size() > 0); - const auto &processors = resp.children[0]; - REQUIRE(processors.children.size() > 0); - const auto &proc_0 = processors.children[0]; - REQUIRE(proc_0.children.size() > 0); - const auto &prop_descriptors = proc_0.children[0]; - REQUIRE(prop_descriptors.children.size() > 0); - const auto &prop_0 = prop_descriptors.children[0]; - REQUIRE(prop_0.children.size() >= 3); - const auto &prop_0_required = prop_0.children[2]; - REQUIRE("required" == prop_0_required.name); - REQUIRE(!std::dynamic_pointer_cast<minifi::state::response::BoolValue>(prop_0_required.value.getValue())->getValue()); -} - -TEST_CASE("Test Valid Regex", "[validRegex]") { - minifi::state::response::ComponentManifest manifest("minifi-system"); - auto serialized = manifest.serialize(); - REQUIRE(serialized.size() > 0); - const auto &resp = serialized[0]; - REQUIRE(resp.children.size() > 0); - const auto &processors = resp.children[0]; - REQUIRE(processors.children.size() > 0); - const auto &proc_0 = processors.children[0]; - REQUIRE(proc_0.children.size() > 0); - const auto &prop_descriptors = proc_0.children[0]; - REQUIRE(prop_descriptors.children.size() > 0); - const auto &prop_0 = prop_descriptors.children[0]; - REQUIRE(prop_0.children.size() >= 3); - const auto &df = prop_0.children[3]; - REQUIRE("defaultValue" == df.name); - const auto &prop_0_valid_regex = prop_0.children[4]; - REQUIRE("validRegex" == prop_0_valid_regex.name); -} - -TEST_CASE("Test Relationships", "[rel1]") { - minifi::state::response::ComponentManifest manifest("minifi-system"); - auto serialized = manifest.serialize(); - REQUIRE(serialized.size() > 0); - const auto &resp = serialized[0]; - REQUIRE(resp.children.size() > 0); - const auto &processors = resp.children[0]; - REQUIRE(processors.children.size() > 0); - minifi::state::response::SerializedResponseNode proc_0; - for (const auto &node : processors.children) { - if ("org::apache::nifi::minifi::processors::PutFile" == node.name) { - proc_0 = node; - } - } - REQUIRE(proc_0.children.size() > 0); - const auto &relationships = proc_0.children[1]; - REQUIRE("supportedRelationships" == relationships.name); - // this is because they are now nested - REQUIRE("supportedRelationships" == relationships.children[0].name); - REQUIRE("name" == relationships.children[0].children[0].name); - REQUIRE("failure" == relationships.children[0].children[0].value.to_string()); - REQUIRE("description" == relationships.children[0].children[1].name); - - REQUIRE("success" == relationships.children[1].children[0].value.to_string()); - REQUIRE("description" == relationships.children[1].children[1].name); - - -} - - -TEST_CASE("Test Dependent", "[dependent]") { - minifi::state::response::ComponentManifest manifest("minifi-system"); - auto serialized = manifest.serialize(); - REQUIRE(serialized.size() > 0); - const auto &resp = serialized[0]; - REQUIRE(resp.children.size() > 0); - const auto &processors = resp.children[0]; - REQUIRE(processors.children.size() > 0); - minifi::state::response::SerializedResponseNode proc_0; - for (const auto &node : processors.children) { - if ("org::apache::nifi::minifi::processors::PutFile" == node.name) { - proc_0 = node; - } - } - REQUIRE(proc_0.children.size() > 0); - const auto &prop_descriptors = proc_0.children[0]; - REQUIRE(prop_descriptors.children.size() > 0); - const auto &prop_0 = prop_descriptors.children[1]; - REQUIRE(prop_0.children.size() >= 3); - REQUIRE("defaultValue" == prop_0.children[3].name); - REQUIRE("validRegex" == prop_0.children[4].name); - const auto &prop_0_dependent_0 = prop_descriptors.children[2]; - REQUIRE("Directory" == prop_0_dependent_0.name); -}
