This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f2d6100  Require Datasource WRITE authorization for Supervisor and Task access (#11718)
f2d6100 is described below

commit f2d6100124dbe7cbc92ad91d28bd12a1800a1f2a
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Oct 8 10:39:48 2021 +0530

    Require Datasource WRITE authorization for Supervisor and Task access (#11718)
    
    Follow up PR for #11680
    
    Description
    Supervisor and Task APIs are related to ingestion and must always require
    Datasource WRITE authorization, even if they are purely informative.
    
    Changes
    Check Datasource WRITE in SystemSchema for tables "supervisors" and "tasks"
    Check Datasource WRITE for APIs /supervisor/history and /supervisor/{id}/history
    Check Datasource WRITE for all Indexing Task APIs
---
 docs/operations/security-user-auth.md              |   3 +-
 .../druid/indexing/common/task/AbstractTask.java   |  16 +
 .../task/AppenderatorDriverRealtimeIndexTask.java  |   5 +-
 .../indexing/common/task/HadoopIndexTask.java      |   3 +-
 .../druid/indexing/common/task/IndexTask.java      |   7 +-
 .../druid/indexing/common/task/IndexTaskUtils.java |  13 +-
 .../parallel/ParallelIndexSupervisorTask.java      |  38 +-
 .../task/batch/parallel/SinglePhaseSubTask.java    |   7 +-
 .../indexing/overlord/http/OverlordResource.java   |   6 +-
 .../overlord/supervisor/SupervisorResource.java    |   8 +-
 .../SeekableStreamIndexTaskRunner.java             |  32 +-
 .../druid/indexing/common/task/IndexTaskTest.java  |  84 +++++
 .../overlord/http/OverlordResourceTest.java        | 179 +++++++++-
 .../security/SupervisorResourceFilterTest.java     |   2 +-
 .../supervisor/SupervisorResourceTest.java         |   9 +
 .../SeekableStreamIndexTaskRunnerAuthTest.java     | 393 +++++++++++++++++++++
 .../docker/ldap-configs/bootstrap.ldif             |  74 ++--
 .../security/AbstractAuthConfigurationTest.java    | 175 ++++++---
 .../security/ITBasicAuthConfigurationTest.java     |  36 +-
 .../security/ITBasicAuthLdapConfigurationTest.java |  27 +-
 .../druid/sql/calcite/schema/SystemSchema.java     |   4 +-
 .../druid/sql/calcite/schema/SystemSchemaTest.java | 361 ++++++++++++-------
 22 files changed, 1170 insertions(+), 312 deletions(-)

diff --git a/docs/operations/security-user-auth.md 
b/docs/operations/security-user-auth.md
index 54f4317..e5e5cab 100644
--- a/docs/operations/security-user-auth.md
+++ b/docs/operations/security-user-auth.md
@@ -139,7 +139,8 @@ Queries on the [system schema 
tables](../querying/sql.md#system-schema) require
 - `segments`: Segments will be filtered based on DATASOURCE READ permissions.
 - `servers`: The user requires STATE READ permissions.
 - `server_segments`: The user requires STATE READ permissions and segments 
will be filtered based on DATASOURCE READ permissions.
-- `tasks`: Tasks will be filtered based on DATASOURCE READ permissions.
+- `tasks`: Druid filters tasks according to DATASOURCE WRITE permissions.
+- `supervisors`: Druid filters supervisors according to DATASOURCE WRITE 
permissions.
 
 ## Configuration Propagation
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index 8964a1e..e31270c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -30,9 +30,12 @@ import 
org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.ForbiddenException;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
@@ -164,6 +167,19 @@ public abstract class AbstractTask implements Task
     return TaskStatus.success(getId());
   }
 
+  /**
+   * Authorizes WRITE action on a task's datasource
+   *
+   * @throws ForbiddenException if not authorized
+   */
+  public void authorizeRequestForDatasourceWrite(
+      HttpServletRequest request,
+      AuthorizerMapper authorizerMapper
+  ) throws ForbiddenException
+  {
+    IndexTaskUtils.authorizeRequestForDatasourceWrite(request, dataSource, 
authorizerMapper);
+  }
+
   @Override
   public boolean equals(Object o)
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 8908014..4e1c5e7 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -84,7 +84,6 @@ import 
org.apache.druid.segment.realtime.firehose.ClippedFirehoseFactory;
 import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
 import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
 import org.apache.druid.segment.realtime.plumber.Committers;
-import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@@ -530,7 +529,7 @@ public class AppenderatorDriverRealtimeIndexTask extends 
AbstractTask implements
       @Context final HttpServletRequest req
   )
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     Map<String, Object> returnMap = new HashMap<>();
     Map<String, Object> totalsMap = new HashMap<>();
     Map<String, Object> averagesMap = new HashMap<>();
@@ -556,7 +555,7 @@ public class AppenderatorDriverRealtimeIndexTask extends 
AbstractTask implements
       @Context final HttpServletRequest req
   )
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(
         parseExceptionHandler.getSavedParseExceptions()
     );
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index b04097c..97e5ec0 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -63,7 +63,6 @@ import 
org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
-import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.hadoop.mapred.JobClient;
@@ -634,7 +633,7 @@ public class HadoopIndexTask extends HadoopTask implements 
ChatHandler
       @QueryParam("windows") List<Integer> windows
   )
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     Map<String, Object> returnMap = new HashMap<>();
     Map<String, Object> totalsMap = new HashMap<>();
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index a993b53..c68a2b4 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -95,7 +95,6 @@ import 
org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
 import 
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
-import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -285,7 +284,7 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
       @QueryParam("full") String full
   )
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     return Response.ok(doGetUnparseableEvents(full)).build();
   }
 
@@ -394,7 +393,7 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
       @QueryParam("full") String full
   )
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     return Response.ok(doGetRowStats(full)).build();
   }
 
@@ -406,7 +405,7 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
       @QueryParam("full") String full
   )
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     Map<String, Object> returnMap = new HashMap<>();
     Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
     Map<String, Object> payload = new HashMap<>();
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
index abc6b92..5be50b6 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
@@ -56,28 +56,25 @@ public class IndexTaskUtils
   }
 
   /**
-   * Authorizes action to be performed on a task's datasource
+   * Authorizes WRITE action on a task's datasource
    *
-   * @return authorization result
+   * @throws ForbiddenException if not authorized
    */
-  public static Access datasourceAuthorizationCheck(
+  public static void authorizeRequestForDatasourceWrite(
       final HttpServletRequest req,
-      Action action,
       String datasource,
       AuthorizerMapper authorizerMapper
-  )
+  ) throws ForbiddenException
   {
     ResourceAction resourceAction = new ResourceAction(
         new Resource(datasource, ResourceType.DATASOURCE),
-        action
+        Action.WRITE
     );
 
     Access access = AuthorizationUtils.authorizeResourceAction(req, 
resourceAction, authorizerMapper);
     if (!access.isAllowed()) {
       throw new ForbiddenException(access.toString());
     }
-
-    return access;
   }
 
   public static void setTaskDimensions(final ServiceMetricEvent.Builder 
metricBuilder, final Task task)
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 201a48e..3dfc9c7 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -53,7 +53,6 @@ import 
org.apache.druid.indexing.common.task.CurrentSubTaskHolder;
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
 import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
