http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryPermissions.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryPermissions.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryPermissions.java new file mode 100644 index 0000000..4b27e7b --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryPermissions.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.hdfs; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclEntryScope; +import org.apache.hadoop.fs.permission.AclEntryType; +import org.apache.hadoop.fs.permission.FsAction; + +import com.google.common.collect.Lists; + +public class SentryPermissions implements AuthzPermissions { + + public static class PrivilegeInfo { + private final String authzObj; + private final Map<String, FsAction> roleToPermission = new HashMap<String, FsAction>(); + public PrivilegeInfo(String authzObj) { + this.authzObj = authzObj; + } + public PrivilegeInfo setPermission(String role, FsAction perm) { + roleToPermission.put(role, perm); + return this; + } + public PrivilegeInfo removePermission(String role) { + roleToPermission.remove(role); + return this; + } + public FsAction getPermission(String role) { + return roleToPermission.get(role); + } + public Map<String, FsAction> getAllPermissions() { + return roleToPermission; + } + public String getAuthzObj() { + return authzObj; + } + } + + public static class RoleInfo { + private final String role; + private final Set<String> groups = new HashSet<String>(); + public RoleInfo(String role) { + this.role = role; + } + public RoleInfo addGroup(String group) { + groups.add(group); + return this; + } + public RoleInfo delGroup(String group) { + groups.remove(group); + return this; + } + public String getRole() { + return role; + } + public Set<String> getAllGroups() { + return groups; + } + } + + private final Map<String, PrivilegeInfo> privileges = new HashMap<String, PrivilegeInfo>(); + private final Map<String, RoleInfo> roles = new HashMap<String, RoleInfo>(); + private Map<String, Set<String>> authzObjChildren = new HashMap<String, Set<String>>(); + + String getParentAuthzObject(String authzObject) { + int dot = authzObject.indexOf('.'); + if (dot > 0) { + return authzObject.substring(0, dot); + } else { + return authzObject; + } + } + + void addParentChildMappings(String authzObject) { + String parent = getParentAuthzObject(authzObject); + if (parent != null) { + Set<String> children = authzObjChildren.get(parent); + if (children == null) { + children = new HashSet<String>(); + authzObjChildren.put(parent, children); + } + children.add(authzObject); + } + } + + void removeParentChildMappings(String authzObject) { + String parent = getParentAuthzObject(authzObject); + if (parent != null) { + Set<String> children = authzObjChildren.get(parent); + if (children != null) { + children.remove(authzObject); + } + } else { + // is parent + authzObjChildren.remove(authzObject); + } + } + + private Map<String, FsAction> getGroupPerms(String authzObj) { + Map<String, FsAction> groupPerms = new HashMap<String, FsAction>(); + if (authzObj == null) { + return groupPerms; + } + PrivilegeInfo privilegeInfo = privileges.get(authzObj); + if (privilegeInfo != null) { + for (Map.Entry<String, FsAction> privs : privilegeInfo + .getAllPermissions().entrySet()) { + constructAclEntry(privs.getKey(), privs.getValue(), groupPerms); + } + } + return groupPerms; + } + + @Override + public List<AclEntry> getAcls(String authzObj) { + Map<String, FsAction> groupPerms = getGroupPerms(authzObj); + String parent = getParentAuthzObject(authzObj); + Map<String, FsAction> pGroupPerms = null; + if (parent == null) { + pGroupPerms = new HashMap<String, FsAction>(); + } else { + pGroupPerms = getGroupPerms(getParentAuthzObject(authzObj)); + if ((groupPerms == null)||(groupPerms.size() == 0)) { + groupPerms = pGroupPerms; + } + } + List<AclEntry> retList = new LinkedList<AclEntry>(); + for (Map.Entry<String, FsAction> groupPerm : groupPerms.entrySet()) { + AclEntry.Builder builder = new AclEntry.Builder(); + builder.setName(groupPerm.getKey()); + builder.setType(AclEntryType.GROUP); + builder.setScope(AclEntryScope.ACCESS); + FsAction action = groupPerm.getValue(); + FsAction pAction = pGroupPerms.get(groupPerm.getKey()); + if (pAction != null) { + action.or(pAction); + } + if ((action == FsAction.READ) || (action == FsAction.WRITE) + || (action == FsAction.READ_WRITE)) { + action = action.or(FsAction.EXECUTE); + } + builder.setPermission(action); + retList.add(builder.build()); + } + return retList; + } + + private void constructAclEntry(String role, FsAction permission, + Map<String, FsAction> groupPerms) { + RoleInfo roleInfo = roles.get(role); + if (roleInfo != null) { + for (String group : roleInfo.groups) { + FsAction fsAction = groupPerms.get(group); + if (fsAction == null) { + fsAction = FsAction.NONE; + } + groupPerms.put(group, fsAction.or(permission)); + } + } + } + + public PrivilegeInfo getPrivilegeInfo(String authzObj) { + return privileges.get(authzObj); + } + + Collection<PrivilegeInfo> getAllPrivileges() { + return privileges.values(); + } + + Collection<RoleInfo> getAllRoles() { + return roles.values(); + } + + public void delPrivilegeInfo(String authzObj) { + privileges.remove(authzObj); + } + + public void addPrivilegeInfo(PrivilegeInfo privilegeInfo) { + privileges.put(privilegeInfo.authzObj, privilegeInfo); + } + + public Set<String> getChildren(String authzObj) { + return authzObjChildren.get(authzObj); + } + + public RoleInfo getRoleInfo(String role) { + return roles.get(role); + } + + public void delRoleInfo(String role) { + roles.remove(role); + } + + public void addRoleInfo(RoleInfo roleInfo) { + roles.put(roleInfo.role, roleInfo); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java new file mode 100644 index 0000000..9540397 --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.hdfs.SentryHDFSServiceClient.SentryAuthzUpdate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SentryUpdater { + + private SentryHDFSServiceClient sentryClient; + private final Configuration conf; + private final SentryAuthorizationInfo authzInfo; + + private static Logger LOG = LoggerFactory.getLogger(SentryUpdater.class); + + public SentryUpdater(Configuration conf, SentryAuthorizationInfo authzInfo) throws Exception { + this.conf = conf; + this.authzInfo = authzInfo; + } + + public SentryAuthzUpdate getUpdates() { + if (sentryClient == null) { + try { + sentryClient = new SentryHDFSServiceClient(conf); + } catch (Exception e) { + LOG.error("Error connecting to Sentry ['{}'] !!", + e.getMessage()); + sentryClient = null; + return null; + } + } + try { + SentryAuthzUpdate sentryUpdates = sentryClient.getAllUpdatesFrom( + authzInfo.getAuthzPermissions().getLastUpdatedSeqNum() + 1, + authzInfo.getAuthzPaths().getLastUpdatedSeqNum() + 1); + return sentryUpdates; + } catch (Exception e) { + sentryClient = null; + LOG.error("Error receiving updates from Sentry !!", e); + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java new file mode 100644 index 0000000..e5af802 --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.hdfs; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; + +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.sentry.hdfs.SentryPermissions.PrivilegeInfo; +import org.apache.sentry.hdfs.SentryPermissions.RoleInfo; +import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges; +import org.apache.sentry.hdfs.service.thrift.TRoleChanges; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UpdateableAuthzPermissions implements AuthzPermissions, Updateable<PermissionsUpdate> { + private static final int MAX_UPDATES_PER_LOCK_USE = 99; + private volatile SentryPermissions perms = new SentryPermissions(); + private final AtomicLong seqNum = new AtomicLong(0); + + private static Logger LOG = LoggerFactory.getLogger(UpdateableAuthzPermissions.class); + + public static Map<String, FsAction> ACTION_MAPPING = new HashMap<String, FsAction>(); + static { + ACTION_MAPPING.put("ALL", FsAction.ALL); + ACTION_MAPPING.put("*", FsAction.ALL); + ACTION_MAPPING.put("SELECT", FsAction.READ_EXECUTE); + ACTION_MAPPING.put("select", FsAction.READ_EXECUTE); + ACTION_MAPPING.put("INSERT", FsAction.WRITE_EXECUTE); + ACTION_MAPPING.put("insert", FsAction.WRITE_EXECUTE); + } + + @Override + public List<AclEntry> getAcls(String authzObj) { + return perms.getAcls(authzObj); + } + + @Override + public UpdateableAuthzPermissions updateFull(PermissionsUpdate update) { + UpdateableAuthzPermissions other = new UpdateableAuthzPermissions(); + other.applyPartialUpdate(update); + other.seqNum.set(update.getSeqNum()); + return other; + } + + @Override + public void updatePartial(Iterable<PermissionsUpdate> updates, ReadWriteLock lock) { + lock.writeLock().lock(); + try { + int counter = 0; + for (PermissionsUpdate update : updates) { + applyPartialUpdate(update); + if (++counter > MAX_UPDATES_PER_LOCK_USE) { + counter = 0; + lock.writeLock().unlock(); + lock.writeLock().lock(); + } + seqNum.set(update.getSeqNum()); + LOG.debug("##### Updated perms seq Num [" + seqNum.get() + "]"); + } + } finally { + lock.writeLock().unlock(); + } + } + + + private void applyPartialUpdate(PermissionsUpdate update) { + applyPrivilegeUpdates(update); + applyRoleUpdates(update); + } + + private void applyRoleUpdates(PermissionsUpdate update) { + for (TRoleChanges rUpdate : update.getRoleUpdates()) { + if (rUpdate.getRole().equals(PermissionsUpdate.ALL_ROLES)) { + // Request to remove group from all roles + String groupToRemove = rUpdate.getDelGroups().iterator().next(); + for (RoleInfo rInfo : perms.getAllRoles()) { + rInfo.delGroup(groupToRemove); + } + } + RoleInfo rInfo = perms.getRoleInfo(rUpdate.getRole()); + for (String group : rUpdate.getAddGroups()) { + if (rInfo == null) { + rInfo = new RoleInfo(rUpdate.getRole()); + } + rInfo.addGroup(group); + } + if (rInfo != null) { + perms.addRoleInfo(rInfo); + for (String group : rUpdate.getDelGroups()) { + if (group.equals(PermissionsUpdate.ALL_GROUPS)) { + perms.delRoleInfo(rInfo.getRole()); + break; + } + // If there are no groups to remove, rUpdate.getDelGroups() will + // return empty list and this code will not be reached + rInfo.delGroup(group); + } + } + } + } + + private void applyPrivilegeUpdates(PermissionsUpdate update) { + for (TPrivilegeChanges pUpdate : update.getPrivilegeUpdates()) { + if (pUpdate.getAuthzObj().equals(PermissionsUpdate.RENAME_PRIVS)) { + String newAuthzObj = pUpdate.getAddPrivileges().keySet().iterator().next(); + String oldAuthzObj = pUpdate.getDelPrivileges().keySet().iterator().next(); + PrivilegeInfo privilegeInfo = perms.getPrivilegeInfo(oldAuthzObj); + Map<String, FsAction> allPermissions = privilegeInfo.getAllPermissions(); + perms.delPrivilegeInfo(oldAuthzObj); + perms.removeParentChildMappings(oldAuthzObj); + PrivilegeInfo newPrivilegeInfo = new PrivilegeInfo(newAuthzObj); + for (Map.Entry<String, FsAction> e : allPermissions.entrySet()) { + newPrivilegeInfo.setPermission(e.getKey(), e.getValue()); + } + perms.addPrivilegeInfo(newPrivilegeInfo); + perms.addParentChildMappings(newAuthzObj); + return; + } + if (pUpdate.getAuthzObj().equals(PermissionsUpdate.ALL_AUTHZ_OBJ)) { + // Request to remove role from all Privileges + String roleToRemove = pUpdate.getDelPrivileges().keySet().iterator() + .next(); + for (PrivilegeInfo pInfo : perms.getAllPrivileges()) { + pInfo.removePermission(roleToRemove); + } + } + PrivilegeInfo pInfo = perms.getPrivilegeInfo(pUpdate.getAuthzObj()); + for (Map.Entry<String, String> aMap : pUpdate.getAddPrivileges().entrySet()) { + if (pInfo == null) { + pInfo = new PrivilegeInfo(pUpdate.getAuthzObj()); + } + FsAction fsAction = pInfo.getPermission(aMap.getKey()); + if (fsAction == null) { + fsAction = getFAction(aMap.getValue()); + } else { + fsAction = fsAction.or(getFAction(aMap.getValue())); + } + pInfo.setPermission(aMap.getKey(), fsAction); + } + if (pInfo != null) { + perms.addPrivilegeInfo(pInfo); + perms.addParentChildMappings(pUpdate.getAuthzObj()); + for (Map.Entry<String, String> dMap : pUpdate.getDelPrivileges().entrySet()) { + if (dMap.getKey().equals(PermissionsUpdate.ALL_ROLES)) { + // Remove all privileges + perms.delPrivilegeInfo(pUpdate.getAuthzObj()); + perms.removeParentChildMappings(pUpdate.getAuthzObj()); + break; + } + List<PrivilegeInfo> parentAndChild = new LinkedList<PrivilegeInfo>(); + parentAndChild.add(pInfo); + Set<String> children = perms.getChildren(pInfo.getAuthzObj()); + if (children != null) { + for (String child : children) { + parentAndChild.add(perms.getPrivilegeInfo(child)); + } + } + // recursive revoke + for (PrivilegeInfo pInfo2 : parentAndChild) { + FsAction fsAction = pInfo2.getPermission(dMap.getKey()); + if (fsAction != null) { + fsAction = fsAction.and(getFAction(dMap.getValue()).not()); + if (FsAction.NONE == fsAction) { + pInfo2.removePermission(dMap.getKey()); + } else { + pInfo2.setPermission(dMap.getKey(), fsAction); + } + } + } + } + } + } + } + + static FsAction getFAction(String sentryPriv) { + String[] strPrivs = sentryPriv.trim().split(","); + FsAction retVal = FsAction.NONE; + for (String strPriv : strPrivs) { + retVal = retVal.or(ACTION_MAPPING.get(strPriv.toUpperCase())); + } + return retVal; + } + + @Override + public long getLastUpdatedSeqNum() { + return seqNum.get(); + } + + @Override + public PermissionsUpdate createFullImageUpdate(long currSeqNum) { + PermissionsUpdate retVal = new PermissionsUpdate(currSeqNum, true); + for (PrivilegeInfo pInfo : perms.getAllPrivileges()) { + TPrivilegeChanges pUpdate = retVal.addPrivilegeUpdate(pInfo.getAuthzObj()); + for (Map.Entry<String, FsAction> ent : pInfo.getAllPermissions().entrySet()) { + pUpdate.putToAddPrivileges(ent.getKey(), ent.getValue().SYMBOL); + } + } + for (RoleInfo rInfo : perms.getAllRoles()) { + TRoleChanges rUpdate = retVal.addRoleUpdate(rInfo.getRole()); + for (String group : rInfo.getAllGroups()) { + rUpdate.addToAddGroups(group); + } + } + return retVal; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/MockSentryAuthorizationProvider.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/MockSentryAuthorizationProvider.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/MockSentryAuthorizationProvider.java new file mode 100644 index 0000000..2085b52 --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/MockSentryAuthorizationProvider.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.hdfs; + +public class MockSentryAuthorizationProvider extends + SentryAuthorizationProvider { + + public MockSentryAuthorizationProvider() { + super(new SentryAuthorizationInfoX()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/SentryAuthorizationInfoX.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/SentryAuthorizationInfoX.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/SentryAuthorizationInfoX.java new file mode 100644 index 0000000..7a1539b --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/SentryAuthorizationInfoX.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.hdfs; + +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclEntryScope; +import org.apache.hadoop.fs.permission.AclEntryType; +import org.apache.hadoop.fs.permission.FsAction; + +public class SentryAuthorizationInfoX extends SentryAuthorizationInfo { + + public SentryAuthorizationInfoX() { + super(); + } + + @Override + public void run() { + + } + + @Override + public void start() { + + } + + @Override + public void stop() { + + } + + @Override + public boolean isStale() { + return false; + } + + private static final String[] MANAGED = {"user", "authz"}; + private static final String[] AUTHZ_OBJ = {"user", "authz", "obj"}; + + private boolean hasPrefix(String[] prefix, String[] pathElement) { + int i = 0; + for (; i < prefix.length && i < pathElement.length; i ++) { + if (!prefix[i].equals(pathElement[i])) { + return false; + } + } + return (i == prefix.length); + } + + @Override + public boolean isManaged(String[] pathElements) { + return hasPrefix(MANAGED, pathElements); + } + + @Override + public boolean doesBelongToAuthzObject(String[] pathElements) { + return hasPrefix(AUTHZ_OBJ, pathElements); + } + + @Override + public List<AclEntry> getAclEntries(String[] pathElements) { + AclEntry acl = new AclEntry.Builder().setType(AclEntryType.USER). + setPermission(FsAction.ALL).setName("user-authz"). + setScope(AclEntryScope.ACCESS).build(); + return Arrays.asList(acl); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/TestSentryAuthorizationProvider.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/TestSentryAuthorizationProvider.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/TestSentryAuthorizationProvider.java new file mode 100644 index 0000000..b766a8f --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/TestSentryAuthorizationProvider.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.hdfs; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclEntryScope; +import org.apache.hadoop.fs.permission.AclEntryType; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +public class TestSentryAuthorizationProvider { + private MiniDFSCluster miniDFS; + private UserGroupInformation admin; + + @Before + public void setUp() throws Exception { + admin = UserGroupInformation.createUserForTesting( + System.getProperty("user.name"), new String[] { "supergroup" }); + admin.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, "target/test/data"); + Configuration conf = new HdfsConfiguration(); + conf.setBoolean("sentry.authorization-provider.include-hdfs-authz-as-acl", true); + conf.set(DFSConfigKeys.DFS_NAMENODE_AUTHORIZATION_PROVIDER_KEY, + MockSentryAuthorizationProvider.class.getName()); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true); + EditLogFileOutputStream.setShouldSkipFsyncForTesting(true); + miniDFS = new MiniDFSCluster.Builder(conf).build(); + return null; + } + }); + } + + @After + public void cleanUp() throws IOException { + if (miniDFS != null) { + miniDFS.shutdown(); + } + } + + @Test + public void testProvider() throws Exception { + admin.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + String sysUser = UserGroupInformation.getCurrentUser().getShortUserName(); + FileSystem fs = FileSystem.get(miniDFS.getConfiguration(0)); + + List<AclEntry> baseAclList = new ArrayList<AclEntry>(); + AclEntry.Builder builder = new AclEntry.Builder(); + baseAclList.add(builder.setType(AclEntryType.USER) + .setScope(AclEntryScope.ACCESS).build()); + baseAclList.add(builder.setType(AclEntryType.GROUP) + .setScope(AclEntryScope.ACCESS).build()); + baseAclList.add(builder.setType(AclEntryType.OTHER) + .setScope(AclEntryScope.ACCESS).build()); + Path path1 = new Path("/user/authz/obj/xxx"); + fs.mkdirs(path1); + fs.setAcl(path1, baseAclList); + + fs.mkdirs(new Path("/user/authz/xxx")); + fs.mkdirs(new Path("/user/xxx")); + + // root + Path path = new Path("/"); + Assert.assertEquals(sysUser, fs.getFileStatus(path).getOwner()); + Assert.assertEquals("supergroup", fs.getFileStatus(path).getGroup()); + Assert.assertEquals(new FsPermission((short) 0755), fs.getFileStatus(path).getPermission()); + Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty()); + + // dir before prefixes + path = new Path("/user"); + Assert.assertEquals(sysUser, fs.getFileStatus(path).getOwner()); + Assert.assertEquals("supergroup", fs.getFileStatus(path).getGroup()); + Assert.assertEquals(new FsPermission((short) 0755), fs.getFileStatus(path).getPermission()); + Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty()); + + // prefix dir + path = new Path("/user/authz"); + Assert.assertEquals(sysUser, fs.getFileStatus(path).getOwner()); + Assert.assertEquals("supergroup", fs.getFileStatus(path).getGroup()); + Assert.assertEquals(new FsPermission((short) 0755), fs.getFileStatus(path).getPermission()); + Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty()); + + // dir inside of prefix, no obj + path = new Path("/user/authz/xxx"); + FileStatus status = fs.getFileStatus(path); + Assert.assertEquals(sysUser, status.getOwner()); + Assert.assertEquals("supergroup", status.getGroup()); + Assert.assertEquals(new FsPermission((short) 0755), status.getPermission()); + Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty()); + + // dir inside of prefix, obj + path = new Path("/user/authz/obj"); + Assert.assertEquals("hive", fs.getFileStatus(path).getOwner()); + Assert.assertEquals("hive", fs.getFileStatus(path).getGroup()); + Assert.assertEquals(new FsPermission((short) 0770), fs.getFileStatus(path).getPermission()); + Assert.assertFalse(fs.getAclStatus(path).getEntries().isEmpty()); + + List<AclEntry> acls = new ArrayList<AclEntry>(); + acls.add(new AclEntry.Builder().setName(sysUser).setType(AclEntryType.USER).setScope(AclEntryScope.ACCESS).setPermission(FsAction.ALL).build()); + acls.add(new AclEntry.Builder().setName("supergroup").setType(AclEntryType.GROUP).setScope(AclEntryScope.ACCESS).setPermission(FsAction.READ_EXECUTE).build()); + acls.add(new AclEntry.Builder().setName(null).setType(AclEntryType.OTHER).setScope(AclEntryScope.ACCESS).setPermission(FsAction.READ_EXECUTE).build()); + acls.add(new AclEntry.Builder().setName("user-authz").setType(AclEntryType.USER).setScope(AclEntryScope.ACCESS).setPermission(FsAction.ALL).build()); + Assert.assertEquals(new LinkedHashSet<AclEntry>(acls), new LinkedHashSet<AclEntry>(fs.getAclStatus(path).getEntries())); + + // dir inside of prefix, inside of obj + path = new Path("/user/authz/obj/xxx"); + Assert.assertEquals("hive", fs.getFileStatus(path).getOwner()); + Assert.assertEquals("hive", fs.getFileStatus(path).getGroup()); + Assert.assertEquals(new FsPermission((short) 0770), fs.getFileStatus(path).getPermission()); + Assert.assertFalse(fs.getAclStatus(path).getEntries().isEmpty()); + + Path path2 = new Path("/user/authz/obj/path2"); + fs.mkdirs(path2); + fs.setAcl(path2, baseAclList); + + // dir outside of prefix + path = new Path("/user/xxx"); + Assert.assertEquals(sysUser, fs.getFileStatus(path).getOwner()); + Assert.assertEquals("supergroup", fs.getFileStatus(path).getGroup()); + Assert.assertEquals(new FsPermission((short) 0755), fs.getFileStatus(path).getPermission()); + Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty()); + return null; + } + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/resources/hdfs-sentry.xml ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/resources/hdfs-sentry.xml b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/resources/hdfs-sentry.xml new file mode 100644 index 0000000..511bfdd --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/resources/hdfs-sentry.xml @@ -0,0 +1,33 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<configuration> + <property> + <name>sentry.hdfs-plugin.path-prefixes</name> + <value>/user/hive/dw</value> + </property> + <property> + <name>sentry.hdfs-plugin.sentry-uri</name> + <value>thrift://localhost:1234</value> + </property> + <property> + <name>sentry.hdfs-plugin.stale-threshold.ms</name> + <value>-1</value> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/.gitignore ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/.gitignore b/sentry-hdfs/sentry-hdfs-service/.gitignore new file mode 100644 index 0000000..91ad75b --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-service/.gitignore @@ -0,0 +1,18 @@ +*.class +target/ +.classpath +.project +.settings +.metadata +.idea/ +*.iml +derby.log +datanucleus.log +sentry-core/sentry-core-common/src/gen +**/TempStatsStore/ +# Package Files # +*.jar +*.war +*.ear +test-output/ +maven-repo/ http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/pom.xml b/sentry-hdfs/sentry-hdfs-service/pom.xml new file mode 100644 index 0000000..365380e --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-service/pom.xml @@ -0,0 +1,104 @@ +<?xml version="1.0"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-hdfs</artifactId> + <version>1.5.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>sentry-hdfs-service</artifactId> + <name>Sentry HDFS service</name> + + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </dependency> + <dependency> + <groupId>org.apache.shiro</groupId> + <artifactId>shiro-core</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-hdfs-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-provider-db</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>${hive.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-shims</artifactId> + <version>${hive.version}</version> + </dependency> + <dependency> + <groupId>org.apache.thrift</groupId> + <artifactId>libfb303</artifactId> + </dependency> + <dependency> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + </dependency> + <dependency> + <groupId>ant-contrib</groupId> + <artifactId>ant-contrib</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minikdc</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-metastore</artifactId> + <version>${hive.version}</version> + </dependency> + </dependencies> + + +</project> http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/ExtendedMetastoreClient.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/ExtendedMetastoreClient.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/ExtendedMetastoreClient.java new file mode 100644 index 0000000..e7677f2 --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/ExtendedMetastoreClient.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.hdfs; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of {@link MetastoreClient} + * + */ +public class ExtendedMetastoreClient implements MetastoreClient { + + private static Logger LOG = LoggerFactory.getLogger(ExtendedMetastoreClient.class); + + private volatile HiveMetaStoreClient client; + private final HiveConf hiveConf; + public ExtendedMetastoreClient(HiveConf hiveConf) { + this.hiveConf = hiveConf; + } + + @Override + public List<Database> getAllDatabases() { + List<Database> retList = new ArrayList<Database>(); + HiveMetaStoreClient client = getClient(); + if (client != null) { + try { + for (String dbName : client.getAllDatabases()) { + retList.add(client.getDatabase(dbName)); + } + } catch (Exception e) { + LOG.error("Could not get All Databases !!", e); + } + } + return retList; + } + + @Override + public List<Table> getAllTablesOfDatabase(Database db) { + List<Table> retList = new ArrayList<Table>(); + HiveMetaStoreClient client = getClient(); + if (client != null) { + try { + for (String tblName : client.getAllTables(db.getName())) { + retList.add(client.getTable(db.getName(), tblName)); + } + } catch (Exception e) { + LOG.error(String.format( + "Could not get Tables for '%s' !!", db.getName()), e); + } + } + return retList; + } + + @Override + public List<Partition> listAllPartitions(Database db, Table tbl) { + HiveMetaStoreClient client = getClient(); + if (client != null) { + try { + return client.listPartitions(db.getName(), tbl.getTableName(), Short.MAX_VALUE); + } catch (Exception e) { + LOG.error(String.format( + "Could not get partitions for '%s'.'%s' !!", db.getName(), + tbl.getTableName()), e); + } + } + return new LinkedList<Partition>(); + } + + private HiveMetaStoreClient getClient() { + if (client == null) { + try { + client = new HiveMetaStoreClient(hiveConf); + return client; + } catch (MetaException e) { + client = null; + LOG.error("Could not create metastore client !!", e); + return null; + } + } else { + return client; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java new file mode 100644 index 0000000..9a81e3a --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.hdfs; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; +import org.apache.hadoop.hive.metastore.IHMSHandler; +import org.apache.hadoop.hive.metastore.MetaStorePreEventListener; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; +import org.apache.sentry.hdfs.service.thrift.TPathChanges; +import org.apache.sentry.provider.db.SentryMetastoreListenerPlugin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +/** + * Plugin implementation of {@link SentryMetastoreListenerPlugin} that hooks + * into the sites in the {@link MetaStorePreEventListener} that deal with + * creation/updation and deletion for paths. + */ +public class MetastorePlugin extends SentryMetastoreListenerPlugin { + + private static final Logger LOGGER = LoggerFactory.getLogger(MetastorePlugin.class); + + private final Configuration conf; + private SentryHDFSServiceClient sentryClient; + private UpdateableAuthzPaths authzPaths; + private Lock notificiationLock; + + //Initialized to some value > 1 so that the first update notification + // will trigger a full Image fetch + private final AtomicLong seqNum = new AtomicLong(5); + private volatile long lastSentSeqNum = -1; + private final ExecutorService threadPool; + + static class ProxyHMSHandler extends HMSHandler { + public ProxyHMSHandler(String name, HiveConf conf) throws MetaException { + super(name, conf); + } + @Override + public String startFunction(String function, String extraLogInfo) { + return function; + } + } + + public MetastorePlugin(Configuration conf) { + this.notificiationLock = new ReentrantLock(); + this.conf = new HiveConf((HiveConf)conf); + this.conf.unset(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS.varname); + this.conf.unset(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname); + this.conf.unset(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname); + this.conf.unset(HiveConf.ConfVars.METASTOREURIS.varname); + try { + this.authzPaths = createInitialUpdate(new ProxyHMSHandler("sentry.hdfs", (HiveConf)this.conf)); + } catch (Exception e1) { + LOGGER.error("Could not create Initial AuthzPaths or HMSHandler !!", e1); + throw new RuntimeException(e1); + } + try { + sentryClient = new SentryHDFSServiceClient(conf); + } catch (Exception e) { + sentryClient = null; + LOGGER.error("Could not connect to Sentry HDFS Service !!", e); + } + ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1); + threadPool.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + notificiationLock.lock(); + try { + long lastSeenHMSPathSeqNum = + MetastorePlugin.this.getClient().getLastSeenHMSPathSeqNum(); + if (lastSeenHMSPathSeqNum != lastSentSeqNum) { + LOGGER.warn("Sentry not in sync with HMS [" + lastSeenHMSPathSeqNum + ", " + lastSentSeqNum + "]"); + PathsUpdate fullImageUpdate = + MetastorePlugin.this.authzPaths.createFullImageUpdate( + lastSentSeqNum); + LOGGER.warn("Sentry not in sync with HMS !!"); + notifySentryNoLock(fullImageUpdate, false); + } + } catch (Exception e) { + sentryClient = null; + LOGGER.error("Error talking to Sentry HDFS Service !!", e); + } finally { + notificiationLock.unlock(); + } + } + }, this.conf.getLong(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS, + ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT), 1000, + TimeUnit.MILLISECONDS); + this.threadPool = threadPool; + } + + private UpdateableAuthzPaths createInitialUpdate(IHMSHandler hmsHandler) throws Exception { + UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(new String[] {"/"}); + PathsUpdate tempUpdate = new PathsUpdate(-1, false); + List<String> allDbStr = hmsHandler.get_all_databases(); + for (String dbName : allDbStr) { + Database db = hmsHandler.get_database(dbName); + tempUpdate.newPathChange(db.getName()).addToAddPaths( + PathsUpdate.cleanPath(db.getLocationUri())); + List<String> allTblStr = hmsHandler.get_all_tables(db.getName()); + for (String tblName : allTblStr) { + Table tbl = hmsHandler.get_table(db.getName(), tblName); + TPathChanges tblPathChange = tempUpdate.newPathChange(tbl + .getDbName() + "." + tbl.getTableName()); + List<Partition> tblParts = + hmsHandler.get_partitions(db.getName(), tbl.getTableName(), (short) -1); + tblPathChange.addToAddPaths(PathsUpdate.cleanPath(tbl.getSd() + .getLocation() == null ? db.getLocationUri() : tbl + .getSd().getLocation())); + for (Partition part : tblParts) { + tblPathChange.addToAddPaths(PathsUpdate.cleanPath(part.getSd() + .getLocation())); + } + } + } + authzPaths.updatePartial(Lists.newArrayList(tempUpdate), + new ReentrantReadWriteLock()); + return authzPaths; + } + + @Override + public void addPath(String authzObj, String path) { + LOGGER.debug("#### HMS Path Update [" + + "OP : addPath, " + + "authzObj : " + authzObj + ", " + + "path : " + path + "]"); + PathsUpdate update = createHMSUpdate(); + update.newPathChange(authzObj).addToAddPaths(PathsUpdate.cleanPath(path)); + notifySentry(update, true); + } + + @Override + public void removeAllPaths(String authzObj, List<String> childObjects) { + LOGGER.debug("#### HMS Path Update [" + + "OP : removeAllPaths, " + + "authzObj : " + authzObj + ", " + + "childObjs : " + (childObjects == null ? "[]" : childObjects) + "]"); + PathsUpdate update = createHMSUpdate(); + if (childObjects != null) { + for (String childObj : childObjects) { + update.newPathChange(authzObj + "." + childObj).addToDelPaths( + Lists.newArrayList(PathsUpdate.ALL_PATHS)); + } + } + update.newPathChange(authzObj).addToDelPaths( + Lists.newArrayList(PathsUpdate.ALL_PATHS)); + notifySentry(update, true); + } + + @Override + public void removePath(String authzObj, String path) { + if ("*".equals(path)) { + removeAllPaths(authzObj, null); + } else { + LOGGER.debug("#### HMS Path Update [" + + "OP : removePath, " + + "authzObj : " + authzObj + ", " + + "path : " + path + "]"); + PathsUpdate update = createHMSUpdate(); + update.newPathChange(authzObj).addToDelPaths(PathsUpdate.cleanPath(path)); + notifySentry(update, true); + } + } + + @Override + public void renameAuthzObject(String oldName, String oldPath, String newName, + String newPath) { + PathsUpdate update = createHMSUpdate(); + LOGGER.debug("#### HMS Path Update [" + + "OP : renameAuthzObject, " + + "oldName : " + oldName + "," + + "newPath : " + oldPath + "," + + "newName : " + newName + "," + + "newPath : " + newPath + "]"); + update.newPathChange(newName).addToAddPaths(PathsUpdate.cleanPath(newPath)); + update.newPathChange(oldName).addToDelPaths(PathsUpdate.cleanPath(oldPath)); + notifySentry(update, true); + } + + private SentryHDFSServiceClient getClient() { + if (sentryClient == null) { + try { + sentryClient = new SentryHDFSServiceClient(conf); + } catch (IOException e) { + sentryClient = null; + LOGGER.error("Could not connect to Sentry HDFS Service !!", e); + } + } + return sentryClient; + } + + private PathsUpdate createHMSUpdate() { + PathsUpdate update = new PathsUpdate(seqNum.incrementAndGet(), false); + LOGGER.debug("#### HMS Path Update SeqNum : [" + seqNum.get() + "]"); + return update; + } + + private void notifySentryNoLock(PathsUpdate update, boolean applyLocal) { + if (applyLocal) { + authzPaths.updatePartial(Lists.newArrayList(update), new ReentrantReadWriteLock()); + } + try { + getClient().notifyHMSUpdate(update); + } catch (Exception e) { + LOGGER.error("Could not send update to Sentry HDFS Service !!", e); + } finally { + lastSentSeqNum = update.getSeqNum(); + LOGGER.debug("#### HMS Path Last update sent : [" + lastSentSeqNum + "]"); + } + } + + private void notifySentry(PathsUpdate update, boolean applyLocal) { + notificiationLock.lock(); + try { + notifySentryNoLock(update, applyLocal); + } finally { + notificiationLock.unlock(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java new file mode 100644 index 0000000..cc849b9 --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.hdfs; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.sentry.hdfs.service.thrift.SentryHDFSService; +import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse; +import org.apache.sentry.hdfs.service.thrift.TPathsUpdate; +import org.apache.sentry.hdfs.service.thrift.TPermissionsUpdate; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface { + + private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceProcessor.class); + + @Override + public TAuthzUpdateResponse get_all_authz_updates_from(long permSeqNum, long pathSeqNum) + throws TException { + TAuthzUpdateResponse retVal = new TAuthzUpdateResponse(); + retVal.setAuthzPathUpdate(new LinkedList<TPathsUpdate>()); + retVal.setAuthzPermUpdate(new LinkedList<TPermissionsUpdate>()); + if (SentryPlugin.instance != null) { + List<PermissionsUpdate> permUpdates = SentryPlugin.instance.getAllPermsUpdatesFrom(permSeqNum); + List<PathsUpdate> pathUpdates = SentryPlugin.instance.getAllPathsUpdatesFrom(pathSeqNum); + try { + for (PathsUpdate update : pathUpdates) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("### Sending PATH preUpdate seq [" + update.getSeqNum() + "] ###"); + LOGGER.debug("### Sending PATH preUpdate [" + update.toThrift() + "] ###"); + } + retVal.getAuthzPathUpdate().add(update.toThrift()); + } + for (PermissionsUpdate update : permUpdates) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("### Sending PERM preUpdate seq [" + update.getSeqNum() + "] ###"); + LOGGER.debug("### Sending PERM preUpdate [" + update.toThrift() + "] ###"); + } + retVal.getAuthzPermUpdate().add(update.toThrift()); + } + if (LOGGER.isDebugEnabled()) { + StringBuilder permSeq = new StringBuilder("<"); + for (PermissionsUpdate permUpdate : permUpdates) { + permSeq.append(permUpdate.getSeqNum()).append(","); + } + permSeq.append(">"); + StringBuilder pathSeq = new StringBuilder("<"); + for (PathsUpdate pathUpdate : pathUpdates) { + pathSeq.append(pathUpdate.getSeqNum()).append(","); + } + pathSeq.append(">"); + LOGGER.debug("#### Updates requested from HDFS [" + + "permReq=" + permSeqNum + ", permResp=" + permSeq + "] " + + "[pathReq=" + pathSeqNum + ", pathResp=" + pathSeq + "]"); + } + } catch (Exception e) { + LOGGER.error("Error Sending updates to downstream Cache", e); + throw new TException(e); + } + } else { + LOGGER.error("SentryPlugin not initialized yet !!"); + } + + return retVal; + } + + @Override + public void handle_hms_notification(TPathsUpdate update) throws TException { + try { + PathsUpdate hmsUpdate = new PathsUpdate(update); + if (SentryPlugin.instance != null) { + SentryPlugin.instance.handlePathUpdateNotification(hmsUpdate); + LOGGER.debug("Authz Paths update [" + hmsUpdate.getSeqNum() + "].."); + } else { + LOGGER.error("SentryPlugin not initialized yet !!"); + } + } catch (Exception e) { + LOGGER.error("Error handling notification from HMS", e); + throw new TException(e); + } + } + + @Override + public long check_hms_seq_num(long pathSeqNum) throws TException { + return SentryPlugin.instance.getLastSeenHMSPathSeqNum(); + } + + /** + * Not implemented for the time being.. + */ + @Override + public Map<String, List<String>> get_all_related_paths(String arg0, + boolean arg1) throws TException { + // TODO Auto-generated method stub + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java new file mode 100644 index 0000000..d35de75 --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.hdfs; + +import java.net.Socket; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.hdfs.service.thrift.SentryHDFSService; +import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Iface; +import org.apache.sentry.provider.db.log.util.CommandUtil; +import org.apache.sentry.service.thrift.ProcessorFactory; +import org.apache.thrift.TException; +import org.apache.thrift.TMultiplexedProcessor; +import org.apache.thrift.TProcessor; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSaslClientTransport; +import org.apache.thrift.transport.TSaslServerTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SentryHDFSServiceProcessorFactory extends ProcessorFactory{ + + private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceProcessorFactory.class); + + static class ProcessorWrapper extends SentryHDFSService.Processor<SentryHDFSService.Iface> { + + public ProcessorWrapper(Iface iface) { + super(iface); + } + @Override + public boolean process(TProtocol in, TProtocol out) throws TException { + setIpAddress(in); + setImpersonator(in); + return super.process(in, out); + } + + private void setImpersonator(final TProtocol in) { + TTransport transport = in.getTransport(); + if (transport instanceof TSaslServerTransport) { + String impersonator = ((TSaslServerTransport) transport).getSaslServer().getAuthorizationID(); + CommandUtil.setImpersonator(impersonator); + } + } + + private void setIpAddress(final TProtocol in) { + TTransport transport = in.getTransport(); + TSocket tSocket = getUnderlyingSocketFromTransport(transport); + if (tSocket != null) { + setIpAddress(tSocket.getSocket()); + } else { + LOGGER.warn("Unknown Transport, cannot determine ipAddress"); + } + } + + private void setIpAddress(Socket socket) { + CommandUtil.setIpAddress(socket.getInetAddress().toString()); + } + + private TSocket getUnderlyingSocketFromTransport(TTransport transport) { + if (transport != null) { + if (transport instanceof TSaslServerTransport) { + transport = ((TSaslServerTransport) transport).getUnderlyingTransport(); + } else if (transport instanceof TSaslClientTransport) { + transport = ((TSaslClientTransport) transport).getUnderlyingTransport(); + } else { + if (!(transport instanceof TSocket)) { + LOGGER.warn("Transport class [" + transport.getClass().getName() + "] is not of type TSocket"); + return null; + } + } + return (TSocket) transport; + } + return null; + } + } + + public SentryHDFSServiceProcessorFactory(Configuration conf) { + super(conf); + } + + + public boolean register(TMultiplexedProcessor multiplexedProcessor) throws Exception { + SentryHDFSServiceProcessor sentryServiceHandler = + new SentryHDFSServiceProcessor(); + TProcessor processor = new ProcessorWrapper(sentryServiceHandler); + multiplexedProcessor.registerProcessor( + SentryHDFSServiceClient.SENTRY_HDFS_SERVICE_NAME, processor); + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java new file mode 100644 index 0000000..55b7697 --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.hdfs; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; +import org.apache.sentry.hdfs.UpdateForwarder.ExternalImageRetriever; +import org.apache.sentry.hdfs.service.thrift.TPathChanges; +import org.apache.sentry.hdfs.service.thrift.TPermissionsUpdate; +import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges; +import org.apache.sentry.hdfs.service.thrift.TRoleChanges; +import org.apache.sentry.provider.db.SentryPolicyStorePlugin; +import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException; +import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleAddGroupsRequest; +import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleDeleteGroupsRequest; +import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleGrantPrivilegeRequest; +import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleRevokePrivilegeRequest; +import org.apache.sentry.provider.db.service.thrift.TDropPrivilegesRequest; +import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleRequest; +import org.apache.sentry.provider.db.service.thrift.TRenamePrivilegesRequest; +import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; +import org.apache.sentry.provider.db.service.thrift.TSentryGroup; +import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Strings; + +public class SentryPlugin implements SentryPolicyStorePlugin { + + private static final Logger LOGGER = LoggerFactory.getLogger(SentryPlugin.class); + + public static volatile SentryPlugin instance; + + static class PermImageRetriever implements ExternalImageRetriever<PermissionsUpdate> { + + private final SentryStore sentryStore; + + public PermImageRetriever(SentryStore sentryStore) { + this.sentryStore = sentryStore; + } + + @Override + public PermissionsUpdate retrieveFullImage(long currSeqNum) { + Map<String, HashMap<String, String>> privilegeImage = sentryStore.retrieveFullPrivilegeImage(); + Map<String, LinkedList<String>> roleImage = sentryStore.retrieveFullRoleImage(); + + TPermissionsUpdate tPermUpdate = new TPermissionsUpdate(true, currSeqNum, + new HashMap<String, TPrivilegeChanges>(), + new HashMap<String, TRoleChanges>()); + for (Map.Entry<String, HashMap<String, String>> privEnt : privilegeImage.entrySet()) { + String authzObj = privEnt.getKey(); + HashMap<String,String> privs = privEnt.getValue(); + tPermUpdate.putToPrivilegeChanges(authzObj, new TPrivilegeChanges( + authzObj, privs, new HashMap<String, String>())); + } + for (Map.Entry<String, LinkedList<String>> privEnt : roleImage.entrySet()) { + String role = privEnt.getKey(); + LinkedList<String> groups = privEnt.getValue(); + tPermUpdate.putToRoleChanges(role, new TRoleChanges(role, groups, new LinkedList<String>())); + } + PermissionsUpdate permissionsUpdate = new PermissionsUpdate(tPermUpdate); + permissionsUpdate.setSeqNum(currSeqNum); + return permissionsUpdate; + } + + } + + private UpdateForwarder<PathsUpdate> pathsUpdater; + private UpdateForwarder<PermissionsUpdate> permsUpdater; + private final AtomicLong permSeqNum = new AtomicLong(5); + private PermImageRetriever permImageRetriever; + + long getLastSeenHMSPathSeqNum() { + return pathsUpdater.getLastSeen(); + } + + @Override + public void initialize(Configuration conf, SentryStore sentryStore) throws SentryPluginException { + final String[] pathPrefixes = conf + .getStrings(ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES, + ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES_DEFAULT); + final int initUpdateRetryDelayMs = + conf.getInt(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS, + ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT); + pathsUpdater = new UpdateForwarder<PathsUpdate>(new UpdateableAuthzPaths( + pathPrefixes), null, 100, initUpdateRetryDelayMs); + permImageRetriever = new PermImageRetriever(sentryStore); + permsUpdater = new UpdateForwarder<PermissionsUpdate>( + new UpdateablePermissions(permImageRetriever), permImageRetriever, + 100, initUpdateRetryDelayMs); + LOGGER.info("Sentry HDFS plugin initialized !!"); + instance = this; + } + + public List<PathsUpdate> getAllPathsUpdatesFrom(long pathSeqNum) { + return pathsUpdater.getAllUpdatesFrom(pathSeqNum); + } + + public List<PermissionsUpdate> getAllPermsUpdatesFrom(long permSeqNum) { + return permsUpdater.getAllUpdatesFrom(permSeqNum); + } + + public void handlePathUpdateNotification(PathsUpdate update) { + pathsUpdater.handleUpdateNotification(update); + LOGGER.debug("Recieved Authz Path update [" + update.getSeqNum() + "].."); + } + + @Override + public void onAlterSentryRoleAddGroups( + TAlterSentryRoleAddGroupsRequest request) throws SentryPluginException { + PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false); + TRoleChanges rUpdate = update.addRoleUpdate(request.getRoleName()); + for (TSentryGroup group : request.getGroups()) { + rUpdate.addToAddGroups(group.getGroupName()); + } + permsUpdater.handleUpdateNotification(update); + LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "].."); + } + + @Override + public void onAlterSentryRoleDeleteGroups( + TAlterSentryRoleDeleteGroupsRequest request) + throws SentryPluginException { + PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false); + TRoleChanges rUpdate = update.addRoleUpdate(request.getRoleName()); + for (TSentryGroup group : request.getGroups()) { + rUpdate.addToDelGroups(group.getGroupName()); + } + permsUpdater.handleUpdateNotification(update); + LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "].."); + } + + @Override + public void onAlterSentryRoleGrantPrivilege( + TAlterSentryRoleGrantPrivilegeRequest request) + throws SentryPluginException { + String authzObj = getAuthzObj(request.getPrivilege()); + if (authzObj != null) { + PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false); + update.addPrivilegeUpdate(authzObj).putToAddPrivileges( + request.getRoleName(), request.getPrivilege().getAction().toUpperCase()); + permsUpdater.handleUpdateNotification(update); + LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + "].."); + } + } + + @Override + public void onRenameSentryPrivilege(TRenamePrivilegesRequest request) + throws SentryPluginException { + String oldAuthz = getAuthzObj(request.getOldAuthorizable()); + String newAuthz = getAuthzObj(request.getNewAuthorizable()); + PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false); + TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS); + privUpdate.putToAddPrivileges(newAuthz, newAuthz); + privUpdate.putToDelPrivileges(oldAuthz, oldAuthz); + permsUpdater.handleUpdateNotification(update); + LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + newAuthz + ", " + oldAuthz + "].."); + } + + @Override + public void onAlterSentryRoleRevokePrivilege( + TAlterSentryRoleRevokePrivilegeRequest request) + throws SentryPluginException { + String authzObj = getAuthzObj(request.getPrivilege()); + if (authzObj != null) { + PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false); + update.addPrivilegeUpdate(authzObj).putToDelPrivileges( + request.getRoleName(), request.getPrivilege().getAction().toUpperCase()); + permsUpdater.handleUpdateNotification(update); + LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + authzObj + "].."); + } + } + + @Override + public void onDropSentryRole(TDropSentryRoleRequest request) + throws SentryPluginException { + PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false); + update.addPrivilegeUpdate(PermissionsUpdate.ALL_AUTHZ_OBJ).putToDelPrivileges( + request.getRoleName(), PermissionsUpdate.ALL_AUTHZ_OBJ); + update.addRoleUpdate(request.getRoleName()).addToDelGroups(PermissionsUpdate.ALL_GROUPS); + permsUpdater.handleUpdateNotification(update); + LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "].."); + } + + @Override + public void onDropSentryPrivilege(TDropPrivilegesRequest request) + throws SentryPluginException { + PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false); + String authzObj = getAuthzObj(request.getAuthorizable()); + update.addPrivilegeUpdate(authzObj).putToDelPrivileges( + PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES); + permsUpdater.handleUpdateNotification(update); + LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + authzObj + "].."); + } + + private String getAuthzObj(TSentryPrivilege privilege) { + String authzObj = null; + if (!SentryStore.isNULL(privilege.getDbName())) { + String dbName = privilege.getDbName(); + String tblName = privilege.getTableName(); + if (SentryStore.isNULL(tblName)) { + authzObj = dbName; + } else { + authzObj = dbName + "." + tblName; + } + } + return authzObj; + } + + private String getAuthzObj(TSentryAuthorizable authzble) { + String authzObj = null; + if (!SentryStore.isNULL(authzble.getDb())) { + String dbName = authzble.getDb(); + String tblName = authzble.getTable(); + if (SentryStore.isNULL(tblName)) { + authzObj = dbName; + } else { + authzObj = dbName + "." + tblName; + } + } + return authzObj; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java new file mode 100644 index 0000000..f321d3d --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.hdfs; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +public class UpdateForwarder<K extends Updateable.Update> implements + Updateable<K> { + + public static interface ExternalImageRetriever<K> { + + public K retrieveFullImage(long currSeqNum); + + } + + private final AtomicLong lastSeenSeqNum = new AtomicLong(0); + private final AtomicLong lastCommittedSeqNum = new AtomicLong(0); + // Updates should be handled in order + private final Executor updateHandler = Executors.newSingleThreadExecutor(); + + // Update log is used when propagate updates to a downstream cache. + // The preUpdate log stores all commits that were applied to this cache. + // When the update log is filled to capacity (updateLogSize), all + // entries are cleared and a compact image if the state of the cache is + // appended to the log. + // The first entry in an update log (consequently the first preUpdate a + // downstream cache sees) will be a full image. All subsequent entries are + // partial edits + private final LinkedList<K> updateLog = new LinkedList<K>(); + // UpdateLog is disabled when updateLogSize = 0; + private final int updateLogSize; + + private final ExternalImageRetriever<K> imageRetreiver; + + private volatile Updateable<K> updateable; + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private static final long INIT_SEQ_NUM = -2; + + private static final Logger LOGGER = LoggerFactory.getLogger(UpdateForwarder.class); + + public UpdateForwarder(Updateable<K> updateable, + ExternalImageRetriever<K> imageRetreiver, int updateLogSize) { + this(updateable, imageRetreiver, updateLogSize, 5000); + } + public UpdateForwarder(Updateable<K> updateable, + ExternalImageRetriever<K> imageRetreiver, int updateLogSize, + int initUpdateRetryDelay) { + this.updateLogSize = updateLogSize; + this.imageRetreiver = imageRetreiver; + if (imageRetreiver != null) { + spawnInitialUpdater(updateable, initUpdateRetryDelay); + } else { + this.updateable = updateable; + } + } + + private void spawnInitialUpdater(final Updateable<K> updateable, + final int initUpdateRetryDelay) { + K firstFullImage = null; + try { + firstFullImage = imageRetreiver.retrieveFullImage(INIT_SEQ_NUM); + } catch (Exception e) { + LOGGER.warn("InitialUpdater encountered exception !! ", e); + firstFullImage = null; + Thread initUpdater = new Thread() { + @Override + public void run() { + while (UpdateForwarder.this.updateable == null) { + try { + Thread.sleep(initUpdateRetryDelay); + } catch (InterruptedException e) { + LOGGER.warn("Thread interrupted !! ", e); + break; + } + K fullImage = null; + try { + fullImage = + UpdateForwarder.this.imageRetreiver + .retrieveFullImage(INIT_SEQ_NUM); + appendToUpdateLog(fullImage); + } catch (Exception e) { + LOGGER.warn("InitialUpdater encountered exception !! ", e); + } + if (fullImage != null) { + UpdateForwarder.this.updateable = updateable.updateFull(fullImage); + } + } + } + }; + initUpdater.start(); + } + if (firstFullImage != null) { + appendToUpdateLog(firstFullImage); + this.updateable = updateable.updateFull(firstFullImage); + } + } + /** + * Handle notifications from HMS plug-in or upstream Cache + * @param update + */ + public void handleUpdateNotification(final K update) { + // Correct the seqNums on the first update + if (lastCommittedSeqNum.get() == INIT_SEQ_NUM) { + K firstUpdate = updateLog.peek(); + long firstSeqNum = update.getSeqNum() - 1; + if (firstUpdate != null) { + firstUpdate.setSeqNum(firstSeqNum); + } + lastCommittedSeqNum.set(firstSeqNum); + lastSeenSeqNum.set(firstSeqNum); + } + final boolean editNotMissed = + lastSeenSeqNum.incrementAndGet() == update.getSeqNum(); + if (!editNotMissed) { + lastSeenSeqNum.set(update.getSeqNum()); + } + Runnable task = new Runnable() { + @Override + public void run() { + K toUpdate = update; + if (update.hasFullImage()) { + updateable = updateable.updateFull(update); + } else { + if (editNotMissed) { + // apply partial preUpdate + updateable.updatePartial(Lists.newArrayList(update), lock); + } else { + // Retrieve full update from External Source and + if (imageRetreiver != null) { + toUpdate = imageRetreiver + .retrieveFullImage(update.getSeqNum()); + updateable = updateable.updateFull(toUpdate); + } + } + } + appendToUpdateLog(toUpdate); + } + }; + updateHandler.execute(task); + } + + private void appendToUpdateLog(K update) { + synchronized (updateLog) { + boolean logCompacted = false; + if (updateLogSize > 0) { + if (update.hasFullImage() || (updateLog.size() == updateLogSize)) { + // Essentially a log compaction + updateLog.clear(); + updateLog.add(update.hasFullImage() ? update + : createFullImageUpdate(update.getSeqNum())); + logCompacted = true; + } else { + updateLog.add(update); + } + } + lastCommittedSeqNum.set(update.getSeqNum()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("#### Appending to Update Log [" + + "type=" + update.getClass() + ", " + + "lastCommit=" + lastCommittedSeqNum.get() + ", " + + "lastSeen=" + lastSeenSeqNum.get() + ", " + + "logCompacted=" + logCompacted + "]"); + } + } + } + + /** + * Return all updates from requested seqNum (inclusive) + * @param seqNum + * @return + */ + public List<K> getAllUpdatesFrom(long seqNum) { + List<K> retVal = new LinkedList<K>(); + synchronized (updateLog) { + long currSeqNum = lastCommittedSeqNum.get(); + if (LOGGER.isDebugEnabled() && (updateable != null)) { + LOGGER.debug("#### GetAllUpdatesFrom [" + + "type=" + updateable.getClass() + ", " + + "reqSeqNum=" + seqNum + ", " + + "lastCommit=" + lastCommittedSeqNum.get() + ", " + + "lastSeen=" + lastSeenSeqNum.get() + ", " + + "updateLogSize=" + updateLog.size() + "]"); + } + if (updateLogSize == 0) { + // no updatelog configured.. + return retVal; + } + K head = updateLog.peek(); + if (head == null) { + return retVal; + } + if (seqNum > currSeqNum + 1) { + // This process has probably restarted since downstream + // recieved last update + retVal.addAll(updateLog); + return retVal; + } + if (head.getSeqNum() > seqNum) { + // Caller has diverged greatly.. + if (head.hasFullImage()) { + // head is a refresh(full) image + // Send full image along with partial updates + for (K u : updateLog) { + retVal.add(u); + } + } else { + // Create a full image + // clear updateLog + // add fullImage to head of Log + // NOTE : This should ideally never happen + K fullImage = createFullImageUpdate(currSeqNum); + updateLog.clear(); + updateLog.add(fullImage); + retVal.add(fullImage); + } + } else { + // increment iterator to requested seqNum + Iterator<K> iter = updateLog.iterator(); + while (iter.hasNext()) { + K elem = iter.next(); + if (elem.getSeqNum() >= seqNum) { + retVal.add(elem); + } + } + } + } + return retVal; + } + + public boolean areAllUpdatesCommited() { + return lastCommittedSeqNum.get() == lastSeenSeqNum.get(); + } + + public long getLastCommitted() { + return lastCommittedSeqNum.get(); + } + + public long getLastSeen() { + return lastSeenSeqNum.get(); + } + + @Override + public Updateable<K> updateFull(K update) { + return (updateable != null) ? updateable.updateFull(update) : null; + } + + @Override + public void updatePartial(Iterable<K> updates, ReadWriteLock lock) { + if (updateable != null) { + updateable.updatePartial(updates, lock); + } + } + + @Override + public long getLastUpdatedSeqNum() { + return (updateable != null) ? updateable.getLastUpdatedSeqNum() : INIT_SEQ_NUM; + } + + @Override + public K createFullImageUpdate(long currSeqNum) { + return (updateable != null) ? updateable.createFullImageUpdate(currSeqNum) : null; + } + +}
