http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 82b353c..e594e0d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -17,10 +17,17 @@
  */
 package org.apache.phoenix.mapreduce.index;
 
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.cli.CommandLine;
@@ -36,11 +43,15 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -48,9 +59,13 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
 import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
@@ -62,9 +77,9 @@ import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -72,6 +87,8 @@ import org.apache.phoenix.util.TransactionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 /**
  * An MR job to populate the index table from the data table.
  *
@@ -85,7 +102,11 @@ public class IndexTool extends Configured implements Tool {
     private static final Option DATA_TABLE_OPTION = new Option("dt", 
"data-table", true,
             "Data table name (mandatory)");
     private static final Option INDEX_TABLE_OPTION = new Option("it", 
"index-table", true,
-            "Index table name(mandatory)");
+            "Index table name(not required in case of partial rebuilding)");
+    
+    private static final Option PARTIAL_REBUILD_OPTION = new Option("pr", 
"partial-rebuild", false,
+            "To build indexes for a data table from least disabledTimeStamp");
+    
     private static final Option DIRECT_API_OPTION = new Option("direct", 
"direct", false,
             "If specified, we avoid the bulk load (optional)");
     private static final Option RUN_FOREGROUND_OPTION =
@@ -105,6 +126,7 @@ public class IndexTool extends Configured implements Tool {
         options.addOption(SCHEMA_NAME_OPTION);
         options.addOption(DATA_TABLE_OPTION);
         options.addOption(INDEX_TABLE_OPTION);
+        options.addOption(PARTIAL_REBUILD_OPTION);
         options.addOption(DIRECT_API_OPTION);
         options.addOption(RUN_FOREGROUND_OPTION);
         options.addOption(OUTPUT_PATH_OPTION);
@@ -139,18 +161,18 @@ public class IndexTool extends Configured implements Tool 
{
                     + "parameter");
         }
 
-        if (!cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
-            throw new IllegalStateException(INDEX_TABLE_OPTION.getLongOpt() + 
" is a mandatory "
-                    + "parameter");
-        }
-
-        if (!cmdLine.hasOption(OUTPUT_PATH_OPTION.getOpt())) {
-            throw new IllegalStateException(OUTPUT_PATH_OPTION.getLongOpt() + 
" is a mandatory "
-                    + "parameter");
-        }
-
-        if (!cmdLine.hasOption(DIRECT_API_OPTION.getOpt())
-                && cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt())) {
+               if (!(cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) || 
cmdLine.hasOption(DIRECT_API_OPTION.getOpt()))
+                               && 
!cmdLine.hasOption(OUTPUT_PATH_OPTION.getOpt())) {
+                       throw new 
IllegalStateException(OUTPUT_PATH_OPTION.getLongOpt() + " is a mandatory " + 
"parameter");
+               }
+        
+               if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && 
cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
+                       throw new IllegalStateException("Index name should not 
be passed with " + PARTIAL_REBUILD_OPTION.getLongOpt());
+               }
+                       
+        if (!(cmdLine.hasOption(DIRECT_API_OPTION.getOpt())) && 
cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())
+                && cmdLine.hasOption(RUN_FOREGROUND_OPTION
+                        .getOpt())) {
             throw new IllegalStateException(RUN_FOREGROUND_OPTION.getLongOpt()
                     + " is applicable only for " + 
DIRECT_API_OPTION.getLongOpt());
         }
@@ -167,50 +189,155 @@ public class IndexTool extends Configured implements 
Tool {
         formatter.printHelp("help", options);
         System.exit(exitCode);
     }
+    
+    class JobFactory {
+        Connection connection;
+        Configuration configuration;
+        private Path outputPath;
 
-    @Override
-    public int run(String[] args) throws Exception {
-        Connection connection = null;
-        try {
-            CommandLine cmdLine = null;
-            try {
-                cmdLine = parseOptions(args);
-            } catch (IllegalStateException e) {
-                printHelpAndExit(e.getMessage(), getOptions());
+        public JobFactory(Connection connection, Configuration configuration, 
Path outputPath) {
+            this.connection = connection;
+            this.configuration = configuration;
+            this.outputPath = outputPath;
+
+        }
+
+        public Job getJob(String schemaName, String indexTable, String 
dataTable, boolean useDirectApi, boolean isPartialBuild) throws Exception {
+            if (isPartialBuild) {
+                return configureJobForPartialBuild(schemaName, dataTable);
+            } else {
+                return configureJobForAysncIndex(schemaName, indexTable, 
dataTable, useDirectApi);
             }
-            final Configuration configuration = 
HBaseConfiguration.addHbaseResources(getConf());
-            final String schemaName = 
cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
-            final String dataTable = 
cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
-            final String indexTable = 
cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+        }
+        
+        private Job configureJobForPartialBuild(String schemaName, String 
dataTable) throws Exception {
             final String qDataTable = 
SchemaUtil.getQualifiedTableName(schemaName, dataTable);
-            final String qIndexTable = 
SchemaUtil.getQualifiedTableName(schemaName, indexTable);
-
+            final PTable pdataTable = PhoenixRuntime.getTable(connection, 
qDataTable);
             connection = ConnectionUtil.getInputConnection(configuration);
-            if (!isValidIndexTable(connection, qDataTable, indexTable)) {
-                throw new IllegalArgumentException(String.format(
-                    " %s is not an index table for %s ", qIndexTable, 
qDataTable));
+            long minDisableTimestamp = HConstants.LATEST_TIMESTAMP;
+            PTable indexWithMinDisableTimestamp = null;
+            
+            //Get Indexes in building state, minDisabledTimestamp 
+            List<String> disableIndexes = new ArrayList<String>();
+            List<PTable> disabledPIndexes = new ArrayList<PTable>();
+            for (PTable index : pdataTable.getIndexes()) {
+                if (index.getIndexState().equals(PIndexState.BUILDING)) {
+                    disableIndexes.add(index.getTableName().getString());
+                    disabledPIndexes.add(index);
+                    if (minDisableTimestamp > 
index.getIndexDisableTimestamp()) {
+                        minDisableTimestamp = index.getIndexDisableTimestamp();
+                        indexWithMinDisableTimestamp = index;
+                    }
+                }
+            }
+            
+            if (indexWithMinDisableTimestamp == null) {
+                throw new Exception("There is no index for a datatable to be 
rebuild:" + qDataTable);
+            }
+            if (minDisableTimestamp == 0) {
+                throw new Exception("It seems Index " + 
indexWithMinDisableTimestamp
+                        + " has disable timestamp as 0 , please run IndexTool 
with IndexName to build it first");
+                // TODO: we could probably initiate that job ourselves, or 
skip such indexes (with a warning) while making the list for partial build
+            }
+            
+            long maxTimestamp = getMaxRebuildAsyncDate(schemaName, 
disableIndexes);
+            
+            //serialize index maintainers in job conf with Base64 TODO: Need to 
find a better way to serialize them in conf.
+            List<IndexMaintainer> maintainers = 
Lists.newArrayListWithExpectedSize(disabledPIndexes.size());
+            for (PTable index : disabledPIndexes) {
+                maintainers.add(index.getIndexMaintainer(pdataTable, 
connection.unwrap(PhoenixConnection.class)));
+            }
+            ImmutableBytesWritable indexMetaDataPtr = new 
ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+            IndexMaintainer.serializeAdditional(pdataTable, indexMetaDataPtr, 
disabledPIndexes, connection.unwrap(PhoenixConnection.class));
+            PhoenixConfigurationUtil.setIndexMaintainers(configuration, 
indexMetaDataPtr);
+            
+            //Prepare raw scan 
+            Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
+            scan.setTimeRange(minDisableTimestamp - 1, maxTimestamp);
+            scan.setRaw(true);
+            scan.setCacheBlocks(false);
+            if (pdataTable.isTransactional()) {
+                long maxTimeRange = pdataTable.getTimeStamp() + 1;
+                scan.setAttribute(BaseScannerRegionObserver.TX_SCN,
+                        
Bytes.toBytes(Long.valueOf(Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)))));
+            }
+            
+          
+            String physicalTableName=pdataTable.getPhysicalName().getString();
+            final String jobName = String.format("Phoenix Indexes build for " 
+ pdataTable.getName().toString());
+            
+            PhoenixConfigurationUtil.setInputTableName(configuration, 
qDataTable);
+            PhoenixConfigurationUtil.setPhysicalTableName(configuration, 
physicalTableName);
+            
+            //TODO: update disable indexes
+            PhoenixConfigurationUtil.setDisableIndexes(configuration, 
StringUtils.join(",",disableIndexes));
+            
+            final Job job = Job.getInstance(configuration, jobName);
+                       if (outputPath != null) {
+                               FileOutputFormat.setOutputPath(job, outputPath);
+                       }
+            job.setJarByClass(IndexTool.class);
+            TableMapReduceUtil.initTableMapperJob(physicalTableName, scan, 
PhoenixIndexPartialBuildMapper.class, null,
+                    null, job);
+            TableMapReduceUtil.initCredentials(job);
+            TableInputFormat.configureSplitTable(job, 
TableName.valueOf(physicalTableName));
+            return configureSubmittableJobUsingDirectApi(job, true);
+        }
+        
+        private long getMaxRebuildAsyncDate(String schemaName, List<String> 
disableIndexes) throws SQLException {
+            Long maxRebuilAsyncDate=HConstants.LATEST_TIMESTAMP;
+            Long maxDisabledTimeStamp=0L;
+            if (disableIndexes == null || disableIndexes.isEmpty()) { return 
0; }
+            List<String> quotedIndexes = new 
ArrayList<String>(disableIndexes.size());
+            for (String index : disableIndexes) {
+                quotedIndexes.add("'" + index + "'");
+            }
+            ResultSet rs = connection.createStatement()
+                    .executeQuery("SELECT MAX(" + ASYNC_REBUILD_TIMESTAMP + 
"),MAX("+INDEX_DISABLE_TIMESTAMP+") FROM " + SYSTEM_CATALOG_NAME + " ("
+                            + ASYNC_REBUILD_TIMESTAMP + " BIGINT) WHERE " + 
TABLE_SCHEM
+                            + (schemaName != null && schemaName.length() > 0 ? 
"='" + schemaName + "'" : " IS NULL")
+                            + " and " + TABLE_NAME + " IN (" + 
StringUtils.join(",", quotedIndexes) + ")");
+            if (rs.next()) {
+                maxRebuilAsyncDate = rs.getLong(1);
+                maxDisabledTimeStamp = rs.getLong(2);
+            }
+            // Check whether a table was disabled again after the user invoked async 
rebuilding (during the run of the job)
+            if (maxRebuilAsyncDate > maxDisabledTimeStamp) {
+                return maxRebuilAsyncDate;
+            } else {
+                throw new RuntimeException(
+                        "Inconsistent state we have one or more index tables 
which are disabled after the async is called!!");
             }
+            
+        }
 
+        private Job configureJobForAysncIndex(String schemaName, String 
indexTable, String dataTable, boolean useDirectApi)
+                throws Exception {
+            final String qDataTable = 
SchemaUtil.getQualifiedTableName(schemaName, dataTable);
+            final String qIndexTable;
+            if (schemaName != null && !schemaName.isEmpty()) {
+                qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, 
indexTable);
+            } else {
+                qIndexTable = indexTable;
+            }
             final PTable pdataTable = PhoenixRuntime.getTable(connection, 
qDataTable);
+            
             final PTable pindexTable = PhoenixRuntime.getTable(connection, 
qIndexTable);
-
+            
+            long maxTimeRange = pindexTable.getTimeStamp() + 1;
             // this is set to ensure index tables remains consistent post 
population.
-            long maxTimeRange = pindexTable.getTimeStamp()+1;
+
             if (pdataTable.isTransactional()) {
                 configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE,
                     
Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)));
             }
             configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
                 Long.toString(maxTimeRange));
-
+            
             // check if the index type is LOCAL, if so, derive and set the 
physicalIndexName that is
             // computed from the qDataTable name.
             String physicalIndexTable = 
pindexTable.getPhysicalName().getString();
-            boolean isLocalIndexBuild = false;
-            if (IndexType.LOCAL.equals(pindexTable.getIndexType())) {
-                physicalIndexTable = qDataTable;
-                isLocalIndexBuild = true;
-            }
+            
 
             final PhoenixConnection pConnection = 
connection.unwrap(PhoenixConnection.class);
             final PostIndexDDLCompiler ddlCompiler =
@@ -224,18 +351,15 @@ public class IndexTool extends Configured implements Tool 
{
 
             configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, 
upsertQuery);
             PhoenixConfigurationUtil.setPhysicalTableName(configuration, 
physicalIndexTable);
-            PhoenixConfigurationUtil.setOutputTableName(configuration, 
indexTable);
+            PhoenixConfigurationUtil.setDisableIndexes(configuration, 
indexTable);
             PhoenixConfigurationUtil.setUpsertColumnNames(configuration,
                 indexColumns.toArray(new String[indexColumns.size()]));
             final List<ColumnInfo> columnMetadataList =
                     PhoenixRuntime.generateColumnInfo(connection, qIndexTable, 
indexColumns);
             ColumnInfoToStringEncoderDecoder.encode(configuration, 
columnMetadataList);
-
-            final Path outputPath = CsvBulkImportUtil
-                    .getOutputPath(new 
Path(cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt())), physicalIndexTable);
             FileSystem.get(configuration).delete(outputPath, true);
             
-            final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, 
dataTable, indexTable);
+            final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, 
pdataTable.getName().toString(), indexTable);
             final Job job = Job.getInstance(configuration, jobName);
             job.setJarByClass(IndexTool.class);
             job.setMapOutputKeyClass(ImmutableBytesWritable.class);
@@ -245,17 +369,146 @@ public class IndexTool extends Configured implements 
Tool {
                 selectQuery);
             TableMapReduceUtil.initCredentials(job);
             
-            boolean useDirectApi = 
cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
+            
             if (useDirectApi) {
-                configureSubmittableJobUsingDirectApi(job, outputPath,
-                    cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()));
+                return configureSubmittableJobUsingDirectApi(job, false);
             } else {
-                configureRunnableJobUsingBulkLoad(job, outputPath, 
isLocalIndexBuild);
-                // Without direct API, we need to update the index state to 
ACTIVE from client.
-                IndexToolUtil.updateIndexState(connection, qDataTable, 
indexTable,
-                        PIndexState.ACTIVE);
+                return configureRunnableJobUsingBulkLoad(job, outputPath);
+                
+            }
+            
+        }
+
+        /**
+         * Configures the job for bulk load and returns it; it does not submit the job or wait for completion.
+         * @param job
+         * @param outputPath
+         * @return
+         * @throws Exception
+         */
+        private Job configureRunnableJobUsingBulkLoad(Job job, Path 
outputPath) throws Exception {
+            job.setMapperClass(PhoenixIndexImportMapper.class);
+            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+            job.setMapOutputValueClass(KeyValue.class);
+            final Configuration configuration = job.getConfiguration();
+            final String physicalIndexTable =
+                    
PhoenixConfigurationUtil.getPhysicalTableName(configuration);
+            final HTable htable = new HTable(configuration, 
physicalIndexTable);
+            HFileOutputFormat.configureIncrementalLoad(job, htable);
+            return job;
+               
+        }
+        
+        /**
+         * Uses the HBase Front Door Api to write to index table. Configures the 
job and returns it; the job
+         * is not submitted here, so running it in foreground or background is up to the caller.
+         * 
+         * @param job
+         * @param isPartialRebuild - if true, the mapper class is left as already 
configured for the
+         *            partial build; if false, PhoenixIndexImportDirectMapper is set.
+         *            The reducer and output classes are set in both cases.
+         * @return
+         * @throws Exception
+         */
+        private Job configureSubmittableJobUsingDirectApi(Job job, boolean 
isPartialRebuild)
+                throws Exception {
+            if (!isPartialRebuild) {
+                //Don't configure mapper for partial build as it is configured 
already
+                job.setMapperClass(PhoenixIndexImportDirectMapper.class);
+            }
+            job.setReducerClass(PhoenixIndexImportDirectReducer.class);
+            Configuration conf = job.getConfiguration();
+            HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+            // Set the Physical Table name for use in 
DirectHTableWriter#write(Mutation)
+            conf.set(TableOutputFormat.OUTPUT_TABLE,
+                
PhoenixConfigurationUtil.getPhysicalTableName(job.getConfiguration()));
+            //Set the Output classes
+            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+            job.setMapOutputValueClass(IntWritable.class);
+            job.setOutputKeyClass(NullWritable.class);
+            job.setOutputValueClass(NullWritable.class);
+            TableMapReduceUtil.addDependencyJars(job);
+            job.setNumReduceTasks(1);
+            return job;
+        }
+        
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Connection connection = null;
+        HTable htable = null;
+        try {
+            CommandLine cmdLine = null;
+            try {
+                cmdLine = parseOptions(args);
+            } catch (IllegalStateException e) {
+                printHelpAndExit(e.getMessage(), getOptions());
+            }
+            final Configuration configuration = 
HBaseConfiguration.addHbaseResources(getConf());
+            final String schemaName = 
cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+            final String dataTable = 
cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
+            final String indexTable = 
cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+            final boolean isPartialBuild = 
cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
+            final String qDataTable = 
SchemaUtil.getQualifiedTableName(schemaName, dataTable);
+            boolean useDirectApi = 
cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
+            String 
basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
+            boolean isForeground = 
cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+            connection = ConnectionUtil.getInputConnection(configuration);
+            byte[][] splitKeysBeforeJob = null;
+            boolean isLocalIndexBuild = false;
+            PTable pindexTable = null;
+            if (indexTable != null) {
+                if (!isValidIndexTable(connection, qDataTable,indexTable)) {
+                    throw new IllegalArgumentException(String.format(
+                        " %s is not an index table for %s ", indexTable, 
qDataTable));
+                }
+                pindexTable = PhoenixRuntime.getTable(connection, schemaName 
!= null && !schemaName.isEmpty()
+                        ? SchemaUtil.getQualifiedTableName(schemaName, 
indexTable) : indexTable);
+                htable = 
(HTable)connection.unwrap(PhoenixConnection.class).getQueryServices()
+                        .getTable(pindexTable.getPhysicalName().getBytes());
+                if (IndexType.LOCAL.equals(pindexTable.getIndexType())) {
+                    isLocalIndexBuild = true;
+                    splitKeysBeforeJob = 
htable.getRegionLocator().getStartKeys();
+                }
+            }
+            
+            PTable pdataTable = PhoenixRuntime.getTableNoCache(connection, 
qDataTable);
+                       Path outputPath = null;
+                       if (basePath != null) {
+                               outputPath = 
CsvBulkImportUtil.getOutputPath(new Path(basePath), pindexTable == null
+                                               ? 
pdataTable.getPhysicalName().getString() : 
pindexTable.getPhysicalName().getString());
+                               
FileSystem.get(configuration).delete(outputPath, true);
+                       }
+            
+            Job job = new JobFactory(connection, configuration, 
outputPath).getJob(schemaName, indexTable, dataTable,
+                    useDirectApi, isPartialBuild);
+            if (!isForeground && useDirectApi) {
+                LOG.info("Running Index Build in Background - Submit async and 
exit");
+                job.submit();
+                return 0;
+            }
+            LOG.info("Running Index Build in Foreground. Waits for the build 
to complete. This may take a long time!.");
+            boolean result = job.waitForCompletion(true);
+            
+            if (result) {
+                if (!useDirectApi && indexTable != null) {
+                    if (isLocalIndexBuild) {
+                        validateSplitForLocalIndex(splitKeysBeforeJob, htable);
+                    }
+                    LOG.info("Loading HFiles from {}", outputPath);
+                    LoadIncrementalHFiles loader = new 
LoadIncrementalHFiles(configuration);
+                    loader.doBulkLoad(outputPath, htable);
+                    htable.close();
+                    // Without direct API, we need to update the index state 
to ACTIVE from client.
+                    IndexToolUtil.updateIndexState(connection, qDataTable, 
indexTable, PIndexState.ACTIVE);
+                    FileSystem.get(configuration).delete(outputPath, true);
+                }
+                return 0;
+            } else {
+                LOG.error("IndexTool job failed! Check logs for errors..");
+                return -1;
             }
-            return 0;
         } catch (Exception ex) {
             LOG.error("An exception occurred while performing the indexing 
job: "
                     + ExceptionUtils.getMessage(ex) + " at:\n" + 
ExceptionUtils.getStackTrace(ex));
@@ -265,6 +518,9 @@ public class IndexTool extends Configured implements Tool {
                 if (connection != null) {
                     connection.close();
                 }
+                if (htable != null) {
+                    htable.close();
+                }
             } catch (SQLException sqle) {
                 LOG.error("Failed to close connection ", sqle.getMessage());
                 throw new RuntimeException("Failed to close connection");
@@ -272,91 +528,18 @@ public class IndexTool extends Configured implements Tool 
{
         }
     }
 
-    /**
-     * Submits the job and waits for completion.
-     * @param job
-     * @param outputPath
-     * @return
-     * @throws Exception
-     */
-    private void configureRunnableJobUsingBulkLoad(Job job, Path outputPath, 
boolean isLocalIndexBuild) throws Exception {
-        job.setMapperClass(PhoenixIndexImportMapper.class);
-        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-        job.setMapOutputValueClass(KeyValue.class);
-        final Configuration configuration = job.getConfiguration();
-        final String physicalIndexTable =
-                PhoenixConfigurationUtil.getPhysicalTableName(configuration);
-        final HTable htable = new HTable(configuration, physicalIndexTable);
-        HFileOutputFormat.configureIncrementalLoad(job, htable);
-        byte[][] splitKeysBeforeJob = null;
-        if(isLocalIndexBuild) {
-            splitKeysBeforeJob = htable.getRegionLocator().getStartKeys();
-        }
-        boolean status = job.waitForCompletion(true);
-        if (!status) {
-            LOG.error("IndexTool job failed!");
-            htable.close();
-            throw new Exception("IndexTool job failed: " + job.toString());
-        } else {
-            if (isLocalIndexBuild
-                    && !IndexUtil.matchingSplitKeys(splitKeysBeforeJob, 
htable.getRegionLocator()
-                            .getStartKeys())) {
-                String errMsg = "The index to build is local index and the 
split keys are not matching"
-                        + " before and after running the job. Please rerun the 
job otherwise"
-                        + " there may be inconsistencies between actual data 
and index data";
-                LOG.error(errMsg);
-                throw new Exception(errMsg);
-            }
-        }
-
-        LOG.info("Loading HFiles from {}", outputPath);
-        LoadIncrementalHFiles loader = new 
LoadIncrementalHFiles(configuration);
-        loader.doBulkLoad(outputPath, htable);
-        htable.close();
-        
-        FileSystem.get(configuration).delete(outputPath, true);
-    }
     
-    /**
-     * Uses the HBase Front Door Api to write to index table. Submits the job 
and either returns or
-     * waits for the job completion based on runForeground parameter.
-     * 
-     * @param job
-     * @param outputPath
-     * @param runForeground - if true, waits for job completion, else submits 
and returns
-     *            immediately.
-     * @return
-     * @throws Exception
-     */
-    private void configureSubmittableJobUsingDirectApi(Job job, Path 
outputPath, boolean runForeground)
-            throws Exception {
-        job.setMapperClass(PhoenixIndexImportDirectMapper.class);
-        job.setReducerClass(PhoenixIndexImportDirectReducer.class);
-        Configuration conf = job.getConfiguration();
-        HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
-        // Set the Physical Table name for use in 
DirectHTableWriter#write(Mutation)
-        conf.set(TableOutputFormat.OUTPUT_TABLE,
-            
PhoenixConfigurationUtil.getPhysicalTableName(job.getConfiguration()));
-        
-        //Set the Output classes
-        job.setMapOutputValueClass(IntWritable.class);
-        job.setOutputKeyClass(NullWritable.class);
-        job.setOutputValueClass(NullWritable.class);
-        TableMapReduceUtil.addDependencyJars(job);
-        job.setNumReduceTasks(1);
-
-        if (!runForeground) {
-            LOG.info("Running Index Build in Background - Submit async and 
exit");
-            job.submit();
-            return;
-        }
-        LOG.info("Running Index Build in Foreground. Waits for the build to 
complete. This may take a long time!.");
-        boolean result = job.waitForCompletion(true);
-        if (!result) {
-            LOG.error("IndexTool job failed!");
-            throw new Exception("IndexTool job failed: " + job.toString());
+
+    private boolean validateSplitForLocalIndex(byte[][] splitKeysBeforeJob, 
HTable htable) throws Exception {
+        if (splitKeysBeforeJob != null
+                && !IndexUtil.matchingSplitKeys(splitKeysBeforeJob, 
htable.getRegionLocator().getStartKeys())) {
+            String errMsg = "The index to build is local index and the split 
keys are not matching"
+                    + " before and after running the job. Please rerun the job 
otherwise"
+                    + " there may be inconsistencies between actual data and 
index data";
+            LOG.error(errMsg);
+            throw new Exception(errMsg);
         }
-        FileSystem.get(conf).delete(outputPath, true);
+        return true;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java
index 1058670..2dc7551 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java
@@ -48,11 +48,13 @@ public class IndexToolUtil {
         */
        public static void updateIndexState(Configuration 
configuration,PIndexState state) throws SQLException {
                final String masterTable = 
PhoenixConfigurationUtil.getInputTableName(configuration);
-               final String indexTable = 
PhoenixConfigurationUtil.getOutputTableName(configuration);
+               final String[] indexTables = 
PhoenixConfigurationUtil.getDisableIndexes(configuration).split(",");
                final Properties overrideProps = new Properties();
                final Connection connection = 
ConnectionUtil.getOutputConnection(configuration, overrideProps);
                try {
-                       updateIndexState(connection, masterTable, indexTable , 
state);
+            for (String indexTable : indexTables) {
+                updateIndexState(connection, masterTable, indexTable, state);
+            }
                } finally {
                        if(connection != null) {
                                connection.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index 9c64efc..15e55dd 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -99,7 +99,7 @@ public class PhoenixIndexImportDirectMapper extends
             this.pStatement = connection.prepareStatement(upsertQuery);
 
         } catch (SQLException e) {
-            throw new RuntimeException(e.getMessage());
+            throw new RuntimeException(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
new file mode 100644
index 0000000..47a38a7
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.PhoenixJobCounters;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Mapper for partial index rebuilds. Each scanned row is turned into a Put and/or Delete built
+ * from the row's raw cells, tagged with the serialized index maintainers, and handed in batches
+ * to a {@link DirectHTableWriter}.
+ */
+public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWritable, IntWritable> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PhoenixIndexPartialBuildMapper.class);
+
+    /** Phoenix connection opened in {@link #setup} and closed in {@link #cleanup}. */
+    private PhoenixConnection connection;
+
+    /** Sink that receives every batch of mutations. */
+    private DirectHTableWriter writer;
+
+    /** Number of input rows between two flushes of the pending mutations. */
+    private int batchSize;
+
+    /** Mutations accumulated since the last flush. */
+    private List<Mutation> mutations;
+
+    /** Serialized index maintainers read from the job configuration. */
+    private ImmutableBytesPtr maintainers;
+
+    /**
+     * Creates the writer, opens the Phoenix connection, derives the batch size and loads the
+     * serialized index maintainers from the job configuration.
+     *
+     * @param context mapper context providing the job configuration
+     * @throws RuntimeException wrapping the {@link SQLException} if the connection setup fails
+     */
+    @Override
+    protected void setup(final Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        final Configuration configuration = context.getConfiguration();
+        writer = new DirectHTableWriter(configuration);
+
+        try {
+            final Properties overrideProps = new Properties();
+            String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+            String txScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
+            // The SCN override is only applied when no transactional SCN is configured.
+            if (txScnValue == null && scn != null) {
+                overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
+            }
+            connection = ConnectionUtil.getOutputConnection(configuration, overrideProps)
+                    .unwrap(PhoenixConnection.class);
+            connection.setAutoCommit(false);
+            // Batch size is the connection's mutate batch size, capped by the max mutation size.
+            ConnectionQueryServices services = connection.getQueryServices();
+            int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+            batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
+            LOG.info("Mutation Batch Size = " + batchSize);
+            this.mutations = Lists.newArrayListWithExpectedSize(batchSize);
+            maintainers = new ImmutableBytesPtr(PhoenixConfigurationUtil.getIndexMaintainers(configuration));
+        } catch (SQLException e) {
+            // Keep the SQLException as the cause so its stack trace and SQL state are not lost.
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Converts the raw cells of one scanned row into at most one Put and one Delete, queues them,
+     * and flushes the queue each time the input record counter reaches a multiple of the batch
+     * size.
+     *
+     * @param row row key of the scanned row (not read directly; the row is taken from the cells)
+     * @param value scan result whose raw cells are replayed
+     * @param context mapper context used for counters and progress reporting
+     */
+    @Override
+    protected void map(ImmutableBytesWritable row, Result value, Context context)
+            throws IOException, InterruptedException {
+        context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
+        try {
+            byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(maintainers);
+            byte[] uuidValue = ServerCacheClient.generateId();
+            Put put = null;
+            Delete del = null;
+            for (Cell cell : value.rawCells()) {
+                if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                    // Lazily create the Put on the first put-typed cell of the row.
+                    if (put == null) {
+                        put = new Put(CellUtil.cloneRow(cell));
+                        tagWithIndexMetadata(put, uuidValue, attribValue);
+                        mutations.add(put);
+                    }
+                    put.add(cell);
+                } else {
+                    // Every non-put cell is treated as a delete marker.
+                    if (del == null) {
+                        del = new Delete(CellUtil.cloneRow(cell));
+                        tagWithIndexMetadata(del, uuidValue, attribValue);
+                        mutations.add(del);
+                    }
+                    del.addDeleteMarker(cell);
+                }
+            }
+            // Write Mutation Batch
+            if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize == 0) {
+                writeBatch(mutations, context);
+                mutations.clear();
+            }
+            // Make sure progress is reported to Application Master.
+            context.progress();
+        } catch (SQLException e) {
+            LOG.error(" Error {}  while read/write of a record ", e.getMessage());
+            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Sets the index UUID, index metadata and ignore-newer-mutations attributes on a mutation. */
+    private static void tagWithIndexMetadata(Mutation mutation, byte[] uuidValue, byte[] attribValue) {
+        mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+        mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+        mutation.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+    }
+
+    /** Hands the given mutations to the writer and adds their count to OUTPUT_RECORDS. */
+    private void writeBatch(List<Mutation> mutations, Context context)
+            throws IOException, SQLException, InterruptedException {
+        writer.write(mutations);
+        context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(mutations.size());
+    }
+
+    /**
+     * Flushes any remaining mutations, emits a single dummy map output record, and closes the
+     * connection and the writer.
+     *
+     * @param context mapper context used for counters and for the dummy output record
+     */
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        try {
+            // Write the last & final Mutation Batch
+            if (!mutations.isEmpty()) {
+                writeBatch(mutations, context);
+            }
+            // We are writing some dummy key-value as map output here so that we commit only one
+            // output to reducer.
+            context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()),
+                new IntWritable(0));
+            super.cleanup(context);
+        } catch (SQLException e) {
+            LOG.error(" Error {}  while read/write of a record ", e.getMessage());
+            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+            throw new RuntimeException(e);
+        } finally {
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (SQLException e) {
+                    // Best effort: a failed close is logged but does not fail the task.
+                    LOG.error("Error {} while closing connection in the PhoenixIndexPartialBuildMapper class ",
+                        e.getMessage());
+                }
+            }
+            if (writer != null) {
+                writer.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 2264acd..f3e4450 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -30,7 +30,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
@@ -98,6 +100,12 @@ public final class PhoenixConfigurationUtil {
     
     public static final String MAPREDUCE_OUTPUT_CLUSTER_QUORUM = 
"phoneix.mapreduce.output.cluster.quorum";
 
+    public static final String INDEX_DISABLED_TIMESTAMP_VALUE = 
"phoenix.mr.index.disableTimestamp";
+
+    public static final String INDEX_MAINTAINERS = 
"phoenix.mr.index.maintainers";
+    
+    public static final String DISABLED_INDEXES = 
"phoenix.mr.index.disabledIndexes";
+
     public enum SchemaType {
         TABLE,
         QUERY;
@@ -428,4 +436,27 @@ public final class PhoenixConfigurationUtil {
     
         return ReflectionUtils.newInstance(processorClass, conf);
     }
+
+    /**
+     * Reads the Base64-encoded index maintainers stored under {@link #INDEX_MAINTAINERS}.
+     *
+     * @param configuration job configuration to read from; must not be null
+     * @return the Base64-decoded value of the property
+     * @throws NullPointerException if the configuration is null or the property is not set
+     */
+    public static byte[] getIndexMaintainers(final Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        final String encoded = configuration.get(INDEX_MAINTAINERS);
+        // Reject a missing property here, with a message naming it, rather than inside the decoder.
+        Preconditions.checkNotNull(encoded, INDEX_MAINTAINERS + " is not set in the configuration");
+        return Base64.decode(encoded);
+    }
+    
+    /**
+     * Stores the serialized index maintainers, Base64-encoded, under {@link #INDEX_MAINTAINERS}.
+     *
+     * @param configuration job configuration to write to; must not be null
+     * @param indexMetaDataPtr pointer to the serialized index maintainers; must not be null
+     */
+    public static void setIndexMaintainers(final Configuration configuration,
+            final ImmutableBytesWritable indexMetaDataPtr) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(indexMetaDataPtr);
+        // get() exposes the whole backing array; copyBytes() honours the pointer's offset and
+        // length so only the valid range is encoded.
+        configuration.set(INDEX_MAINTAINERS, Base64.encodeBytes(indexMetaDataPtr.copyBytes()));
+    }
+    
+    /**
+     * Stores the given index name value under {@link #DISABLED_INDEXES}.
+     *
+     * @param configuration job configuration to write to; must not be null
+     * @param indexName value to store; must not be null
+     */
+    public static void setDisableIndexes(Configuration configuration, String indexName) {
+        final Configuration target = Preconditions.checkNotNull(configuration);
+        final String disabled = Preconditions.checkNotNull(indexName);
+        target.set(DISABLED_INDEXES, disabled);
+    }
+    
+    /**
+     * Returns the value stored under {@link #DISABLED_INDEXES}.
+     *
+     * @param configuration job configuration to read from; must not be null
+     * @return the stored value, exactly as the configuration returns it
+     */
+    public static String getDisableIndexes(Configuration configuration) {
+        return Preconditions.checkNotNull(configuration).get(DISABLED_INDEXES);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
index fcf817a..11328c2 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
@@ -23,12 +23,14 @@ public class AlterIndexStatement extends 
SingleTableStatement {
     private final String dataTableName;
     private final boolean ifExists;
     private final PIndexState indexState;
+    // Assigned once in the constructor; final like the other fields of this statement.
+    private final boolean async;
 
-    public AlterIndexStatement(NamedTableNode indexTableNode, String 
dataTableName, boolean ifExists, PIndexState indexState) {
+    public AlterIndexStatement(NamedTableNode indexTableNode, String 
dataTableName, boolean ifExists, PIndexState indexState, boolean async) {
         super(indexTableNode,0);
         this.dataTableName = dataTableName;
         this.ifExists = ifExists;
         this.indexState = indexState;
+        this.async = async;
     }
 
     public String getTableName() {
@@ -48,4 +50,8 @@ public class AlterIndexStatement extends SingleTableStatement 
{
         return indexState;
     }
 
+    /**
+     * @return the async flag this statement was constructed with
+     */
+    public boolean isAsync() {
+        return this.async;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 0c60e39..4b65c6a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -374,8 +374,12 @@ public class ParseNodeFactory {
         return new DropIndexStatement(indexName, tableName, ifExists);
     }
 
+    /**
+     * Creates an ALTER INDEX statement node, passing the async flag through to the statement.
+     *
+     * @param indexTableNode node naming the index
+     * @param dataTableName name of the data table
+     * @param ifExists value of the statement's ifExists flag
+     * @param state requested index state
+     * @param async value of the statement's async flag
+     * @return the new statement
+     */
+    public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName,
+            boolean ifExists, PIndexState state, boolean async) {
+        final AlterIndexStatement statement =
+                new AlterIndexStatement(indexTableNode, dataTableName, ifExists, state, async);
+        return statement;
+    }
+    
     public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, 
String dataTableName, boolean ifExists, PIndexState state) {
-        return new AlterIndexStatement(indexTableNode, dataTableName, 
ifExists, state);
+        // Four-argument overload: always builds the statement with async = false.
+        return new AlterIndexStatement(indexTableNode, dataTableName, 
ifExists, state, false);
     }
 
     public TraceStatement trace(boolean isTraceOn, double samplingRate) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 4a565be..1e002d2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -127,10 +127,13 @@ public interface QueryServices extends SQLCloseable {
 
     // A master switch if to enable auto rebuild an index which failed to be 
updated previously
     public static final String INDEX_FAILURE_HANDLING_REBUILD_ATTRIB = 
"phoenix.index.failure.handling.rebuild";
+    public static final String INDEX_FAILURE_HANDLING_REBUILD_PERIOD = 
"phoenix.index.failure.handling.rebuild.period";
 
     // Time interval to check if there is an index needs to be rebuild
     public static final String INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB =
         "phoenix.index.failure.handling.rebuild.interval";
+    
+    public static final String 
INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE = 
"phoenix.index.rebuild.batch.perTable";
 
     // A master switch if to block writes when index build failed
     public static final String INDEX_FAILURE_BLOCK_WRITE = 
"phoenix.index.failure.block.write";
@@ -227,6 +230,7 @@ public interface QueryServices extends SQLCloseable {
     
     public static final String CLIENT_CACHE_ENCODING = 
"phoenix.table.client.cache.encoding";
     public static final String AUTO_UPGRADE_ENABLED = 
"phoenix.autoupgrade.enabled";
+       
     /**
      * Get executor service used for parallel scans
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 7f183e9..a3b9b32 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -27,6 +27,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ASYNC_CREATED_DATE;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BASE_COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME;
@@ -304,16 +305,27 @@ public class MetaDataClient {
                     TENANT_ID + "," +
                     TABLE_SCHEM + "," +
                     TABLE_NAME + "," +
-                    INDEX_STATE +
+                    INDEX_STATE + "," +
+                    ASYNC_REBUILD_TIMESTAMP + " " + 
PLong.INSTANCE.getSqlTypeName() +
+                    ") VALUES (?, ?, ?, ?, ?)";
+    
+    private static final String UPDATE_INDEX_REBUILD_ASYNC_STATE =
+            "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_CATALOG_TABLE + "\"( " +
+                    TENANT_ID + "," +
+                    TABLE_SCHEM + "," +
+                    TABLE_NAME + "," +
+                    ASYNC_REBUILD_TIMESTAMP + " " + 
PLong.INSTANCE.getSqlTypeName() +
                     ") VALUES (?, ?, ?, ?)";
+    
     private static final String UPDATE_INDEX_STATE_TO_ACTIVE =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_CATALOG_TABLE + "\"( " +
                     TENANT_ID + "," +
                     TABLE_SCHEM + "," +
                     TABLE_NAME + "," +
                     INDEX_STATE + "," +
-                    INDEX_DISABLE_TIMESTAMP +
-                    ") VALUES (?, ?, ?, ?, ?)";
+                    INDEX_DISABLE_TIMESTAMP +","+
+                    ASYNC_REBUILD_TIMESTAMP + " " + 
PLong.INSTANCE.getSqlTypeName() +
+                    ") VALUES (?, ?, ?, ?, ?, ?)";
     //TODO: merge INSERT_COLUMN_CREATE_TABLE and INSERT_COLUMN_ALTER_TABLE 
column when
     // the new major release is out.
     private static final String INSERT_COLUMN_CREATE_TABLE =
@@ -3469,7 +3481,13 @@ public class MetaDataClient {
             String dataTableName = statement.getTableName();
             String schemaName = statement.getTable().getName().getSchemaName();
             String indexName = statement.getTable().getName().getTableName();
+            boolean isAsync = statement.isAsync();
             PIndexState newIndexState = statement.getIndexState();
+            if (isAsync && newIndexState != PIndexState.REBUILD) { throw new 
SQLExceptionInfo.Builder(
+                    SQLExceptionCode.ASYNC_NOT_ALLOWED)
+                            .setMessage(" ASYNC building of index is allowed 
only with REBUILD index state")
+                            
.setSchemaName(schemaName).setTableName(indexName).build().buildException(); }
+
             if (newIndexState == PIndexState.REBUILD) {
                 newIndexState = PIndexState.BUILDING;
             }
@@ -3480,15 +3498,16 @@ public class MetaDataClient {
             try {
                 if(newIndexState == PIndexState.ACTIVE){
                     tableUpsert = 
connection.prepareStatement(UPDATE_INDEX_STATE_TO_ACTIVE);
-                } else {
+                }else{
                     tableUpsert = 
connection.prepareStatement(UPDATE_INDEX_STATE);
                 }
                 tableUpsert.setString(1, connection.getTenantId() == null ? 
null : connection.getTenantId().getString());
                 tableUpsert.setString(2, schemaName);
                 tableUpsert.setString(3, indexName);
                 tableUpsert.setString(4, newIndexState.getSerializedValue());
+                tableUpsert.setLong(5, 0);
                 if(newIndexState == PIndexState.ACTIVE){
-                    tableUpsert.setLong(5, 0);
+                    tableUpsert.setLong(6, 0);
                 }
                 tableUpsert.execute();
             } finally {
@@ -3515,9 +3534,25 @@ public class MetaDataClient {
                     addTableToCache(result);
                     // Set so that we get the table below with the potentially 
modified rowKeyOrderOptimizable flag set
                     indexRef.setTable(result.getTable());
+                    if (newIndexState == PIndexState.BUILDING && isAsync) {
+                        try {
+                            tableUpsert = 
connection.prepareStatement(UPDATE_INDEX_REBUILD_ASYNC_STATE);
+                            tableUpsert.setString(1,
+                                    connection.getTenantId() == null ? null : 
connection.getTenantId().getString());
+                            tableUpsert.setString(2, schemaName);
+                            tableUpsert.setString(3, indexName);
+                            tableUpsert.setLong(4, 
result.getTable().getTimeStamp());
+                            tableUpsert.execute();
+                            connection.commit();
+                        } finally {
+                            if (tableUpsert != null) {
+                                tableUpsert.close();
+                            }
+                        }
+                    }
                 }
             }
-            if (newIndexState == PIndexState.BUILDING) {
+            if (newIndexState == PIndexState.BUILDING && !isAsync) {
                 PTable index = indexRef.getTable();
                 // First delete any existing rows of the index
                 Long scn = connection.getSCN();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 9622880..2a0f3b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -42,9 +42,13 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
@@ -54,6 +58,10 @@ import org.apache.phoenix.compile.IndexStatementRewriter;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
+import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
+import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.TupleProjector;
@@ -67,10 +75,12 @@ import 
org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -78,6 +88,7 @@ import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableNotFoundException;
@@ -87,12 +98,13 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDecimal;
+import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.tephra.TxConstants;
 
 import com.google.common.collect.Lists;
-
-import org.apache.tephra.TxConstants;
+import com.google.protobuf.ServiceException;
 
 public class IndexUtil {
     public static final String INDEX_COLUMN_NAME_SEP = ":";
@@ -673,13 +685,48 @@ public class IndexUtil {
             HConstants.NO_NONCE, HConstants.NO_NONCE);
     }
 
+    /**
+     * Sends an index-state update for the given index to the metadata coprocessor.
+     * <p>
+     * Builds a single {@link Put} on the row key derived from {@code indexTableName} that writes
+     * the index state, the index disable timestamp and the async rebuild timestamp, then submits
+     * it through {@code MetaDataService.updateIndexState}.
+     *
+     * @param indexTableName full name of the index table; converted to a row key with
+     *            {@code SchemaUtil.getTableKeyFromFullName}
+     * @param minTimeStamp value written to the INDEX_DISABLE_TIMESTAMP column
+     * @param metaTable table handle used to invoke the {@code MetaDataService} coprocessor
+     *            (presumably the SYSTEM.CATALOG table - confirm against callers)
+     * @param blockWriteRebuildIndex when {@code true} the index state written is ACTIVE,
+     *            otherwise DISABLE
+     * @return the mutation result built from the first coprocessor response
+     * @throws IOException if the coprocessor call returns no results
+     * @throws ServiceException declared for the coprocessor invocation
+     * @throws Throwable declared for the coprocessor invocation
+     */
+    public static MetaDataMutationResult disableIndexWithTimestamp(String 
indexTableName, long minTimeStamp,
+            HTableInterface metaTable, boolean blockWriteRebuildIndex) throws 
ServiceException, Throwable {
+        byte[] indexTableKey = 
SchemaUtil.getTableKeyFromFullName(indexTableName);
+        // Mimic the Put that gets generated by the client on an update of the 
index state
+        Put put = new Put(indexTableKey);
+        // NOTE(review): the state stays ACTIVE when blockWriteRebuildIndex is set, presumably
+        // because writes are blocked during the rebuild instead of disabling the index - confirm.
+        if (blockWriteRebuildIndex)
+            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
+                    PIndexState.ACTIVE.getSerializedBytes());
+        else
+            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
+                    PIndexState.DISABLE.getSerializedBytes());
+        // Record the caller-supplied timestamp as the index disable timestamp.
+        put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
+                PLong.INSTANCE.toBytes(minTimeStamp));
+        // The async rebuild timestamp is always written as 0 by this method.
+        put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES,
+                PLong.INSTANCE.toBytes(0));
+        final List<Mutation> tableMetadata = Collections.<Mutation> 
singletonList(put);
+
+        // Invoke the coprocessor with indexTableKey passed as both key arguments, so the call
+        // is scoped to the index's own metadata row.
+        final Map<byte[], MetaDataResponse> results = 
metaTable.coprocessorService(MetaDataService.class, indexTableKey,
+                indexTableKey, new Batch.Call<MetaDataService, 
MetaDataResponse>() {
+                    @Override
+                    public MetaDataResponse call(MetaDataService instance) 
throws IOException {
+                        ServerRpcController controller = new 
ServerRpcController();
+                        BlockingRpcCallback<MetaDataResponse> rpcCallback = 
new BlockingRpcCallback<MetaDataResponse>();
+                        UpdateIndexStateRequest.Builder builder = 
UpdateIndexStateRequest.newBuilder();
+                        for (Mutation m : tableMetadata) {
+                            MutationProto mp = ProtobufUtil.toProto(m);
+                            
builder.addTableMetadataMutations(mp.toByteString());
+                        }
+                        instance.updateIndexState(controller, builder.build(), 
rpcCallback);
+                        // Surface any failure recorded on the controller before reading the response.
+                        if (controller.getFailedOn() != null) { throw 
controller.getFailedOn(); }
+                        return rpcCallback.get();
+                    }
+                });
+        // At least one response is required; only the first one is converted and returned.
+        if (results.isEmpty()) { throw new IOException("Didn't get expected 
result size"); }
+        MetaDataResponse tmpResponse = results.values().iterator().next();
+        return MetaDataMutationResult.constructFromProto(tmpResponse);
+    }
+
     public static boolean matchingSplitKeys(byte[][] splitKeys1, byte[][] 
splitKeys2) throws IOException {
-        if (splitKeys1 != null && splitKeys2 != null
-                && splitKeys1.length == splitKeys2.length) {
+        if (splitKeys1 != null && splitKeys2 != null && splitKeys1.length == 
splitKeys2.length) {
             for (int i = 0; i < splitKeys1.length; i++) {
-                if (Bytes.compareTo(splitKeys1[i], splitKeys2[i]) != 0) {
-                    return false;
-                }
+                if (Bytes.compareTo(splitKeys1[i], splitKeys2[i]) != 0) { 
return false; }
             }
         } else {
             return false;

Reply via email to