-import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.Tasks;
@@ -75,8 +74,6 @@ import 
org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import 
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
-import org.apache.druid.segment.realtime.firehose.ChatHandlers;
-import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.BuildingShardSpec;
@@ -1176,7 +1173,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       @Context final HttpServletRequest req
   )
   {
-    ChatHandlers.authorizationCheck(req, Action.READ, getDataSource(), 
authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
 
     if (toolbox == null) {
       return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task 
is not running yet").build();
@@ -1255,12 +1252,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       @Context final HttpServletRequest req
   )
   {
-    ChatHandlers.authorizationCheck(
-        req,
-        Action.WRITE,
-        getDataSource(),
-        authorizerMapper
-    );
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     if (currentSubTaskHolder == null || currentSubTaskHolder.getTask() == 
null) {
       return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task 
is not running yet").build();
     } else {
@@ -1278,7 +1270,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   @Produces(MediaType.APPLICATION_JSON)
   public Response getMode(@Context final HttpServletRequest req)
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     return Response.ok(isParallelMode() ? "parallel" : "sequential").build();
   }
 
@@ -1287,7 +1279,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   @Produces(MediaType.APPLICATION_JSON)
   public Response getPhaseName(@Context final HttpServletRequest req)
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     if (isParallelMode()) {
       final ParallelIndexTaskRunner runner = getCurrentRunner();
       if (runner == null) {
@@ -1305,7 +1297,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   @Produces(MediaType.APPLICATION_JSON)
   public Response getProgress(@Context final HttpServletRequest req)
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
     if (currentRunner == null) {
       return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task 
is not running yet").build();
@@ -1319,7 +1311,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   @Produces(MediaType.APPLICATION_JSON)
   public Response getRunningTasks(@Context final HttpServletRequest req)
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
     if (currentRunner == null) {
       return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task 
is not running yet").build();
@@ -1333,7 +1325,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   @Produces(MediaType.APPLICATION_JSON)
   public Response getSubTaskSpecs(@Context final HttpServletRequest req)
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
     if (currentRunner == null) {
       return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task 
is not running yet").build();
@@ -1347,7 +1339,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   @Produces(MediaType.APPLICATION_JSON)
   public Response getRunningSubTaskSpecs(@Context final HttpServletRequest req)
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
     if (currentRunner == null) {
       return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task 
is not running yet").build();
@@ -1361,7 +1353,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   @Produces(MediaType.APPLICATION_JSON)
   public Response getCompleteSubTaskSpecs(@Context final HttpServletRequest 
req)
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
     if (currentRunner == null) {
       return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task 
is not running yet").build();
@@ -1375,7 +1367,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   @Produces(MediaType.APPLICATION_JSON)
   public Response getSubTaskSpec(@PathParam("id") String id, @Context final 
HttpServletRequest req)
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
 
     final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
     if (currentRunner == null) {
@@ -1395,7 +1387,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   @Produces(MediaType.APPLICATION_JSON)
   public Response getSubTaskState(@PathParam("id") String id, @Context final 
HttpServletRequest req)
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
     if (currentRunner == null) {
       return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task 
is not running yet").build();
@@ -1417,7 +1409,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       @Context final HttpServletRequest req
   )
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
     if (currentRunner == null) {
       return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task 
is not running yet").build();
@@ -1569,7 +1561,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       @QueryParam("full") String full
   )
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     return Response.ok(doGetRowStatsAndUnparseableEvents(full, 
false).lhs).build();
   }
 
@@ -1614,8 +1606,8 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       @QueryParam("full") String full
   )
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
-
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     return Response.ok(doGetLiveReports(full)).build();
   }
+
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 267df48..f979bc8 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -67,7 +67,6 @@ import 
org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
 import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
 import 
org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
-import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineObjectHolder;
@@ -488,7 +487,7 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
       @QueryParam("full") String full
   )
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     Map<String, List<String>> events = new HashMap<>();
 
     boolean needsBuildSegments = false;
@@ -563,7 +562,7 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
       @QueryParam("full") String full
   )
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     return Response.ok(doGetRowStats(full)).build();
   }
 
@@ -595,7 +594,7 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
       @QueryParam("full") String full
   )
   {
-    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    authorizeRequestForDatasourceWrite(req, authorizerMapper);
     return Response.ok(doGetLiveReports(full)).build();
   }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index 0076eb0..1f596dd 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -575,11 +575,11 @@ public class OverlordResource
       }
     }
     // early authorization check if datasource != null
-    // fail fast if user not authorized to access datasource
+    // fail fast if user not authorized to write to datasource
     if (dataSource != null) {
       final ResourceAction resourceAction = new ResourceAction(
           new Resource(dataSource, ResourceType.DATASOURCE),
-          Action.READ
+          Action.WRITE
       );
       final Access authResult = AuthorizationUtils.authorizeResourceAction(
           req,
@@ -987,7 +987,7 @@ public class OverlordResource
         );
       }
       return Collections.singletonList(
-          new ResourceAction(new Resource(taskDatasource, 
ResourceType.DATASOURCE), Action.READ)
+          new ResourceAction(new Resource(taskDatasource, 
ResourceType.DATASOURCE), Action.WRITE)
       );
     };
     List<TaskStatusPlus> optionalTypeFilteredList = collectionToFilter;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 74c5b61..9104666 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -65,7 +65,7 @@ import java.util.stream.Collectors;
 @Path("/druid/indexer/v1/supervisor")
 public class SupervisorResource
 {
-  private static final Function<VersionedSupervisorSpec, 
Iterable<ResourceAction>> SPEC_DATASOURCE_READ_RA_GENERATOR =
+  private static final Function<VersionedSupervisorSpec, 
Iterable<ResourceAction>> SPEC_DATASOURCE_WRITE_RA_GENERATOR =
       supervisorSpec -> {
         if (supervisorSpec.getSpec() == null) {
           return null;
@@ -75,7 +75,7 @@ public class SupervisorResource
         }
         return Iterables.transform(
             supervisorSpec.getSpec().getDataSources(),
-            AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR
+            AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR
         );
       };
 
@@ -376,7 +376,7 @@ public class SupervisorResource
             AuthorizationUtils.filterAuthorizedResources(
                 req,
                 manager.getSupervisorHistory(),
-                SPEC_DATASOURCE_READ_RA_GENERATOR,
+                SPEC_DATASOURCE_WRITE_RA_GENERATOR,
                 authorizerMapper
             )
         ).build()
@@ -401,7 +401,7 @@ public class SupervisorResource
                     AuthorizationUtils.filterAuthorizedResources(
                         req,
                         historyForId,
-                        SPEC_DATASOURCE_READ_RA_GENERATOR,
+                        SPEC_DATASOURCE_WRITE_RA_GENERATOR,
                         authorizerMapper
                     )
                 );
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index fba9352..50bf440 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -81,8 +81,6 @@ import 
org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResul
 import 
org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
 import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
-import org.apache.druid.server.security.Access;
-import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.utils.CollectionUtils;
@@ -1361,12 +1359,10 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
 
   /**
    * Authorizes action to be performed on this task's datasource
-   *
-   * @return authorization result
    */
-  private Access authorizationCheck(final HttpServletRequest req, Action 
action)
+  private void authorizeRequest(final HttpServletRequest req)
   {
-    return IndexTaskUtils.datasourceAuthorizationCheck(req, action, 
task.getDataSource(), authorizerMapper);
+    task.authorizeRequestForDatasourceWrite(req, authorizerMapper);
   }
 
   public Appenderator getAppenderator()
@@ -1443,7 +1439,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   @Path("/stop")
   public Response stop(@Context final HttpServletRequest req)
   {
-    authorizationCheck(req, Action.WRITE);
+    authorizeRequest(req);
     stopGracefully();
     return Response.status(Response.Status.OK).build();
   }
@@ -1453,7 +1449,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   @Produces(MediaType.APPLICATION_JSON)
   public Status getStatusHTTP(@Context final HttpServletRequest req)
   {
-    authorizationCheck(req, Action.READ);
+    authorizeRequest(req);
     return status;
   }
 
@@ -1468,7 +1464,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   @Produces(MediaType.APPLICATION_JSON)
   public Map<PartitionIdType, SequenceOffsetType> getCurrentOffsets(@Context 
final HttpServletRequest req)
   {
-    authorizationCheck(req, Action.READ);
+    authorizeRequest(req);
     return getCurrentOffsets();
   }
 
@@ -1482,7 +1478,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   @Produces(MediaType.APPLICATION_JSON)
   public Map<PartitionIdType, SequenceOffsetType> getEndOffsetsHTTP(@Context 
final HttpServletRequest req)
   {
-    authorizationCheck(req, Action.READ);
+    authorizeRequest(req);
     return getEndOffsets();
   }
 
@@ -1502,7 +1498,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       @Context final HttpServletRequest req
   ) throws InterruptedException
   {
-    authorizationCheck(req, Action.WRITE);
+    authorizeRequest(req);
     return setEndOffsets(sequences, finish);
   }
 
@@ -1552,7 +1548,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       @Context final HttpServletRequest req
   )
   {
-    authorizationCheck(req, Action.READ);
+    authorizeRequest(req);
     return Response.ok(doGetRowStats()).build();
   }
 
@@ -1563,7 +1559,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       @Context final HttpServletRequest req
   )
   {
-    authorizationCheck(req, Action.READ);
+    authorizeRequest(req);
     return Response.ok(doGetLiveReports()).build();
   }
 
@@ -1575,7 +1571,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       @Context final HttpServletRequest req
   )
   {
-    authorizationCheck(req, Action.READ);
+    authorizeRequest(req);
     List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(
         parseExceptionHandler.getSavedParseExceptions()
     );
@@ -1726,7 +1722,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       @Context final HttpServletRequest req
   )
   {
-    authorizationCheck(req, Action.READ);
+    authorizeRequest(req);
     return getCheckpoints();
   }
 
@@ -1753,7 +1749,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       @Context final HttpServletRequest req
   ) throws InterruptedException
   {
-    authorizationCheck(req, Action.WRITE);
+    authorizeRequest(req);
     return pause();
   }
 
@@ -1808,7 +1804,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   @Path("/resume")
   public Response resumeHTTP(@Context final HttpServletRequest req) throws 
InterruptedException
   {
-    authorizationCheck(req, Action.WRITE);
+    authorizeRequest(req);
     resume();
     return Response.status(Response.Status.OK).build();
   }
@@ -1841,7 +1837,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   @Produces(MediaType.APPLICATION_JSON)
   public DateTime getStartTime(@Context final HttpServletRequest req)
   {
-    authorizationCheck(req, Action.WRITE);
+    authorizeRequest(req);
     return startTime;
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index f805b2c..5ff9d4f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -90,6 +90,14 @@ import 
org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
 import 
org.apache.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.server.security.Authorizer;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -111,6 +119,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.IOException;
@@ -2605,6 +2614,81 @@ public class IndexTaskTest extends IngestionTestBase
   }
 
   @Test
