Repository: phoenix
Updated Branches:
  refs/heads/3.0 ec1b76ab2 -> 94644d5b7


PHOENIX-1069: Improve CsvBulkLoadTool to build indexes when loading data.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/94644d5b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/94644d5b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/94644d5b

Branch: refs/heads/3.0
Commit: 94644d5b7215551147c03adcae4721680b3ef407
Parents: ec1b76a
Author: Jeffrey Zhong <jeffr...@apache.org>
Authored: Wed Jul 9 16:08:46 2014 -0700
Committer: Jeffrey Zhong <jeffr...@apache.org>
Committed: Wed Jul 9 17:13:31 2014 -0700

----------------------------------------------------------------------
 .../phoenix/mapreduce/CsvBulkLoadToolIT.java    |  73 +++++++
 .../phoenix/mapreduce/CsvBulkLoadTool.java      | 207 ++++++++++++++-----
 .../phoenix/mapreduce/CsvToKeyValueMapper.java  |  16 +-
 3 files changed, 249 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
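For reference, a minimal programmatic invocation sketch modeled on the integration tests below. The input path, table and index names, and ZooKeeper quorum here are illustrative placeholders, not values from this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.mapreduce.CsvBulkLoadTool;

    public class CsvBulkLoadExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            CsvBulkLoadTool tool = new CsvBulkLoadTool();
            tool.setConf(conf);
            // With this change, loading the data table also builds all of its index tables.
            int exitCode = tool.run(new String[] {
                    "--input", "/tmp/input.csv",        // illustrative HDFS path
                    "--table", "MY_TABLE",              // illustrative table name
                    "--zookeeper", "localhost:2181"});  // illustrative quorum
            // The new --index-table option restricts the load to a single index table:
            // tool.run(new String[] {"--input", "/tmp/input.csv", "--table", "MY_TABLE",
            //         "--index-table", "MY_TABLE_IDX", "--zookeeper", "localhost:2181"});
            System.exit(exitCode);
        }
    }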


http://git-wip-us.apache.org/repos/asf/phoenix/blob/94644d5b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
index 744b5d6..d4a80a2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.PrintWriter;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.Statement;
 
@@ -143,4 +144,76 @@ public class CsvBulkLoadToolIT {
         rs.close();
         stmt.close();
     }
+
+    @Test
+    public void testImportWithIndex() throws Exception {
+
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE TABLE3 (ID INTEGER NOT NULL PRIMARY KEY, " +
+            "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+        String ddl = "CREATE INDEX TABLE3_IDX ON TABLE3 "
+                + " (FIRST_NAME ASC)"
+                + " INCLUDE (LAST_NAME)";
+        stmt.execute(ddl);
+        
+        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,FirstName 1,LastName 1");
+        printWriter.println("2,FirstName 2,LastName 2");
+        printWriter.close();
+
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+        int exitCode = csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input3.csv",
+                "--table", "table3",
+                "--zookeeper", zkQuorum});
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE3 where first_name='FirstName 2'");
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("FirstName 2", rs.getString(2));
+
+        rs.close();
+        stmt.close();
+    }
+    
+    @Test
+    public void testImportOneIndexTable() throws Exception {
+
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE TABLE4 (ID INTEGER NOT NULL PRIMARY KEY, " +
+            "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+        String ddl = "CREATE INDEX TABLE4_IDX ON TABLE4 "
+                + " (FIRST_NAME ASC)";
+        stmt.execute(ddl);
+        
+        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input4.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,FirstName 1,LastName 1");
+        printWriter.println("2,FirstName 2,LastName 2");
+        printWriter.close();
+
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+        int exitCode = csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input4.csv",
+                "--table", "table4",
+                "--index-table", "TABLE4_IDX",
+                "--zookeeper", zkQuorum});
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT * FROM TABLE4");
+        assertFalse(rs.next());
+        rs = stmt.executeQuery("SELECT FIRST_NAME FROM TABLE4 where FIRST_NAME='FirstName 1'");
+        assertTrue(rs.next());
+        assertEquals("FirstName 1", rs.getString(1));
+
+        rs.close();
+        stmt.close();
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/94644d5b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
index 30a9478..84fc5d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
@@ -17,12 +17,18 @@
  */
 package org.apache.phoenix.mapreduce;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -42,14 +48,21 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.job.JobManager;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.util.CSVCommonsLoader;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -74,6 +87,7 @@ public class CsvBulkLoadTool extends Configured implements Tool {
     static final Option OUTPUT_PATH_OPT = new Option("o", "output", true, "Output path for temporary HFiles (optional)");
     static final Option SCHEMA_NAME_OPT = new Option("s", "schema", true, "Phoenix schema name (optional)");
     static final Option TABLE_NAME_OPT = new Option("t", "table", true, "Phoenix table name (mandatory)");
+    static final Option INDEX_TABLE_NAME_OPT = new Option("it", "index-table", true, "Phoenix index table name when loading only this particular index table");
     static final Option DELIMITER_OPT = new Option("d", "delimiter", true, "Input delimiter, defaults to comma");
     static final Option ARRAY_DELIMITER_OPT = new Option("a", "array-delimiter", true, "Array element delimiter (optional)");
     static final Option IMPORT_COLUMNS_OPT = new Option("c", "import-columns", true, "Comma-separated list of columns to be imported");
@@ -129,6 +143,7 @@ public class CsvBulkLoadTool extends Configured implements Tool {
         Options options = new Options();
         options.addOption(INPUT_PATH_OPT);
         options.addOption(TABLE_NAME_OPT);
+        options.addOption(INDEX_TABLE_NAME_OPT);
         options.addOption(ZK_QUORUM_OPT);
         options.addOption(OUTPUT_PATH_OPT);
         options.addOption(SCHEMA_NAME_OPT);
@@ -155,7 +170,7 @@ public class CsvBulkLoadTool extends Configured implements Tool {
     @Override
     public int run(String[] args) throws Exception {
 
-        HBaseConfiguration.addHbaseResources(getConf());
+        Configuration conf = HBaseConfiguration.addHbaseResources(getConf());
 
         CommandLine cmdLine = null;
         try {
@@ -166,11 +181,23 @@ public class CsvBulkLoadTool extends Configured implements Tool {
         Class.forName(DriverManager.class.getName());
         Connection conn = DriverManager.getConnection(
                 getJdbcUrl(cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt())));
-        String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt());
+        
+        return loadData(conf, cmdLine, conn);
+    }
+
+       private int loadData(Configuration conf, CommandLine cmdLine,
+                       Connection conn) throws SQLException, InterruptedException,
+                       ExecutionException {
+               String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt());
         String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt());
+        String indexTableName = cmdLine.getOptionValue(INDEX_TABLE_NAME_OPT.getOpt());
         String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
+        String qualifedIndexTableName = null;
+        if(indexTableName != null){
+               qualifedIndexTableName = getQualifiedTableName(schemaName, indexTableName);
+        }
         List<ColumnInfo> importColumns = buildImportColumns(conn, cmdLine, qualifiedTableName);
-        configureOptions(cmdLine, importColumns, getConf());
+        configureOptions(cmdLine, importColumns, conf);
 
         try {
             validateTable(conn, schemaName, tableName);
@@ -185,52 +212,54 @@ public class CsvBulkLoadTool extends Configured implements Tool {
         } else {
             outputPath = new Path("/tmp/" + UUID.randomUUID());
         }
-        LOG.info("Configuring HFile output path to {}", outputPath);
-
-        Job job = new Job(getConf(),
-                "Phoenix MapReduce import for "
-                        + getConf().get(CsvToKeyValueMapper.TABLE_NAME_CONFKEY));
-
-        // Allow overriding the job jar setting by using a -D system property at startup
-        if (job.getJar() == null) {
-            job.setJarByClass(CsvToKeyValueMapper.class);
+        
+        List<String> tablesToBeLoaded = new ArrayList<String>();
+        tablesToBeLoaded.add(qualifiedTableName);
+        tablesToBeLoaded.addAll(getIndexTables(conn, schemaName, qualifiedTableName));
+        
+        // When loading a single index table, check index table name is correct
+        if(qualifedIndexTableName != null){
+               boolean exists = false;
+               for(String tmpTable : tablesToBeLoaded){
+                       if(tmpTable.compareToIgnoreCase(qualifedIndexTableName) == 0) {
+                               exists = true;
+                               break;
+                       }
+               }
+               if(!exists){
+                throw new IllegalStateException("CSV Bulk Loader error: index table " +
+                    qualifedIndexTableName + " doesn't exist");
+               }
+               tablesToBeLoaded.clear();
+               tablesToBeLoaded.add(qualifedIndexTableName);
         }
-        job.setInputFormatClass(TextInputFormat.class);
-        FileInputFormat.addInputPath(job, inputPath);
-
-        FileSystem.get(getConf());
-        FileOutputFormat.setOutputPath(job, outputPath);
-
-        job.setMapperClass(CsvToKeyValueMapper.class);
-        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-        job.setMapOutputValueClass(KeyValue.class);
-
-        HTable htable = new HTable(getConf(), qualifiedTableName);
-
-        // Auto configure partitioner and reducer according to the Main Data table
-        HFileOutputFormat.configureIncrementalLoad(job, htable);
-
-        LOG.info("Running MapReduce import job from {} to {}", inputPath, outputPath);
-        boolean success = job.waitForCompletion(true);
-        if (!success) {
-            LOG.error("Import job failed, check JobTracker for details");
-            return 1;
+        
+        List<Future<Boolean>> runningJobs = new ArrayList<Future<Boolean>>();
+        ExecutorService executor = JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20);
+        try{
+               for(String table : tablesToBeLoaded) {
+                       Path tablePath = new Path(outputPath, table);
+                       Configuration jobConf = new Configuration(conf);
+                       jobConf.set(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, qualifiedTableName);
+                       if(qualifiedTableName.compareToIgnoreCase(table) != 0) {
+                               jobConf.set(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, table);
+                       }
+                       TableLoader tableLoader = new TableLoader(jobConf, table, inputPath, tablePath);
+                       runningJobs.add(executor.submit(tableLoader));
+               }
+        } finally {
+               executor.shutdown();
         }
-
-        LOG.info("Loading HFiles from {}", outputPath);
-        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(getConf());
-        loader.doBulkLoad(outputPath, htable);
-        htable.close();
-
-        LOG.info("Incremental load complete");
-
-        LOG.info("Removing output directory {}", outputPath);
-        if (!FileSystem.get(getConf()).delete(outputPath, true)) {
-            LOG.error("Removing output directory {} failed", outputPath);
+        
+        // wait for all jobs to complete
+        int retCode = 0;
+        for(Future<Boolean> task : runningJobs){
+               if(!task.get() && (retCode==0)){
+                       retCode = -1;
+               }
         }
-
-        return 0;
-    }
+               return retCode;
+       }
 
     String getJdbcUrl(String zkQuorum) {
         if (zkQuorum == null) {
@@ -342,4 +371,90 @@ public class CsvBulkLoadTool extends Configured implements Tool {
         }
         rs.close();
     }
+    
+    /**
+     * Get the names of the index tables of the given data table.
+     * @throws java.sql.SQLException
+     */
+    private List<String> getIndexTables(Connection conn, String schemaName, String tableName)
+        throws SQLException {
+        PTable table = PhoenixRuntime.getTable(conn, tableName);
+        List<String> indexTables = new ArrayList<String>();
+        for(PTable indexTable : table.getIndexes()){
+               indexTables.add(getQualifiedTableName(schemaName, 
+                indexTable.getTableName().getString()));
+        }
+        return indexTables;
+    }
+    
+    /**
+     * A Callable that loads data into a single table.
+     *
+     */
+    private static class TableLoader implements Callable<Boolean> {
+        
+       private Configuration conf;
+        private String tableName;
+        private Path inputPath;
+        private Path outputPath;
+         
+        public TableLoader(Configuration conf, String qualifiedTableName, Path inputPath,
+                       Path outputPath){
+               this.conf = conf;
+            this.tableName = qualifiedTableName;
+            this.inputPath = inputPath;
+            this.outputPath = outputPath;
+        }
+        
+        @Override
+        public Boolean call() {
+            LOG.info("Configuring HFile output path to {}", outputPath);
+            try{
+                   Job job = new Job(conf, "Phoenix MapReduce import for " + tableName);
+       
+                   // Allow overriding the job jar setting by using a -D system property at startup
+                   if (job.getJar() == null) {
+                       job.setJarByClass(CsvToKeyValueMapper.class);
+                   }
+                   job.setInputFormatClass(TextInputFormat.class);
+                   FileInputFormat.addInputPath(job, inputPath);
+                   FileOutputFormat.setOutputPath(job, outputPath);
+       
+                   job.setMapperClass(CsvToKeyValueMapper.class);
+                   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+                   job.setMapOutputValueClass(KeyValue.class);
+       
+                   HTable htable = new HTable(conf, tableName);
+       
+                   // Auto configure partitioner and reducer according to the Main Data table
+                   HFileOutputFormat.configureIncrementalLoad(job, htable);
+       
+                   LOG.info("Running MapReduce import job from {} to {}", inputPath, outputPath);
+                   boolean success = job.waitForCompletion(true);
+                   if (!success) {
+                       LOG.error("Import job failed, check JobTracker for details");
+                       return false;
+                   }
+       
+                   LOG.info("Loading HFiles from {}", outputPath);
+                   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+                   loader.doBulkLoad(outputPath, htable);
+                   htable.close();
+       
+                   LOG.info("Incremental load complete for table=" + tableName);
+       
+                   LOG.info("Removing output directory {}", outputPath);
+                   if (!FileSystem.get(conf).delete(outputPath, true)) {
+                       LOG.error("Removing output directory {} failed", outputPath);
+                   }
+                   
+                   return true;
+            } catch(Exception ex) {
+               LOG.error("Import job on table=" + tableName + " failed due to exception:" + ex);
+               return false;
+            }
+        }
+     
+    }
+    
 }
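The reworked flow above submits one MapReduce job per table to load (the data table plus each of its indexes, or only the index named with --index-table) and then joins on the per-job results. A self-contained sketch of that submit-and-join pattern using plain java.util.concurrent types; the table names are hypothetical and the Callable merely stands in for Phoenix's TableLoader:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelLoadSketch {
        public static void main(String[] args) throws Exception {
            // One entry per table to load; in the tool this is the data table plus its index tables.
            List<String> tables = Arrays.asList("DATA_TABLE", "DATA_TABLE_IDX");  // hypothetical names
            ExecutorService executor = Executors.newFixedThreadPool(tables.size());
            List<Future<Boolean>> jobs = new ArrayList<Future<Boolean>>();
            try {
                for (final String table : tables) {
                    // Each task stands in for one TableLoader: run one MR job, report success or failure.
                    jobs.add(executor.submit(new Callable<Boolean>() {
                        @Override
                        public Boolean call() {
                            System.out.println("Loading " + table);
                            return true;
                        }
                    }));
                }
            } finally {
                executor.shutdown();
            }
            // Join: any failed per-table job makes the overall run fail.
            int retCode = 0;
            for (Future<Boolean> job : jobs) {
                if (!job.get()) {
                    retCode = -1;
                }
            }
            System.exit(retCode);
        }
    }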

http://git-wip-us.apache.org/repos/asf/phoenix/blob/94644d5b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index 1bf15ff..24b6a12 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -80,6 +81,9 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
 
     /** Configuration key for the name of the output table */
     public static final String TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.tablename";
+    
+    /** Configuration key for the name of the output index table */
+    public static final String INDEX_TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.indextablename";
 
     /** Configuration key for the columns to be imported */
     public static final String COLUMN_INFO_CONFKEY = "phoenix.mapreduce.import.columninfos";
@@ -92,6 +96,7 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
     private MapperUpsertListener upsertListener;
     private CsvLineParser csvLineParser;
     private ImportPreUpsertKeyValueProcessor preUpdateProcessor;
+    private byte[] tableName;
 
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
@@ -114,6 +119,11 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
         csvLineParser = new CsvLineParser(conf.get(FIELD_DELIMITER_CONFKEY).charAt(0));
 
         preUpdateProcessor = loadPreUpsertProcessor(conf);
+        if(!conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, "").isEmpty()){
+               tableName = Bytes.toBytes(conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY));
+        } else {
+               tableName = Bytes.toBytes(conf.get(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, ""));
+        }
     }
 
     @Override
@@ -134,9 +144,13 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
             csvUpsertExecutor.execute(ImmutableList.of(csvRecord));
 
             Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator
-                    = PhoenixRuntime.getUncommittedDataIterator(conn);
+                    = PhoenixRuntime.getUncommittedDataIterator(conn, true);
             while (uncommittedDataIterator.hasNext()) {
                 Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
+                if(Bytes.compareTo(tableName, kvPair.getFirst()) != 0) {
+                       // skip edits for other tables
+                       continue;
+                }
                 List<KeyValue> keyValueList = kvPair.getSecond();
 
                 keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
