HIVE-11389 hbase import should allow partial imports and should work in parallel (gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0fa45e4a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0fa45e4a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0fa45e4a

Branch: refs/heads/master
Commit: 0fa45e4a562fc2586b1ef06a88e9c186a0835316
Parents: 7e7f461
Author: Alan Gates <ga...@hortonworks.com>
Authored: Fri Jul 31 11:07:00 2015 -0700
Committer: Alan Gates <ga...@hortonworks.com>
Committed: Fri Jul 31 11:07:00 2015 -0700

----------------------------------------------------------------------
 .../hive/metastore/hbase/TestHBaseImport.java | 557 +++++++++++++++++--
 .../hive/metastore/hbase/HBaseImport.java     | 435 +++++++++++++--
 2 files changed, 899 insertions(+), 93 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/0fa45e4a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java
index 7bdff18..1ac10f0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java
@@ -26,6 +26,9 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.FunctionType; +import org.apache.hadoop.hive.metastore.api.InvalidObjectException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.ResourceType;
@@ -38,12 +41,16 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; /** * Test that import from an RDBMS based metastore works
@@ -52,6 +59,13 @@ public class TestHBaseImport extends HBaseIntegrationTests { private static final Log LOG = LogFactory.getLog(TestHBaseStoreIntegration.class.getName()); + private static final String[] tableNames = new String[] {"allnonparttable", "allparttable"}; + private static final String[] partVals = new String[] {"na", "emea", "latam", "apac"}; + private static final String[] funcNames = new String[] {"allfunc1", "allfunc2"}; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + @BeforeClass public static void startup() throws Exception { HBaseIntegrationTests.startMiniCluster();
@@ -69,25 +83,396 @@ public class TestHBaseImport extends HBaseIntegrationTests { } @Test - public void doImport() throws Exception { - RawStore rdbms = new ObjectStore(); + public void importAll() throws Exception { + RawStore rdbms; + rdbms = new ObjectStore(); + rdbms.setConf(conf); + + String[] dbNames = new String[] {"alldb1", "alldb2"}; + String[] roles = new String[] {"allrole1", "allrole2"}; +
String[] tokenIds = new String[] {"alltokenid1", "alltokenid2"}; + String[] tokens = new String[] {"alltoken1", "alltoken2"}; + String[] masterKeys = new String[] {"allmk1", "allmk2"}; + int now = (int)System.currentTimeMillis() / 1000; + + setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now); + + int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size(); + int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size(); + + HBaseImport importer = new HBaseImport("-a"); + importer.setConnections(rdbms, store); + importer.run(); + + for (int i = 0; i < roles.length; i++) { + Role role = store.getRole(roles[i]); + Assert.assertNotNull(role); + Assert.assertEquals(roles[i], role.getRoleName()); + } + // Make sure there aren't any extra roles + Assert.assertEquals(baseNumRoles + 2, store.listRoleNames().size()); + + for (int i = 0; i < dbNames.length; i++) { + Database db = store.getDatabase(dbNames[i]); + Assert.assertNotNull(db); + // check one random value in the db rather than every value + Assert.assertEquals("file:/tmp", db.getLocationUri()); + + Table table = store.getTable(db.getName(), tableNames[0]); + Assert.assertNotNull(table); + Assert.assertEquals(now, table.getLastAccessTime()); + Assert.assertEquals("input", table.getSd().getInputFormat()); + + table = store.getTable(db.getName(), tableNames[1]); + Assert.assertNotNull(table); + + for (int j = 0; j < partVals.length; j++) { + Partition part = store.getPartition(dbNames[i], tableNames[1], Arrays.asList(partVals[j])); + Assert.assertNotNull(part); + Assert.assertEquals("file:/tmp/region=" + partVals[j], part.getSd().getLocation()); + } + + Assert.assertEquals(4, store.getPartitions(dbNames[i], tableNames[1], -1).size()); + Assert.assertEquals(2, store.getAllTables(dbNames[i]).size()); + + Assert.assertEquals(2, store.getFunctions(dbNames[i], "*").size()); + for (int j = 0; j < funcNames.length; j++) { + Assert.assertNotNull(store.getFunction(dbNames[i], funcNames[j])); + } + } + + Assert.assertEquals(baseNumDbs + 2, store.getAllDatabases().size()); + + // I can't test total number of tokens or master keys because the import grabs all and copies + // them, which means it grabs the ones imported by importSecurity test (if it's already run). + // Depending on it already running would make the tests order dependent, which junit doesn't + // guarantee. + for (int i = 0; i < tokenIds.length; i++) { + Assert.assertEquals(tokens[i], store.getToken(tokenIds[i])); + } + String[] hbaseKeys = store.getMasterKeys(); + Set<String> keys = new HashSet<>(Arrays.asList(hbaseKeys)); + for (int i = 0; i < masterKeys.length; i++) { + Assert.assertTrue(keys.contains(masterKeys[i])); + } + } + + @Test + public void importOneDb() throws Exception { + RawStore rdbms; + rdbms = new ObjectStore(); + rdbms.setConf(conf); + + String[] dbNames = new String[] {"onedbdb1", "onedbdb2"}; + String[] roles = new String[] {"onedbrole1", "onedbrole2"}; + String[] tokenIds = new String[] {"onedbtokenid1", "onedbtokenid2"}; + String[] tokens = new String[] {"onedbtoken1", "onedbtoken2"}; + String[] masterKeys = new String[] {"onedbmk1", "onedbmk2"}; + int now = (int)System.currentTimeMillis() / 1000; + + setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now); + + int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size(); + int baseNumDbs = store.getAllDatabases() == null ? 
0 : store.getAllDatabases().size(); + int baseNumToks = store.getAllTokenIdentifiers() == null ? 0 : + store.getAllTokenIdentifiers().size(); + int baseNumKeys = store.getMasterKeys() == null ? 0 : store.getMasterKeys().length; + + HBaseImport importer = new HBaseImport("-d", dbNames[0]); + importer.setConnections(rdbms, store); + importer.run(); + + // Make sure there aren't any extra roles + Assert.assertEquals(baseNumRoles, store.listRoleNames().size()); + + Database db = store.getDatabase(dbNames[0]); + Assert.assertNotNull(db); + // check one random value in the db rather than every value + Assert.assertEquals("file:/tmp", db.getLocationUri()); + + Table table = store.getTable(db.getName(), tableNames[0]); + Assert.assertNotNull(table); + Assert.assertEquals(now, table.getLastAccessTime()); + Assert.assertEquals("input", table.getSd().getInputFormat()); + + table = store.getTable(db.getName(), tableNames[1]); + Assert.assertNotNull(table); + + for (int j = 0; j < partVals.length; j++) { + Partition part = store.getPartition(dbNames[0], tableNames[1], Arrays.asList(partVals[j])); + Assert.assertNotNull(part); + Assert.assertEquals("file:/tmp/region=" + partVals[j], part.getSd().getLocation()); + } + + Assert.assertEquals(4, store.getPartitions(dbNames[0], tableNames[1], -1).size()); + Assert.assertEquals(2, store.getAllTables(dbNames[0]).size()); + + Assert.assertEquals(2, store.getFunctions(dbNames[0], "*").size()); + for (int j = 0; j < funcNames.length; j++) { + Assert.assertNotNull(store.getFunction(dbNames[0], funcNames[j])); + } + + Assert.assertEquals(baseNumDbs + 1, store.getAllDatabases().size()); + + Assert.assertEquals(baseNumToks, store.getAllTokenIdentifiers().size()); + String[] hbaseKeys = store.getMasterKeys(); + Assert.assertEquals(baseNumKeys, hbaseKeys.length); + + // Have to do this last as it will throw an exception + thrown.expect(NoSuchObjectException.class); + store.getDatabase(dbNames[1]); + } + + @Test + public void importOneFunc() throws Exception { + RawStore rdbms; + rdbms = new ObjectStore(); + rdbms.setConf(conf); + + String[] dbNames = new String[] {"onefuncdb1", "onefuncdb2"}; + String[] roles = new String[] {"onefuncrole1", "onefuncrole2"}; + String[] tokenIds = new String[] {"onefunctokenid1", "onefunctokenid2"}; + String[] tokens = new String[] {"onefunctoken1", "onefunctoken2"}; + String[] masterKeys = new String[] {"onefuncmk1", "onefuncmk2"}; + int now = (int)System.currentTimeMillis() / 1000; + + setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now); + + int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size(); + int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size(); + int baseNumToks = store.getAllTokenIdentifiers() == null ? 0 : + store.getAllTokenIdentifiers().size(); + int baseNumKeys = store.getMasterKeys() == null ? 0 : store.getMasterKeys().length; + + // Create the database so I can put the function in it. + store.createDatabase( + new Database(dbNames[0], "no description", "file:/tmp", emptyParameters)); + + HBaseImport importer = new HBaseImport("-f", dbNames[0] + "." 
+ funcNames[0]); + importer.setConnections(rdbms, store); + importer.run(); + + // Make sure there aren't any extra roles + Assert.assertEquals(baseNumRoles, store.listRoleNames().size()); + + Database db = store.getDatabase(dbNames[0]); + Assert.assertNotNull(db); + + Assert.assertEquals(0, store.getAllTables(dbNames[0]).size()); + Assert.assertEquals(1, store.getFunctions(dbNames[0], "*").size()); + Assert.assertNotNull(store.getFunction(dbNames[0], funcNames[0])); + Assert.assertNull(store.getFunction(dbNames[0], funcNames[1])); + + Assert.assertEquals(baseNumDbs + 1, store.getAllDatabases().size()); + + Assert.assertEquals(baseNumToks, store.getAllTokenIdentifiers().size()); + String[] hbaseKeys = store.getMasterKeys(); + Assert.assertEquals(baseNumKeys, hbaseKeys.length); + } + + @Test + public void importOneTableNonPartitioned() throws Exception { + RawStore rdbms; + rdbms = new ObjectStore(); + rdbms.setConf(conf); + + String[] dbNames = new String[] {"onetabdb1", "onetabdb2"}; + String[] roles = new String[] {"onetabrole1", "onetabrole2"}; + String[] tokenIds = new String[] {"onetabtokenid1", "onetabtokenid2"}; + String[] tokens = new String[] {"onetabtoken1", "onetabtoken2"}; + String[] masterKeys = new String[] {"onetabmk1", "onetabmk2"}; + int now = (int)System.currentTimeMillis() / 1000; + + setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now); + + int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size(); + int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size(); + int baseNumToks = store.getAllTokenIdentifiers() == null ? 0 : + store.getAllTokenIdentifiers().size(); + int baseNumKeys = store.getMasterKeys() == null ? 0 : store.getMasterKeys().length; + + // Create the database so I can put the table in it. + store.createDatabase( + new Database(dbNames[0], "no description", "file:/tmp", emptyParameters)); + + HBaseImport importer = new HBaseImport("-t", dbNames[0] + "." + tableNames[0]); + importer.setConnections(rdbms, store); + importer.run(); + + // Make sure there aren't any extra roles + Assert.assertEquals(baseNumRoles, store.listRoleNames().size()); + + Database db = store.getDatabase(dbNames[0]); + Assert.assertNotNull(db); + + Table table = store.getTable(db.getName(), tableNames[0]); + Assert.assertNotNull(table); + Assert.assertEquals(1, store.getAllTables(db.getName()).size()); + Assert.assertNull(store.getTable(db.getName(), tableNames[1])); + + Assert.assertEquals(0, store.getFunctions(dbNames[0], "*").size()); + Assert.assertEquals(baseNumDbs + 1, store.getAllDatabases().size()); + + Assert.assertEquals(baseNumToks, store.getAllTokenIdentifiers().size()); + String[] hbaseKeys = store.getMasterKeys(); + Assert.assertEquals(baseNumKeys, hbaseKeys.length); + + } + + @Test + public void importOneTablePartitioned() throws Exception { + RawStore rdbms; + rdbms = new ObjectStore(); + rdbms.setConf(conf); + + String[] dbNames = new String[] {"onetabpartdb1", "onetabpartodb2"}; + String[] roles = new String[] {"onetabpartorole1", "onetabpartorole2"}; + String[] tokenIds = new String[] {"onetabpartotokenid1", "onetabpartotokenid2"}; + String[] tokens = new String[] {"onetabpartotoken1", "onetabpartotoken2"}; + String[] masterKeys = new String[] {"onetabpartomk1", "onetabpartomk2"}; + int now = (int)System.currentTimeMillis() / 1000; + + setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now); + + int baseNumRoles = store.listRoleNames() == null ? 
0 : store.listRoleNames().size(); + int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size(); + int baseNumToks = store.getAllTokenIdentifiers() == null ? 0 : + store.getAllTokenIdentifiers().size(); + int baseNumKeys = store.getMasterKeys() == null ? 0 : store.getMasterKeys().length; + + // Create the database so I can put the table in it. + store.createDatabase( + new Database(dbNames[0], "no description", "file:/tmp", emptyParameters)); + + HBaseImport importer = new HBaseImport("-t", dbNames[0] + "." + tableNames[1]); + importer.setConnections(rdbms, store); + importer.run(); + + // Make sure there aren't any extra roles + Assert.assertEquals(baseNumRoles, store.listRoleNames().size()); + + Database db = store.getDatabase(dbNames[0]); + Assert.assertNotNull(db); + + Table table = store.getTable(db.getName(), tableNames[1]); + Assert.assertNotNull(table); + Assert.assertEquals(1, store.getAllTables(db.getName()).size()); + + for (int j = 0; j < partVals.length; j++) { + Partition part = store.getPartition(dbNames[0], tableNames[1], Arrays.asList(partVals[j])); + Assert.assertNotNull(part); + Assert.assertEquals("file:/tmp/region=" + partVals[j], part.getSd().getLocation()); + } + Assert.assertEquals(4, store.getPartitions(dbNames[0], tableNames[1], -1).size()); + + Assert.assertNull(store.getTable(db.getName(), tableNames[0])); + + Assert.assertEquals(0, store.getFunctions(dbNames[0], "*").size()); + Assert.assertEquals(baseNumDbs + 1, store.getAllDatabases().size()); + + Assert.assertEquals(baseNumToks, store.getAllTokenIdentifiers().size()); + String[] hbaseKeys = store.getMasterKeys(); + Assert.assertEquals(baseNumKeys, hbaseKeys.length); + } + + @Test + public void importSecurity() throws Exception { + RawStore rdbms; + rdbms = new ObjectStore(); + rdbms.setConf(conf); + + String[] dbNames = new String[] {"securitydb1", "securitydb2"}; + String[] roles = new String[] {"securityrole1", "securityrole2"}; + String[] tokenIds = new String[] {"securitytokenid1", "securitytokenid2"}; + String[] tokens = new String[] {"securitytoken1", "securitytoken2"}; + String[] masterKeys = new String[] {"securitymk1", "securitymk2"}; + int now = (int)System.currentTimeMillis() / 1000; + + setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now); + + int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size(); + int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size(); + + HBaseImport importer = new HBaseImport("-k"); + importer.setConnections(rdbms, store); + importer.run(); + + Assert.assertEquals(baseNumRoles, store.listRoleNames().size()); + + Assert.assertEquals(baseNumDbs, store.getAllDatabases().size()); + + // I can't test total number of tokens or master keys because the import grabs all and copies + // them, which means it grabs the ones imported by importAll test (if it's already run). + // Depending on it already running would make the tests order dependent, which junit doesn't + // guarantee. 
+ for (int i = 0; i < tokenIds.length; i++) { + Assert.assertEquals(tokens[i], store.getToken(tokenIds[i])); + } + String[] hbaseKeys = store.getMasterKeys(); + Set<String> keys = new HashSet<>(Arrays.asList(hbaseKeys)); + for (int i = 0; i < masterKeys.length; i++) { + Assert.assertTrue(keys.contains(masterKeys[i])); + } + } + + // TODO test for bogus function name + // TODO test for bogus table name + // TODO test for non-existent items + + @Test + public void importOneRole() throws Exception { + RawStore rdbms; + rdbms = new ObjectStore(); rdbms.setConf(conf); - String[] dbNames = new String[] {"importdb1", "importdb2"}; - String[] tableNames = new String[] {"nonparttable", "parttable"}; - String[] partVals = new String[] {"na", "emea", "latam", "apac"}; - String[] funcNames = new String[] {"func1", "func2"}; - String[] roles = new String[] {"role1", "role2"}; + String[] dbNames = new String[] {"oneroledb1", "oneroledb2"}; + String[] roles = new String[] {"onerolerole1", "onerolerole2"}; + String[] tokenIds = new String[] {"oneroletokenid1", "oneroletokenid2"}; + String[] tokens = new String[] {"oneroletoken1", "oneroletoken2"}; + String[] masterKeys = new String[] {"onerolemk1", "onerolemk2"}; int now = (int)System.currentTimeMillis() / 1000; + setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now); + + int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size(); + int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size(); + int baseNumToks = store.getAllTokenIdentifiers() == null ? 0 : + store.getAllTokenIdentifiers().size(); + int baseNumKeys = store.getMasterKeys() == null ? 0 : store.getMasterKeys().length; + + HBaseImport importer = new HBaseImport("-r", roles[0]); + importer.setConnections(rdbms, store); + importer.run(); + + Role role = store.getRole(roles[0]); + Assert.assertNotNull(role); + Assert.assertEquals(roles[0], role.getRoleName()); + + // Make sure there aren't any extra roles + Assert.assertEquals(baseNumRoles + 1, store.listRoleNames().size()); + Assert.assertEquals(baseNumDbs, store.getAllDatabases().size()); + + Assert.assertEquals(baseNumToks, store.getAllTokenIdentifiers().size()); + String[] hbaseKeys = store.getMasterKeys(); + Assert.assertEquals(baseNumKeys, hbaseKeys.length); + + // Have to do this last as it will throw an exception + thrown.expect(NoSuchObjectException.class); + store.getRole(roles[1]); + } + + private void setupObjectStore(RawStore rdbms, String[] roles, String[] dbNames, + String[] tokenIds, String[] tokens, String[] masterKeys, int now) + throws MetaException, InvalidObjectException, NoSuchObjectException { for (int i = 0; i < roles.length; i++) { rdbms.addRole(roles[i], "me"); } for (int i = 0; i < dbNames.length; i++) { - rdbms.createDatabase(new Database(dbNames[i], "no description", "file:/tmp", emptyParameters)); + rdbms.createDatabase( + new Database(dbNames[i], "no description", "file:/tmp", emptyParameters)); - List<FieldSchema> cols = new ArrayList<FieldSchema>(); + List<FieldSchema> cols = new ArrayList<>(); cols.add(new FieldSchema("col1", "int", "nocomment")); SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, @@ -95,7 +480,7 @@ public class TestHBaseImport extends HBaseIntegrationTests { rdbms.createTable(new Table(tableNames[0], dbNames[i], "me", now, now, 0, sd, null, emptyParameters, null, null, null)); - List<FieldSchema> partCols = new 
ArrayList<FieldSchema>(); + List<FieldSchema> partCols = new ArrayList<>(); partCols.add(new FieldSchema("region", "string", "")); rdbms.createTable(new Table(tableNames[1], dbNames[i], "me", now, now, 0, sd, partCols, emptyParameters, null, null, null)); @@ -105,57 +490,149 @@ public class TestHBaseImport extends HBaseIntegrationTests { psd.setLocation("file:/tmp/region=" + partVals[j]); Partition part = new Partition(Arrays.asList(partVals[j]), dbNames[i], tableNames[1], now, now, psd, emptyParameters); - store.addPartition(part); + rdbms.addPartition(part); } for (String funcName : funcNames) { - store.createFunction(new Function(funcName, dbNames[i], "classname", "ownername", - PrincipalType.USER, (int)System.currentTimeMillis()/1000, FunctionType.JAVA, + LOG.debug("Creating new function " + dbNames[i] + "." + funcName); + rdbms.createFunction(new Function(funcName, dbNames[i], "classname", "ownername", + PrincipalType.USER, (int) System.currentTimeMillis() / 1000, FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR, "uri")))); } } + for (int i = 0; i < tokenIds.length; i++) rdbms.addToken(tokenIds[i], tokens[i]); + for (int i = 0; i < masterKeys.length; i++) { + rdbms.addMasterKey(masterKeys[i]); + } + } - HBaseImport importer = new HBaseImport(); - importer.setConnections(rdbms, store); - importer.run(); + @Test + public void parallel() throws Exception { + int parallelFactor = 10; + RawStore rdbms; + rdbms = new ObjectStore(); + rdbms.setConf(conf); - for (int i = 0; i < roles.length; i++) { - Role role = store.getRole(roles[i]); - Assert.assertNotNull(role); - Assert.assertEquals(roles[i], role.getRoleName()); + String[] dbNames = new String[] {"paralleldb1"}; + int now = (int)System.currentTimeMillis() / 1000; + + for (int i = 0; i < dbNames.length; i++) { + rdbms.createDatabase( + new Database(dbNames[i], "no description", "file:/tmp", emptyParameters)); + + List<FieldSchema> cols = new ArrayList<>(); + cols.add(new FieldSchema("col1", "int", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, emptyParameters); + + List<FieldSchema> partCols = new ArrayList<>(); + partCols.add(new FieldSchema("region", "string", "")); + for (int j = 0; j < parallelFactor; j++) { + rdbms.createTable(new Table("t" + j, dbNames[i], "me", now, now, 0, sd, partCols, + emptyParameters, null, null, null)); + for (int k = 0; k < parallelFactor; k++) { + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/region=" + k); + Partition part = new Partition(Arrays.asList("p" + k), dbNames[i], "t" + j, + now, now, psd, emptyParameters); + rdbms.addPartition(part); + } + } } - // Make sure there aren't any extra roles - Assert.assertEquals(2, store.listRoleNames().size()); + + HBaseImport importer = new HBaseImport("-p", "2", "-b", "2", "-d", dbNames[0]); + importer.setConnections(rdbms, store); + importer.run(); for (int i = 0; i < dbNames.length; i++) { Database db = store.getDatabase(dbNames[i]); Assert.assertNotNull(db); - // check one random value in the db rather than every value - Assert.assertEquals("file:/tmp", db.getLocationUri()); - Table table = store.getTable(db.getName(), tableNames[0]); - Assert.assertNotNull(table); - Assert.assertEquals(now, table.getLastAccessTime()); - Assert.assertEquals("input", table.getSd().getInputFormat()); + for (int j = 0; j < parallelFactor; j++) { + Table table = 
store.getTable(db.getName(), "t" + j); + Assert.assertNotNull(table); + Assert.assertEquals(now, table.getLastAccessTime()); + Assert.assertEquals("input", table.getSd().getInputFormat()); - table = store.getTable(db.getName(), tableNames[1]); - Assert.assertNotNull(table); + for (int k = 0; k < parallelFactor; k++) { + Partition part = + store.getPartition(dbNames[i], "t" + j, Arrays.asList("p" + k)); + Assert.assertNotNull(part); + Assert.assertEquals("file:/tmp/region=" + k, part.getSd().getLocation()); + } - for (int j = 0; j < partVals.length; j++) { - Partition part = store.getPartition(dbNames[i], tableNames[1], Arrays.asList(partVals[j])); - Assert.assertNotNull(part); - Assert.assertEquals("file:/tmp/region=" + partVals[j], part.getSd().getLocation()); + Assert.assertEquals(parallelFactor, store.getPartitions(dbNames[i], "t" + j, -1).size()); } + Assert.assertEquals(parallelFactor, store.getAllTables(dbNames[i]).size()); - Assert.assertEquals(4, store.getPartitions(dbNames[i], tableNames[1], -1).size()); - Assert.assertEquals(2, store.getAllTables(dbNames[i]).size()); + } + } - Assert.assertEquals(2, store.getFunctions(dbNames[i], "*").size()); - for (int j = 0; j < funcNames.length; j++) { - Assert.assertNotNull(store.getFunction(dbNames[i], funcNames[j])); + // Same as the test above except we create 9 of everything instead of 10. This is important + because in using a batch size of 2 the previous test guarantees 10 / 2 = 5, meaning we'll + have 5 writes on the partition queue with exactly 2 entries. In this test we'll handle the + case where the last entry in the queue has fewer partitions. + @Test + public void parallelOdd() throws Exception { + int parallelFactor = 9; + RawStore rdbms; + rdbms = new ObjectStore(); + rdbms.setConf(conf); + + String[] dbNames = new String[] {"oddparalleldb1"}; + int now = (int)System.currentTimeMillis() / 1000; + + for (int i = 0; i < dbNames.length; i++) { + rdbms.createDatabase( + new Database(dbNames[i], "no description", "file:/tmp", emptyParameters)); + + List<FieldSchema> cols = new ArrayList<>(); + cols.add(new FieldSchema("col1", "int", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, emptyParameters); + + List<FieldSchema> partCols = new ArrayList<>(); + partCols.add(new FieldSchema("region", "string", "")); + for (int j = 0; j < parallelFactor; j++) { + rdbms.createTable(new Table("t" + j, dbNames[i], "me", now, now, 0, sd, partCols, + emptyParameters, null, null, null)); + for (int k = 0; k < parallelFactor; k++) { + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/region=" + k); + Partition part = new Partition(Arrays.asList("p" + k), dbNames[i], "t" + j, + now, now, psd, emptyParameters); + rdbms.addPartition(part); + } } } - Assert.assertEquals(2, store.getAllDatabases().size()); + HBaseImport importer = new HBaseImport("-p", "2", "-b", "2", "-d", dbNames[0]); + importer.setConnections(rdbms, store); + importer.run(); + + for (int i = 0; i < dbNames.length; i++) { + Database db = store.getDatabase(dbNames[i]); + Assert.assertNotNull(db); + + for (int j = 0; j < parallelFactor; j++) { + Table table = store.getTable(db.getName(), "t" + j); + Assert.assertNotNull(table); + Assert.assertEquals(now, table.getLastAccessTime()); + Assert.assertEquals("input", table.getSd().getInputFormat()); + + for (int k = 0; k < parallelFactor; k++) { + Partition part =
+ store.getPartition(dbNames[i], "t" + j, Arrays.asList("p" + k)); + Assert.assertNotNull(part); + Assert.assertEquals("file:/tmp/region=" + k, part.getSd().getLocation()); + } + + Assert.assertEquals(parallelFactor, store.getPartitions(dbNames[i], "t" + j, -1).size()); + } + Assert.assertEquals(parallelFactor, store.getAllTables(dbNames[i]).size()); + + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/0fa45e4a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java index e416b8a..e143de7 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java @@ -19,11 +19,18 @@ package org.apache.hadoop.hive.metastore.hbase; import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.Deadline; import org.apache.hadoop.hive.metastore.ObjectStore; import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.api.Database; @@ -36,23 +43,41 @@ import org.apache.hadoop.hive.metastore.api.Role; import org.apache.hadoop.hive.metastore.api.Table; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** * A tool to take the contents of an RDBMS based Hive metastore and import it into an HBase based * one. To use this the config files for Hive configured to work with the RDBMS (that is, - * including the JDBC string, etc.) and for HBase must be in the path. This tool will then + * including the JDBC string, etc.) as well as HBase configuration files must be in the path. + * There should not be a hive-site.xml that specifies HBaseStore in the path. This tool will then * handle connecting to the RDBMS via the {@link org.apache.hadoop.hive.metastore.ObjectStore} * and HBase via {@link org.apache.hadoop.hive.metastore.hbase.HBaseStore} and transferring the * data. + * + * This tool can import an entire metastore or only selected objects. When selecting objects it + * is necessary to fully specify the object's name. For example, if you want to import the table + * T in the default database it needs to be identified as default.T. The same is true for + * functions. When an object is specified, everything under that object will be imported (e.g. + * if you select database D, then all tables and functions in that database will be + * imported as well). + * + * At this point only tables and partitions are handled in parallel as it is assumed there are + * relatively few of everything else. + * + * Note that HBaseSchemaTool must have already been used to create the appropriate tables in HBase. 
*/ public class HBaseImport { static final private Log LOG = LogFactory.getLog(HBaseImport.class.getName()); public static void main(String[] args) { - HBaseImport tool = new HBaseImport(); try { + HBaseImport tool = new HBaseImport(args); tool.run(); } catch (Exception e) { System.err.println("Caught exception " + e.getClass().getName() + " with message <" + @@ -60,112 +85,399 @@ public class HBaseImport { } } + private ThreadLocal<RawStore> rdbmsStore = new ThreadLocal<RawStore>() { + @Override + protected RawStore initialValue() { + if (rdbmsConf == null) { + throw new RuntimeException("order violation, need to set rdbms conf first"); + } + RawStore os = new ObjectStore(); + os.setConf(rdbmsConf); + return os; + } + }; + + private ThreadLocal<RawStore> hbaseStore = new ThreadLocal<RawStore>() { + @Override + protected RawStore initialValue() { + if (hbaseConf == null) { + throw new RuntimeException("order violation, need to set hbase conf first"); + } + RawStore hs = new HBaseStore(); + hs.setConf(hbaseConf); + return hs; + } + }; + private Configuration rdbmsConf; private Configuration hbaseConf; - private RawStore rdbmsStore; - private RawStore hbaseStore; private List<Database> dbs; - private List<Table> tables; + private BlockingQueue<Table> partitionedTables; + private BlockingQueue<String[]> tableNameQueue; + private BlockingQueue<PartQueueEntry> partQueue; + private boolean writingToQueue, readersFinished; + private boolean doKerberos, doAll; + private List<String> rolesToImport, dbsToImport, tablesToImport, functionsToImport; + private int parallel; + private int batchSize; @VisibleForTesting - HBaseImport() { - dbs = new ArrayList<Database>(); - tables = new ArrayList<Table>(); + HBaseImport(String... args) throws ParseException { + Options options = new Options(); + + doAll = doKerberos = false; + parallel = 1; + batchSize = 1000; + + options.addOption(OptionBuilder + .withLongOpt("all") + .withDescription("Import the full metastore") + .create('a')); + + options.addOption(OptionBuilder + .withLongOpt("batchsize") + .withDescription("Number of partitions to read and write in a batch, defaults to 1000") + .hasArg() + .create('b')); + + options.addOption(OptionBuilder + .withLongOpt("database") + .withDescription("Import a single database") + .hasArgs() + .create('d')); + + options.addOption(OptionBuilder + .withLongOpt("help") + .withDescription("You're looking at it") + .create('h')); + + options.addOption(OptionBuilder + .withLongOpt("function") + .withDescription("Import a single function") + .hasArgs() + .create('f')); + + options.addOption(OptionBuilder + .withLongOpt("kerberos") + .withDescription("Import all kerberos related objects (master key, tokens)") + .create('k')); + + options.addOption(OptionBuilder + .withLongOpt("parallel") + .withDescription("Parallel factor for loading (only applied to tables and partitions), " + + "defaults to 1") + .hasArg() + .create('p')); + + options.addOption(OptionBuilder + .withLongOpt("role") + .withDescription("Import a single role") + .hasArgs() + .create('r')); options.addOption(OptionBuilder + .withLongOpt("tables") + .withDescription("Import a single table") + .hasArgs() + .create('t')); + + CommandLine cli = new GnuParser().parse(options, args); + + // Process help, if it was asked for, this must be done first + if (cli.hasOption('h')) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("hbaseschematool", options); + // returning here results in nothing else happening, because none of the other flags
have + // been set. + return; + } + + // Now process the other command line args + if (cli.hasOption('a')) { + doAll = true; + } + if (cli.hasOption('b')) { + batchSize = Integer.valueOf(cli.getOptionValue('b')); + } + if (cli.hasOption('d')) { + dbsToImport = Arrays.asList(cli.getOptionValues('d')); + } + if (cli.hasOption('f')) { + functionsToImport = Arrays.asList(cli.getOptionValues('f')); + } + if (cli.hasOption('p')) { + parallel = Integer.valueOf(cli.getOptionValue('p')); + } + if (cli.hasOption('r')) { + rolesToImport = Arrays.asList(cli.getOptionValues('r')); + } + if (cli.hasOption('k')) { + doKerberos = true; + } + if (cli.hasOption('t')) { + tablesToImport = Arrays.asList(cli.getOptionValues('t')); + } + + dbs = new ArrayList<>(); + // We don't want to bound the size of the table queue because we keep it all in memory + partitionedTables = new LinkedBlockingQueue<>(); + tableNameQueue = new LinkedBlockingQueue<>(); + + // Bound the size of this queue so we don't get too much in memory. + partQueue = new ArrayBlockingQueue<>(parallel * 2); } @VisibleForTesting void run() throws MetaException, InstantiationException, IllegalAccessException, - NoSuchObjectException, InvalidObjectException { + NoSuchObjectException, InvalidObjectException, InterruptedException { // Order here is crucial, as you can't add tables until you've added databases, etc. init(); - copyRoles(); - copyDbs(); - copyTables(); - copyPartitions(); - copyFunctions(); + if (doAll || rolesToImport != null) { + copyRoles(); + } + if (doAll || dbsToImport != null) { + copyDbs(); + } + if (doAll || dbsToImport != null || tablesToImport != null) { + copyTables(); + copyPartitions(); + } + if (doAll || dbsToImport != null || functionsToImport != null) { + copyFunctions(); + } + if (doAll || doKerberos) { + copyKerberos(); + } } private void init() throws MetaException, IllegalAccessException, InstantiationException { - if (rdbmsStore != null) { + if (rdbmsConf != null) { // We've been configured for testing, so don't do anything here. return; } - rdbmsConf = new HiveConf(); // We're depending on having everything properly in the path - hbaseConf = new HiveConf(); + // We're depending on having everything properly in the path + rdbmsConf = new HiveConf(); + hbaseConf = new HiveConf();// HiveConf.setVar(hbaseConf, HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, HBaseStore.class.getName()); HiveConf.setBoolVar(hbaseConf, HiveConf.ConfVars.METASTORE_FASTPATH, true); // First get a connection to the RDBMS based store - rdbmsStore = new ObjectStore(); - rdbmsStore.setConf(rdbmsConf); + rdbmsStore.get().setConf(rdbmsConf); // Get a connection to the HBase based store - hbaseStore = new HBaseStore(); - hbaseStore.setConf(hbaseConf); - // This will go create the tables if they don't exist - hbaseStore.verifySchema(); + hbaseStore.get().setConf(hbaseConf); } private void copyRoles() throws NoSuchObjectException, InvalidObjectException, MetaException { screen("Copying roles"); - for (String roleName : rdbmsStore.listRoleNames()) { - Role role = rdbmsStore.getRole(roleName); + List<String> toCopy = doAll ? 
rdbmsStore.get().listRoleNames() : rolesToImport; + for (String roleName : toCopy) { + Role role = rdbmsStore.get().getRole(roleName); screen("Copying role " + roleName); - hbaseStore.addRole(roleName, role.getOwnerName()); + hbaseStore.get().addRole(roleName, role.getOwnerName()); } } private void copyDbs() throws MetaException, NoSuchObjectException, InvalidObjectException { screen("Copying databases"); - for (String dbName : rdbmsStore.getAllDatabases()) { - Database db = rdbmsStore.getDatabase(dbName); + List<String> toCopy = doAll ? rdbmsStore.get().getAllDatabases() : dbsToImport; + for (String dbName : toCopy) { + Database db = rdbmsStore.get().getDatabase(dbName); dbs.add(db); screen("Copying database " + dbName); - hbaseStore.createDatabase(db); + hbaseStore.get().createDatabase(db); } } - private void copyTables() throws MetaException, InvalidObjectException { + private void copyTables() throws MetaException, InvalidObjectException, InterruptedException { screen("Copying tables"); + + // Start the parallel threads that will copy the tables + Thread[] copiers = new Thread[parallel]; + writingToQueue = true; + for (int i = 0; i < parallel; i++) { + copiers[i] = new TableCopier(); + copiers[i].start(); + } + + // Put tables from the databases we copied into the queue for (Database db : dbs) { screen("Copying tables in database " + db.getName()); - for (String tableName : rdbmsStore.getAllTables(db.getName())) { - Table table = rdbmsStore.getTable(db.getName(), tableName); - tables.add(table); - screen("Copying table " + db.getName() + "." + tableName); - hbaseStore.createTable(table); + for (String tableName : rdbmsStore.get().getAllTables(db.getName())) { + tableNameQueue.put(new String[]{db.getName(), tableName}); + } + } + + // Now put any specifically requested tables into the queue + if (tablesToImport != null) { + for (String compoundTableName : tablesToImport) { + String[] tn = compoundTableName.split("\\."); + if (tn.length != 2) { + error(compoundTableName + " not in proper form. Must be in form dbname.tablename. " + + "Ignoring this table and continuing."); + } else { + tableNameQueue.put(new String[]{tn[0], tn[1]}); + } + } + } + writingToQueue = false; + + // Wait until we've finished adding all the tables + for (Thread copier : copiers) copier.join(); + } + + private class TableCopier extends Thread { + @Override + public void run() { + while (writingToQueue || tableNameQueue.size() > 0) { + try { + String[] name = tableNameQueue.poll(1, TimeUnit.SECONDS); + if (name != null) { + Table table = rdbmsStore.get().getTable(name[0], name[1]); + // If this has partitions, put it in the list to fetch partitions for + if (table.getPartitionKeys() != null && table.getPartitionKeys().size() > 0) { + partitionedTables.put(table); + } + screen("Copying table " + name[0] + "." + name[1]); + hbaseStore.get().createTable(table); + } + } catch (InterruptedException | MetaException | InvalidObjectException e) { + throw new RuntimeException(e); + } } } } + /* Partition copying is a little complex. As we went through and copied the tables we put each + * partitioned table into a queue. We will now go through that queue and add partitions for the + * tables. We do the finding of partitions and writing of them separately and in parallel. + * This way if there is one table with >> partitions than all of the others, that skew won't + * hurt us. To avoid pulling all of the partitions for a table into memory, we batch up + * partitions (by default in batches of 1000) and copy them over in batches.
+ */ private void copyPartitions() throws MetaException, NoSuchObjectException, - InvalidObjectException { + InvalidObjectException, InterruptedException { screen("Copying partitions"); - for (Table table : tables) { - System.out.print("Copying partitions for table " + table.getDbName() + "." + - table.getTableName()); - for (Partition part : rdbmsStore.getPartitions(table.getDbName(), table.getTableName(), -1)) { - LOG.info("Copying " + table.getTableName() + "." + table.getTableName() + "." + - StringUtils.join(part.getValues(), ':')); - System.out.print('.'); - hbaseStore.addPartition(part); + readersFinished = false; + Thread[] readers = new Thread[parallel]; + Thread[] writers = new Thread[parallel]; + for (int i = 0; i < parallel; i++) { + readers[i] = new PartitionReader(); + readers[i].start(); + writers[i] = new PartitionWriter(); + writers[i].start(); + } + + for (Thread reader : readers) reader.join(); + readersFinished = true; + + // Wait until we've finished adding all the partitions + for (Thread writer : writers) writer.join(); + } + + private class PartitionReader extends Thread { + @Override + public void run() { + while (partitionedTables.size() > 0) { + try { + Table table = partitionedTables.poll(1, TimeUnit.SECONDS); + if (table != null) { + screen("Fetching partitions for table " + table.getDbName() + "." + + table.getTableName()); + List<String> partNames = + rdbmsStore.get().listPartitionNames(table.getDbName(), table.getTableName(), + (short) -1); + if (partNames.size() <= batchSize) { + LOG.debug("Adding all partition names to queue for " + table.getDbName() + "." + + table.getTableName()); + partQueue.put(new PartQueueEntry(table.getDbName(), table.getTableName(), partNames)); + } else { + int goUntil = partNames.size() % batchSize == 0 ? partNames.size() / batchSize : + partNames.size() / batchSize + 1; + for (int i = 0; i < goUntil; i++) { + int start = i * batchSize; + int end = Math.min((i + 1) * batchSize, partNames.size()); + LOG.debug("Adding partitions " + start + " to " + end + " for " + table.getDbName() + + "." + table.getTableName()); + partQueue.put(new PartQueueEntry(table.getDbName(), table.getTableName(), + partNames.subList(start, end))); + } + } + } + } catch (InterruptedException | MetaException e) { + throw new RuntimeException(e); + } + } + } + } + + private class PartitionWriter extends Thread { + @Override + public void run() { + // This keeps us from throwing exceptions in our raw store calls + Deadline.registerIfNot(1000000); + while (!readersFinished || partQueue.size() > 0) { + try { + PartQueueEntry entry = partQueue.poll(1, TimeUnit.SECONDS); + if (entry != null) { + LOG.info("Writing partitions " + entry.dbName + "." + entry.tableName + "." + + StringUtils.join(entry.partNames, ':')); + // Fetch these partitions and write them to HBase + Deadline.startTimer("hbaseimport"); + List<Partition> parts = + rdbmsStore.get().getPartitionsByNames(entry.dbName, entry.tableName, + entry.partNames); + hbaseStore.get().addPartitions(entry.dbName, entry.tableName, parts); + Deadline.stopTimer(); + } + } catch (InterruptedException | MetaException | InvalidObjectException | + NoSuchObjectException e) { + throw new RuntimeException(e); + } } - System.out.println(); } } private void copyFunctions() throws MetaException, NoSuchObjectException, InvalidObjectException { screen("Copying functions"); + // Copy any functions from databases we copied. 
for (Database db : dbs) { screen("Copying functions in database " + db.getName()); - for (String funcName : rdbmsStore.getFunctions(db.getName(), "*")) { - Function func = rdbmsStore.getFunction(db.getName(), funcName); - screen("Copying function " + db.getName() + "." + funcName); - hbaseStore.createFunction(func); + for (String funcName : rdbmsStore.get().getFunctions(db.getName(), "*")) { + copyOneFunction(db.getName(), funcName); } } + // Now do any specifically requested functions + if (functionsToImport != null) { + for (String compoundFuncName : functionsToImport) { + String[] fn = compoundFuncName.split("\\."); + if (fn.length != 2) { + error(compoundFuncName + " not in proper form. Must be in form dbname.funcname. " + + "Ignoring this function and continuing."); + } else { + copyOneFunction(fn[0], fn[1]); + } + } + } + } + + private void copyOneFunction(String dbName, String funcName) throws MetaException, + InvalidObjectException { + Function func = rdbmsStore.get().getFunction(dbName, funcName); + screen("Copying function " + dbName + "." + funcName); + hbaseStore.get().createFunction(func); + } + + private void copyKerberos() throws MetaException { + screen("Copying kerberos related items"); + for (String tokenId : rdbmsStore.get().getAllTokenIdentifiers()) { + String token = rdbmsStore.get().getToken(tokenId); + hbaseStore.get().addToken(tokenId, token); + } + for (String masterKey : rdbmsStore.get().getMasterKeys()) { + hbaseStore.get().addMasterKey(masterKey); + } } private void screen(String msg) { @@ -173,12 +485,29 @@ public class HBaseImport { System.out.println(msg); } + private void error(String msg) { + LOG.error(msg); + System.err.println("ERROR: " + msg); + } + @VisibleForTesting - HBaseImport setConnections(RawStore rdbms, RawStore hbase) { - rdbmsStore = rdbms; - hbaseStore = hbase; + void setConnections(RawStore rdbms, RawStore hbase) { + rdbmsStore.set(rdbms); + hbaseStore.set(hbase); + rdbmsConf = rdbms.getConf(); + hbaseConf = hbase.getConf(); + } - return new HBaseImport(); + private static class PartQueueEntry { + final String dbName; + final String tableName; + final List<String> partNames; + + PartQueueEntry(String d, String t, List<String> p) { + dbName = d; + tableName = t; + partNames = p; + } } }
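----------------------------------------------------------------------

The tests above double as usage documentation for the new flags. A minimal sketch of driving the importer the same way (the database, table, and function names below are placeholders, and setConnections is the @VisibleForTesting hook; outside of tests the tool is run through main(), which builds the two stores from the configs on the classpath):

    // Copy the full metastore, as the importAll test does.
    HBaseImport importAll = new HBaseImport("-a");
    importAll.setConnections(rdbms, hbase);
    importAll.run();

    // Copy one database and everything under it, with 4 reader/writer threads
    // and partitions fetched in batches of 500.
    HBaseImport oneDb = new HBaseImport("-p", "4", "-b", "500", "-d", "somedb");
    oneDb.setConnections(rdbms, hbase);
    oneDb.run();

    // Copy a single table and a single function. Names must be fully qualified
    // (dbname.objectname), and, as the importOneTable* tests show, the containing
    // database must already exist in the target store.
    HBaseImport oneObj = new HBaseImport("-t", "somedb.sometable", "-f", "somedb.somefunc");
    oneObj.setConnections(rdbms, hbase);
    oneObj.run();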
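The move from plain RawStore fields to the rdbmsStore/hbaseStore ThreadLocals is what makes the copier threads safe: each TableCopier, PartitionReader, and PartitionWriter lazily gets its own ObjectStore or HBaseStore on its first get() rather than sharing one handle. A self-contained sketch of the same pattern, with a stand-in Store class in place of RawStore:

    import java.util.concurrent.atomic.AtomicInteger;

    public class PerThreadStore {
      // Stand-in for RawStore; nothing here is Hive API.
      static class Store {
        final int id;
        Store(int id) { this.id = id; }
      }

      private static final AtomicInteger COUNTER = new AtomicInteger();

      private static final ThreadLocal<Store> STORE = new ThreadLocal<Store>() {
        @Override
        protected Store initialValue() {
          // Runs once per thread on its first STORE.get(), the way the patch
          // constructs a separate store instance per worker thread.
          return new Store(COUNTER.incrementAndGet());
        }
      };

      public static void main(String[] args) throws InterruptedException {
        Runnable r = () -> System.out.println(
            Thread.currentThread().getName() + " -> store " + STORE.get().id);
        Thread t1 = new Thread(r);
        Thread t2 = new Thread(r);
        t1.start(); t2.start();
        t1.join(); t2.join(); // each thread prints a different store id
      }
    }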
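copyTables and copyPartitions coordinate their producer and consumer threads without poison pills: a completion flag (writingToQueue, readersFinished) combined with a one-second poll timeout lets consumers drain the queue and then fall out of the loop. A stripped-down sketch of that protocol (not Hive API; the flag is declared volatile here as a visibility precaution):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class DrainSketch {
      private static volatile boolean producerFinished = false;
      // Bounded, like partQueue (parallel * 2), so the producer cannot run
      // arbitrarily far ahead of the consumer.
      private static final BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);

      public static void main(String[] args) throws InterruptedException {
        Thread consumer = new Thread(() -> {
          try {
            // Keep polling until the producer is done AND the queue is empty.
            while (!producerFinished || !queue.isEmpty()) {
              String batch = queue.poll(1, TimeUnit.SECONDS);
              if (batch != null) System.out.println("wrote " + batch);
            }
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
        });
        consumer.start();
        for (int i = 0; i < 10; i++) queue.put("batch-" + i); // producer side
        producerFinished = true; // set only after the last put, so no item is missed
        consumer.join();
      }
    }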
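The batch arithmetic in PartitionReader is what the parallel and parallelOdd tests pin down: partition names are pushed to the queue batchSize at a time, and the final batch may be short. The same computation in isolation (a hypothetical helper mirroring the patch's goUntil/subList logic):

    import java.util.ArrayList;
    import java.util.List;

    public class BatchSplit {
      static <T> List<List<T>> split(List<T> names, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        // Same rounding as the patch: an extra batch unless it divides evenly.
        int goUntil = names.size() % batchSize == 0 ? names.size() / batchSize
            : names.size() / batchSize + 1;
        for (int i = 0; i < goUntil; i++) {
          int start = i * batchSize;
          int end = Math.min((i + 1) * batchSize, names.size());
          batches.add(names.subList(start, end));
        }
        return batches;
      }

      public static void main(String[] args) {
        List<Integer> nine = new ArrayList<>();
        for (int i = 0; i < 9; i++) nine.add(i);
        // batchSize 2 -> [0,1] [2,3] [4,5] [6,7] [8]: five batches, the last short,
        // which is exactly the case parallelOdd exercises.
        System.out.println(split(nine, 2));
      }
    }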