+  public void testAuthorizeRequestForDatasourceWrite() throws Exception
+  {
+    // Need to run this only once
+    if (lockGranularity == LockGranularity.SEGMENT) {
+      return;
+    }
+
+    // Create auth mapper which allows datasourceReadUser to read datasource
+    // and datasourceWriteUser to write to datasource
+    final String datasourceWriteUser = "datasourceWriteUser";
+    final String datasourceReadUser = "datasourceReadUser";
+    AuthorizerMapper authorizerMapper = new AuthorizerMapper(null) {
+      @Override
+      public Authorizer getAuthorizer(String name)
+      {
+        return (authenticationResult, resource, action) -> {
+          final String username = authenticationResult.getIdentity();
+          if (!resource.getType().equals(ResourceType.DATASOURCE) || username 
== null) {
+            return new Access(false);
+          } else if (action == Action.WRITE) {
+            return new Access(username.equals(datasourceWriteUser));
+          } else {
+            return new Access(username.equals(datasourceReadUser));
+          }
+        };
+      }
+    };
+
+    // Create test target
+    final IndexTask indexTask = new IndexTask(
+        null,
+        null,
+        createDefaultIngestionSpec(
+            jsonMapper,
+            temporaryFolder.newFolder(),
+            null,
+            null,
+            createTuningConfigWithMaxRowsPerSegment(2, true),
+            false,
+            false
+        ),
+        null
+    );
+
+    // Verify that datasourceWriteUser is successfully authorized
+    HttpServletRequest writeUserRequest = 
EasyMock.mock(HttpServletRequest.class);
+    expectAuthorizationTokenCheck(datasourceWriteUser, writeUserRequest);
+    EasyMock.replay(writeUserRequest);
+    indexTask.authorizeRequestForDatasourceWrite(writeUserRequest, 
authorizerMapper);
+
+    // Verify that datasourceReadUser is not successfully authorized
+    HttpServletRequest readUserRequest = 
EasyMock.mock(HttpServletRequest.class);
+    expectAuthorizationTokenCheck(datasourceReadUser, readUserRequest);
+    EasyMock.replay(readUserRequest);
+    expectedException.expect(ForbiddenException.class);
+    indexTask.authorizeRequestForDatasourceWrite(readUserRequest, 
authorizerMapper);
+  }
+
+  private void expectAuthorizationTokenCheck(String username, 
HttpServletRequest request)
+  {
+    AuthenticationResult authenticationResult = new 
AuthenticationResult(username, "druid", null, null);
+    
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
+    
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
+    
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+            .andReturn(authenticationResult)
+            .atLeastOnce();
+
+    request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, false);
+    EasyMock.expectLastCall().anyTimes();
+
+    request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
+    EasyMock.expectLastCall().anyTimes();
+  }
+
+  @Test
   public void testEqualsAndHashCode()
   {
     EqualsVerifier.forClass(IndexTuningConfig.class)
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index c475166..ae494ee 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -71,6 +71,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import java.util.Arrays;
@@ -116,10 +117,24 @@ public class OverlordResourceTest
           @Override
           public Access authorize(AuthenticationResult authenticationResult, 
Resource resource, Action action)
           {
-            if (resource.getName().equals("allow")) {
-              return new Access(true);
-            } else {
-              return new Access(false);
+            final String username = authenticationResult.getIdentity();
+            switch (resource.getName()) {
+              case "allow":
+                return new Access(true);
+              case Datasources.WIKIPEDIA:
+                // All users can read wikipedia but only writer can write
+                return new Access(
+                    action == Action.READ
+                    || (action == Action.WRITE && 
Users.WIKI_WRITER.equals(username))
+                );
+              case Datasources.BUZZFEED:
+                // All users can read buzzfeed but only writer can write
+                return new Access(
+                    action == Action.READ
+                    || (action == Action.WRITE && 
Users.BUZZ_WRITER.equals(username))
+                );
+              default:
+                return new Access(false);
             }
           }
 
@@ -842,6 +857,104 @@ public class OverlordResourceTest
   }
 
   @Test
+  public void testGetTasksRequiresDatasourceWrite()
+  {
+    // Setup mocks for a user who has write access to "wikipedia"
+    // and read access to "buzzfeed"
+    expectAuthorizationTokenCheck(Users.WIKI_WRITER);
+
+    // Setup mocks to return completed, active, known, pending and running 
tasks
+    
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null,
 null, null)).andStubReturn(
+        ImmutableList.of(
+            createTaskInfo("id_5", Datasources.WIKIPEDIA),
+            createTaskInfo("id_6", Datasources.BUZZFEED)
+        )
+    );
+
+    
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
+        ImmutableList.of(
+            createTaskInfo("id_1", Datasources.WIKIPEDIA),
+            createTaskInfo("id_2", Datasources.BUZZFEED)
+        )
+    );
+
+    EasyMock.<Collection<? extends 
TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn(
+        ImmutableList.of(
+            new MockTaskRunnerWorkItem("id_1", null),
+            new MockTaskRunnerWorkItem("id_4", null)
+        )
+    ).atLeastOnce();
+
+    EasyMock.<Collection<? extends 
TaskRunnerWorkItem>>expect(taskRunner.getPendingTasks()).andReturn(
+        ImmutableList.of(
+            new MockTaskRunnerWorkItem("id_4", null)
+        )
+    );
+
+    EasyMock.<Collection<? extends 
TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn(
+        ImmutableList.of(
+            new MockTaskRunnerWorkItem("id_1", null)
+        )
+    );
+
+    // Replay all mocks
+    EasyMock.replay(
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter
+    );
+
+    // Verify that only the tasks of write access datasource are returned
+    List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) 
overlordResource
+        .getTasks(null, null, null, null, null, req)
+        .getEntity();
+    Assert.assertEquals(2, responseObjects.size());
+    for (TaskStatusPlus taskStatus : responseObjects) {
+      Assert.assertEquals(Datasources.WIKIPEDIA, taskStatus.getDataSource());
+    }
+  }
+
+  @Test
+  public void testGetTasksFilterByDatasourceRequiresWrite()
+  {
+    // Setup mocks for a user who has write access to "wikipedia"
+    // and read access to "buzzfeed"
+    expectAuthorizationTokenCheck(Users.WIKI_WRITER);
+
+    // Setup mocks to return completed and active tasks
+    
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null,
 null, null)).andStubReturn(
+        ImmutableList.of(
+            createTaskInfo("id_5", Datasources.WIKIPEDIA),
+            createTaskInfo("id_6", Datasources.BUZZFEED)
+        )
+    );
+
+    
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
+        ImmutableList.of(
+            createTaskInfo("id_1", Datasources.WIKIPEDIA),
+            createTaskInfo("id_2", Datasources.BUZZFEED)
+        )
+    );
+
+    // Replay all mocks
+    EasyMock.replay(
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter
+    );
+
+    // Verify that filtering by a datasource without write access is rejected
+    expectedException.expect(WebApplicationException.class);
+    overlordResource.getTasks(null, Datasources.BUZZFEED, null, null, null, 
req);
+  }
+
+  @Test
   public void testGetNullCompleteTask()
   {
     expectAuthorizationTokenCheck();
@@ -927,6 +1040,27 @@ public class OverlordResourceTest
   }
 
   @Test
+  public void testTaskPostDeniesDatasourceReadUser()
+  {
+    // WIKI_WRITER can write to wikipedia but can only read buzzfeed
+    expectAuthorizationTokenCheck(Users.WIKI_WRITER);
+
+    EasyMock.replay(
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter
+    );
+
+    // Verify that taskPost fails for user who has only datasource read access
+    Task task = NoopTask.create(Datasources.BUZZFEED);
+    expectedException.expect(ForbiddenException.class);
+    overlordResource.taskPost(task, req);
+  }
+
+  @Test
   public void testKillPendingSegments()
   {
     expectAuthorizationTokenCheck();
@@ -1317,7 +1451,12 @@ public class OverlordResourceTest
 
   private void expectAuthorizationTokenCheck()
   {
-    AuthenticationResult authenticationResult = new 
AuthenticationResult("druid", "druid", null, null);
+    expectAuthorizationTokenCheck(Users.DRUID);
+  }
+
+  private void expectAuthorizationTokenCheck(String username)
+  {
+    AuthenticationResult authenticationResult = new 
AuthenticationResult(username, "druid", null, null);
     
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
     
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
     EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
@@ -1360,6 +1499,36 @@ public class OverlordResourceTest
     };
   }
 
