Repository: incubator-sentry

Updated Branches:
  refs/heads/master 8e16e87ce -> fd31d2cd4


SENTRY-696: Improve Metastoreplugin Cache Initialization time (Arun Suresh via Prasad Mujumdar)

Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/fd31d2cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/fd31d2cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/fd31d2cd

Branch: refs/heads/master
Commit: fd31d2cd4b7d0d63dc6f8a61d2cf6b3cf4d4d72e
Parents: 8e16e87
Author: Prasad Mujumdar <[email protected]>
Authored: Sat Apr 18 10:05:39 2015 -0700
Committer: Prasad Mujumdar <[email protected]>
Committed: Sat Apr 18 10:05:39 2015 -0700

----------------------------------------------------------------------
 .../apache/sentry/hdfs/ServiceConstants.java    |   8 +
 sentry-hdfs/sentry-hdfs-namenode-plugin/pom.xml |   4 +
 sentry-hdfs/sentry-hdfs-service/pom.xml         |   5 +
 .../sentry/hdfs/MetastoreCacheInitializer.java  | 252 +++++++++++++++++++
 .../org/apache/sentry/hdfs/MetastorePlugin.java | 150 +++++++----
 .../sentry/hdfs/MetastorePluginWithHA.java      |   2 +-
 .../hdfs/TestMetastoreCacheInitializer.java     | 133 ++++++++++
 7 files changed, 498 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/fd31d2cd/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
index 489d165..19b0b49 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
@@ -51,7 +51,15 @@ public class ServiceConstants {
     public static final String SENTRY_HDFS_HA_ZOOKEEPER_NAMESPACE_DEFAULT = "/sentry_hdfs";
     public static final String SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE = "sentry.metastore.ha.zookeeper.namespace";
     public static final String SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE_DEFAULT = "/sentry_metastore";
+    public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS = "sentry.hdfs.sync.metastore.cache.init.threads";
+    public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT = 10;
+    public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE = "sentry.hdfs.sync.metastore.cache.async-init.enable";
+    public static final boolean SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE_DEFAULT = false;
+    public static String SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC = "sentry.hdfs.sync.metastore.cache.max-partitions-per-rpc";
+    public static int SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT = 100;
+    public static String SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC = "sentry.hdfs.sync.metastore.cache.max-tables-per-rpc";
+    public static int SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT = 100;
   }
 
   public static class ClientConfig {


http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/fd31d2cd/sentry-hdfs/sentry-hdfs-namenode-plugin/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/pom.xml b/sentry-hdfs/sentry-hdfs-namenode-plugin/pom.xml
index f35baf4..04b79d8 100644
--- a/sentry-hdfs/sentry-hdfs-namenode-plugin/pom.xml
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/pom.xml
@@ -54,6 +54,10 @@ limitations under the License.
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-minicluster</artifactId>
       <scope>test</scope>


http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/fd31d2cd/sentry-hdfs/sentry-hdfs-service/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/pom.xml b/sentry-hdfs/sentry-hdfs-service/pom.xml
index 4d65edf..5d5d525 100644
--- a/sentry-hdfs/sentry-hdfs-service/pom.xml
+++ b/sentry-hdfs/sentry-hdfs-service/pom.xml
@@ -33,6 +33,11 @@ limitations under the License.
       <artifactId>sentry-binding-hive</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>provided</scope>


http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/fd31d2cd/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastoreCacheInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastoreCacheInitializer.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastoreCacheInitializer.java
new file mode 100644
index 0000000..093d21a
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastoreCacheInitializer.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.sentry.hdfs.service.thrift.TPathChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+class MetastoreCacheInitializer implements Closeable {
+
+  private static final Logger LOGGER =
+          LoggerFactory.getLogger(MetastoreCacheInitializer.class);
+
+  static class CallResult {
+    final Exception failure;
+
+    CallResult(Exception ex) {
+      failure = ex;
+    }
+  }
+
+  abstract class BaseTask implements Callable<CallResult> {
+
+    BaseTask() {
+      taskCounter.incrementAndGet();
+    }
+
+    @Override
+    public CallResult call() throws Exception {
+      try {
+        doTask();
+      } catch (Exception ex) {
+        // Ignore if the object requested does not exist
+        return new CallResult(
+                (ex instanceof NoSuchObjectException) ? null : ex);
+      } finally {
+        taskCounter.decrementAndGet();
+      }
+      return new CallResult(null);
+    }
+
+    abstract void doTask() throws Exception;
+  }
+
+  class PartitionTask extends BaseTask {
+    private final String dbName;
+    private final String tblName;
+    private final List<String> partNames;
+    private final TPathChanges tblPathChange;
+
+    PartitionTask(String dbName, String tblName, List<String> partNames,
+                  TPathChanges tblPathChange) {
+      super();
+      this.dbName = dbName;
+      this.tblName = tblName;
+      this.partNames = partNames;
+      this.tblPathChange = tblPathChange;
+    }
+
+    @Override
+    public void doTask() throws Exception {
+      List<Partition> tblParts =
+              hmsHandler.get_partitions_by_names(dbName, tblName, partNames);
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("#### Fetching partitions " +
+                "[" + dbName + "." + tblName + "]" + "[" + partNames + "]");
+      }
+      for (Partition part : tblParts) {
+        List<String> partPath = PathsUpdate.parsePath(part.getSd()
+                .getLocation());
+        if (partPath != null) {
+          synchronized (tblPathChange) {
+            tblPathChange.addToAddPaths(partPath);
+          }
+        }
+      }
+    }
+  }
+
+  class TableTask extends BaseTask {
+    private final Database db;
+    private final List<String> tableNames;
+    private final PathsUpdate update;
+
+    TableTask(Database db, List<String> tableNames, PathsUpdate update) {
+      super();
+      this.db = db;
+      this.tableNames = tableNames;
+      this.update = update;
+    }
+
+    @Override
+    public void doTask() throws Exception {
+      List<Table> tables =
+              hmsHandler.get_table_objects_by_name(db.getName(), tableNames);
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("#### Fetching tables [" + db.getName() + "][" +
+                tableNames + "]");
+      }
+      for (Table tbl : tables) {
+        TPathChanges tblPathChange;
+        synchronized (update) {
+          tblPathChange = update.newPathChange(tbl.getDbName() + "." + tbl
+                  .getTableName());
+        }
+        if (tbl.getSd().getLocation() != null) {
+          List<String> tblPath =
+                  PathsUpdate.parsePath(tbl.getSd().getLocation());
+          tblPathChange.addToAddPaths(tblPath);
+          List<String> tblPartNames =
+                  hmsHandler.get_partition_names(db.getName(), tbl
+                          .getTableName(), (short) -1);
+          for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) {
+            List<String> partsToFetch =
+                    tblPartNames.subList(i, Math.min(
+                            i + maxPartitionsPerCall, tblPartNames.size()));
+            Callable<CallResult> partTask =
+                    new PartitionTask(db.getName(), tbl.getTableName(),
+                            partsToFetch, tblPathChange);
+            synchronized (results) {
+              results.add(threadPool.submit(partTask));
+            }
+          }
+        }
+      }
+    }
+  }
+
+  class DbTask extends BaseTask {
+
+    private final PathsUpdate update;
+    private final String dbName;
+
+    DbTask(PathsUpdate update, String dbName) {
+      super();
+      this.update = update;
+      this.dbName = dbName;
+    }
+
+    @Override
+    public void doTask() throws Exception {
+      Database db = hmsHandler.get_database(dbName);
+      List<String> dbPath = PathsUpdate.parsePath(db.getLocationUri());
+      if (dbPath != null) {
+        synchronized (update) {
+          update.newPathChange(db.getName()).addToAddPaths(dbPath);
+        }
+      }
+      List<String> allTblStr = hmsHandler.get_all_tables(db.getName());
+      for (int i = 0; i < allTblStr.size(); i += maxTablesPerCall) {
+        List<String> tablesToFetch =
+                allTblStr.subList(i, Math.min(
+                        i + maxTablesPerCall, allTblStr.size()));
+        Callable<CallResult> tableTask =
+                new TableTask(db, tablesToFetch, update);
+        synchronized (results) {
+          results.add(threadPool.submit(tableTask));
+        }
+      }
+    }
+  }
+
+  private final ExecutorService threadPool;
+  private final IHMSHandler hmsHandler;
+  private final int maxPartitionsPerCall;
+  private final int maxTablesPerCall;
+  private final List<Future<CallResult>> results =
+          new ArrayList<Future<CallResult>>();
+  private final AtomicInteger taskCounter = new AtomicInteger(0);
+
+  MetastoreCacheInitializer(IHMSHandler hmsHandler, Configuration conf) {
+    this.hmsHandler = hmsHandler;
+    this.maxPartitionsPerCall = conf.getInt(
+            ServiceConstants.ServerConfig
+                    .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC,
+            ServiceConstants.ServerConfig
+                    .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT);
+    this.maxTablesPerCall = conf.getInt(
+            ServiceConstants.ServerConfig
+                    .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC,
+            ServiceConstants.ServerConfig
+                    .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT);
+    threadPool = Executors.newFixedThreadPool(conf.getInt(
+            ServiceConstants.ServerConfig
+                    .SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS,
+            ServiceConstants.ServerConfig
+                    .SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT));
+  }
+
+  UpdateableAuthzPaths createInitialUpdate() throws Exception {
+    UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(new
+            String[]{"/"});
+    PathsUpdate tempUpdate = new PathsUpdate(-1, false);
+    List<String> allDbStr = hmsHandler.get_all_databases();
+    for (String dbName : allDbStr) {
+      Callable<CallResult> dbTask = new DbTask(tempUpdate, dbName);
+      synchronized (results) {
+        results.add(threadPool.submit(dbTask));
+      }
+    }
+
+    while (taskCounter.get() > 0) {
+      Thread.sleep(1000);
+      // Wait until no more tasks remain
+    }
+    for (Future<CallResult> result : results) {
+      CallResult callResult = result.get();
+      if (callResult.failure != null) {
+        throw new RuntimeException(callResult.failure);
+      }
+    }
+    authzPaths.updatePartial(Lists.newArrayList(tempUpdate),
+            new ReentrantReadWriteLock());
+    return authzPaths;
+  }
+
+
+  @Override
+  public void close() throws IOException {
+    if (threadPool != null) {
+      threadPool.shutdownNow();
+    }
+  }
+}


http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/fd31d2cd/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
index 7106e74..d7b5d5a 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
@@ -17,8 +17,11 @@
  */
 package org.apache.sentry.hdfs;
 
-import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -31,14 +34,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
-import org.apache.hadoop.hive.metastore.IHMSHandler;
 import org.apache.hadoop.hive.metastore.MetaStorePreEventListener;
-import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
-import org.apache.sentry.hdfs.service.thrift.TPathChanges;
 import org.apache.sentry.provider.db.SentryMetastoreListenerPlugin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +59,11 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
         // No need to sync.. as metastore is in the process of pushing an update..
         return;
       }
+      if (MetastorePlugin.this.authzPaths == null) {
+        LOGGER.info("#### Metastore Plugin cache has not finished "
+                + "initialization.");
+        return;
+      }
       try {
         long lastSeenBySentry =
                 MetastorePlugin.this.getClient().getLastSeenHMSPathSeqNum();
@@ -85,7 +88,7 @@
 
   private final Configuration conf;
   private SentryHDFSServiceClient sentryClient;
-  private UpdateableAuthzPaths authzPaths;
+  private volatile UpdateableAuthzPaths authzPaths;
   private Lock notificiationLock;
 
   // Initialized to some value > 1.
@@ -94,6 +97,11 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
   // Has to match the value of seqNum
   protected static volatile long lastSentSeqNum = seqNum.get();
   private volatile boolean syncSent = false;
+  private volatile boolean initComplete = false;
+  private volatile boolean queueFlushComplete = false;
+  private volatile Throwable initError = null;
+  private final Queue<PathsUpdate> updateQueue = new LinkedList<PathsUpdate>();
+  private final ExecutorService threadPool;
 
   private final Configuration sentryConf;
 
@@ -111,11 +119,53 @@
     this.conf.unset(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname);
     this.conf.unset(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname);
     this.conf.unset(HiveConf.ConfVars.METASTOREURIS.varname);
-    try {
-      this.authzPaths = createInitialUpdate(new ProxyHMSHandler("sentry.hdfs", (HiveConf)this.conf));
-    } catch (Exception e1) {
-      LOGGER.error("Could not create Initial AuthzPaths or HMSHandler !!", e1);
-      throw new RuntimeException(e1);
+    Thread initUpdater = new Thread() {
+      @Override
+      public void run() {
+        MetastoreCacheInitializer cacheInitializer = null;
+        try {
+          cacheInitializer =
+                  new MetastoreCacheInitializer(new ProxyHMSHandler("sentry.hdfs",
+                          (HiveConf) MetastorePlugin.this.conf),
+                          MetastorePlugin.this.conf);
+          MetastorePlugin.this.authzPaths =
+                  cacheInitializer.createInitialUpdate();
+          LOGGER.info("#### Metastore Plugin initialization complete !!");
+          synchronized (updateQueue) {
+            while (!updateQueue.isEmpty()) {
+              PathsUpdate update = updateQueue.poll();
+              if (update != null) {
+                processUpdate(update);
+              }
+            }
+            queueFlushComplete = true;
+          }
+          LOGGER.info("#### Finished flushing queued updates to Sentry !!");
+        } catch (Exception e) {
+          LOGGER.error("#### Could not create Initial AuthzPaths or HMSHandler !!", e);
+          initError = e;
+        } finally {
+          if (cacheInitializer != null) {
+            try {
+              cacheInitializer.close();
+            } catch (Exception e) {
+              LOGGER.info("#### Exception while closing cacheInitializer !!", e);
+            }
+          }
+          initComplete = true;
+        }
+      }
+    };
+    if (this.conf.getBoolean(
+        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE,
+        ServerConfig
+            .SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE_DEFAULT)) {
+      LOGGER.warn("#### Metastore Cache initialization is set to async... "
+          + "HDFS ACL synchronization will not happen until metastore "
+          + "cache initialization is completed !!");
+      initUpdater.start();
+    } else {
+      initUpdater.run();
     }
     try {
       sentryClient = SentryHDFSServiceClientFactory.create(sentryConf);
@@ -125,49 +175,15 @@
     }
     ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
     threadPool.scheduleWithFixedDelay(new SyncTask(),
-        this.conf.getLong(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
-            ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT),
-        this.conf.getLong(ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_MS,
-            ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_DEFAULT),
-        TimeUnit.MILLISECONDS);
+            this.conf.getLong(ServerConfig
+                    .SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
+                    ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT),
+            this.conf.getLong(ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_MS,
+                    ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_DEFAULT),
+            TimeUnit.MILLISECONDS);
     this.threadPool = threadPool;
   }
 
-  private UpdateableAuthzPaths createInitialUpdate(IHMSHandler hmsHandler) throws Exception {
-    UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(new String[] {"/"});
-    PathsUpdate tempUpdate = new PathsUpdate(-1, false);
-    List<String> allDbStr = hmsHandler.get_all_databases();
-    for (String dbName : allDbStr) {
-      Database db = hmsHandler.get_database(dbName);
-      List<String> dbPath = PathsUpdate.parsePath(db.getLocationUri());
-      if(dbPath != null) {
-        tempUpdate.newPathChange(db.getName()).addToAddPaths(dbPath);
-      }
-      List<String> allTblStr = hmsHandler.get_all_tables(db.getName());
-      for (String tblName : allTblStr) {
-        Table tbl = hmsHandler.get_table(db.getName(), tblName);
-        TPathChanges tblPathChange = tempUpdate.newPathChange(tbl
-            .getDbName() + "." + tbl.getTableName());
-        List<Partition> tblParts =
-            hmsHandler.get_partitions(db.getName(), tbl.getTableName(), (short) -1);
-        List<String> tb1Path = PathsUpdate.parsePath(tbl.getSd().getLocation() == null ?
-            db.getLocationUri() : tbl.getSd().getLocation());
-        if(tb1Path != null) {
-          tblPathChange.addToAddPaths(tb1Path);
-        }
-        for (Partition part : tblParts) {
-          List<String> partPath = PathsUpdate.parsePath(part.getSd().getLocation());
-          if(partPath != null) {
-            tblPathChange.addToAddPaths(partPath);
-          }
-        }
-      }
-    }
-    authzPaths.updatePartial(Lists.newArrayList(tempUpdate),
-        new ReentrantReadWriteLock());
-    return authzPaths;
-  }
-
   @Override
   public void addPath(String authzObj, String path) {
     List<String> pathTree = PathsUpdate.parsePath(path);
@@ -197,7 +213,7 @@
       }
     }
     update.newPathChange(authzObj).addToDelPaths(
-        Lists.newArrayList(PathsUpdate.ALL_PATHS));
+            Lists.newArrayList(PathsUpdate.ALL_PATHS));
     notifySentryAndApplyLocal(update);
   }
 
@@ -247,7 +263,7 @@
         sentryClient = SentryHDFSServiceClientFactory.create(sentryConf);
       } catch (Exception e) {
         sentryClient = null;
-        LOGGER.error("Could not connect to Sentry HDFS Service !!", e);
+        LOGGER.error("#### Could not connect to Sentry HDFS Service !!", e);
       }
     }
     return sentryClient;
@@ -285,7 +301,31 @@
     authzPaths.updatePartial(Lists.newArrayList(update), new ReentrantReadWriteLock());
   }
 
-  protected void notifySentryAndApplyLocal(PathsUpdate update) {
+  private void notifySentryAndApplyLocal(PathsUpdate update) {
+    if (initComplete) {
+      processUpdate(update);
+    } else {
+      if (initError == null) {
+        synchronized (updateQueue) {
+          if (!queueFlushComplete) {
+            updateQueue.add(update);
+          } else {
+            processUpdate(update);
+          }
+        }
+      } else {
+        StringWriter sw = new StringWriter();
+        initError.printStackTrace(new PrintWriter(sw));
+        LOGGER.error("#### Error initializing Metastore Plugin "
+            + "[" + sw.toString() + "] !!");
+        throw new RuntimeException(initError);
+      }
+      LOGGER.warn("#### Path update [" + update.getSeqNum() + "] not sent to Sentry.. "
+ + "Metastore hasn't been initialized yet !!"); + } + } + + protected void processUpdate(PathsUpdate update) { applyLocal(update); notifySentry(update); } http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/fd31d2cd/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java index ee5e0f9..4f6d7ca 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java @@ -79,7 +79,7 @@ public class MetastorePluginWithHA extends MetastorePlugin { } @Override - protected void notifySentryAndApplyLocal(PathsUpdate update) { + protected void processUpdate(PathsUpdate update) { try { // push to ZK in order to keep the metastore local cache in sync pluginCacheSync.handleCacheUpdate(update); http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/fd31d2cd/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestMetastoreCacheInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestMetastoreCacheInitializer.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestMetastoreCacheInitializer.java new file mode 100644 index 0000000..a5a165a --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestMetastoreCacheInitializer.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sentry.hdfs; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.IHMSHandler; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; + +public class TestMetastoreCacheInitializer { + + @Test + public void testInitializer() throws Exception { + + Database db1 = Mockito.mock(Database.class); + Mockito.when(db1.getName()).thenReturn("db1"); + Mockito.when(db1.getLocationUri()).thenReturn("hdfs:///db1"); + Database db2 = Mockito.mock(Database.class); + Mockito.when(db2.getName()).thenReturn("db2"); + Mockito.when(db2.getLocationUri()).thenReturn("hdfs:///db2"); + Database db3 = Mockito.mock(Database.class); + Mockito.when(db3.getName()).thenReturn("db3"); + Mockito.when(db3.getLocationUri()).thenReturn("hdfs:///db3"); + + Table tab21 = Mockito.mock(Table.class); + Mockito.when(tab21.getDbName()).thenReturn("db2"); + Mockito.when(tab21.getTableName()).thenReturn("tab21"); + StorageDescriptor sd21 = Mockito.mock(StorageDescriptor.class); + Mockito.when(sd21.getLocation()).thenReturn("hdfs:///db2/tab21"); + Mockito.when(tab21.getSd()).thenReturn(sd21); + + Table tab31 = Mockito.mock(Table.class); + Mockito.when(tab31.getDbName()).thenReturn("db3"); + Mockito.when(tab31.getTableName()).thenReturn("tab31"); + StorageDescriptor sd31 = Mockito.mock(StorageDescriptor.class); + Mockito.when(sd31.getLocation()).thenReturn("hdfs:///db3/tab31"); + Mockito.when(tab31.getSd()).thenReturn(sd31); + + Partition part311 = Mockito.mock(Partition.class); + StorageDescriptor sd311 = Mockito.mock(StorageDescriptor.class); + Mockito.when(sd311.getLocation()).thenReturn("hdfs:///db3/tab31/part311"); + Mockito.when(part311.getSd()).thenReturn(sd311); + + Partition part312 = Mockito.mock(Partition.class); + StorageDescriptor sd312 = Mockito.mock(StorageDescriptor.class); + Mockito.when(sd312.getLocation()).thenReturn("hdfs:///db3/tab31/part312"); + Mockito.when(part312.getSd()).thenReturn(sd312); + + IHMSHandler hmsHandler = Mockito.mock(IHMSHandler.class); + Mockito.when(hmsHandler.get_all_databases()).thenReturn(Lists + .newArrayList("db1", "db2", "db3")); + Mockito.when(hmsHandler.get_database("db1")).thenReturn(db1); + Mockito.when(hmsHandler.get_all_tables("db1")).thenReturn(new + ArrayList<String>()); + + Mockito.when(hmsHandler.get_database("db2")).thenReturn(db2); + Mockito.when(hmsHandler.get_all_tables("db2")).thenReturn(Lists + .newArrayList("tab21")); + Mockito.when(hmsHandler.get_table_objects_by_name("db2", + Lists.newArrayList("tab21"))) + .thenReturn(Lists.newArrayList(tab21)); + Mockito.when(hmsHandler.get_partition_names("db2", "tab21", (short) -1)) + .thenReturn(new ArrayList<String>()); + + Mockito.when(hmsHandler.get_database("db3")).thenReturn(db3); + Mockito.when(hmsHandler.get_all_tables("db3")).thenReturn(Lists + .newArrayList("tab31")); + Mockito.when(hmsHandler.get_table_objects_by_name("db3", + Lists.newArrayList("tab31"))) + .thenReturn(Lists.newArrayList(tab31)); + Mockito.when(hmsHandler.get_partition_names("db3", "tab31", (short) -1)) + .thenReturn(Lists.newArrayList("part311", "part312")); + + Mockito.when(hmsHandler.get_partitions_by_names("db3", "tab31", + Lists.newArrayList("part311"))) + 
.thenReturn(Lists.newArrayList(part311)); + Mockito.when(hmsHandler.get_partitions_by_names("db3", "tab31", + Lists.newArrayList("part312"))) + .thenReturn(Lists.newArrayList(part312)); + + Configuration conf = new Configuration(); + conf.setInt(ServiceConstants.ServerConfig + .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, 1); + conf.setInt(ServiceConstants.ServerConfig + .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, 1); + conf.setInt(ServiceConstants.ServerConfig + .SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, 1); + + MetastoreCacheInitializer cacheInitializer = new + MetastoreCacheInitializer(hmsHandler, conf); + UpdateableAuthzPaths update = cacheInitializer.createInitialUpdate(); + + Assert.assertEquals("db1", update.findAuthzObjectExactMatch(new + String[]{"db1"})); + Assert.assertEquals("db2", update.findAuthzObjectExactMatch(new + String[]{"db2"})); + Assert.assertEquals("db2.tab21", update.findAuthzObjectExactMatch(new + String[]{"db2", "tab21"})); + Assert.assertEquals("db3", update.findAuthzObjectExactMatch(new + String[]{"db3"})); + Assert.assertEquals("db3.tab31", update.findAuthzObjectExactMatch(new + String[]{"db3", "tab31"})); + Assert.assertEquals("db3.tab31", update.findAuthzObjectExactMatch(new + String[]{"db3", "tab31", "part311"})); + Assert.assertEquals("db3.tab31", update.findAuthzObjectExactMatch(new + String[]{"db3", "tab31", "part312"})); + cacheInitializer.close(); + + } +}
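
----------------------------------------------------------------------

Note: the initial cache build above fans out one DbTask per database; each
DbTask submits TableTasks in batches of max-tables-per-rpc, and each TableTask
submits PartitionTasks in batches of max-partitions-per-rpc, all on a fixed
thread pool, with an AtomicInteger task counter polled to detect quiescence
before the collected futures are checked for failures. The sketch below shows
how the initializer could be driven directly under those settings. It is a
minimal illustration, not part of this commit: the class and method names
(CacheInitSketch, buildInitialImage) and the supplied IHMSHandler are
hypothetical stand-ins, and the sketch has to live in the
org.apache.sentry.hdfs package because MetastoreCacheInitializer is
package-private.

package org.apache.sentry.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.IHMSHandler;
import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;

public class CacheInitSketch {

  // Builds the initial authz-path image the way MetastorePlugin's init thread
  // now does. `hmsHandler` is assumed to be supplied by the caller
  // (MetastorePlugin constructs a ProxyHMSHandler over the metastore HiveConf).
  static UpdateableAuthzPaths buildInitialImage(IHMSHandler hmsHandler)
          throws Exception {
    Configuration conf = new Configuration();
    // The defaults, set explicitly for visibility: 10 worker threads, at most
    // 100 tables and 100 partitions fetched per metastore call.
    conf.setInt(ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, 10);
    conf.setInt(ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, 100);
    conf.setInt(ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, 100);

    MetastoreCacheInitializer initializer =
            new MetastoreCacheInitializer(hmsHandler, conf);
    try {
      // Blocks until every DbTask/TableTask/PartitionTask has completed, then
      // folds the accumulated PathsUpdate into a fresh UpdateableAuthzPaths.
      return initializer.createInitialUpdate();
    } finally {
      initializer.close(); // shuts down the worker pool
    }
  }
}

With sentry.hdfs.sync.metastore.cache.async-init.enable=true, MetastorePlugin
runs the same work on a background thread, queues incoming path updates in
updateQueue until the image is ready, and flushes the queue once
initialization completes; the option defaults to false because HDFS ACL
synchronization lags until the cache is built.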
