This is an automated email from the ASF dual-hosted git repository. asekretenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit f54d498a2d5dd716131769561cd790b416579e8e Author: Andrei Sekretenko <asekrete...@apache.org> AuthorDate: Thu Jul 16 20:45:43 2020 +0200 Implemented offer constraints filter with Exists/NotExists predicates. This patch implements an offer filtering object that supports the Exists/NotExists offer constraints, and adds it into the allocator interface. More constraints will be added to this filter in further patches. Review: https://reviews.apache.org/r/72741 --- include/mesos/allocator/allocator.hpp | 43 +++ src/CMakeLists.txt | 1 + src/Makefile.am | 1 + .../allocator/mesos/offer_constraints_filter.cpp | 360 +++++++++++++++++++++ src/master/master.cpp | 7 +- 5 files changed, 409 insertions(+), 3 deletions(-) diff --git a/include/mesos/allocator/allocator.hpp b/include/mesos/allocator/allocator.hpp index 065597f..05d0e9c 100644 --- a/include/mesos/allocator/allocator.hpp +++ b/include/mesos/allocator/allocator.hpp @@ -22,6 +22,7 @@ // ONLY USEFUL AFTER RUNNING PROTOC. #include <mesos/allocator/allocator.pb.h> +#include <mesos/scheduler/scheduler.pb.h> #include <mesos/maintenance/maintenance.hpp> @@ -68,6 +69,43 @@ struct Options }; +namespace internal { +class OfferConstraintsFilterImpl; + +} // namespace internal { + + +class OfferConstraintsFilter +{ +public: + static Try<OfferConstraintsFilter> create( + scheduler::OfferConstraints&& constraints); + + OfferConstraintsFilter() = delete; + + // Definitions of these need `OfferConstraintsFilterImpl` to be a complete + // type. + OfferConstraintsFilter(OfferConstraintsFilter&&); + OfferConstraintsFilter& operator=(OfferConstraintsFilter&&); + ~OfferConstraintsFilter(); + + /** + * Returns `true` if the allocator must NOT offer resources on the agent + * to the framework's role (the agent is excluded), and `false` otherwise. + */ + bool isAgentExcluded( + const std::string& role, + const SlaveInfo& agentInfo) const; + + // TODO(asekretenko): Add a method for filtering `Resources` on an agent. 
+ +private: + std::unique_ptr<internal::OfferConstraintsFilterImpl> impl; + + OfferConstraintsFilter(internal::OfferConstraintsFilterImpl&& impl_); +}; + + /** * Per-framework allocator-specific options that are not part of * `FrameworkInfo`. @@ -78,6 +116,11 @@ struct FrameworkOptions * The set of roles for which the allocator should not generate offers. */ std::set<std::string> suppressedRoles; + + /** + * The internal representation of framework's offer constraints. + */ + Option<OfferConstraintsFilter> offerConstraintsFilter; }; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a976dc1..668a506 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -429,6 +429,7 @@ set(MASTER_SRC master/allocator/allocator.cpp master/allocator/mesos/hierarchical.cpp master/allocator/mesos/metrics.cpp + master/allocator/mesos/offer_constraints_filter.cpp master/allocator/mesos/sorter/drf/metrics.cpp master/allocator/mesos/sorter/drf/sorter.cpp master/allocator/mesos/sorter/random/sorter.cpp diff --git a/src/Makefile.am b/src/Makefile.am index 6d68ed0..a91678e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1123,6 +1123,7 @@ libmesos_no_3rdparty_la_SOURCES += \ master/allocator/mesos/hierarchical.hpp \ master/allocator/mesos/metrics.cpp \ master/allocator/mesos/metrics.hpp \ + master/allocator/mesos/offer_constraints_filter.cpp \ master/allocator/mesos/sorter/drf/metrics.cpp \ master/allocator/mesos/sorter/drf/metrics.hpp \ master/allocator/mesos/sorter/drf/sorter.cpp \ diff --git a/src/master/allocator/mesos/offer_constraints_filter.cpp b/src/master/allocator/mesos/offer_constraints_filter.cpp new file mode 100644 index 0000000..ef8a948 --- /dev/null +++ b/src/master/allocator/mesos/offer_constraints_filter.cpp @@ -0,0 +1,360 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include <memory> +#include <string> +#include <unordered_map> +#include <vector> + +#include <stout/option.hpp> +#include <stout/try.hpp> +#include <stout/variant.hpp> + +#include <mesos/allocator/allocator.hpp> +#include <mesos/attributes.hpp> + +using std::string; +using std::vector; +using std::unique_ptr; +using std::unordered_map; + +using ::mesos::scheduler::AttributeConstraint; +using ::mesos::scheduler::OfferConstraints; + +namespace mesos { +namespace allocator { + +namespace internal { + +using Selector = AttributeConstraint::Selector; + + +static Option<Error> validate(const Selector& selector) +{ + switch(selector.selector_case()) { + case Selector::kPseudoattributeType: + switch (selector.pseudoattribute_type()) { + case Selector::HOSTNAME: + case Selector::REGION: + case Selector::ZONE: + return None(); + case Selector::UNKNOWN: + break; + } + + return Error("Unknown pseudoattribute type"); + + case Selector::kAttributeName: + return None(); + + case Selector::SELECTOR_NOT_SET: + return Error( + "Exactly one of 'AttributeConstraint::Selector::name' or" + " 'AttributeConstraint::Selector::pseudoattribute_type' must be set"); + } + + UNREACHABLE(); +} + + +class AttributeConstraintPredicate +{ +public: + bool apply(const Nothing& _) const { return apply_(_); } + bool apply(const string& str) const { return apply_(str); } + bool apply(const Attribute& attr) const { return 
apply_(attr); } + + static Try<AttributeConstraintPredicate> create( + AttributeConstraint::Predicate&& predicate) + { + using Self = AttributeConstraintPredicate; + + switch (predicate.predicate_case()) { + case AttributeConstraint::Predicate::kExists: + return Self(Exists{}); + case AttributeConstraint::Predicate::kNotExists: + return Self(NotExists{}); + case AttributeConstraint::Predicate::PREDICATE_NOT_SET: + return Error("Unknown predicate type"); + } + UNREACHABLE(); + } + +private: + // The following helper structs apply the predicate using + // overloads for the three cases: + // (1) Nothing -> non existent (pseudo) attribute + // (2) string -> pseudo attribute value + // (3) Attribute -> named attribute value + struct Exists + { + bool apply(const Nothing&) const { return false; } + bool apply(const string&) const { return true; } + bool apply(const Attribute&) const { return true; } + }; + + struct NotExists + { + bool apply(const Nothing&) const { return true; } + bool apply(const string&) const { return false; } + bool apply(const Attribute&) const { return false; } + }; + + // TODO(asekretenko): Introduce offer constraints for attribute equality + // (MESOS-10172) and regex match (MESOS-10173). 
+ using Predicate = Variant<Nothing, Exists, NotExists>; + + Predicate predicate; + + AttributeConstraintPredicate(Predicate&& p) : predicate(std::move(p)){}; + + template <class T> + bool apply_(const T& attribute) const + { + return predicate.visit( + [](const Nothing& p) -> bool { + LOG(FATAL) << "Predicate not initialized properly"; + UNREACHABLE(); + }, + [&](const Exists& p) { return p.apply(attribute); }, + [&](const NotExists& p) { return p.apply(attribute); }); + } +}; + + +class AttributeConstraintEvaluator +{ +public: + bool evaluate(const SlaveInfo& info) const + { + switch (selector.selector_case()) { + case Selector::kAttributeName: { + const string& name = selector.attribute_name(); + const auto attr = std::find_if( + info.attributes().cbegin(), + info.attributes().cend(), + [&name](const Attribute& a) { return a.name() == name; }); + + return attr == info.attributes().cend() ? predicate.apply(Nothing()) + : predicate.apply(*attr); + } + + case Selector::kPseudoattributeType: + switch (selector.pseudoattribute_type()) { + case Selector::HOSTNAME: + return predicate.apply(info.hostname()); + + case Selector::REGION: + return info.has_domain() && info.domain().has_fault_domain() + ? predicate.apply(info.domain().fault_domain().region().name()) + : predicate.apply(Nothing()); + + case Selector::ZONE: + return info.has_domain() && info.domain().has_fault_domain() + ? 
predicate.apply(info.domain().fault_domain().zone().name()) + : predicate.apply(Nothing()); + + case Selector::UNKNOWN: + LOG(FATAL) << "Unknown pseudoattribute value passed validation"; + } + + UNREACHABLE(); + + case Selector::SELECTOR_NOT_SET: + LOG(FATAL) << "'AttributeConstraint::Selector::selector' oneof that" + " has no known value set passed validation"; + } + + UNREACHABLE(); + } + + static Try<AttributeConstraintEvaluator> create( + AttributeConstraint&& constraint) + { + Option<Error> error = validate(constraint.selector()); + if (error.isSome()) { + return *error; + } + + Try<AttributeConstraintPredicate> predicate = + AttributeConstraintPredicate::create( + std::move(*constraint.mutable_predicate())); + + if (predicate.isError()) { + return Error(predicate.error()); + } + + return AttributeConstraintEvaluator{ + std::move(*constraint.mutable_selector()), std::move(*predicate)}; + } + +private: + Selector selector; + AttributeConstraintPredicate predicate; + + AttributeConstraintEvaluator( + Selector&& selector_, + AttributeConstraintPredicate&& predicate_) + : selector(std::move(selector_)), + predicate(std::move(predicate_)) + {} +}; + + +class OfferConstraintsFilterImpl +{ + using Group = vector<AttributeConstraintEvaluator>; + +public: + OfferConstraintsFilterImpl( + unordered_map<string, vector<Group>>&& expressions_) + : expressions(std::move(expressions_)) + {} + + bool isAgentExcluded(const std::string& role, const SlaveInfo& info) const + { + auto roleConstraintsExpression = expressions.find(role); + if (roleConstraintsExpression == expressions.end()) { + return false; + } + + // TODO(asekretenko): This method evaluates the constraints in the order in + // which they have been passed by the scheduler. 
Given that agents are + // seldom added or have their (pseudo)attributes changed, tracking + // match/mismatch frequency and runtime cost of the constraints, and + // reordering the expression accordingly (so that the cheapest groups that + // usually match come first, and the most expensive that usually don't come + // last) could potentially help speed up this method. + + return !std::any_of( + roleConstraintsExpression->second.cbegin(), + roleConstraintsExpression->second.cend(), + [&info](const Group& group) { + return std::all_of( + group.cbegin(), + group.cend(), + [&info](const AttributeConstraintEvaluator& e) { + return e.evaluate(info); + }); + }); + } + + static Try<OfferConstraintsFilterImpl> create(OfferConstraints&& constraints) + { + // TODO(asekretenko): This method performs a dumb 1:1 translation of + // `AttributeConstraint`s without any reordering; this leaves room for + // a number of potential optimizations such as: + // - deactivating a framework with no constraint groups + // - constructing no filter if there is a single empty group + // - deduplicating constraints and groups + // - reordering constraints and groups so that the potentially cheaper ones + // come first + using Group = vector<AttributeConstraintEvaluator>; + unordered_map<string, vector<Group>> expressions; + + for (auto& pair : *constraints.mutable_role_constraints()) { + const string& role = pair.first; + OfferConstraints::RoleConstraints& roleConstraints = pair.second; + + if (roleConstraints.groups().empty()) { + return Error( + "'OfferConstraints::role_constraints' has an empty " + "'RoleConstraints::groups' for role " + + role); + } + + vector<Group>& groups = expressions[role]; + + for (OfferConstraints::RoleConstraints::Group& group_ : + *roleConstraints.mutable_groups()) { + if (group_.attribute_constraints().empty()) { + return Error( + "'OfferConstraints::RoleConstraints::groups' for role " + role + + "contains an empty RoleConstraints::Group"); + } + + 
groups.emplace_back(); + Group& group = groups.back(); + + for (AttributeConstraint& constraint : + *group_.mutable_attribute_constraints()) { + Try<AttributeConstraintEvaluator> evaluator = + AttributeConstraintEvaluator::create(std::move(constraint)); + + if (evaluator.isError()) { + return Error( + "A role " + role + + " has an invalid 'AttributeConstraint': " + evaluator.error()); + } + + group.emplace_back(std::move(*evaluator)); + } + } + } + + return OfferConstraintsFilterImpl(std::move(expressions)); + } + +private: + unordered_map<string, vector<Group>> expressions; +}; + +} // namespace internal { + + +using internal::OfferConstraintsFilterImpl; + + +Try<OfferConstraintsFilter> OfferConstraintsFilter::create( + OfferConstraints&& constraints) +{ + Try<OfferConstraintsFilterImpl> impl = + OfferConstraintsFilterImpl::create(std::move(constraints)); + + if (impl.isError()) { + return Error(impl.error()); + } + + return OfferConstraintsFilter(std::move(*impl)); +} + + +OfferConstraintsFilter::OfferConstraintsFilter( + OfferConstraintsFilterImpl&& impl_) + : impl(new OfferConstraintsFilterImpl(std::move(impl_))) +{} + + +OfferConstraintsFilter::OfferConstraintsFilter(OfferConstraintsFilter&&) = + default; + + +OfferConstraintsFilter& OfferConstraintsFilter::operator=( + OfferConstraintsFilter&&) = default; + + +OfferConstraintsFilter::~OfferConstraintsFilter() = default; + + +bool OfferConstraintsFilter::isAgentExcluded( + const std::string& role, + const SlaveInfo& info) const +{ + return CHECK_NOTNULL(impl)->isAgentExcluded(role, info); +} + +} // namespace allocator { +} // namespace mesos { diff --git a/src/master/master.cpp b/src/master/master.cpp index 2769349..09efa0f 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2699,7 +2699,7 @@ void Master::subscribe( http, std::move(frameworkInfo), subscribe.force(), - FrameworkOptions{std::move(suppressedRoles)}, + FrameworkOptions{std::move(suppressedRoles), None()}, lambda::_1)); } @@ -2941,7 
+2941,7 @@ void Master::subscribe( from, std::move(frameworkInfo), subscribe.force(), - FrameworkOptions{std::move(suppressedRoles)}, + FrameworkOptions{std::move(suppressedRoles), None()}, lambda::_1)); } @@ -3242,7 +3242,7 @@ Future<process::http::Response> Master::updateFramework( make_move_iterator(call.mutable_suppressed_roles()->end())); updateFramework( - framework, call.framework_info(), {std::move(suppressedRoles)}); + framework, call.framework_info(), {std::move(suppressedRoles), None()}); sendFrameworkUpdates(*framework); @@ -9904,6 +9904,7 @@ void Master::addFramework( CHECK(!frameworks.registered.contains(framework->id())) << "Framework " << *framework << " already exists!"; + // TODO(asekretenko): Print some information about the OfferConstraintsFilter. LOG(INFO) << "Adding framework " << *framework << " with roles " << stringify(allocatorOptions.suppressedRoles) << " suppressed";