Repository: hive
Updated Branches:
  refs/heads/llap 47187618b -> f84eefa52
http://git-wip-us.apache.org/repos/asf/hive/blob/a310524c/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
index 332e30a..ae73feb 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
@@ -80,6 +80,8 @@ class HBaseReadWrite {
   @VisibleForTesting final static String PART_TABLE = "HBMS_PARTITIONS";
   @VisibleForTesting final static String ROLE_TABLE = "HBMS_ROLES";
   @VisibleForTesting final static String SD_TABLE = "HBMS_SDS";
+  @VisibleForTesting final static String SECURITY_TABLE = "HBMS_SECURITY";
+  @VisibleForTesting final static String SEQUENCES_TABLE = "HBMS_SEQUENCES";
   @VisibleForTesting final static String TABLE_TABLE = "HBMS_TBLS";
   @VisibleForTesting final static String USER_TO_ROLE_TABLE = "HBMS_USER_TO_ROLE";
   @VisibleForTesting final static byte[] CATALOG_CF = "c".getBytes(HBaseUtils.ENCODING);
@@ -90,7 +92,7 @@ class HBaseReadWrite {
    */
   final static String[] tableNames = { AGGR_STATS_TABLE, DB_TABLE, FUNC_TABLE,
                                        GLOBAL_PRIVS_TABLE, PART_TABLE, USER_TO_ROLE_TABLE, ROLE_TABLE, SD_TABLE,
-                                       TABLE_TABLE };
+                                       SECURITY_TABLE, SEQUENCES_TABLE, TABLE_TABLE};
   final static Map<String, List<byte[]>> columnFamilies = new HashMap<String, List<byte[]>>
       (tableNames.length);
@@ -103,6 +105,8 @@ class HBaseReadWrite {
     columnFamilies.put(USER_TO_ROLE_TABLE, Arrays.asList(CATALOG_CF));
     columnFamilies.put(ROLE_TABLE, Arrays.asList(CATALOG_CF));
     columnFamilies.put(SD_TABLE, Arrays.asList(CATALOG_CF));
+    columnFamilies.put(SECURITY_TABLE, Arrays.asList(CATALOG_CF));
+    columnFamilies.put(SEQUENCES_TABLE, Arrays.asList(CATALOG_CF));
     columnFamilies.put(TABLE_TABLE, Arrays.asList(CATALOG_CF, STATS_CF));
   }
@@ -110,12 +114,16 @@ class HBaseReadWrite {
    * Stores the bloom filter for the aggregated stats, to determine what partitions are in this
    * aggregate.
    */
+  final static byte[] MASTER_KEY_SEQUENCE = "mk".getBytes(HBaseUtils.ENCODING);
   final static byte[] AGGR_STATS_BLOOM_COL = "b".getBytes(HBaseUtils.ENCODING);
   private final static byte[] CATALOG_COL = "c".getBytes(HBaseUtils.ENCODING);
   private final static byte[] ROLES_COL = "roles".getBytes(HBaseUtils.ENCODING);
   private final static byte[] REF_COUNT_COL = "ref".getBytes(HBaseUtils.ENCODING);
+  private final static byte[] DELEGATION_TOKEN_COL = "dt".getBytes(HBaseUtils.ENCODING);
+  private final static byte[] MASTER_KEY_COL = "mk".getBytes(HBaseUtils.ENCODING);
   private final static byte[] AGGR_STATS_STATS_COL = "s".getBytes(HBaseUtils.ENCODING);
   private final static byte[] GLOBAL_PRIVS_KEY = "gp".getBytes(HBaseUtils.ENCODING);
+  private final static byte[] SEQUENCES_KEY = "seq".getBytes(HBaseUtils.ENCODING);
   private final static int TABLES_TO_CACHE = 10;
   // False positives are very bad here because they cause us to invalidate entries we shouldn't.
   // Space used and # of hash functions grows in proportion to ln of num bits so a 10x increase
@@ -226,7 +234,7 @@ class HBaseReadWrite {
     sdHits = new Counter("storage descriptor cache hits");
     sdMisses = new Counter("storage descriptor cache misses");
     sdOverflows = new Counter("storage descriptor cache overflows");
-    counters = new ArrayList<Counter>();
+    counters = new ArrayList<>();
     counters.add(tableHits);
     counters.add(tableMisses);
     counters.add(tableOverflows);
@@ -241,18 +249,16 @@ class HBaseReadWrite {
     // (storage descriptors are shared, so 99% should be the same for a given table)
     int sdsCacheSize = totalCatalogObjectsToCache / 100;
     if (conf.getBoolean(NO_CACHE_CONF, false)) {
-      tableCache = new BogusObjectCache<ObjectPair<String, String>, Table>();
-      sdCache = new BogusObjectCache<ByteArrayWrapper, StorageDescriptor>();
+      tableCache = new BogusObjectCache<>();
+      sdCache = new BogusObjectCache<>();
       partCache = new BogusPartitionCache();
     } else {
-      tableCache = new ObjectCache<ObjectPair<String, String>, Table>(TABLES_TO_CACHE, tableHits,
-          tableMisses, tableOverflows);
-      sdCache = new ObjectCache<ByteArrayWrapper, StorageDescriptor>(sdsCacheSize, sdHits,
-          sdMisses, sdOverflows);
+      tableCache = new ObjectCache<>(TABLES_TO_CACHE, tableHits, tableMisses, tableOverflows);
+      sdCache = new ObjectCache<>(sdsCacheSize, sdHits, sdMisses, sdOverflows);
       partCache = new PartitionCache(totalCatalogObjectsToCache, partHits, partMisses,
           partOverflows);
     }
     statsCache = StatsCache.getInstance(conf);
-    roleCache = new HashMap<String, HbaseMetastoreProto.RoleGrantInfoList>();
+    roleCache = new HashMap<>();
     entireRoleTableInCache = false;
   }
@@ -338,7 +344,7 @@ class HBaseReadWrite {
     }
     Iterator<Result> iter = scan(DB_TABLE, CATALOG_CF, CATALOG_COL, filter);
-    List<Database> databases = new ArrayList<Database>();
+    List<Database> databases = new ArrayList<>();
     while (iter.hasNext()) {
       Result result = iter.next();
       databases.add(HBaseUtils.deserializeDatabase(result.getRow(),
@@ -404,7 +410,7 @@ class HBaseReadWrite {
     }
     Iterator<Result> iter = scan(FUNC_TABLE, keyPrefix, HBaseUtils.getEndPrefix(keyPrefix),
         CATALOG_CF, CATALOG_COL, filter);
-    List<Function> functions = new ArrayList<Function>();
+    List<Function> functions = new ArrayList<>();
     while (iter.hasNext()) {
       Result result = iter.next();
       functions.add(HBaseUtils.deserializeFunction(result.getRow(),
@@ -489,8 +495,8 @@ class HBaseReadWrite {
    */
   List<Partition> getPartitions(String dbName, String tableName, List<List<String>> partValLists)
       throws IOException {
-    List<Partition> parts = new ArrayList<Partition>(partValLists.size());
-    List<Get> gets = new ArrayList<Get>(partValLists.size());
+    List<Partition> parts = new ArrayList<>(partValLists.size());
+    List<Get> gets = new ArrayList<>(partValLists.size());
     for (List<String> partVals : partValLists) {
       byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partVals);
       Get get = new Get(key);
@@ -556,7 +562,7 @@ class HBaseReadWrite {
    * @throws IOException
    */
   void putPartitions(List<Partition> partitions) throws IOException {
-    List<Put> puts = new ArrayList<Put>(partitions.size());
+    List<Put> puts = new ArrayList<>(partitions.size());
     for (Partition partition : partitions) {
       byte[] hash = putStorageDescriptor(partition.getSd());
       byte[][] serialized = HBaseUtils.serializePartition(partition, hash);
@@ -615,8 +621,8 @@ class HBaseReadWrite {
     Collection<Partition> cached = partCache.getAllForTable(dbName, tableName);
     if (cached != null) {
       return maxPartitions < cached.size()
-          ? new ArrayList<Partition>(cached).subList(0, maxPartitions)
-          : new ArrayList<Partition>(cached);
+          ? new ArrayList<>(cached).subList(0, maxPartitions)
+          : new ArrayList<>(cached);
     }
     byte[] keyPrefix = HBaseUtils.buildKeyWithTrailingSeparator(dbName, tableName);
     List<Partition> parts = scanPartitionsWithFilter(keyPrefix,
         HBaseUtils.getEndPrefix(keyPrefix), -1, null);
@@ -645,7 +651,7 @@ class HBaseReadWrite {
   List<Partition> scanPartitions(String dbName, String tableName, List<String> partVals,
                                  int maxPartitions) throws IOException, NoSuchObjectException {
     // First, build as much of the key as we can so that we make the scan as tight as possible.
-    List<String> keyElements = new ArrayList<String>();
+    List<String> keyElements = new ArrayList<>();
     keyElements.add(dbName);
     keyElements.add(tableName);
@@ -712,7 +718,7 @@ class HBaseReadWrite {
   List<Partition> scanPartitions(String dbName, String tableName, byte[] keyStart, byte[] keyEnd,
                                  Filter filter, int maxPartitions)
       throws IOException, NoSuchObjectException {
-    List<String> keyElements = new ArrayList<String>();
+    List<String> keyElements = new ArrayList<>();
     keyElements.add(dbName);
     keyElements.add(tableName);
@@ -780,7 +786,7 @@ class HBaseReadWrite {
       throws IOException {
     Iterator<Result> iter = scan(PART_TABLE, startRow, endRow, CATALOG_CF, CATALOG_COL, filter);
-    List<Partition> parts = new ArrayList<Partition>();
+    List<Partition> parts = new ArrayList<>();
     int numToFetch = maxResults < 0 ? Integer.MAX_VALUE : maxResults;
     for (int i = 0; i < numToFetch && iter.hasNext(); i++) {
       Result result = iter.next();
@@ -821,7 +827,7 @@ class HBaseReadWrite {
       throws IOException {
     buildRoleCache();
-    Set<String> rolesFound = new HashSet<String>();
+    Set<String> rolesFound = new HashSet<>();
     for (Map.Entry<String, HbaseMetastoreProto.RoleGrantInfoList> e : roleCache.entrySet()) {
       for (HbaseMetastoreProto.RoleGrantInfo giw : e.getValue().getGrantInfoList()) {
         if (HBaseUtils.convertPrincipalTypes(giw.getPrincipalType()) == type &&
@@ -831,8 +837,8 @@ class HBaseReadWrite {
         }
       }
     }
-    List<Role> directRoles = new ArrayList<Role>(rolesFound.size());
-    List<Get> gets = new ArrayList<Get>();
+    List<Role> directRoles = new ArrayList<>(rolesFound.size());
+    List<Get> gets = new ArrayList<>();
     HTableInterface htab = conn.getHBaseTable(ROLE_TABLE);
     for (String roleFound : rolesFound) {
       byte[] key = HBaseUtils.buildKey(roleFound);
@@ -880,7 +886,7 @@ class HBaseReadWrite {
    */
   Set<String> findAllUsersInRole(String roleName) throws IOException {
     // Walk the userToRole table and collect every user that matches this role.
-    Set<String> users = new HashSet<String>();
+    Set<String> users = new HashSet<>();
     Iterator<Result> iter = scan(USER_TO_ROLE_TABLE, CATALOG_CF, CATALOG_COL);
     while (iter.hasNext()) {
       Result result = iter.next();
@@ -907,8 +913,7 @@
   void addPrincipalToRole(String roleName, HbaseMetastoreProto.RoleGrantInfo grantInfo)
       throws IOException, NoSuchObjectException {
     HbaseMetastoreProto.RoleGrantInfoList proto = getRolePrincipals(roleName);
-    List<HbaseMetastoreProto.RoleGrantInfo> rolePrincipals =
-        new ArrayList<HbaseMetastoreProto.RoleGrantInfo>();
+    List<HbaseMetastoreProto.RoleGrantInfo> rolePrincipals = new ArrayList<>();
     if (proto != null) {
       rolePrincipals.addAll(proto.getGrantInfoList());
     }
@@ -937,8 +942,7 @@
       throws NoSuchObjectException, IOException {
     HbaseMetastoreProto.RoleGrantInfoList proto = getRolePrincipals(roleName);
     if (proto == null) return;
-    List<HbaseMetastoreProto.RoleGrantInfo> rolePrincipals =
-        new ArrayList<HbaseMetastoreProto.RoleGrantInfo>();
+    List<HbaseMetastoreProto.RoleGrantInfo> rolePrincipals = new ArrayList<>();
     rolePrincipals.addAll(proto.getGrantInfoList());
     for (int i = 0; i < rolePrincipals.size(); i++) {
@@ -976,8 +980,8 @@
     LOG.debug("Building role map for " + userName);
 
     // Second, find every role the user participates in directly.
-    Set<String> rolesToAdd = new HashSet<String>();
-    Set<String> rolesToCheckNext = new HashSet<String>();
+    Set<String> rolesToAdd = new HashSet<>();
+    Set<String> rolesToCheckNext = new HashSet<>();
     for (Map.Entry<String, HbaseMetastoreProto.RoleGrantInfoList> e : roleCache.entrySet()) {
       for (HbaseMetastoreProto.RoleGrantInfo grantInfo : e.getValue().getGrantInfoList()) {
         if (HBaseUtils.convertPrincipalTypes(grantInfo.getPrincipalType()) == PrincipalType.USER &&
@@ -993,7 +997,7 @@
     // Third, find every role the user participates in indirectly (that is, they have been
     // granted into role X and role Y has been granted into role X).
     while (rolesToCheckNext.size() > 0) {
-      Set<String> tmpRolesToCheckNext = new HashSet<String>();
+      Set<String> tmpRolesToCheckNext = new HashSet<>();
       for (String roleName : rolesToCheckNext) {
         HbaseMetastoreProto.RoleGrantInfoList grantInfos = roleCache.get(roleName);
         if (grantInfos == null) continue; // happens when a role contains no grants
@@ -1010,7 +1014,7 @@
     }
 
     byte[] key = HBaseUtils.buildKey(userName);
-    byte[] serialized = HBaseUtils.serializeRoleList(new ArrayList<String>(rolesToAdd));
+    byte[] serialized = HBaseUtils.serializeRoleList(new ArrayList<>(rolesToAdd));
     store(USER_TO_ROLE_TABLE, key, CATALOG_CF, CATALOG_COL, serialized);
   }
@@ -1022,12 +1026,11 @@
   void removeRoleGrants(String roleName) throws IOException {
     buildRoleCache();
 
-    List<Put> puts = new ArrayList<Put>();
+    List<Put> puts = new ArrayList<>();
     // First, walk the role table and remove any references to this role
     for (Map.Entry<String, HbaseMetastoreProto.RoleGrantInfoList> e : roleCache.entrySet()) {
       boolean madeAChange = false;
-      List<HbaseMetastoreProto.RoleGrantInfo> rgil =
-          new ArrayList<HbaseMetastoreProto.RoleGrantInfo>();
+      List<HbaseMetastoreProto.RoleGrantInfo> rgil = new ArrayList<>();
       rgil.addAll(e.getValue().getGrantInfoList());
       for (int i = 0; i < rgil.size(); i++) {
         if (HBaseUtils.convertPrincipalTypes(rgil.get(i).getPrincipalType()) == PrincipalType.ROLE &&
@@ -1066,7 +1069,7 @@
     // Now, walk the db table
     puts.clear();
     List<Database> dbs = scanDatabases(null);
-    if (dbs == null) dbs = new ArrayList<Database>(); // rare, but can happen
+    if (dbs == null) dbs = new ArrayList<>(); // rare, but can happen
     for (Database db : dbs) {
       if (db.getPrivileges() != null &&
           db.getPrivileges().getRolePrivileges() != null &&
@@ -1130,7 +1133,7 @@
    */
   List<Role> scanRoles() throws IOException {
     Iterator<Result> iter = scan(ROLE_TABLE, CATALOG_CF, CATALOG_COL);
-    List<Role> roles = new ArrayList<Role>();
+    List<Role> roles = new ArrayList<>();
     while (iter.hasNext()) {
       Result result = iter.next();
       roles.add(HBaseUtils.deserializeRole(result.getRow(),
@@ -1199,11 +1202,11 @@
   List<Table> getTables(String dbName, List<String> tableNames) throws IOException {
     // I could implement getTable in terms of this method.  But it is such a core function
     // that I don't want to slow it down for the much less common fetching of multiple tables.
-    List<Table> results = new ArrayList<Table>(tableNames.size());
+    List<Table> results = new ArrayList<>(tableNames.size());
     ObjectPair<String, String>[] hashKeys = new ObjectPair[tableNames.size()];
     boolean atLeastOneMissing = false;
     for (int i = 0; i < tableNames.size(); i++) {
-      hashKeys[i] = new ObjectPair<String, String>(dbName, tableNames.get(i));
+      hashKeys[i] = new ObjectPair<>(dbName, tableNames.get(i));
       // The result may be null, but we still want to add it so that we have a slot in the list
       // for it.
       results.add(tableCache.get(hashKeys[i]));
@@ -1212,7 +1215,7 @@
     if (!atLeastOneMissing) return results;
 
     // Now build a single get that will fetch the remaining tables
-    List<Get> gets = new ArrayList<Get>();
+    List<Get> gets = new ArrayList<>();
     HTableInterface htab = conn.getHBaseTable(TABLE_TABLE);
     for (int i = 0; i < tableNames.size(); i++) {
       if (results.get(i) != null) continue;
@@ -1261,7 +1264,7 @@
     Iterator<Result> iter = scan(TABLE_TABLE, keyPrefix, HBaseUtils.getEndPrefix(keyPrefix),
         CATALOG_CF, CATALOG_COL, filter);
-    List<Table> tables = new ArrayList<Table>();
+    List<Table> tables = new ArrayList<>();
     while (iter.hasNext()) {
       Result result = iter.next();
       HBaseUtils.StorageDescriptorParts sdParts =
@@ -1284,7 +1287,7 @@
     byte[] hash = putStorageDescriptor(table.getSd());
     byte[][] serialized = HBaseUtils.serializeTable(table, hash);
     store(TABLE_TABLE, serialized[0], CATALOG_CF, CATALOG_COL, serialized[1]);
-    tableCache.put(new ObjectPair<String, String>(table.getDbName(), table.getTableName()), table);
+    tableCache.put(new ObjectPair<>(table.getDbName(), table.getTableName()), table);
   }
 
   /**
@@ -1323,7 +1326,7 @@
   private void deleteTable(String dbName, String tableName, boolean decrementRefCnt)
       throws IOException {
-    tableCache.remove(new ObjectPair<String, String>(dbName, tableName));
+    tableCache.remove(new ObjectPair<>(dbName, tableName));
     if (decrementRefCnt) {
       // Find the table so I can get the storage descriptor and drop it
       Table t = getTable(dbName, tableName, false);
@@ -1335,7 +1338,7 @@
   private Table getTable(String dbName, String tableName, boolean populateCache)
       throws IOException {
-    ObjectPair<String, String> hashKey = new ObjectPair<String, String>(dbName, tableName);
+    ObjectPair<String, String> hashKey = new ObjectPair<>(dbName, tableName);
     Table cached = tableCache.get(hashKey);
     if (cached != null) return cached;
     byte[] key = HBaseUtils.buildKey(dbName, tableName);
@@ -1623,6 +1626,7 @@
     byte[] serialized = read(AGGR_STATS_TABLE, key, CATALOG_CF, AGGR_STATS_STATS_COL);
     if (serialized == null) return null;
     return HBaseUtils.deserializeAggrStats(serialized);
+
   }
 
   /**
@@ -1696,6 +1700,134 @@
   }
 
   /**********************************************************************************************
+   * Security related methods
+   *********************************************************************************************/
+
+  /**
+   * Fetch a delegation token
+   * @param tokId identifier of the token to fetch
+   * @return the delegation token, or null if there is no such delegation token
+   * @throws IOException
+   */
+  String getDelegationToken(String tokId) throws IOException {
+    byte[] key = HBaseUtils.buildKey(tokId);
+    byte[] serialized = read(SECURITY_TABLE, key, CATALOG_CF, DELEGATION_TOKEN_COL);
+    if (serialized == null) return null;
+    return HBaseUtils.deserializeDelegationToken(serialized);
+  }
+
+  /**
+   * Get all delegation token ids
+   * @return list of all delegation token identifiers
+   * @throws IOException
+   */
+  List<String> scanDelegationTokenIdentifiers() throws IOException {
+    Iterator<Result> iter = scan(SECURITY_TABLE, CATALOG_CF, DELEGATION_TOKEN_COL);
+    List<String> ids = new ArrayList<>();
+    while (iter.hasNext()) {
+      Result result = iter.next();
+      byte[] serialized = result.getValue(CATALOG_CF, DELEGATION_TOKEN_COL);
+      if (serialized != null) {
+        // Don't deserialize the value, as what we're after is the key.  We just had to check the
+        // value wasn't null in order to check this is a record with a delegation token and not a
+        // master key.
+        ids.add(new String(result.getRow(), HBaseUtils.ENCODING));
+
+      }
+    }
+    return ids;
+  }
+
+  /**
+   * Store a delegation token
+   * @param tokId token id
+   * @param token delegation token to store
+   * @throws IOException
+   */
+  void putDelegationToken(String tokId, String token) throws IOException {
+    byte[][] serialized = HBaseUtils.serializeDelegationToken(tokId, token);
+    store(SECURITY_TABLE, serialized[0], CATALOG_CF, DELEGATION_TOKEN_COL, serialized[1]);
+  }
+
+  /**
+   * Delete a delegation token
+   * @param tokId identifier of token to drop
+   * @throws IOException
+   */
+  void deleteDelegationToken(String tokId) throws IOException {
+    byte[] key = HBaseUtils.buildKey(tokId);
+    delete(SECURITY_TABLE, key, CATALOG_CF, DELEGATION_TOKEN_COL);
+  }
+
+  /**
+   * Fetch a master key
+   * @param seqNo sequence number of the master key
+   * @return the master key, or null if there is no such master key
+   * @throws IOException
+   */
+  String getMasterKey(Integer seqNo) throws IOException {
+    byte[] key = HBaseUtils.buildKey(seqNo.toString());
+    byte[] serialized = read(SECURITY_TABLE, key, CATALOG_CF, MASTER_KEY_COL);
+    if (serialized == null) return null;
+    return HBaseUtils.deserializeMasterKey(serialized);
+  }
+
+  /**
+   * Get all master keys
+   * @return list of all master keys
+   * @throws IOException
+   */
+  List<String> scanMasterKeys() throws IOException {
+    Iterator<Result> iter = scan(SECURITY_TABLE, CATALOG_CF, MASTER_KEY_COL);
+    List<String> keys = new ArrayList<>();
+    while (iter.hasNext()) {
+      Result result = iter.next();
+      byte[] serialized = result.getValue(CATALOG_CF, MASTER_KEY_COL);
+      if (serialized != null) {
+        keys.add(HBaseUtils.deserializeMasterKey(serialized));

+      }
+    }
+    return keys;
+  }
+
+  /**
+   * Store a master key
+   * @param seqNo sequence number
+   * @param key master key to store
+   * @throws IOException
+   */
+  void putMasterKey(Integer seqNo, String key) throws IOException {
+    byte[][] serialized = HBaseUtils.serializeMasterKey(seqNo, key);
+    store(SECURITY_TABLE, serialized[0], CATALOG_CF, MASTER_KEY_COL, serialized[1]);
+  }
+
+  /**
+   * Delete a master key
+   * @param seqNo sequence number of master key to delete
+   * @throws IOException
+   */
+  void deleteMasterKey(Integer seqNo) throws IOException {
+    byte[] key = HBaseUtils.buildKey(seqNo.toString());
+    delete(SECURITY_TABLE, key, CATALOG_CF, MASTER_KEY_COL);
+  }
+
+  /**********************************************************************************************
+   * Sequence methods
+   *********************************************************************************************/
+
+  long getNextSequence(byte[] sequence) throws IOException {
+    byte[] serialized = read(SEQUENCES_TABLE, SEQUENCES_KEY, CATALOG_CF, sequence);
+    long val = 0;
+    if (serialized != null) {
+      val = Long.valueOf(new String(serialized, HBaseUtils.ENCODING));
+    }
+    byte[] incrSerialized = new Long(val + 1).toString().getBytes(HBaseUtils.ENCODING);
+    store(SEQUENCES_TABLE, SEQUENCES_KEY, CATALOG_CF, sequence, incrSerialized);
+    return val;
+  }
+
+  /**********************************************************************************************
    * Cache methods
    *********************************************************************************************/
@@ -1772,8 +1904,7 @@
     htab.delete(d);
   }
 
-  private Iterator<Result> scan(String table, byte[] colFam,
-                                byte[] colName) throws IOException {
+  private Iterator<Result> scan(String table, byte[] colFam, byte[] colName) throws IOException {
     return scan(table, null, null, colFam, colName, null);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/a310524c/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index 9782859..744070d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -1613,43 +1613,128 @@ public class HBaseStore implements RawStore {
 
   @Override
   public boolean addToken(String tokenIdentifier, String delegationToken) {
-    throw new UnsupportedOperationException();
+    boolean commit = false;
+    openTransaction();
+    try {
+      getHBase().putDelegationToken(tokenIdentifier, delegationToken);
+      commit = true;
+      return commit; // See HIVE-11302, for now always returning true
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      commitOrRoleBack(commit);
+    }
   }
 
   @Override
   public boolean removeToken(String tokenIdentifier) {
-    throw new UnsupportedOperationException();
+    boolean commit = false;
+    openTransaction();
+    try {
+      getHBase().deleteDelegationToken(tokenIdentifier);
+      commit = true;
+      return commit; // See HIVE-11302, for now always returning true
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      commitOrRoleBack(commit);
+    }
   }
 
   @Override
   public String getToken(String tokenIdentifier) {
-    throw new UnsupportedOperationException();
+    boolean commit = false;
+    openTransaction();
+    try {
+      String token = getHBase().getDelegationToken(tokenIdentifier);
+      commit = true;
+      return token;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      commitOrRoleBack(commit);
+    }
   }
 
   @Override
   public List<String> getAllTokenIdentifiers() {
-    throw new UnsupportedOperationException();
+    boolean commit = false;
+    openTransaction();
+    try {
+      List<String> ids = getHBase().scanDelegationTokenIdentifiers();
+      commit = true;
+      return ids;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      commitOrRoleBack(commit);
+    }
   }
 
   @Override
   public int addMasterKey(String key) throws MetaException {
-    throw new UnsupportedOperationException();
+    boolean commit = false;
+    openTransaction();
+    try {
+      long seq = getHBase().getNextSequence(HBaseReadWrite.MASTER_KEY_SEQUENCE);
+      getHBase().putMasterKey((int) seq, key);
+      commit = true;
+      return (int)seq;
+    } catch (IOException e) {
+      LOG.error("Unable to add master key", e);
+      throw new MetaException("Failed adding master key, " + e.getMessage());
+    } finally {
+      commitOrRoleBack(commit);
+    }
   }
 
   @Override
   public void updateMasterKey(Integer seqNo, String key) throws NoSuchObjectException,
       MetaException {
-    throw new UnsupportedOperationException();
+    boolean commit = false;
+    openTransaction();
+    try {
+      if (getHBase().getMasterKey(seqNo) == null) {
+        throw new NoSuchObjectException("No key found with keyId: " + seqNo);
+      }
+      getHBase().putMasterKey(seqNo, key);
+      commit = true;
+    } catch (IOException e) {
+      LOG.error("Unable to update master key", e);
+      throw new MetaException("Failed updating master key, " + e.getMessage());
+    } finally {
+      commitOrRoleBack(commit);
+    }
   }
 
   @Override
   public boolean removeMasterKey(Integer keySeq) {
-    throw new UnsupportedOperationException();
+    boolean commit = false;
+    openTransaction();
+    try {
+      getHBase().deleteMasterKey(keySeq);
+      commit = true;
+      return true; // See HIVE-11302, for now always returning true
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      commitOrRoleBack(commit);
+    }
   }
 
   @Override
   public String[] getMasterKeys() {
-    throw new UnsupportedOperationException();
+    boolean commit = false;
+    openTransaction();
+    try {
+      List<String> keys = getHBase().scanMasterKeys();
+      commit = true;
+      return keys.toArray(new String[keys.size()]);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      commitOrRoleBack(commit);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/a310524c/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
index 4d57af2..62bb4de 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
@@ -104,8 +104,7 @@ class HBaseUtils {
   }
 
   private static HbaseMetastoreProto.Parameters buildParameters(Map<String, String> params) {
-    List<HbaseMetastoreProto.ParameterEntry> entries =
-        new ArrayList<HbaseMetastoreProto.ParameterEntry>();
+    List<HbaseMetastoreProto.ParameterEntry> entries = new ArrayList<>();
     for (Map.Entry<String, String> e : params.entrySet()) {
       entries.add(
           HbaseMetastoreProto.ParameterEntry.newBuilder()
@@ -119,7 +118,7 @@ class HBaseUtils {
   }
 
   private static Map<String, String> buildParameters(HbaseMetastoreProto.Parameters protoParams) {
-    Map<String, String> params = new HashMap<String, String>();
+    Map<String, String> params = new HashMap<>();
     for (HbaseMetastoreProto.ParameterEntry pe : protoParams.getParameterList()) {
       params.put(pe.getKey(), pe.getValue());
     }
@@ -129,8 +128,7 @@ class HBaseUtils {
 
   private static List<HbaseMetastoreProto.PrincipalPrivilegeSetEntry>
   buildPrincipalPrivilegeSetEntry(Map<String, List<PrivilegeGrantInfo>> entries) {
-    List<HbaseMetastoreProto.PrincipalPrivilegeSetEntry> results =
-        new ArrayList<HbaseMetastoreProto.PrincipalPrivilegeSetEntry>();
+    List<HbaseMetastoreProto.PrincipalPrivilegeSetEntry> results = new ArrayList<>();
     for (Map.Entry<String, List<PrivilegeGrantInfo>> entry : entries.entrySet()) {
       results.add(HbaseMetastoreProto.PrincipalPrivilegeSetEntry.newBuilder()
           .setPrincipalName(entry.getKey())
@@ -142,8 +140,7 @@ class HBaseUtils {
 
   private static List<HbaseMetastoreProto.PrivilegeGrantInfo> buildPrivilegeGrantInfo(
       List<PrivilegeGrantInfo> privileges) {
-    List<HbaseMetastoreProto.PrivilegeGrantInfo> results =
-        new ArrayList<HbaseMetastoreProto.PrivilegeGrantInfo>();
+    List<HbaseMetastoreProto.PrivilegeGrantInfo> results = new ArrayList<>();
     for (PrivilegeGrantInfo privilege : privileges) {
       HbaseMetastoreProto.PrivilegeGrantInfo.Builder builder =
           HbaseMetastoreProto.PrivilegeGrantInfo.newBuilder();
@@ -187,8 +184,7 @@ class HBaseUtils {
 
   private static Map<String, List<PrivilegeGrantInfo>> convertPrincipalPrivilegeSetEntries(
       List<HbaseMetastoreProto.PrincipalPrivilegeSetEntry> entries) {
-    Map<String, List<PrivilegeGrantInfo>> map =
-        new HashMap<String, List<PrivilegeGrantInfo>>();
+    Map<String, List<PrivilegeGrantInfo>> map = new HashMap<>();
    for (HbaseMetastoreProto.PrincipalPrivilegeSetEntry entry : entries) {
      map.put(entry.getPrincipalName(), convertPrivilegeGrantInfos(entry.getPrivilegesList()));
    }
@@ -197,7 +193,7 @@ class HBaseUtils {
 
   private static List<PrivilegeGrantInfo> convertPrivilegeGrantInfos(
       List<HbaseMetastoreProto.PrivilegeGrantInfo> privileges) {
-    List<PrivilegeGrantInfo> results = new ArrayList<PrivilegeGrantInfo>();
+    List<PrivilegeGrantInfo> results = new ArrayList<>();
     for (HbaseMetastoreProto.PrivilegeGrantInfo proto : privileges) {
       PrivilegeGrantInfo pgi = new PrivilegeGrantInfo();
       if (proto.hasPrivilege()) pgi.setPrivilege(proto.getPrivilege());
@@ -316,7 +312,7 @@ class HBaseUtils {
 
   static List<String> deserializeRoleList(byte[] value) throws InvalidProtocolBufferException {
     HbaseMetastoreProto.RoleList proto = HbaseMetastoreProto.RoleList.parseFrom(value);
-    return new ArrayList<String>(proto.getRoleList());
+    return new ArrayList<>(proto.getRoleList());
   }
 
   /**
@@ -491,7 +487,7 @@ class HBaseUtils {
 
   private static List<FieldSchema>
   convertFieldSchemaListFromProto(List<HbaseMetastoreProto.FieldSchema> protoList) {
-    List<FieldSchema> schemas = new ArrayList<FieldSchema>(protoList.size());
+    List<FieldSchema> schemas = new ArrayList<>(protoList.size());
     for (HbaseMetastoreProto.FieldSchema proto : protoList) {
       schemas.add(new FieldSchema(proto.getName(), proto.getType(),
           proto.hasComment() ? proto.getComment() : null));
@@ -501,8 +497,7 @@ class HBaseUtils {
 
   private static List<HbaseMetastoreProto.FieldSchema>
   convertFieldSchemaListToProto(List<FieldSchema> schemas) {
-    List<HbaseMetastoreProto.FieldSchema> protoList =
-        new ArrayList<HbaseMetastoreProto.FieldSchema>(schemas.size());
+    List<HbaseMetastoreProto.FieldSchema> protoList = new ArrayList<>(schemas.size());
     for (FieldSchema fs : schemas) {
       HbaseMetastoreProto.FieldSchema.Builder builder =
           HbaseMetastoreProto.FieldSchema.newBuilder();
@@ -552,8 +547,7 @@ class HBaseUtils {
     }
     if (sd.getSortCols() != null) {
       List<Order> orders = sd.getSortCols();
-      List<HbaseMetastoreProto.StorageDescriptor.Order> protoList =
-          new ArrayList<HbaseMetastoreProto.StorageDescriptor.Order>(orders.size());
+      List<HbaseMetastoreProto.StorageDescriptor.Order> protoList = new ArrayList<>(orders.size());
       for (Order order : orders) {
         protoList.add(HbaseMetastoreProto.StorageDescriptor.Order.newBuilder()
             .setColumnName(order.getCol())
@@ -625,7 +619,7 @@ class HBaseUtils {
       md.update(serde.getSerializationLib().getBytes(ENCODING));
     }
     if (serde.getParameters() != null) {
-      SortedMap<String, String> params = new TreeMap<String, String>(serde.getParameters());
+      SortedMap<String, String> params = new TreeMap<>(serde.getParameters());
       for (Map.Entry<String, String> param : params.entrySet()) {
         md.update(param.getKey().getBytes(ENCODING));
         md.update(param.getValue().getBytes(ENCODING));
@@ -633,11 +627,11 @@ class HBaseUtils {
       }
     }
     if (sd.getBucketCols() != null) {
-      SortedSet<String> bucketCols = new TreeSet<String>(sd.getBucketCols());
+      SortedSet<String> bucketCols = new TreeSet<>(sd.getBucketCols());
       for (String bucket : bucketCols) md.update(bucket.getBytes(ENCODING));
     }
     if (sd.getSortCols() != null) {
-      SortedSet<Order> orders = new TreeSet<Order>(sd.getSortCols());
+      SortedSet<Order> orders = new TreeSet<>(sd.getSortCols());
       for (Order order : orders) {
         md.update(order.getCol().getBytes(ENCODING));
         md.update(Integer.toString(order.getOrder()).getBytes(ENCODING));
@@ -646,21 +640,21 @@ class HBaseUtils {
     if (sd.getSkewedInfo() != null) {
       SkewedInfo skewed = sd.getSkewedInfo();
       if (skewed.getSkewedColNames() != null) {
-        SortedSet<String> colnames = new TreeSet<String>(skewed.getSkewedColNames());
+        SortedSet<String> colnames = new TreeSet<>(skewed.getSkewedColNames());
         for (String colname : colnames) md.update(colname.getBytes(ENCODING));
       }
       if (skewed.getSkewedColValues() != null) {
-        SortedSet<String> sortedOuterList = new TreeSet<String>();
+        SortedSet<String> sortedOuterList = new TreeSet<>();
         for (List<String> innerList : skewed.getSkewedColValues()) {
-          SortedSet<String> sortedInnerList = new TreeSet<String>(innerList);
+          SortedSet<String> sortedInnerList = new TreeSet<>(innerList);
           sortedOuterList.add(StringUtils.join(sortedInnerList, "."));
         }
         for (String colval : sortedOuterList) md.update(colval.getBytes(ENCODING));
       }
       if (skewed.getSkewedColValueLocationMaps() != null) {
-        SortedMap<String, String> sortedMap = new TreeMap<String, String>();
+        SortedMap<String, String> sortedMap = new TreeMap<>();
         for (Map.Entry<List<String>, String> smap : skewed.getSkewedColValueLocationMaps().entrySet()) {
-          SortedSet<String> sortedKey = new TreeSet<String>(smap.getKey());
+          SortedSet<String> sortedKey = new TreeSet<>(smap.getKey());
           sortedMap.put(StringUtils.join(sortedKey, "."), smap.getValue());
         }
         for (Map.Entry<String, String> e : sortedMap.entrySet()) {
@@ -690,8 +684,8 @@ class HBaseUtils {
       serde.setParameters(buildParameters(proto.getSerdeInfo().getParameters()));
       sd.setSerdeInfo(serde);
     }
-    sd.setBucketCols(new ArrayList<String>(proto.getBucketColsList()));
-    List<Order> sortCols = new ArrayList<Order>();
+    sd.setBucketCols(new ArrayList<>(proto.getBucketColsList()));
+    List<Order> sortCols = new ArrayList<>();
     for (HbaseMetastoreProto.StorageDescriptor.Order protoOrder : proto.getSortColsList()) {
       sortCols.add(new Order(protoOrder.getColumnName(), protoOrder.getOrder()));
     }
@@ -699,15 +693,15 @@ class HBaseUtils {
     if (proto.hasSkewedInfo()) {
       SkewedInfo skewed = new SkewedInfo();
       skewed
-          .setSkewedColNames(new ArrayList<String>(proto.getSkewedInfo().getSkewedColNamesList()));
+          .setSkewedColNames(new ArrayList<>(proto.getSkewedInfo().getSkewedColNamesList()));
       for (HbaseMetastoreProto.StorageDescriptor.SkewedInfo.SkewedColValueList innerList :
           proto.getSkewedInfo().getSkewedColValuesList()) {
-        skewed.addToSkewedColValues(new ArrayList<String>(innerList.getSkewedColValueList()));
+        skewed.addToSkewedColValues(new ArrayList<>(innerList.getSkewedColValueList()));
       }
-      Map<List<String>, String> colMaps = new HashMap<List<String>, String>();
+      Map<List<String>, String> colMaps = new HashMap<>();
       for (HbaseMetastoreProto.StorageDescriptor.SkewedInfo.SkewedColValueLocationMap map :
          proto.getSkewedInfo().getSkewedColValueLocationMapsList()) {
-        colMaps.put(new ArrayList<String>(map.getKeyList()), map.getValue());
+        colMaps.put(new ArrayList<>(map.getKeyList()), map.getValue());
       }
       skewed.setSkewedColValueLocationMaps(colMaps);
       sd.setSkewedInfo(skewed);
@@ -742,7 +736,7 @@ class HBaseUtils {
   }
 
   static byte[] buildPartitionKey(String dbName, String tableName, List<String> partVals) {
-    Deque<String> keyParts = new ArrayDeque<String>(partVals);
+    Deque<String> keyParts = new ArrayDeque<>(partVals);
     keyParts.addFirst(tableName);
     keyParts.addFirst(dbName);
     return buildKey(keyParts.toArray(new String[keyParts.size()]));
@@ -1135,6 +1129,61 @@ class HBaseUtils {
   }
 
   /**
+   * Serialize a delegation token
+   * @param tokenIdentifier
+   * @param delegationToken
+   * @return two byte arrays, first contains the key, the second the serialized value.
+   */
+  static byte[][] serializeDelegationToken(String tokenIdentifier, String delegationToken) {
+    byte[][] result = new byte[2][];
+    result[0] = buildKey(tokenIdentifier);
+    result[1] = HbaseMetastoreProto.DelegationToken.newBuilder()
+        .setTokenStr(delegationToken)
+        .build()
+        .toByteArray();
+    return result;
+  }
+
+  /**
+   * Deserialize a delegation token.
+   * @param value value fetched from hbase
+   * @return A delegation token.
+   * @throws InvalidProtocolBufferException
+   */
+  static String deserializeDelegationToken(byte[] value) throws InvalidProtocolBufferException {
+    HbaseMetastoreProto.DelegationToken protoToken =
+        HbaseMetastoreProto.DelegationToken.parseFrom(value);
+    return protoToken.getTokenStr();
+  }
+
+  /**
+   * Serialize a master key
+   * @param seqNo
+   * @param key
+   * @return two byte arrays, first contains the key, the second the serialized value.
+   */
+  static byte[][] serializeMasterKey(Integer seqNo, String key) {
+    byte[][] result = new byte[2][];
+    result[0] = buildKey(seqNo.toString());
+    result[1] = HbaseMetastoreProto.MasterKey.newBuilder()
+        .setMasterKey(key)
+        .build()
+        .toByteArray();
+    return result;
+  }
+
+  /**
+   * Deserialize a master key.
+   * @param value value fetched from hbase
+   * @return A master key
+   * @throws InvalidProtocolBufferException
+   */
+  static String deserializeMasterKey(byte[] value) throws InvalidProtocolBufferException {
+    HbaseMetastoreProto.MasterKey protoKey = HbaseMetastoreProto.MasterKey.parseFrom(value);
+    return protoKey.getMasterKey();
+  }
+
+  /**
    * @param keyStart byte array representing the start prefix
    * @return byte array corresponding to the next possible prefix
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/a310524c/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
----------------------------------------------------------------------
diff --git a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
index 3cd8867..cba3671 100644
--- a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
+++ b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
@@ -104,6 +104,10 @@ message Database {
   optional PrincipalType owner_type = 6;
 }
 
+message DelegationToken {
+  required string token_str = 1;
+}
+
 message FieldSchema {
   required string name = 1;
   required string type = 2;
@@ -133,6 +137,10 @@ message Function {
   repeated ResourceUri resource_uris = 6;
 }
 
+message MasterKey {
+  required string master_key = 1;
+}
+
 message ParameterEntry {
   required string key = 1;
   required string value = 2;
@@ -247,8 +255,3 @@ message Table {
   optional PrincipalPrivilegeSet privileges = 13;
   optional bool is_temporary = 14;
 }
-
-
-
-
-

http://git-wip-us.apache.org/repos/asf/hive/blob/a310524c/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java
index 9878499..fac7dcc 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
@@ -1246,56 +1245,6 @@ public class TestHBaseStore {
     Assert.assertEquals(decimalData.getNumDVs(), decimalDataFromDB.getNumDVs());
   }
 
-  // TODO: Activate this test, when we are able to mock the HBaseReadWrite.NO_CACHE_CONF set to false
-  // Right now, I have tested this by using aggrStatsCache despite NO_CACHE_CONF set to true
-  // Also need to add tests for other data types + refactor a lot of duplicate code in stats testing
-  //@Test
-  public void AggrStats() throws Exception {
-    int numParts = 3;
-    ColumnStatistics stats;
-    ColumnStatisticsDesc desc;
-    ColumnStatisticsObj obj;
-    List<String> partNames = new ArrayList<String>();
-    List<String> colNames = new ArrayList<String>();
-    colNames.add(BOOLEAN_COL);
-    // Add boolean col stats to DB for numParts partitions:
-    // PART_VALS(0), PART_VALS(1) & PART_VALS(2) for PART_KEYS(0)
-    for (int i = 0; i < numParts; i++) {
-      stats = new ColumnStatistics();
-      // Get a default ColumnStatisticsDesc for partition level stats
-      desc = getMockPartColStatsDesc(0, i);
-      stats.setStatsDesc(desc);
-      partNames.add(desc.getPartName());
-      // Get one of the pre-created ColumnStatisticsObj
-      obj = booleanColStatsObjs.get(i);
-      stats.addToStatsObj(obj);
-      // Add to DB
-      List<String> parVals = new ArrayList<String>();
-      parVals.add(PART_VALS.get(i));
-      store.updatePartitionColumnStatistics(stats, parVals);
-    }
-    // Read aggregate stats
-    AggrStats aggrStatsFromDB = store.get_aggr_stats_for(DB, TBL, partNames, colNames);
-    // Verify
-    Assert.assertEquals(1, aggrStatsFromDB.getColStatsSize());
-    ColumnStatisticsObj objFromDB = aggrStatsFromDB.getColStats().get(0);
-    Assert.assertNotNull(objFromDB);
-    // Aggregate our mock values
-    long numTrues = 0, numFalses = 0, numNulls = 0;
-    BooleanColumnStatsData boolData;;
-    for (int i = 0; i < numParts; i++) {
-      boolData = booleanColStatsObjs.get(i).getStatsData().getBooleanStats();
-      numTrues = numTrues + boolData.getNumTrues();
-      numFalses = numFalses + boolData.getNumFalses();
-      numNulls = numNulls + boolData.getNumNulls();
-    }
-    // Compare with what we got from the method call
-    BooleanColumnStatsData boolDataFromDB = objFromDB.getStatsData().getBooleanStats();
-    Assert.assertEquals(numTrues, boolDataFromDB.getNumTrues());
-    Assert.assertEquals(numFalses, boolDataFromDB.getNumFalses());
-    Assert.assertEquals(numNulls, boolDataFromDB.getNumNulls());
-  }
-
   /**
    * Returns a dummy table level ColumnStatisticsDesc with default values
   */
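For readers tracing the new code paths, here is a minimal JUnit-style sketch, in the spirit of the TestHBaseStore tests above, of how the delegation-token and master-key support could be exercised through the RawStore interface. It is illustrative only and not part of this commit: the 'store' field is assumed to be an HBaseStore wired to a mocked HBase connection in test setup, and the assertions simply restate semantics visible in the diff (addToken/removeToken/removeMasterKey always report success per HIVE-11302; addMasterKey returns the value handed out by getNextSequence, which starts at 0 and increments by one per call).

  // Hypothetical sketch; assumes 'store' is an HBaseStore backed by a mocked
  // HBase connection, per the setup used elsewhere in TestHBaseStore.
  @Test
  public void delegationTokenRoundTrip() throws Exception {
    Assert.assertTrue(store.addToken("tokenId1", "tokenValue1"));    // always true, see HIVE-11302
    Assert.assertEquals("tokenValue1", store.getToken("tokenId1"));
    Assert.assertTrue(store.getAllTokenIdentifiers().contains("tokenId1"));
    Assert.assertTrue(store.removeToken("tokenId1"));                // always true, see HIVE-11302
    Assert.assertNull(store.getToken("tokenId1"));                   // deleted tokens read back as null
  }

  @Test
  public void masterKeySequence() throws Exception {
    // addMasterKey hands back the sequence number it obtained from
    // getNextSequence(HBaseReadWrite.MASTER_KEY_SEQUENCE); consecutive calls
    // therefore yield consecutive ids.
    int first = store.addMasterKey("key0");
    int second = store.addMasterKey("key1");
    Assert.assertEquals(first + 1, second);
    store.updateMasterKey(first, "key0-rolled");      // throws NoSuchObjectException for an unknown seqNo
    Assert.assertEquals(2, store.getMasterKeys().length);
    Assert.assertTrue(store.removeMasterKey(second)); // always true, see HIVE-11302
  }

One point on the storage layout worth noting: both record types share the single HBMS_SECURITY table and are told apart by which column of the catalog family is populated ("dt" for delegation tokens, "mk" for master keys), which is why scanDelegationTokenIdentifiers checks that the "dt" value is non-null before collecting a row key.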