Repository: phoenix
Updated Branches:
  refs/heads/master 61c948b73 -> 9bb0b01f6
PHOENIX-1069: Improve CsvBulkLoadTool to build indexes when loading data.

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9bb0b01f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9bb0b01f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9bb0b01f

Branch: refs/heads/master
Commit: 9bb0b01f68e5da104810c3f1e3adb04ec2ba491f
Parents: 61c948b
Author: Jeffrey Zhong <jeffr...@apache.org>
Authored: Wed Jul 9 16:08:46 2014 -0700
Committer: Jeffrey Zhong <jeffr...@apache.org>
Committed: Wed Jul 9 16:08:46 2014 -0700

----------------------------------------------------------------------
 .../phoenix/mapreduce/CsvBulkLoadToolIT.java    |  73 +++++++
 .../phoenix/mapreduce/CsvBulkLoadTool.java      | 207 ++++++++++++++-----
 .../phoenix/mapreduce/CsvToKeyValueMapper.java  |  18 +-
 3 files changed, 250 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
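Before the diff itself, a quick orientation: with this change the tool builds index tables as part of a bulk load, and a new --index-table option restricts a run to a single index. A minimal driver invocation might look like the sketch below; the input path, table names, and ZooKeeper quorum are illustrative placeholders, not values taken from this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.CsvBulkLoadTool;

    public class BulkLoadDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // Omit --index-table to load the data table plus every index on it;
            // pass it to load just that one index table.
            int exitCode = ToolRunner.run(conf, new CsvBulkLoadTool(), new String[] {
                    "--input", "/tmp/input.csv",
                    "--table", "MY_TABLE",
                    "--index-table", "MY_TABLE_IDX",
                    "--zookeeper", "zk-host:2181"});
            System.exit(exitCode);
        }
    }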
Path("/tmp/input4.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1,FirstName 1,LastName 1"); + printWriter.println("2,FirstName 2,LastName 2"); + printWriter.close(); + + CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); + csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration()); + int exitCode = csvBulkLoadTool.run(new String[] { + "--input", "/tmp/input4.csv", + "--table", "table4", + "--index-table", "TABLE4_IDX", + "--zookeeper", zkQuorum}); + assertEquals(0, exitCode); + + ResultSet rs = stmt.executeQuery("SELECT * FROM TABLE4"); + assertFalse(rs.next()); + rs = stmt.executeQuery("SELECT FIRST_NAME FROM TABLE4 where FIRST_NAME='FirstName 1'"); + assertTrue(rs.next()); + assertEquals("FirstName 1", rs.getString(1)); + + rs.close(); + stmt.close(); + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9bb0b01f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java index f0b4ae8..08a43fe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java @@ -17,12 +17,18 @@ */ package org.apache.phoenix.mapreduce; +import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -42,14 +48,21 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.job.JobManager; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.MetaDataClient; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.util.CSVCommonsLoader; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.PhoenixRuntime; @@ -75,6 +88,7 @@ public class CsvBulkLoadTool extends Configured implements Tool { static final Option OUTPUT_PATH_OPT = new Option("o", "output", true, "Output path for temporary HFiles (optional)"); static final Option SCHEMA_NAME_OPT = new Option("s", "schema", true, "Phoenix schema name (optional)"); static final Option TABLE_NAME_OPT = new Option("t", "table", true, "Phoenix table name (mandatory)"); + static final Option 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9bb0b01f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
index f0b4ae8..08a43fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
@@ -17,12 +17,18 @@
  */
 package org.apache.phoenix.mapreduce;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -42,14 +48,21 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.job.JobManager;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.util.CSVCommonsLoader;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -75,6 +88,7 @@ public class CsvBulkLoadTool extends Configured implements Tool {
     static final Option OUTPUT_PATH_OPT = new Option("o", "output", true, "Output path for temporary HFiles (optional)");
     static final Option SCHEMA_NAME_OPT = new Option("s", "schema", true, "Phoenix schema name (optional)");
     static final Option TABLE_NAME_OPT = new Option("t", "table", true, "Phoenix table name (mandatory)");
+    static final Option INDEX_TABLE_NAME_OPT = new Option("it", "index-table", true, "Phoenix index table name when loading only this particular index table");
     static final Option DELIMITER_OPT = new Option("d", "delimiter", true, "Input delimiter, defaults to comma");
     static final Option ARRAY_DELIMITER_OPT = new Option("a", "array-delimiter", true, "Array element delimiter (optional)");
     static final Option IMPORT_COLUMNS_OPT = new Option("c", "import-columns", true, "Comma-separated list of columns to be imported");
@@ -130,6 +144,7 @@ public class CsvBulkLoadTool extends Configured implements Tool {
         Options options = new Options();
         options.addOption(INPUT_PATH_OPT);
         options.addOption(TABLE_NAME_OPT);
+        options.addOption(INDEX_TABLE_NAME_OPT);
         options.addOption(ZK_QUORUM_OPT);
         options.addOption(OUTPUT_PATH_OPT);
         options.addOption(SCHEMA_NAME_OPT);
@@ -156,7 +171,7 @@
 
     @Override
     public int run(String[] args) throws Exception {
-        HBaseConfiguration.addHbaseResources(getConf());
+        Configuration conf = HBaseConfiguration.addHbaseResources(getConf());
 
         CommandLine cmdLine = null;
         try {
@@ -167,11 +182,23 @@
         Class.forName(DriverManager.class.getName());
         Connection conn = DriverManager.getConnection(
                 getJdbcUrl(cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt())));
-        String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt());
+
+        return loadData(conf, cmdLine, conn);
+    }
+
+    private int loadData(Configuration conf, CommandLine cmdLine,
+            Connection conn) throws SQLException, InterruptedException,
+            ExecutionException {
+        String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt());
         String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt());
+        String indexTableName = cmdLine.getOptionValue(INDEX_TABLE_NAME_OPT.getOpt());
         String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
+        String qualifiedIndexTableName = null;
+        if (indexTableName != null) {
+            qualifiedIndexTableName = getQualifiedTableName(schemaName, indexTableName);
+        }
         List<ColumnInfo> importColumns = buildImportColumns(conn, cmdLine, qualifiedTableName);
-        configureOptions(cmdLine, importColumns, getConf());
+        configureOptions(cmdLine, importColumns, conf);
 
         try {
             validateTable(conn, schemaName, tableName);
@@ -186,52 +213,54 @@
         } else {
             outputPath = new Path("/tmp/" + UUID.randomUUID());
         }
-        LOG.info("Configuring HFile output path to {}", outputPath);
-
-        Job job = new Job(getConf(),
-                "Phoenix MapReduce import for "
-                        + getConf().get(CsvToKeyValueMapper.TABLE_NAME_CONFKEY));
-
-        // Allow overriding the job jar setting by using a -D system property at startup
-        if (job.getJar() == null) {
-            job.setJarByClass(CsvToKeyValueMapper.class);
+
+        List<String> tablesToBeLoaded = new ArrayList<String>();
+        tablesToBeLoaded.add(qualifiedTableName);
+        tablesToBeLoaded.addAll(getIndexTables(conn, schemaName, qualifiedTableName));
+
+        // When loading a single index table, check that the given index table name is valid
+        if (qualifiedIndexTableName != null) {
+            boolean exists = false;
+            for (String tmpTable : tablesToBeLoaded) {
+                if (tmpTable.compareToIgnoreCase(qualifiedIndexTableName) == 0) {
+                    exists = true;
+                    break;
+                }
+            }
+            if (!exists) {
+                throw new IllegalStateException("CSV Bulk Loader error: index table "
+                        + qualifiedIndexTableName + " doesn't exist");
+            }
+            tablesToBeLoaded.clear();
+            tablesToBeLoaded.add(qualifiedIndexTableName);
         }
-        job.setInputFormatClass(TextInputFormat.class);
-        FileInputFormat.addInputPath(job, inputPath);
-
-        FileSystem.get(getConf());
-        FileOutputFormat.setOutputPath(job, outputPath);
-
-        job.setMapperClass(CsvToKeyValueMapper.class);
-        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-        job.setMapOutputValueClass(KeyValue.class);
-
-        HTable htable = new HTable(getConf(), qualifiedTableName);
-
-        // Auto configure partitioner and reducer according to the Main Data table
-        HFileOutputFormat.configureIncrementalLoad(job, htable);
-
-        LOG.info("Running MapReduce import job from {} to {}", inputPath, outputPath);
-        boolean success = job.waitForCompletion(true);
-        if (!success) {
-            LOG.error("Import job failed, check JobTracker for details");
-            return 1;
+
+        List<Future<Boolean>> runningJobs = new ArrayList<Future<Boolean>>();
+        ExecutorService executor = JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20);
+        try {
+            for (String table : tablesToBeLoaded) {
+                Path tablePath = new Path(outputPath, table);
+                Configuration jobConf = new Configuration(conf);
+                jobConf.set(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, qualifiedTableName);
+                if (qualifiedTableName.compareToIgnoreCase(table) != 0) {
+                    jobConf.set(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, table);
+                }
+                TableLoader tableLoader = new TableLoader(jobConf, table, inputPath, tablePath);
+                runningJobs.add(executor.submit(tableLoader));
+            }
+        } finally {
+            executor.shutdown();
         }
-
-        LOG.info("Loading HFiles from {}", outputPath);
-        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(getConf());
-        loader.doBulkLoad(outputPath, htable);
-        htable.close();
-
-        LOG.info("Incremental load complete");
-
-        LOG.info("Removing output directory {}", outputPath);
-        if (!FileSystem.get(getConf()).delete(outputPath, true)) {
-            LOG.error("Removing output directory {} failed", outputPath);
+
+        // Wait for all jobs to complete
+        int retCode = 0;
+        for (Future<Boolean> task : runningJobs) {
+            if (!task.get() && (retCode == 0)) {
+                retCode = -1;
+            }
         }
-
-        return 0;
-    }
+        return retCode;
+    }
 
     String getJdbcUrl(String zkQuorum) {
         if (zkQuorum == null) {
@@ -343,4 +372,90 @@
         }
         rs.close();
     }
+
+    /**
+     * Get the names of the index tables of the current data table
+     * @throws java.sql.SQLException
+     */
+    private List<String> getIndexTables(Connection conn, String schemaName, String tableName)
+            throws SQLException {
+        PTable table = PhoenixRuntime.getTable(conn, tableName);
+        List<String> indexTables = new ArrayList<String>();
+        for (PTable indexTable : table.getIndexes()) {
+            indexTables.add(getQualifiedTableName(schemaName,
+                    indexTable.getTableName().getString()));
+        }
+        return indexTables;
+    }
+
+    /**
+     * A Callable that loads data into a single table
+     */
+    private static class TableLoader implements Callable<Boolean> {
+
+        private Configuration conf;
+        private String tableName;
+        private Path inputPath;
+        private Path outputPath;
+
+        public TableLoader(Configuration conf, String qualifiedTableName, Path inputPath,
+                Path outputPath) {
+            this.conf = conf;
+            this.tableName = qualifiedTableName;
+            this.inputPath = inputPath;
+            this.outputPath = outputPath;
+        }
+
+        @Override
+        public Boolean call() {
+            LOG.info("Configuring HFile output path to {}", outputPath);
+            try {
+                Job job = new Job(conf, "Phoenix MapReduce import for " + tableName);
+
+                // Allow overriding the job jar setting by using a -D system property at startup
+                if (job.getJar() == null) {
+                    job.setJarByClass(CsvToKeyValueMapper.class);
+                }
+                job.setInputFormatClass(TextInputFormat.class);
+                FileInputFormat.addInputPath(job, inputPath);
+                FileOutputFormat.setOutputPath(job, outputPath);
+
+                job.setMapperClass(CsvToKeyValueMapper.class);
+                job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+                job.setMapOutputValueClass(KeyValue.class);
+
+                HTable htable = new HTable(conf, tableName);
+
+                // Auto configure partitioner and reducer according to the target table,
+                // which may be the data table or one of its index tables
+                HFileOutputFormat.configureIncrementalLoad(job, htable);
+
+                LOG.info("Running MapReduce import job from {} to {}", inputPath, outputPath);
+                boolean success = job.waitForCompletion(true);
+                if (!success) {
+                    LOG.error("Import job failed, check JobTracker for details");
+                    return false;
+                }
+
+                LOG.info("Loading HFiles from {}", outputPath);
+                LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+                loader.doBulkLoad(outputPath, htable);
+                htable.close();
+
+                LOG.info("Incremental load complete for table=" + tableName);
+
+                LOG.info("Removing output directory {}", outputPath);
+                if (!FileSystem.get(conf).delete(outputPath, true)) {
+                    LOG.error("Removing output directory {} failed", outputPath);
+                }
+
+                return true;
+            } catch (Exception ex) {
+                LOG.error("Import job on table=" + tableName + " failed due to exception: " + ex);
+                return false;
+            }
+        }
+
+    }
 }
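A design note before the mapper changes: loadData() above now submits one MapReduce job per target table (the data table plus each of its indexes, or only the index named via --index-table). All jobs share the same CSV input but write HFiles into per-table subdirectories of the output path, via new Path(outputPath, table), before each job's LoadIncrementalHFiles step. With the default /tmp/<random-uuid> output path and a data table FOO carrying one index FOO_IDX, the intermediate layout would be roughly as follows; FOO and FOO_IDX are illustrative names, not tables from this commit.

    /tmp/<random-uuid>/FOO        <- HFiles for the data table
    /tmp/<random-uuid>/FOO_IDX    <- HFiles for the index table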
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9bb0b01f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index 776f028..4feb7e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -81,6 +82,9 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
 
     /** Configuration key for the name of the output table */
     public static final String TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.tablename";
+
+    /** Configuration key for the name of the output index table */
+    public static final String INDEX_TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.indextablename";
 
     /** Configuration key for the columns to be imported */
     public static final String COLUMN_INFO_CONFKEY = "phoenix.mapreduce.import.columninfos";
@@ -93,6 +97,7 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
     private MapperUpsertListener upsertListener;
     private CsvLineParser csvLineParser;
     private ImportPreUpsertKeyValueProcessor preUpdateProcessor;
+    private byte[] tableName;
 
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
@@ -115,6 +120,11 @@
         csvLineParser = new CsvLineParser(conf.get(FIELD_DELIMITER_CONFKEY).charAt(0));
 
         preUpdateProcessor = loadPreUpsertProcessor(conf);
+        if (!conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, "").isEmpty()) {
+            tableName = Bytes.toBytes(conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY));
+        } else {
+            tableName = Bytes.toBytes(conf.get(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, ""));
+        }
     }
 
     @SuppressWarnings("deprecation")
@@ -136,13 +146,17 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
             csvUpsertExecutor.execute(ImmutableList.of(csvRecord));
 
             Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator
-                    = PhoenixRuntime.getUncommittedDataIterator(conn);
+                    = PhoenixRuntime.getUncommittedDataIterator(conn, true);
             while (uncommittedDataIterator.hasNext()) {
                 Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
+                if (Bytes.compareTo(tableName, kvPair.getFirst()) != 0) {
+                    // skip edits for other tables
+                    continue;
+                }
                 List<KeyValue> keyValueList = kvPair.getSecond();
                 keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
                 for (KeyValue kv : keyValueList) {
-                    outputKey.set(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength());
+                    outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
                     context.write(outputKey, kv);
                 }
             }
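Read together with the loader changes, the mapper-side contract is: every per-table job runs the same CsvToKeyValueMapper over the full CSV input, the boolean argument to getUncommittedDataIterator makes Phoenix surface pending mutations for the indexes alongside the data table, and each mapper keeps only the KeyValues whose table name matches its own target. Condensed into a sketch that mirrors the patch above (with the surrounding map() plumbing elided):

    // tableName was resolved in setup(): the index table when
    // INDEX_TABLE_NAME_CONFKEY is set for this job, else the data table.
    Iterator<Pair<byte[], List<KeyValue>>> iter =
            PhoenixRuntime.getUncommittedDataIterator(conn, true);
    while (iter.hasNext()) {
        Pair<byte[], List<KeyValue>> kvPair = iter.next();
        if (Bytes.compareTo(tableName, kvPair.getFirst()) != 0) {
            continue; // KeyValues for some other table; that table's own job emits them
        }
        for (KeyValue kv : preUpdateProcessor.preUpsert(kvPair.getFirst(), kvPair.getSecond())) {
            outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
            context.write(outputKey, kv);
        }
    }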