nvharikrishna commented on code in PR #165:
URL: https://github.com/apache/cassandra-sidecar/pull/165#discussion_r1896763272


##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/Action.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import io.vertx.ext.auth.authorization.Authorization;
+
+/**
+ * Represents an action that can be granted to a user on a resource or across resources.
+ */
+public interface Action

Review Comment:
   Can an Action exist without a name? If not, I think it would be good to have
a method returning the name.
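
   For illustration, a minimal sketch of what I have in mind. The `name()`
method, `NamedAction`, and `SimpleAction` are hypothetical names made up for
this example; they are not part of the PR.

```java
// Hypothetical sketch: an action contract that always exposes its name.
interface NamedAction
{
    /**
     * @return the non-null, non-empty name identifying this action
     */
    String name();
}

// Illustrative implementation only; this is not the PR's StandardAction.
final class SimpleAction implements NamedAction
{
    private final String name;

    SimpleAction(String name)
    {
        // Reject nameless actions up front so name() can never return null
        if (name == null || name.isEmpty())
        {
            throw new IllegalArgumentException("Name can not be null or empty");
        }
        this.name = name;
    }

    @Override
    public String name()
    {
        return name;
    }
}
```

With the name on the interface, callers could log or compare actions without
knowing the concrete implementation.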



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/SidecarActions.java:
##########
@@ -0,0 +1,68 @@
+package org.apache.cassandra.sidecar.acl.authorization;
+
+/**
+ * Sidecar actions allowed on specific targets are listed here. Majority of sidecar actions are represented in
+ * format <action_allowed>:<action_target>.

Review Comment:
   Roughly, it says `action = action + ":" + target`. Once something is added to
an action, it is no longer just an action but something more. The
action_target looks like a resource or a type of resource, so the combination
sounds like a permission, not just an action. Doesn't it make sense to call
them `Permission` instead of `Action`?



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/VariableAwareResource.java:
##########
@@ -0,0 +1,45 @@
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.KEYSPACE;
+import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.TABLE;
+
+/**
+ * Resources sidecar can expect permissions for. This list is not exhaustive.
+ */
+public enum VariableAwareResource

Review Comment:
   What is the significance of `VariableAware` in it?



##########
server/src/main/java/org/apache/cassandra/sidecar/utils/AuthUtils.java:
##########
@@ -0,0 +1,88 @@
+package org.apache.cassandra.sidecar.utils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.ext.auth.User;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.acl.authorization.Action;
+import org.apache.cassandra.sidecar.acl.authorization.StandardAction;
+import org.apache.cassandra.sidecar.acl.authorization.WildcardAction;
+
+import static org.apache.cassandra.sidecar.acl.authorization.WildcardAction.WILDCARD_PART_DIVIDER_TOKEN;
+import static org.apache.cassandra.sidecar.acl.authorization.WildcardAction.WILDCARD_TOKEN;
+
+/**
+ * Class with utility methods for Authentication and Authorization.
+ */
+public class AuthUtils
+{
+    /**
+     * Extracts a list of identities a user holds from their principal.
+     *
+     * @param user User object in Vertx
+     * @return extracted identities of user
+     */
+    public static List<String> extractIdentities(User user)
+    {
+        validatePrincipal(user);
+
+        return Optional.ofNullable(user.principal().getString("identity"))
+                       .map(Collections::singletonList)
+                       .orElseGet(() -> Arrays.asList(user.principal()
+                                                          .getString("identities")
+                                                          .split(",")));
+    }
+
+    /**
+     * @return an instance of {@link Action} given the name
+     */
+    public static Action actionFromName(String name)
+    {
+        if (name == null)
+        {
+            throw new IllegalArgumentException("Name can not be null");
+        }
+
+        boolean isWildCard = name.equals(WILDCARD_TOKEN) || name.contains(WILDCARD_PART_DIVIDER_TOKEN);
+        if (isWildCard)
+        {
+            return new WildcardAction(name);
+        }
+        return new StandardAction(name);
+    }
+
+    private static void validatePrincipal(User user)
+    {
+        if (user.principal() == null)
+        {
+            throw new HttpException(HttpResponseStatus.FORBIDDEN.code(), "User principal empty");

Review Comment:
   HttpException does not sound apt when it is thrown from code that is not a
handler. Can we use a new exception that is generic enough for any caller?



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobProgressHandler.java:
##########
@@ -64,6 +73,17 @@ public RestoreJobProgressHandler(InstanceMetadataFetcher metadataFetcher,
         this.consistencyLevelChecker = consistencyLevelChecker;
     }
 
+    @Override
+    public Set<Authorization> requiredAuthorizations()
+    {
+        String resource = VariableAwareResource.DATA_WITH_KEYSPACE_TABLE.resource();
+        Authorization createRestore = SidecarActions.CREATE_RESTORE.toAuthorization(resource);
+        Authorization viewRestore = SidecarActions.VIEW_RESTORE.toAuthorization(resource);
+        OrAuthorization createOrView

Review Comment:
   Shouldn't it be AndAuthorization?
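
   To make the difference concrete, here is a small sketch of the two
semantics using plain string sets instead of the Vert.x authorization types.
`AndOrSketch` and its methods are hypothetical, and I am assuming the usual
meaning of the two combinators (any match versus all must match).

```java
import java.util.Set;

// Hypothetical stand-in for the OR / AND authorization combinators.
class AndOrSketch
{
    // OR semantics: holding any one of the required permissions is enough.
    static boolean matchesAny(Set<String> held, Set<String> required)
    {
        return required.stream().anyMatch(held::contains);
    }

    // AND semantics: every required permission must be held.
    static boolean matchesAll(Set<String> held, Set<String> required)
    {
        return held.containsAll(required);
    }
}
```

Under this reading, a user holding only the view permission would pass the OR
check for this handler but fail an AND check.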



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/Action.java:
##########
@@ -0,0 +1,57 @@
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import io.vertx.ext.auth.authorization.Authorization;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.cassandra.sidecar.acl.authorization.WildcardAction.WILDCARD_PART_DIVIDER_TOKEN;
+import static org.apache.cassandra.sidecar.acl.authorization.WildcardAction.WILDCARD_TOKEN;
+
+/**
+ * Represents an action that can be granted to a user on a resource or across resources.
+ */
+public interface Action
+{
+    /**
+     * @return {@link Authorization}.
+     */
+    default Authorization toAuthorization()
+    {
+        return toAuthorization(null);

Review Comment:
   The code in the specified link is using the `Authorization
toAuthorization(String resource);` method. It is used only in test cases. Is it
required? If yes, then it would be good to describe the expected behavior in
the documentation comment.



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/AccessProtectedRouteBuilder.java:
##########
@@ -0,0 +1,208 @@
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+import io.vertx.core.Handler;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.ext.auth.authorization.AndAuthorization;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.auth.authorization.AuthorizationContext;
+import io.vertx.ext.auth.authorization.AuthorizationProvider;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.BodyHandler;
+import org.apache.cassandra.sidecar.acl.authorization.AdminIdentityResolver;
+import org.apache.cassandra.sidecar.acl.authorization.AuthorizationWithAdminBypassHandler;
+import org.apache.cassandra.sidecar.common.utils.Preconditions;
+import org.apache.cassandra.sidecar.config.AccessControlConfiguration;
+import org.apache.cassandra.sidecar.exceptions.ConfigurationException;
+
+import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.KEYSPACE;
+import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.TABLE;
+
+/**
+ * Builder for building authorized routes
+ */
+public class AccessProtectedRouteBuilder
+{
+    private final AccessControlConfiguration accessControlConfiguration;
+    private final AuthorizationProvider authorizationProvider;
+    private final AdminIdentityResolver adminIdentityResolver;
+
+    private Router router;
+    private HttpMethod method;
+    private String endpoint;
+    private boolean setBodyHandler;
+    private final List<Handler<RoutingContext>> handlers = new ArrayList<>();
+
+    public AccessProtectedRouteBuilder(AccessControlConfiguration accessControlConfiguration,
+                                       AuthorizationProvider authorizationProvider,
+                                       AdminIdentityResolver adminIdentityResolver)
+    {
+        this.accessControlConfiguration = accessControlConfiguration;
+        this.authorizationProvider = authorizationProvider;
+        this.adminIdentityResolver = adminIdentityResolver;
+    }
+
+    /**
+     * Sets {@link Router} authorized route is built for
+     *
+     * @param router Router authorized route is built for
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder router(Router router)
+    {
+        this.router = router;
+        return this;
+    }
+
+    /**
+     * Sets {@link HttpMethod} for route
+     *
+     * @param method HttpMethod set for route
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder method(HttpMethod method)
+    {
+        this.method = method;
+        return this;
+    }
+
+    /**
+     * Sets path for route
+     *
+     * @param endpoint REST path for route
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder endpoint(String endpoint)
+    {
+        this.endpoint = endpoint;
+        return this;
+    }
+
+    /**
+     * Sets if BodyHandler should be created for the route.
+     *
+     * @param setBodyHandler boolean flag indicating if route requires BodyHandler
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder setBodyHandler(Boolean setBodyHandler)
+    {
+        this.setBodyHandler = setBodyHandler;
+        return this;
+    }
+
+    /**
+     * Adds handler to handler chain of route. Handlers are ordered, they are called in order they are set in chain.
+     *
+     * @param handler handler for route
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder handler(Handler<RoutingContext> handler)
+    {
+        this.handlers.add(handler);
+        return this;
+    }
+
+    /**
+     * Builds an authorized route. Adds {@link io.vertx.ext.web.handler.AuthorizationHandler} at top of handler
+     * chain if access control is enabled.
+     */
+    public void build()
+    {
+        Preconditions.checkArgument(router != null, "Router must be set");
+        Preconditions.checkArgument(method != null, "Http method must be set");
+        Preconditions.checkArgument(endpoint != null && !endpoint.isEmpty(), "Endpoint must be set");
+        Preconditions.checkArgument(!handlers.isEmpty(), "Handler chain can not be empty");
+
+        Route route = router.route(method, endpoint);
+
+        // BodyHandler should be at index 0 in handler chain
+        if (setBodyHandler)
+        {
+            route.handler(BodyHandler.create());
+        }
+
+        if (accessControlConfiguration.enabled())
+        {
+            // authorization handler added before route specific handler chain
+            AuthorizationWithAdminBypassHandler authorizationHandler
+            = new AuthorizationWithAdminBypassHandler(adminIdentityResolver, requiredAuthorization());
+            authorizationHandler.addAuthorizationProvider(authorizationProvider);
+            authorizationHandler.variableConsumer(routeGenericVariableConsumer());
+
+            route.handler(authorizationHandler);
+        }
+        handlers.forEach(route::handler);
+    }
+
+    private Authorization requiredAuthorization()
+    {
+        Set<Authorization> requiredAuthorizations = handlers
+                                                    .stream()
+                                                    .filter(handler -> handler instanceof AccessProtected)
+                                                    .map(handler -> (AccessProtected) handler)
+                                                    .flatMap(handler -> handler.requiredAuthorizations().stream())
+                                                    .collect(Collectors.toSet());
+        if (requiredAuthorizations.isEmpty())
+        {
+            throw new ConfigurationException("Authorized route must have enforced authorizations set");
+        }
+        AndAuthorization andAuthorization = AndAuthorization.create();
+        requiredAuthorizations.forEach(andAuthorization::addAuthorization);
+        return andAuthorization;
+    }
+
+    private BiConsumer<RoutingContext, AuthorizationContext> routeGenericVariableConsumer()
+    {
+        return (routingCtx, authZContext) -> {
+            if (routingCtx.pathParams().containsKey(KEYSPACE))

Review Comment:
   Any reason to pass only KEYSPACE & TABLE? What if we copy all path
params?
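
   A rough sketch of the two options, using plain maps in place of the routing
and authorization contexts. `PathParamVariablesSketch`, `copySelected`, and
`copyAll` are hypothetical names for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: a plain Map plays the role of the path params and
// of the variables made available to the authorization check.
class PathParamVariablesSketch
{
    // Selective copy: only the named path params are forwarded.
    static Map<String, String> copySelected(Map<String, String> pathParams, String... names)
    {
        Map<String, String> variables = new HashMap<>();
        for (String name : names)
        {
            if (pathParams.containsKey(name))
            {
                variables.put(name, pathParams.get(name));
            }
        }
        return variables;
    }

    // Copy everything: a new path param needs no change in the route builder.
    static Map<String, String> copyAll(Map<String, String> pathParams)
    {
        return new HashMap<>(pathParams);
    }
}
```

Copying everything would keep the builder generic, at the cost of exposing
path params that no resource template refers to.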



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/Action.java:
##########
@@ -0,0 +1,57 @@
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import io.vertx.ext.auth.authorization.Authorization;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.cassandra.sidecar.acl.authorization.WildcardAction.WILDCARD_PART_DIVIDER_TOKEN;
+import static org.apache.cassandra.sidecar.acl.authorization.WildcardAction.WILDCARD_TOKEN;
+
+/**
+ * Represents an action that can be granted to a user on a resource or across resources.

Review Comment:
   StandardAction and WildcardAction construct PermissionBasedAuthorization and
WildcardPermissionBasedAuthorization, both of which accept `permission` as the
constructor parameter. For consistency, I feel it is apt to call this
`Permission`.



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/RoleAuthorizationsCache.java:
##########
@@ -0,0 +1,100 @@
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import io.vertx.ext.auth.authorization.Authorization;
+import org.apache.cassandra.sidecar.acl.AuthCache;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.SidecarPermissionsDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.SystemAuthDatabaseAccessor;
+
+/**
+ * Caches role and authorizations held by it. Entries from system_auth.role_permissions table in Cassandra and
+ * sidecar_internal.role_permissions_v1 table are processed into authorizations and cached here. All table entries are
+ * stored against a unique cache key. Caching against UNIQUE_CACHE_ENTRY is done to make sure new entries in the table
+ * are picked up during cache refreshes.
+ */
+@Singleton
+public class RoleAuthorizationsCache extends AuthCache<String, Map<String, Set<Authorization>>>

Review Comment:
   "AuthCache" name gives the impression that it is an authentication cache. 
But this class says "Authorization"Cache. Names are conflicting.



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/RoleBasedAuthorizationProvider.java:
##########
@@ -0,0 +1,100 @@
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.ext.auth.User;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.auth.authorization.AuthorizationProvider;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.acl.IdentityToRoleCache;
+
+import static org.apache.cassandra.sidecar.utils.AuthUtils.extractIdentities;
+
+/**
+ * Provides authorizations based on user's role. Extracts permissions user holds from Cassandra's
+ * system_auth.role_permissions table and from Sidecar's sidecar_internal.role_permissions_v1 table and sets
+ * them in user.
+ */
+public class RoleBasedAuthorizationProvider implements AuthorizationProvider
+{
+    private final IdentityToRoleCache identityToRoleCache;
+    private final RoleAuthorizationsCache roleAuthorizationsCache;
+
+    public RoleBasedAuthorizationProvider(IdentityToRoleCache identityToRoleCache,
+                                          RoleAuthorizationsCache roleAuthorizationsCache)
+    {
+        this.identityToRoleCache = identityToRoleCache;
+        this.roleAuthorizationsCache = roleAuthorizationsCache;
+    }
+
+    @Override
+    public String getId()
+    {
+        return "RoleBasedAccessControl";
+    }
+
+    @Override
+    public void getAuthorizations(User user, Handler<AsyncResult<Void>> handler)
+    {
+        getAuthorizations(user).onComplete(handler);
+    }
+
+    @Override
+    public Future<Void> getAuthorizations(User user)
+    {
+        List<String> identities = extractIdentities(user);
+
+        if (identities.isEmpty())
+        {
+            return Future.failedFuture(new HttpException(HttpResponseStatus.FORBIDDEN.code(),

Review Comment:
   HttpException is used to convey that the user is unauthorized. It gives the
impression that it is thrown by a handler while processing a request. Is that
the case all the time? Does it make sense to have a separate exception for it
(something like UnauthorizedException)?
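
   Something along these lines, as a sketch only. `UnauthorizedException` is
the name suggested above and does not exist in the PR; `IdentityChecks` and
its methods are hypothetical, and the 403 mapping simply mirrors the FORBIDDEN
status used in the current code.

```java
import java.util.List;

// Hypothetical, transport-agnostic exception; no HTTP types involved.
class UnauthorizedException extends RuntimeException
{
    UnauthorizedException(String message)
    {
        super(message);
    }
}

// Hypothetical helper showing where each concern could live.
class IdentityChecks
{
    // Non-handler code throws the generic exception.
    static void requireIdentities(List<String> identities)
    {
        if (identities == null || identities.isEmpty())
        {
            throw new UnauthorizedException("User has no identities");
        }
    }

    // A handler-side boundary translates the failure into an HTTP status code.
    static int toHttpStatus(RuntimeException failure)
    {
        return failure instanceof UnauthorizedException ? 403 : 500;
    }
}
```

This keeps the HTTP mapping in one place, so utilities and caches can be
called from code that is not serving a request.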



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/AdminIdentityResolver.java:
##########
@@ -0,0 +1,60 @@
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.Set;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.acl.IdentityToRoleCache;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+
+/**
+ * Evaluates if provided identity is an admin identity.
+ */
+@Singleton
+public class AdminIdentityResolver
+{
+    private final IdentityToRoleCache identityToRoleCache;
+    private final SuperUserCache superUserCache;
+    private final Set<String> adminIdentities;
+
+    @Inject
+    public AdminIdentityResolver(IdentityToRoleCache identityToRoleCache,
+                                 SuperUserCache superUserCache,
+                                 SidecarConfiguration sidecarConfiguration)
+    {
+        this.identityToRoleCache = identityToRoleCache;
+        this.superUserCache = superUserCache;
+        this.adminIdentities = sidecarConfiguration.accessControlConfiguration().adminIdentities();
+    }
+
+    public boolean isAdmin(String identity)
+    {
+        String role = identityToRoleCache.get(identity);
+        if (role == null)
+        {
+            throw new HttpException(HttpResponseStatus.FORBIDDEN.code(), "No matching Cassandra role found");
+        }
+        // Sidecar configured and Cassandra superusers have admin privileges
+        return adminIdentities.contains(identity) || superUserCache.isSuperUser(role);

Review Comment:
   If `adminIdentities.contains(identity)` is true, there is no need to look up
the role. The call to `identityToRoleCache.get(identity);` can then be deferred
until we need to check with superUserCache.
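
   A sketch of the reordering, with plain collections standing in for the
caches. `AdminCheckSketch`, its fields, and the `roleLookups` counter are
hypothetical and exist only to show when the lookup happens.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for AdminIdentityResolver: a Map replaces
// IdentityToRoleCache and a Set of role names replaces SuperUserCache.
class AdminCheckSketch
{
    private final Set<String> adminIdentities;
    private final Map<String, String> identityToRole;
    private final Set<String> superUserRoles;
    int roleLookups = 0; // counts role lookups, for illustration only

    AdminCheckSketch(Set<String> adminIdentities,
                     Map<String, String> identityToRole,
                     Set<String> superUserRoles)
    {
        this.adminIdentities = adminIdentities;
        this.identityToRole = identityToRole;
        this.superUserRoles = superUserRoles;
    }

    boolean isAdmin(String identity)
    {
        // Sidecar-configured admin identities need no role lookup
        if (adminIdentities.contains(identity))
        {
            return true;
        }
        // Deferred: the role is fetched only for the superuser check
        roleLookups++;
        String role = identityToRole.get(identity);
        if (role == null)
        {
            throw new IllegalStateException("No matching Cassandra role found");
        }
        return superUserRoles.contains(role);
    }
}
```

Note one behavior change: a configured admin identity without a Cassandra role
is rejected by the current code but would be treated as admin here, so that
case needs a decision.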



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/SidecarActions.java:
##########
@@ -0,0 +1,68 @@
+package org.apache.cassandra.sidecar.acl.authorization;
+
+/**
+ * Sidecar actions allowed on specific targets are listed here. Majority of sidecar actions are represented in
+ * format <action_allowed>:<action_target>.
+ * <p>
+ * Example, with CREATE:SNAPSHOT permission, CREATE action is allowed for SNAPSHOT target. Sample actions are
+ * CREATE, VIEW, UPDATE, DELETE, STREAM, IMPORT, UPLOAD, START, ABORT etc.
+ * <p>
+ * Wildcard actions are supported with ':' wildcard parts divider and '*' wildcard token to match parts:
+ * <p>
+ * - *:SNAPSHOT allows CREATE:SNAPSHOT, VIEW:SNAPSHOT and DELETE:SNAPSHOT.
+ * - CREATE:* allows CREATE action on all possible targets.
+ * - *:* allows all possible permissions for specified resource
+ */
+public class SidecarActions

Review Comment:
   None of the wildcard actions initialized here has a wildcard in it. Any
reason to use WildcardAction here instead of StandardAction? I see that
StandardAction does not accept the `<action_allowed>:<action_target>` format of
name, but it could be a StandardAction if the action and target type are
specified precisely, right?



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/SidecarActions.java:
##########
@@ -0,0 +1,68 @@
+package org.apache.cassandra.sidecar.acl.authorization;
+
+/**
+ * Sidecar actions allowed on specific targets are listed here. Majority of sidecar actions are represented in
+ * format <action_allowed>:<action_target>.
+ * <p>
+ * Example, with CREATE:SNAPSHOT permission, CREATE action is allowed for SNAPSHOT target. Sample actions are
+ * CREATE, VIEW, UPDATE, DELETE, STREAM, IMPORT, UPLOAD, START, ABORT etc.
+ * <p>
+ * Wildcard actions are supported with ':' wildcard parts divider and '*' wildcard token to match parts:
+ * <p>
+ * - *:SNAPSHOT allows CREATE:SNAPSHOT, VIEW:SNAPSHOT and DELETE:SNAPSHOT.
+ * - CREATE:* allows CREATE action on all possible targets.
+ * - *:* allows all possible permissions for specified resource
+ */
+public class SidecarActions
+{
+    // cassandra cluster related actions
+    public static final Action VIEW_CLUSTER = new WildcardAction("VIEW:CLUSTER");
+
+    // SSTable related actions
+    public static final Action UPLOAD_SSTABLE = new WildcardAction("UPLOAD:SSTABLE");
+    public static final Action IMPORT_SSTABLE = new WildcardAction("IMPORT:SSTABLE");
+    public static final Action STREAM_SSTABLE = new WildcardAction("STREAM:SSTABLE");
+
+    // Upload related actions
+    public static final Action DELETE_UPLOAD = new WildcardAction("DELETE:UPLOAD");

Review Comment:
   What does UPLOAD represent as a target? UPLOAD sounds more like an action 
than a target, and UPLOAD_SSTABLE already uses UPLOAD as an action, so it is 
confusing to also have it as a target. I feel the target could be more precise.
   



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/AdminIdentityResolver.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.Set;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.acl.IdentityToRoleCache;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+
+/**
+ * Evaluates if provided identity is an admin identity.
+ */
+@Singleton
+public class AdminIdentityResolver
+{
+    private final IdentityToRoleCache identityToRoleCache;
+    private final SuperUserCache superUserCache;
+    private final Set<String> adminIdentities;
+
+    @Inject
+    public AdminIdentityResolver(IdentityToRoleCache identityToRoleCache,
+                                 SuperUserCache superUserCache,
+                                 SidecarConfiguration sidecarConfiguration)
+    {
+        this.identityToRoleCache = identityToRoleCache;
+        this.superUserCache = superUserCache;
+        this.adminIdentities = 
sidecarConfiguration.accessControlConfiguration().adminIdentities();
+    }
+
+    public boolean isAdmin(String identity)
+    {
+        String role = identityToRoleCache.get(identity);
+        if (role == null)
+        {
+            throw new HttpException(HttpResponseStatus.FORBIDDEN.code(), "No 
matching Cassandra role found");

Review Comment:
   When role is null, the identity doesn't have a role yet, which means it is 
not an admin. In that case, shouldn't this just return false instead of 
throwing an exception?
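   
   A minimal sketch of what I mean, not the PR's actual code. The class name 
`AdminCheckSketch` is hypothetical, plain maps stand in for 
`IdentityToRoleCache` and `SuperUserCache`, and the order of the other checks 
is an assumption, since the rest of `isAdmin` is not visible in this hunk:

   ```java
   import java.util.Map;
   import java.util.Set;

   // Hypothetical stand-in for AdminIdentityResolver; maps replace the real caches.
   public class AdminCheckSketch
   {
       private final Map<String, String> identityToRole;
       private final Map<String, Boolean> superUserByRole;
       private final Set<String> adminIdentities;

       public AdminCheckSketch(Map<String, String> identityToRole,
                               Map<String, Boolean> superUserByRole,
                               Set<String> adminIdentities)
       {
           this.identityToRole = identityToRole;
           this.superUserByRole = superUserByRole;
           this.adminIdentities = adminIdentities;
       }

       public boolean isAdmin(String identity)
       {
           // Assumed: identities listed in configuration are admins regardless of role
           if (adminIdentities.contains(identity))
           {
               return true;
           }
           String role = identityToRole.get(identity);
           if (role == null)
           {
               // No role mapped yet: not an admin. The caller decides how to respond.
               return false;
           }
           return superUserByRole.getOrDefault(role, false);
       }
   }
   ```

   With this shape the resolver only answers the yes/no question, and the 
handler that calls it stays responsible for turning a "no" into a 403.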



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/WildcardAction.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.Arrays;
+
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.auth.authorization.WildcardPermissionBasedAuthorization;
+import 
io.vertx.ext.auth.authorization.impl.WildcardPermissionBasedAuthorizationImpl;
+
+/**
+ * Wildcard actions allow grouping allowed actions
+ */
+public class WildcardAction extends StandardAction

Review Comment:
   It doesn't inherit any behavior from StandardAction, so should it really 
extend StandardAction? Can it just implement Action?
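   
   Roughly what I have in mind, as a sketch only. `Action` below is a 
trimmed-down hypothetical interface (just `name()`), and `allows` is a 
simplified part-by-part match written for illustration; it is not Vert.x's 
`WildcardPermissionBasedAuthorization` matching and not the PR's code:

   ```java
   import java.util.Arrays;
   import java.util.List;

   public class WildcardActionSketch
   {
       // Hypothetical, trimmed-down version of the Action interface
       public interface Action
       {
           String name();
       }

       // Implements Action directly; nothing is inherited from a StandardAction
       public static final class WildcardAction implements Action
       {
           private static final String DIVIDER = ":";
           private static final String TOKEN = "*";

           private final String name;
           private final List<String> parts;

           public WildcardAction(String name)
           {
               if (name == null || name.isEmpty())
               {
                   throw new IllegalArgumentException("Action name must be set");
               }
               this.name = name;
               this.parts = Arrays.asList(name.split(DIVIDER));
           }

           @Override
           public String name()
           {
               return name;
           }

           // True when each part of this action is '*' or equals the requested part
           public boolean allows(String requested)
           {
               String[] requestedParts = requested.split(DIVIDER);
               if (requestedParts.length != parts.size())
               {
                   return false;
               }
               for (int i = 0; i < parts.size(); i++)
               {
                   String part = parts.get(i);
                   if (!TOKEN.equals(part) && !part.equals(requestedParts[i]))
                   {
                       return false;
                   }
               }
               return true;
           }
       }
   }
   ```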



##########
conf/sidecar.yaml:
##########
@@ -174,6 +174,13 @@ access_control:
         #
         # other options are, 
io.vertx.ext.auth.mtls.impl.SpiffeIdentityExtractor.
         certificate_identity_extractor: 
org.apache.cassandra.sidecar.acl.authentication.CassandraIdentityExtractor
+  authorizer:
+    # AuthorizationProvider provides authorizations an authenticated user 
holds.
+    #
+    # 
org.apache.cassandra.sidecar.acl.authorization.AllowAllAuthorizationProvider 
marks all requests as authorized.
+    # Other options are 
org.apache.cassandra.sidecar.acl.authorization.RoleBaseAuthorizationProvider, 
it validates
+    # role associated with authenticated user has permission for resource it 
accesses.
+    - class_name: 
org.apache.cassandra.sidecar.acl.authorization.AllowAllAuthorizationProvider

Review Comment:
   Is it the default authorizer?



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/SidecarActions.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+/**
+ * Sidecar actions allowed on specific targets are listed here. Majority of 
sidecar actions are represented in
+ * format <action_allowed>:<action_target>.
+ * <p>
+ * Example, with CREATE:SNAPSHOT permission, CREATE action is allowed for 
SNAPSHOT target. Sample actions are
+ * CREATE, VIEW, UPDATE, DELETE, STREAM, IMPORT, UPLOAD, START, ABORT etc.
+ * <p>
+ * Wildcard actions are supported with ':' wildcard parts divider and '*' 
wildcard token to match parts:
+ * <p>
+ * - *:SNAPSHOT allows CREATE:SNAPSHOT, VIEW:SNAPSHOT and DELETE:SNAPSHOT.
+ * - CREATE:* allows CREATE action on all possible targets.
+ * - *:* allows all possible permissions for specified resource
+ */
+public class SidecarActions
+{
+    // cassandra cluster related actions
+    public static final Action VIEW_CLUSTER = new 
WildcardAction("VIEW:CLUSTER");
+
+    // SSTable related actions
+    public static final Action UPLOAD_SSTABLE = new 
WildcardAction("UPLOAD:SSTABLE");
+    public static final Action IMPORT_SSTABLE = new 
WildcardAction("IMPORT:SSTABLE");
+    public static final Action STREAM_SSTABLE = new 
WildcardAction("STREAM:SSTABLE");
+
+    // Upload related actions
+    public static final Action DELETE_UPLOAD = new 
WildcardAction("DELETE:UPLOAD");
+
+    // snapshot related actions
+    public static final Action CREATE_SNAPSHOT = new 
WildcardAction("CREATE:SNAPSHOT");
+    public static final Action VIEW_SNAPSHOT = new 
WildcardAction("VIEW:SNAPSHOT");
+    public static final Action DELETE_SNAPSHOT = new 
WildcardAction("DELETE:SNAPSHOT");
+
+    // restore related actions
+    public static final Action CREATE_RESTORE = new 
WildcardAction("CREATE:RESTORE_JOB");
+    public static final Action VIEW_RESTORE = new 
WildcardAction("VIEW:RESTORE_JOB");
+    public static final Action UPDATE_RESTORE = new 
WildcardAction("UPDATE:RESTORE_JOB");
+    public static final Action ABORT_RESTORE = new 
WildcardAction("ABORT:RESTORE_JOB");

Review Comment:
   CREATE_RESTORE: create and restore both sound like actions. Would it make 
sense to name them like this?
   
   ```suggestion
       public static final Action CREATE_RESTORE_JOB = new 
WildcardAction("CREATE:RESTORE_JOB");
       public static final Action VIEW_RESTORE_JOB = new 
WildcardAction("VIEW:RESTORE_JOB");
       public static final Action UPDATE_RESTORE_JOB = new 
WildcardAction("UPDATE:RESTORE_JOB");
       public static final Action ABORT_RESTORE_JOB = new 
WildcardAction("ABORT:RESTORE_JOB");
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/RoleBasedAuthorizationProvider.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.ext.auth.User;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.auth.authorization.AuthorizationProvider;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.acl.IdentityToRoleCache;
+
+import static org.apache.cassandra.sidecar.utils.AuthUtils.extractIdentities;
+
+/**
+ * Provides authorizations based on user's role. Extracts permissions user 
holds from Cassandra's
+ * system_auth.role_permissions table and from Sidecar's 
sidecar_internal.role_permissions_v1 table and sets
+ * them in user.
+ */
+public class RoleBasedAuthorizationProvider implements AuthorizationProvider
+{
+    private final IdentityToRoleCache identityToRoleCache;
+    private final RoleAuthorizationsCache roleAuthorizationsCache;
+
+    public RoleBasedAuthorizationProvider(IdentityToRoleCache 
identityToRoleCache,
+                                          RoleAuthorizationsCache 
roleAuthorizationsCache)
+    {
+        this.identityToRoleCache = identityToRoleCache;
+        this.roleAuthorizationsCache = roleAuthorizationsCache;
+    }
+
+    @Override
+    public String getId()
+    {
+        return "RoleBasedAccessControl";
+    }
+
+    @Override
+    public void getAuthorizations(User user, Handler<AsyncResult<Void>> 
handler)
+    {
+        getAuthorizations(user).onComplete(handler);
+    }
+
+    @Override
+    public Future<Void> getAuthorizations(User user)
+    {
+        List<String> identities = extractIdentities(user);
+
+        if (identities.isEmpty())
+        {
+            return Future.failedFuture(new 
HttpException(HttpResponseStatus.FORBIDDEN.code(),
+                                                         "Missing client 
identities"));
+        }
+
+        Set<Authorization> authorizations = new HashSet<>();
+        for (String identity : identities)
+        {
+            String role = identityToRoleCache.get(identity);
+            if (role == null)
+            {
+                continue;
+            }
+            // when entries in cache are not found, null is returned. We can 
not add null in user.authorizations()
+            
authorizations.addAll(Optional.ofNullable(roleAuthorizationsCache.getAuthorizations(role))
+                                          .orElse(Collections.emptySet()));

Review Comment:
   ```suggestion
               
Optional.ofNullable(roleAuthorizationsCache.getAuthorizations(role)).ifPresent(authorizations::addAll);
   ```
   Done this way, there is no need to invoke `authorizations.addAll` with an 
empty set.
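   
   A small self-contained sketch of the same pattern, in case it helps. The 
class `IfPresentSketch` and its `lookup` helper are hypothetical; `lookup` 
stands in for `roleAuthorizationsCache.getAuthorizations(role)` and, like the 
cache, returns null on a miss. Strings stand in for `Authorization`:

   ```java
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Optional;
   import java.util.Set;

   public class IfPresentSketch
   {
       // Hypothetical stand-in for the cache lookup; returns null when the role is unknown
       static Set<String> lookup(Map<String, Set<String>> cache, String role)
       {
           return cache.get(role);
       }

       static Set<String> collect(Map<String, Set<String>> cache, Iterable<String> roles)
       {
           Set<String> authorizations = new HashSet<>();
           for (String role : roles)
           {
               // addAll runs only when the lookup returned a non-null set
               Optional.ofNullable(lookup(cache, role)).ifPresent(authorizations::addAll);
           }
           return authorizations;
       }
   }
   ```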



##########
server/src/main/java/org/apache/cassandra/sidecar/db/SystemAuthDatabaseAccessor.java:
##########
@@ -78,11 +78,59 @@ public Map<String, String> findAllIdentityToRoles()
         return results;
     }
 
-    private void ensureIdentityToRoleTableAccess()
+    /**
+     * Queries Cassandra for all rows in system_auth.role_permissions table. 
Maps permissions of a role into
+     * {@link Authorization} and returns a {@code Map} of cassandra role to 
authorizations
+     *
+     * @return - {@code Map} contains role and granted authorizations
+     */
+    public Map<String, Set<Authorization>> getAllRolesAndPermissions()
+    {
+        BoundStatement statement = 
tableSchema.getAllRolesAndPermissions().bind();
+        ResultSet result = execute(statement);
+        Map<String, Set<Authorization>> roleAuthorizations = new HashMap<>();
+        for (Row row : result)
+        {
+            String role = row.getString("role");
+            String resource = row.getString("resource");
+            Set<Authorization> authorizations = row.getSet("permissions", 
String.class)
+                                                   .stream()
+                                                   .map(permission -> new 
PermissionBasedAuthorizationImpl(permission)
+                                                                      
.setResource(resource))
+                                                   
.collect(Collectors.toSet());
+            roleAuthorizations.computeIfAbsent(role, k -> new 
HashSet<>(authorizations)).addAll(authorizations);

Review Comment:
   ```suggestion
               roleAuthorizations.computeIfAbsent(role, k -> new 
HashSet<>()).addAll(authorizations);
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/db/SidecarPermissionsDatabaseAccessor.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.ext.auth.authorization.Authorization;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.SidecarRolePermissionsSchema;
+
+import static org.apache.cassandra.sidecar.utils.AuthUtils.actionFromName;
+
+/**
+ * {@link SidecarPermissionsDatabaseAccessor} is an accessor for 
role_permissions_v1 table under sidecar_internal
+ * keyspace. Custom sidecar specific permissions are stored in this table.
+ */
+@Singleton
+public class SidecarPermissionsDatabaseAccessor extends 
DatabaseAccessor<SidecarRolePermissionsSchema>
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(SidecarPermissionsDatabaseAccessor.class);
+
+    @Inject
+    protected SidecarPermissionsDatabaseAccessor(SidecarRolePermissionsSchema 
tableSchema,
+                                                 CQLSessionProvider 
sessionProvider)
+    {
+        super(tableSchema, sessionProvider);
+    }
+
+    /**
+     * Queries Sidecar for all rows in sidecar_internal.role_permissions_v1 
table. This table maps permissions of a
+     * role into {@link Authorization} and returns a {@code Map} of user role 
to authorizations.
+     *
+     * @return - {@code Map} contains role and granted authorizations
+     */
+    public Map<String, Set<Authorization>> getAllRolesAndPermissions()
+    {
+        BoundStatement statement = 
tableSchema.getAllRolesAndPermissions().bind();
+        ResultSet result = execute(statement);
+        Map<String, Set<Authorization>> roleAuthorizations = new HashMap<>();
+        for (Row row : result)
+        {
+            String role = row.getString("role");
+            String resource = row.getString("resource");
+            Set<String> permissions = row.getSet("permissions", String.class);
+            Set<Authorization> authorizations = new HashSet<>();
+            for (String permission : permissions)
+            {
+                try
+                {
+                    
authorizations.add(actionFromName(permission).toAuthorization(resource));
+                }
+                catch (Exception e)
+                {
+                    logger.error("Error reading sidecar permission {} for 
resource {} for role {}, e",
+                                 permission, resource, role, e);
+                }
+            }
+            if (!authorizations.isEmpty())
+            {
+                roleAuthorizations.computeIfAbsent(role, k -> new 
HashSet<>(authorizations)).addAll(authorizations);

Review Comment:
   If this is the first time the role is being added, `addAll` is called 
implicitly from the constructor, and the same set of elements is then added 
again by the `.addAll` outside. How about this?
   ```suggestion
                   roleAuthorizations.computeIfAbsent(role, k -> new 
HashSet<>(authorizations.size())).addAll(authorizations);
   ```
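   
   For illustration, a self-contained sketch of the accumulate-per-role 
pattern. The class `ComputeIfAbsentSketch` and the `grant` helper are 
hypothetical, and strings stand in for `Authorization`. I left out the 
initial-capacity argument here since it is only a sizing hint:

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   public class ComputeIfAbsentSketch
   {
       static void grant(Map<String, Set<String>> roleAuthorizations, String role, Set<String> authorizations)
       {
           // A new role starts from an empty set, so addAll is the only place elements are copied
           roleAuthorizations.computeIfAbsent(role, k -> new HashSet<>()).addAll(authorizations);
       }

       static Map<String, Set<String>> example()
       {
           Map<String, Set<String>> roleAuthorizations = new HashMap<>();
           // Two rows for the same role, as when a role has permissions on two resources
           grant(roleAuthorizations, "backup_role", Set.of("CREATE:SNAPSHOT"));
           grant(roleAuthorizations, "backup_role", Set.of("VIEW:SNAPSHOT"));
           return roleAuthorizations;
       }
   }
   ```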



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandler.java:
##########
@@ -66,6 +75,17 @@ public UpdateRestoreJobHandler(ExecutorPools executorPools,
         this.metrics = metrics.server().restore();
     }
 
+    @Override
+    public Set<Authorization> requiredAuthorizations()
+    {
+        String resource = 
VariableAwareResource.DATA_WITH_KEYSPACE_TABLE.resource();
+        Authorization createRestore = 
SidecarActions.CREATE_RESTORE.toAuthorization(resource);
+        Authorization updateRestore = 
SidecarActions.UPDATE_RESTORE.toAuthorization(resource);
+        OrAuthorization createOrUpdate
+        = new 
OrAuthorizationImpl().addAuthorization(createRestore).addAuthorization(updateRestore);

Review Comment:
   Why does it need the createRestore authorization?



##########
server/src/main/java/org/apache/cassandra/sidecar/db/schema/SystemAuthSchema.java:
##########
@@ -44,19 +55,42 @@ protected String keyspaceName()
     @Override
     protected void prepareStatements(@NotNull Session session)
     {
+        getSuperUserStatus = prepare(getSuperUserStatus,
+                                     session,
+                                     GET_SUPER_USER_STATUS);
+
+        getRoles = prepare(getRoles,
+                           session,
+                           GET_ROLES);
+
+        getAllRolesAndPermissions = prepare(getAllRolesAndPermissions,
+                                            session,
+                                            GET_ALL_ROLES_AND_PERMISSIONS);
+
         KeyspaceMetadata keyspaceMetadata = 
session.getCluster().getMetadata().getKeyspace(keyspaceName());
         // identity_to_role table exists in Cassandra versions starting 5.x
         if (keyspaceMetadata == null || 
keyspaceMetadata.getTable(IDENTITY_TO_ROLE_TABLE) == null)
         {
+            logger.info("Auth table does not exist. Skip preparing. 
table={}/{}", keyspaceName(), IDENTITY_TO_ROLE_TABLE);

Review Comment:
   ```suggestion
               logger.info("Auth table does not exist. Skip preparing 
table={}/{}", keyspaceName(), IDENTITY_TO_ROLE_TABLE);
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/db/schema/SidecarSchema.java:
##########
@@ -52,7 +53,7 @@ public class SidecarSchema
     private final ExecutorPools executorPools;
     private final SchemaKeyspaceConfiguration schemaKeyspaceConfiguration;
     private final SidecarInternalKeyspace sidecarInternalKeyspace;
-    private final AtomicLong initializationTimerId = new AtomicLong(-1L);
+    private final AtomicLong initializationTimerId = new 
AtomicLong(Long.MIN_VALUE);

Review Comment:
   Any reason for this change?



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/AccessProtectedRouteBuilder.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+import io.vertx.core.Handler;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.ext.auth.authorization.AndAuthorization;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.auth.authorization.AuthorizationContext;
+import io.vertx.ext.auth.authorization.AuthorizationProvider;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.BodyHandler;
+import org.apache.cassandra.sidecar.acl.authorization.AdminIdentityResolver;
+import 
org.apache.cassandra.sidecar.acl.authorization.AuthorizationWithAdminBypassHandler;
+import org.apache.cassandra.sidecar.common.utils.Preconditions;
+import org.apache.cassandra.sidecar.config.AccessControlConfiguration;
+import org.apache.cassandra.sidecar.exceptions.ConfigurationException;
+
+import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.KEYSPACE;
+import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.TABLE;
+
+/**
+ * Builder for building authorized routes
+ */
+public class AccessProtectedRouteBuilder
+{
+    private final AccessControlConfiguration accessControlConfiguration;
+    private final AuthorizationProvider authorizationProvider;
+    private final AdminIdentityResolver adminIdentityResolver;
+
+    private Router router;
+    private HttpMethod method;
+    private String endpoint;
+    private boolean setBodyHandler;
+    private final List<Handler<RoutingContext>> handlers = new ArrayList<>();
+
+    public AccessProtectedRouteBuilder(AccessControlConfiguration 
accessControlConfiguration,
+                                       AuthorizationProvider 
authorizationProvider,
+                                       AdminIdentityResolver 
adminIdentityResolver)
+    {
+        this.accessControlConfiguration = accessControlConfiguration;
+        this.authorizationProvider = authorizationProvider;
+        this.adminIdentityResolver = adminIdentityResolver;
+    }
+
+    /**
+     * Sets {@link Router} authorized route is built for
+     *
+     * @param router Router authorized route is built for
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder router(Router router)
+    {
+        this.router = router;
+        return this;
+    }
+
+    /**
+     * Sets {@link HttpMethod} for route
+     *
+     * @param method HttpMethod set for route
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder method(HttpMethod method)
+    {
+        this.method = method;
+        return this;
+    }
+
+    /**
+     * Sets path for route
+     *
+     * @param endpoint REST path for route
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder endpoint(String endpoint)
+    {
+        this.endpoint = endpoint;
+        return this;
+    }
+
+    /**
+     * Sets if BodyHandler should be created for the route.
+     *
+     * @param setBodyHandler boolean flag indicating if route requires 
BodyHandler
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder setBodyHandler(Boolean setBodyHandler)
+    {
+        this.setBodyHandler = setBodyHandler;
+        return this;
+    }
+
+    /**
+     * Adds handler to handler chain of route. Handlers are ordered, they are 
called in order they are set in chain.
+     *
+     * @param handler handler for route
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder handler(Handler<RoutingContext> handler)
+    {
+        this.handlers.add(handler);
+        return this;
+    }
+
+    /**
+     * Builds an authorized route. Adds {@link 
io.vertx.ext.web.handler.AuthorizationHandler} at top of handler
+     * chain if access control is enabled.
+     */
+    public void build()
+    {
+        Preconditions.checkArgument(router != null, "Router must be set");
+        Preconditions.checkArgument(method != null, "Http method must be set");
+        Preconditions.checkArgument(endpoint != null && !endpoint.isEmpty(), 
"Endpoint must be set");
+        Preconditions.checkArgument(!handlers.isEmpty(), "Handler chain can 
not be empty");
+
+        Route route = router.route(method, endpoint);
+
+        // BodyHandler should be at index 0 in handler chain
+        if (setBodyHandler)
+        {
+            route.handler(BodyHandler.create());
+        }
+
+        if (accessControlConfiguration.enabled())
+        {
+            // authorization handler added before route specific handler chain
+            AuthorizationWithAdminBypassHandler authorizationHandler
+            = new AuthorizationWithAdminBypassHandler(adminIdentityResolver, 
requiredAuthorization());
+            
authorizationHandler.addAuthorizationProvider(authorizationProvider);
+            
authorizationHandler.variableConsumer(routeGenericVariableConsumer());
+
+            route.handler(authorizationHandler);
+        }
+        handlers.forEach(route::handler);
+    }
+
+    private Authorization requiredAuthorization()
+    {
+        Set<Authorization> requiredAuthorizations = handlers
+                                                    .stream()
+                                                    .filter(handler -> handler 
instanceof AccessProtected)
+                                                    .map(handler -> 
(AccessProtected) handler)
+                                                    .flatMap(handler -> 
handler.requiredAuthorizations().stream())
+                                                    
.collect(Collectors.toSet());
+        if (requiredAuthorizations.isEmpty())
+        {
+            throw new ConfigurationException("Authorized route must have 
enforced authorizations set");

Review Comment:
   Did you mean to say that authorizations should have been 'declared'? If so, 
does this sound better?
   
   ```suggestion
               throw new ConfigurationException("Authorized route must have 
authorizations declared");
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/AccessProtectedRouteBuilder.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+import io.vertx.core.Handler;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.ext.auth.authorization.AndAuthorization;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.auth.authorization.AuthorizationContext;
+import io.vertx.ext.auth.authorization.AuthorizationProvider;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.BodyHandler;
+import org.apache.cassandra.sidecar.acl.authorization.AdminIdentityResolver;
+import 
org.apache.cassandra.sidecar.acl.authorization.AuthorizationWithAdminBypassHandler;
+import org.apache.cassandra.sidecar.common.utils.Preconditions;
+import org.apache.cassandra.sidecar.config.AccessControlConfiguration;
+import org.apache.cassandra.sidecar.exceptions.ConfigurationException;
+
+import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.KEYSPACE;
+import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.TABLE;
+
+/**
+ * Builder for building authorized routes
+ */
+public class AccessProtectedRouteBuilder
+{
+    private final AccessControlConfiguration accessControlConfiguration;
+    private final AuthorizationProvider authorizationProvider;
+    private final AdminIdentityResolver adminIdentityResolver;
+
+    private Router router;
+    private HttpMethod method;
+    private String endpoint;
+    private boolean setBodyHandler;
+    private final List<Handler<RoutingContext>> handlers = new ArrayList<>();
+
+    public AccessProtectedRouteBuilder(AccessControlConfiguration 
accessControlConfiguration,
+                                       AuthorizationProvider 
authorizationProvider,
+                                       AdminIdentityResolver 
adminIdentityResolver)
+    {
+        this.accessControlConfiguration = accessControlConfiguration;
+        this.authorizationProvider = authorizationProvider;
+        this.adminIdentityResolver = adminIdentityResolver;
+    }
+
+    /**
+     * Sets {@link Router} authorized route is built for
+     *
+     * @param router Router authorized route is built for
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder router(Router router)
+    {
+        this.router = router;
+        return this;
+    }
+
+    /**
+     * Sets {@link HttpMethod} for route
+     *
+     * @param method HttpMethod set for route
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder method(HttpMethod method)
+    {
+        this.method = method;
+        return this;
+    }
+
+    /**
+     * Sets path for route
+     *
+     * @param endpoint REST path for route
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder endpoint(String endpoint)
+    {
+        this.endpoint = endpoint;
+        return this;
+    }
+
+    /**
+     * Sets if BodyHandler should be created for the route.
+     *
+     * @param setBodyHandler boolean flag indicating if route requires BodyHandler
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder setBodyHandler(Boolean setBodyHandler)
+    {
+        this.setBodyHandler = setBodyHandler;
+        return this;
+    }
+
+    /**
+     * Adds handler to handler chain of route. Handlers are ordered, they are called in order they are set in chain.
+     *
+     * @param handler handler for route
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder handler(Handler<RoutingContext> handler)
+    {
+        this.handlers.add(handler);
+        return this;
+    }
+
+    /**
+     * Builds an authorized route. Adds {@link io.vertx.ext.web.handler.AuthorizationHandler} at top of handler
+     * chain if access control is enabled.
+     */
+    public void build()
+    {
+        Preconditions.checkArgument(router != null, "Router must be set");
+        Preconditions.checkArgument(method != null, "Http method must be set");
+        Preconditions.checkArgument(endpoint != null && !endpoint.isEmpty(), "Endpoint must be set");
+        Preconditions.checkArgument(!handlers.isEmpty(), "Handler chain can not be empty");
+
+        Route route = router.route(method, endpoint);
+
+        // BodyHandler should be at index 0 in handler chain
+        if (setBodyHandler)
+        {
+            route.handler(BodyHandler.create());
+        }
+
+        if (accessControlConfiguration.enabled())
+        {
+            // authorization handler added before route specific handler chain
+            AuthorizationWithAdminBypassHandler authorizationHandler
+            = new AuthorizationWithAdminBypassHandler(adminIdentityResolver, requiredAuthorization());
+            authorizationHandler.addAuthorizationProvider(authorizationProvider);
+            authorizationHandler.variableConsumer(routeGenericVariableConsumer());
+
+            route.handler(authorizationHandler);
+        }
+        handlers.forEach(route::handler);

Review Comment:
   The builder registers every handler with route.handler(), so there is no
   way to add a blocking handler. Can it support adding one?
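
To illustrate the request, here is a minimal sketch of one way the builder could track blocking handlers. It assumes a per-handler `blocking` flag and uses a hypothetical `FakeRoute` stand-in (with `Consumer<String>` in place of `Handler<RoutingContext>`) so the snippet runs without Vert.x. `RouteBuilderSketch`, `Entry`, and `FakeRoute` are illustrative names, not part of the PR. In the real builder, the corresponding call would be Vert.x's `Route.blockingHandler(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BlockingHandlerSketch
{
    /** Hypothetical stand-in for a Vert.x Route: records which registration method was used. */
    static class FakeRoute
    {
        final List<String> registrations = new ArrayList<>();

        FakeRoute handler(Consumer<String> handler)
        {
            registrations.add("handler");
            return this;
        }

        FakeRoute blockingHandler(Consumer<String> handler)
        {
            registrations.add("blockingHandler");
            return this;
        }
    }

    /** Hypothetical builder fragment that remembers, per handler, whether it is blocking. */
    static class RouteBuilderSketch
    {
        private static final class Entry
        {
            final Consumer<String> handler;
            final boolean blocking;

            Entry(Consumer<String> handler, boolean blocking)
            {
                this.handler = handler;
                this.blocking = blocking;
            }
        }

        private final List<Entry> handlers = new ArrayList<>();

        RouteBuilderSketch handler(Consumer<String> handler)
        {
            handlers.add(new Entry(handler, false));
            return this;
        }

        RouteBuilderSketch blockingHandler(Consumer<String> handler)
        {
            handlers.add(new Entry(handler, true));
            return this;
        }

        /** Registers handlers in insertion order, choosing the registration method per entry. */
        void build(FakeRoute route)
        {
            for (Entry entry : handlers)
            {
                if (entry.blocking)
                {
                    route.blockingHandler(entry.handler);
                }
                else
                {
                    route.handler(entry.handler);
                }
            }
        }
    }
}
```

Keeping a single ordered list preserves the existing guarantee that handlers run in the order they were added, whether or not they are blocking.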



##########
server/src/main/java/org/apache/cassandra/sidecar/db/schema/SystemAuthSchema.java:
##########
@@ -66,21 +100,41 @@ protected String tableName()
                                                 "tables in system_auth keyspace");
     }
 
-    @Nullable
+    @NotNull
     public PreparedStatement selectRoleFromIdentity()
     {
+        ensureSchemaAvailable();

Review Comment:
   ensureSchemaAvailable throws SchemaUnavailableException, which is a runtime
   exception, while the other methods that return prepared statements can
   return null. Is there a reason to choose different behavior for these
   methods? Can they be made consistent?
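
One consistent option, sketched below under stated assumptions, is to route every accessor through the same availability check so that none of them can return null. Strings stand in for prepared statements so the snippet is self-contained. `AuthSchemaSketch`, `selectAllRoles`, `prepare`, and `ensureAvailable` are hypothetical names, and the exception class here is only a local stand-in for the SchemaUnavailableException in the PR.

```java
final class SchemaAccessSketch
{
    /** Local stand-in for the SchemaUnavailableException named above; assumed unchecked. */
    static class SchemaUnavailableException extends RuntimeException
    {
        SchemaUnavailableException(String message)
        {
            super(message);
        }
    }

    /** Hypothetical schema holder; Strings stand in for prepared statements. */
    static class AuthSchemaSketch
    {
        private String selectRoleFromIdentity;
        private String selectAllRoles; // hypothetical second statement

        /** Simulates the point at which statements become available. */
        void prepare()
        {
            selectRoleFromIdentity = "prepared:selectRoleFromIdentity";
            selectAllRoles = "prepared:selectAllRoles";
        }

        String selectRoleFromIdentity()
        {
            return ensureAvailable(selectRoleFromIdentity, "selectRoleFromIdentity");
        }

        String selectAllRoles()
        {
            return ensureAvailable(selectAllRoles, "selectAllRoles");
        }

        /** Shared check: every accessor either returns a non-null statement or throws. */
        private static <T> T ensureAvailable(T statement, String name)
        {
            if (statement == null)
            {
                throw new SchemaUnavailableException(name + " is not prepared yet");
            }
            return statement;
        }
    }
}
```

With this shape, all accessors could carry the same non-null annotation, and callers handle a single failure mode instead of mixing null checks with exception handling.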



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

