Repository: hbase Updated Branches: refs/heads/0.98 5e1801b56 -> cb3ad7c70
HBASE-8361 Bulk load and other utilities should not create tables for user (Ashish Singhi) Conflicts: hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cb3ad7c7 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cb3ad7c7 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cb3ad7c7 Branch: refs/heads/0.98 Commit: cb3ad7c707ad3ba467d8cc151e7770d7cef48986 Parents: 5e1801b Author: stack <st...@apache.org> Authored: Sun Oct 12 21:52:01 2014 -0700 Committer: stack <st...@apache.org> Committed: Sun Oct 12 22:11:28 2014 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/mapreduce/ImportTsv.java | 35 +++++++++++++------- .../hbase/mapreduce/LoadIncrementalHFiles.java | 20 ++++++++--- .../hadoop/hbase/mapreduce/TestImportTsv.java | 25 +++++++++++--- .../mapreduce/TestLoadIncrementalHFiles.java | 24 ++++++++++---- 4 files changed, 76 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/cb3ad7c7/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java index 3fb3a99..bb8ee36 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; @@ -92,6 +93,7 @@ public class ImportTsv extends Configured implements Tool { final static String DEFAULT_ATTRIBUTES_SEPERATOR = "=>"; final static String DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR = ","; final static Class DEFAULT_MAPPER = TsvImporterMapper.class; + public final static String CREATE_TABLE_CONF_KEY = "create.table"; public static class TsvParser { /** @@ -129,7 +131,7 @@ public class ImportTsv extends Configured implements Tool { /** * @param columnsSpecification the list of columns to parser out, comma separated. * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC - * @param separatorStr + * @param separatorStr */ public TsvParser(String columnsSpecification, String separatorStr) { // Configure separator @@ -253,7 +255,7 @@ public class ImportTsv extends Configured implements Tool { public int getRowKeyLength() { return getColumnLength(rowKeyColumnIndex); } - + public long getTimestamp(long ts) throws BadTsvLineException { // Return ts if HBASE_TS_KEY is not configured in column spec if (!hasTimestamp()) { @@ -279,7 +281,7 @@ public class ImportTsv extends Configured implements Tool { getColumnLength(attrKeyColumnIndex)); } } - + public String[] getIndividualAttributes() { String attributes = getAttributes(); if (attributes != null) { @@ -288,7 +290,7 @@ public class ImportTsv extends Configured implements Tool { return null; } } - + public int getAttributeKeyOffset() { if (hasAttributes()) { return getColumnOffset(attrKeyColumnIndex); @@ -430,10 +432,16 @@ public class ImportTsv extends Configured implements Tool { if (hfileOutPath != null) { if (!admin.tableExists(tableName)) { - LOG.warn(format("Table '%s' does not exist.", tableName)); - // TODO: this is backwards. Instead of depending on the existence of a table, - // create a sane splits file for HFileOutputFormat based on data sampling. - createTable(admin, tableName, columns); + String errorMsg = format("Table '%s' does not exist.", tableName); + if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) { + LOG.warn(errorMsg); + // TODO: this is backwards. Instead of depending on the existence of a table, + // create a sane splits file for HFileOutputFormat based on data sampling. + createTable(admin, tableName, columns); + } else { + LOG.error(errorMsg); + throw new TableNotFoundException(errorMsg); + } } HTable table = new HTable(conf, tableName); job.setReducerClass(PutSortReducer.class); @@ -497,7 +505,7 @@ public class ImportTsv extends Configured implements Tool { if (errorMsg != null && errorMsg.length() > 0) { System.err.println("ERROR: " + errorMsg); } - String usage = + String usage = "Usage: " + NAME + " -D"+ COLUMNS_CONF_KEY + "=a,b,c <tablename> <inputdir>\n" + "\n" + "Imports the given input directory of TSV data into the specified table.\n" + @@ -531,6 +539,9 @@ public class ImportTsv extends Configured implements Tool { " -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " + DEFAULT_MAPPER.getName() + "\n" + " -D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the import\n" + + " -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" + + " Note: if you set this to 'no', then the target table must already exist in HBase\n" + + "\n" + "For performance consider the following options:\n" + " -Dmapred.map.tasks.speculative.execution=false\n" + " -Dmapred.reduce.tasks.speculative.execution=false"; @@ -581,7 +592,7 @@ public class ImportTsv extends Configured implements Tool { + TsvParser.TIMESTAMPKEY_COLUMN_SPEC); return -1; } - + int attrKeysFound = 0; for (String col : columns) { if (col.equals(TsvParser.ATTRIBUTES_COLUMN_SPEC)) @@ -592,7 +603,7 @@ public class ImportTsv extends Configured implements Tool { + TsvParser.ATTRIBUTES_COLUMN_SPEC); return -1; } - + // Make sure one or more columns are specified excluding rowkey and // timestamp key if (columns.length - (rowkeysFound + tskeysFound + attrKeysFound) < 1) { @@ -607,7 +618,7 @@ public class ImportTsv extends Configured implements Tool { // Set it back to replace invalid timestamp (non-numeric) with current // system time getConf().setLong(TIMESTAMP_CONF_KEY, timstamp); - + Job job = createSubmittableJob(getConf(), otherArgs); return job.waitForCompletion(true) ? 0 : 1; } http://git-wip-us.apache.org/repos/asf/hbase/blob/cb3ad7c7/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 263ed92..2a237a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.mapreduce; +import static java.lang.String.format; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; @@ -111,6 +113,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { public static final String MAX_FILES_PER_REGION_PER_FAMILY = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily"; private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers"; + public final static String CREATE_TABLE_CONF_KEY = "create.table"; private int maxFilesPerRegionPerFamily; private boolean assignSeqIds; @@ -145,9 +148,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } private void usage() { - System.err.println("usage: " + NAME + - " /path/to/hfileoutputformat-output " + - "tablename"); + System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D" + + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" + + " Note: if you set this to 'no', then the target table must already exist in HBase\n" + + "\n"); } /** @@ -891,7 +895,15 @@ public class LoadIncrementalHFiles extends Configured implements Tool { TableName tableName = TableName.valueOf(args[1]); boolean tableExists = this.doesTableExist(tableName); - if (!tableExists) this.createTable(tableName,dirPath); + if (!tableExists) { + if ("yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) { + this.createTable(tableName, dirPath); + } else { + String errorMsg = format("Table '%s' does not exist.", tableName); + LOG.error(errorMsg); + throw new TableNotFoundException(errorMsg); + } + } Path hfofDir = new Path(dirPath); HTable table = new HTable(getConf(), tableName); http://git-wip-us.apache.org/repos/asf/hbase/blob/cb3ad7c7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java index 5333881..9b4339f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java @@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -113,7 +115,7 @@ public class TestImportTsv implements Configurable { doMROnTableTest(util, FAMILY, null, args, 1); util.deleteTable(table); } - + @Test public void testMROnTableWithTimestamp() throws Exception { String table = "test-" + UUID.randomUUID(); @@ -131,7 +133,7 @@ public class TestImportTsv implements Configurable { doMROnTableTest(util, FAMILY, data, args, 1); util.deleteTable(table); } - + @Test public void testMROnTableWithCustomMapper() @@ -148,7 +150,7 @@ public class TestImportTsv implements Configurable { doMROnTableTest(util, FAMILY, null, args, 3); util.deleteTable(table); } - + @Test public void testBulkOutputWithoutAnExistingTable() throws Exception { String table = "test-" + UUID.randomUUID(); @@ -221,12 +223,25 @@ public class TestImportTsv implements Configurable { "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", - "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table + "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table }; String data = "KEY\u001bVALUE4\u001bVALUE8\n"; doMROnTableTest(util, FAMILY, data, args, 4); } - + + @Test(expected = TableNotFoundException.class) + public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception { + String table = "test-" + UUID.randomUUID(); + String[] args = + new String[] { table, "/inputFile" }; + + Configuration conf = new Configuration(util.getConfiguration()); + conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A"); + conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output"); + conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no"); + ImportTsv.createSubmittableJob(conf, args); + } + protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, String[] args) throws Exception { return doMROnTableTest(util, family, data, args, 1); http://git-wip-us.apache.org/repos/asf/hbase/blob/cb3ad7c7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java index 71a7580..86f30bc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -127,7 +128,7 @@ public class TestLoadIncrementalHFiles { new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); } - + /** * Test loading into a column family that has a ROWCOL bloom filter. */ @@ -340,7 +341,7 @@ public class TestLoadIncrementalHFiles { map.put(last, value-1); } - @Test + @Test public void testInferBoundaries() { TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); @@ -350,8 +351,8 @@ public class TestLoadIncrementalHFiles { * * Should be inferred as: * a-----------------k m-------------q r--------------t u---------x - * - * The output should be (m,r,u) + * + * The output should be (m,r,u) */ String first; @@ -359,7 +360,7 @@ public class TestLoadIncrementalHFiles { first = "a"; last = "e"; addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - + first = "r"; last = "s"; addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); @@ -380,14 +381,14 @@ public class TestLoadIncrementalHFiles { first = "s"; last = "t"; addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - + first = "u"; last = "w"; addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map); byte[][] compare = new byte[3][]; compare[0] = "m".getBytes(); - compare[1] = "r".getBytes(); + compare[1] = "r".getBytes(); compare[2] = "u".getBytes(); assertEquals(keysArray.length, 3); @@ -421,5 +422,14 @@ public class TestLoadIncrementalHFiles { + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles")); } } + + @Test(expected = TableNotFoundException.class) + public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception { + Configuration conf = util.getConfiguration(); + conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no"); + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); + String[] args = { "directory", "nonExistingTable" }; + loader.run(args); + } }