This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f2d6100 Require Datasource WRITE authorization for Supervisor and
Task access (#11718)
f2d6100 is described below
commit f2d6100124dbe7cbc92ad91d28bd12a1800a1f2a
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Oct 8 10:39:48 2021 +0530
Require Datasource WRITE authorization for Supervisor and Task access
(#11718)
Follow up PR for #11680
Description
Supervisor and Task APIs are related to ingestion and must always require
Datasource WRITE authorization, even if they are purely informative.
Changes
Check Datasource WRITE in SystemSchema for tables "supervisors" and "tasks"
Check Datasource WRITE for APIs /supervisor/history and
/supervisor/{id}/history
Check Datasource WRITE for all Indexing Task APIs
---
docs/operations/security-user-auth.md | 3 +-
.../druid/indexing/common/task/AbstractTask.java | 16 +
.../task/AppenderatorDriverRealtimeIndexTask.java | 5 +-
.../indexing/common/task/HadoopIndexTask.java | 3 +-
.../druid/indexing/common/task/IndexTask.java | 7 +-
.../druid/indexing/common/task/IndexTaskUtils.java | 13 +-
.../parallel/ParallelIndexSupervisorTask.java | 38 +-
.../task/batch/parallel/SinglePhaseSubTask.java | 7 +-
.../indexing/overlord/http/OverlordResource.java | 6 +-
.../overlord/supervisor/SupervisorResource.java | 8 +-
.../SeekableStreamIndexTaskRunner.java | 32 +-
.../druid/indexing/common/task/IndexTaskTest.java | 84 +++++
.../overlord/http/OverlordResourceTest.java | 179 +++++++++-
.../security/SupervisorResourceFilterTest.java | 2 +-
.../supervisor/SupervisorResourceTest.java | 9 +
.../SeekableStreamIndexTaskRunnerAuthTest.java | 393 +++++++++++++++++++++
.../docker/ldap-configs/bootstrap.ldif | 74 ++--
.../security/AbstractAuthConfigurationTest.java | 175 ++++++---
.../security/ITBasicAuthConfigurationTest.java | 36 +-
.../security/ITBasicAuthLdapConfigurationTest.java | 27 +-
.../druid/sql/calcite/schema/SystemSchema.java | 4 +-
.../druid/sql/calcite/schema/SystemSchemaTest.java | 361 ++++++++++++-------
22 files changed, 1170 insertions(+), 312 deletions(-)
diff --git a/docs/operations/security-user-auth.md
b/docs/operations/security-user-auth.md
index 54f4317..e5e5cab 100644
--- a/docs/operations/security-user-auth.md
+++ b/docs/operations/security-user-auth.md
@@ -139,7 +139,8 @@ Queries on the [system schema
tables](../querying/sql.md#system-schema) require
- `segments`: Segments will be filtered based on DATASOURCE READ permissions.
- `servers`: The user requires STATE READ permissions.
- `server_segments`: The user requires STATE READ permissions and segments
will be filtered based on DATASOURCE READ permissions.
-- `tasks`: Tasks will be filtered based on DATASOURCE READ permissions.
+- `tasks`: Druid filters tasks according to DATASOURCE WRITE permissions.
+- `supervisors`: Druid filters supervisors according to DATASOURCE WRITE
permissions.
## Configuration Propagation
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index 8964a1e..e31270c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -30,9 +30,12 @@ import
org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.ForbiddenException;
import org.joda.time.Interval;
import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
@@ -164,6 +167,19 @@ public abstract class AbstractTask implements Task
return TaskStatus.success(getId());
}
+ /**
+ * Authorizes WRITE action on a task's datasource
+ *
+ * @throws ForbiddenException if not authorized
+ */
+ public void authorizeRequestForDatasourceWrite(
+ HttpServletRequest request,
+ AuthorizerMapper authorizerMapper
+ ) throws ForbiddenException
+ {
+ IndexTaskUtils.authorizeRequestForDatasourceWrite(request, dataSource,
authorizerMapper);
+ }
+
@Override
public boolean equals(Object o)
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 8908014..4e1c5e7 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -84,7 +84,6 @@ import
org.apache.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import org.apache.druid.segment.realtime.plumber.Committers;
-import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@@ -530,7 +529,7 @@ public class AppenderatorDriverRealtimeIndexTask extends
AbstractTask implements
@Context final HttpServletRequest req
)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> totalsMap = new HashMap<>();
Map<String, Object> averagesMap = new HashMap<>();
@@ -556,7 +555,7 @@ public class AppenderatorDriverRealtimeIndexTask extends
AbstractTask implements
@Context final HttpServletRequest req
)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(
parseExceptionHandler.getSavedParseExceptions()
);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index b04097c..97e5ec0 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -63,7 +63,6 @@ import
org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
-import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.hadoop.mapred.JobClient;
@@ -634,7 +633,7 @@ public class HadoopIndexTask extends HadoopTask implements
ChatHandler
@QueryParam("windows") List<Integer> windows
)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> totalsMap = new HashMap<>();
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index a993b53..c68a2b4 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -95,7 +95,6 @@ import
org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
-import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -285,7 +284,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
@QueryParam("full") String full
)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
return Response.ok(doGetUnparseableEvents(full)).build();
}
@@ -394,7 +393,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
@QueryParam("full") String full
)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
return Response.ok(doGetRowStats(full)).build();
}
@@ -406,7 +405,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
@QueryParam("full") String full
)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
Map<String, Object> payload = new HashMap<>();
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
index abc6b92..5be50b6 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
@@ -56,28 +56,25 @@ public class IndexTaskUtils
}
/**
- * Authorizes action to be performed on a task's datasource
+ * Authorizes WRITE action on a task's datasource
*
- * @return authorization result
+ * @throws ForbiddenException if not authorized
*/
- public static Access datasourceAuthorizationCheck(
+ public static void authorizeRequestForDatasourceWrite(
final HttpServletRequest req,
- Action action,
String datasource,
AuthorizerMapper authorizerMapper
- )
+ ) throws ForbiddenException
{
ResourceAction resourceAction = new ResourceAction(
new Resource(datasource, ResourceType.DATASOURCE),
- action
+ Action.WRITE
);
Access access = AuthorizationUtils.authorizeResourceAction(req,
resourceAction, authorizerMapper);
if (!access.isAllowed()) {
throw new ForbiddenException(access.toString());
}
-
- return access;
}
public static void setTaskDimensions(final ServiceMetricEvent.Builder
metricBuilder, final Task task)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 201a48e..3dfc9c7 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -53,7 +53,6 @@ import
org.apache.druid.indexing.common.task.CurrentSubTaskHolder;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
-import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
@@ -75,8 +74,6 @@ import
org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
-import org.apache.druid.segment.realtime.firehose.ChatHandlers;
-import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BuildingShardSpec;
@@ -1176,7 +1173,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@Context final HttpServletRequest req
)
{
- ChatHandlers.authorizationCheck(req, Action.READ, getDataSource(),
authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
if (toolbox == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task
is not running yet").build();
@@ -1255,12 +1252,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@Context final HttpServletRequest req
)
{
- ChatHandlers.authorizationCheck(
- req,
- Action.WRITE,
- getDataSource(),
- authorizerMapper
- );
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
if (currentSubTaskHolder == null || currentSubTaskHolder.getTask() ==
null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task
is not running yet").build();
} else {
@@ -1278,7 +1270,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getMode(@Context final HttpServletRequest req)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
return Response.ok(isParallelMode() ? "parallel" : "sequential").build();
}
@@ -1287,7 +1279,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getPhaseName(@Context final HttpServletRequest req)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
if (isParallelMode()) {
final ParallelIndexTaskRunner runner = getCurrentRunner();
if (runner == null) {
@@ -1305,7 +1297,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getProgress(@Context final HttpServletRequest req)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task
is not running yet").build();
@@ -1319,7 +1311,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getRunningTasks(@Context final HttpServletRequest req)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task
is not running yet").build();
@@ -1333,7 +1325,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getSubTaskSpecs(@Context final HttpServletRequest req)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task
is not running yet").build();
@@ -1347,7 +1339,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getRunningSubTaskSpecs(@Context final HttpServletRequest req)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task
is not running yet").build();
@@ -1361,7 +1353,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getCompleteSubTaskSpecs(@Context final HttpServletRequest
req)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task
is not running yet").build();
@@ -1375,7 +1367,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getSubTaskSpec(@PathParam("id") String id, @Context final
HttpServletRequest req)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
@@ -1395,7 +1387,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getSubTaskState(@PathParam("id") String id, @Context final
HttpServletRequest req)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task
is not running yet").build();
@@ -1417,7 +1409,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@Context final HttpServletRequest req
)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task
is not running yet").build();
@@ -1569,7 +1561,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@QueryParam("full") String full
)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
return Response.ok(doGetRowStatsAndUnparseableEvents(full,
false).lhs).build();
}
@@ -1614,8 +1606,8 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@QueryParam("full") String full
)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
-
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
return Response.ok(doGetLiveReports(full)).build();
}
+
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 267df48..f979bc8 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -67,7 +67,6 @@ import
org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
import
org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
-import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
@@ -488,7 +487,7 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
@QueryParam("full") String full
)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
Map<String, List<String>> events = new HashMap<>();
boolean needsBuildSegments = false;
@@ -563,7 +562,7 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
@QueryParam("full") String full
)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
return Response.ok(doGetRowStats(full)).build();
}
@@ -595,7 +594,7 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
@QueryParam("full") String full
)
{
- IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ authorizeRequestForDatasourceWrite(req, authorizerMapper);
return Response.ok(doGetLiveReports(full)).build();
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index 0076eb0..1f596dd 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -575,11 +575,11 @@ public class OverlordResource
}
}
// early authorization check if datasource != null
- // fail fast if user not authorized to access datasource
+ // fail fast if user not authorized to write to datasource
if (dataSource != null) {
final ResourceAction resourceAction = new ResourceAction(
new Resource(dataSource, ResourceType.DATASOURCE),
- Action.READ
+ Action.WRITE
);
final Access authResult = AuthorizationUtils.authorizeResourceAction(
req,
@@ -987,7 +987,7 @@ public class OverlordResource
);
}
return Collections.singletonList(
- new ResourceAction(new Resource(taskDatasource,
ResourceType.DATASOURCE), Action.READ)
+ new ResourceAction(new Resource(taskDatasource,
ResourceType.DATASOURCE), Action.WRITE)
);
};
List<TaskStatusPlus> optionalTypeFilteredList = collectionToFilter;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 74c5b61..9104666 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -65,7 +65,7 @@ import java.util.stream.Collectors;
@Path("/druid/indexer/v1/supervisor")
public class SupervisorResource
{
- private static final Function<VersionedSupervisorSpec,
Iterable<ResourceAction>> SPEC_DATASOURCE_READ_RA_GENERATOR =
+ private static final Function<VersionedSupervisorSpec,
Iterable<ResourceAction>> SPEC_DATASOURCE_WRITE_RA_GENERATOR =
supervisorSpec -> {
if (supervisorSpec.getSpec() == null) {
return null;
@@ -75,7 +75,7 @@ public class SupervisorResource
}
return Iterables.transform(
supervisorSpec.getSpec().getDataSources(),
- AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR
+ AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR
);
};
@@ -376,7 +376,7 @@ public class SupervisorResource
AuthorizationUtils.filterAuthorizedResources(
req,
manager.getSupervisorHistory(),
- SPEC_DATASOURCE_READ_RA_GENERATOR,
+ SPEC_DATASOURCE_WRITE_RA_GENERATOR,
authorizerMapper
)
).build()
@@ -401,7 +401,7 @@ public class SupervisorResource
AuthorizationUtils.filterAuthorizedResources(
req,
historyForId,
- SPEC_DATASOURCE_READ_RA_GENERATOR,
+ SPEC_DATASOURCE_WRITE_RA_GENERATOR,
authorizerMapper
)
);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index fba9352..50bf440 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -81,8 +81,6 @@ import
org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResul
import
org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
-import org.apache.druid.server.security.Access;
-import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CollectionUtils;
@@ -1361,12 +1359,10 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
/**
* Authorizes action to be performed on this task's datasource
- *
- * @return authorization result
*/
- private Access authorizationCheck(final HttpServletRequest req, Action
action)
+ private void authorizeRequest(final HttpServletRequest req)
{
- return IndexTaskUtils.datasourceAuthorizationCheck(req, action,
task.getDataSource(), authorizerMapper);
+ task.authorizeRequestForDatasourceWrite(req, authorizerMapper);
}
public Appenderator getAppenderator()
@@ -1443,7 +1439,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Path("/stop")
public Response stop(@Context final HttpServletRequest req)
{
- authorizationCheck(req, Action.WRITE);
+ authorizeRequest(req);
stopGracefully();
return Response.status(Response.Status.OK).build();
}
@@ -1453,7 +1449,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Produces(MediaType.APPLICATION_JSON)
public Status getStatusHTTP(@Context final HttpServletRequest req)
{
- authorizationCheck(req, Action.READ);
+ authorizeRequest(req);
return status;
}
@@ -1468,7 +1464,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Produces(MediaType.APPLICATION_JSON)
public Map<PartitionIdType, SequenceOffsetType> getCurrentOffsets(@Context
final HttpServletRequest req)
{
- authorizationCheck(req, Action.READ);
+ authorizeRequest(req);
return getCurrentOffsets();
}
@@ -1482,7 +1478,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Produces(MediaType.APPLICATION_JSON)
public Map<PartitionIdType, SequenceOffsetType> getEndOffsetsHTTP(@Context
final HttpServletRequest req)
{
- authorizationCheck(req, Action.READ);
+ authorizeRequest(req);
return getEndOffsets();
}
@@ -1502,7 +1498,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Context final HttpServletRequest req
) throws InterruptedException
{
- authorizationCheck(req, Action.WRITE);
+ authorizeRequest(req);
return setEndOffsets(sequences, finish);
}
@@ -1552,7 +1548,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Context final HttpServletRequest req
)
{
- authorizationCheck(req, Action.READ);
+ authorizeRequest(req);
return Response.ok(doGetRowStats()).build();
}
@@ -1563,7 +1559,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Context final HttpServletRequest req
)
{
- authorizationCheck(req, Action.READ);
+ authorizeRequest(req);
return Response.ok(doGetLiveReports()).build();
}
@@ -1575,7 +1571,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Context final HttpServletRequest req
)
{
- authorizationCheck(req, Action.READ);
+ authorizeRequest(req);
List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(
parseExceptionHandler.getSavedParseExceptions()
);
@@ -1726,7 +1722,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Context final HttpServletRequest req
)
{
- authorizationCheck(req, Action.READ);
+ authorizeRequest(req);
return getCheckpoints();
}
@@ -1753,7 +1749,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Context final HttpServletRequest req
) throws InterruptedException
{
- authorizationCheck(req, Action.WRITE);
+ authorizeRequest(req);
return pause();
}
@@ -1808,7 +1804,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Path("/resume")
public Response resumeHTTP(@Context final HttpServletRequest req) throws
InterruptedException
{
- authorizationCheck(req, Action.WRITE);
+ authorizeRequest(req);
resume();
return Response.status(Response.Status.OK).build();
}
@@ -1841,7 +1837,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Produces(MediaType.APPLICATION_JSON)
public DateTime getStartTime(@Context final HttpServletRequest req)
{
- authorizationCheck(req, Action.WRITE);
+ authorizeRequest(req);
return startTime;
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index f805b2c..5ff9d4f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -90,6 +90,14 @@ import
org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
import
org.apache.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.server.security.Authorizer;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -111,6 +119,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
@@ -2605,6 +2614,81 @@ public class IndexTaskTest extends IngestionTestBase
}
@Test
+ public void testAuthorizeRequestForDatasourceWrite() throws Exception
+ {
+ // The check below does not use lockGranularity, so run it for only one granularity
+ if (lockGranularity == LockGranularity.SEGMENT) {
+ return;
+ }
+
+ // Create auth mapper which allows datasourceReadUser to read datasource
+ // and datasourceWriteUser to write to datasource
+ final String datasourceWriteUser = "datasourceWriteUser";
+ final String datasourceReadUser = "datasourceReadUser";
+ AuthorizerMapper authorizerMapper = new AuthorizerMapper(null) {
+ @Override
+ public Authorizer getAuthorizer(String name)
+ {
+ return (authenticationResult, resource, action) -> {
+ final String username = authenticationResult.getIdentity();
+ if (!resource.getType().equals(ResourceType.DATASOURCE) || username
== null) {
+ return new Access(false);
+ } else if (action == Action.WRITE) {
+ return new Access(username.equals(datasourceWriteUser));
+ } else {
+ return new Access(username.equals(datasourceReadUser));
+ }
+ };
+ }
+ };
+
+ // Create test target
+ final IndexTask indexTask = new IndexTask(
+ null,
+ null,
+ createDefaultIngestionSpec(
+ jsonMapper,
+ temporaryFolder.newFolder(),
+ null,
+ null,
+ createTuningConfigWithMaxRowsPerSegment(2, true),
+ false,
+ false
+ ),
+ null
+ );
+
+ // Verify that datasourceWriteUser is successfully authorized
+ HttpServletRequest writeUserRequest =
EasyMock.mock(HttpServletRequest.class);
+ expectAuthorizationTokenCheck(datasourceWriteUser, writeUserRequest);
+ EasyMock.replay(writeUserRequest);
+ indexTask.authorizeRequestForDatasourceWrite(writeUserRequest,
authorizerMapper);
+
+ // Verify that datasourceReadUser is not successfully authorized
+ HttpServletRequest readUserRequest =
EasyMock.mock(HttpServletRequest.class);
+ expectAuthorizationTokenCheck(datasourceReadUser, readUserRequest);
+ EasyMock.replay(readUserRequest);
+ expectedException.expect(ForbiddenException.class);
+ indexTask.authorizeRequestForDatasourceWrite(readUserRequest,
authorizerMapper);
+ }
+
+ private void expectAuthorizationTokenCheck(String username,
HttpServletRequest request)
+ {
+ AuthenticationResult authenticationResult = new
AuthenticationResult(username, "druid", null, null);
+
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
+
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
+
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+ .andReturn(authenticationResult)
+ .atLeastOnce();
+
+ request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, false);
+ EasyMock.expectLastCall().anyTimes();
+
+ request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
+ EasyMock.expectLastCall().anyTimes();
+ }
+
+ @Test
public void testEqualsAndHashCode()
{
EqualsVerifier.forClass(IndexTuningConfig.class)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index c475166..ae494ee 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -71,6 +71,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.util.Arrays;
@@ -116,10 +117,24 @@ public class OverlordResourceTest
@Override
public Access authorize(AuthenticationResult authenticationResult,
Resource resource, Action action)
{
- if (resource.getName().equals("allow")) {
- return new Access(true);
- } else {
- return new Access(false);
+ final String username = authenticationResult.getIdentity();
+ switch (resource.getName()) {
+ case "allow":
+ return new Access(true);
+ case Datasources.WIKIPEDIA:
+ // All users can read wikipedia but only writer can write
+ return new Access(
+ action == Action.READ
+ || (action == Action.WRITE &&
Users.WIKI_WRITER.equals(username))
+ );
+ case Datasources.BUZZFEED:
+ // All users can read buzzfeed but only writer can write
+ return new Access(
+ action == Action.READ
+ || (action == Action.WRITE &&
Users.BUZZ_WRITER.equals(username))
+ );
+ default:
+ return new Access(false);
}
}
@@ -842,6 +857,104 @@ public class OverlordResourceTest
}
@Test
+ public void testGetTasksRequiresDatasourceWrite()
+ {
+ // Setup mocks for a user who has write access to "wikipedia"
+ // and read access to "buzzfeed"
+ expectAuthorizationTokenCheck(Users.WIKI_WRITER);
+
+ // Setup mocks to return completed, active, known, pending and running
tasks
+
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null,
null, null)).andStubReturn(
+ ImmutableList.of(
+ createTaskInfo("id_5", Datasources.WIKIPEDIA),
+ createTaskInfo("id_6", Datasources.BUZZFEED)
+ )
+ );
+
+
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
+ ImmutableList.of(
+ createTaskInfo("id_1", Datasources.WIKIPEDIA),
+ createTaskInfo("id_2", Datasources.BUZZFEED)
+ )
+ );
+
+ EasyMock.<Collection<? extends
TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn(
+ ImmutableList.of(
+ new MockTaskRunnerWorkItem("id_1", null),
+ new MockTaskRunnerWorkItem("id_4", null)
+ )
+ ).atLeastOnce();
+
+ EasyMock.<Collection<? extends
TaskRunnerWorkItem>>expect(taskRunner.getPendingTasks()).andReturn(
+ ImmutableList.of(
+ new MockTaskRunnerWorkItem("id_4", null)
+ )
+ );
+
+ EasyMock.<Collection<? extends
TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn(
+ ImmutableList.of(
+ new MockTaskRunnerWorkItem("id_1", null)
+ )
+ );
+
+ // Replay all mocks
+ EasyMock.replay(
+ taskRunner,
+ taskMaster,
+ taskStorageQueryAdapter,
+ indexerMetadataStorageAdapter,
+ req,
+ workerTaskRunnerQueryAdapter
+ );
+
+ // Verify that only the tasks of write access datasource are returned
+ List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>)
overlordResource
+ .getTasks(null, null, null, null, null, req)
+ .getEntity();
+ Assert.assertEquals(2, responseObjects.size());
+ for (TaskStatusPlus taskStatus : responseObjects) {
+ Assert.assertEquals(Datasources.WIKIPEDIA, taskStatus.getDataSource());
+ }
+ }
+
+ @Test
+ public void testGetTasksFilterByDatasourceRequiresWrite()
+ {
+ // Setup mocks for a user who has write access to "wikipedia"
+ // and read access to "buzzfeed"
+ expectAuthorizationTokenCheck(Users.WIKI_WRITER);
+
+ // Setup mocks to return completed and active tasks of both datasources
+
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null,
null, null)).andStubReturn(
+ ImmutableList.of(
+ createTaskInfo("id_5", Datasources.WIKIPEDIA),
+ createTaskInfo("id_6", Datasources.BUZZFEED)
+ )
+ );
+
+
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
+ ImmutableList.of(
+ createTaskInfo("id_1", Datasources.WIKIPEDIA),
+ createTaskInfo("id_2", Datasources.BUZZFEED)
+ )
+ );
+
+ // Replay all mocks
+ EasyMock.replay(
+ taskRunner,
+ taskMaster,
+ taskStorageQueryAdapter,
+ indexerMetadataStorageAdapter,
+ req,
+ workerTaskRunnerQueryAdapter
+ );
+
+ // Verify that filtering by a datasource on which the user has only read
+ // access is rejected with an exception instead of returning its tasks
+ expectedException.expect(WebApplicationException.class);
+ overlordResource.getTasks(null, Datasources.BUZZFEED, null, null, null,
req);
+ }
+
+ @Test
public void testGetNullCompleteTask()
{
expectAuthorizationTokenCheck();
@@ -927,6 +1040,27 @@ public class OverlordResourceTest
}
@Test
+  public void testTaskPostDeniesDatasourceReadUser()
+  {
+    // Authenticate as a user who can write to "wikipedia" but only read "buzzfeed"
+    expectAuthorizationTokenCheck(Users.WIKI_WRITER);
+
+    EasyMock.replay(
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter
+    );
+
+    // Verify that taskPost fails for user who has only datasource read access.
+    // The expectation is registered once, immediately before the call under test.
+    Task task = NoopTask.create(Datasources.BUZZFEED);
+    expectedException.expect(ForbiddenException.class);
+    overlordResource.taskPost(task, req);
+  }
+
+ @Test
public void testKillPendingSegments()
{
expectAuthorizationTokenCheck();
@@ -1317,7 +1451,12 @@ public class OverlordResourceTest
private void expectAuthorizationTokenCheck()
{
- AuthenticationResult authenticationResult = new
AuthenticationResult("druid", "druid", null, null);
+ expectAuthorizationTokenCheck(Users.DRUID);
+ }
+
+ private void expectAuthorizationTokenCheck(String username)
+ {
+ AuthenticationResult authenticationResult = new
AuthenticationResult(username, "druid", null, null);
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
@@ -1360,6 +1499,36 @@ public class OverlordResourceTest
};
}
+ private TaskInfo<Task, TaskStatus> createTaskInfo(String taskId, String
datasource)
+ {
+ return new TaskInfo<>(
+ taskId,
+ DateTime.now(ISOChronology.getInstanceUTC()),
+ TaskStatus.success(taskId),
+ datasource,
+ getTaskWithIdAndDatasource(taskId, datasource)
+ );
+ }
+
+ /**
+ * Usernames to use in the tests.
+ */
+ private static class Users
+ {
+ private static final String DRUID = "druid";
+ private static final String WIKI_WRITER = "Wiki Writer";
+ private static final String BUZZ_WRITER = "Buzz Writer";
+ }
+
+ /**
+ * Datasource names to use in the tests.
+ */
+ private static class Datasources
+ {
+ private static final String WIKIPEDIA = "wikipedia";
+ private static final String BUZZFEED = "buzzfeed";
+ }
+
private static class MockTaskRunnerWorkItem extends TaskRunnerWorkItem
{
public MockTaskRunnerWorkItem(
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilterTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilterTest.java
index d38165c..118b8b5 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilterTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilterTest.java
@@ -130,7 +130,7 @@ public class SupervisorResourceFilterTest
)
{
expect(containerRequest.getPathSegments())
- .andReturn(getPathSegments("/druid/indexer/v1/supervisor/datasource1"))
+ .andReturn(getPathSegments(path))
.anyTimes();
expect(containerRequest.getMethod()).andReturn(requestMethod).anyTimes();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index c22460e..8c77ea0 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -30,10 +30,12 @@ import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAu
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.ResourceType;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
@@ -92,7 +94,14 @@ public class SupervisorResourceTest extends EasyMockSupport
@Override
public Authorizer getAuthorizer(String name)
{
+ // Create an Authorizer that only allows Datasource WRITE requests
+ // because all SupervisorResource APIs must only check Datasource
WRITE access
return (authenticationResult, resource, action) -> {
+ if (!resource.getType().equals(ResourceType.DATASOURCE)
+ || action != Action.WRITE) {
+ return new Access(false);
+ }
+
if (authenticationResult.getIdentity().equals("druid")) {
return Access.OK;
} else {
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
new file mode 100644
index 0000000..3eba53f
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskToolbox;
+import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.server.security.Authorizer;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.server.security.ResourceType;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+
+/**
+ * Unit Test to test authorization requirements of
+ * {@link SeekableStreamIndexTaskRunner} and {@link SeekableStreamIndexTask}.
+ */
+public class SeekableStreamIndexTaskRunnerAuthTest
+{
+
+ /**
+ * Test target.
+ */
+ private TestSeekableStreamIndexTaskRunner taskRunner;
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Before
+  public void setUp()
+  {
+    // Create an AuthorizerMapper that only allows access to a Datasource resource
+    AuthorizerMapper authorizerMapper = new AuthorizerMapper(null)
+    {
+      @Override
+      public Authorizer getAuthorizer(String name)
+      {
+        return (authenticationResult, resource, action) -> {
+          final String username = authenticationResult.getIdentity();
+
+          // Allow access to a Datasource if
+          // - any user requests Read access
+          // - or, Datasource Write User requests Write access
+          if (resource.getType().equals(ResourceType.DATASOURCE)) {
+            // Constant-first comparison so that a null identity is denied
+            // write access instead of raising a NullPointerException
+            return new Access(
+                action == Action.READ
+                || (action == Action.WRITE && Users.DATASOURCE_WRITE.equals(username))
+            );
+          }
+
+          // Do not allow access to any other resource
+          return new Access(false);
+        };
+      }
+    };
+
+    DataSchema dataSchema = new DataSchema(
+        "datasource",
+        new TimestampSpec(null, null, null),
+        new DimensionsSpec(Collections.emptyList()),
+        new AggregatorFactory[]{},
+        new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList()),
+        TransformSpec.NONE,
+        null,
+        null
+    );
+
+    // No expectations are recorded on the tuning config; the mock is never replayed
+    SeekableStreamIndexTaskTuningConfig tuningConfig = mock(SeekableStreamIndexTaskTuningConfig.class);
+
+    SeekableStreamIndexTaskIOConfig<String, String> ioConfig = new TestSeekableStreamIndexTaskIOConfig();
+
+    // Initialize task and task runner
+    SeekableStreamIndexTask<String, String, ByteEntity> indexTask
+        = new TestSeekableStreamIndexTask("id", dataSchema, tuningConfig, ioConfig);
+    taskRunner = new TestSeekableStreamIndexTaskRunner(indexTask, authorizerMapper);
+  }
+
+ @Test
+ public void testGetStatusHttp()
+ {
+ verifyOnlyDatasourceWriteUserCanAccess(taskRunner::getStatusHTTP);
+ }
+
+ @Test
+ public void testGetStartTime()
+ {
+ verifyOnlyDatasourceWriteUserCanAccess(taskRunner::getStartTime);
+ }
+
+ @Test
+ public void testStop()
+ {
+ verifyOnlyDatasourceWriteUserCanAccess(taskRunner::stop);
+ }
+
+ @Test
+  public void testPauseHttp()
+  {
+    verifyOnlyDatasourceWriteUserCanAccess(req -> {
+      try {
+        taskRunner.pauseHTTP(req);
+      }
+      catch (InterruptedException e) {
+        // Only the authorization outcome is under test, so the exception is not
+        // rethrown; restore the interrupt flag so the interruption is not lost
+        Thread.currentThread().interrupt();
+      }
+    });
+  }
+
+ @Test
+  public void testResumeHttp()
+  {
+    verifyOnlyDatasourceWriteUserCanAccess(req -> {
+      try {
+        taskRunner.resumeHTTP(req);
+      }
+      catch (InterruptedException e) {
+        // Only the authorization outcome is under test, so the exception is not
+        // rethrown; restore the interrupt flag so the interruption is not lost
+        Thread.currentThread().interrupt();
+      }
+    });
+  }
+
+ @Test
+ public void testGetEndOffsets()
+ {
+ // NOTE(review): despite the test name, this exercises getCurrentOffsets --
+ // confirm whether an end-offsets endpoint was meant to be covered here
+ verifyOnlyDatasourceWriteUserCanAccess(taskRunner::getCurrentOffsets);
+ }
+
+ @Test
+  public void testSetEndOffsetsHttp()
+  {
+    verifyOnlyDatasourceWriteUserCanAccess(request -> {
+      try {
+        taskRunner.setEndOffsetsHTTP(Collections.emptyMap(), false, request);
+      }
+      catch (InterruptedException e) {
+        // Only the authorization outcome is under test, so the exception is not
+        // rethrown; restore the interrupt flag so the interruption is not lost
+        Thread.currentThread().interrupt();
+      }
+    });
+  }
+
+ @Test
+ public void testGetCheckpointsHttp()
+ {
+ verifyOnlyDatasourceWriteUserCanAccess(taskRunner::getCheckpointsHTTP);
+ }
+
+
+ /**
+ * Invokes the given endpoint twice: first with a request authenticated as
+ * the datasource write user, before any exception expectation is registered,
+ * and then with a request authenticated as the datasource read user, which
+ * is expected to fail with a {@link ForbiddenException}.
+ *
+ * @param method endpoint invocation that receives the mocked request
+ */
+ private void verifyOnlyDatasourceWriteUserCanAccess(
+ Consumer<HttpServletRequest> method
+ )
+ {
+ // Verify that datasource write user can access
+ HttpServletRequest allowedRequest = createRequest(Users.DATASOURCE_WRITE);
+ replay(allowedRequest);
+ method.accept(allowedRequest);
+
+ // Verify that no other user can access
+ HttpServletRequest blockedRequest = createRequest(Users.DATASOURCE_READ);
+ replay(blockedRequest);
+ expectedException.expect(ForbiddenException.class);
+ method.accept(blockedRequest);
+ }
+
+ private HttpServletRequest createRequest(String username)
+ {
+ HttpServletRequest request = mock(HttpServletRequest.class);
+
+ AuthenticationResult authenticationResult = new
AuthenticationResult(username, "druid", null, null);
+
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
+
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
+
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+ .andReturn(authenticationResult)
+ .atLeastOnce();
+
+ request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, false);
+ EasyMock.expectLastCall().anyTimes();
+
+ request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
+ EasyMock.expectLastCall().anyTimes();
+
+ return request;
+ }
+
+ /**
+ * Dummy implementation used as the test target to test out the non-abstract
methods.
+ */
+ private static class TestSeekableStreamIndexTaskRunner
+ extends SeekableStreamIndexTaskRunner<String, String, ByteEntity>
+ {
+
+ private TestSeekableStreamIndexTaskRunner(
+ SeekableStreamIndexTask<String, String, ByteEntity> task,
+ AuthorizerMapper authorizerMapper
+ )
+ {
+ super(task, null, authorizerMapper, LockGranularity.SEGMENT);
+ }
+
+ @Override
+ protected boolean isEndOfShard(String seqNum)
+ {
+ return false;
+ }
+
+ @Nullable
+ @Override
+ protected TreeMap<Integer, Map<String, String>> getCheckPointsFromContext(
+ TaskToolbox toolbox,
+ String checkpointsString
+ )
+ {
+ return null;
+ }
+
+ @Override
+ protected String getNextStartOffset(String sequenceNumber)
+ {
+ return null;
+ }
+
+ @Override
+ protected SeekableStreamEndSequenceNumbers<String, String>
deserializePartitionsFromMetadata(
+ ObjectMapper mapper,
+ Object object
+ )
+ {
+ return null;
+ }
+
+ @Nonnull
+ @Override
+ protected List<OrderedPartitionableRecord<String, String, ByteEntity>>
getRecords(
+ RecordSupplier<String, String, ByteEntity> recordSupplier,
+ TaskToolbox toolbox
+ )
+ {
+ return null;
+ }
+
+ @Override
+ protected SeekableStreamDataSourceMetadata<String, String>
createDataSourceMetadata(
+ SeekableStreamSequenceNumbers<String, String> partitions
+ )
+ {
+ return null;
+ }
+
+ @Override
+ protected OrderedSequenceNumber<String> createSequenceNumber(String
sequenceNumber)
+ {
+ return null;
+ }
+
+ @Override
+ protected void possiblyResetDataSourceMetadata(
+ TaskToolbox toolbox,
+ RecordSupplier<String, String, ByteEntity> recordSupplier,
+ Set<StreamPartition<String>> assignment
+ )
+ {
+
+ }
+
+ @Override
+ protected boolean isEndOffsetExclusive()
+ {
+ return false;
+ }
+
+ @Override
+ protected TypeReference<List<SequenceMetadata<String, String>>>
getSequenceMetadataTypeReference()
+ {
+ return null;
+ }
+ }
+
+ private static class TestSeekableStreamIndexTask extends
SeekableStreamIndexTask<String, String, ByteEntity>
+ {
+
+ public TestSeekableStreamIndexTask(
+ String id,
+ DataSchema dataSchema,
+ SeekableStreamIndexTaskTuningConfig tuningConfig,
+ SeekableStreamIndexTaskIOConfig<String, String> ioConfig
+ )
+ {
+ super(id, null, dataSchema, tuningConfig, ioConfig, null, null);
+ }
+
+ @Override
+ public String getType()
+ {
+ return null;
+ }
+
+ @Override
+ protected SeekableStreamIndexTaskRunner<String, String, ByteEntity>
createTaskRunner()
+ {
+ return null;
+ }
+
+ @Override
+ protected RecordSupplier<String, String, ByteEntity>
newTaskRecordSupplier()
+ {
+ return null;
+ }
+ }
+
+ private static class TestSeekableStreamIndexTaskIOConfig extends
SeekableStreamIndexTaskIOConfig<String, String>
+ {
+ public TestSeekableStreamIndexTaskIOConfig()
+ {
+ super(
+ null,
+ "someSequence",
+ new SeekableStreamStartSequenceNumbers<>("abc", "def",
Collections.emptyMap(), Collections.emptyMap(), null),
+ new SeekableStreamEndSequenceNumbers<>("abc", "def",
Collections.emptyMap(), Collections.emptyMap()),
+ false,
+ DateTimes.nowUtc().minusDays(2),
+ DateTimes.nowUtc(),
+ new CsvInputFormat(null, null, true, null, 0)
+ );
+ }
+ }
+
+ /**
+ * Usernames used in the tests.
+ */
+ private static class Users
+ {
+ private static final String DATASOURCE_READ = "datasourceRead";
+ private static final String DATASOURCE_WRITE = "datasourceWrite";
+ }
+
+}
diff --git a/integration-tests/docker/ldap-configs/bootstrap.ldif
b/integration-tests/docker/ldap-configs/bootstrap.ldif
index 9614a78..d88265f 100644
--- a/integration-tests/docker/ldap-configs/bootstrap.ldif
+++ b/integration-tests/docker/ldap-configs/bootstrap.ldif
@@ -53,41 +53,41 @@ description: Admin users
uniqueMember: uid=admin,ou=Users,dc=example,dc=org
uniqueMember: uid=druid_system,ou=Users,dc=example,dc=org
-dn: uid=datasourceOnlyUser,ou=Users,dc=example,dc=org
-uid: datasourceOnlyUser
-cn: datasourceOnlyUser
-sn: datasourceOnlyUser
+dn: uid=datasourceReadOnlyUser,ou=Users,dc=example,dc=org
+uid: datasourceReadOnlyUser
+cn: datasourceReadOnlyUser
+sn: datasourceReadOnlyUser
objectClass: top
objectClass: posixAccount
objectClass: inetOrgPerson
-homeDirectory: /home/datasourceOnlyUser
+homeDirectory: /home/datasourceReadOnlyUser
uidNumber: 3
gidNumber: 3
userPassword: helloworld
-dn: cn=datasourceOnlyGroup,ou=Groups,dc=example,dc=org
+dn: cn=datasourceReadOnlyGroup,ou=Groups,dc=example,dc=org
objectClass: groupOfUniqueNames
-cn: datasourceOnlyGroup
-description: datasourceOnlyGroup users
-uniqueMember: uid=datasourceOnlyUser,ou=Users,dc=example,dc=org
-
-dn: uid=datasourceWithStateUser,ou=Users,dc=example,dc=org
-uid: datasourceWithStateUser
-cn: datasourceWithStateUser
-sn: datasourceWithStateUser
+cn: datasourceReadOnlyGroup
+description: datasourceReadOnlyGroup users
+uniqueMember: uid=datasourceReadOnlyUser,ou=Users,dc=example,dc=org
+
+dn: uid=datasourceReadWithStateUser,ou=Users,dc=example,dc=org
+uid: datasourceReadWithStateUser
+cn: datasourceReadWithStateUser
+sn: datasourceReadWithStateUser
objectClass: top
objectClass: posixAccount
objectClass: inetOrgPerson
-homeDirectory: /home/datasourceWithStateUser
+homeDirectory: /home/datasourceReadWithStateUser
uidNumber: 4
gidNumber: 4
userPassword: helloworld
-dn: cn=datasourceWithStateGroup,ou=Groups,dc=example,dc=org
+dn: cn=datasourceReadWithStateGroup,ou=Groups,dc=example,dc=org
objectClass: groupOfUniqueNames
-cn: datasourceWithStateGroup
-description: datasourceWithStateGroup users
-uniqueMember: uid=datasourceWithStateUser,ou=Users,dc=example,dc=org
+cn: datasourceReadWithStateGroup
+description: datasourceReadWithStateGroup users
+uniqueMember: uid=datasourceReadWithStateUser,ou=Users,dc=example,dc=org
dn: uid=stateOnlyUser,ou=Users,dc=example,dc=org
uid: stateOnlyUser
@@ -137,20 +137,38 @@ uidNumber: 7
gidNumber: 7
userPassword: helloworld
-dn: uid=datasourceAndSysUser,ou=Users,dc=example,dc=org
-uid: datasourceAndSysUser
-cn: datasourceAndSysUser
-sn: datasourceAndSysUser
+dn: uid=datasourceReadAndSysUser,ou=Users,dc=example,dc=org
+uid: datasourceReadAndSysUser
+cn: datasourceReadAndSysUser
+sn: datasourceReadAndSysUser
+objectClass: top
+objectClass: posixAccount
+objectClass: inetOrgPerson
+homeDirectory: /home/datasourceReadAndSysUser
+uidNumber: 8
+gidNumber: 8
+userPassword: helloworld
+
+dn: cn=datasourceReadWithSysGroup,ou=Groups,dc=example,dc=org
+objectClass: groupOfUniqueNames
+cn: datasourceReadWithSysGroup
+description: datasourceReadWithSysGroup users
+uniqueMember: uid=datasourceReadAndSysUser,ou=Users,dc=example,dc=org
+
+dn: uid=datasourceWriteAndSysUser,ou=Users,dc=example,dc=org
+uid: datasourceWriteAndSysUser
+cn: datasourceWriteAndSysUser
+sn: datasourceWriteAndSysUser
objectClass: top
objectClass: posixAccount
objectClass: inetOrgPerson
-homeDirectory: /home/datasourceAndSysUser
+homeDirectory: /home/datasourceWriteAndSysUser
uidNumber: 8
gidNumber: 8
userPassword: helloworld
-dn: cn=datasourceWithSysGroup,ou=Groups,dc=example,dc=org
+dn: cn=datasourceWriteWithSysGroup,ou=Groups,dc=example,dc=org
objectClass: groupOfUniqueNames
-cn: datasourceWithSysGroup
-description: datasourceWithSysGroup users
-uniqueMember: uid=datasourceAndSysUser,ou=Users,dc=example,dc=org
+cn: datasourceWriteWithSysGroup
+description: datasourceWriteWithSysGroup users
+uniqueMember: uid=datasourceWriteAndSysUser,ou=Users,dc=example,dc=org
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java
index 42556ad..c29be8d 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java
@@ -98,7 +98,7 @@ public abstract class AbstractAuthConfigurationTest
* create a ResourceAction set of permissions that can only read a
'auth_test' datasource, for Authorizer
* implementations which use ResourceAction pattern matching
*/
- protected static final List<ResourceAction> DATASOURCE_ONLY_PERMISSIONS =
Collections.singletonList(
+ protected static final List<ResourceAction> DATASOURCE_READ_ONLY_PERMISSIONS
= Collections.singletonList(
new ResourceAction(
new Resource("auth_test", ResourceType.DATASOURCE),
Action.READ
@@ -109,7 +109,7 @@ public abstract class AbstractAuthConfigurationTest
* create a ResourceAction set of permissions that can only read 'auth_test'
+ partial SYSTEM_TABLE, for Authorizer
* implementations which use ResourceAction pattern matching
*/
- protected static final List<ResourceAction> DATASOURCE_SYS_PERMISSIONS =
ImmutableList.of(
+ protected static final List<ResourceAction> DATASOURCE_READ_SYS_PERMISSIONS
= ImmutableList.of(
new ResourceAction(
new Resource("auth_test", ResourceType.DATASOURCE),
Action.READ
@@ -135,10 +135,38 @@ public abstract class AbstractAuthConfigurationTest
);
/**
+ * create a ResourceAction set of permissions that can write datasource 'auth_test' and read the 'segments', 'servers', 'server_segments' and 'tasks' system tables
+ */
+ protected static final List<ResourceAction> DATASOURCE_WRITE_SYS_PERMISSIONS
= ImmutableList.of(
+ new ResourceAction(
+ new Resource("auth_test", ResourceType.DATASOURCE),
+ Action.WRITE
+ ),
+ new ResourceAction(
+ new Resource("segments", ResourceType.SYSTEM_TABLE),
+ Action.READ
+ ),
+ // test missing state permission but having servers permission
+ new ResourceAction(
+ new Resource("servers", ResourceType.SYSTEM_TABLE),
+ Action.READ
+ ),
+ // test missing state permission but having server_segments permission
+ new ResourceAction(
+ new Resource("server_segments", ResourceType.SYSTEM_TABLE),
+ Action.READ
+ ),
+ new ResourceAction(
+ new Resource("tasks", ResourceType.SYSTEM_TABLE),
+ Action.READ
+ )
+ );
+
+ /**
* create a ResourceAction set of permissions that can only read 'auth_test'
+ STATE + SYSTEM_TABLE read access, for
* Authorizer implementations which use ResourceAction pattern matching
*/
- protected static final List<ResourceAction> DATASOURCE_SYS_STATE_PERMISSIONS
= ImmutableList.of(
+ protected static final List<ResourceAction>
DATASOURCE_READ_SYS_STATE_PERMISSIONS = ImmutableList.of(
new ResourceAction(
new Resource("auth_test", ResourceType.DATASOURCE),
Action.READ
@@ -187,16 +215,18 @@ public abstract class AbstractAuthConfigurationTest
protected CoordinatorResourceTestClient coordinatorClient;
protected HttpClient adminClient;
- protected HttpClient datasourceOnlyUserClient;
- protected HttpClient datasourceAndSysUserClient;
- protected HttpClient datasourceWithStateUserClient;
+ protected HttpClient datasourceReadOnlyUserClient;
+ protected HttpClient datasourceReadAndSysUserClient;
+ protected HttpClient datasourceWriteAndSysUserClient;
+ protected HttpClient datasourceReadWithStateUserClient;
protected HttpClient stateOnlyUserClient;
protected HttpClient internalSystemClient;
- protected abstract void setupDatasourceOnlyUser() throws Exception;
- protected abstract void setupDatasourceAndSysTableUser() throws Exception;
- protected abstract void setupDatasourceAndSysAndStateUser() throws Exception;
+ protected abstract void setupDatasourceReadOnlyUser() throws Exception;
+ protected abstract void setupDatasourceReadAndSysTableUser() throws
Exception;
+ protected abstract void setupDatasourceWriteAndSysTableUser() throws
Exception;
+ protected abstract void setupDatasourceReadAndSysAndStateUser() throws
Exception;
protected abstract void setupSysTableAndStateOnlyUser() throws Exception;
protected abstract void setupTestSpecificHttpClients() throws Exception;
protected abstract String getAuthenticatorName();
@@ -242,44 +272,44 @@ public abstract class AbstractAuthConfigurationTest
}
@Test
- public void test_systemSchemaAccess_datasourceOnlyUser() throws Exception
+ public void test_systemSchemaAccess_datasourceReadOnlyUser() throws Exception
{
// check that we can access a datasource-permission restricted resource on
the broker
HttpUtil.makeRequest(
- datasourceOnlyUserClient,
+ datasourceReadOnlyUserClient,
HttpMethod.GET,
config.getBrokerUrl() + "/druid/v2/datasources/auth_test",
null
);
// as user that can only read auth_test
- LOG.info("Checking sys.segments query as datasourceOnlyUser...");
+ LOG.info("Checking sys.segments query as datasourceReadOnlyUser...");
verifySystemSchemaQueryFailure(
- datasourceOnlyUserClient,
+ datasourceReadOnlyUserClient,
SYS_SCHEMA_SEGMENTS_QUERY,
HttpResponseStatus.FORBIDDEN,
"{\"Access-Check-Result\":\"Allowed:false, Message:\"}"
);
- LOG.info("Checking sys.servers query as datasourceOnlyUser...");
+ LOG.info("Checking sys.servers query as datasourceReadOnlyUser...");
verifySystemSchemaQueryFailure(
- datasourceOnlyUserClient,
+ datasourceReadOnlyUserClient,
SYS_SCHEMA_SERVERS_QUERY,
HttpResponseStatus.FORBIDDEN,
"{\"Access-Check-Result\":\"Allowed:false, Message:\"}"
);
- LOG.info("Checking sys.server_segments query as datasourceOnlyUser...");
+ LOG.info("Checking sys.server_segments query as
datasourceReadOnlyUser...");
verifySystemSchemaQueryFailure(
- datasourceOnlyUserClient,
+ datasourceReadOnlyUserClient,
SYS_SCHEMA_SERVER_SEGMENTS_QUERY,
HttpResponseStatus.FORBIDDEN,
"{\"Access-Check-Result\":\"Allowed:false, Message:\"}"
);
- LOG.info("Checking sys.tasks query as datasourceOnlyUser...");
+ LOG.info("Checking sys.tasks query as datasourceReadOnlyUser...");
verifySystemSchemaQueryFailure(
- datasourceOnlyUserClient,
+ datasourceReadOnlyUserClient,
SYS_SCHEMA_TASKS_QUERY,
HttpResponseStatus.FORBIDDEN,
"{\"Access-Check-Result\":\"Allowed:false, Message:\"}"
@@ -287,83 +317,119 @@ public abstract class AbstractAuthConfigurationTest
}
@Test
- public void test_systemSchemaAccess_datasourceAndSysUser() throws Exception
+ public void test_systemSchemaAccess_datasourceReadAndSysUser() throws
Exception
{
// check that we can access a datasource-permission restricted resource on
the broker
HttpUtil.makeRequest(
- datasourceAndSysUserClient,
+ datasourceReadAndSysUserClient,
HttpMethod.GET,
config.getBrokerUrl() + "/druid/v2/datasources/auth_test",
null
);
// as user that can only read auth_test
- LOG.info("Checking sys.segments query as datasourceAndSysUser...");
+ LOG.info("Checking sys.segments query as datasourceReadAndSysUser...");
verifySystemSchemaQuery(
- datasourceAndSysUserClient,
+ datasourceReadAndSysUserClient,
SYS_SCHEMA_SEGMENTS_QUERY,
adminSegments.stream()
.filter((segmentEntry) ->
"auth_test".equals(segmentEntry.get("datasource")))
.collect(Collectors.toList())
);
- LOG.info("Checking sys.servers query as datasourceAndSysUser...");
+ LOG.info("Checking sys.servers query as datasourceReadAndSysUser...");
+ verifySystemSchemaQueryFailure(
+ datasourceReadAndSysUserClient,
+ SYS_SCHEMA_SERVERS_QUERY,
+ HttpResponseStatus.FORBIDDEN,
+ "{\"Access-Check-Result\":\"Insufficient permission to view servers :
Allowed:false, Message:\"}"
+ );
+
+ LOG.info("Checking sys.server_segments query as
datasourceReadAndSysUser...");
+ verifySystemSchemaQueryFailure(
+ datasourceReadAndSysUserClient,
+ SYS_SCHEMA_SERVER_SEGMENTS_QUERY,
+ HttpResponseStatus.FORBIDDEN,
+ "{\"Access-Check-Result\":\"Insufficient permission to view servers :
Allowed:false, Message:\"}"
+ );
+
+ // Verify that sys.tasks result is empty as it is filtered by Datasource
WRITE access
+ LOG.info("Checking sys.tasks query as datasourceReadAndSysUser...");
+ verifySystemSchemaQuery(
+ datasourceReadAndSysUserClient,
+ SYS_SCHEMA_TASKS_QUERY,
+ Collections.emptyList()
+ );
+ }
+
+ @Test
+ public void test_systemSchemaAccess_datasourceWriteAndSysUser() throws
Exception
+ {
+ // Verify that sys.segments result is empty as it is filtered by
Datasource READ access
+ LOG.info("Checking sys.segments query as datasourceWriteAndSysUser...");
+ verifySystemSchemaQuery(
+ datasourceWriteAndSysUserClient,
+ SYS_SCHEMA_SEGMENTS_QUERY,
+ Collections.emptyList()
+ );
+
+ LOG.info("Checking sys.servers query as datasourceWriteAndSysUser...");
verifySystemSchemaQueryFailure(
- datasourceAndSysUserClient,
+ datasourceWriteAndSysUserClient,
SYS_SCHEMA_SERVERS_QUERY,
HttpResponseStatus.FORBIDDEN,
"{\"Access-Check-Result\":\"Insufficient permission to view servers :
Allowed:false, Message:\"}"
);
- LOG.info("Checking sys.server_segments query as datasourceAndSysUser...");
+ LOG.info("Checking sys.server_segments query as
datasourceWriteAndSysUser...");
verifySystemSchemaQueryFailure(
- datasourceAndSysUserClient,
+ datasourceWriteAndSysUserClient,
SYS_SCHEMA_SERVER_SEGMENTS_QUERY,
HttpResponseStatus.FORBIDDEN,
"{\"Access-Check-Result\":\"Insufficient permission to view servers :
Allowed:false, Message:\"}"
);
- LOG.info("Checking sys.tasks query as datasourceAndSysUser...");
+ LOG.info("Checking sys.tasks query as datasourceWriteAndSysUser...");
verifySystemSchemaQuery(
- datasourceAndSysUserClient,
+ datasourceWriteAndSysUserClient,
SYS_SCHEMA_TASKS_QUERY,
adminTasks.stream()
- .filter((taskEntry) ->
"auth_test".equals(taskEntry.get("datasource")))
+ .filter((segmentEntry) ->
"auth_test".equals(segmentEntry.get("datasource")))
.collect(Collectors.toList())
);
}
@Test
- public void test_systemSchemaAccess_datasourceAndSysWithStateUser() throws
Exception
+ public void test_systemSchemaAccess_datasourceReadAndSysWithStateUser()
throws Exception
{
// check that we can access a state-permission restricted resource on the
broker
HttpUtil.makeRequest(
- datasourceWithStateUserClient,
+ datasourceReadWithStateUserClient,
HttpMethod.GET,
config.getBrokerUrl() + "/status",
null
);
// as user that can read auth_test and STATE
- LOG.info("Checking sys.segments query as datasourceWithStateUser...");
+ LOG.info("Checking sys.segments query as datasourceReadWithStateUser...");
verifySystemSchemaQuery(
- datasourceWithStateUserClient,
+ datasourceReadWithStateUserClient,
SYS_SCHEMA_SEGMENTS_QUERY,
adminSegments.stream()
.filter((segmentEntry) ->
"auth_test".equals(segmentEntry.get("datasource")))
.collect(Collectors.toList())
);
- LOG.info("Checking sys.servers query as datasourceWithStateUser...");
+ LOG.info("Checking sys.servers query as datasourceReadWithStateUser...");
verifySystemSchemaServerQuery(
- datasourceWithStateUserClient,
+ datasourceReadWithStateUserClient,
SYS_SCHEMA_SERVERS_QUERY,
adminServers
);
- LOG.info("Checking sys.server_segments query as
datasourceWithStateUser...");
+ LOG.info("Checking sys.server_segments query as
datasourceReadWithStateUser...");
verifySystemSchemaQuery(
- datasourceWithStateUserClient,
+ datasourceReadWithStateUserClient,
SYS_SCHEMA_SERVER_SEGMENTS_QUERY,
adminServerSegments.stream()
.filter((serverSegmentEntry) -> ((String)
serverSegmentEntry.get("segment_id")).contains(
@@ -371,13 +437,12 @@ public abstract class AbstractAuthConfigurationTest
.collect(Collectors.toList())
);
- LOG.info("Checking sys.tasks query as datasourceWithStateUser...");
+ // Verify that sys.tasks result is empty as it is filtered by Datasource
WRITE access
+ LOG.info("Checking sys.tasks query as datasourceReadWithStateUser...");
verifySystemSchemaQuery(
- datasourceWithStateUserClient,
+ datasourceReadWithStateUserClient,
SYS_SCHEMA_TASKS_QUERY,
- adminTasks.stream()
- .filter((taskEntry) ->
"auth_test".equals(taskEntry.get("datasource")))
- .collect(Collectors.toList())
+ Collections.emptyList()
);
}
@@ -500,9 +565,10 @@ public abstract class AbstractAuthConfigurationTest
protected void setupHttpClientsAndUsers() throws Exception
{
setupHttpClients();
- setupDatasourceOnlyUser();
- setupDatasourceAndSysTableUser();
- setupDatasourceAndSysAndStateUser();
+ setupDatasourceReadOnlyUser();
+ setupDatasourceReadAndSysTableUser();
+ setupDatasourceWriteAndSysTableUser();
+ setupDatasourceReadAndSysAndStateUser();
setupSysTableAndStateOnlyUser();
}
@@ -766,18 +832,23 @@ public abstract class AbstractAuthConfigurationTest
httpClient
);
- datasourceOnlyUserClient = new CredentialedHttpClient(
- new BasicCredentials("datasourceOnlyUser", "helloworld"),
+ datasourceReadOnlyUserClient = new CredentialedHttpClient(
+ new BasicCredentials("datasourceReadOnlyUser", "helloworld"),
+ httpClient
+ );
+
+ datasourceReadAndSysUserClient = new CredentialedHttpClient(
+ new BasicCredentials("datasourceReadAndSysUser", "helloworld"),
httpClient
);
- datasourceAndSysUserClient = new CredentialedHttpClient(
- new BasicCredentials("datasourceAndSysUser", "helloworld"),
+ datasourceWriteAndSysUserClient = new CredentialedHttpClient(
+ new BasicCredentials("datasourceWriteAndSysUser", "helloworld"),
httpClient
);
- datasourceWithStateUserClient = new CredentialedHttpClient(
- new BasicCredentials("datasourceWithStateUser", "helloworld"),
+ datasourceReadWithStateUserClient = new CredentialedHttpClient(
+ new BasicCredentials("datasourceReadWithStateUser", "helloworld"),
httpClient
);
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
index 690757f..c87b750 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
@@ -70,38 +70,50 @@ public class ITBasicAuthConfigurationTest extends
AbstractAuthConfigurationTest
}
@Override
- protected void setupDatasourceOnlyUser() throws Exception
+ protected void setupDatasourceReadOnlyUser() throws Exception
{
createUserAndRoleWithPermissions(
adminClient,
- "datasourceOnlyUser",
+ "datasourceReadOnlyUser",
"helloworld",
- "datasourceOnlyRole",
- DATASOURCE_ONLY_PERMISSIONS
+ "datasourceReadOnlyRole",
+ DATASOURCE_READ_ONLY_PERMISSIONS
);
}
@Override
- protected void setupDatasourceAndSysTableUser() throws Exception
+ protected void setupDatasourceReadAndSysTableUser() throws Exception
{
createUserAndRoleWithPermissions(
adminClient,
- "datasourceAndSysUser",
+ "datasourceReadAndSysUser",
"helloworld",
- "datasourceAndSysRole",
- DATASOURCE_SYS_PERMISSIONS
+ "datasourceReadAndSysRole",
+ DATASOURCE_READ_SYS_PERMISSIONS
);
}
@Override
- protected void setupDatasourceAndSysAndStateUser() throws Exception
+ protected void setupDatasourceWriteAndSysTableUser() throws Exception
{
createUserAndRoleWithPermissions(
adminClient,
- "datasourceWithStateUser",
+ "datasourceWriteAndSysUser",
"helloworld",
- "datasourceWithStateRole",
- DATASOURCE_SYS_STATE_PERMISSIONS
+ "datasourceWriteAndSysRole",
+ DATASOURCE_WRITE_SYS_PERMISSIONS
+ );
+ }
+
+ @Override
+ protected void setupDatasourceReadAndSysAndStateUser() throws Exception
+ {
+ createUserAndRoleWithPermissions(
+ adminClient,
+ "datasourceReadWithStateUser",
+ "helloworld",
+ "datasourceReadWithStateRole",
+ DATASOURCE_READ_SYS_STATE_PERMISSIONS
);
}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java
index 97a7c53..3469f56 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java
@@ -120,29 +120,38 @@ public class ITBasicAuthLdapConfigurationTest extends
AbstractAuthConfigurationT
@Override
- protected void setupDatasourceOnlyUser() throws Exception
+ protected void setupDatasourceReadOnlyUser() throws Exception
{
createRoleWithPermissionsAndGroupMapping(
- "datasourceOnlyGroup",
- ImmutableMap.of("datasourceOnlyRole", DATASOURCE_ONLY_PERMISSIONS)
+ "datasourceReadOnlyGroup",
+ ImmutableMap.of("datasourceReadOnlyRole",
DATASOURCE_READ_ONLY_PERMISSIONS)
);
}
@Override
- protected void setupDatasourceAndSysTableUser() throws Exception
+ protected void setupDatasourceReadAndSysTableUser() throws Exception
{
createRoleWithPermissionsAndGroupMapping(
- "datasourceWithSysGroup",
- ImmutableMap.of("datasourceWithSysRole", DATASOURCE_SYS_PERMISSIONS)
+ "datasourceReadWithSysGroup",
+ ImmutableMap.of("datasourceReadWithSysRole",
DATASOURCE_READ_SYS_PERMISSIONS)
);
}
@Override
- protected void setupDatasourceAndSysAndStateUser() throws Exception
+ protected void setupDatasourceWriteAndSysTableUser() throws Exception
{
createRoleWithPermissionsAndGroupMapping(
- "datasourceWithStateGroup",
- ImmutableMap.of("datasourceWithStateRole",
DATASOURCE_SYS_STATE_PERMISSIONS)
+ "datasourceWriteWithSysGroup",
+ ImmutableMap.of("datasourceWriteWithSysRole",
DATASOURCE_WRITE_SYS_PERMISSIONS)
+ );
+ }
+
+ @Override
+ protected void setupDatasourceReadAndSysAndStateUser() throws Exception
+ {
+ createRoleWithPermissionsAndGroupMapping(
+ "datasourceReadWithStateGroup",
+ ImmutableMap.of("datasourceReadWithStateRole",
DATASOURCE_READ_SYS_STATE_PERMISSIONS)
);
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 0562e33..aa2bdce 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -872,7 +872,7 @@ public class SystemSchema extends AbstractSchema
);
Function<TaskStatusPlus, Iterable<ResourceAction>> raGenerator = task ->
Collections.singletonList(
-
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(task.getDataSource()));
+
AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR.apply(task.getDataSource()));
final Iterable<TaskStatusPlus> authorizedTasks =
AuthorizationUtils.filterAuthorizedResources(
authenticationResult,
@@ -1014,7 +1014,7 @@ public class SystemSchema extends AbstractSchema
);
Function<SupervisorStatus, Iterable<ResourceAction>> raGenerator =
supervisor -> Collections.singletonList(
-
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(supervisor.getSource()));
+
AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR.apply(supervisor.getSource()));
final Iterable<SupervisorStatus> authorizedSupervisors =
AuthorizationUtils.filterAuthorizedResources(
authenticationResult,
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index d25a902..b299a3a 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -81,9 +81,12 @@ import
org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.NoopEscalator;
+import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.schema.SystemSchema.SegmentsTable;
import org.apache.druid.sql.calcite.table.RowSignatures;
@@ -196,14 +199,7 @@ public class SystemSchemaTest extends CalciteTestBase
.addMockedMethod("getStatus")
.createMock();
request = EasyMock.createMock(Request.class);
- authMapper = new AuthorizerMapper(null)
- {
- @Override
- public Authorizer getAuthorizer(String name)
- {
- return (authenticationResult, resource, action) -> new Access(true);
- }
- };
+ authMapper = createAuthMapper();
final File tmpDir = temporaryFolder.newFolder();
final QueryableIndex index1 = IndexBuilder.create()
@@ -554,33 +550,7 @@ public class SystemSchemaTest extends CalciteTestBase
EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once();
EasyMock.replay(client, request, responseHolder, responseHandler,
metadataView);
- DataContext dataContext = new DataContext()
- {
- @Override
- public SchemaPlus getRootSchema()
- {
- return null;
- }
-
- @Override
- public JavaTypeFactory getTypeFactory()
- {
- return null;
- }
-
- @Override
- public QueryProvider getQueryProvider()
- {
- return null;
- }
-
- @Override
- public Object get(String name)
- {
- return CalciteTests.SUPER_USER_AUTH_RESULT;
- }
- };
-
+ DataContext dataContext = createDataContext(Users.SUPER);
final List<Object[]> rows = segmentsTable.scan(dataContext).toList();
rows.sort((Object[] row1, Object[] row2) -> ((Comparable)
row1[0]).compareTo(row2[0]));
@@ -831,33 +801,7 @@ public class SystemSchemaTest extends CalciteTestBase
indexerNodeDiscovery
);
- DataContext dataContext = new DataContext()
- {
- @Override
- public SchemaPlus getRootSchema()
- {
- return null;
- }
-
- @Override
- public JavaTypeFactory getTypeFactory()
- {
- return null;
- }
-
- @Override
- public QueryProvider getQueryProvider()
- {
- return null;
- }
-
- @Override
- public Object get(String name)
- {
- return CalciteTests.SUPER_USER_AUTH_RESULT;
- }
- };
-
+ DataContext dataContext = createDataContext(Users.SUPER);
final List<Object[]> rows = serversTable.scan(dataContext).toList();
rows.sort((Object[] row1, Object[] row2) -> ((Comparable)
row1[0]).compareTo(row2[0]));
@@ -1112,32 +1056,7 @@ public class SystemSchemaTest extends CalciteTestBase
.andReturn(immutableDruidServers)
.once();
EasyMock.replay(serverView);
- DataContext dataContext = new DataContext()
- {
- @Override
- public SchemaPlus getRootSchema()
- {
- return null;
- }
-
- @Override
- public JavaTypeFactory getTypeFactory()
- {
- return null;
- }
-
- @Override
- public QueryProvider getQueryProvider()
- {
- return null;
- }
-
- @Override
- public Object get(String name)
- {
- return CalciteTests.SUPER_USER_AUTH_RESULT;
- }
- };
+ DataContext dataContext = createDataContext(Users.SUPER);
//server_segments table is the join of servers and segments table
// it will have 5 rows as follows
@@ -1230,32 +1149,7 @@ public class SystemSchemaTest extends CalciteTestBase
responseHolder.done();
EasyMock.replay(client, request, responseHandler);
- DataContext dataContext = new DataContext()
- {
- @Override
- public SchemaPlus getRootSchema()
- {
- return null;
- }
-
- @Override
- public JavaTypeFactory getTypeFactory()
- {
- return null;
- }
-
- @Override
- public QueryProvider getQueryProvider()
- {
- return null;
- }
-
- @Override
- public Object get(String name)
- {
- return CalciteTests.SUPER_USER_AUTH_RESULT;
- }
- };
+ DataContext dataContext = createDataContext(Users.SUPER);
final List<Object[]> rows = tasksTable.scan(dataContext).toList();
Object[] row0 = rows.get(0);
@@ -1295,6 +1189,81 @@ public class SystemSchemaTest extends CalciteTestBase
}
@Test
+ public void testTasksTableAuth() throws Exception
+ {
+ SystemSchema.TasksTable tasksTable = new SystemSchema.TasksTable(client,
mapper, authMapper);
+
+ EasyMock.expect(client.makeRequest(HttpMethod.GET,
"/druid/indexer/v1/tasks"))
+ .andReturn(request)
+ .anyTimes();
+
+ String json = "[{\n"
+ + "\t\"id\": \"index_wikipedia_2018-09-20T22:33:44.911Z\",\n"
+ + "\t\"groupId\":
\"group_index_wikipedia_2018-09-20T22:33:44.911Z\",\n"
+ + "\t\"type\": \"index\",\n"
+ + "\t\"createdTime\": \"2018-09-20T22:33:44.922Z\",\n"
+ + "\t\"queueInsertionTime\": \"1970-01-01T00:00:00.000Z\",\n"
+ + "\t\"statusCode\": \"FAILED\",\n"
+ + "\t\"runnerStatusCode\": \"NONE\",\n"
+ + "\t\"duration\": -1,\n"
+ + "\t\"location\": {\n"
+ + "\t\t\"host\": \"testHost\",\n"
+ + "\t\t\"port\": 1234,\n"
+ + "\t\t\"tlsPort\": -1\n"
+ + "\t},\n"
+ + "\t\"dataSource\": \"wikipedia\",\n"
+ + "\t\"errorMsg\": null\n"
+ + "}, {\n"
+ + "\t\"id\": \"index_wikipedia_2018-09-21T18:38:47.773Z\",\n"
+ + "\t\"groupId\":
\"group_index_wikipedia_2018-09-21T18:38:47.773Z\",\n"
+ + "\t\"type\": \"index\",\n"
+ + "\t\"createdTime\": \"2018-09-21T18:38:47.873Z\",\n"
+ + "\t\"queueInsertionTime\": \"2018-09-21T18:38:47.910Z\",\n"
+ + "\t\"statusCode\": \"RUNNING\",\n"
+ + "\t\"runnerStatusCode\": \"RUNNING\",\n"
+ + "\t\"duration\": null,\n"
+ + "\t\"location\": {\n"
+ + "\t\t\"host\": \"192.168.1.6\",\n"
+ + "\t\t\"port\": 8100,\n"
+ + "\t\t\"tlsPort\": -1\n"
+ + "\t},\n"
+ + "\t\"dataSource\": \"wikipedia\",\n"
+ + "\t\"errorMsg\": null\n"
+ + "}]";
+
+ HttpResponse httpResp = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK);
+
+ EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject()))
+ .andReturn(createFullResponseHolder(httpResp, json))
+ .andReturn(createFullResponseHolder(httpResp, json))
+ .andReturn(createFullResponseHolder(httpResp, json));
+
+ EasyMock.expect(request.getUrl())
+ .andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks"))
+ .anyTimes();
+
+ EasyMock.replay(client, request, responseHandler);
+
+ // Verify that no row is returned for Datasource Read user
+ List<Object[]> rows = tasksTable
+ .scan(createDataContext(Users.DATASOURCE_READ))
+ .toList();
+ Assert.assertTrue(rows.isEmpty());
+
+ // Verify that 2 rows are returned for Datasource Write user
+ rows = tasksTable
+ .scan(createDataContext(Users.DATASOURCE_WRITE))
+ .toList();
+ Assert.assertEquals(2, rows.size());
+
+ // Verify that 2 rows are returned for Super user
+ rows = tasksTable
+ .scan(createDataContext(Users.SUPER))
+ .toList();
+ Assert.assertEquals(2, rows.size());
+ }
+
+ @Test
public void testSupervisorTable() throws Exception
{
@@ -1337,7 +1306,111 @@ public class SystemSchemaTest extends CalciteTestBase
responseHolder.done();
EasyMock.replay(client, request, responseHandler);
- DataContext dataContext = new DataContext()
+ DataContext dataContext = createDataContext(Users.SUPER);
+ final List<Object[]> rows = supervisorTable.scan(dataContext).toList();
+
+ Object[] row0 = rows.get(0);
+ Assert.assertEquals("wikipedia", row0[0].toString());
+ Assert.assertEquals("UNHEALTHY_SUPERVISOR", row0[1].toString());
+ Assert.assertEquals("UNABLE_TO_CONNECT_TO_STREAM", row0[2].toString());
+ Assert.assertEquals(0L, row0[3]);
+ Assert.assertEquals("kafka", row0[4].toString());
+ Assert.assertEquals("wikipedia", row0[5].toString());
+ Assert.assertEquals(0L, row0[6]);
+ Assert.assertEquals(
+
"{\"type\":\"kafka\",\"dataSchema\":{\"dataSource\":\"wikipedia\"},\"context\":null,\"suspended\":false}",
+ row0[7].toString()
+ );
+
+ // Verify value types.
+ verifyTypes(rows, SystemSchema.SUPERVISOR_SIGNATURE);
+ }
+
+ @Test
+ public void testSupervisorTableAuth() throws Exception
+ {
+ SystemSchema.SupervisorsTable supervisorTable =
+ new SystemSchema.SupervisorsTable(client, mapper, createAuthMapper());
+
+ EasyMock.expect(client.makeRequest(HttpMethod.GET,
"/druid/indexer/v1/supervisor?system"))
+ .andReturn(request)
+ .anyTimes();
+
+ final String json = "[{\n"
+ + "\t\"id\": \"wikipedia\",\n"
+ + "\t\"state\": \"UNHEALTHY_SUPERVISOR\",\n"
+ + "\t\"detailedState\": \"UNABLE_TO_CONNECT_TO_STREAM\",\n"
+ + "\t\"healthy\": false,\n"
+ + "\t\"specString\":
\"{\\\"type\\\":\\\"kafka\\\",\\\"dataSchema\\\":{\\\"dataSource\\\":\\\"wikipedia\\\"}"
+ + ",\\\"context\\\":null,\\\"suspended\\\":false}\",\n"
+ + "\t\"type\": \"kafka\",\n"
+ + "\t\"source\": \"wikipedia\",\n"
+ + "\t\"suspended\": false\n"
+ + "}]";
+
+ HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK);
+ EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject()))
+ .andReturn(createFullResponseHolder(httpResponse, json))
+ .andReturn(createFullResponseHolder(httpResponse, json))
+ .andReturn(createFullResponseHolder(httpResponse, json));
+
+ EasyMock.expect(responseHandler.getStatus())
+ .andReturn(httpResponse.getStatus().getCode())
+ .anyTimes();
+ EasyMock.expect(request.getUrl())
+ .andReturn(new
URL("http://test-host:1234/druid/indexer/v1/supervisor?system"))
+ .anyTimes();
+
+ EasyMock.replay(client, request, responseHandler);
+
+ // Verify that no row is returned for Datasource Read user
+ List<Object[]> rows = supervisorTable
+ .scan(createDataContext(Users.DATASOURCE_READ))
+ .toList();
+ Assert.assertTrue(rows.isEmpty());
+
+ // Verify that 1 row is returned for Datasource Write user
+ rows = supervisorTable
+ .scan(createDataContext(Users.DATASOURCE_WRITE))
+ .toList();
+ Assert.assertEquals(1, rows.size());
+
+ // Verify that 1 row is returned for Super user
+ rows = supervisorTable
+ .scan(createDataContext(Users.SUPER))
+ .toList();
+ Assert.assertEquals(1, rows.size());
+
+ // TODO: If needed, verify the first row here
+
+ // TODO: Verify value types.
+ // verifyTypes(rows, SystemSchema.SUPERVISOR_SIGNATURE);
+ }
+
+ /**
+ * Creates a response holder that contains the given json.
+ */
+ private InputStreamFullResponseHolder createFullResponseHolder(
+ HttpResponse httpResponse,
+ String json
+ )
+ {
+ InputStreamFullResponseHolder responseHolder =
+ new InputStreamFullResponseHolder(httpResponse.getStatus(),
httpResponse);
+
+ byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
+ responseHolder.addChunk(bytesToWrite);
+ responseHolder.done();
+
+ return responseHolder;
+ }
+
+ /**
+ * Creates a DataContext for the given username.
+ */
+ private DataContext createDataContext(String username)
+ {
+ return new DataContext()
{
@Override
public SchemaPlus getRootSchema()
@@ -1358,28 +1431,40 @@ public class SystemSchemaTest extends CalciteTestBase
}
@Override
- public Object get(String name)
+ public Object get(String authorizerName)
{
- return CalciteTests.SUPER_USER_AUTH_RESULT;
+ return CalciteTests.TEST_SUPERUSER_NAME.equals(username)
+ ? CalciteTests.SUPER_USER_AUTH_RESULT
+ : new AuthenticationResult(username, authorizerName, null,
null);
}
};
- final List<Object[]> rows = supervisorTable.scan(dataContext).toList();
-
- Object[] row0 = rows.get(0);
- Assert.assertEquals("wikipedia", row0[0].toString());
- Assert.assertEquals("UNHEALTHY_SUPERVISOR", row0[1].toString());
- Assert.assertEquals("UNABLE_TO_CONNECT_TO_STREAM", row0[2].toString());
- Assert.assertEquals(0L, row0[3]);
- Assert.assertEquals("kafka", row0[4].toString());
- Assert.assertEquals("wikipedia", row0[5].toString());
- Assert.assertEquals(0L, row0[6]);
- Assert.assertEquals(
-
"{\"type\":\"kafka\",\"dataSchema\":{\"dataSource\":\"wikipedia\"},\"context\":null,\"suspended\":false}",
- row0[7].toString()
- );
+ }
- // Verify value types.
- verifyTypes(rows, SystemSchema.SUPERVISOR_SIGNATURE);
+ private AuthorizerMapper createAuthMapper()
+ {
+ return new AuthorizerMapper(null)
+ {
+ @Override
+ public Authorizer getAuthorizer(String name)
+ {
+ return (authenticationResult, resource, action) -> {
+ final String username = authenticationResult.getIdentity();
+
+ // Allow access to a Datasource if
+ // - any user requests Read access
+ // - Super User or Datasource Write User requests Write access
+ if (resource.getType().equals(ResourceType.DATASOURCE)) {
+ return new Access(
+ action == Action.READ
+ || username.equals(Users.SUPER)
+ || username.equals(Users.DATASOURCE_WRITE)
+ );
+ }
+
+ return new Access(true);
+ };
+ }
+ };
}
private static void verifyTypes(final List<Object[]> rows, final
RowSignature signature)
@@ -1443,4 +1528,14 @@ public class SystemSchemaTest extends CalciteTestBase
}
}
}
+
+ /**
+ * Usernames to be used in tests.
+ */
+ private static class Users
+ {
+ private static final String SUPER = CalciteTests.TEST_SUPERUSER_NAME;
+ private static final String DATASOURCE_READ = "datasourceRead";
+ private static final String DATASOURCE_WRITE = "datasourceWrite";
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]