Refactored authorization logic in the agent.

This patch makes uses of the new `ObjectApprovers` class which greatly
simplifies the logic for constructing and using authorization.

Review: https://reviews.apache.org/r/65313


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/83dd7f87
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/83dd7f87
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/83dd7f87

Branch: refs/heads/master
Commit: 83dd7f872499f1edc7a133f89719b210b152d54c
Parents: dc1e188
Author: Alexander Rojas <alexander.ro...@gmail.com>
Authored: Wed Jan 24 18:17:20 2018 +0100
Committer: Alexander Rojas <alexan...@mesosphere.io>
Committed: Wed Mar 14 18:09:20 2018 +0100

----------------------------------------------------------------------
 src/slave/http.cpp  | 1699 ++++++++++++++++++++--------------------------
 src/slave/http.hpp  |   52 +-
 src/slave/slave.cpp |   20 +-
 3 files changed, 787 insertions(+), 984 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/83dd7f87/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 7d7fa2b..a8ffbd1 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -80,6 +80,27 @@
 using mesos::agent::ProcessIO;
 
 using mesos::authorization::createSubject;
+using mesos::authorization::VIEW_CONTAINER;
+using mesos::authorization::VIEW_FLAGS;
+using mesos::authorization::VIEW_FRAMEWORK;
+using mesos::authorization::VIEW_TASK;
+using mesos::authorization::VIEW_EXECUTOR;
+using mesos::authorization::VIEW_ROLE;
+using mesos::authorization::SET_LOG_LEVEL;
+using mesos::authorization::ATTACH_CONTAINER_INPUT;
+using mesos::authorization::ATTACH_CONTAINER_OUTPUT;
+using mesos::authorization::LAUNCH_NESTED_CONTAINER;
+using mesos::authorization::LAUNCH_NESTED_CONTAINER_SESSION;
+using mesos::authorization::LAUNCH_STANDALONE_CONTAINER;
+using mesos::authorization::WAIT_NESTED_CONTAINER;
+using mesos::authorization::WAIT_STANDALONE_CONTAINER;
+using mesos::authorization::VIEW_STANDALONE_CONTAINER;
+using mesos::authorization::KILL_NESTED_CONTAINER;
+using mesos::authorization::KILL_STANDALONE_CONTAINER;
+using mesos::authorization::REMOVE_NESTED_CONTAINER;
+using mesos::authorization::REMOVE_STANDALONE_CONTAINER;
+using mesos::authorization::MODIFY_RESOURCE_PROVIDER_CONFIG;
+using mesos::authorization::PRUNE_IMAGES;
 
 using mesos::internal::recordio::Reader;
 
