[
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15022398#comment-15022398
]
ASF GitHub Bot commented on STORM-876:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/storm/pull/845#discussion_r45624346
--- Diff:
storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java ---
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AccessControl;
+import backtype.storm.generated.AccessControlType;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.security.auth.AuthUtils;
+import backtype.storm.security.auth.IPrincipalToLocal;
+import backtype.storm.security.auth.NimbusPrincipal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Provides common handling of acls for Blobstores.
+ * Also contains some static utility functions related to Blobstores.
+ */
+public class BlobStoreAclHandler {
+ public static final Logger LOG =
LoggerFactory.getLogger(BlobStoreAclHandler.class);
+ private final IPrincipalToLocal _ptol;
+
+ public static final int READ = 0x01;
+ public static final int WRITE = 0x02;
+ public static final int ADMIN = 0x04;
+ public static final List<AccessControl> WORLD_EVERYTHING =
+ Arrays.asList(new AccessControl(AccessControlType.OTHER, READ |
WRITE | ADMIN));
+ public static final List<AccessControl> DEFAULT = new
ArrayList<AccessControl>();
+ private Set<String> _supervisors;
+ private Set<String> _admins;
+
+ public BlobStoreAclHandler(Map conf) {
+ _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
+ _supervisors = new HashSet<String>();
+ _admins = new HashSet<String>();
+ if (conf.containsKey(Config.NIMBUS_SUPERVISOR_USERS)) {
+
_supervisors.addAll((List<String>)conf.get(Config.NIMBUS_SUPERVISOR_USERS));
+ }
+ if (conf.containsKey(Config.NIMBUS_ADMINS)) {
+ _admins.addAll((List<String>)conf.get(Config.NIMBUS_ADMINS));
+ }
+ }
+
+ private static AccessControlType parseACLType(String type) {
+ if ("other".equalsIgnoreCase(type) || "o".equalsIgnoreCase(type)) {
+ return AccessControlType.OTHER;
+ } else if ("user".equalsIgnoreCase(type) ||
"u".equalsIgnoreCase(type)) {
+ return AccessControlType.USER;
+ }
+ throw new IllegalArgumentException(type+" is not a valid access
control type");
+ }
+
+ private static int parseAccess(String access) {
+ int ret = 0;
+ for (char c: access.toCharArray()) {
+ if ('r' == c) {
+ ret = ret | READ;
+ } else if ('w' == c) {
+ ret = ret | WRITE;
+ } else if ('a' == c) {
+ ret = ret | ADMIN;
+ } else if ('-' == c) {
+ //ignored
+ } else {
+ throw new IllegalArgumentException("");
+ }
+ }
+ return ret;
+ }
+
+ public static AccessControl parseAccessControl(String str) {
+ String[] parts = str.split(":");
+ String type = "other";
+ String name = "";
+ String access = "-";
+ if (parts.length > 3) {
+ throw new IllegalArgumentException("Don't know how to parse "+str+"
into an ACL value");
+ } else if (parts.length == 1) {
+ type = "other";
+ name = "";
+ access = parts[0];
+ } else if (parts.length == 2) {
+ type = "user";
+ name = parts[0];
+ access = parts[1];
+ } else if (parts.length == 3) {
+ type = parts[0];
+ name = parts[1];
+ access = parts[2];
+ }
+ AccessControl ret = new AccessControl();
+ ret.set_type(parseACLType(type));
+ ret.set_name(name);
+ ret.set_access(parseAccess(access));
+ return ret;
+ }
+
+ private static String accessToString(int access) {
+ StringBuffer ret = new StringBuffer();
+ ret.append(((access & READ) > 0) ? "r" : "-");
+ ret.append(((access & WRITE) > 0) ? "w" : "-");
+ ret.append(((access & ADMIN) > 0) ? "a" : "-");
+ return ret.toString();
+ }
+
+ public static String accessControlToString(AccessControl ac) {
+ StringBuffer ret = new StringBuffer();
+ switch(ac.get_type()) {
+ case OTHER:
+ ret.append("o");
+ break;
+ case USER:
+ ret.append("u");
+ break;
+ default:
+ throw new IllegalArgumentException("Ahh don't know what a type of
"+ac.get_type()+" means ");
+ }
+ ret.append(":");
+ if (ac.is_set_name()) {
+ ret.append(ac.get_name());
+ }
+ ret.append(":");
+ ret.append(accessToString(ac.get_access()));
+ return ret.toString();
+ }
+
+ public static void validateSettableACLs(String key, List<AccessControl>
acls) throws AuthorizationException {
+ Set<String> aclUsers = new HashSet<>();
+ List<String> duplicateUsers = new ArrayList<>();
+ for (AccessControl acl : acls) {
+ String aclUser = acl.get_name();
+ if (aclUser != null && !aclUser.isEmpty() && !aclUsers.add(aclUser))
{
+ LOG.error("'{}' user can't appear more than once in the ACLs",
aclUser);
+ duplicateUsers.add(aclUser);
+ }
+ }
+ if (duplicateUsers.size() > 0) {
+ String errorMessage = "user " +
Arrays.toString(duplicateUsers.toArray())
+ + " can't appear more than once in the ACLs for key [" + key
+"].";
+ throw new AuthorizationException(errorMessage);
+ }
+ }
+
+ private Set<String> constructUserFromPrincipals(Subject who) {
+ Set<String> user = new HashSet<String>();
+ if (who != null) {
+ for (Principal p : who.getPrincipals()) {
+ user.add(_ptol.toLocal(p));
+ }
+ }
+ return user;
+ }
+
+ private boolean isAdmin(Subject who) {
+ Set<String> user = constructUserFromPrincipals(who);
+ for (String u : user) {
+ if (_admins.contains(u)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isReadOperation(int operation) {
+ if (operation == 1) {
+ return true;
+ }
+ return false;
+ }
+
+ private boolean isSupervisor(Subject who, int operation) {
+ Set<String> user = constructUserFromPrincipals(who);
+ if (isReadOperation(operation)) {
+ for (String u : user) {
+ if (_supervisors.contains(u)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private boolean isNimbus(Subject who) {
+ Set<Principal> principals = null;
+ boolean isNimbusInstance = false;
+ if (who != null) {
+ principals = who.getPrincipals();
+ for (Principal principal : principals) {
+ if (principal instanceof NimbusPrincipal) {
+ isNimbusInstance = true;
+ }
+ }
+ }
+ return isNimbusInstance;
+ }
+
+ public boolean checkForValidUsers(Subject who, int mask) {
+ if (isNimbus(who) || isAdmin(who) || isSupervisor(who,mask)) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * The user should be able to see the metadata if and only if they have
any of READ, WRITE, or ADMIN
+ */
+ public void validateUserCanReadMeta(List<AccessControl> acl, Subject
who, String key) throws AuthorizationException {
+ hasAnyPermissions(acl, (READ|WRITE|ADMIN), who, key);
+ }
+
+ /**
+ * Validates if the user has any of the permissions
+ * mentioned in the mask.
+ * @param acl ACL for the key.
+ * @param mask mask holds the cummulative value of
+ * READ = 1, WRITE = 2 or ADMIN = 4 permissions.
+ * mask = 1 implies READ privilege.
+ * mask = 5 implies READ and ADMIN privileges.
+ * @param who Is the user against whom the permissions
+ * are validated for a key using the ACL and the mask.
+ * @param key Key used to identify the blob.
+ * @throws AuthorizationException
+ */
+ public void hasAnyPermissions(List<AccessControl> acl, int mask, Subject
who, String key) throws AuthorizationException {
+ Set<String> user = constructUserFromPrincipals(who);
+ LOG.debug("user {}", user);
+ if (checkForValidUsers(who, mask)) {
+ return;
+ }
+ for (AccessControl ac : acl) {
+ int allowed = getAllowed(ac, user);
+ LOG.debug(" user: {} allowed: {} key: {}", user, allowed, key);
+ if ((allowed & mask) > 0) {
+ return;
+ }
+ }
+ throw new AuthorizationException(
+ user + " does not have access to " + key);
+ }
+
+ /**
+ * Validates if the user has atleast the set of permissions
+ * mentioned in the mask.
+ * @param acl ACL for the key.
+ * @param mask mask holds the cummulative value of
--- End diff --
`cumulative`
> Dist Cache: Basic Functionality
> -------------------------------
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and
> downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar
> should be written into the blob store instead of residing locally. We need a
> default implementation of the blob store that does essentially what nimbus
> currently does and does not need anything extra. But having an HDFS backend
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and
> provide a working directory for the worker process with symlinks to the
> blobs. It should also allow the blobs to be updated and switch the symlink
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process
> of getting it ready to push back to open source shortly.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)