http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java new file mode 100644 index 0000000..6b3e2e2 --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.hdfs; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; + +import org.apache.sentry.hdfs.PermissionsUpdate; +import org.apache.sentry.hdfs.Updateable; +import org.apache.sentry.hdfs.UpdateForwarder.ExternalImageRetriever; + +public class UpdateablePermissions implements Updateable<PermissionsUpdate>{ + + private AtomicLong seqNum = new AtomicLong(); + private final ExternalImageRetriever<PermissionsUpdate> imageRetreiver; + + public UpdateablePermissions( + ExternalImageRetriever<PermissionsUpdate> imageRetreiver) { + this.imageRetreiver = imageRetreiver; + } + + @Override + public PermissionsUpdate createFullImageUpdate(long currSeqNum) { + return imageRetreiver.retrieveFullImage(currSeqNum); + } + + @Override + public long getLastUpdatedSeqNum() { + return seqNum.get(); + } + + @Override + public void updatePartial(Iterable<PermissionsUpdate> update, + ReadWriteLock lock) { + for (PermissionsUpdate permsUpdate : update) { + seqNum.set(permsUpdate.getSeqNum()); + } + } + + @Override + public Updateable<PermissionsUpdate> updateFull(PermissionsUpdate update) { + UpdateablePermissions other = new UpdateablePermissions(imageRetreiver); + other.seqNum.set(update.getSeqNum()); + return other; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java new file mode 100644 index 0000000..0c55bb1 --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.hdfs; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.locks.ReadWriteLock; + +import junit.framework.Assert; + +import org.apache.sentry.hdfs.UpdateForwarder; +import org.apache.sentry.hdfs.Updateable; +import org.apache.sentry.hdfs.UpdateForwarder.ExternalImageRetriever; +import org.apache.sentry.hdfs.Updateable.Update; +import org.junit.Test; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; + +public class TestUpdateForwarder { + + static class DummyUpdate implements Update { + private long seqNum = 0; + private boolean hasFullUpdate = false; + private String state; + public DummyUpdate(long seqNum, boolean hasFullUpdate) { + this.seqNum = seqNum; + this.hasFullUpdate = hasFullUpdate; + } + public String getState() { + return state; + } + public DummyUpdate setState(String stuff) { + this.state = stuff; + return this; + } + @Override + public boolean hasFullImage() { + return hasFullUpdate; + } + @Override + public long getSeqNum() { + return seqNum; + } + @Override + public void setSeqNum(long seqNum) { + this.seqNum = seqNum; + } + } + + static class DummyUpdatable implements Updateable<DummyUpdate> { + + private List<String> state = new LinkedList<String>(); + private long lastUpdatedSeqNum = 0; + + @Override + public void updatePartial(Iterable<DummyUpdate> update, ReadWriteLock lock) { + for (DummyUpdate u : update) { + state.add(u.getState()); + lastUpdatedSeqNum = u.seqNum; + } + } + + @Override + public Updateable<DummyUpdate> updateFull(DummyUpdate update) { + DummyUpdatable retVal = new DummyUpdatable(); + retVal.lastUpdatedSeqNum = update.seqNum; + retVal.state = Lists.newArrayList(update.state.split(",")); + return retVal; + } + + @Override + public long getLastUpdatedSeqNum() { + return lastUpdatedSeqNum; + } + + @Override + public DummyUpdate createFullImageUpdate(long currSeqNum) { + DummyUpdate retVal = new DummyUpdate(currSeqNum, true); + retVal.state = Joiner.on(",").join(state); + return retVal; + } + + public String getState() { + return Joiner.on(",").join(state); + } + } + + static class DummyImageRetreiver implements ExternalImageRetriever<DummyUpdate> { + + private String state; + public void setState(String state) { + this.state = state; + } + @Override + public DummyUpdate retrieveFullImage(long currSeqNum) { + DummyUpdate retVal = new DummyUpdate(currSeqNum, true); + retVal.state = state; + return retVal; + } + } + + @Test + public void testInit() { + DummyImageRetreiver imageRetreiver = new DummyImageRetreiver(); + imageRetreiver.setState("a,b,c"); + UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>( + new DummyUpdatable(), imageRetreiver, 10); + Assert.assertEquals(-2, updateForwarder.getLastUpdatedSeqNum()); + List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0); + Assert.assertTrue(allUpdates.size() == 1); + Assert.assertEquals("a,b,c", allUpdates.get(0).getState()); + + // If the current process has restarted the input seqNum will be > currSeq + allUpdates = updateForwarder.getAllUpdatesFrom(100); + Assert.assertTrue(allUpdates.size() == 1); + Assert.assertEquals("a,b,c", allUpdates.get(0).getState()); + Assert.assertEquals(-2, allUpdates.get(0).getSeqNum()); + allUpdates = updateForwarder.getAllUpdatesFrom(-1); + Assert.assertEquals(0, allUpdates.size()); + } + + @Test + public void testUpdateReceive() throws Exception { + DummyImageRetreiver imageRetreiver = new DummyImageRetreiver(); + imageRetreiver.setState("a,b,c"); + UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>( + new DummyUpdatable(), imageRetreiver, 5); + updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d")); + while(!updateForwarder.areAllUpdatesCommited()) { + Thread.sleep(100); + } + Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum()); + List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0); + Assert.assertEquals(2, allUpdates.size()); + Assert.assertEquals("a,b,c", allUpdates.get(0).getState()); + Assert.assertEquals("d", allUpdates.get(1).getState()); + } + + // This happens when we the first update from HMS is a -1 (If the heartbeat + // thread checks Sentry's current seqNum before any update has come in).. + // This will lead the first and second entries in the updatelog to differ + // by more than +1.. + @Test + public void testUpdateReceiveWithNullImageRetriver() throws Exception { + UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>( + new DummyUpdatable(), null, 5); + updateForwarder.handleUpdateNotification(new DummyUpdate(-1, true).setState("a")); + while(!updateForwarder.areAllUpdatesCommited()) { + Thread.sleep(100); + } + List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(1); + Assert.assertEquals("a", allUpdates.get(0).getState()); + updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("b")); + while(!updateForwarder.areAllUpdatesCommited()) { + Thread.sleep(100); + } + updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("c")); + while(!updateForwarder.areAllUpdatesCommited()) { + Thread.sleep(100); + } + Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum()); + allUpdates = updateForwarder.getAllUpdatesFrom(0); + Assert.assertEquals(2, allUpdates.size()); + Assert.assertEquals("b", allUpdates.get(0).getState()); + Assert.assertEquals("c", allUpdates.get(1).getState()); + } + + @Test + public void testGetUpdates() throws Exception { + DummyImageRetreiver imageRetreiver = new DummyImageRetreiver(); + imageRetreiver.setState("a,b,c"); + UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>( + new DummyUpdatable(), imageRetreiver, 5); + updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d")); + while(!updateForwarder.areAllUpdatesCommited()) { + Thread.sleep(100); + } + Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum()); + List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0); + Assert.assertEquals(2, allUpdates.size()); + + updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e")); + updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f")); + + while(!updateForwarder.areAllUpdatesCommited()) { + Thread.sleep(100); + } + Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum()); + allUpdates = updateForwarder.getAllUpdatesFrom(0); + Assert.assertEquals(4, allUpdates.size()); + Assert.assertEquals("a,b,c", allUpdates.get(0).getState()); + Assert.assertEquals(4, allUpdates.get(0).getSeqNum()); + Assert.assertEquals("d", allUpdates.get(1).getState()); + Assert.assertEquals(5, allUpdates.get(1).getSeqNum()); + Assert.assertEquals("e", allUpdates.get(2).getState()); + Assert.assertEquals(6, allUpdates.get(2).getSeqNum()); + Assert.assertEquals("f", allUpdates.get(3).getState()); + Assert.assertEquals(7, allUpdates.get(3).getSeqNum()); + + updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g")); + while(!updateForwarder.areAllUpdatesCommited()) { + Thread.sleep(100); + } + Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum()); + allUpdates = updateForwarder.getAllUpdatesFrom(8); + Assert.assertEquals(1, allUpdates.size()); + Assert.assertEquals("g", allUpdates.get(0).getState()); + } + + @Test + public void testGetUpdatesAfterExternalEntityReset() throws Exception { + DummyImageRetreiver imageRetreiver = new DummyImageRetreiver(); + imageRetreiver.setState("a,b,c"); + UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>( + new DummyUpdatable(), imageRetreiver, 5); + updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d")); + while(!updateForwarder.areAllUpdatesCommited()) { + Thread.sleep(100); + } + + updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e")); + updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f")); + + while(!updateForwarder.areAllUpdatesCommited()) { + Thread.sleep(100); + } + Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum()); + List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0); + Assert.assertEquals(4, allUpdates.size()); + Assert.assertEquals("f", allUpdates.get(3).getState()); + Assert.assertEquals(7, allUpdates.get(3).getSeqNum()); + + updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g")); + while(!updateForwarder.areAllUpdatesCommited()) { + Thread.sleep(100); + } + Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum()); + allUpdates = updateForwarder.getAllUpdatesFrom(8); + Assert.assertEquals(1, allUpdates.size()); + Assert.assertEquals("g", allUpdates.get(0).getState()); + + imageRetreiver.setState("a,b,c,d,e,f,g,h"); + + // New update comes with SeqNum = 1 + updateForwarder.handleUpdateNotification(new DummyUpdate(1, false).setState("h")); + while(!updateForwarder.areAllUpdatesCommited()) { + Thread.sleep(100); + } + // NN plugin asks for next update + allUpdates = updateForwarder.getAllUpdatesFrom(9); + Assert.assertEquals(1, allUpdates.size()); + Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState()); + Assert.assertEquals(1, allUpdates.get(0).getSeqNum()); + } + + @Test + public void testUpdateLogCompression() throws Exception { + DummyImageRetreiver imageRetreiver = new DummyImageRetreiver(); + imageRetreiver.setState("a,b,c"); + UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>( + new DummyUpdatable(), imageRetreiver, 5); + updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d")); + while(!updateForwarder.areAllUpdatesCommited()) { + Thread.sleep(100); + } + Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum()); + List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0); + Assert.assertEquals(2, allUpdates.size()); + + updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e")); + updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f")); + updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g")); + updateForwarder.handleUpdateNotification(new DummyUpdate(9, false).setState("h")); + updateForwarder.handleUpdateNotification(new DummyUpdate(10, false).setState("i")); + updateForwarder.handleUpdateNotification(new DummyUpdate(11, false).setState("j")); + + while(!updateForwarder.areAllUpdatesCommited()) { + Thread.sleep(100); + } + Assert.assertEquals(11, updateForwarder.getLastUpdatedSeqNum()); + allUpdates = updateForwarder.getAllUpdatesFrom(0); + Assert.assertEquals(3, allUpdates.size()); + Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState()); + Assert.assertEquals(9, allUpdates.get(0).getSeqNum()); + Assert.assertEquals("i", allUpdates.get(1).getState()); + Assert.assertEquals(10, allUpdates.get(1).getSeqNum()); + Assert.assertEquals("j", allUpdates.get(2).getState()); + Assert.assertEquals(11, allUpdates.get(2).getSeqNum()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/pom.xml b/sentry-provider/sentry-provider-db/pom.xml index fbf831a..e2f035f 100644 --- a/sentry-provider/sentry-provider-db/pom.xml +++ b/sentry-provider/sentry-provider-db/pom.xml @@ -42,6 +42,11 @@ limitations under the License. <scope>provided</scope> </dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <version>2.5.0</version> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> @@ -89,6 +94,11 @@ limitations under the License. </dependency> <dependency> <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>0.13.1-cdh5.2.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> <artifactId>hive-shims</artifactId> <scope>provided</scope> </dependency> @@ -163,6 +173,11 @@ limitations under the License. <artifactId>mockito-all</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-metastore</artifactId> + <version>${hive.version}</version> + </dependency> </dependencies> <build> @@ -214,68 +229,5 @@ limitations under the License. </plugin> </plugins> </build> - <profiles> - <profile> - <id>thriftif</id> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - <executions> - <execution> - <id>generate-thrift-sources</id> - <phase>generate-sources</phase> - <configuration> - <target> - <taskdef name="for" classname="net.sf.antcontrib.logic.ForTask" - classpathref="maven.plugin.classpath" /> - <property name="thrift.args" value="-I ${thrift.home} --gen java:beans,hashcode"/> - <property name="thrift.gen.dir" value="${basedir}/src/gen/thrift"/> - <delete dir="${thrift.gen.dir}"/> - <mkdir dir="${thrift.gen.dir}"/> - <for param="thrift.file"> - <path> - <fileset dir="${basedir}/src/main/resources/" includes="**/*.thrift" /> - </path> - <sequential> - <echo message="Generating Thrift code for @{thrift.file}"/> - <exec executable="${thrift.home}/bin/thrift" failonerror="true" dir="."> - <arg line="${thrift.args} -I ${basedir}/src/main/resources/ -o ${thrift.gen.dir} @{thrift.file} " /> - </exec> - </sequential> - </for> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-enforcer-plugin</artifactId> - <executions> - <execution> - <id>enforce-property</id> - <goals> - <goal>enforce</goal> - </goals> - <configuration> - <rules> - <requireProperty> - <property>thrift.home</property> - </requireProperty> - </rules> - <fail>true</fail> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - </profile> - </profiles> </project> http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryMetastoreListenerPlugin.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryMetastoreListenerPlugin.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryMetastoreListenerPlugin.java new file mode 100644 index 0000000..79cf4a4 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryMetastoreListenerPlugin.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.provider.db; + +import java.util.LinkedList; +import java.util.List; + +/** + * Plugin interface providing hooks to implementing classes, which are invoked + * on path creation/updation and deletion + */ +public abstract class SentryMetastoreListenerPlugin { + + private static List<SentryMetastoreListenerPlugin> registry = new LinkedList<SentryMetastoreListenerPlugin>(); + + public static void addToRegistry(SentryMetastoreListenerPlugin plugin) { + registry.add(plugin); + } + + public static List<SentryMetastoreListenerPlugin> getPlugins() { + return registry; + } + + public abstract void renameAuthzObject(String oldName, String oldPath, + String newName, String newPath); + + public abstract void addPath(String authzObj, String path); + + public abstract void removePath(String authzObj, String path); + + public abstract void removeAllPaths(String authzObj, List<String> childObjects); + +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java new file mode 100644 index 0000000..998a48b --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.provider.db; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.SentryUserException; +import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleAddGroupsRequest; +import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleDeleteGroupsRequest; +import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleGrantPrivilegeRequest; +import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleRevokePrivilegeRequest; +import org.apache.sentry.provider.db.service.thrift.TDropPrivilegesRequest; +import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleRequest; +import org.apache.sentry.provider.db.service.thrift.TRenamePrivilegesRequest; + +public interface SentryPolicyStorePlugin { + + @SuppressWarnings("serial") + public static class SentryPluginException extends SentryUserException { + public SentryPluginException(String msg) { + super(msg); + } + public SentryPluginException(String msg, Throwable t) { + super(msg, t); + } + } + + public void initialize(Configuration conf, SentryStore sentryStore) throws SentryPluginException; + + public void onAlterSentryRoleAddGroups(TAlterSentryRoleAddGroupsRequest tRequest) throws SentryPluginException; + + public void onAlterSentryRoleDeleteGroups(TAlterSentryRoleDeleteGroupsRequest tRequest) throws SentryPluginException; + + public void onAlterSentryRoleGrantPrivilege(TAlterSentryRoleGrantPrivilegeRequest tRequest) throws SentryPluginException; + + public void onAlterSentryRoleRevokePrivilege(TAlterSentryRoleRevokePrivilegeRequest tRequest) throws SentryPluginException; + + public void onDropSentryRole(TDropSentryRoleRequest tRequest) throws SentryPluginException; + + public void onRenameSentryPrivilege(TRenamePrivilegesRequest request) throws SentryPluginException; + + public void onDropSentryPrivilege(TDropPrivilegesRequest request) throws SentryPluginException; + +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java index b66037a..5f34b4c 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java @@ -39,9 +39,10 @@ public class SimpleDBProviderBackend implements ProviderBackend { private static final Logger LOGGER = LoggerFactory .getLogger(SimpleDBProviderBackend.class); - private final SentryPolicyServiceClient policyServiceClient; + private SentryPolicyServiceClient policyServiceClient; private volatile boolean initialized; + private Configuration conf; public SimpleDBProviderBackend(Configuration conf, String resourcePath) throws IOException { // DB Provider doesn't use policy file path @@ -50,6 +51,8 @@ public class SimpleDBProviderBackend implements ProviderBackend { public SimpleDBProviderBackend(Configuration conf) throws IOException { this(new SentryPolicyServiceClient(conf)); + this.initialized = false; + this.conf = conf; } @VisibleForTesting @@ -74,14 +77,28 @@ public class SimpleDBProviderBackend implements ProviderBackend { */ @Override public ImmutableSet<String> getPrivileges(Set<String> groups, ActiveRoleSet roleSet, Authorizable... authorizableHierarchy) { + return getPrivileges(1, groups, roleSet, authorizableHierarchy); + } + + private ImmutableSet<String> getPrivileges(int retryCount, Set<String> groups, ActiveRoleSet roleSet, Authorizable... authorizableHierarchy) { if (!initialized) { throw new IllegalStateException("Backend has not been properly initialized"); } try { - return ImmutableSet.copyOf(policyServiceClient.listPrivilegesForProvider(groups, roleSet, authorizableHierarchy)); - } catch (SentryUserException e) { - String msg = "Unable to obtain privileges from server: " + e.getMessage(); - LOGGER.error(msg, e); + return ImmutableSet.copyOf(getSentryClient().listPrivilegesForProvider(groups, roleSet, authorizableHierarchy)); + } catch (Exception e) { + policyServiceClient = null; + if (retryCount > 0) { + return getPrivileges(retryCount - 1, groups, roleSet, authorizableHierarchy); + } else { + String msg = "Unable to obtain privileges from server: " + e.getMessage(); + LOGGER.error(msg, e); + try { + policyServiceClient.close(); + } catch (Exception ex2) { + // Ignore + } + } } return ImmutableSet.of(); } @@ -101,6 +118,19 @@ public class SimpleDBProviderBackend implements ProviderBackend { } } + private SentryPolicyServiceClient getSentryClient() { + if (policyServiceClient == null) { + try { + policyServiceClient = new SentryPolicyServiceClient(conf); + } catch (Exception e) { + LOGGER.error("Error connecting to Sentry ['{}'] !!", + e.getMessage()); + policyServiceClient = null; + return null; + } + } + return policyServiceClient; + } /** * SimpleDBProviderBackend does not implement validatePolicy() */ http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index f6699d2..743900b 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -24,6 +24,7 @@ import static org.apache.sentry.provider.common.ProviderConstants.KV_JOINER; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -86,6 +87,7 @@ public class SentryStore { public static String NULL_COL = "__NULL__"; static final String DEFAULT_DATA_DIR = "sentry_policy_db"; + /** * Commit order sequence id. This is used by notification handlers * to know the order in which events where committed to the database. @@ -762,7 +764,6 @@ public class SentryStore { } } - List<MSentryPrivilege> getMSentryPrivileges(Set<String> roleNames, TSentryAuthorizable authHierarchy) { if ((roleNames.size() == 0)||(roleNames == null)) return new ArrayList<MSentryPrivilege>(); boolean rollbackTransaction = true; @@ -1506,4 +1507,82 @@ public class SentryStore { return Sets.newHashSet(conf.getStrings( ServerConfig.ADMIN_GROUPS, new String[]{})); } + + /** + * This returns a Mapping of AuthZObj(db/table) -> (Role -> permission) + */ + public Map<String, HashMap<String, String>> retrieveFullPrivilegeImage() { + Map<String, HashMap<String, String>> retVal = new HashMap<String, HashMap<String,String>>(); + boolean rollbackTransaction = true; + PersistenceManager pm = null; + try { + pm = openTransaction(); + Query query = pm.newQuery(MSentryPrivilege.class); + String filters = "(serverName != \"__NULL__\") " + + "&& (dbName != \"__NULL__\") " + "&& (URI == \"__NULL__\")"; + query.setFilter(filters.toString()); + query + .setOrdering("serverName ascending, dbName ascending, tableName ascending"); + List<MSentryPrivilege> privileges = (List<MSentryPrivilege>) query + .execute(); + rollbackTransaction = false; + for (MSentryPrivilege mPriv : privileges) { + String authzObj = mPriv.getDbName(); + if (!isNULL(mPriv.getTableName())) { + authzObj = authzObj + "." + mPriv.getTableName(); + } + HashMap<String, String> pUpdate = retVal.get(authzObj); + if (pUpdate == null) { + pUpdate = new HashMap<String, String>(); + retVal.put(authzObj, pUpdate); + } + for (MSentryRole mRole : mPriv.getRoles()) { + String existingPriv = pUpdate.get(mRole.getRoleName()); + if (existingPriv == null) { + pUpdate.put(mRole.getRoleName(), mPriv.getAction().toUpperCase()); + } else { + pUpdate.put(mRole.getRoleName(), existingPriv + "," + + mPriv.getAction().toUpperCase()); + } + } + } + commitTransaction(pm); + return retVal; + } finally { + if (rollbackTransaction) { + rollbackTransaction(pm); + } + } + } + + /** + * This returns a Mapping of Role -> [Groups] + */ + public Map<String, LinkedList<String>> retrieveFullRoleImage() { + Map<String, LinkedList<String>> retVal = new HashMap<String, LinkedList<String>>(); + boolean rollbackTransaction = true; + PersistenceManager pm = null; + try { + pm = openTransaction(); + Query query = pm.newQuery(MSentryGroup.class); + List<MSentryGroup> groups = (List<MSentryGroup>) query.execute(); + for (MSentryGroup mGroup : groups) { + for (MSentryRole role : mGroup.getRoles()) { + LinkedList<String> rUpdate = retVal.get(role.getRoleName()); + if (rUpdate == null) { + rUpdate = new LinkedList<String>(); + retVal.put(role.getRoleName(), rUpdate); + } + rUpdate.add(mGroup.getGroupName()); + } + } + commitTransaction(pm); + return retVal; + } finally { + if (rollbackTransaction) { + rollbackTransaction(pm); + } + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java index b20e71e..4774b90 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java @@ -18,16 +18,26 @@ package org.apache.sentry.provider.db.service.thrift; +import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; import com.codahale.metrics.Timer; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.sentry.SentryUserException; import org.apache.sentry.core.model.db.AccessConstants; import org.apache.sentry.provider.common.GroupMappingService; @@ -35,12 +45,16 @@ import org.apache.sentry.provider.db.SentryAccessDeniedException; import org.apache.sentry.provider.db.SentryAlreadyExistsException; import org.apache.sentry.provider.db.SentryInvalidInputException; import org.apache.sentry.provider.db.SentryNoSuchObjectException; +import org.apache.sentry.provider.db.SentryPolicyStorePlugin; +import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException; import org.apache.sentry.provider.db.log.entity.JsonLogEntityFactory; import org.apache.sentry.provider.db.log.util.Constants; import org.apache.sentry.provider.db.service.persistent.CommitContext; import org.apache.sentry.provider.db.service.persistent.SentryStore; import org.apache.sentry.provider.db.service.thrift.PolicyStoreConstants.PolicyStoreServerConfig; +import org.apache.sentry.service.thrift.ServiceConstants.ConfUtilties; import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; +import org.apache.sentry.service.thrift.ProcessorFactory; import org.apache.sentry.service.thrift.Status; import org.apache.sentry.service.thrift.TSentryResponseStatus; import org.apache.thrift.TException; @@ -62,6 +76,8 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { public static final String SENTRY_POLICY_SERVICE_NAME = "SentryPolicyService"; + public static volatile SentryPolicyStoreProcessor instance; + private final String name; private final Configuration conf; private final SentryStore sentryStore; @@ -70,6 +86,8 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { private boolean isReady; SentryMetrics sentryMetrics; + private List<SentryPolicyStorePlugin> sentryPlugins = new LinkedList<SentryPolicyStorePlugin>(); + public SentryPolicyStoreProcessor(String name, Configuration conf) throws Exception { super(); this.name = name; @@ -81,6 +99,23 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { isReady = true; adminGroups = ImmutableSet.copyOf(toTrimedLower(Sets.newHashSet(conf.getStrings( ServerConfig.ADMIN_GROUPS, new String[]{})))); + Iterable<String> pluginClasses = ConfUtilties.CLASS_SPLITTER + .split(conf.get(ServerConfig.SENTRY_POLICY_STORE_PLUGINS, + ServerConfig.SENTRY_POLICY_STORE_PLUGINS_DEFAULT).trim()); + for (String pluginClassStr : pluginClasses) { + Class<?> clazz = conf.getClassByName(pluginClassStr); + if (!SentryPolicyStorePlugin.class.isAssignableFrom(clazz)) { + throw new IllegalArgumentException("Sentry Plugin [" + + pluginClassStr + "] is not a " + + SentryPolicyStorePlugin.class.getName()); + } + SentryPolicyStorePlugin plugin = (SentryPolicyStorePlugin)clazz.newInstance(); + plugin.initialize(conf, sentryStore); + sentryPlugins.add(plugin); + } + if (instance == null) { + instance = this; + } initMetrics(); } @@ -108,6 +143,11 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { } } + public void registerPlugin(SentryPolicyStorePlugin plugin) throws SentryPluginException { + plugin.initialize(conf, sentryStore); + sentryPlugins.add(plugin); + } + @VisibleForTesting static List<NotificationHandler> createHandlers(Configuration conf) throws SentryConfigurationException { @@ -211,6 +251,9 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { response.setPrivilege(request.getPrivilege()); notificationHandlerInvoker.alter_sentry_role_grant_privilege(commitContext, request, response); + for (SentryPolicyStorePlugin plugin : sentryPlugins) { + plugin.onAlterSentryRoleGrantPrivilege(request); + } } catch (SentryNoSuchObjectException e) { String msg = "Role: " + request.getRoleName() + " doesn't exist."; LOGGER.error(msg, e); @@ -246,12 +289,15 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { response.setStatus(Status.OK()); notificationHandlerInvoker.alter_sentry_role_revoke_privilege(commitContext, request, response); + for (SentryPolicyStorePlugin plugin : sentryPlugins) { + plugin.onAlterSentryRoleRevokePrivilege(request); + } } catch (SentryNoSuchObjectException e) { String msg = "Privilege: [server=" + request.getPrivilege().getServerName() + - ",db=" + request.getPrivilege().getDbName() + - ",table=" + request.getPrivilege().getTableName() + - ",URI=" + request.getPrivilege().getURI() + - ",action=" + request.getPrivilege().getAction() + "] doesn't exist."; + ",db=" + request.getPrivilege().getDbName() + + ",table=" + request.getPrivilege().getTableName() + + ",URI=" + request.getPrivilege().getURI() + + ",action=" + request.getPrivilege().getAction() + "] doesn't exist."; LOGGER.error(msg, e); response.setStatus(Status.NoSuchObject(msg, e)); } catch (SentryInvalidInputException e) { @@ -287,6 +333,9 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { response.setStatus(Status.OK()); notificationHandlerInvoker.drop_sentry_role(commitContext, request, response); + for (SentryPolicyStorePlugin plugin : sentryPlugins) { + plugin.onDropSentryRole(request); + } } catch (SentryNoSuchObjectException e) { String msg = "Role :" + request + " does not exist."; LOGGER.error(msg, e); @@ -320,6 +369,9 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { response.setStatus(Status.OK()); notificationHandlerInvoker.alter_sentry_role_add_groups(commitContext, request, response); + for (SentryPolicyStorePlugin plugin : sentryPlugins) { + plugin.onAlterSentryRoleAddGroups(request); + } } catch (SentryNoSuchObjectException e) { String msg = "Role: " + request + " does not exist."; LOGGER.error(msg, e); @@ -353,6 +405,9 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { response.setStatus(Status.OK()); notificationHandlerInvoker.alter_sentry_role_delete_groups(commitContext, request, response); + for (SentryPolicyStorePlugin plugin : sentryPlugins) { + plugin.onAlterSentryRoleDeleteGroups(request); + } } catch (SentryNoSuchObjectException e) { String msg = "Role: " + request + " does not exist."; LOGGER.error(msg, e); @@ -548,7 +603,10 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { try { authorize(request.getRequestorUserName(), adminGroups); sentryStore.dropPrivilege(request.getAuthorizable()); - response.setStatus(Status.OK()); + for (SentryPolicyStorePlugin plugin : sentryPlugins) { + plugin.onDropSentryPrivilege(request); + } + response.setStatus(Status.OK()); } catch (SentryAccessDeniedException e) { LOGGER.error(e.getMessage(), e); response.setStatus(Status.AccessDenied(e.getMessage(), e)); @@ -572,6 +630,9 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { authorize(request.getRequestorUserName(), adminGroups); sentryStore.renamePrivilege(request.getOldAuthorizable(), request.getNewAuthorizable()); + for (SentryPolicyStorePlugin plugin : sentryPlugins) { + plugin.onRenameSentryPrivilege(request); + } response.setStatus(Status.OK()); } catch (SentryAccessDeniedException e) { LOGGER.error(e.getMessage(), e); @@ -633,6 +694,7 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { } response.setPrivilegesMapByAuth(authRoleMap); response.setStatus(Status.OK()); + // TODO : Sentry - HDFS : Have to handle this } catch (SentryAccessDeniedException e) { LOGGER.error(e.getMessage(), e); response.setStatus(Status.AccessDenied(e.getMessage(), e)); http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java index 1e20ff1..b19b79c 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java @@ -54,6 +54,7 @@ import org.apache.sentry.service.thrift.ServiceConstants.ConfUtilties; import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; import org.apache.thrift.TMultiplexedProcessor; import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TSaslServerTransport; http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java index bc86963..03ed378 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java @@ -67,6 +67,13 @@ public class ServiceConstants { public static final String RPC_MIN_THREADS = "sentry.service.server-min-threads"; public static final int RPC_MIN_THREADS_DEFAULT = 10; public static final String ALLOW_CONNECT = "sentry.service.allow.connect"; + + public static final String SENTRY_POLICY_STORE_PLUGINS = "sentry.policy.store.plugins"; + public static final String SENTRY_POLICY_STORE_PLUGINS_DEFAULT = ""; + + public static final String SENTRY_METASTORE_PLUGINS = "sentry.metastore.plugins"; + public static final String SENTRY_METASTORE_PLUGINS_DEFAULT = ""; + public static final String PROCESSOR_FACTORIES = "sentry.service.processor.factories"; public static final String PROCESSOR_FACTORIES_DEFAULT = "org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessorFactory"; http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyStoreProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyStoreProcessor.java index 46f8fb8..ea4e967 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyStoreProcessor.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyStoreProcessor.java @@ -21,6 +21,7 @@ import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.sentry.provider.db.service.thrift.PolicyStoreConstants.PolicyStoreServerConfig; +import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; import org.junit.Before; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerWithoutKerberos.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerWithoutKerberos.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerWithoutKerberos.java index e5238a6..777c6d8 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerWithoutKerberos.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerWithoutKerberos.java @@ -18,7 +18,6 @@ package org.apache.sentry.provider.db.service.thrift; import static junit.framework.Assert.assertEquals; -import static org.junit.Assert.assertEquals; import java.util.HashSet; import java.util.Set; @@ -77,7 +76,6 @@ public class TestSentryServerWithoutKerberos extends SentryServiceIntegrationBas client.grantTablePrivilege(requestorUserName, roleName1, "server", "db2", "table3", "ALL"); client.grantTablePrivilege(requestorUserName, roleName1, "server", "db2", "table4", "ALL"); - client.dropRoleIfExists(requestorUserName, roleName2); client.createRole(requestorUserName, roleName2); client.grantRoleToGroup(requestorUserName, group1, roleName2); @@ -89,6 +87,7 @@ public class TestSentryServerWithoutKerberos extends SentryServiceIntegrationBas client.grantTablePrivilege(requestorUserName, roleName2, "server", "db2", "table4", "ALL"); client.grantTablePrivilege(requestorUserName, roleName2, "server", "db3", "table5", "ALL"); + Set<TSentryPrivilege> listPrivilegesByRoleName = client.listPrivilegesByRoleName(requestorUserName, roleName2, Lists.newArrayList(new Server("server"), new Database("db1"))); assertEquals("Privilege not assigned to role2 !!", 2, listPrivilegesByRoleName.size()); @@ -162,4 +161,5 @@ public class TestSentryServerWithoutKerberos extends SentryServiceIntegrationBas assertEquals(0, client.listPrivilegesForProvider(requestorUserGroupNames, ActiveRoleSet.ALL).size()); } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-tests/sentry-tests-hive/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/pom.xml b/sentry-tests/sentry-tests-hive/pom.xml index 10415fc..a3c3295 100644 --- a/sentry-tests/sentry-tests-hive/pom.xml +++ b/sentry-tests/sentry-tests-hive/pom.xml @@ -222,6 +222,21 @@ limitations under the License. </dependency> <dependency> <groupId>org.apache.sentry</groupId> + <artifactId>sentry-hdfs-common</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-hdfs-service</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-hdfs-namenode-plugin</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> <artifactId>sentry-policy-db</artifactId> <scope>test</scope> </dependency> @@ -229,12 +244,14 @@ limitations under the License. <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minicluster</artifactId> <scope>test</scope> +<!-- <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> </exclusion> </exclusions> +--> </dependency> <dependency> <groupId>org.hamcrest</groupId> http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java new file mode 100644 index 0000000..a488c94 --- /dev/null +++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java @@ -0,0 +1,787 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.tests.e2e.hdfs; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.URL; +import java.security.PrivilegedExceptionAction; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.StringTokenizer; +import java.util.concurrent.TimeoutException; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclEntryType; +import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.MiniMRClientCluster; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.mapred.TextOutputFormat; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl; +import org.apache.sentry.binding.hive.conf.HiveAuthzConf; +import org.apache.sentry.hdfs.SentryAuthorizationProvider; +import org.apache.sentry.provider.db.SimpleDBProviderBackend; +import org.apache.sentry.provider.file.LocalGroupResourceAuthorizationProvider; +import org.apache.sentry.provider.file.PolicyFile; +import org.apache.sentry.service.thrift.SentryService; +import org.apache.sentry.service.thrift.SentryServiceFactory; +import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; +import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; +import org.apache.sentry.tests.e2e.hive.StaticUserGroup; +import org.apache.sentry.tests.e2e.hive.fs.MiniDFS; +import org.apache.sentry.tests.e2e.hive.hiveserver.HiveServerFactory; +import org.apache.sentry.tests.e2e.hive.hiveserver.InternalHiveServer; +import org.apache.sentry.tests.e2e.hive.hiveserver.InternalMetastoreServer; +import org.fest.reflect.core.Reflection; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import com.google.common.io.Resources; + +public class TestHDFSIntegration { + + public static class WordCountMapper extends MapReduceBase implements + Mapper<LongWritable, Text, String, Long> { + + public void map(LongWritable key, Text value, + OutputCollector<String, Long> output, Reporter reporter) + throws IOException { + StringTokenizer st = new StringTokenizer(value.toString()); + while (st.hasMoreTokens()) { + output.collect(st.nextToken(), 1L); + } + } + + } + + public static class SumReducer extends MapReduceBase implements + Reducer<Text, Long, Text, Long> { + + public void reduce(Text key, Iterator<Long> values, + OutputCollector<Text, Long> output, Reporter reporter) + throws IOException { + + long sum = 0; + while (values.hasNext()) { + sum += values.next(); + } + output.collect(key, sum); + } + + } + + private static final int NUM_RETRIES = 10; + private static final int RETRY_WAIT = 1000; + + private MiniDFSCluster miniDFS; + private MiniMRClientCluster miniMR; + private InternalHiveServer hiveServer2; + private InternalMetastoreServer metastore; + private SentryService sentryService; + private String fsURI; + private int hmsPort; + private int sentryPort = -1; + private File baseDir; + private File policyFileLocation; + private UserGroupInformation adminUgi; + private UserGroupInformation hiveUgi; + + protected static File assertCreateDir(File dir) { + if(!dir.isDirectory()) { + Assert.assertTrue("Failed creating " + dir, dir.mkdirs()); + } + return dir; + } + + private static int findPort() throws IOException { + ServerSocket socket = new ServerSocket(0); + int port = socket.getLocalPort(); + socket.close(); + return port; + } + + private void waitOnSentryService() throws Exception { + sentryService.start(); + final long start = System.currentTimeMillis(); + while (!sentryService.isRunning()) { + Thread.sleep(1000); + if (System.currentTimeMillis() - start > 60000L) { + throw new TimeoutException("Server did not start after 60 seconds"); + } + } + } + + @Before + public void setup() throws Exception { + Class.forName("org.apache.hive.jdbc.HiveDriver"); + baseDir = Files.createTempDir(); + policyFileLocation = new File(baseDir, HiveServerFactory.AUTHZ_PROVIDER_FILENAME); + PolicyFile policyFile = PolicyFile.setAdminOnServer1("hive") + .setUserGroupMapping(StaticUserGroup.getStaticMapping()); + policyFile.write(policyFileLocation); + + adminUgi = UserGroupInformation.createUserForTesting( + System.getProperty("user.name"), new String[] { "supergroup" }); + + hiveUgi = UserGroupInformation.createUserForTesting( + "hive", new String[] { "hive" }); + + // Start Sentry + startSentry(); + + // Start HDFS and MR + startDFSandYARN(); + + // Start HiveServer2 and Metastore + startHiveAndMetastore(); + + } + + private void startHiveAndMetastore() throws IOException, InterruptedException { + hiveUgi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + HiveConf hiveConf = new HiveConf(); + hiveConf.set("sentry.metastore.plugins", "org.apache.sentry.hdfs.MetastorePlugin"); + hiveConf.set("sentry.service.client.server.rpc-address", "localhost"); + hiveConf.set("sentry.hdfs.service.client.server.rpc-address", "localhost"); + hiveConf.set("sentry.hdfs.service.client.server.rpc-port", String.valueOf(sentryPort)); + hiveConf.set("sentry.service.client.server.rpc-port", String.valueOf(sentryPort)); +// hiveConf.set("sentry.service.server.compact.transport", "true"); +// hiveConf.set("sentry.service.client.compact.transport", "true"); + hiveConf.set("sentry.service.security.mode", "none"); + hiveConf.set("sentry.hdfs.service.security.mode", "none"); + hiveConf.set("sentry.hdfs.init.update.retry.delay.ms", "500"); + hiveConf.set("sentry.hive.provider.backend", "org.apache.sentry.provider.db.SimpleDBProviderBackend"); + hiveConf.set("sentry.provider", LocalGroupResourceAuthorizationProvider.class.getName()); + hiveConf.set("sentry.hive.provider", LocalGroupResourceAuthorizationProvider.class.getName()); + hiveConf.set("sentry.hive.provider.resource", policyFileLocation.getPath()); + hiveConf.set("sentry.hive.testing.mode", "true"); + hiveConf.set("sentry.hive.server", "server1"); + + hiveConf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING, ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING); + hiveConf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, policyFileLocation.getPath()); + hiveConf.set("fs.defaultFS", fsURI); + hiveConf.set("fs.default.name", fsURI); + hiveConf.set("hive.metastore.execute.setugi", "true"); + hiveConf.set("javax.jdo.option.ConnectionURL", "jdbc:derby:;databaseName=" + baseDir.getAbsolutePath() + "/metastore_db;create=true"); + hiveConf.set("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver"); + hiveConf.set("javax.jdo.option.ConnectionUserName", "hive"); + hiveConf.set("javax.jdo.option.ConnectionPassword", "hive"); + hiveConf.set("datanucleus.autoCreateSchema", "true"); + hiveConf.set("datanucleus.fixedDatastore", "false"); + hiveConf.set("datanucleus.autoStartMechanism", "SchemaTable"); + hmsPort = findPort(); + System.out.println("\n\n HMS port : " + hmsPort + "\n\n"); + hiveConf.set("hive.metastore.uris", "thrift://localhost:" + hmsPort); + hiveConf.set("hive.metastore.pre.event.listeners", "org.apache.sentry.binding.metastore.MetastoreAuthzBinding"); + hiveConf.set("hive.metastore.event.listeners", "org.apache.sentry.binding.metastore.SentryMetastorePostEventListener"); + hiveConf.set("hive.security.authorization.task.factory", "org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl"); + hiveConf.set("hive.server2.session.hook", "org.apache.sentry.binding.hive.HiveAuthzBindingSessionHook"); + + HiveAuthzConf authzConf = new HiveAuthzConf(Resources.getResource("sentry-site.xml")); + authzConf.addResource(hiveConf); + File confDir = assertCreateDir(new File(baseDir, "etc")); + File accessSite = new File(confDir, HiveAuthzConf.AUTHZ_SITE_FILE); + OutputStream out = new FileOutputStream(accessSite); + authzConf.set("fs.defaultFS", fsURI); + authzConf.writeXml(out); + out.close(); + + hiveConf.set("hive.sentry.conf.url", accessSite.getPath()); + System.out.println("Sentry client file : " + accessSite.getPath()); + + File hiveSite = new File(confDir, "hive-site.xml"); + hiveConf.set("hive.server2.enable.doAs", "false"); + hiveConf.set(HiveAuthzConf.HIVE_SENTRY_CONF_URL, accessSite.toURI().toURL() + .toExternalForm()); + out = new FileOutputStream(hiveSite); + hiveConf.writeXml(out); + out.close(); + + Reflection.staticField("hiveSiteURL") + .ofType(URL.class) + .in(HiveConf.class) + .set(hiveSite.toURI().toURL()); + + metastore = new InternalMetastoreServer(hiveConf); + new Thread() { + @Override + public void run() { + try { + metastore.start(); + while(true){} + } catch (Exception e) { + System.out.println("Could not start Hive Server"); + } + } + }.start(); + + hiveServer2 = new InternalHiveServer(hiveConf); + new Thread() { + @Override + public void run() { + try { + hiveServer2.start(); + while(true){} + } catch (Exception e) { + System.out.println("Could not start Hive Server"); + } + } + }.start(); + + Thread.sleep(10000); + return null; + } + }); + } + + private void startDFSandYARN() throws IOException, + InterruptedException { + adminUgi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, "target/test/data"); + Configuration conf = new HdfsConfiguration(); + conf.set(DFSConfigKeys.DFS_NAMENODE_AUTHORIZATION_PROVIDER_KEY, + SentryAuthorizationProvider.class.getName()); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); + File dfsDir = assertCreateDir(new File(baseDir, "dfs")); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dfsDir.getPath()); + conf.set("hadoop.security.group.mapping", + MiniDFS.PseudoGroupMappingService.class.getName()); + Configuration.addDefaultResource("test.xml"); + + conf.set("sentry.authorization-provider.hdfs-path-prefixes", "/user/hive/warehouse,/tmp/external"); + conf.set("sentry.authorization-provider.cache-refresh-retry-wait.ms", "5000"); + conf.set("sentry.authorization-provider.cache-stale-threshold.ms", "3000"); + + conf.set("sentry.hdfs.service.security.mode", "none"); + conf.set("sentry.hdfs.service.client.server.rpc-address", "localhost"); + conf.set("sentry.hdfs.service.client.server.rpc-port", String.valueOf(sentryPort)); + EditLogFileOutputStream.setShouldSkipFsyncForTesting(true); + miniDFS = new MiniDFSCluster.Builder(conf).build(); + Path tmpPath = new Path("/tmp"); + Path hivePath = new Path("/user/hive"); + Path warehousePath = new Path(hivePath, "warehouse"); + miniDFS.getFileSystem().mkdirs(warehousePath); + boolean directory = miniDFS.getFileSystem().isDirectory(warehousePath); + System.out.println("\n\n Is dir :" + directory + "\n\n"); + System.out.println("\n\n DefaultFS :" + miniDFS.getFileSystem().getUri() + "\n\n"); + fsURI = miniDFS.getFileSystem().getUri().toString(); + conf.set("fs.defaultFS", fsURI); + + // Create Yarn cluster + // miniMR = MiniMRClientClusterFactory.create(this.getClass(), 1, conf); + + miniDFS.getFileSystem().mkdirs(tmpPath); + miniDFS.getFileSystem().setPermission(tmpPath, FsPermission.valueOf("drwxrwxrwx")); + miniDFS.getFileSystem().setOwner(hivePath, "hive", "hive"); + miniDFS.getFileSystem().setOwner(warehousePath, "hive", "hive"); + System.out.println("\n\n Owner :" + + miniDFS.getFileSystem().getFileStatus(warehousePath).getOwner() + + ", " + + miniDFS.getFileSystem().getFileStatus(warehousePath).getGroup() + + "\n\n"); + System.out.println("\n\n Owner tmp :" + + miniDFS.getFileSystem().getFileStatus(tmpPath).getOwner() + ", " + + miniDFS.getFileSystem().getFileStatus(tmpPath).getGroup() + ", " + + miniDFS.getFileSystem().getFileStatus(tmpPath).getPermission() + ", " + + "\n\n"); + + int dfsSafeCheckRetry = 30; + boolean hasStarted = false; + for (int i = dfsSafeCheckRetry; i > 0; i--) { + if (!miniDFS.getFileSystem().isInSafeMode()) { + hasStarted = true; + System.out.println("HDFS safemode check num times : " + (31 - i)); + break; + } + } + if (!hasStarted) { + throw new RuntimeException("HDFS hasnt exited safe mode yet.."); + } + + return null; + } + }); + } + + private void startSentry() throws IOException, + InterruptedException { + hiveUgi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + Configuration sentryConf = new Configuration(false); + Map<String, String> properties = Maps.newHashMap(); + properties.put(HiveServerFactory.AUTHZ_PROVIDER_BACKEND, + SimpleDBProviderBackend.class.getName()); + properties.put(ConfVars.HIVE_AUTHORIZATION_TASK_FACTORY.varname, + SentryHiveAuthorizationTaskFactoryImpl.class.getName()); + properties + .put(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS.varname, "2"); + properties.put("hive.metastore.uris", "thrift://localhost:" + hmsPort); + properties.put(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE); +// properties.put("sentry.service.server.compact.transport", "true"); + properties.put("sentry.hive.testing.mode", "true"); + properties.put("sentry.service.reporting", "JMX"); + properties.put(ServerConfig.ADMIN_GROUPS, "hive,admin"); + properties.put(ServerConfig.RPC_ADDRESS, "localhost"); + properties.put(ServerConfig.RPC_PORT, String.valueOf(sentryPort < 0 ? 0 : sentryPort)); + properties.put(ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, "false"); + + properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING, ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING); + properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, policyFileLocation.getPath()); + properties.put(ServerConfig.SENTRY_STORE_JDBC_URL, + "jdbc:derby:;databaseName=" + baseDir.getPath() + + "/sentrystore_db;create=true"); + properties.put("sentry.service.processor.factories", + "org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessorFactory,org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory"); + properties.put("sentry.policy.store.plugins", "org.apache.sentry.hdfs.SentryPlugin"); + properties.put(ServerConfig.RPC_MIN_THREADS, "3"); + for (Map.Entry<String, String> entry : properties.entrySet()) { + sentryConf.set(entry.getKey(), entry.getValue()); + } + sentryService = new SentryServiceFactory().create(sentryConf); + properties.put(ClientConfig.SERVER_RPC_ADDRESS, sentryService.getAddress() + .getHostName()); + sentryConf.set(ClientConfig.SERVER_RPC_ADDRESS, sentryService.getAddress() + .getHostName()); + properties.put(ClientConfig.SERVER_RPC_PORT, + String.valueOf(sentryService.getAddress().getPort())); + sentryConf.set(ClientConfig.SERVER_RPC_PORT, + String.valueOf(sentryService.getAddress().getPort())); + waitOnSentryService(); + sentryPort = sentryService.getAddress().getPort(); + System.out.println("\n\n Sentry port : " + sentryPort + "\n\n"); + return null; + } + }); + } + + @After + public void cleanUp() throws Exception { + try { + if (miniDFS != null) { + miniDFS.shutdown(); + } + } finally { + try { + if (hiveServer2 != null) { + hiveServer2.shutdown(); + } + } finally { + if (metastore != null) { + metastore.shutdown(); + } + } + } + } + + @Test + public void testEnd2End() throws Throwable { + + Connection conn = hiveServer2.createConnection("hive", "hive"); + Statement stmt = conn.createStatement(); + stmt.execute("create role admin_role"); + stmt.execute("grant role admin_role to group hive"); + stmt.execute("grant all on server server1 to role admin_role"); + stmt.execute("create table p1 (s string) partitioned by (month int, day int)"); + stmt.execute("alter table p1 add partition (month=1, day=1)"); + stmt.execute("alter table p1 add partition (month=1, day=2)"); + stmt.execute("alter table p1 add partition (month=2, day=1)"); + stmt.execute("alter table p1 add partition (month=2, day=2)"); + + stmt.execute("create role p1_admin"); + stmt.execute("grant role p1_admin to group hbase"); + + verifyOnAllSubDirs("/user/hive/warehouse/p1", null, "hbase", false); + + loadData(stmt); + + verifyHDFSandMR(stmt); + + stmt.execute("revoke select on table p1 from role p1_admin"); + verifyOnAllSubDirs("/user/hive/warehouse/p1", null, "hbase", false); + + stmt.execute("grant all on table p1 to role p1_admin"); + verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.ALL, "hbase", true); + + stmt.execute("revoke select on table p1 from role p1_admin"); + verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.WRITE_EXECUTE, "hbase", true); + + // Verify table rename works + stmt.execute("alter table p1 rename to p3"); + verifyOnAllSubDirs("/user/hive/warehouse/p3", FsAction.WRITE_EXECUTE, "hbase", true); + + stmt.execute("alter table p3 partition (month=1, day=1) rename to partition (month=1, day=3)"); + verifyOnAllSubDirs("/user/hive/warehouse/p3", FsAction.WRITE_EXECUTE, "hbase", true); + verifyOnAllSubDirs("/user/hive/warehouse/p3/month=1/day=3", FsAction.WRITE_EXECUTE, "hbase", true); + + sentryService.stop(); + // Verify that Sentry permission are still enforced for the "stale" period + verifyOnAllSubDirs("/user/hive/warehouse/p3", FsAction.WRITE_EXECUTE, "hbase", true); + + // Verify that Sentry permission are NOT enforced AFTER "stale" period + verifyOnAllSubDirs("/user/hive/warehouse/p3", null, "hbase", false); + + startSentry(); + // Verify that After Sentry restart permissions are re-enforced + verifyOnAllSubDirs("/user/hive/warehouse/p3", FsAction.WRITE_EXECUTE, "hbase", true); + + // Create new table and verify everything is fine after restart... + stmt.execute("create table p2 (s string) partitioned by (month int, day int)"); + stmt.execute("alter table p2 add partition (month=1, day=1)"); + stmt.execute("alter table p2 add partition (month=1, day=2)"); + stmt.execute("alter table p2 add partition (month=2, day=1)"); + stmt.execute("alter table p2 add partition (month=2, day=2)"); + + verifyOnAllSubDirs("/user/hive/warehouse/p2", null, "hbase", false); + + stmt.execute("grant select on table p2 to role p1_admin"); + verifyOnAllSubDirs("/user/hive/warehouse/p2", FsAction.READ_EXECUTE, "hbase", true); + + stmt.execute("grant select on table p2 to role p1_admin"); + verifyOnAllSubDirs("/user/hive/warehouse/p2", FsAction.READ_EXECUTE, "hbase", true); + + // Create external table + writeToPath("/tmp/external/ext1", 5, "foo", "bar"); + + stmt.execute("create table ext1 (s string) location \'/tmp/external/ext1\'"); + verifyQuery(stmt, "ext1", 5); + + // Ensure existing group permissions are never returned.. + verifyOnAllSubDirs("/tmp/external/ext1", null, "bar", false); + verifyOnAllSubDirs("/tmp/external/ext1", null, "hbase", false); + + stmt.execute("grant all on table ext1 to role p1_admin"); + verifyOnAllSubDirs("/tmp/external/ext1", FsAction.ALL, "hbase", true); + + stmt.execute("revoke select on table ext1 from role p1_admin"); + verifyOnAllSubDirs("/tmp/external/ext1", FsAction.WRITE_EXECUTE, "hbase", true); + + // Verify database operations works correctly + stmt.execute("create database db1"); + verifyOnAllSubDirs("/user/hive/warehouse/db1.db", null, "hbase", false); + + stmt.execute("create table db1.tbl1 (s string)"); + verifyOnAllSubDirs("/user/hive/warehouse/db1.db/tbl1", null, "hbase", false); + stmt.execute("create table db1.tbl2 (s string)"); + verifyOnAllSubDirs("/user/hive/warehouse/db1.db/tbl2", null, "hbase", false); + + // Verify db privileges are propagated to tables + stmt.execute("grant select on database db1 to role p1_admin"); + verifyOnAllSubDirs("/user/hive/warehouse/db1.db/tbl1", FsAction.READ_EXECUTE, "hbase", true); + verifyOnAllSubDirs("/user/hive/warehouse/db1.db/tbl2", FsAction.READ_EXECUTE, "hbase", true); + + stmt.execute("use db1"); + stmt.execute("grant all on table tbl1 to role p1_admin"); + + verifyOnAllSubDirs("/user/hive/warehouse/db1.db/tbl1", FsAction.ALL, "hbase", true); + verifyOnAllSubDirs("/user/hive/warehouse/db1.db/tbl2", FsAction.READ_EXECUTE, "hbase", true); + + // Verify recursive revoke + stmt.execute("revoke select on database db1 from role p1_admin"); + + verifyOnAllSubDirs("/user/hive/warehouse/db1.db/tbl1", FsAction.WRITE_EXECUTE, "hbase", true); + verifyOnAllSubDirs("/user/hive/warehouse/db1.db/tbl2", null, "hbase", false); + + // Verify cleanup.. + stmt.execute("drop table tbl1"); + Assert.assertFalse(miniDFS.getFileSystem().exists(new Path("/user/hive/warehouse/db1.db/tbl1"))); + + stmt.execute("drop table tbl2"); + Assert.assertFalse(miniDFS.getFileSystem().exists(new Path("/user/hive/warehouse/db1.db/tbl2"))); + + stmt.execute("use default"); + stmt.execute("drop database db1"); + Assert.assertFalse(miniDFS.getFileSystem().exists(new Path("/user/hive/warehouse/db1.db"))); + + // START : Verify external table set location.. + writeToPath("/tmp/external/tables/ext2_before/i=1", 5, "foo", "bar"); + writeToPath("/tmp/external/tables/ext2_before/i=2", 5, "foo", "bar"); + + stmt.execute("create external table ext2 (s string) partitioned by (i int) location \'/tmp/external/tables/ext2_before\'"); + stmt.execute("alter table ext2 add partition (i=1)"); + stmt.execute("alter table ext2 add partition (i=2)"); + verifyQuery(stmt, "ext2", 10); + verifyOnAllSubDirs("/tmp/external/tables/ext2_before", null, "hbase", false); + stmt.execute("grant all on table ext2 to role p1_admin"); + verifyOnPath("/tmp/external/tables/ext2_before", FsAction.ALL, "hbase", true); + verifyOnPath("/tmp/external/tables/ext2_before/i=1", FsAction.ALL, "hbase", true); + verifyOnPath("/tmp/external/tables/ext2_before/i=2", FsAction.ALL, "hbase", true); + verifyOnPath("/tmp/external/tables/ext2_before/i=1/stuff.txt", FsAction.ALL, "hbase", true); + verifyOnPath("/tmp/external/tables/ext2_before/i=2/stuff.txt", FsAction.ALL, "hbase", true); + + writeToPath("/tmp/external/tables/ext2_after/i=1", 6, "foo", "bar"); + writeToPath("/tmp/external/tables/ext2_after/i=2", 6, "foo", "bar"); + + stmt.execute("alter table ext2 set location \'hdfs:///tmp/external/tables/ext2_after\'"); + // Even though table location is altered, partition location is still old (still 10 rows) + verifyQuery(stmt, "ext2", 10); + // You have to explicitly alter partition location.. + verifyOnPath("/tmp/external/tables/ext2_before", null, "hbase", false); + verifyOnPath("/tmp/external/tables/ext2_before/i=1", FsAction.ALL, "hbase", true); + verifyOnPath("/tmp/external/tables/ext2_before/i=2", FsAction.ALL, "hbase", true); + verifyOnPath("/tmp/external/tables/ext2_before/i=1/stuff.txt", FsAction.ALL, "hbase", true); + verifyOnPath("/tmp/external/tables/ext2_before/i=2/stuff.txt", FsAction.ALL, "hbase", true); + + stmt.execute("alter table ext2 partition (i=1) set location \'hdfs:///tmp/external/tables/ext2_after/i=1\'"); + stmt.execute("alter table ext2 partition (i=2) set location \'hdfs:///tmp/external/tables/ext2_after/i=2\'"); + // Now that partition location is altered, it picks up new data (12 rows instead of 10) + verifyQuery(stmt, "ext2", 12); + + verifyOnPath("/tmp/external/tables/ext2_before", null, "hbase", false); + verifyOnPath("/tmp/external/tables/ext2_before/i=1", null, "hbase", false); + verifyOnPath("/tmp/external/tables/ext2_before/i=2", null, "hbase", false); + verifyOnPath("/tmp/external/tables/ext2_before/i=1/stuff.txt", null, "hbase", false); + verifyOnPath("/tmp/external/tables/ext2_before/i=2/stuff.txt", null, "hbase", false); + verifyOnPath("/tmp/external/tables/ext2_after", FsAction.ALL, "hbase", true); + verifyOnPath("/tmp/external/tables/ext2_after/i=1", FsAction.ALL, "hbase", true); + verifyOnPath("/tmp/external/tables/ext2_after/i=2", FsAction.ALL, "hbase", true); + verifyOnPath("/tmp/external/tables/ext2_after/i=1/stuff.txt", FsAction.ALL, "hbase", true); + verifyOnPath("/tmp/external/tables/ext2_after/i=2/stuff.txt", FsAction.ALL, "hbase", true); + // END : Verify external table set location.. + + stmt.close(); + conn.close(); + } + + private void verifyQuery(Statement stmt, String table, int n) throws Throwable { + verifyQuery(stmt, table, n, NUM_RETRIES); + } + + private void verifyQuery(Statement stmt, String table, int n, int retry) throws Throwable { + ResultSet rs = null; + try { + rs = stmt.executeQuery("select * from " + table); + int numRows = 0; + while (rs.next()) { numRows++; } + Assert.assertEquals(n, numRows); + } catch (Throwable th) { + if (retry > 0) { + Thread.sleep(RETRY_WAIT); + verifyQuery(stmt, table, n, retry - 1); + } else { + throw th; + } + } + } + + private void loadData(Statement stmt) throws IOException, SQLException { + FSDataOutputStream f1 = miniDFS.getFileSystem().create(new Path("/tmp/f1.txt")); + f1.writeChars("m1d1_t1\n"); + f1.writeChars("m1d1_t2\n"); + f1.writeChars("m1d1_t3\n"); + f1.flush(); + f1.close(); + stmt.execute("load data inpath \'/tmp/f1.txt\' overwrite into table p1 partition (month=1, day=1)"); + FSDataOutputStream f2 = miniDFS.getFileSystem().create(new Path("/tmp/f2.txt")); + f2.writeChars("m2d2_t4\n"); + f2.writeChars("m2d2_t5\n"); + f2.writeChars("m2d2_t6\n"); + f2.flush(); + f2.close(); + stmt.execute("load data inpath \'/tmp/f2.txt\' overwrite into table p1 partition (month=2, day=2)"); + ResultSet rs = stmt.executeQuery("select * from p1"); + List<String> vals = new ArrayList<String>(); + while (rs.next()) { + vals.add(rs.getString(1)); + } + Assert.assertEquals(6, vals.size()); + rs.close(); + } + + private void writeToPath(String path, int numRows, String user, String group) throws IOException { + Path p = new Path(path); + miniDFS.getFileSystem().mkdirs(p); + miniDFS.getFileSystem().setOwner(p, user, group); +// miniDFS.getFileSystem().setPermission(p, FsPermission.valueOf("-rwxrwx---")); + FSDataOutputStream f1 = miniDFS.getFileSystem().create(new Path(path + "/stuff.txt")); + for (int i = 0; i < numRows; i++) { + f1.writeChars("random" + i + "\n"); + } + f1.flush(); + f1.close(); + miniDFS.getFileSystem().setOwner(new Path(path + "/stuff.txt"), "asuresh", "supergroup"); + miniDFS.getFileSystem().setPermission(new Path(path + "/stuff.txt"), FsPermission.valueOf("-rwxrwx---")); + } + + private void verifyHDFSandMR(Statement stmt) throws Throwable { + // hbase user should not be allowed to read... + UserGroupInformation hbaseUgi = UserGroupInformation.createUserForTesting("hbase", new String[] {"hbase"}); + hbaseUgi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + try { + miniDFS.getFileSystem().open(new Path("/user/hive/warehouse/p1/month=1/day=1/f1.txt")); + Assert.fail("Should not be allowed !!"); + } catch (Exception e) { + Assert.assertEquals("Wrong Error : " + e.getMessage(), true, e.getMessage().contains("Permission denied: user=hbase")); + } + return null; + } + }); + + // WordCount should fail.. + // runWordCount(new JobConf(miniMR.getConfig()), "/user/hive/warehouse/p1/month=1/day=1", "/tmp/wc_out"); + + stmt.execute("grant select on table p1 to role p1_admin"); + + verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.READ_EXECUTE, "hbase", true); + // hbase user should now be allowed to read... + hbaseUgi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + Path p = new Path("/user/hive/warehouse/p1/month=2/day=2/f2.txt"); + BufferedReader in = new BufferedReader(new InputStreamReader(miniDFS.getFileSystem().open(p))); + String line = null; + List<String> lines = new ArrayList<String>(); + do { + line = in.readLine(); + if (line != null) lines.add(line); + } while (line != null); + Assert.assertEquals(3, lines.size()); + in.close(); + return null; + } + }); + + } + + private void verifyOnAllSubDirs(String path, FsAction fsAction, String group, boolean groupShouldExist) throws Throwable { + verifyOnAllSubDirs(path, fsAction, group, groupShouldExist, true); + } + + private void verifyOnPath(String path, FsAction fsAction, String group, boolean groupShouldExist) throws Throwable { + verifyOnAllSubDirs(path, fsAction, group, groupShouldExist, false); + } + + private void verifyOnAllSubDirs(String path, FsAction fsAction, String group, boolean groupShouldExist, boolean recurse) throws Throwable { + verifyOnAllSubDirs(new Path(path), fsAction, group, groupShouldExist, recurse, NUM_RETRIES); + } + + private void verifyOnAllSubDirs(Path p, FsAction fsAction, String group, boolean groupShouldExist, boolean recurse, int retry) throws Throwable { + FileStatus fStatus = null; + try { + fStatus = miniDFS.getFileSystem().getFileStatus(p); + if (groupShouldExist) { + Assert.assertEquals(fsAction, getAcls(p).get(group)); + } else { + Assert.assertFalse(getAcls(p).containsKey(group)); + } + } catch (Throwable th) { + if (retry > 0) { + Thread.sleep(RETRY_WAIT); + verifyOnAllSubDirs(p, fsAction, group, groupShouldExist, recurse, retry - 1); + } else { + throw th; + } + } + if (recurse) { + if (fStatus.isDirectory()) { + FileStatus[] children = miniDFS.getFileSystem().listStatus(p); + for (FileStatus fs : children) { + verifyOnAllSubDirs(fs.getPath(), fsAction, group, groupShouldExist, recurse, NUM_RETRIES); + } + } + } + } + + private Map<String, FsAction> getAcls(Path path) throws Exception { + AclStatus aclStatus = miniDFS.getFileSystem().getAclStatus(path); + Map<String, FsAction> acls = new HashMap<String, FsAction>(); + for (AclEntry ent : aclStatus.getEntries()) { + if (ent.getType().equals(AclEntryType.GROUP)) { + acls.put(ent.getName(), ent.getPermission()); + } + } + return acls; + } + + private void runWordCount(JobConf job, String inPath, String outPath) throws Exception { + Path in = new Path(inPath); + Path out = new Path(outPath); + miniDFS.getFileSystem().delete(out, true); + job.setJobName("TestWC"); + JobClient jobClient = new JobClient(job); + RunningJob submittedJob = null; + FileInputFormat.setInputPaths(job, in); + FileOutputFormat.setOutputPath(job, out); + job.set("mapreduce.output.textoutputformat.separator", " "); + job.setInputFormat(TextInputFormat.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(LongWritable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + job.setMapperClass(WordCountMapper.class); + job.setReducerClass(SumReducer.class); + job.setOutputFormat(TextOutputFormat.class); + job.setNumReduceTasks(1); + job.setInt("mapreduce.map.maxattempts", 1); + job.setInt("mapreduce.reduce.maxattempts", 1); + + submittedJob = jobClient.submitJob(job); + if (!jobClient.monitorAndPrintJob(job, submittedJob)) { + throw new IOException("job Failed !!"); + } + + } +}