@@ -175,10 +196,10 @@ using process::http::Request;
 struct ExecutorWriter
 {
   ExecutorWriter(
-      const Owned<ObjectApprover>& taskApprover,
+      const Owned<ObjectApprovers>& approvers,
       const Executor* executor,
       const Framework* framework)
-    : taskApprover_(taskApprover),
+    : approvers_(approvers),
       executor_(executor),
       framework_(framework) {}
 
@@ -210,7 +231,7 @@ struct ExecutorWriter
 
     writer->field("tasks", [this](JSON::ArrayWriter* writer) {
       foreachvalue (Task* task, executor_->launchedTasks) {
-        if (!approveViewTask(taskApprover_, *task, framework_->info)) {
+        if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) {
           continue;
         }
 
@@ -220,7 +241,7 @@ struct ExecutorWriter
 
     writer->field("queued_tasks", [this](JSON::ArrayWriter* writer) {
       foreachvalue (const TaskInfo& task, executor_->queuedTasks) {
-        if (!approveViewTaskInfo(taskApprover_, task, framework_->info)) {
+        if (!approvers_->approved<VIEW_TASK>(task, framework_->info)) {
           continue;
         }
 
@@ -230,7 +251,7 @@ struct ExecutorWriter
 
     writer->field("completed_tasks", [this](JSON::ArrayWriter* writer) {
       foreach (const std::shared_ptr<Task>& task, executor_->completedTasks) {
-        if (!approveViewTask(taskApprover_, *task, framework_->info)) {
+        if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) {
           continue;
         }
 
@@ -240,7 +261,7 @@ struct ExecutorWriter
       // NOTE: We add 'terminatedTasks' to 'completed_tasks' for
       // simplicity.
       foreachvalue (Task* task, executor_->terminatedTasks) {
-        if (!approveViewTask(taskApprover_, *task, framework_->info)) {
+        if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) {
           continue;
         }
 
@@ -249,7 +270,7 @@ struct ExecutorWriter
     });
   }
 
-  const Owned<ObjectApprover>& taskApprover_;
+  const Owned<ObjectApprovers>& approvers_;
   const Executor* executor_;
   const Framework* framework_;
 };
@@ -260,11 +281,9 @@ struct ExecutorWriter
 struct FrameworkWriter
 {
   FrameworkWriter(
-      const Owned<ObjectApprover>& taskApprover,
-      const Owned<ObjectApprover>& executorApprover,
+      const Owned<ObjectApprovers>& approvers,
       const Framework* framework)
-    : taskApprover_(taskApprover),
-      executorApprover_(executorApprover),
+    : approvers_(approvers),
       framework_(framework) {}
 
   void operator()(JSON::ObjectWriter* writer) const
@@ -293,13 +312,13 @@ struct FrameworkWriter
 
     writer->field("executors", [this](JSON::ArrayWriter* writer) {
       foreachvalue (Executor* executor, framework_->executors) {
-        if (!approveViewExecutorInfo(
-                executorApprover_, executor->info, framework_->info)) {
+        if (!approvers_->approved<VIEW_EXECUTOR>(
+                executor->info, framework_->info)) {
           continue;
         }
 
         ExecutorWriter executorWriter(
-            taskApprover_,
+            approvers_,
             executor,
             framework_);
 
@@ -311,13 +330,13 @@ struct FrameworkWriter
         "completed_executors", [this](JSON::ArrayWriter* writer) {
           foreach (
               const Owned<Executor>& executor, framework_->completedExecutors) 
{
-            if (!approveViewExecutorInfo(
-                executorApprover_, executor->info, framework_->info)) {
+            if (!approvers_->approved<VIEW_EXECUTOR>(
+                    executor->info, framework_->info)) {
               continue;
             }
 
             ExecutorWriter executorWriter(
-                taskApprover_,
+                approvers_,
                 executor.get(),
                 framework_);
 
@@ -326,8 +345,7 @@ struct FrameworkWriter
         });
   }
 
-  const Owned<ObjectApprover>& taskApprover_;
-  const Owned<ObjectApprover>& executorApprover_;
+  const Owned<ObjectApprovers>& approvers_;
   const Framework* framework_;
 };
 
@@ -936,33 +954,20 @@ Future<Response> Http::getFlags(
 
   LOG(INFO) << "Processing GET_FLAGS call";
 
-  Future<Owned<ObjectApprover>> approver;
-
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
-
-    approver = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::VIEW_FLAGS);
-  } else {
-    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
-  }
-
-  return approver.then(defer(slave->self(),
-      [this, acceptType](
-          const Owned<ObjectApprover>& approver) -> Future<Response> {
-        Try<bool> approved = approver->approved(ObjectApprover::Object());
-
-        if (approved.isError()) {
-          return InternalServerError(approved.error());
-        } else if (!approved.get()) {
-          return Forbidden();
-        }
+  return ObjectApprovers::create(slave->authorizer, principal, {VIEW_FLAGS})
+    .then(defer(
+        slave->self(),
+        [this, acceptType](
+            const Owned<ObjectApprovers>& approvers) -> Response {
+          if (!approvers->approved<VIEW_FLAGS>()) {
+            return Forbidden();
+          }
 
-        return OK(
-            serialize(
-                acceptType, evolve<v1::agent::Response::GET_FLAGS>(_flags())),
-            stringify(acceptType));
-      }));
+          return OK(
+              serialize(
+                  acceptType, 
evolve<v1::agent::Response::GET_FLAGS>(_flags())),
+              stringify(acceptType));
+        }));
 }
 
 
@@ -1083,34 +1088,18 @@ Future<Response> Http::setLoggingLevel(
 
   LOG(INFO) << "Processing SET_LOGGING_LEVEL call for level " << level;
 
-  Future<Owned<ObjectApprover>> approver;
-
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
-
-    approver = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::SET_LOG_LEVEL);
-  } else {
-    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
-  }
-
-  return approver.then(
-      [level, duration](
-          const Owned<ObjectApprover>& approver) -> Future<Response> {
-        Try<bool> approved = approver->approved((ObjectApprover::Object()));
-
-        if (approved.isError()) {
-          return InternalServerError(approved.error());
-        } else if (!approved.get()) {
-          return Forbidden();
-        }
+  return ObjectApprovers::create(slave->authorizer, principal, {SET_LOG_LEVEL})
+    .then([level, duration](
+        const Owned<ObjectApprovers>& approvers) -> Future<Response> {
+      if (!approvers->approved<SET_LOG_LEVEL>()) {
+        return Forbidden();
+      }
 
-        return dispatch(
-            process::logging(), &Logging::set_level, level, duration)
-          .then([]() -> Response {
-            return OK();
-          });
-      });
+      return dispatch(process::logging(), &Logging::set_level, level, duration)
+        .then([]() -> Response {
+          return OK();
+        });
+    });
 }
 
 
@@ -1274,238 +1263,179 @@ Future<Response> Http::state(
     return ServiceUnavailable("Agent has not finished recovery");
   }
 
-  // Retrieve `ObjectApprover`s for authorizing frameworks and tasks.
-  Future<Owned<ObjectApprover>> frameworksApprover;
-  Future<Owned<ObjectApprover>> tasksApprover;
-  Future<Owned<ObjectApprover>> executorsApprover;
-  Future<Owned<ObjectApprover>> flagsApprover;
-  Future<Owned<ObjectApprover>> rolesApprover;
-
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
-
-    frameworksApprover = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::VIEW_FRAMEWORK);
-
-    tasksApprover = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::VIEW_TASK);
-
-    executorsApprover = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::VIEW_EXECUTOR);
-
-    flagsApprover = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::VIEW_FLAGS);
-
-    rolesApprover = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::VIEW_ROLE);
-  } else {
-    frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
-    tasksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
-    executorsApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
-    flagsApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
-    rolesApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
-  }
-
-  return collect(
-      frameworksApprover,
-      tasksApprover,
-      executorsApprover,
-      flagsApprover,
-      rolesApprover)
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR, VIEW_FLAGS, VIEW_ROLE})
     .then(defer(
         slave->self(),
-        [this, request](const tuple<Owned<ObjectApprover>,
-                                    Owned<ObjectApprover>,
-                                    Owned<ObjectApprover>,
-                                    Owned<ObjectApprover>,
-                                    Owned<ObjectApprover>>& approvers)
-          -> Response {
-      // This lambda is consumed before the outer lambda
-      // returns, hence capture by reference is fine here.
-      auto state = [this, &approvers](JSON::ObjectWriter* writer) {
-        // Get approver from tuple.
-        Owned<ObjectApprover> frameworksApprover;
-        Owned<ObjectApprover> tasksApprover;
-        Owned<ObjectApprover> executorsApprover;
-        Owned<ObjectApprover> flagsApprover;
-        Owned<ObjectApprover> rolesApprover;
-        tie(frameworksApprover,
-            tasksApprover,
-            executorsApprover,
-            flagsApprover,
-            rolesApprover) = approvers;
-
-        writer->field("version", MESOS_VERSION);
-
-        if (build::GIT_SHA.isSome()) {
-          writer->field("git_sha", build::GIT_SHA.get());
-        }
+        [this, request](const Owned<ObjectApprovers>& approvers) -> Response {
+          // This lambda is consumed before the outer lambda
+          // returns, hence capture by reference is fine here.
+          auto state = [this, &approvers](JSON::ObjectWriter* writer) {
+            writer->field("version", MESOS_VERSION);
+
+            if (build::GIT_SHA.isSome()) {
+              writer->field("git_sha", build::GIT_SHA.get());
+            }
 
-        if (build::GIT_BRANCH.isSome()) {
-          writer->field("git_branch", build::GIT_BRANCH.get());
-        }
+            if (build::GIT_BRANCH.isSome()) {
+              writer->field("git_branch", build::GIT_BRANCH.get());
+            }
 
-        if (build::GIT_TAG.isSome()) {
-          writer->field("git_tag", build::GIT_TAG.get());
-        }
+            if (build::GIT_TAG.isSome()) {
+              writer->field("git_tag", build::GIT_TAG.get());
+            }
 
-        writer->field("build_date", build::DATE);
-        writer->field("build_time", build::TIME);
-        writer->field("build_user", build::USER);
-        writer->field("start_time", slave->startTime.secs());
+            writer->field("build_date", build::DATE);
+            writer->field("build_time", build::TIME);
+            writer->field("build_user", build::USER);
+            writer->field("start_time", slave->startTime.secs());
 
-        writer->field("id", slave->info.id().value());
-        writer->field("pid", string(slave->self()));
-        writer->field("hostname", slave->info.hostname());
-        writer->field("capabilities", 
slave->capabilities.toRepeatedPtrField());
+            writer->field("id", slave->info.id().value());
+            writer->field("pid", string(slave->self()));
+            writer->field("hostname", slave->info.hostname());
+            writer->field(
+                "capabilities",
+                slave->capabilities.toRepeatedPtrField());
 
-        if (slave->info.has_domain()) {
-          writer->field("domain", slave->info.domain());
-        }
+            if (slave->info.has_domain()) {
+              writer->field("domain", slave->info.domain());
+            }
 
-        const Resources& totalResources = slave->totalResources;
-
-        writer->field("resources", totalResources);
-        writer->field(
-            "reserved_resources",
-            [&totalResources, &rolesApprover](JSON::ObjectWriter* writer) {
-              foreachpair (const string& role,
-                           const Resources& resources,
-                           totalResources.reservations()) {
-                if (approveViewRole(rolesApprover, role)) {
-                  writer->field(role, resources);
-                }
-              }
-            });
+            const Resources& totalResources = slave->totalResources;
 
-        writer->field("unreserved_resources", totalResources.unreserved());
-
-        writer->field(
-            "reserved_resources_full",
-            [&totalResources, &rolesApprover](JSON::ObjectWriter* writer) {
-              foreachpair (const string& role,
-                           const Resources& resources,
-                           totalResources.reservations()) {
-                if (approveViewRole(rolesApprover, role)) {
-                  writer->field(role, [&resources](JSON::ArrayWriter* writer) {
-                    foreach (Resource resource, resources) {
-                      convertResourceFormat(&resource, ENDPOINT);
-                      writer->element(JSON::Protobuf(resource));
+            writer->field("resources", totalResources);
+            writer->field(
+                "reserved_resources",
+                [&totalResources, &approvers](JSON::ObjectWriter* writer) {
+                  foreachpair (const string& role,
+                               const Resources& resources,
+                               totalResources.reservations()) {
+                    if (approvers->approved<VIEW_ROLE>(role)) {
+                      writer->field(role, resources);
                     }
-                  });
-                }
-              }
-            });
-
-        writer->field(
-            "unreserved_resources_full",
-            [&totalResources](JSON::ArrayWriter* writer) {
-              foreach (Resource resource, totalResources.unreserved()) {
-                convertResourceFormat(&resource, ENDPOINT);
-                writer->element(JSON::Protobuf(resource));
-              }
-            });
+                  }
+                });
 
-        // TODO(abudnik): Consider storing the allocatedResources in the Slave
-        // struct rather than computing it here each time.
-        Resources allocatedResources;
+            writer->field("unreserved_resources", totalResources.unreserved());
 
-        foreachvalue (const Framework* framework, slave->frameworks) {
-          allocatedResources += framework->allocatedResources();
-        }
+            writer->field(
+                "reserved_resources_full",
+                [&totalResources, &approvers](JSON::ObjectWriter* writer) {
+                  foreachpair (const string& role,
+                               const Resources& resources,
+                               totalResources.reservations()) {
+                    if (approvers->approved<VIEW_ROLE>(role)) {
+                      writer->field(
+                          role,
+                          [&resources](JSON::ArrayWriter* writer) {
+                            foreach (Resource resource, resources) {
+                              convertResourceFormat(&resource, ENDPOINT);
+                              writer->element(JSON::Protobuf(resource));
+                            }
+                          });
+                    }
+                  }
+                });
 
-        writer->field(
-            "reserved_resources_allocated",
-            [&allocatedResources, &rolesApprover](JSON::ObjectWriter* writer) {
-              foreachpair (const string& role,
-                           const Resources& resources,
-                           allocatedResources.reservations()) {
-                if (approveViewRole(rolesApprover, role)) {
-                  writer->field(role, resources);
-                }
-              }
-            });
+            writer->field(
+                "unreserved_resources_full",
+                [&totalResources](JSON::ArrayWriter* writer) {
+                  foreach (Resource resource, totalResources.unreserved()) {
+                    convertResourceFormat(&resource, ENDPOINT);
+                    writer->element(JSON::Protobuf(resource));
+                  }
+                });
+
+            // TODO(abudnik): Consider storing the allocatedResources in the
+            // Slave struct rather than computing it here each time.
+            Resources allocatedResources;
+
+            foreachvalue (const Framework* framework, slave->frameworks) {
+              allocatedResources += framework->allocatedResources();
+            }
 
-        writer->field(
-            "unreserved_resources_allocated", allocatedResources.unreserved());
+            writer->field(
+                "reserved_resources_allocated",
+                [&allocatedResources, &approvers](JSON::ObjectWriter* writer) {
+                  foreachpair (const string& role,
+                               const Resources& resources,
+                               allocatedResources.reservations()) {
+                    if (approvers->approved<VIEW_ROLE>(role)) {
+                      writer->field(role, resources);
+                    }
+                  }
+                });
 
-        writer->field("attributes", Attributes(slave->info.attributes()));
+            writer->field(
+                "unreserved_resources_allocated",
+                allocatedResources.unreserved());
 
-        if (slave->master.isSome()) {
-          Try<string> hostname =
-            net::getHostname(slave->master->address.ip);
+            writer->field("attributes", Attributes(slave->info.attributes()));
 
-          if (hostname.isSome()) {
-            writer->field("master_hostname", hostname.get());
-          }
-        }
+            if (slave->master.isSome()) {
+              Try<string> hostname =
+                  net::getHostname(slave->master->address.ip);
 
-        if (approveViewFlags(flagsApprover)) {
-          if (slave->flags.log_dir.isSome()) {
-            writer->field("log_dir", slave->flags.log_dir.get());
-          }
+              if (hostname.isSome()) {
+                writer->field("master_hostname", hostname.get());
+              }
+            }
 
-          if (slave->flags.external_log_file.isSome()) {
-            writer->field(
-                "external_log_file", slave->flags.external_log_file.get());
-          }
+            if (approvers->approved<VIEW_FLAGS>()) {
+              if (slave->flags.log_dir.isSome()) {
+                writer->field("log_dir", slave->flags.log_dir.get());
+              }
 
-          writer->field("flags", [this](JSON::ObjectWriter* writer) {
-            foreachvalue (const flags::Flag& flag, slave->flags) {
-              Option<string> value = flag.stringify(slave->flags);
-              if (value.isSome()) {
-                writer->field(flag.effective_name().value, value.get());
+              if (slave->flags.external_log_file.isSome()) {
+                writer->field(
+                    "external_log_file", slave->flags.external_log_file.get());
               }
-            }
-          });
-        }
 
-        // Model all of the frameworks.
-        writer->field(
-            "frameworks",
-            [this, &frameworksApprover, &executorsApprover, &tasksApprover](
-                JSON::ArrayWriter* writer) {
-          foreachvalue (Framework* framework, slave->frameworks) {
-            // Skip unauthorized frameworks.
-            if (!approveViewFrameworkInfo(
-                    frameworksApprover, framework->info)) {
-              continue;
+              writer->field("flags", [this](JSON::ObjectWriter* writer) {
+                  foreachvalue (const flags::Flag& flag, slave->flags) {
+                    Option<string> value = flag.stringify(slave->flags);
+                    if (value.isSome()) {
+                      writer->field(flag.effective_name().value, value.get());
+                    }
+                  }
+                });
             }
 
-            FrameworkWriter frameworkWriter(
-                tasksApprover,
-                executorsApprover,
-                framework);
-
-            writer->element(frameworkWriter);
-          }
-        });
+            // Model all of the frameworks.
+            writer->field(
+                "frameworks",
+                [this, &approvers](JSON::ArrayWriter* writer) {
+                  foreachvalue (Framework* framework, slave->frameworks) {
+                    // Skip unauthorized frameworks.
+                    if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) 
{
+                      continue;
+                    }
 
-        // Model all of the completed frameworks.
-        writer->field(
-            "completed_frameworks",
-            [this, &frameworksApprover, &executorsApprover, &tasksApprover](
-                JSON::ArrayWriter* writer) {
-          foreachvalue (const Owned<Framework>& framework,
-                        slave->completedFrameworks) {
-            // Skip unauthorized frameworks.
-            if (!approveViewFrameworkInfo(
-                    frameworksApprover, framework->info)) {
-              continue;
-            }
+                    writer->element(FrameworkWriter(approvers, framework));
+                  }
+                });
 
-            FrameworkWriter frameworkWriter(
-                tasksApprover,
-                executorsApprover,
-                framework.get());
+            // Model all of the completed frameworks.
+            writer->field(
+                "completed_frameworks",
+            [this, &approvers](JSON::ArrayWriter* writer) {
+                  foreachvalue (const Owned<Framework>& framework,
+                                slave->completedFrameworks) {
+                    // Skip unauthorized frameworks.
+                    if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) 
{
+                      continue;
+                    }
 
-            writer->element(frameworkWriter);
-          }
-        });
-      };
+                    writer->element(
+                        FrameworkWriter(approvers, framework.get()));
+                  }
+                });
+          };
 
-      return OK(jsonify(state), request.url.query.get("jsonp"));
-    }));
+          return OK(jsonify(state), request.url.query.get("jsonp"));
+        }));
 }
 
 