+  private TaskInfo<Task, TaskStatus> createTaskInfo(String taskId, String 
datasource)
+  {
+    return new TaskInfo<>(
+        taskId,
+        DateTime.now(ISOChronology.getInstanceUTC()),
+        TaskStatus.success(taskId),
+        datasource,
+        getTaskWithIdAndDatasource(taskId, datasource)
+    );
+  }
+
+  /**
+   * Usernames to use in the tests.
+   */
+  private static class Users
+  {
+    private static final String DRUID = "druid";
+    private static final String WIKI_WRITER = "Wiki Writer";
+    private static final String BUZZ_WRITER = "Buzz Writer";
+  }
+
+  /**
+   * Datasource names to use in the tests.
+   */
+  private static class Datasources
+  {
+    private static final String WIKIPEDIA = "wikipedia";
+    private static final String BUZZFEED = "buzzfeed";
+  }
+
   private static class MockTaskRunnerWorkItem extends TaskRunnerWorkItem
   {
     public MockTaskRunnerWorkItem(
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilterTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilterTest.java
index d38165c..118b8b5 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilterTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilterTest.java
@@ -130,7 +130,7 @@ public class SupervisorResourceFilterTest
   )
   {
     expect(containerRequest.getPathSegments())
-        .andReturn(getPathSegments("/druid/indexer/v1/supervisor/datasource1"))
+        .andReturn(getPathSegments(path))
         .anyTimes();
     expect(containerRequest.getMethod()).andReturn(requestMethod).anyTimes();
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index c22460e..8c77ea0 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -30,10 +30,12 @@ import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAu
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.Authorizer;
 import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.ResourceType;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
@@ -92,7 +94,14 @@ public class SupervisorResourceTest extends EasyMockSupport
           @Override
           public Authorizer getAuthorizer(String name)
           {
+            // Create an Authorizer that only allows Datasource WRITE requests
+            // because all SupervisorResource APIs must only check Datasource 
WRITE access
             return (authenticationResult, resource, action) -> {
+              if (!resource.getType().equals(ResourceType.DATASOURCE)
+                  || action != Action.WRITE) {
+                return new Access(false);
+              }
+
               if (authenticationResult.getIdentity().equals("druid")) {
                 return Access.OK;
               } else {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
new file mode 100644
index 0000000..3eba53f
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskToolbox;
+import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.server.security.Authorizer;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.server.security.ResourceType;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+
+/**
+ * Unit Test to test authorization requirements of
+ * {@link SeekableStreamIndexTaskRunner} and {@link SeekableStreamIndexTask}.
+ */
+public class SeekableStreamIndexTaskRunnerAuthTest
+{
+
+  /**
+   * Test target.
+   */
+  private TestSeekableStreamIndexTaskRunner taskRunner;
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Before
+  public void setUp()
+  {
+    // Create an AuthorizerMapper that only allows access to a Datasource 
resource
+    AuthorizerMapper authorizerMapper = new AuthorizerMapper(null)
+    {
+      @Override
+      public Authorizer getAuthorizer(String name)
+      {
+        return (authenticationResult, resource, action) -> {
+          final String username = authenticationResult.getIdentity();
+
+          // Allow access to a Datasource if
+          // - any user requests Read access
+          // - or, Datasource Write User requests Write access
+          if (resource.getType().equals(ResourceType.DATASOURCE)) {
+            return new Access(
+                action == Action.READ
+                || (action == Action.WRITE && 
username.equals(Users.DATASOURCE_WRITE))
+            );
+          }
+
+          // Do not allow access to any other resource
+          return new Access(false);
+        };
+      }
+    };
+
+    DataSchema dataSchema = new DataSchema(
+        "datasource",
+        new TimestampSpec(null, null, null),
+        new DimensionsSpec(Collections.emptyList()),
+        new AggregatorFactory[]{},
+        new ArbitraryGranularitySpec(new AllGranularity(), 
Collections.emptyList()),
+        TransformSpec.NONE,
+        null,
+        null
+    );
+    SeekableStreamIndexTaskTuningConfig tuningConfig = 
mock(SeekableStreamIndexTaskTuningConfig.class);
+    
/*expect(tuningConfig.getIntermediateHandoffPeriod()).andReturn(Period.minutes(10));
+    expect(tuningConfig.isLogParseExceptions()).andReturn(false);
+    expect(tuningConfig.getMaxParseExceptions()).andReturn(1000);
+    expect(tuningConfig.getMaxSavedParseExceptions()).andReturn(100);
+
+    replay(tuningConfig);*/
+
+    SeekableStreamIndexTaskIOConfig<String, String> ioConfig = new 
TestSeekableStreamIndexTaskIOConfig();
+
+    // Initialize task and task runner
+    SeekableStreamIndexTask<String, String, ByteEntity> indexTask
+        = new TestSeekableStreamIndexTask("id", dataSchema, tuningConfig, 
ioConfig);
+    taskRunner = new TestSeekableStreamIndexTaskRunner(indexTask, 
authorizerMapper);
+  }
+
+  @Test
+  public void testGetStatusHttp()
+  {
+    verifyOnlyDatasourceWriteUserCanAccess(taskRunner::getStatusHTTP);
+  }
+
+  @Test
+  public void testGetStartTime()
+  {
+    verifyOnlyDatasourceWriteUserCanAccess(taskRunner::getStartTime);
+  }
+
+  @Test
+  public void testStop()
+  {
+    verifyOnlyDatasourceWriteUserCanAccess(taskRunner::stop);
+  }
+
+  @Test
+  public void testPauseHttp()
+  {
+    verifyOnlyDatasourceWriteUserCanAccess(req -> {
+      try {
+        taskRunner.pauseHTTP(req);
+      }
+      catch (InterruptedException e) {
+        // ignore
+      }
+    });
+  }
+
+  @Test
+  public void testResumeHttp()
+  {
+    verifyOnlyDatasourceWriteUserCanAccess(req -> {
+      try {
+        taskRunner.resumeHTTP(req);
+      }
+      catch (InterruptedException e) {
+        // ignore: only the authorization check is under test
+      }
+    });
+  }
+
+  @Test
+  public void testGetEndOffsets()
+  {
+    verifyOnlyDatasourceWriteUserCanAccess(taskRunner::getCurrentOffsets);
+  }
+
+  @Test
+  public void testSetEndOffsetsHttp()
+  {
+    verifyOnlyDatasourceWriteUserCanAccess(request -> {
+      try {
+        taskRunner.setEndOffsetsHTTP(Collections.emptyMap(), false, request);
+      }
+      catch (InterruptedException e) {
+        // ignore: only the authorization check is under test
+      }
+    });
+  }
+
+  @Test
+  public void testGetCheckpointsHttp()
+  {
+    verifyOnlyDatasourceWriteUserCanAccess(taskRunner::getCheckpointsHTTP);
+  }
+
+
+  private void verifyOnlyDatasourceWriteUserCanAccess(
+      Consumer<HttpServletRequest> method
+  )
+  {
+    // Verify that datasource write user can access
+    HttpServletRequest allowedRequest = createRequest(Users.DATASOURCE_WRITE);
+    replay(allowedRequest);
+    method.accept(allowedRequest);
+
+    // Verify that no other user can access
+    HttpServletRequest blockedRequest = createRequest(Users.DATASOURCE_READ);
+    replay(blockedRequest);
+    expectedException.expect(ForbiddenException.class);
+    method.accept(blockedRequest);
+  }
+
+  private HttpServletRequest createRequest(String username)
+  {
+    HttpServletRequest request = mock(HttpServletRequest.class);
+
+    AuthenticationResult authenticationResult = new 
AuthenticationResult(username, "druid", null, null);
+    
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
+    
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
+    
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+            .andReturn(authenticationResult)
+            .atLeastOnce();
+
+    request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, false);
+    EasyMock.expectLastCall().anyTimes();
+
+    request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
+    EasyMock.expectLastCall().anyTimes();
+
+    return request;
+  }
+
+  /**
+   * Dummy implementation used as the test target to test out the non-abstract 
methods.
+   */
+  private static class TestSeekableStreamIndexTaskRunner
+      extends SeekableStreamIndexTaskRunner<String, String, ByteEntity>
+  {
+
+    private TestSeekableStreamIndexTaskRunner(
+        SeekableStreamIndexTask<String, String, ByteEntity> task,
+        AuthorizerMapper authorizerMapper
+    )
+    {
+      super(task, null, authorizerMapper, LockGranularity.SEGMENT);
+    }
+
+    @Override
+    protected boolean isEndOfShard(String seqNum)
+    {
+      return false;
+    }
+
+    @Nullable
+    @Override
+    protected TreeMap<Integer, Map<String, String>> getCheckPointsFromContext(
+        TaskToolbox toolbox,
+        String checkpointsString
+    )
+    {
+      return null;
+    }
+
+    @Override
+    protected String getNextStartOffset(String sequenceNumber)
+    {
+      return null;
+    }
+
+    @Override
+    protected SeekableStreamEndSequenceNumbers<String, String> 
deserializePartitionsFromMetadata(
+        ObjectMapper mapper,
+        Object object
+    )
+    {
+      return null;
+    }
+
+    @Nonnull
+    @Override
+    protected List<OrderedPartitionableRecord<String, String, ByteEntity>> 
getRecords(
+        RecordSupplier<String, String, ByteEntity> recordSupplier,
+        TaskToolbox toolbox
+    )
+    {
+      return null;
+    }
+
+    @Override
+    protected SeekableStreamDataSourceMetadata<String, String> 
createDataSourceMetadata(
+        SeekableStreamSequenceNumbers<String, String> partitions
+    )
+    {
+      return null;
+    }
+
+    @Override
+    protected OrderedSequenceNumber<String> createSequenceNumber(String 
sequenceNumber)
+    {
+      return null;
+    }
+
+    @Override
+    protected void possiblyResetDataSourceMetadata(
+        TaskToolbox toolbox,
+        RecordSupplier<String, String, ByteEntity> recordSupplier,
+        Set<StreamPartition<String>> assignment
+    )
+    {
+
+    }
+
+    @Override
+    protected boolean isEndOffsetExclusive()
+    {
+      return false;
+    }
+
+    @Override
+    protected TypeReference<List<SequenceMetadata<String, String>>> 
getSequenceMetadataTypeReference()
+    {
+      return null;
+    }
+  }
+
+  private static class TestSeekableStreamIndexTask extends 
SeekableStreamIndexTask<String, String, ByteEntity>
+  {
+
+    public TestSeekableStreamIndexTask(
+        String id,
+        DataSchema dataSchema,
+        SeekableStreamIndexTaskTuningConfig tuningConfig,
+        SeekableStreamIndexTaskIOConfig<String, String> ioConfig
+    )
+    {
+      super(id, null, dataSchema, tuningConfig, ioConfig, null, null);
+    }
+
+    @Override
+    public String getType()
+    {
+      return null;
+    }
+
+    @Override
+    protected SeekableStreamIndexTaskRunner<String, String, ByteEntity> 
createTaskRunner()
+    {
+      return null;
+    }
+
+    @Override
+    protected RecordSupplier<String, String, ByteEntity> 
newTaskRecordSupplier()
+    {
+      return null;
+    }
+  }
+
+  private static class TestSeekableStreamIndexTaskIOConfig extends 
SeekableStreamIndexTaskIOConfig<String, String>
+  {
+    public TestSeekableStreamIndexTaskIOConfig()
+    {
+      super(
+          null,
+          "someSequence",
+          new SeekableStreamStartSequenceNumbers<>("abc", "def", 
Collections.emptyMap(), Collections.emptyMap(), null),
+          new SeekableStreamEndSequenceNumbers<>("abc", "def", 
Collections.emptyMap(), Collections.emptyMap()),
+          false,
+          DateTimes.nowUtc().minusDays(2),
+          DateTimes.nowUtc(),
+          new CsvInputFormat(null, null, true, null, 0)
+      );
+    }
+  }
+
+  /**
+   * Usernames used in the tests.
+   */
+  private static class Users
+  {
+    private static final String DATASOURCE_READ = "datasourceRead";
+    private static final String DATASOURCE_WRITE = "datasourceWrite";
+  }
+
+}
diff --git a/integration-tests/docker/ldap-configs/bootstrap.ldif 
b/integration-tests/docker/ldap-configs/bootstrap.ldif
index 9614a78..d88265f 100644
--- a/integration-tests/docker/ldap-configs/bootstrap.ldif
+++ b/integration-tests/docker/ldap-configs/bootstrap.ldif
@@ -53,41 +53,41 @@ description: Admin users
 uniqueMember: uid=admin,ou=Users,dc=example,dc=org
 uniqueMember: uid=druid_system,ou=Users,dc=example,dc=org
 
-dn: uid=datasourceOnlyUser,ou=Users,dc=example,dc=org
-uid: datasourceOnlyUser
-cn: datasourceOnlyUser
-sn: datasourceOnlyUser
+dn: uid=datasourceReadOnlyUser,ou=Users,dc=example,dc=org
+uid: datasourceReadOnlyUser
+cn: datasourceReadOnlyUser
+sn: datasourceReadOnlyUser
 objectClass: top
 objectClass: posixAccount
 objectClass: inetOrgPerson
-homeDirectory: /home/datasourceOnlyUser
+homeDirectory: /home/datasourceReadOnlyUser
 uidNumber: 3
 gidNumber: 3
 userPassword: helloworld
 
-dn: cn=datasourceOnlyGroup,ou=Groups,dc=example,dc=org
+dn: cn=datasourceReadOnlyGroup,ou=Groups,dc=example,dc=org
 objectClass: groupOfUniqueNames
-cn: datasourceOnlyGroup
-description: datasourceOnlyGroup users
-uniqueMember: uid=datasourceOnlyUser,ou=Users,dc=example,dc=org
-
-dn: uid=datasourceWithStateUser,ou=Users,dc=example,dc=org
-uid: datasourceWithStateUser
-cn: datasourceWithStateUser
-sn: datasourceWithStateUser
+cn: datasourceReadOnlyGroup
+description: datasourceReadOnlyGroup users
+uniqueMember: uid=datasourceReadOnlyUser,ou=Users,dc=example,dc=org
+
+dn: uid=datasourceReadWithStateUser,ou=Users,dc=example,dc=org
+uid: datasourceReadWithStateUser
+cn: datasourceReadWithStateUser
+sn: datasourceReadWithStateUser
 objectClass: top
 objectClass: posixAccount
 objectClass: inetOrgPerson
-homeDirectory: /home/datasourceWithStateUser
+homeDirectory: /home/datasourceReadWithStateUser
 uidNumber: 4
 gidNumber: 4
 userPassword: helloworld
 
-dn: cn=datasourceWithStateGroup,ou=Groups,dc=example,dc=org
+dn: cn=datasourceReadWithStateGroup,ou=Groups,dc=example,dc=org
 objectClass: groupOfUniqueNames
-cn: datasourceWithStateGroup
-description: datasourceWithStateGroup users
-uniqueMember: uid=datasourceWithStateUser,ou=Users,dc=example,dc=org
+cn: datasourceReadWithStateGroup
+description: datasourceReadWithStateGroup users
+uniqueMember: uid=datasourceReadWithStateUser,ou=Users,dc=example,dc=org
 
 dn: uid=stateOnlyUser,ou=Users,dc=example,dc=org
 uid: stateOnlyUser
@@ -137,20 +137,38 @@ uidNumber: 7
 gidNumber: 7
 userPassword: helloworld
 
-dn: uid=datasourceAndSysUser,ou=Users,dc=example,dc=org
-uid: datasourceAndSysUser
-cn: datasourceAndSysUser
-sn: datasourceAndSysUser
+dn: uid=datasourceReadAndSysUser,ou=Users,dc=example,dc=org
+uid: datasourceReadAndSysUser
+cn: datasourceReadAndSysUser
+sn: datasourceReadAndSysUser
+objectClass: top
+objectClass: posixAccount
+objectClass: inetOrgPerson
+homeDirectory: /home/datasourceReadAndSysUser
+uidNumber: 8
+gidNumber: 8
+userPassword: helloworld
+
+dn: cn=datasourceReadWithSysGroup,ou=Groups,dc=example,dc=org
+objectClass: groupOfUniqueNames
+cn: datasourceReadWithSysGroup
+description: datasourceReadWithSysGroup users
+uniqueMember: uid=datasourceReadAndSysUser,ou=Users,dc=example,dc=org
+
+dn: uid=datasourceWriteAndSysUser,ou=Users,dc=example,dc=org
+uid: datasourceWriteAndSysUser
+cn: datasourceWriteAndSysUser
+sn: datasourceWriteAndSysUser
 objectClass: top
 objectClass: posixAccount
 objectClass: inetOrgPerson
-homeDirectory: /home/datasourceAndSysUser
+homeDirectory: /home/datasourceWriteAndSysUser
 uidNumber: 8
 gidNumber: 8
 userPassword: helloworld
 
-dn: cn=datasourceWithSysGroup,ou=Groups,dc=example,dc=org
+dn: cn=datasourceWriteWithSysGroup,ou=Groups,dc=example,dc=org
 objectClass: groupOfUniqueNames
-cn: datasourceWithSysGroup
-description: datasourceWithSysGroup users
-uniqueMember: uid=datasourceAndSysUser,ou=Users,dc=example,dc=org
+cn: datasourceWriteWithSysGroup
+description: datasourceWriteWithSysGroup users
+uniqueMember: uid=datasourceWriteAndSysUser,ou=Users,dc=example,dc=org
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java
index 42556ad..c29be8d 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java
@@ -98,7 +98,7 @@ public abstract class AbstractAuthConfigurationTest
    * create a ResourceAction set of permissions that can only read a 
'auth_test' datasource, for Authorizer
    * implementations which use ResourceAction pattern matching
    */
-  protected static final List<ResourceAction> DATASOURCE_ONLY_PERMISSIONS = 
Collections.singletonList(
+  protected static final List<ResourceAction> DATASOURCE_READ_ONLY_PERMISSIONS 
= Collections.singletonList(
       new ResourceAction(
           new Resource("auth_test", ResourceType.DATASOURCE),
           Action.READ
@@ -109,7 +109,7 @@ public abstract class AbstractAuthConfigurationTest
    * create a ResourceAction set of permissions that can only read 'auth_test' 
+ partial SYSTEM_TABLE, for Authorizer
    * implementations which use ResourceAction pattern matching
    */
-  protected static final List<ResourceAction> DATASOURCE_SYS_PERMISSIONS = 
ImmutableList.of(
+  protected static final List<ResourceAction> DATASOURCE_READ_SYS_PERMISSIONS 
= ImmutableList.of(
       new ResourceAction(
           new Resource("auth_test", ResourceType.DATASOURCE),
           Action.READ
@@ -135,10 +135,38 @@ public abstract class AbstractAuthConfigurationTest
   );
 
   /**
+   * create a ResourceAction set of permissions that can write datasource 
'auth_test' and read the SYSTEM_TABLE resources listed below
+   */
+  protected static final List<ResourceAction> DATASOURCE_WRITE_SYS_PERMISSIONS 
= ImmutableList.of(
+      new ResourceAction(
+          new Resource("auth_test", ResourceType.DATASOURCE),
+          Action.WRITE
+      ),
+      new ResourceAction(
+          new Resource("segments", ResourceType.SYSTEM_TABLE),
+          Action.READ
+      ),
+      // test missing state permission but having servers permission
+      new ResourceAction(
+          new Resource("servers", ResourceType.SYSTEM_TABLE),
+          Action.READ
+      ),
+      // test missing state permission but having server_segments permission
+      new ResourceAction(
+          new Resource("server_segments", ResourceType.SYSTEM_TABLE),
+          Action.READ
+      ),
+      new ResourceAction(
+          new Resource("tasks", ResourceType.SYSTEM_TABLE),
+          Action.READ
+      )
+  );
+
+  /**
    * create a ResourceAction set of permissions that can only read 'auth_test' 
+ STATE + SYSTEM_TABLE read access, for
    * Authorizer implementations which use ResourceAction pattern matching
    */
-  protected static final List<ResourceAction> DATASOURCE_SYS_STATE_PERMISSIONS 
= ImmutableList.of(
+  protected static final List<ResourceAction> 
DATASOURCE_READ_SYS_STATE_PERMISSIONS = ImmutableList.of(
       new ResourceAction(
           new Resource("auth_test", ResourceType.DATASOURCE),
           Action.READ
@@ -187,16 +215,18 @@ public abstract class AbstractAuthConfigurationTest
   protected CoordinatorResourceTestClient coordinatorClient;
 
   protected HttpClient adminClient;
-  protected HttpClient datasourceOnlyUserClient;
-  protected HttpClient datasourceAndSysUserClient;
-  protected HttpClient datasourceWithStateUserClient;
+  protected HttpClient datasourceReadOnlyUserClient;
+  protected HttpClient datasourceReadAndSysUserClient;
+  protected HttpClient datasourceWriteAndSysUserClient;
+  protected HttpClient datasourceReadWithStateUserClient;
   protected HttpClient stateOnlyUserClient;
   protected HttpClient internalSystemClient;
 
 
-  protected abstract void setupDatasourceOnlyUser() throws Exception;
-  protected abstract void setupDatasourceAndSysTableUser() throws Exception;
-  protected abstract void setupDatasourceAndSysAndStateUser() throws Exception;
+  protected abstract void setupDatasourceReadOnlyUser() throws Exception;
+  protected abstract void setupDatasourceReadAndSysTableUser() throws 
Exception;
+  protected abstract void setupDatasourceWriteAndSysTableUser() throws 
Exception;
+  protected abstract void setupDatasourceReadAndSysAndStateUser() throws 
Exception;
   protected abstract void setupSysTableAndStateOnlyUser() throws Exception;
   protected abstract void setupTestSpecificHttpClients() throws Exception;
   protected abstract String getAuthenticatorName();
@@ -242,44 +272,44 @@ public abstract class AbstractAuthConfigurationTest
   }
 
   @Test
-  public void test_systemSchemaAccess_datasourceOnlyUser() throws Exception
+  public void test_systemSchemaAccess_datasourceReadOnlyUser() throws Exception
   {
     // check that we can access a datasource-permission restricted resource on 
the broker
     HttpUtil.makeRequest(
-        datasourceOnlyUserClient,
+        datasourceReadOnlyUserClient,
         HttpMethod.GET,
         config.getBrokerUrl() + "/druid/v2/datasources/auth_test",
         null
     );
 
     // as user that can only read auth_test
-    LOG.info("Checking sys.segments query as datasourceOnlyUser...");
+    LOG.info("Checking sys.segments query as datasourceReadOnlyUser...");
     verifySystemSchemaQueryFailure(
-        datasourceOnlyUserClient,
+        datasourceReadOnlyUserClient,
         SYS_SCHEMA_SEGMENTS_QUERY,
         HttpResponseStatus.FORBIDDEN,
         "{\"Access-Check-Result\":\"Allowed:false, Message:\"}"
     );
 
-    LOG.info("Checking sys.servers query as datasourceOnlyUser...");
+    LOG.info("Checking sys.servers query as datasourceReadOnlyUser...");
     verifySystemSchemaQueryFailure(
-        datasourceOnlyUserClient,
+        datasourceReadOnlyUserClient,
         SYS_SCHEMA_SERVERS_QUERY,
         HttpResponseStatus.FORBIDDEN,
         "{\"Access-Check-Result\":\"Allowed:false, Message:\"}"
     );
 
-    LOG.info("Checking sys.server_segments query as datasourceOnlyUser...");
+    LOG.info("Checking sys.server_segments query as 
datasourceReadOnlyUser...");
     verifySystemSchemaQueryFailure(
-        datasourceOnlyUserClient,
+        datasourceReadOnlyUserClient,
         SYS_SCHEMA_SERVER_SEGMENTS_QUERY,
         HttpResponseStatus.FORBIDDEN,
         "{\"Access-Check-Result\":\"Allowed:false, Message:\"}"
     );
 
-    LOG.info("Checking sys.tasks query as datasourceOnlyUser...");
+    LOG.info("Checking sys.tasks query as datasourceReadOnlyUser...");
     verifySystemSchemaQueryFailure(
-        datasourceOnlyUserClient,
+        datasourceReadOnlyUserClient,
         SYS_SCHEMA_TASKS_QUERY,
         HttpResponseStatus.FORBIDDEN,
         "{\"Access-Check-Result\":\"Allowed:false, Message:\"}"
@@ -287,83 +317,119 @@ public abstract class AbstractAuthConfigurationTest
   }
 
   @Test
-  public void test_systemSchemaAccess_datasourceAndSysUser() throws Exception
+  public void test_systemSchemaAccess_datasourceReadAndSysUser() throws 
Exception
   {
     // check that we can access a datasource-permission restricted resource on 
the broker
     HttpUtil.makeRequest(
-        datasourceAndSysUserClient,
+        datasourceReadAndSysUserClient,
         HttpMethod.GET,
         config.getBrokerUrl() + "/druid/v2/datasources/auth_test",
         null
     );
 
     // as user that can only read auth_test
-    LOG.info("Checking sys.segments query as datasourceAndSysUser...");
+    LOG.info("Checking sys.segments query as datasourceReadAndSysUser...");
     verifySystemSchemaQuery(
-        datasourceAndSysUserClient,
+        datasourceReadAndSysUserClient,
         SYS_SCHEMA_SEGMENTS_QUERY,
         adminSegments.stream()
                      .filter((segmentEntry) -> 
"auth_test".equals(segmentEntry.get("datasource")))
                      .collect(Collectors.toList())
     );
 
-    LOG.info("Checking sys.servers query as datasourceAndSysUser...");
+    LOG.info("Checking sys.servers query as datasourceReadAndSysUser...");
+    verifySystemSchemaQueryFailure(
+        datasourceReadAndSysUserClient,
+        SYS_SCHEMA_SERVERS_QUERY,
+        HttpResponseStatus.FORBIDDEN,
+        "{\"Access-Check-Result\":\"Insufficient permission to view servers : 
Allowed:false, Message:\"}"
+    );
+
+    LOG.info("Checking sys.server_segments query as 
datasourceReadAndSysUser...");
+    verifySystemSchemaQueryFailure(
+        datasourceReadAndSysUserClient,
+        SYS_SCHEMA_SERVER_SEGMENTS_QUERY,
+        HttpResponseStatus.FORBIDDEN,
+        "{\"Access-Check-Result\":\"Insufficient permission to view servers : 
Allowed:false, Message:\"}"
+    );
+
+    // Verify that sys.tasks result is empty as it is filtered by Datasource 
WRITE access
+    LOG.info("Checking sys.tasks query as datasourceReadAndSysUser...");
+    verifySystemSchemaQuery(
+        datasourceReadAndSysUserClient,
+        SYS_SCHEMA_TASKS_QUERY,
+        Collections.emptyList()
+    );
+  }
+
+  @Test
+  public void test_systemSchemaAccess_datasourceWriteAndSysUser() throws 
Exception
+  {
+    // Verify that sys.segments result is empty as it is filtered by 
Datasource READ access
+    LOG.info("Checking sys.segments query as datasourceWriteAndSysUser...");
+    verifySystemSchemaQuery(
+        datasourceWriteAndSysUserClient,
+        SYS_SCHEMA_SEGMENTS_QUERY,
+        Collections.emptyList()
+    );
+
+    LOG.info("Checking sys.servers query as datasourceWriteAndSysUser...");
     verifySystemSchemaQueryFailure(
-        datasourceAndSysUserClient,
+        datasourceWriteAndSysUserClient,
         SYS_SCHEMA_SERVERS_QUERY,
         HttpResponseStatus.FORBIDDEN,
         "{\"Access-Check-Result\":\"Insufficient permission to view servers : 
Allowed:false, Message:\"}"
     );
 
-    LOG.info("Checking sys.server_segments query as datasourceAndSysUser...");
+    LOG.info("Checking sys.server_segments query as 
datasourceWriteAndSysUser...");
     verifySystemSchemaQueryFailure(
-        datasourceAndSysUserClient,
+        datasourceWriteAndSysUserClient,
         SYS_SCHEMA_SERVER_SEGMENTS_QUERY,
         HttpResponseStatus.FORBIDDEN,
         "{\"Access-Check-Result\":\"Insufficient permission to view servers : 
Allowed:false, Message:\"}"
     );
 
-    LOG.info("Checking sys.tasks query as datasourceAndSysUser...");
+    LOG.info("Checking sys.tasks query as datasourceWriteAndSysUser...");
     verifySystemSchemaQuery(
-        datasourceAndSysUserClient,
+        datasourceWriteAndSysUserClient,
         SYS_SCHEMA_TASKS_QUERY,
         adminTasks.stream()
-                  .filter((taskEntry) -> 
"auth_test".equals(taskEntry.get("datasource")))
+                  .filter((taskEntry) -> 
"auth_test".equals(taskEntry.get("datasource")))
                   .collect(Collectors.toList())
     );
   }
 
   @Test
-  public void test_systemSchemaAccess_datasourceAndSysWithStateUser() throws 
Exception
+  public void test_systemSchemaAccess_datasourceReadAndSysWithStateUser() 
throws Exception
   {
     // check that we can access a state-permission restricted resource on the 
broker
     HttpUtil.makeRequest(
-        datasourceWithStateUserClient,
+        datasourceReadWithStateUserClient,
         HttpMethod.GET,
         config.getBrokerUrl() + "/status",
         null
     );
 
     // as user that can read auth_test and STATE
-    LOG.info("Checking sys.segments query as datasourceWithStateUser...");
+    LOG.info("Checking sys.segments query as datasourceReadWithStateUser...");
     verifySystemSchemaQuery(
-        datasourceWithStateUserClient,
+        datasourceReadWithStateUserClient,
         SYS_SCHEMA_SEGMENTS_QUERY,
         adminSegments.stream()
                      .filter((segmentEntry) -> 
"auth_test".equals(segmentEntry.get("datasource")))
                      .collect(Collectors.toList())
     );
 
-    LOG.info("Checking sys.servers query as datasourceWithStateUser...");
+    LOG.info("Checking sys.servers query as datasourceReadWithStateUser...");
     verifySystemSchemaServerQuery(
-        datasourceWithStateUserClient,
+        datasourceReadWithStateUserClient,
         SYS_SCHEMA_SERVERS_QUERY,
         adminServers
     );
 
-    LOG.info("Checking sys.server_segments query as 
datasourceWithStateUser...");
+    LOG.info("Checking sys.server_segments query as 
datasourceReadWithStateUser...");
     verifySystemSchemaQuery(
-        datasourceWithStateUserClient,
+        datasourceReadWithStateUserClient,
         SYS_SCHEMA_SERVER_SEGMENTS_QUERY,
         adminServerSegments.stream()
                            .filter((serverSegmentEntry) -> ((String) 
serverSegmentEntry.get("segment_id")).contains(
@@ -371,13 +437,12 @@ public abstract class AbstractAuthConfigurationTest
                            .collect(Collectors.toList())
     );
 
-    LOG.info("Checking sys.tasks query as datasourceWithStateUser...");
+    // Verify that sys.tasks result is empty as it is filtered by Datasource 
WRITE access
+    LOG.info("Checking sys.tasks query as datasourceReadWithStateUser...");
     verifySystemSchemaQuery(
-        datasourceWithStateUserClient,
+        datasourceReadWithStateUserClient,
         SYS_SCHEMA_TASKS_QUERY,
-        adminTasks.stream()
-                  .filter((taskEntry) -> 
"auth_test".equals(taskEntry.get("datasource")))
-                  .collect(Collectors.toList())
+        Collections.emptyList()
     );
   }
 
@@ -500,9 +565,10 @@ public abstract class AbstractAuthConfigurationTest
   protected void setupHttpClientsAndUsers() throws Exception
   {
     setupHttpClients();
-    setupDatasourceOnlyUser();
-    setupDatasourceAndSysTableUser();
-    setupDatasourceAndSysAndStateUser();
+    setupDatasourceReadOnlyUser();
+    setupDatasourceReadAndSysTableUser();
+    setupDatasourceWriteAndSysTableUser();
+    setupDatasourceReadAndSysAndStateUser();
     setupSysTableAndStateOnlyUser();
   }
 
@@ -766,18 +832,23 @@ public abstract class AbstractAuthConfigurationTest
         httpClient
     );
 
-    datasourceOnlyUserClient = new CredentialedHttpClient(
-        new BasicCredentials("datasourceOnlyUser", "helloworld"),
+    datasourceReadOnlyUserClient = new CredentialedHttpClient(
+        new BasicCredentials("datasourceReadOnlyUser", "helloworld"),
+        httpClient
+    );
+
+    datasourceReadAndSysUserClient = new CredentialedHttpClient(
+        new BasicCredentials("datasourceReadAndSysUser", "helloworld"),
         httpClient
     );
 
-    datasourceAndSysUserClient = new CredentialedHttpClient(
-        new BasicCredentials("datasourceAndSysUser", "helloworld"),
+    datasourceWriteAndSysUserClient = new CredentialedHttpClient(
+        new BasicCredentials("datasourceWriteAndSysUser", "helloworld"),
         httpClient
     );
 
-    datasourceWithStateUserClient = new CredentialedHttpClient(
-        new BasicCredentials("datasourceWithStateUser", "helloworld"),
+    datasourceReadWithStateUserClient = new CredentialedHttpClient(
+        new BasicCredentials("datasourceReadWithStateUser", "helloworld"),
         httpClient
     );
 
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
index 690757f..c87b750 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
@@ -70,38 +70,50 @@ public class ITBasicAuthConfigurationTest extends 
AbstractAuthConfigurationTest
   }
 
   @Override
-  protected void setupDatasourceOnlyUser() throws Exception
+  protected void setupDatasourceReadOnlyUser() throws Exception
   {
     createUserAndRoleWithPermissions(
         adminClient,
-        "datasourceOnlyUser",
+        "datasourceReadOnlyUser",
         "helloworld",
-        "datasourceOnlyRole",
-        DATASOURCE_ONLY_PERMISSIONS
+        "datasourceReadOnlyRole",
+        DATASOURCE_READ_ONLY_PERMISSIONS
     );
   }
 
   @Override
-  protected void setupDatasourceAndSysTableUser() throws Exception
+  protected void setupDatasourceReadAndSysTableUser() throws Exception
   {
     createUserAndRoleWithPermissions(
         adminClient,
-        "datasourceAndSysUser",
+        "datasourceReadAndSysUser",
         "helloworld",
-        "datasourceAndSysRole",
-        DATASOURCE_SYS_PERMISSIONS
+        "datasourceReadAndSysRole",
+        DATASOURCE_READ_SYS_PERMISSIONS
     );
   }
 
   @Override
-  protected void setupDatasourceAndSysAndStateUser() throws Exception
+  protected void setupDatasourceWriteAndSysTableUser() throws Exception
   {
     createUserAndRoleWithPermissions(
         adminClient,
-        "datasourceWithStateUser",
+        "datasourceWriteAndSysUser",
         "helloworld",
-        "datasourceWithStateRole",
-        DATASOURCE_SYS_STATE_PERMISSIONS
+        "datasourceWriteAndSysRole",
+        DATASOURCE_WRITE_SYS_PERMISSIONS
+    );
+  }
+
+  @Override
+  protected void setupDatasourceReadAndSysAndStateUser() throws Exception
+  {
+    createUserAndRoleWithPermissions(
+        adminClient,
+        "datasourceReadWithStateUser",
+        "helloworld",
+        "datasourceReadWithStateRole",
+        DATASOURCE_READ_SYS_STATE_PERMISSIONS
     );
   }
 
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java
index 97a7c53..3469f56 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java
@@ -120,29 +120,38 @@ public class ITBasicAuthLdapConfigurationTest extends 
AbstractAuthConfigurationT
 
 
   @Override
-  protected void setupDatasourceOnlyUser() throws Exception
+  protected void setupDatasourceReadOnlyUser() throws Exception
   {
     createRoleWithPermissionsAndGroupMapping(
-        "datasourceOnlyGroup",
-        ImmutableMap.of("datasourceOnlyRole", DATASOURCE_ONLY_PERMISSIONS)
+        "datasourceReadOnlyGroup",
+        ImmutableMap.of("datasourceReadOnlyRole", 
DATASOURCE_READ_ONLY_PERMISSIONS)
     );
   }
 
   @Override
-  protected void setupDatasourceAndSysTableUser() throws Exception
+  protected void setupDatasourceReadAndSysTableUser() throws Exception
   {
     createRoleWithPermissionsAndGroupMapping(
-        "datasourceWithSysGroup",
-        ImmutableMap.of("datasourceWithSysRole", DATASOURCE_SYS_PERMISSIONS)
+        "datasourceReadWithSysGroup",
+        ImmutableMap.of("datasourceReadWithSysRole", 
DATASOURCE_READ_SYS_PERMISSIONS)
     );
   }
 
   @Override
-  protected void setupDatasourceAndSysAndStateUser() throws Exception
+  protected void setupDatasourceWriteAndSysTableUser() throws Exception
   {
     createRoleWithPermissionsAndGroupMapping(
-        "datasourceWithStateGroup",
-        ImmutableMap.of("datasourceWithStateRole", 
DATASOURCE_SYS_STATE_PERMISSIONS)
+        "datasourceWriteWithSysGroup",
+        ImmutableMap.of("datasourceWriteWithSysRole", 
DATASOURCE_WRITE_SYS_PERMISSIONS)
+    );
+  }
+
+  @Override
+  protected void setupDatasourceReadAndSysAndStateUser() throws Exception
+  {
+    createRoleWithPermissionsAndGroupMapping(
+        "datasourceReadWithStateGroup",
+        ImmutableMap.of("datasourceReadWithStateRole", 
DATASOURCE_READ_SYS_STATE_PERMISSIONS)
     );
   }
 
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 0562e33..aa2bdce 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -872,7 +872,7 @@ public class SystemSchema extends AbstractSchema
       );
 
       Function<TaskStatusPlus, Iterable<ResourceAction>> raGenerator = task -> 
Collections.singletonList(
-          
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(task.getDataSource()));
+          
AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR.apply(task.getDataSource()));
 
       final Iterable<TaskStatusPlus> authorizedTasks = 
AuthorizationUtils.filterAuthorizedResources(
           authenticationResult,
@@ -1014,7 +1014,7 @@ public class SystemSchema extends AbstractSchema
       );
 
       Function<SupervisorStatus, Iterable<ResourceAction>> raGenerator = 
supervisor -> Collections.singletonList(
-          
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(supervisor.getSource()));
+          
AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR.apply(supervisor.getSource()));
 
       final Iterable<SupervisorStatus> authorizedSupervisors = 
AuthorizationUtils.filterAuthorizedResources(
           authenticationResult,
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index d25a902..b299a3a 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -81,9 +81,12 @@ import 
org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
 import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.Authorizer;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.NoopEscalator;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.schema.SystemSchema.SegmentsTable;
 import org.apache.druid.sql.calcite.table.RowSignatures;
@@ -196,14 +199,7 @@ public class SystemSchemaTest extends CalciteTestBase
                               .addMockedMethod("getStatus")
                               .createMock();
     request = EasyMock.createMock(Request.class);
-    authMapper = new AuthorizerMapper(null)
-    {
-      @Override
-      public Authorizer getAuthorizer(String name)
-      {
-        return (authenticationResult, resource, action) -> new Access(true);
-      }
-    };
+    authMapper = createAuthMapper();
 
     final File tmpDir = temporaryFolder.newFolder();
     final QueryableIndex index1 = IndexBuilder.create()
@@ -554,33 +550,7 @@ public class SystemSchemaTest extends CalciteTestBase
     
EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once();
 
     EasyMock.replay(client, request, responseHolder, responseHandler, 
metadataView);
-    DataContext dataContext = new DataContext()
-    {
-      @Override
-      public SchemaPlus getRootSchema()
-      {
-        return null;
-      }
-
-      @Override
-      public JavaTypeFactory getTypeFactory()
-      {
-        return null;
-      }
-
-      @Override
-      public QueryProvider getQueryProvider()
-      {
-        return null;
-      }
-
-      @Override
-      public Object get(String name)
-      {
-        return CalciteTests.SUPER_USER_AUTH_RESULT;
-      }
-    };
-
+    DataContext dataContext = createDataContext(Users.SUPER);
     final List<Object[]> rows = segmentsTable.scan(dataContext).toList();
     rows.sort((Object[] row1, Object[] row2) -> ((Comparable) 
row1[0]).compareTo(row2[0]));
 
@@ -831,33 +801,7 @@ public class SystemSchemaTest extends CalciteTestBase
         indexerNodeDiscovery
     );
 
-    DataContext dataContext = new DataContext()
-    {
-      @Override
-      public SchemaPlus getRootSchema()
-      {
-        return null;
-      }
-
-      @Override
-      public JavaTypeFactory getTypeFactory()
-      {
-        return null;
-      }
-
-      @Override
-      public QueryProvider getQueryProvider()
-      {
-        return null;
-      }
-
-      @Override
-      public Object get(String name)
-      {
-        return CalciteTests.SUPER_USER_AUTH_RESULT;
-      }
-    };
-
+    DataContext dataContext = createDataContext(Users.SUPER);
     final List<Object[]> rows = serversTable.scan(dataContext).toList();
     rows.sort((Object[] row1, Object[] row2) -> ((Comparable) 
row1[0]).compareTo(row2[0]));
 
@@ -1112,32 +1056,7 @@ public class SystemSchemaTest extends CalciteTestBase
             .andReturn(immutableDruidServers)
             .once();
     EasyMock.replay(serverView);
-    DataContext dataContext = new DataContext()
-    {
-      @Override
-      public SchemaPlus getRootSchema()
-      {
-        return null;
-      }
-
-      @Override
-      public JavaTypeFactory getTypeFactory()
-      {
-        return null;
-      }
-
-      @Override
-      public QueryProvider getQueryProvider()
-      {
-        return null;
-      }
-
-      @Override
-      public Object get(String name)
-      {
-        return CalciteTests.SUPER_USER_AUTH_RESULT;
-      }
-    };
+    DataContext dataContext = createDataContext(Users.SUPER);
 
     //server_segments table is the join of servers and segments table
     // it will have 5 rows as follows
@@ -1230,32 +1149,7 @@ public class SystemSchemaTest extends CalciteTestBase
     responseHolder.done();
 
     EasyMock.replay(client, request, responseHandler);
-    DataContext dataContext = new DataContext()
-    {
-      @Override
-      public SchemaPlus getRootSchema()
-      {
-        return null;
-      }
-
-      @Override
-      public JavaTypeFactory getTypeFactory()
-      {
-        return null;
-      }
-
-      @Override
-      public QueryProvider getQueryProvider()
-      {
-        return null;
-      }
-
-      @Override
-      public Object get(String name)
-      {
-        return CalciteTests.SUPER_USER_AUTH_RESULT;
-      }
-    };
+    DataContext dataContext = createDataContext(Users.SUPER);
     final List<Object[]> rows = tasksTable.scan(dataContext).toList();
 
     Object[] row0 = rows.get(0);
@@ -1295,6 +1189,81 @@ public class SystemSchemaTest extends CalciteTestBase
   }
 
   @Test
+  public void testTasksTableAuth() throws Exception
+  {
+    SystemSchema.TasksTable tasksTable = new SystemSchema.TasksTable(client, 
mapper, authMapper);
+
+    EasyMock.expect(client.makeRequest(HttpMethod.GET, 
"/druid/indexer/v1/tasks"))
+            .andReturn(request)
+            .anyTimes();
+
+    String json = "[{\n"
+                  + "\t\"id\": \"index_wikipedia_2018-09-20T22:33:44.911Z\",\n"
+                  + "\t\"groupId\": 
\"group_index_wikipedia_2018-09-20T22:33:44.911Z\",\n"
+                  + "\t\"type\": \"index\",\n"
+                  + "\t\"createdTime\": \"2018-09-20T22:33:44.922Z\",\n"
+                  + "\t\"queueInsertionTime\": \"1970-01-01T00:00:00.000Z\",\n"
+                  + "\t\"statusCode\": \"FAILED\",\n"
+                  + "\t\"runnerStatusCode\": \"NONE\",\n"
+                  + "\t\"duration\": -1,\n"
+                  + "\t\"location\": {\n"
+                  + "\t\t\"host\": \"testHost\",\n"
+                  + "\t\t\"port\": 1234,\n"
+                  + "\t\t\"tlsPort\": -1\n"
+                  + "\t},\n"
+                  + "\t\"dataSource\": \"wikipedia\",\n"
+                  + "\t\"errorMsg\": null\n"
+                  + "}, {\n"
+                  + "\t\"id\": \"index_wikipedia_2018-09-21T18:38:47.773Z\",\n"
+                  + "\t\"groupId\": 
\"group_index_wikipedia_2018-09-21T18:38:47.773Z\",\n"
+                  + "\t\"type\": \"index\",\n"
+                  + "\t\"createdTime\": \"2018-09-21T18:38:47.873Z\",\n"
+                  + "\t\"queueInsertionTime\": \"2018-09-21T18:38:47.910Z\",\n"
+                  + "\t\"statusCode\": \"RUNNING\",\n"
+                  + "\t\"runnerStatusCode\": \"RUNNING\",\n"
+                  + "\t\"duration\": null,\n"
+                  + "\t\"location\": {\n"
+                  + "\t\t\"host\": \"192.168.1.6\",\n"
+                  + "\t\t\"port\": 8100,\n"
+                  + "\t\t\"tlsPort\": -1\n"
+                  + "\t},\n"
+                  + "\t\"dataSource\": \"wikipedia\",\n"
+                  + "\t\"errorMsg\": null\n"
+                  + "}]";
+
+    HttpResponse httpResp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.OK);
+
+    EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject()))
+            .andReturn(createFullResponseHolder(httpResp, json))
+            .andReturn(createFullResponseHolder(httpResp, json))
+            .andReturn(createFullResponseHolder(httpResp, json));
+
+    EasyMock.expect(request.getUrl())
+            .andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks"))
+            .anyTimes();
+
+    EasyMock.replay(client, request, responseHandler);
+
+    // Verify that no row is returned for Datasource Read user
+    List<Object[]> rows = tasksTable
+        .scan(createDataContext(Users.DATASOURCE_READ))
+        .toList();
+    Assert.assertTrue(rows.isEmpty());
+
+    // Verify that 2 rows are returned for Datasource Write user
+    rows = tasksTable
+        .scan(createDataContext(Users.DATASOURCE_WRITE))
+        .toList();
+    Assert.assertEquals(2, rows.size());
+
+    // Verify that 2 rows are returned for Super user
+    rows = tasksTable
+        .scan(createDataContext(Users.SUPER))
+        .toList();
+    Assert.assertEquals(2, rows.size());
+  }
+
+  @Test
   public void testSupervisorTable() throws Exception
   {
 
@@ -1337,7 +1306,111 @@ public class SystemSchemaTest extends CalciteTestBase
     responseHolder.done();
 
     EasyMock.replay(client, request, responseHandler);
-    DataContext dataContext = new DataContext()
+    DataContext dataContext = createDataContext(Users.SUPER);
+    final List<Object[]> rows = supervisorTable.scan(dataContext).toList();
+
+    Object[] row0 = rows.get(0);
+    Assert.assertEquals("wikipedia", row0[0].toString());
+    Assert.assertEquals("UNHEALTHY_SUPERVISOR", row0[1].toString());
+    Assert.assertEquals("UNABLE_TO_CONNECT_TO_STREAM", row0[2].toString());
+    Assert.assertEquals(0L, row0[3]);
+    Assert.assertEquals("kafka", row0[4].toString());
+    Assert.assertEquals("wikipedia", row0[5].toString());
+    Assert.assertEquals(0L, row0[6]);
+    Assert.assertEquals(
+        "{\"type\":\"kafka\",\"dataSchema\":{\"dataSource\":\"wikipedia\"},\"context\":null,\"suspended\":false}",
+        row0[7].toString()
+    );
+
+    // Verify value types.
+    verifyTypes(rows, SystemSchema.SUPERVISOR_SIGNATURE);
+  }
+
+  @Test
+  public void testSupervisorTableAuth() throws Exception
+  {
+    SystemSchema.SupervisorsTable supervisorTable =
+        new SystemSchema.SupervisorsTable(client, mapper, createAuthMapper());
+
+    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/supervisor?system"))
+            .andReturn(request)
+            .anyTimes();
+
+    final String json = "[{\n"
+                  + "\t\"id\": \"wikipedia\",\n"
+                  + "\t\"state\": \"UNHEALTHY_SUPERVISOR\",\n"
+                  + "\t\"detailedState\": \"UNABLE_TO_CONNECT_TO_STREAM\",\n"
+                  + "\t\"healthy\": false,\n"
+                  + "\t\"specString\": \"{\\\"type\\\":\\\"kafka\\\",\\\"dataSchema\\\":{\\\"dataSource\\\":\\\"wikipedia\\\"}"
+                  + ",\\\"context\\\":null,\\\"suspended\\\":false}\",\n"
+                  + "\t\"type\": \"kafka\",\n"
+                  + "\t\"source\": \"wikipedia\",\n"
+                  + "\t\"suspended\": false\n"
+                  + "}]";
+
+    HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+    EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject()))
+            .andReturn(createFullResponseHolder(httpResponse, json))
+            .andReturn(createFullResponseHolder(httpResponse, json))
+            .andReturn(createFullResponseHolder(httpResponse, json));
+
+    EasyMock.expect(responseHandler.getStatus())
+            .andReturn(httpResponse.getStatus().getCode())
+            .anyTimes();
+    EasyMock.expect(request.getUrl())
+            .andReturn(new URL("http://test-host:1234/druid/indexer/v1/supervisor?system"))
+            .anyTimes();
+
+    EasyMock.replay(client, request, responseHandler);
+
+    // Verify that no row is returned for Datasource Read user
+    List<Object[]> rows = supervisorTable
+        .scan(createDataContext(Users.DATASOURCE_READ))
+        .toList();
+    Assert.assertTrue(rows.isEmpty());
+
+    // Verify that 1 row is returned for Datasource Write user
+    rows = supervisorTable
+        .scan(createDataContext(Users.DATASOURCE_WRITE))
+        .toList();
+    Assert.assertEquals(1, rows.size());
+
+    // Verify that 1 row is returned for Super user
+    rows = supervisorTable
+        .scan(createDataContext(Users.SUPER))
+        .toList();
+    Assert.assertEquals(1, rows.size());
+
+    // TODO: If needed, verify the first row here
+
+    // TODO: Verify value types.
+    // verifyTypes(rows, SystemSchema.SUPERVISOR_SIGNATURE);
+  }
+
+  /**
+   * Creates a response holder that contains the given json.
+   */
+  private InputStreamFullResponseHolder createFullResponseHolder(
+      HttpResponse httpResponse,
+      String json
+  )
+  {
+    InputStreamFullResponseHolder responseHolder =
+        new InputStreamFullResponseHolder(httpResponse.getStatus(), httpResponse);
+
+    byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
+    responseHolder.addChunk(bytesToWrite);
+    responseHolder.done();
+
+    return responseHolder;
+  }
+
+  /**
+   * Creates a DataContext for the given username.
+   */
+  private DataContext createDataContext(String username)
+  {
+    return new DataContext()
     {
       @Override
       public SchemaPlus getRootSchema()
@@ -1358,28 +1431,40 @@ public class SystemSchemaTest extends CalciteTestBase
       }
 
       @Override
-      public Object get(String name)
+      public Object get(String authorizerName)
       {
-        return CalciteTests.SUPER_USER_AUTH_RESULT;
+        return CalciteTests.TEST_SUPERUSER_NAME.equals(username)
+               ? CalciteTests.SUPER_USER_AUTH_RESULT
+               : new AuthenticationResult(username, authorizerName, null, null);
       }
     };
-    final List<Object[]> rows = supervisorTable.scan(dataContext).toList();
-
-    Object[] row0 = rows.get(0);
-    Assert.assertEquals("wikipedia", row0[0].toString());
-    Assert.assertEquals("UNHEALTHY_SUPERVISOR", row0[1].toString());
-    Assert.assertEquals("UNABLE_TO_CONNECT_TO_STREAM", row0[2].toString());
-    Assert.assertEquals(0L, row0[3]);
-    Assert.assertEquals("kafka", row0[4].toString());
-    Assert.assertEquals("wikipedia", row0[5].toString());
-    Assert.assertEquals(0L, row0[6]);
-    Assert.assertEquals(
-        "{\"type\":\"kafka\",\"dataSchema\":{\"dataSource\":\"wikipedia\"},\"context\":null,\"suspended\":false}",
-        row0[7].toString()
-    );
+  }
 
-    // Verify value types.
-    verifyTypes(rows, SystemSchema.SUPERVISOR_SIGNATURE);
+  private AuthorizerMapper createAuthMapper()
+  {
+    return new AuthorizerMapper(null)
+    {
+      @Override
+      public Authorizer getAuthorizer(String name)
+      {
+        return (authenticationResult, resource, action) -> {
+          final String username = authenticationResult.getIdentity();
+
+          // Allow access to a Datasource if
+          // - any user requests Read access
+          // - Super User or Datasource Write User requests Write access
+          if (resource.getType().equals(ResourceType.DATASOURCE)) {
+            return new Access(
+                action == Action.READ
+                || username.equals(Users.SUPER)
+                || username.equals(Users.DATASOURCE_WRITE)
+            );
+          }
+
+          return new Access(true);
+        };
+      }
+    };
   }
 
   private static void verifyTypes(final List<Object[]> rows, final RowSignature signature)
@@ -1443,4 +1528,14 @@ public class SystemSchemaTest extends CalciteTestBase
       }
     }
   }
+
+  /**
+   * Usernames to be used in tests.
+   */
+  private static class Users
+  {
+    private static final String SUPER = CalciteTests.TEST_SUPERUSER_NAME;
+    private static final String DATASOURCE_READ = "datasourceRead";
+    private static final String DATASOURCE_WRITE = "datasourceWrite";
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to