@@ -1518,39 +1448,28 @@ Future<Response> Http::getFrameworks(
 
   LOG(INFO) << "Processing GET_FRAMEWORKS call";
 
-  // Retrieve `ObjectApprover`s for authorizing frameworks.
-  Future<Owned<ObjectApprover>> frameworksApprover;
-
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
-
-    frameworksApprover = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::VIEW_FRAMEWORK);
-  } else {
-    frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
-  }
-
-  return frameworksApprover
-    .then(defer(slave->self(),
-        [this, acceptType](const Owned<ObjectApprover>& frameworksApprover)
-          -> Future<Response> {
-      mesos::agent::Response response;
-      response.set_type(mesos::agent::Response::GET_FRAMEWORKS);
-      *response.mutable_get_frameworks() = _getFrameworks(frameworksApprover);
+  return ObjectApprovers::create(slave->authorizer, principal, 
{VIEW_FRAMEWORK})
+    .then(defer(
+        slave->self(),
+        [this, acceptType](
+            const Owned<ObjectApprovers>& approvers) -> Response {
+          mesos::agent::Response response;
+          response.set_type(mesos::agent::Response::GET_FRAMEWORKS);
+          *response.mutable_get_frameworks() = _getFrameworks(approvers);
 
-      return OK(serialize(acceptType, evolve(response)),
-                stringify(acceptType));
-    }));
+          return OK(serialize(acceptType, evolve(response)),
+                    stringify(acceptType));
+        }));
 }
 
 
 mesos::agent::Response::GetFrameworks Http::_getFrameworks(
-    const Owned<ObjectApprover>& frameworksApprover) const
+    const Owned<ObjectApprovers>& approvers) const
 {
   mesos::agent::Response::GetFrameworks getFrameworks;
   foreachvalue (const Framework* framework, slave->frameworks) {
     // Skip unauthorized frameworks.
-    if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+    if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
       continue;
     }
 
@@ -1560,7 +1479,7 @@ mesos::agent::Response::GetFrameworks 
Http::_getFrameworks(
 
   foreachvalue (const Owned<Framework>& framework, slave->completedFrameworks) 
{
     // Skip unauthorized frameworks.
-    if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+    if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
       continue;
     }
 
@@ -1581,53 +1500,33 @@ Future<Response> Http::getExecutors(
 
   LOG(INFO) << "Processing GET_EXECUTORS call";
 
-  // Retrieve `ObjectApprover`s for authorizing frameworks and executors.
-  Future<Owned<ObjectApprover>> frameworksApprover;
-  Future<Owned<ObjectApprover>> executorsApprover;
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
-
-    frameworksApprover = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::VIEW_FRAMEWORK);
-
-    executorsApprover = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::VIEW_EXECUTOR);
-  } else {
-    frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
-    executorsApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
-  }
-
-  return collect(frameworksApprover, executorsApprover)
-    .then(defer(slave->self(),
-        [this, acceptType](const tuple<Owned<ObjectApprover>,
-                                        Owned<ObjectApprover>>& approvers)
-          -> Future<Response> {
-      // Get approver from tuple.
-      Owned<ObjectApprover> frameworksApprover;
-      Owned<ObjectApprover> executorsApprover;
-      tie(frameworksApprover, executorsApprover) = approvers;
-
-      mesos::agent::Response response;
-      response.set_type(mesos::agent::Response::GET_EXECUTORS);
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {VIEW_FRAMEWORK, VIEW_EXECUTOR})
+    .then(defer(
+        slave->self(),
+        [this, acceptType](
+            const Owned<ObjectApprovers>& approvers) -> Response {
+          mesos::agent::Response response;
+          response.set_type(mesos::agent::Response::GET_EXECUTORS);
 
-      *response.mutable_get_executors() =
-        _getExecutors(frameworksApprover, executorsApprover);
+          *response.mutable_get_executors() = _getExecutors(approvers);
 
-      return OK(serialize(acceptType, evolve(response)),
-                stringify(acceptType));
-    }));
+          return OK(serialize(acceptType, evolve(response)),
+                    stringify(acceptType));
+        }));
 }
 
 
 mesos::agent::Response::GetExecutors Http::_getExecutors(
-    const Owned<ObjectApprover>& frameworksApprover,
-    const Owned<ObjectApprover>& executorsApprover) const
+    const Owned<ObjectApprovers>& approvers) const
 {
   // Construct framework list with both active and completed frameworks.
   vector<const Framework*> frameworks;
   foreachvalue (Framework* framework, slave->frameworks) {
     // Skip unauthorized frameworks.
-    if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+    if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
       continue;
     }
 
@@ -1636,7 +1535,7 @@ mesos::agent::Response::GetExecutors Http::_getExecutors(
 
   foreachvalue (const Owned<Framework>& framework, slave->completedFrameworks) 
{
     // Skip unauthorized frameworks.
-    if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+    if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
       continue;
     }
 
@@ -1648,9 +1547,9 @@ mesos::agent::Response::GetExecutors Http::_getExecutors(
   foreach (const Framework* framework, frameworks) {
     foreachvalue (Executor* executor, framework->executors) {
       // Skip unauthorized executors.
-      if (!approveViewExecutorInfo(executorsApprover,
-                                   executor->info,
-                                   framework->info)) {
+      if (!approvers->approved<VIEW_EXECUTOR>(
+              executor->info,
+              framework->info)) {
         continue;
       }
 
@@ -1660,9 +1559,9 @@ mesos::agent::Response::GetExecutors Http::_getExecutors(
 
     foreach (const Owned<Executor>& executor, framework->completedExecutors) {
       // Skip unauthorized executors.
-      if (!approveViewExecutorInfo(executorsApprover,
-                                   executor->info,
-                                   framework->info)) {
+      if (!approvers->approved<VIEW_EXECUTOR>(
+              executor->info,
+              framework->info)) {
         continue;
       }
 
@@ -1709,61 +1608,33 @@ Future<Response> Http::getTasks(
 
   LOG(INFO) << "Processing GET_TASKS call";
 
-  // Retrieve Approvers for authorizing frameworks and tasks.
-  Future<Owned<ObjectApprover>> frameworksApprover;
-  Future<Owned<ObjectApprover>> tasksApprover;
-  Future<Owned<ObjectApprover>> executorsApprover;
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
-
-    frameworksApprover = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::VIEW_FRAMEWORK);
-
-    tasksApprover = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::VIEW_TASK);
-
-    executorsApprover = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::VIEW_EXECUTOR);
-  } else {
-    frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
-    tasksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
-    executorsApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
-  }
-
-  return collect(frameworksApprover, tasksApprover, executorsApprover)
-    .then(defer(slave->self(),
-      [this, acceptType](const tuple<Owned<ObjectApprover>,
-                                      Owned<ObjectApprover>,
-                                      Owned<ObjectApprover>>& approvers)
-        -> Future<Response> {
-      // Get approver from tuple.
-      Owned<ObjectApprover> frameworksApprover;
-      Owned<ObjectApprover> tasksApprover;
-      Owned<ObjectApprover> executorsApprover;
-      tie(frameworksApprover, tasksApprover, executorsApprover) = approvers;
-
-      mesos::agent::Response response;
-      response.set_type(mesos::agent::Response::GET_TASKS);
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR})
+    .then(defer(
+        slave->self(),
+        [this, acceptType](
+            const Owned<ObjectApprovers>& approvers) -> Response {
+          mesos::agent::Response response;
+          response.set_type(mesos::agent::Response::GET_TASKS);
 
-      *response.mutable_get_tasks() =
-        _getTasks(frameworksApprover, tasksApprover, executorsApprover);
+          *response.mutable_get_tasks() = _getTasks(approvers);
 
-      return OK(serialize(acceptType, evolve(response)),
-                stringify(acceptType));
-  }));
+          return OK(serialize(acceptType, evolve(response)),
+                    stringify(acceptType));
+        }));
 }
 
 
 mesos::agent::Response::GetTasks Http::_getTasks(
-    const Owned<ObjectApprover>& frameworksApprover,
-    const Owned<ObjectApprover>& tasksApprover,
-    const Owned<ObjectApprover>& executorsApprover) const
+    const Owned<ObjectApprovers>& approvers) const
 {
   // Construct framework list with both active and completed frameworks.
   vector<const Framework*> frameworks;
   foreachvalue (Framework* framework, slave->frameworks) {
     // Skip unauthorized frameworks.
-    if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+    if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
       continue;
     }
 
@@ -1772,7 +1643,7 @@ mesos::agent::Response::GetTasks Http::_getTasks(
 
   foreachvalue (const Owned<Framework>& framework, slave->completedFrameworks) 
{
     // Skip unauthorized frameworks.
-    if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+    if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
       continue;
     }
 
@@ -1784,9 +1655,8 @@ mesos::agent::Response::GetTasks Http::_getTasks(
   foreach (const Framework* framework, frameworks) {
     foreachvalue (Executor* executor, framework->executors) {
       // Skip unauthorized executors.
-      if (!approveViewExecutorInfo(executorsApprover,
-                                   executor->info,
-                                   framework->info)) {
+      if (!approvers->approved<VIEW_EXECUTOR>(
+              executor->info, framework->info)) {
         continue;
       }
 
@@ -1795,9 +1665,8 @@ mesos::agent::Response::GetTasks Http::_getTasks(
 
     foreach (const Owned<Executor>& executor, framework->completedExecutors) {
       // Skip unauthorized executors.
-      if (!approveViewExecutorInfo(executorsApprover,
-                                   executor->info,
-                                   framework->info)) {
+      if (!approvers->approved<VIEW_EXECUTOR>(
+              executor->info, framework->info)) {
         continue;
       }
 
@@ -1813,7 +1682,7 @@ mesos::agent::Response::GetTasks Http::_getTasks(
     foreachvalue (const TaskMap& taskInfos, framework->pendingTasks) {
       foreachvalue (const TaskInfo& taskInfo, taskInfos) {
         // Skip unauthorized tasks.
-        if (!approveViewTaskInfo(tasksApprover, taskInfo, framework->info)) {
+        if (!approvers->approved<VIEW_TASK>(taskInfo, framework->info)) {
           continue;
         }
 
@@ -1831,7 +1700,7 @@ mesos::agent::Response::GetTasks Http::_getTasks(
     // Queued tasks.
     foreachvalue (const TaskInfo& taskInfo, executor->queuedTasks) {
       // Skip unauthorized tasks.
-      if (!approveViewTaskInfo(tasksApprover, taskInfo, framework->info)) {
+      if (!approvers->approved<VIEW_TASK>(taskInfo, framework->info)) {
         continue;
       }
 
@@ -1845,7 +1714,7 @@ mesos::agent::Response::GetTasks Http::_getTasks(
     foreachvalue (Task* task, executor->launchedTasks) {
       CHECK_NOTNULL(task);
       // Skip unauthorized tasks.
-      if (!approveViewTask(tasksApprover, *task, framework->info)) {
+      if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
         continue;
       }
 
@@ -1856,7 +1725,7 @@ mesos::agent::Response::GetTasks Http::_getTasks(
     foreachvalue (Task* task, executor->terminatedTasks) {
       CHECK_NOTNULL(task);
       // Skip unauthorized tasks.
-      if (!approveViewTask(tasksApprover, *task, framework->info)) {
+      if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
         continue;
       }
 
@@ -1866,7 +1735,7 @@ mesos::agent::Response::GetTasks Http::_getTasks(
     // Completed tasks.
     foreach (const std::shared_ptr<Task>& task, executor->completedTasks) {
       // Skip unauthorized tasks.
-      if (!approveViewTask(tasksApprover, *task.get(), framework->info)) {
+      if (!approvers->approved<VIEW_TASK>(*task.get(), framework->info)) {
         continue;
       }
 
@@ -1936,65 +1805,31 @@ Future<Response> Http::getState(
 
   LOG(INFO) << "Processing GET_STATE call";
 
-  // Retrieve Approvers for authorizing frameworks and tasks.
-  Future<Owned<ObjectApprover>> frameworksApprover;
-  Future<Owned<ObjectApprover>> tasksApprover;
-  Future<Owned<ObjectApprover>> executorsApprover;
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
-
-    frameworksApprover = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::VIEW_FRAMEWORK);
-
-    tasksApprover = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::VIEW_TASK);
-
-    executorsApprover = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::VIEW_EXECUTOR);
-  } else {
-    frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
-    tasksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
-    executorsApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
-  }
-
-  return collect(frameworksApprover, tasksApprover, executorsApprover)
-    .then(defer(slave->self(),
-      [=](const tuple<Owned<ObjectApprover>,
-                      Owned<ObjectApprover>,
-                      Owned<ObjectApprover>>& approvers)
-        -> Future<Response> {
-      // Get approver from tuple.
-      Owned<ObjectApprover> frameworksApprover;
-      Owned<ObjectApprover> tasksApprover;
-      Owned<ObjectApprover> executorsApprover;
-      tie(frameworksApprover, tasksApprover, executorsApprover) = approvers;
-
-      mesos::agent::Response response;
-      response.set_type(mesos::agent::Response::GET_STATE);
-      *response.mutable_get_state() =
-        _getState(frameworksApprover, tasksApprover, executorsApprover);
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR})
+    .then(defer(
+        slave->self(),
+        [=](const Owned<ObjectApprovers>& approvers) -> Response {
+          mesos::agent::Response response;
+          response.set_type(mesos::agent::Response::GET_STATE);
+          *response.mutable_get_state() = _getState(approvers);
 
-      return OK(serialize(acceptType, evolve(response)),
-                stringify(acceptType));
-    }));
+          return OK(serialize(acceptType, evolve(response)),
+                    stringify(acceptType));
+        }));
 }
 
 
 mesos::agent::Response::GetState Http::_getState(
-    const Owned<ObjectApprover>& frameworksApprover,
-    const Owned<ObjectApprover>& tasksApprover,
-    const Owned<ObjectApprover>& executorsApprover) const
+    const Owned<ObjectApprovers>& approvers) const
 {
   mesos::agent::Response::GetState getState;
 
-  *getState.mutable_get_tasks() =
-    _getTasks(frameworksApprover, tasksApprover, executorsApprover);
-
-  *getState.mutable_get_executors() =
-    _getExecutors(frameworksApprover, executorsApprover);
-
-  *getState.mutable_get_frameworks() =
-    _getFrameworks(frameworksApprover);
+  *getState.mutable_get_tasks() = _getTasks(approvers);
+  *getState.mutable_get_executors() = _getExecutors(approvers);
+  *getState.mutable_get_frameworks() = _getFrameworks(approvers);
 
   return getState;
 }
@@ -2191,32 +2026,21 @@ Future<Response> Http::getContainers(
 
   LOG(INFO) << "Processing GET_CONTAINERS call";
 
-  Future<Owned<AuthorizationAcceptor>> authorizeContainer =
-    AuthorizationAcceptor::create(
-        principal, slave->authorizer, authorization::VIEW_CONTAINER);
-
-  Future<Owned<AuthorizationAcceptor>> authorizeStandaloneContainer =
-    AuthorizationAcceptor::create(
-        principal, slave->authorizer, 
authorization::VIEW_STANDALONE_CONTAINER);
-
-  return collect(authorizeContainer, authorizeStandaloneContainer)
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {VIEW_CONTAINER, VIEW_STANDALONE_CONTAINER})
     .then(defer(
         slave->self(),
-        [this, call](const tuple<Owned<AuthorizationAcceptor>,
-                                 Owned<AuthorizationAcceptor>>& acceptors) {
-          Owned<AuthorizationAcceptor> authorizeContainer;
-          Owned<AuthorizationAcceptor> authorizeStandaloneContainer;
-          tie(authorizeContainer, authorizeStandaloneContainer) = acceptors;
-
+        [this, call](const Owned<ObjectApprovers>& approvers) {
           // Use an empty container ID filter.
           return __containers(
-              authorizeContainer,
-              authorizeStandaloneContainer,
+              approvers,
               None(),
               call.get_containers().show_nested(),
               call.get_containers().show_standalone());
-    })).then([acceptType](const Future<JSON::Array>& result)
-        -> Future<Response> {
+        }))
+    .then([acceptType](const Future<JSON::Array>& result) -> Response {
       if (!result.isReady()) {
         LOG(WARNING) << "Could not collect container status and statistics: "
                      << (result.isFailed()
@@ -2240,60 +2064,43 @@ Future<Response> Http::_containers(
     const Request& request,
     const Option<Principal>& principal) const
 {
-  Future<Owned<AuthorizationAcceptor>> authorizeContainer =
-    AuthorizationAcceptor::create(
-        principal, slave->authorizer, authorization::VIEW_CONTAINER);
+  Option<string> containerId = request.url.query.get("container_id");
 
-  Future<Owned<AuthorizationAcceptor>> authorizeStandaloneContainer =
-    AuthorizationAcceptor::create(
-        principal, slave->authorizer, 
authorization::VIEW_STANDALONE_CONTAINER);
-
-  Future<IDAcceptor<ContainerID>> selectContainerId =
-      IDAcceptor<ContainerID>(request.url.query.get("container_id"));
-
-  return collect(authorizeContainer,
-                 authorizeStandaloneContainer,
-                 selectContainerId)
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {VIEW_CONTAINER, VIEW_STANDALONE_CONTAINER})
     .then(defer(
         slave->self(),
-        [this](const tuple<Owned<AuthorizationAcceptor>,
-                           Owned<AuthorizationAcceptor>,
-                           IDAcceptor<ContainerID>>& acceptors) {
-          Owned<AuthorizationAcceptor> authorizeContainer;
-          Owned<AuthorizationAcceptor> authorizeStandaloneContainer;
-          Option<IDAcceptor<ContainerID>> selectContainerId;
-
-          tie(authorizeContainer,
-              authorizeStandaloneContainer,
-              selectContainerId) = acceptors;
+        [this, containerId](const Owned<ObjectApprovers>& approvers) {
+          IDAcceptor<ContainerID> selectContainerId(containerId);
 
           return __containers(
-              authorizeContainer,
-              authorizeStandaloneContainer,
+              approvers,
               selectContainerId,
               false,
               false);
-    })).then([request](const Future<JSON::Array>& result) -> Future<Response> {
-       if (!result.isReady()) {
-         LOG(WARNING) << "Could not collect container status and statistics: "
-                      << (result.isFailed()
-                           ? result.failure()
-                           : "Discarded");
-
-         return result.isFailed()
-           ? InternalServerError(result.failure())
-           : InternalServerError();
-       }
-
-       return process::http::OK(
-           result.get(), request.url.query.get("jsonp"));
+        }))
+    .then([request](const Future<JSON::Array>& result) -> Response {
+      if (!result.isReady()) {
+        LOG(WARNING) << "Could not collect container status and statistics: "
+                     << (result.isFailed()
+                         ? result.failure()
+                         : "Discarded");
+
+        return result.isFailed()
+            ? InternalServerError(result.failure())
+            : InternalServerError();
+      }
+
+      return process::http::OK(
+          result.get(), request.url.query.get("jsonp"));
     });
 }
 
 
 Future<JSON::Array> Http::__containers(
-    Owned<AuthorizationAcceptor> authorizeContainer,
-    Owned<AuthorizationAcceptor> authorizeStandaloneContainer,
+    const Owned<ObjectApprovers>& approvers,
     Option<IDAcceptor<ContainerID>> selectContainerId,
     bool showNestedContainers,
     bool showStandaloneContainers) const
@@ -2322,7 +2129,7 @@ Future<JSON::Array> Http::__containers(
 
           if ((selectContainerId.isSome() &&
                !selectContainerId->accept(containerId)) ||
-              !authorizeContainer->accept(info, framework->info)) {
+              !approvers->approved<VIEW_CONTAINER>(info, framework->info)) {
             continue;
           }
 
@@ -2381,7 +2188,7 @@ Future<JSON::Array> Http::__containers(
         }
 
         if (isRootContainerStandalone &&
-            !authorizeStandaloneContainer->accept()) {
+            !approvers->approved<VIEW_STANDALONE_CONTAINER>()) {
           continue;
         }
 
@@ -2480,47 +2287,34 @@ Future<Response> Http::pruneImages(
               std::back_inserter(excludedImages));
   }
 
-  Future<Owned<ObjectApprover>> approver;
-
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
-
-    approver = slave->authorizer.get()->getObjectApprover(
-        subject,
-        authorization::PRUNE_IMAGES);
-  } else {
-    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
-  }
-
-  return approver
-    .then(defer(slave->self(), [this, excludedImages](
-        const Owned<ObjectApprover>& approver) -> Future<Response> {
-      Try<bool> approved = approver->approved(ObjectApprover::Object());
-      if (approved.isError()) {
-        return InternalServerError("Authorization error: " + approved.error());
-      } else if (!approved.get()) {
-        return Forbidden();
-      }
+  return ObjectApprovers::create(slave->authorizer, principal, {PRUNE_IMAGES})
+    .then(defer(
+        slave->self(),
+        [this, excludedImages](
+            const Owned<ObjectApprovers>& approvers) -> Future<Response> {
+          if (!approvers->approved<PRUNE_IMAGES>()) {
+            return Forbidden();
+          }
 
-      return slave->containerizer->pruneImages(excludedImages)
-          .then([](const Future<Nothing>& result) -> Future<Response> {
-            if (!result.isReady()) {
-              // TODO(zhitao): Because `containerizer::pruneImages` returns
-              // a `Nothing` now, we cannot distinguish between actual
-              // failure or the case that operator should drain the agent.
-              // Consider returning more information.
-              LOG(WARNING)
-                << "Failed to prune images: "
-                << (result.isFailed() ? result.failure() : "discarded");
-
-              return result.isFailed()
-                ? InternalServerError(result.failure())
-                : InternalServerError();
-            }
+          return slave->containerizer->pruneImages(excludedImages)
+            .then([](const Future<Nothing>& result) -> Response {
+                if (!result.isReady()) {
+                  // TODO(zhitao): Because `containerizer::pruneImages` returns
+                  // a `Nothing` now, we cannot distinguish between actual
+                  // failure or the case that operator should drain the agent.
+                  // Consider returning more information.
+                  LOG(WARNING)
+                      << "Failed to prune images: "
+                      << (result.isFailed() ? result.failure() : "discarded");
+
+                  return result.isFailed()
+                      ? InternalServerError(result.failure())
+                      : InternalServerError();
+                }
 
-            return OK();
-          });
-    }));
+                return OK();
+              });
+        }));
 }
 
 
@@ -2605,15 +2399,14 @@ Future<Response> Http::launchNestedContainer(
   LOG(INFO) << "Processing LAUNCH_NESTED_CONTAINER call for container '"
             << call.launch_nested_container().container_id() << "'";
 
-  Future<Owned<AuthorizationAcceptor>> authorizer =
-    AuthorizationAcceptor::create(
-        principal, slave->authorizer, authorization::LAUNCH_NESTED_CONTAINER);
-
-  return authorizer
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {LAUNCH_NESTED_CONTAINER})
     .then(defer(
         slave->self(),
-        [=](const Owned<AuthorizationAcceptor>& authorizer) {
-          return _launchContainer(
+        [=](const Owned<ObjectApprovers>& authorizer) {
+          return _launchContainer<LAUNCH_NESTED_CONTAINER>(
               call.launch_nested_container().container_id(),
               call.launch_nested_container().command(),
               None(),
@@ -2638,20 +2431,34 @@ Future<Response> Http::launchContainer(
   LOG(INFO) << "Processing LAUNCH_CONTAINER call for container '"
             << call.launch_container().container_id() << "'";
 
-  Future<Owned<AuthorizationAcceptor>> authorizer =
-    AuthorizationAcceptor::create(
-        principal,
-        slave->authorizer,
-        call.launch_container().container_id().has_parent()
-          ? authorization::LAUNCH_NESTED_CONTAINER
-          : authorization::LAUNCH_STANDALONE_CONTAINER);
+  if (call.launch_container().container_id().has_parent()) {
+    return launchContainer<LAUNCH_NESTED_CONTAINER>(
+        call,
+        acceptType,
+        principal);
+  }
+
+  return launchContainer<LAUNCH_STANDALONE_CONTAINER>(
+      call,
+      acceptType,
+      principal);
+}
+
 
-  return authorizer
+template <authorization::Action action>
+Future<Response> Http::launchContainer(
+    const mesos::agent::Call& call,
+    ContentType acceptType,
+    const Option<Principal>& principal) const
+{
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {action})
     .then(defer(
         slave->self(),
-        [=](const Owned<AuthorizationAcceptor>& authorizer)
-          -> Future<Response> {
-          return _launchContainer(
+        [=](const Owned<ObjectApprovers>& approvers) -> Future<Response> {
+          return _launchContainer<action>(
               call.launch_container().container_id(),
               call.launch_container().command(),
               call.launch_container().resources(),
@@ -2660,11 +2467,12 @@ Future<Response> Http::launchContainer(
                 : Option<ContainerInfo>::none(),
               ContainerClass::DEFAULT,
               acceptType,
-              authorizer);
+              approvers);
         }));
 }
 
 
+template <authorization::Action action>
 Future<Response> Http::_launchContainer(
     const ContainerID& containerId,
     const CommandInfo& commandInfo,
@@ -2672,7 +2480,7 @@ Future<Response> Http::_launchContainer(
     const Option<ContainerInfo>& containerInfo,
     const Option<ContainerClass>& containerClass,
     ContentType acceptType,
-    const Owned<AuthorizationAcceptor>& authorizer) const
+    const Owned<ObjectApprovers>& approvers) const
 {
   Option<string> user;
 
@@ -2682,14 +2490,14 @@ Future<Response> Http::_launchContainer(
   // launching a standalone container (possibly nested).
   Executor* executor = slave->getExecutor(containerId);
   if (executor == nullptr) {
-    if (!authorizer->accept(containerId)) {
+    if (!approvers->approved<action>(containerId)) {
       return Forbidden();
     }
   } else {
     Framework* framework = slave->getFramework(executor->frameworkId);
     CHECK_NOTNULL(framework);
 
-    if (!authorizer->accept(
+    if (!approvers->approved<action>(
             executor->info, framework->info, commandInfo, containerId)) {
       return Forbidden();
     }
@@ -2826,18 +2634,17 @@ Future<Response> Http::waitNestedContainer(
   LOG(INFO) << "Processing WAIT_NESTED_CONTAINER call for container '"
             << call.wait_nested_container().container_id() << "'";
 
-  Future<Owned<AuthorizationAcceptor>> authorizer =
-    AuthorizationAcceptor::create(
-        principal, slave->authorizer, authorization::WAIT_NESTED_CONTAINER);
-
-  return authorizer
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {WAIT_NESTED_CONTAINER})
     .then(defer(
         slave->self(),
-        [=](const Owned<AuthorizationAcceptor>& authorizer) {
-          return _waitContainer(
+        [=](const Owned<ObjectApprovers>& approvers) {
+          return _waitContainer<WAIT_NESTED_CONTAINER>(
               call.wait_nested_container().container_id(),
               acceptType,
-              authorizer,
+              approvers,
               true);
         }));
 }
@@ -2854,31 +2661,41 @@ Future<Response> Http::waitContainer(
   LOG(INFO) << "Processing WAIT_CONTAINER call for container '"
             << call.wait_container().container_id() << "'";
 
-  Future<Owned<AuthorizationAcceptor>> authorizer =
-    AuthorizationAcceptor::create(
-        principal,
-        slave->authorizer,
-        call.wait_container().container_id().has_parent()
-          ? authorization::WAIT_NESTED_CONTAINER
-          : authorization::WAIT_STANDALONE_CONTAINER);
+  if (call.wait_container().container_id().has_parent()) {
+    return waitContainer<WAIT_NESTED_CONTAINER>(call, acceptType, principal);
+  }
 
-  return authorizer
+  return waitContainer<WAIT_STANDALONE_CONTAINER>(call, acceptType, principal);
+}
+
+
+template <authorization::Action action>
+Future<Response> Http::waitContainer(
+    const mesos::agent::Call& call,
+    ContentType acceptType,
+    const Option<Principal>& principal) const
+{
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {action})
     .then(defer(
         slave->self(),
-        [=](const Owned<AuthorizationAcceptor>& authorizer) {
-          return _waitContainer(
+        [=](const Owned<ObjectApprovers>& approvers) {
+          return _waitContainer<action>(
               call.wait_container().container_id(),
               acceptType,
-              authorizer,
+              approvers,
               false);
         }));
 }
 
 
+template <authorization::Action action>
 Future<Response> Http::_waitContainer(
     const ContainerID& containerId,
     ContentType acceptType,
-    const Owned<AuthorizationAcceptor>& authorizer,
+    const Owned<ObjectApprovers>& approvers,
     const bool deprecated) const
 {
   // Attempt to get the executor associated with this ContainerID.
@@ -2887,17 +2704,15 @@ Future<Response> Http::_waitContainer(
   // waiting on a standalone container (possibly nested).
   Executor* executor = slave->getExecutor(containerId);
   if (executor == nullptr) {
-    if (!authorizer->accept(containerId)) {
+    if (!approvers->approved<action>(containerId)) {
       return Forbidden();
     }
   } else {
     Framework* framework = slave->getFramework(executor->frameworkId);
     CHECK_NOTNULL(framework);
 
-    if (!authorizer->accept(
-            executor->info,
-            framework->info,
-            containerId)) {
+    if (!approvers->approved<action>(
+            executor->info, framework->info, containerId)) {
       return Forbidden();
     }
   }
@@ -2984,25 +2799,24 @@ Future<Response> Http::killNestedContainer(
   LOG(INFO) << "Processing KILL_NESTED_CONTAINER call for container '"
             << call.kill_nested_container().container_id() << "'";
 
-  Future<Owned<AuthorizationAcceptor>> authorizer =
-    AuthorizationAcceptor::create(
-        principal, slave->authorizer, authorization::KILL_NESTED_CONTAINER);
-
   // SIGKILL is used by default if a signal is not specified.
   int signal = SIGKILL;
   if (call.kill_nested_container().has_signal()) {
     signal = call.kill_nested_container().signal();
   }
 
-  return authorizer
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {KILL_NESTED_CONTAINER})
     .then(defer(
         slave->self(),
-        [=](const Owned<AuthorizationAcceptor>& authorizer) {
-          return _killContainer(
+        [=](const Owned<ObjectApprovers>& approvers) {
+          return _killContainer<KILL_NESTED_CONTAINER>(
               call.kill_nested_container().container_id(),
               signal,
               acceptType,
-              authorizer);
+              approvers);
         }));
 }
 
@@ -3018,38 +2832,48 @@ Future<Response> Http::killContainer(
   LOG(INFO) << "Processing KILL_CONTAINER call for container '"
             << call.kill_container().container_id() << "'";
 
-  Future<Owned<AuthorizationAcceptor>> authorizer =
-    AuthorizationAcceptor::create(
-        principal,
-        slave->authorizer,
-        call.kill_container().container_id().has_parent()
-          ? authorization::KILL_NESTED_CONTAINER
-          : authorization::KILL_STANDALONE_CONTAINER);
+  if (call.kill_container().container_id().has_parent()) {
+    return killContainer<KILL_NESTED_CONTAINER>(call, acceptType, principal);
+  }
+
+  return killContainer<KILL_STANDALONE_CONTAINER>(call, acceptType, principal);
+}
+
 
+template <mesos::authorization::Action action>
+Future<Response> Http::killContainer(
+    const mesos::agent::Call& call,
+    ContentType acceptType,
+    const Option<Principal>& principal) const
+{
   // SIGKILL is used by default if a signal is not specified.
   int signal = SIGKILL;
   if (call.kill_container().has_signal()) {
     signal = call.kill_container().signal();
   }
 
-  return authorizer
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {action})
     .then(defer(
         slave->self(),
-        [=](const Owned<AuthorizationAcceptor>& authorizer) {
-          return _killContainer(
+        [=](const Owned<ObjectApprovers>& approvers) {
+          return _killContainer<action>(
               call.kill_container().container_id(),
               signal,
               acceptType,
-              authorizer);
+              approvers);
         }));
 }
 
 
+template <mesos::authorization::Action action>
 Future<Response> Http::_killContainer(
     const ContainerID& containerId,
     const int signal,
     ContentType acceptType,
-    const Owned<AuthorizationAcceptor>& authorizer) const
+    const Owned<ObjectApprovers>& approvers) const
 {
   // Attempt to get the executor associated with this ContainerID.
   // We only expect to get the executor when killing a nested container
@@ -3057,14 +2881,14 @@ Future<Response> Http::_killContainer(
   // killing a standalone container (possibly nested).
   Executor* executor = slave->getExecutor(containerId);
   if (executor == nullptr) {
-    if (!authorizer->accept(containerId)) {
+    if (!approvers->approved<action>(containerId)) {
       return Forbidden();
     }
   } else {
     Framework* framework = slave->getFramework(executor->frameworkId);
     CHECK_NOTNULL(framework);
 
-    if (!authorizer->accept(
+    if (!approvers->approved<action>(
             executor->info,
             framework->info,
             containerId)) {
@@ -3096,19 +2920,18 @@ Future<Response> Http::removeNestedContainer(
   LOG(INFO) << "Processing REMOVE_NESTED_CONTAINER call for container '"
             << call.remove_nested_container().container_id() << "'";
 
-  Future<Owned<AuthorizationAcceptor>> authorizer =
-    AuthorizationAcceptor::create(
-        principal, slave->authorizer, authorization::REMOVE_NESTED_CONTAINER);
-
-  return authorizer
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {REMOVE_NESTED_CONTAINER})
     .then(defer(
-        slave->self(),
-        [=](const Owned<AuthorizationAcceptor>& authorizer) {
-          return _removeContainer(
-              call.remove_nested_container().container_id(),
-              acceptType,
-              authorizer);
-        }));
+      slave->self(),
+      [=](const Owned<ObjectApprovers>& approvers) {
+        return _removeContainer<REMOVE_NESTED_CONTAINER>(
+            call.remove_nested_container().container_id(),
+            acceptType,
+            approvers);
+      }));
 }
 
 
@@ -3123,30 +2946,46 @@ Future<Response> Http::removeContainer(
   LOG(INFO) << "Processing REMOVE_CONTAINER call for container '"
             << call.remove_container().container_id() << "'";
 
-  Future<Owned<AuthorizationAcceptor>> authorizer =
-    AuthorizationAcceptor::create(
-        principal,
-        slave->authorizer,
-        call.remove_container().container_id().has_parent()
-          ? authorization::REMOVE_NESTED_CONTAINER
-          : authorization::REMOVE_STANDALONE_CONTAINER);
+  if (call.remove_container().container_id().has_parent()) {
+    return removeContainer<REMOVE_NESTED_CONTAINER>(
+        call,
+        acceptType,
+        principal);
+  }
+
+  return removeContainer<REMOVE_STANDALONE_CONTAINER>(
+      call,
+      acceptType,
+      principal);
+}
+
 
-  return authorizer
+template <mesos::authorization::Action action>
+Future<Response> Http::removeContainer(
+    const mesos::agent::Call& call,
+    ContentType acceptType,
+    const Option<Principal>& principal) const
+{
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {action})
     .then(defer(
         slave->self(),
-        [=](const Owned<AuthorizationAcceptor>& authorizer) {
-          return _removeContainer(
+        [=](const Owned<ObjectApprovers>& approvers) {
+          return _removeContainer<action>(
               call.remove_container().container_id(),
               acceptType,
-              authorizer);
+              approvers);
         }));
 }
 
 
+template <mesos::authorization::Action action>
 Future<Response> Http::_removeContainer(
     const ContainerID& containerId,
     ContentType acceptType,
-    const Owned<AuthorizationAcceptor>& authorizer) const
+    const Owned<ObjectApprovers>& approvers) const
 {
   // Attempt to get the executor associated with this ContainerID.
   // We only expect to get the executor when removing a nested container
@@ -3154,14 +2993,14 @@ Future<Response> Http::_removeContainer(
   // removing a standalone container (possibly nested).
   Executor* executor = slave->getExecutor(containerId);
   if (executor == nullptr) {
-    if (!authorizer->accept(containerId)) {
+    if (!approvers->approved<action>(containerId)) {
       return Forbidden();
     }
   } else {
     Framework* framework = slave->getFramework(executor->frameworkId);
     CHECK_NOTNULL(framework);
 
-    if (!authorizer->accept(
+    if (!approvers->approved<action>(
             executor->info,
             framework->info,
             containerId)) {
@@ -3273,46 +3112,37 @@ Future<Response> Http::attachContainerInput(
   LOG(INFO) << "Processing ATTACH_CONTAINER_INPUT call for container '"
             << call.attach_container_input().container_id() << "'";
 
-  Future<Owned<ObjectApprover>> approver;
-
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
-
-    approver = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::ATTACH_CONTAINER_INPUT);
-  } else {
-    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
-  }
-
-  return approver.then(defer(slave->self(),
-    [this, call, decoder, mediaTypes](
-        const Owned<ObjectApprover>& attachInputApprover) -> Future<Response> {
-      const ContainerID& containerId =
-        call.attach_container_input().container_id();
-
-      Executor* executor = slave->getExecutor(containerId);
-      if (executor == nullptr) {
-        return NotFound(
-            "Container " + stringify(containerId) + " cannot be found");
-      }
-
-      Framework* framework = slave->getFramework(executor->frameworkId);
-      CHECK_NOTNULL(framework);
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {ATTACH_CONTAINER_INPUT})
+    .then(defer(
+        slave->self(),
+        [this, call, decoder, mediaTypes](
+            const Owned<ObjectApprovers>& approvers) -> Future<Response> {
+          const ContainerID& containerId =
+              call.attach_container_input().container_id();
+
+          Executor* executor = slave->getExecutor(containerId);
+          if (executor == nullptr) {
+            return NotFound(
+                "Container " + stringify(containerId) + " cannot be found");
+          }
 
-      Try<bool> approved = attachInputApprover->approved(
-          ObjectApprover::Object(executor->info, framework->info));
+          Framework* framework = slave->getFramework(executor->frameworkId);
+          CHECK_NOTNULL(framework);
 
-      if (approved.isError()) {
-        return Failure(approved.error());
-      } else if (!approved.get()) {
-        return Forbidden();
-      }
+          if (!approvers->approved<ATTACH_CONTAINER_INPUT>(
+                  executor->info,
+                  framework->info)) {
+            return Forbidden();
+          }
 
-      Owned<Reader<mesos::agent::Call>> decoder_ = decoder;
+          Owned<Reader<mesos::agent::Call>> decoder_ = decoder;
 
-      return _attachContainerInput(
-          call, std::move(decoder_), mediaTypes);
-  }));
+          return _attachContainerInput(
+              call, std::move(decoder_), mediaTypes);
+        }));
 }
 
 
@@ -3323,52 +3153,41 @@ Future<Response> Http::addResourceProviderConfig(
   CHECK_EQ(mesos::agent::Call::ADD_RESOURCE_PROVIDER_CONFIG, call.type());
   CHECK(call.has_add_resource_provider_config());
 
-  Future<Owned<ObjectApprover>> approver;
-
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
-
-    approver = slave->authorizer.get()->getObjectApprover(
-        subject,
-        authorization::MODIFY_RESOURCE_PROVIDER_CONFIG);
-  } else {
-    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
-  }
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {MODIFY_RESOURCE_PROVIDER_CONFIG})
+    .then(defer(
+        slave->self(),
+        [=](const Owned<ObjectApprovers>& approvers) -> Future<Response> {
+          if (!approvers->approved<MODIFY_RESOURCE_PROVIDER_CONFIG>()) {
+            return Forbidden();
+          }
 
-  return approver
-    .then(defer(slave->self(), [=](
-        const Owned<ObjectApprover>& approver) -> Future<Response> {
-      Try<bool> approved = approver->approved(ObjectApprover::Object());
-      if (approved.isError()) {
-        return InternalServerError("Authorization error: " + approved.error());
-      } else if (!approved.get()) {
-        return Forbidden();
-      }
+          const ResourceProviderInfo& info =
+              call.add_resource_provider_config().info();
 
-      const ResourceProviderInfo& info =
-        call.add_resource_provider_config().info();
+          LOG(INFO)
+              << "Processing ADD_RESOURCE_PROVIDER_CONFIG call with type '"
+              << info.type() << "' and name '" << info.name() << "'";
 
-      LOG(INFO)
-        << "Processing ADD_RESOURCE_PROVIDER_CONFIG call with type '"
-        << info.type() << "' and name '" << info.name() << "'";
+          return slave->localResourceProviderDaemon->add(info)
+            .then([](bool added) -> Response {
+                if (!added) {
+                  return Conflict();
+                }
 
-      return slave->localResourceProviderDaemon->add(info)
-        .then([](bool added) -> Response {
-          if (!added) {
-            return Conflict();
-          }
+                return OK();
+              })
+            .repair([info](const Future<Response>& future) {
+                LOG(ERROR)
+                    << "Failed to add resource provider config with type '"
+                    << info.type() << "' and name '" << info.name() << "': "
+                    << future.failure();
 
-          return OK();
-        })
-        .repair([info](const Future<Response>& future) {
-          LOG(ERROR)
-            << "Failed to add resource provider config with type '"
-            << info.type() << "' and name '" << info.name() << "': "
-            << future.failure();
-
-          return InternalServerError(future.failure());
-        });
-    }));
+                return InternalServerError(future.failure());
+              });
+        }));
 }
 
 
@@ -3379,52 +3198,41 @@ Future<Response> Http::updateResourceProviderConfig(
   CHECK_EQ(mesos::agent::Call::UPDATE_RESOURCE_PROVIDER_CONFIG, call.type());
   CHECK(call.has_update_resource_provider_config());
 
-  Future<Owned<ObjectApprover>> approver;
-
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
-
-    approver = slave->authorizer.get()->getObjectApprover(
-        subject,
-        authorization::MODIFY_RESOURCE_PROVIDER_CONFIG);
-  } else {
-    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
-  }
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {MODIFY_RESOURCE_PROVIDER_CONFIG})
+    .then(defer(
+        slave->self(),
+        [=](const Owned<ObjectApprovers>& approvers) -> Future<Response> {
+          if (!approvers->approved<MODIFY_RESOURCE_PROVIDER_CONFIG>()) {
+            return Forbidden();
+          }
 
-  return approver
-    .then(defer(slave->self(), [=](
-        const Owned<ObjectApprover>& approver) -> Future<Response> {
-      Try<bool> approved = approver->approved(ObjectApprover::Object());
-      if (approved.isError()) {
-        return InternalServerError("Authorization error: " + approved.error());
-      } else if (!approved.get()) {
-        return Forbidden();
-      }
+          const ResourceProviderInfo& info =
+              call.update_resource_provider_config().info();
 
-      const ResourceProviderInfo& info =
-        call.update_resource_provider_config().info();
+          LOG(INFO)
+              << "Processing UPDATE_RESOURCE_PROVIDER_CONFIG call with type '"
+              << info.type() << "' and name '" << info.name() << "'";
 
-      LOG(INFO)
-        << "Processing UPDATE_RESOURCE_PROVIDER_CONFIG call with type '"
-        << info.type() << "' and name '" << info.name() << "'";
+          return slave->localResourceProviderDaemon->update(info)
+            .then([](bool updated) -> Response {
+                if (!updated) {
+                  return NotFound();
+                }
 
-      return slave->localResourceProviderDaemon->update(info)
-        .then([](bool updated) -> Response {
-          if (!updated) {
-            return NotFound();
-          }
+                return OK();
+              })
+            .repair([info](const Future<Response>& future) {
+                LOG(ERROR)
+                    << "Failed to update resource provider config with type '"
+                    << info.type() << "' and name '" << info.name() << "': "
+                    << future.failure();
 
-          return OK();
-        })
-        .repair([info](const Future<Response>& future) {
-          LOG(ERROR)
-            << "Failed to update resource provider config with type '"
-            << info.type() << "' and name '" << info.name() << "': "
-            << future.failure();
-
-          return InternalServerError(future.failure());
-        });
-    }));
+                return InternalServerError(future.failure());
+              });
+        }));
 }
 
 
@@ -3435,51 +3243,41 @@ Future<Response> Http::removeResourceProviderConfig(
   CHECK_EQ(mesos::agent::Call::REMOVE_RESOURCE_PROVIDER_CONFIG, call.type());
   CHECK(call.has_remove_resource_provider_config());
 
-  Future<Owned<ObjectApprover>> approver;
-
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
-
-    approver = slave->authorizer.get()->getObjectApprover(
-        subject,
-        authorization::MODIFY_RESOURCE_PROVIDER_CONFIG);
-  } else {
-    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
-  }
-
-  return approver
-    .then(defer(slave->self(), [=](
-        const Owned<ObjectApprover>& approver) -> Future<Response> {
-      Try<bool> approved = approver->approved(ObjectApprover::Object());
-      if (approved.isError()) {
-        return InternalServerError("Authorization error: " + approved.error());
-      } else if (!approved.get()) {
-        return Forbidden();
-      }
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {MODIFY_RESOURCE_PROVIDER_CONFIG})
+    .then(defer(
+        slave->self(),
+        [=](const Owned<ObjectApprovers>& approvers) -> Future<Response> {
+          if (!approvers->approved<MODIFY_RESOURCE_PROVIDER_CONFIG>()) {
+            return Forbidden();
+          }
 
-      const string& type = call.remove_resource_provider_config().type();
-      const string& name = call.remove_resource_provider_config().name();
+          const string& type = call.remove_resource_provider_config().type();
+          const string& name = call.remove_resource_provider_config().name();
 
-      LOG(INFO)
-        << "Processing REMOVE_RESOURCE_PROVIDER_CONFIG call with type '" << 
type
-        << "' and name '" << name << "'";
+          LOG(INFO)
+              << "Processing REMOVE_RESOURCE_PROVIDER_CONFIG call with type '"
+              << type << "' and name '" << name << "'";
 
-      return slave->localResourceProviderDaemon->remove(type, name)
-        .then([](bool removed) -> Response {
-          if (!removed) {
-            return NotFound();
-          }
+          return slave->localResourceProviderDaemon->remove(type, name)
+            .then([](bool removed) -> Response {
+                if (!removed) {
+                  return NotFound();
+                }
 
-          return OK();
-        })
-        .repair([type, name](const Future<Response>& future) {
-          LOG(ERROR)
-            << "Failed to remove resource provider config with type '" << type
-            << "' and name '" << name << "': " << future.failure();
+                return OK();
+              })
+            .repair([type, name](const Future<Response>& future) {
+              LOG(ERROR)
+                  << "Failed to remove resource provider config with type '"
+                  << type << "' and name '" << name << "': "
+                  << future.failure();
 
-          return InternalServerError(future.failure());
-        });
-    }));
+              return InternalServerError(future.failure());
+            });
+      }));
 }
 
 
@@ -3520,28 +3318,6 @@ Future<Response> Http::launchNestedContainerSession(
   LOG(INFO) << "Processing LAUNCH_NESTED_CONTAINER_SESSION call for container 
'"
             << call.launch_nested_container_session().container_id() << "'";
 
-  Future<Owned<AuthorizationAcceptor>> authorizer =
-    AuthorizationAcceptor::create(
-        principal,
-        slave->authorizer,
-        authorization::LAUNCH_NESTED_CONTAINER_SESSION);
-
-  Future<Response> response = authorizer
-    .then(defer(
-        slave->self(),
-        [=](const Owned<AuthorizationAcceptor>& authorizer) {
-          return _launchContainer(
-              call.launch_nested_container_session().container_id(),
-              call.launch_nested_container_session().command(),
-              None(),
-              call.launch_nested_container_session().has_container()
-                ? call.launch_nested_container_session().container()
-                : Option<ContainerInfo>::none(),
-              ContainerClass::DEBUG,
-              mediaTypes.accept,
-              authorizer);
-        }));
-
   // Helper to destroy the container.
   auto destroy = [this](const ContainerID& containerId) {
     slave->containerizer->destroy(containerId)
@@ -3551,99 +3327,125 @@ Future<Response> Http::launchNestedContainerSession(
       });
   };
 
-  // If `response` has failed or is not `OK`, the container will be
-  // destroyed by `_launchContainer`.
-  return response
-    .then(defer(slave->self(),
-                [=](const Response& response) -> Future<Response> {
-      const ContainerID& containerId =
-        call.launch_nested_container_session().container_id();
-
-      if (response.status != OK().status) {
-        return response;
-      }
-
-      // If launch is successful, attach to the container output.
-      mesos::agent::Call call;
-      call.set_type(mesos::agent::Call::ATTACH_CONTAINER_OUTPUT);
-      call.mutable_attach_container_output()->mutable_container_id()
-          ->CopyFrom(containerId);
-
-      // Instead of directly returning the response of `attachContainerOutput`
-      // to the client, we use a level of indirection to make sure the 
container
-      // is destroyed when the client connection breaks.
-      return attachContainerOutput(call, mediaTypes, principal)
-        .then(defer(slave->self(),
-                    [=](const Response& response) -> Future<Response> {
-          if (response.status != OK().status) {
-            LOG(WARNING) << "Failed to attach to nested container "
-                         << containerId << ": '" << response.status << "' ("
-                         << response.body << ")";
-
-            destroy(containerId);
-            return response;
-          }
-
-          Pipe pipe;
-          Pipe::Writer writer = pipe.writer();
-
-          OK ok;
-          ok.headers = response.headers; // Reuse the headers from the 
response.
-          ok.type = Response::PIPE;
-          ok.reader = pipe.reader();
-
-          CHECK_EQ(Response::PIPE, response.type);
-          CHECK_SOME(response.reader);
-          Pipe::Reader reader = response.reader.get();
-
-          // Read from the `response` pipe and write to
-          // the client's response pipe.
-          // NOTE: Need to cast the lambda to std::function here because of a
-          // limitation of `defer`; `defer` does not work with `mutable` 
lambda.
-          std::function<void (const Future<Nothing>&)> _connect =
-            [=](const Future<Nothing>& future) mutable {
-              CHECK(!future.isDiscarded());
-
-              if (future.isFailed()) {
-                LOG(WARNING) << "Failed to send attach response for "
-                             << containerId << ": " << future.failure();
-
-                writer.fail(future.failure());
-                reader.close();
-              } else {
-                // EOF case.
-                LOG(INFO) << "Received EOF attach response for " << 
containerId;
-
-                writer.close();
-                reader.close();
-              }
-
-              destroy(containerId);
-          };
-
-          connect(reader, writer)
-            .onAny(defer(slave->self(), _connect));
-
-          // Destroy the container if the connection to client is closed.
-          writer.readerClosed()
-            .onAny(defer(slave->self(), [=](const Future<Nothing>& future) {
-              LOG(WARNING)
-                << "Launch nested container session connection"
-                << " for container " << containerId << " closed"
-                << (future.isFailed() ? ": " + future.failure() : "");
-
-              destroy(containerId);
-            }));
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {LAUNCH_NESTED_CONTAINER_SESSION})
+    .then(defer(
+      slave->self(),
+      [=](const Owned<ObjectApprovers>& approvers) {
+        return _launchContainer<LAUNCH_NESTED_CONTAINER_SESSION>(
+            call.launch_nested_container_session().container_id(),
+            call.launch_nested_container_session().command(),
+            None(),
+            call.launch_nested_container_session().has_container()
+              ? call.launch_nested_container_session().container()
+              : Option<ContainerInfo>::none(),
+            ContainerClass::DEBUG,
+            mediaTypes.accept,
+            approvers);
+      }))
+    .then(defer(
+      slave->self(),
+      [=](const Response& response) -> Future<Response> {
+        // If `response` has failed or is not `OK`, the container will be
+        // destroyed by `_launchContainer`.
+        const ContainerID& containerId =
+            call.launch_nested_container_session().container_id();
+
+        if (response.status != OK().status) {
+          return response;
+        }
 
-          return ok;
-        }))
-        .onFailed(defer(slave->self(), [=](const string& failure) {
-          LOG(WARNING) << "Failed to attach to nested container "
-                       << containerId << ": " << failure;
+        // If launch is successful, attach to the container output.
+        mesos::agent::Call call;
+        call.set_type(mesos::agent::Call::ATTACH_CONTAINER_OUTPUT);
+        call.mutable_attach_container_output()->mutable_container_id()
+            ->CopyFrom(containerId);
+
+        // Instead of directly returning the response of 
`attachContainerOutput`
+        // to the client, we use a level of indirection to make sure the
+        // container is destroyed when the client connection breaks.
+        return attachContainerOutput(call, mediaTypes, principal)
+          .then(defer(
+              slave->self(),
+              [=](const Response& response) -> Future<Response> {
+                if (response.status != OK().status) {
+                  LOG(WARNING) << "Failed to attach to nested container "
+                               << containerId << ": '" << response.status
+                                << "' (" << response.body << ")";
+
+                  destroy(containerId);
+                  return response;
+                }
 
-          destroy(containerId);
-        }));
-    }));
+                Pipe pipe;
+                Pipe::Writer writer = pipe.writer();
+
+                OK ok;
+                ok.headers = response.headers; // Reuse headers from response.
+                ok.type = Response::PIPE;
+                ok.reader = pipe.reader();
+
+                CHECK_EQ(Response::PIPE, response.type);
+                CHECK_SOME(response.reader);
+                Pipe::Reader reader = response.reader.get();
+
+                // Read from the `response` pipe and write to
+                // the client's response pipe.
+                // NOTE: Need to cast the lambda to std::function here because
+                // of a limitation of `defer`; `defer` does not work with
+                // `mutable` lambda.
+                std::function<void (const Future<Nothing>&)> _connect =
+                    [=](const Future<Nothing>& future) mutable {
+                      CHECK(!future.isDiscarded());
+
+                      if (future.isFailed()) {
+                        LOG(WARNING) << "Failed to send attach response for "
+                                     << containerId << ": " << 
future.failure();
+
+                        writer.fail(future.failure());
+                        reader.close();
+                      } else {
+                        // EOF case.
+                        LOG(INFO) << "Received EOF attach response for "
+                                  << containerId;
+
+                        writer.close();
+                        reader.close();
+                      }
+
+                      destroy(containerId);
+                };
+
+                connect(reader, writer).onAny(defer(slave->self(), _connect));
+
+                // Destroy the container if the connection to client is closed.
+                writer.readerClosed()
+                  .onAny(defer(
+                      slave->self(),
+                      [=](const Future<Nothing>& future) {
+                        LOG(WARNING)
+                            << "Launch nested container session connection"
+                            << " for container " << containerId << " closed"
+                            << (future.isFailed()
+                                ? ": " + future.failure()
+                                : "");
+
+                        destroy(containerId);
+                      }));
+
+                return ok;
+              }))
+          .onFailed(defer(
+              slave->self(),
+              [=](const string& failure) {
+                LOG(WARNING) << "Failed to attach to nested container "
+                             << containerId << ": " << failure;
+
+                destroy(containerId);
+              }));
+      }));
 }
 
 
@@ -3760,47 +3562,32 @@ Future<Response> Http::attachContainerOutput(
 
   LOG(INFO) << "Processing ATTACH_CONTAINER_OUTPUT call for container '"
             << call.attach_container_output().container_id() << "'";
+  return ObjectApprovers::create(
+      slave->authorizer,
+      principal,
+      {ATTACH_CONTAINER_OUTPUT})
+    .then(defer(
+        slave->self(),
+        [=](const Owned<ObjectApprovers>& approvers) -> Future<Response> {
+          const ContainerID& containerId =
+              call.attach_container_output().container_id();
+
+          Executor* executor = slave->getExecutor(containerId);
+          if (executor == nullptr) {
+            return NotFound(
+                "Container " + stringify(containerId) + " cannot be found");
+          }
 
-  Future<Owned<ObjectApprover>> approver;
-
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
-
-    approver = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::ATTACH_CONTAINER_OUTPUT);
-  } else {
-    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
-  }
-
-  return approver.then(defer(slave->self(),
-    [this, call, mediaTypes](
-        const Owned<ObjectApprover>& attachOutputApprover) -> Future<Response> 
{
-      const ContainerID& containerId =
-        call.attach_container_output().container_id();
-
-      Executor* executor = slave->getExecutor(containerId);
-      if (executor == nullptr) {
-        return NotFound(
-            "Container " + stringify(containerId) + " cannot be found");
-      }
-
-      Framework* framework = slave->getFramework(executor->frameworkId);
-      CHECK_NOTNULL(framework);
-
-      Try<bool> approved = attachOutputApprover->approved(
-          ObjectApprover::Object(
-              executor->info,
-              framework->info,
-              containerId));
+          Framework* framework = slave->getFramework(executor->frameworkId);
+          CHECK_NOTNULL(framework);
 
-      if (approved.isError()) {
-        return Failure(approved.error());
-      } else if (!approved.get()) {
-        return Forbidden();
-      }
+          if (!approvers->approved<ATTACH_CONTAINER_OUTPUT>(
+                  executor->info, framework->info, containerId)) {
+            return Forbidden();
+          }
 
-      return _attachContainerOutput(call, mediaTypes);
-  }));
+          return _attachContainerOutput(call, mediaTypes);
+      }));
 }
 
 } // namespace slave {

Reply via email to