This is an automated email from the ASF dual-hosted git repository.

tdsilva pushed a commit to branch 4.14-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit e5ba2e989fad4e3f1de6734a400502cb586d5e8a
Author: Kadir <kozde...@salesforce.com>
AuthorDate: Tue Jan 29 17:14:02 2019 -0800

    PHOENIX-5018 Index mutations created by UPSERT SELECT will have wrong timestamps
    
    Signed-off-by: Geoffrey Jacoby <gjac...@apache.org>
---
 .../phoenix/end2end/IndexBuildTimestampIT.java     | 246 ++++++++++++
 .../org/apache/phoenix/end2end/IndexToolIT.java    |  37 ++
 .../phoenix/end2end/TableDDLPermissionsIT.java     |   8 -
 .../org/apache/phoenix/rpc/PhoenixServerRpcIT.java |   6 -
 .../phoenix/compile/ServerBuildIndexCompiler.java  | 138 +++++++
 .../org/apache/phoenix/index/IndexMaintainer.java  | 433 ++++++++++-----------
 .../phoenix/mapreduce/PhoenixInputFormat.java      |   3 +-
 .../phoenix/mapreduce/PhoenixRecordReader.java     |   4 +-
 .../PhoenixServerBuildIndexInputFormat.java        | 111 ++++++
 .../apache/phoenix/mapreduce/index/IndexTool.java  | 241 +++++++-----
 .../index/PhoenixServerBuildIndexMapper.java       |  75 ++++
 .../mapreduce/util/PhoenixConfigurationUtil.java   |  25 ++
 .../mapreduce/util/PhoenixMapReduceUtil.java       |  27 ++
 .../org/apache/phoenix/schema/MetaDataClient.java  |  16 +-
 14 files changed, 1032 insertions(+), 338 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java
new file mode 100644
index 0000000..7efba07
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import com.google.common.collect.Lists;
+
+@RunWith(Parameterized.class)
+public class IndexBuildTimestampIT extends BaseUniqueNamesOwnClusterIT {
+    private final boolean localIndex;
+    private final boolean async;
+    private final boolean view;
+    private final String tableDDLOptions;
+
+    public IndexBuildTimestampIT(boolean mutable, boolean localIndex,
+                            boolean async, boolean view) {
+        this.localIndex = localIndex;
+        this.async = async;
+        this.view = view;
+        StringBuilder optionBuilder = new StringBuilder();
+        if (!mutable) {
+            optionBuilder.append(" IMMUTABLE_ROWS=true ");
+        }
+        optionBuilder.append(" SPLIT ON(1,2)");
+        this.tableDDLOptions = optionBuilder.toString();
+    }
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        IndexToolIT.setup();
+    }
+
+    @Parameters(
+            name = "mutable={0},localIndex={1},async={2},view={3}")
+    public static Collection<Object[]> data() {
+        List<Object[]> list = Lists.newArrayListWithExpectedSize(8);
+        boolean[] Booleans = new boolean[]{false, true};
+        for (boolean mutable : Booleans) {
+            for (boolean localIndex : Booleans) {
+                for (boolean async : Booleans) {
+                    for (boolean view : Booleans) {
+                        list.add(new Object[]{mutable, localIndex, async, view});
+                    }
+                }
+            }
+        }
+        return list;
+    }
+
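+    // Helper: runs EXPLAIN on the given query and delegates to IndexToolIT.assertExplainPlan
+    // to check that the query is served from the expected (local or global) index table.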
+    public static void assertExplainPlan(Connection conn, boolean localIndex, String selectSql,
+                                         String dataTableFullName, String indexTableFullName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+        String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+
+        IndexToolIT.assertExplainPlan(localIndex, actualExplainPlan, dataTableFullName, indexTableFullName);
+    }
+
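+    // EnvironmentEdge that runs a fixed delta ahead of the system clock; each injected
+    // clock stamps its writes in a distinct, well-separated time window.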
+    private class MyClock extends EnvironmentEdge {
+        long initialTime;
+        long delta;
+
+        public MyClock(long delta) {
+            initialTime = System.currentTimeMillis() + delta;
+            this.delta = delta;
+        }
+
+        @Override
+        public long currentTime() {
+            return System.currentTimeMillis() + delta;
+        }
+
+        public long initialTime() {
+            return initialTime;
+        }
+    }
+
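+    // Writes one row under each injected clock, then uses CurrentSCN connections to verify
+    // that each row only becomes visible at the time at which it was written.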
+    private void populateTable(String tableName, MyClock clock1, MyClock clock2) throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("create table " + tableName +
+                " (id varchar(10) not null primary key, val varchar(10), ts timestamp)" + tableDDLOptions);
+
+        EnvironmentEdgeManager.injectEdge(clock1);
+        conn.createStatement().execute("upsert into " + tableName + " values ('aaa', 'abc', current_date())");
+        conn.commit();
+
+        EnvironmentEdgeManager.injectEdge(clock2);
+        conn.createStatement().execute("upsert into " + tableName + " values ('bbb', 'bcd', current_date())");
+        conn.commit();
+        conn.close();
+
+        Properties props = new Properties();
+        props.setProperty("CurrentSCN", Long.toString(clock1.initialTime()));
+        conn = DriverManager.getConnection(getUrl(), props);
+        ResultSet rs = conn.createStatement().executeQuery("select * from " + tableName);
+        assertFalse(rs.next());
+        conn.close();
+
+        props.setProperty("CurrentSCN", Long.toString(clock2.initialTime()));
+        conn = DriverManager.getConnection(getUrl(), props);
+        rs = conn.createStatement().executeQuery("select * from " + tableName);
+
+        assertTrue(rs.next());
+        assertEquals("aaa", rs.getString(1));
+        assertEquals("abc", rs.getString(2));
+        assertNotNull(rs.getDate(3));
+
+        assertFalse(rs.next());
+        conn.close();
+
+        props.setProperty("CurrentSCN", Long.toString(clock2.currentTime()));
+        conn = DriverManager.getConnection(getUrl(), props);
+        rs = conn.createStatement().executeQuery("select * from " + tableName);
+
+        assertTrue(rs.next());
+        assertEquals("aaa", rs.getString(1));
+        assertEquals("abc", rs.getString(2));
+        assertNotNull(rs.getDate(3));
+
+        assertTrue(rs.next());
+        assertEquals("bbb", rs.getString(1));
+        assertEquals("bcd", rs.getString(2));
+        assertNotNull(rs.getDate(3));
+        assertFalse(rs.next());
+        conn.close();
+    }
+
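+    // Creates the index (synchronously, or via the IndexTool MR job when async) and checks,
+    // both through Phoenix and through raw HBase time-range scans, that index cells carry
+    // the timestamps of the data rows they were built from rather than the index build time.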
+    @Test
+    public void testCellTimestamp() throws Exception {
+        EnvironmentEdgeManager.reset();
+        MyClock clock1 = new MyClock(100000);
+        MyClock clock2 = new MyClock(200000);
+        String dataTableName = generateUniqueName();
+        populateTable(dataTableName, clock1, clock2);
+
+        MyClock clock3 = new MyClock(300000);
+        EnvironmentEdgeManager.injectEdge(clock3);
+
+        Connection conn = DriverManager.getConnection(getUrl());
+
+        String viewName = null;
+        if (view) {
+            viewName = generateUniqueName();
+            conn.createStatement().execute("CREATE VIEW "+ viewName + " AS SELECT * FROM " +
+                    dataTableName);
+        }
+        String indexName = generateUniqueName();
+        conn.createStatement().execute("CREATE "+ (localIndex ? "LOCAL " : "") + " INDEX " + indexName + " on " +
+                (view ? viewName : dataTableName) + " (val) include (ts)" + (async ? "ASYNC" : ""));
+
+        conn.close();
+
+        if (async) {
+            // run the index MR job.
+            IndexToolIT.runIndexTool(true, false, null, (view ? viewName : dataTableName), indexName);
+        }
+
+        // Verify the index timestamps via Phoenix
+        String selectSql = String.format("SELECT * FROM %s WHERE val = 'abc'", (view ? viewName : dataTableName));
+        conn = DriverManager.getConnection(getUrl());
+        // assert we are pulling from index table
+        assertExplainPlan(conn, localIndex, selectSql, dataTableName, (view ? "_IDX_" + dataTableName : indexName));
+        ResultSet rs = conn.createStatement().executeQuery(selectSql);
+        assertTrue (rs.next());
+        assertTrue(rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() < clock2.initialTime() &&
+                rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() >= clock1.initialTime());
+
+        selectSql =
+                String.format("SELECT * FROM %s WHERE val = 'bcd'", (view ? viewName : dataTableName));
+        // assert we are pulling from index table
+        assertExplainPlan(conn, localIndex, selectSql, dataTableName, (view ? "_IDX_" + dataTableName : indexName));
+
+        rs = conn.createStatement().executeQuery(selectSql);
+        assertTrue (rs.next());
+        assertTrue(rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() < clock3.initialTime() &&
+                        rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() >= clock2.initialTime()
+                );
+        assertFalse (rs.next());
+
+        // Verify the index timestamps via HBase
+        PTable pIndexTable = PhoenixRuntime.getTable(conn, indexName);
+        Table table = conn.unwrap(PhoenixConnection.class).getQueryServices()
+                .getTable(pIndexTable.getPhysicalName().getBytes());
+
+        Scan scan = new Scan();
+        scan.setTimeRange(clock3.initialTime(), clock3.currentTime());
+        ResultScanner scanner = table.getScanner(scan);
+        assertTrue(scanner.next() == null);
+
+
+        scan = new Scan();
+        scan.setTimeRange(clock2.initialTime(), clock3.initialTime());
+        scanner = table.getScanner(scan);
+        assertTrue(scanner.next() != null);
+
+
+        scan = new Scan();
+        scan.setTimeRange(clock1.initialTime(), clock2.initialTime());
+        scanner = table.getScanner(scan);
+        assertTrue(scanner.next() != null);
+        conn.close();
+        EnvironmentEdgeManager.reset();
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 9d6f881..aaf9509 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -47,11 +47,17 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper;
+import org.apache.phoenix.mapreduce.index.PhoenixIndexImportMapper;
+import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
+
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
@@ -67,6 +73,7 @@ import org.junit.runners.Parameterized.Parameters;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+
 @RunWith(Parameterized.class)
 @Category(NeedsOwnMiniClusterTest.class)
 public class IndexToolIT extends ParallelStatsEnabledIT {
@@ -488,6 +495,32 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
         runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, new String[0]);
     }
 
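+    // Asserts that IndexTool picked the expected mapper: the new server-side build mapper
+    // for local or non-transactional indexes built without snapshots, and the existing
+    // import mappers otherwise.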
+    private static void verifyMapper(Job job, boolean directApi, boolean useSnapshot, String schemaName,
+                                  String dataTableName, String indexTableName, String tenantId) throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        if (tenantId != null) {
+            props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        try (Connection conn =
+                     DriverManager.getConnection(getUrl(), props)) {
+            PTable dataTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, dataTableName));
+            PTable indexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indexTableName));
+            boolean transactional = dataTable.isTransactional();
+            boolean localIndex = PTable.IndexType.LOCAL.equals(indexTable.getIndexType());
+
+            if (directApi) {
+                if ((localIndex || !transactional) && !useSnapshot) {
+                    assertEquals(job.getMapperClass(), PhoenixServerBuildIndexMapper.class);
+                } else {
+                    assertEquals(job.getMapperClass(), PhoenixIndexImportDirectMapper.class);
+                }
+            }
+            else {
+                assertEquals(job.getMapperClass(), PhoenixIndexImportMapper.class);
+            }
+        }
+    }
+
     public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
             String dataTableName, String indexTableName, String... additionalArgs) throws Exception {
         runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, additionalArgs);
@@ -505,6 +538,10 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
         List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs));
         cmdArgList.addAll(Arrays.asList(additionalArgs));
         int status = indexingTool.run(cmdArgList.toArray(new String[cmdArgList.size()]));
+
+        if (expectedStatus == 0) {
+            verifyMapper(indexingTool.getJob(), directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId);
+        }
         assertEquals(expectedStatus, status);
     }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
index 8666bb8..f4b4b64 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
@@ -197,14 +197,7 @@ public class TableDDLPermissionsIT extends BasePermissionsIT {
 
             // we should be able to read the data from another index as well to which we have not given any access to
             // this user
-            verifyAllowed(createIndex(indexName2, phoenixTableName), unprivilegedUser);
             verifyAllowed(readTable(phoenixTableName, indexName1), unprivilegedUser);
-            verifyAllowed(readTable(phoenixTableName, indexName2), unprivilegedUser);
-            verifyAllowed(rebuildIndex(indexName2, phoenixTableName), unprivilegedUser);
-
-            // data table user should be able to read new index
-            verifyAllowed(rebuildIndex(indexName2, phoenixTableName), regularUser1);
-            verifyAllowed(readTable(phoenixTableName, indexName2), regularUser1);
 
             verifyAllowed(readTable(phoenixTableName), regularUser1);
             verifyAllowed(rebuildIndex(indexName1, phoenixTableName), regularUser1);
@@ -213,7 +206,6 @@ public class TableDDLPermissionsIT extends BasePermissionsIT {
             verifyAllowed(dropView(viewName1), regularUser1);
             verifyAllowed(dropView(viewName2), regularUser1);
             verifyAllowed(dropColumn(phoenixTableName, "val1"), regularUser1);
-            verifyAllowed(dropIndex(indexName2, phoenixTableName), regularUser1);
             verifyAllowed(dropIndex(indexName1, phoenixTableName), regularUser1);
             verifyAllowed(dropTable(phoenixTableName), regularUser1);
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
index 1c18667..ab05c16 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
@@ -140,12 +140,6 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
             assertEquals("k1", rs.getString(1));
             assertEquals("v2", rs.getString(2));
             assertFalse(rs.next());
-            
-            TestPhoenixIndexRpcSchedulerFactory.reset();
-            createIndex(conn, indexName + "_1");
-            // Verify that that index queue is not used since running upsert select on server side has been disabled
-            // See PHOENIX-4171
-            Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor(), Mockito.never()).dispatch(Mockito.any(CallRunner.class));
         }
         finally {
             conn.close();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
new file mode 100644
index 0000000..7d1c1b4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.schema.*;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.StringUtil;
+
+import com.google.common.collect.Lists;
+
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+
+
+/**
+ * Class that compiles a plan to generate the initial data values for an index table
+ * after a DDL command creates the index.
+ */
+public class ServerBuildIndexCompiler {
+    private final PhoenixConnection connection;
+    private final String tableName;
+    private PTable dataTable;
+    private QueryPlan plan;
+
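+    // MutationPlan that runs the wrapped count(*) query (which triggers the server-side
+    // index build) and reports the number of scanned data rows as the mutation count.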
+    private class RowCountMutationPlan extends BaseMutationPlan {
+        private RowCountMutationPlan(StatementContext context, PhoenixStatement.Operation operation) {
+            super(context, operation);
+        }
+        @Override
+        public MutationState execute() throws SQLException {
+            connection.getMutationState().commitDDLFence(dataTable);
+            Tuple tuple = plan.iterator().next();
+            long rowCount = 0;
+            if (tuple != null) {
+                Cell kv = tuple.getValue(0);
+                ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+                // A single Cell will be returned with the count(*) - we decode that here
+                rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault());
+            }
+            // The contract is to return a MutationState that contains the number of rows modified. In this
+            // case, it's the number of rows in the data table which corresponds to the number of index
+            // rows that were added.
+            return new MutationState(0, 0, connection, rowCount);
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return plan;
+        }
+    };
+    
+    public ServerBuildIndexCompiler(PhoenixConnection connection, String tableName) {
+        this.connection = connection;
+        this.tableName = tableName;
+    }
+
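+    // Compiles a count(*) scan over the data table and tags it with the scan attributes
+    // that make UngroupedAggregateRegionObserver build the index rows on the server side.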
+    public MutationPlan compile(PTable index) throws SQLException {
+        try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
+            String query = "SELECT count(*) FROM " + tableName;
+            this.plan = statement.compileQuery(query);
+            TableRef tableRef = plan.getTableRef();
+            Scan scan = plan.getContext().getScan();
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            dataTable = tableRef.getTable();
+            if (index.getIndexType() == PTable.IndexType.GLOBAL && dataTable.isTransactional()) {
+                throw new IllegalArgumentException(
+                        "ServerBuildIndexCompiler does not support global indexes on transactional tables");
+            }
+            IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(index), plan.getContext().getConnection());
+            // Set the scan attributes that UngroupedAggregateRegionObserver will switch on.
+            // For local indexes, the BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO attribute, and
+            // for global indexes the PhoenixIndexCodec.INDEX_PROTO_MD attribute, is set to the serialized form of index
+            // metadata to build index rows from data table rows. For global indexes, we also need to set (1) the
+            // BaseScannerRegionObserver.REBUILD_INDEXES attribute in order to signal UngroupedAggregateRegionObserver
+            // that this scan is for building global indexes and (2) the MetaDataProtocol.PHOENIX_VERSION attribute
+            // that will be passed as a mutation attribute for the scanned mutations that will be applied on
+            // the index table, possibly remotely.
+            if (index.getIndexType() == PTable.IndexType.LOCAL) {
+                scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO, ByteUtil.copyKeyBytesIfNecessary(ptr));
+            } else {
+                scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+                scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
+                ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+            }
+            // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
+            // However, in this case, we need to project all of the data columns that contribute to the index.
+            IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
+            for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
+                if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+                    scan.addFamily(columnRef.getFamily());
+                } else {
+                    scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
+                }
+            }
+
+            if (dataTable.isTransactional()) {
+                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction());
+            }
+
+            // Go through MutationPlan abstraction so that we can create local indexes
+            // with a connectionless connection (which makes testing easier).
+            return new RowCountMutationPlan(plan.getContext(), PhoenixStatement.Operation.UPSERT);
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index bc2523d..d94d187 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -119,7 +119,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 /**
- * 
+ *
  * Class that builds index row key from data row key and current state of
  * row and caches any covered columns. Client-side serializes into byte array using 
  * @link #serialize(PTable, ImmutableBytesWritable)}
@@ -127,15 +127,15 @@ import com.google.common.collect.Sets;
  * {@link org.apache.phoenix.index.PhoenixIndexCodec#INDEX_PROTO_MD}
  * Mutation attribute or as a separate RPC call using 
  * {@link org.apache.phoenix.cache.ServerCacheClient})
- * 
- * 
+ *
+ *
  * @since 2.1.0
  */
 public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
 
     private static final int EXPRESSION_NOT_PRESENT = -1;
     private static final int ESTIMATED_EXPRESSION_SIZE = 8;
-    
+
     public static IndexMaintainer create(PTable dataTable, PTable index, PhoenixConnection connection) {
         if (dataTable.getType() == PTableType.INDEX || index.getType() != PTableType.INDEX || !dataTable.getIndexes().contains(index)) {
             throw new IllegalArgumentException();
@@ -143,7 +143,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         IndexMaintainer maintainer = new IndexMaintainer(dataTable, index, connection);
         return maintainer;
     }
-    
+
     private static boolean sendIndexMaintainer(PTable index) {
         PIndexState indexState = index.getIndexState();
         return ! ( PIndexState.DISABLE == indexState || PIndexState.PENDING_ACTIVE == indexState );
@@ -157,7 +157,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             }
         });
     }
-    
+
     public static Iterator<PTable> maintainedGlobalIndexes(Iterator<PTable> indexes) {
         return Iterators.filter(indexes, new Predicate<PTable>() {
             @Override
@@ -166,7 +166,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             }
         });
     }
-    
+
     public static Iterator<PTable> maintainedLocalIndexes(Iterator<PTable> indexes) {
         return Iterators.filter(indexes, new Predicate<PTable>() {
             @Override
@@ -175,7 +175,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             }
         });
     }
-    
+
     /**
      * For client-side to serialize all IndexMaintainers for a given table
      * @param dataTable data table
@@ -183,17 +183,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      */
     public static void serialize(PTable dataTable, ImmutableBytesWritable ptr, PhoenixConnection connection) {
         List<PTable> indexes = dataTable.getIndexes();
-        serialize(dataTable, ptr, indexes, connection);
+        serializeServerMaintainedIndexes(dataTable, ptr, indexes, connection);
     }
 
-    /**
-     * For client-side to serialize all IndexMaintainers for a given table
-     * @param dataTable data table
-     * @param ptr bytes pointer to hold returned serialized value
-     * @param indexes indexes to serialize
-     */
-    public static void serialize(PTable dataTable, ImmutableBytesWritable ptr,
-            List<PTable> indexes, PhoenixConnection connection) {
+    public static void serializeServerMaintainedIndexes(PTable dataTable, ImmutableBytesWritable ptr,
+                                                        List<PTable> indexes, PhoenixConnection connection) {
         Iterator<PTable> indexesItr;
         boolean onlyLocalIndexes = dataTable.isImmutableRows() || dataTable.isTransactional();
         if (onlyLocalIndexes) {
@@ -201,15 +195,22 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         } else {
             indexesItr = maintainedIndexes(indexes.iterator());
         }
-        if (!indexesItr.hasNext()) {
+
+        serialize(dataTable, ptr, Lists.newArrayList(indexesItr), connection);
+    }
+    /**
+     * For client-side to serialize all IndexMaintainers for a given table
+     * @param dataTable data table
+     * @param ptr bytes pointer to hold returned serialized value
+     * @param indexes indexes to serialize
+     */
+    public static void serialize(PTable dataTable, ImmutableBytesWritable ptr,
+                                 List<PTable> indexes, PhoenixConnection connection) {
+        if (indexes.isEmpty()) {
             ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
             return;
         }
-        int nIndexes = 0;
-        while (indexesItr.hasNext()) {
-            nIndexes++;
-            indexesItr.next();
-        }
+        int nIndexes = indexes.size();
         ByteArrayOutputStream stream = new ByteArrayOutputStream();
         DataOutputStream output = new DataOutputStream(stream);
         try {
@@ -217,21 +218,18 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             WritableUtils.writeVInt(output, nIndexes * (dataTable.getBucketNum() == null ? 1 : -1));
             // Write out data row key schema once, since it's the same for all index maintainers
             dataTable.getRowKeySchema().write(output);
-            indexesItr = onlyLocalIndexes 
-                        ? maintainedLocalIndexes(indexes.iterator())
-                        : maintainedIndexes(indexes.iterator());
-            while (indexesItr.hasNext()) {
-                    org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = IndexMaintainer.toProto(indexesItr.next().getIndexMaintainer(dataTable, connection));
-                    byte[] protoBytes = proto.toByteArray();
-                    WritableUtils.writeVInt(output, protoBytes.length);
-                    output.write(protoBytes);
+            for (PTable index : indexes) {
+                org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = IndexMaintainer.toProto(index.getIndexMaintainer(dataTable, connection));
+                byte[] protoBytes = proto.toByteArray();
+                WritableUtils.writeVInt(output, protoBytes.length);
+                output.write(protoBytes);
             }
         } catch (IOException e) {
             throw new RuntimeException(e); // Impossible
         }
         ptr.set(stream.toByteArray(), 0, stream.size());
     }
-    
+
     /**
      * For client-side to append serialized IndexMaintainers of keyValueIndexes
      * @param dataTable data table
@@ -239,7 +237,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      * @param keyValueIndexes indexes to serialize
      */
     public static void serializeAdditional(PTable table, ImmutableBytesWritable indexMetaDataPtr,
-            List<PTable> keyValueIndexes, PhoenixConnection connection) {
+                                           List<PTable> keyValueIndexes, PhoenixConnection connection) {
         int nMutableIndexes = indexMetaDataPtr.getLength() == 0 ? 0 : ByteUtil.vintFromBytes(indexMetaDataPtr);
         int nIndexes = nMutableIndexes + keyValueIndexes.size();
         int estimatedSize = indexMetaDataPtr.getLength() + 1; // Just in case new size increases buffer
@@ -273,53 +271,55 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         }
         indexMetaDataPtr.set(stream.getBuffer(), 0, stream.size());
     }
-    
+
     public static List<IndexMaintainer> deserialize(ImmutableBytesWritable metaDataPtr,
-            KeyValueBuilder builder, boolean useProtoForIndexMaintainer) {
+                                                    KeyValueBuilder builder, boolean useProtoForIndexMaintainer) {
         return deserialize(metaDataPtr.get(), metaDataPtr.getOffset(), metaDataPtr.getLength(), useProtoForIndexMaintainer);
     }
-    
+
     public static List<IndexMaintainer> deserialize(byte[] buf, boolean useProtoForIndexMaintainer) {
         return deserialize(buf, 0, buf.length, useProtoForIndexMaintainer);
     }
 
     private static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length, boolean useProtoForIndexMaintainer) {
-        ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length);
-        DataInput input = new DataInputStream(stream);
         List<IndexMaintainer> maintainers = Collections.emptyList();
-        try {
-            int size = WritableUtils.readVInt(input);
-            boolean isDataTableSalted = size < 0;
-            size = Math.abs(size);
-            RowKeySchema rowKeySchema = new RowKeySchema();
-            rowKeySchema.readFields(input);
-            maintainers = Lists.newArrayListWithExpectedSize(size);
-            for (int i = 0; i < size; i++) {
-                if (useProtoForIndexMaintainer) {
-                  int protoSize = WritableUtils.readVInt(input);
-                  byte[] b = new byte[protoSize];
-                  input.readFully(b);
-                  org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b);
-                  maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted));
-                } else {
-                    IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted);
-                    maintainer.readFields(input);
-                    maintainers.add(maintainer);
+        if (length > 0) {
+            ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length);
+            DataInput input = new DataInputStream(stream);
+            try {
+                int size = WritableUtils.readVInt(input);
+                boolean isDataTableSalted = size < 0;
+                size = Math.abs(size);
+                RowKeySchema rowKeySchema = new RowKeySchema();
+                rowKeySchema.readFields(input);
+                maintainers = Lists.newArrayListWithExpectedSize(size);
+                for (int i = 0; i < size; i++) {
+                    if (useProtoForIndexMaintainer) {
+                        int protoSize = WritableUtils.readVInt(input);
+                        byte[] b = new byte[protoSize];
+                        input.readFully(b);
+                        org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b);
+                        maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted));
+                    } else {
+                        IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted);
+                        maintainer.readFields(input);
+                        maintainers.add(maintainer);
+                    }
                 }
+            } catch (IOException e) {
+                throw new RuntimeException(e); // Impossible
             }
-        } catch (IOException e) {
-            throw new RuntimeException(e); // Impossible
         }
         return maintainers;
     }
-    
+
     private byte[] viewIndexId;
     private boolean isMultiTenant;
     // indexed expressions that are not present in the row key of the data table, the expression can also refer to a regular column
     private List<Expression> indexedExpressions;
     // columns required to evaluate all expressions in indexedExpressions (this does not include columns in the data row key)
     private Set<ColumnReference> indexedColumns;
-    
+
     // columns required to create index row i.e. indexedColumns + coveredColumns  (this does not include columns in the data row key)
     private Set<ColumnReference> allColumns;
     // TODO remove this in the next major release
@@ -336,15 +336,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     // Transient state
     private final boolean isDataTableSalted;
     private final RowKeySchema dataRowKeySchema;
-    
+
     private int estimatedIndexRowKeyBytes;
     private int estimatedExpressionSize;
     private int[] dataPkPosition;
     private int maxTrailingNulls;
     private ColumnReference dataEmptyKeyValueRef;
     private boolean rowKeyOrderOptimizable;
-    
-    /**** START: New member variables added in 4.10 *****/ 
+
+    /**** START: New member variables added in 4.10 *****/
     private QualifierEncodingScheme encodingScheme;
     private ImmutableStorageScheme immutableStorageScheme;
     /*
@@ -366,7 +366,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         this.dataRowKeySchema = dataRowKeySchema;
         this.isDataTableSalted = isDataTableSalted;
     }
-    
+
     private IndexMaintainer(final PTable dataTable, final PTable index, PhoenixConnection connection) {
         this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null);
         this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable();
@@ -374,11 +374,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         this.viewIndexId = index.getViewIndexId() == null ? null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId());
         this.isLocalIndex = index.getIndexType() == IndexType.LOCAL;
         this.encodingScheme = index.getEncodingScheme();
-        
+
         // null check for b/w compatibility
         this.encodingScheme = index.getEncodingScheme() == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : index.getEncodingScheme();
         this.immutableStorageScheme = index.getImmutableStorageScheme() == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : index.getImmutableStorageScheme();
-        
+
         byte[] indexTableName = index.getPhysicalName().getBytes();
         // Use this for the nDataSaltBuckets as we need this for local indexes
         // TODO: persist nDataSaltBuckets separately, but maintain b/w compat.
@@ -390,16 +390,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         // number of expressions that are indexed that are not present in the row key of the data table
         int indexedExpressionCount = 0;
         for (int i = indexPosOffset; i<index.getPKColumns().size();i++) {
-               PColumn indexColumn = index.getPKColumns().get(i);
-               String indexColumnName = indexColumn.getName().getString();
+            PColumn indexColumn = index.getPKColumns().get(i);
+            String indexColumnName = indexColumn.getName().getString();
             String dataFamilyName = IndexUtil.getDataColumnFamilyName(indexColumnName);
             String dataColumnName = IndexUtil.getDataColumnName(indexColumnName);
             try {
                 PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getColumnForColumnName(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getPColumnForColumnName(dataColumnName);
-                if (SchemaUtil.isPKColumn(dataColumn)) 
+                if (SchemaUtil.isPKColumn(dataColumn))
                     continue;
             } catch (ColumnNotFoundException e) {
-             // This column must be an expression
+                // This column must be an expression
             } catch (Exception e) {
                 throw new IllegalArgumentException(e);
             }
@@ -457,7 +457,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         }
         StatementContext context = new StatementContext(new PhoenixStatement(connection), resolver);
         this.indexedColumnsInfo = Sets.newHashSetWithExpectedSize(nIndexColumns - nIndexPKColumns);
-        
+
         IndexExpressionCompiler expressionIndexCompiler = new IndexExpressionCompiler(context);
         for (int i = indexPosOffset; i < index.getPKColumns().size(); i++) {
             PColumn indexColumn = index.getPKColumns().get(i);
@@ -470,30 +470,30 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 throw new RuntimeException(e); // Impossible
             }
             if ( expressionIndexCompiler.getColumnRef()!=null ) {
-               // get the column of the data column that corresponds to this index column
-                   PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
-                   boolean isPKColumn = SchemaUtil.isPKColumn(column);
-                   if (isPKColumn) {
-                       int dataPkPos = dataTable.getPKColumns().indexOf(column) - (dataTable.getBucketNum() == null ? 0 : 1) - (this.isMultiTenant ? 1 : 0);
-                       this.rowKeyMetaData.setIndexPkPosition(dataPkPos, indexPos);
-                       indexedColumnsInfo.add(new Pair<>((String)null, column.getName().getString()));
-                   } else {
-                       indexColByteSize += column.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(column) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE;
-                       try {
-                           // Surround constant with cast so that we can still know the original type. Otherwise, if we lose the type,
-                           // (for example when VARCHAR becomes CHAR), it can lead to problems in the type translation we do between data tables and indexes.
-                           if (column.isNullable() && ExpressionUtil.isConstant(expression)) {
-                               expression = CoerceExpression.create(expression, indexColumn.getDataType());
-                           }
+                // get the column of the data column that corresponds to this index column
+                PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
+                boolean isPKColumn = SchemaUtil.isPKColumn(column);
+                if (isPKColumn) {
+                    int dataPkPos = dataTable.getPKColumns().indexOf(column) - (dataTable.getBucketNum() == null ? 0 : 1) - (this.isMultiTenant ? 1 : 0);
+                    this.rowKeyMetaData.setIndexPkPosition(dataPkPos, indexPos);
+                    indexedColumnsInfo.add(new Pair<>((String)null, column.getName().getString()));
+                } else {
+                    indexColByteSize += column.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(column) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE;
+                    try {
+                        // Surround constant with cast so that we can still know the original type. Otherwise, if we lose the type,
+                        // (for example when VARCHAR becomes CHAR), it can lead to problems in the type translation we do between data tables and indexes.
+                        if (column.isNullable() && ExpressionUtil.isConstant(expression)) {
+                            expression = CoerceExpression.create(expression, indexColumn.getDataType());
+                        }
                         this.indexedExpressions.add(expression);
                         indexedColumnsInfo.add(new Pair<>(column.getFamilyName().getString(), column.getName().getString()));
                     } catch (SQLException e) {
                         throw new RuntimeException(e); // Impossible
                     }
-                   }
+                }
             }
             else {
-               indexColByteSize += expression.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(expression) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE;
+                indexColByteSize += expression.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(expression) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE;
                 this.indexedExpressions.add(expression);
                 KeyValueExpressionVisitor kvVisitor = new KeyValueExpressionVisitor() {
                     @Override
@@ -523,7 +523,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                             PColumn dataColumn =
                                     cf == null ? dataTable.getColumnForColumnQualifier(null, cq)
                                             : dataTable.getColumnFamily(cf)
-                                                    .getPColumnForColumnQualifier(cq);
+                                            .getPColumnForColumnQualifier(cq);
                             indexedColumnsInfo.add(new Pair<>(dataColumn.getFamilyName()
                                     .getString(), dataColumn.getName().getString()));
                         } catch (ColumnNotFoundException | ColumnFamilyNotFoundException
@@ -549,7 +549,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 if (dataColumn != null) {
                     byte[] dataColumnCq = dataColumn.getColumnQualifierBytes();
                     byte[] indexColumnCq = indexColumn.getColumnQualifierBytes();
-                    this.coveredColumnsMap.put(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq), 
+                    this.coveredColumnsMap.put(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq),
                             new ColumnReference(indexColumn.getFamilyName().getBytes(), indexColumnCq));
                 }
             }
@@ -557,14 +557,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize);
         initCachedState();
     }
-    
+
     public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, long ts)  {
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         boolean prependRegionStartKey = isLocalIndex && regionStartKey != null;
         boolean isIndexSalted = !isLocalIndex && nIndexSaltBuckets > 0;
         int prefixKeyLength =
                 prependRegionStartKey ? (regionStartKey.length != 0 ? regionStartKey.length
-                        : regionEndKey.length) : 0; 
+                        : regionEndKey.length) : 0;
         TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(estimatedIndexRowKeyBytes + (prependRegionStartKey ? prefixKeyLength : 0));
         DataOutput output = new DataOutputStream(stream);
         try {
@@ -588,11 +588,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             // Skip data table salt byte
             int maxRowKeyOffset = rowKeyPtr.getOffset() + rowKeyPtr.getLength();
             dataRowKeySchema.iterator(rowKeyPtr, ptr, dataPosOffset);
-            
+
             if (viewIndexId != null) {
                 output.write(viewIndexId);
             }
-            
+
             if (isMultiTenant) {
                 dataRowKeySchema.next(ptr, dataPosOffset, maxRowKeyOffset);
                 output.write(ptr.get(), ptr.getOffset(), ptr.getLength());
@@ -601,7 +601,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 }
                 dataPosOffset++;
             }
-            
+
             // Write index row key
             for (int i = dataPosOffset; i < dataRowKeySchema.getFieldCount(); i++) {
                 Boolean hasValue=dataRowKeySchema.next(ptr, i, maxRowKeyOffset);
@@ -617,7 +617,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                         dataRowKeyLocator[0][pos] = 0;
                         dataRowKeyLocator[1][pos] = 0;
                     }
-                } 
+                }
             }
             BitSet descIndexColumnBitSet = rowKeyMetaData.getDescIndexColumnBitSet();
             Iterator<Expression> expressionIterator = indexedExpressions.iterator();
@@ -626,11 +626,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 boolean isNullable;
                 SortOrder dataSortOrder;
                 if (dataPkPosition[i] == EXPRESSION_NOT_PRESENT) {
-                       Expression expression = expressionIterator.next();
-                       dataColumnType = expression.getDataType();
-                       dataSortOrder = expression.getSortOrder();
+                    Expression expression = expressionIterator.next();
+                    dataColumnType = expression.getDataType();
+                    dataSortOrder = expression.getSortOrder();
                     isNullable = expression.isNullable();
-                       expression.evaluate(new ValueGetterTuple(valueGetter, ts), ptr);
+                    expression.evaluate(new ValueGetterTuple(valueGetter, ts), ptr);
                 }
                 else {
                     Field field = dataRowKeySchema.getField(dataPkPosition[i]);
@@ -788,7 +788,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             }
         }
     }
-    
+
     /*
      * return the view index id from the index row key
      */
@@ -801,7 +801,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     }
 
     private volatile RowKeySchema indexRowKeySchema;
-    
+
     // We have enough information to generate the index row key schema
     private RowKeySchema generateIndexRowKeySchema() {
         int nIndexedColumns = getIndexPkColumnCount() + (isMultiTenant ? 1 : 0) + (!isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0) + (viewIndexId != null ? 1 : 0) - getNumViewConstants();
@@ -840,7 +840,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 public SortOrder getSortOrder() {
                     return SortOrder.getDefault();
                 }
-                
+
             }, false, SortOrder.getDefault());
         }
         if (isMultiTenant) {
@@ -848,7 +848,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             builder.addField(field, field.isNullable(), field.getSortOrder());
             nIndexedColumns--;
         }
-        
+
         Field[] indexFields = new Field[nIndexedColumns];
         BitSet viewConstantColumnBitSet = this.rowKeyMetaData.getViewConstantColumnBitSet();
         // Add Field for all data row pk columns
@@ -858,9 +858,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             // same for all rows in this index)
             if (!viewConstantColumnBitSet.get(i)) {
                 int pos = rowKeyMetaData.getIndexPkPosition(i-dataPosOffset);
-                indexFields[pos] = 
+                indexFields[pos] =
                         dataRowKeySchema.getField(i);
-            } 
+            }
         }
         BitSet descIndexColumnBitSet = rowKeyMetaData.getDescIndexColumnBitSet();
         Iterator<Expression> expressionItr = indexedExpressions.iterator();
@@ -916,12 +916,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 public SortOrder getSortOrder() {
                     return sortOrder;
                 }
-                
+
             }, true, sortOrder);
         }
         return builder.build();
     }
-    
+
     private int getNumViewConstants() {
         BitSet bitSet = this.rowKeyMetaData.getViewConstantColumnBitSet();
         int num = 0;
@@ -942,7 +942,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         }
         return indexRowKeySchema;
     }
-    
+
     public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
         byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey, ts);
         Put put = null;
@@ -957,9 +957,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             put = new Put(indexRowKey);
             // add the keyvalue for the empty row
             put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey),
-                this.getEmptyKeyValueFamily(), dataEmptyKeyValueRef.getQualifierWritable(), ts,
-                // set the value to the empty column name
-                dataEmptyKeyValueRef.getQualifierWritable()));
+                    this.getEmptyKeyValueFamily(), dataEmptyKeyValueRef.getQualifierWritable(), ts,
+                    QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR));
             put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
         }
         ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
@@ -993,22 +992,22 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                         public boolean isNullable() {
                             return false;
                         }
-                        
+
                         @Override
                         public SortOrder getSortOrder() {
                             return null;
                         }
-                        
+
                         @Override
                         public Integer getScale() {
                             return null;
                         }
-                        
+
                         @Override
                         public Integer getMaxLength() {
                             return null;
                         }
-                        
+
                         @Override
                         public PDataType getDataType() {
                             return null;
@@ -1022,7 +1021,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                         colValues[indexArrayPos] = new LiteralExpression(value);
                     }
                 }
-                
+
                 List<Expression> children = Arrays.asList(colValues);
                 // we use SingleCellConstructorExpression to serialize 
multiple columns into a single byte[]
                 SingleCellConstructorExpression 
singleCellConstructorExpression = new 
SingleCellConstructorExpression(immutableStorageScheme, children);
@@ -1058,38 +1057,38 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     private DeleteType getDeleteTypeOrNull(Collection<? extends Cell> 
pendingUpdates) {
         return getDeleteTypeOrNull(pendingUpdates, this.nDataCFs);
     }
-   
+
     private DeleteType getDeleteTypeOrNull(Collection<? extends Cell> 
pendingUpdates, int nCFs) {
         int nDeleteCF = 0;
         int nDeleteVersionCF = 0;
         for (Cell kv : pendingUpdates) {
-               if (kv.getTypeByte() == 
KeyValue.Type.DeleteFamilyVersion.getCode()) {
+            if (kv.getTypeByte() == 
KeyValue.Type.DeleteFamilyVersion.getCode()) {
                 nDeleteVersionCF++;
             }
-               else if (kv.getTypeByte() == 
KeyValue.Type.DeleteFamily.getCode()
-                               // Since we don't include the index rows in the 
change set for txn tables, we need to detect row deletes that have transformed 
by TransactionProcessor
-                       || TransactionUtil.isDeleteFamily(kv)) {
-                   nDeleteCF++;
-               }
+            else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()
+                    // Since we don't include the index rows in the change set
+                    // for txn tables, we need to detect row deletes that have
+                    // been transformed by the TransactionProcessor
+                    || TransactionUtil.isDeleteFamily(kv)) {
+                nDeleteCF++;
+            }
         }
         // This is what a delete looks like on the server side for mutable 
indexing...
         // Should all be one or the other for DeleteFamily versus 
DeleteFamilyVersion, but just in case not
-        DeleteType deleteType = null; 
+        DeleteType deleteType = null;
         if (nDeleteVersionCF > 0 && nDeleteVersionCF >= nCFs) {
-               deleteType = DeleteType.SINGLE_VERSION;
+            deleteType = DeleteType.SINGLE_VERSION;
         } else {
-                       int nDelete = nDeleteCF + nDeleteVersionCF;
-                       if (nDelete>0 && nDelete >= nCFs) {
-                               deleteType = DeleteType.ALL_VERSIONS;
-                       }
-               }
+            int nDelete = nDeleteCF + nDeleteVersionCF;
+            if (nDelete > 0 && nDelete >= nCFs) {
+                deleteType = DeleteType.ALL_VERSIONS;
+            }
+        }
         return deleteType;
     }
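
A worked illustration of the classification above, using hypothetical cells
against a table with nCFs = 2 data column families:

    // [DeleteFamily(cfA), DeleteFamily(cfB)]               -> ALL_VERSIONS
    //     (nDeleteVersionCF = 0, nDelete = 2 >= nCFs)
    // [DeleteFamilyVersion(cfA), DeleteFamilyVersion(cfB)] -> SINGLE_VERSION
    //     (nDeleteVersionCF = 2 >= nCFs)
    // [DeleteFamily(cfA)]                                  -> null
    //     (nDelete = 1 < nCFs: not a full row delete)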
-    
+
     public boolean isRowDeleted(Collection<? extends Cell> pendingUpdates) {
         return getDeleteTypeOrNull(pendingUpdates) != null;
     }
-    
+
     public boolean isRowDeleted(Mutation m) {
         if (m.getFamilyCellMap().size() < this.nDataCFs) {
             return false;
@@ -1106,42 +1105,42 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         if (pendingUpdates.isEmpty()) {
             return false;
         }
-        Map<ColumnReference,Cell> newState = 
Maps.newHashMapWithExpectedSize(pendingUpdates.size()); 
+        Map<ColumnReference,Cell> newState = 
Maps.newHashMapWithExpectedSize(pendingUpdates.size());
         for (Cell kv : pendingUpdates) {
             newState.put(new ColumnReference(CellUtil.cloneFamily(kv), 
CellUtil.cloneQualifier(kv)), kv);
         }
         for (ColumnReference ref : indexedColumns) {
-               Cell newValue = newState.get(ref);
-               if (newValue != null) { // Indexed column has potentially 
changed
-                   ImmutableBytesWritable oldValue = 
oldState.getLatestValue(ref, ts);
+            Cell newValue = newState.get(ref);
+            if (newValue != null) { // Indexed column has potentially changed
+                ImmutableBytesWritable oldValue = oldState.getLatestValue(ref, 
ts);
                 boolean newValueSetAsNull = (newValue.getTypeByte() == 
Type.DeleteColumn.getCode() || newValue.getTypeByte() == Type.Delete.getCode() 
|| CellUtil.matchingValue(newValue, HConstants.EMPTY_BYTE_ARRAY));
-                       boolean oldValueSetAsNull = oldValue == null || 
oldValue.getLength() == 0;
-                       //If the new column value has to be set as null and the 
older value is null too,
-                       //then just skip to the next indexed column.
-                       if (newValueSetAsNull && oldValueSetAsNull) {
-                               continue;
-                       }
-                       if (oldValueSetAsNull || newValueSetAsNull) {
-                               return true;
-                       }
-                       // If the old value is different than the new value, 
the index row needs to be deleted
-                       if (Bytes.compareTo(oldValue.get(), 
oldValue.getOffset(), oldValue.getLength(), 
-                                       newValue.getValueArray(), 
newValue.getValueOffset(), newValue.getValueLength()) != 0) {
-                               return true;
-                       }
-               }
+                boolean oldValueSetAsNull = oldValue == null || 
oldValue.getLength() == 0;
+                //If the new column value has to be set as null and the older 
value is null too,
+                //then just skip to the next indexed column.
+                if (newValueSetAsNull && oldValueSetAsNull) {
+                    continue;
+                }
+                if (oldValueSetAsNull || newValueSetAsNull) {
+                    return true;
+                }
+                // If the old value is different than the new value, the index 
row needs to be deleted
+                if (Bytes.compareTo(oldValue.get(), oldValue.getOffset(), 
oldValue.getLength(),
+                        newValue.getValueArray(), newValue.getValueOffset(), 
newValue.getValueLength()) != 0) {
+                    return true;
+                }
+            }
         }
         return false;
     }
-    
-   /**
+
+    /**
      * Used for immutable indexes that only index PK column values. In that 
case, we can handle a data row deletion,
      * since we can build the corresponding index row key.
      */
     public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, 
ImmutableBytesWritable dataRowKeyPtr, long ts) throws IOException {
         return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, 
Collections.<KeyValue>emptyList(), ts, null, null);
     }
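
For the PK-only immutable case this javadoc describes, a hypothetical caller
looks like the following (kvBuilder, dataRowKeyPtr, and ts are assumed to come
from the surrounding write path):

    // The index row key is fully derivable from the data row key, so a data
    // row delete maps straight to an index row delete at the same timestamp.
    Delete indexDelete = indexMaintainer.buildDeleteMutation(kvBuilder, dataRowKeyPtr, ts);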
-    
+
     public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter 
oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> 
pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws 
IOException {
         byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, 
regionStartKey, regionEndKey, ts);
         // Delete the entire row if any of the indexed columns changed
@@ -1149,7 +1148,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         if (oldState == null || 
(deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || 
hasIndexedColumnChanged(oldState, pendingUpdates, ts)) { // Deleting the entire 
row
             byte[] emptyCF = emptyKeyValueCFPtr.copyBytesIfNecessary();
             Delete delete = new Delete(indexRowKey);
-            
+
             for (ColumnReference ref : getCoveredColumns()) {
                 ColumnReference indexColumn = coveredColumnsMap.get(ref);
                 // If table delete was single version, then index delete 
should be as well
@@ -1175,12 +1174,12 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                 ColumnReference ref = new ColumnReference(kv.getFamily(), 
kv.getQualifier());
                 if (dataTableColRefs.contains(ref)) {
                     if (delete == null) {
-                        delete = new Delete(indexRowKey);                    
+                        delete = new Delete(indexRowKey);
                         delete.setDurability(!indexWALDisabled ? 
Durability.USE_DEFAULT : Durability.SKIP_WAL);
                     }
                     ColumnReference indexColumn = coveredColumnsMap.get(ref);
                     // If point delete for data table, then use point delete 
for index as well
-                    if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { 
+                    if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) {
                         delete.addColumn(indexColumn.getFamily(), 
indexColumn.getQualifier(), ts);
                     } else {
                         delete.addColumns(indexColumn.getFamily(), 
indexColumn.getQualifier(), ts);
@@ -1190,11 +1189,11 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         }
         return delete;
     }
-    
+
     public byte[] getIndexTableName() {
         return indexTableName;
     }
-    
+
     public Set<ColumnReference> getCoveredColumns() {
         return coveredColumnsMap.keySet();
     }
@@ -1202,14 +1201,14 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     public Set<ColumnReference> getAllColumns() {
         return allColumns;
     }
-    
+
     public ImmutableBytesPtr getEmptyKeyValueFamily() {
         // Since the metadata of an index table will never change,
         // we can infer this based on the family of the first covered column
         // If there are no covered columns, we know it's our default name
         return emptyKeyValueCFPtr;
     }
-    
+
     @Deprecated // Only called by code older than our 4.10 release
     @Override
     public void readFields(DataInput input) throws IOException {
@@ -1263,16 +1262,16 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         //TODO remove this in the next major release
         boolean isNewClient = false;
         if (len < 0) {
-          isNewClient = true;
-          len=Math.abs(len);
+            isNewClient = true;
+            len = Math.abs(len);
         }
         byte [] emptyKeyValueCF = new byte[len];
         input.readFully(emptyKeyValueCF, 0, len);
         emptyKeyValueCFPtr = new ImmutableBytesPtr(emptyKeyValueCF);
-        
+
         if (isNewClient) {
             int numIndexedExpressions = WritableUtils.readVInt(input);
-            indexedExpressions = 
Lists.newArrayListWithExpectedSize(numIndexedExpressions);        
+            indexedExpressions = 
Lists.newArrayListWithExpectedSize(numIndexedExpressions);
             for (int i = 0; i < numIndexedExpressions; i++) {
                 Expression expression = 
ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
                 expression.readFields(input);
@@ -1287,27 +1286,27 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                 ColumnReference colRef = colReferenceIter.next();
                 final PDataType dataType = dataTypeIter.next();
                 indexedExpressions.add(new KeyValueColumnExpression(new 
PDatum() {
-                    
+
                     @Override
                     public boolean isNullable() {
                         return true;
                     }
-                    
+
                     @Override
                     public SortOrder getSortOrder() {
                         return SortOrder.getDefault();
                     }
-                    
+
                     @Override
                     public Integer getScale() {
                         return null;
                     }
-                    
+
                     @Override
                     public Integer getMaxLength() {
                         return null;
                     }
-                    
+
                     @Override
                     public PDataType getDataType() {
                         return dataType;
@@ -1315,7 +1314,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                 }, colRef.getFamily(), colRef.getQualifier()));
             }
         }
-        
+
         rowKeyMetaData = newRowKeyMetaData();
         rowKeyMetaData.readFields(input);
         int nDataCFs = WritableUtils.readVInt(input);
@@ -1330,8 +1329,8 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         this.encodingScheme = QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
         initCachedState();
     }
-    
-        
+
+
     public static IndexMaintainer 
fromProto(ServerCachingProtos.IndexMaintainer proto, RowKeySchema 
dataTableRowKeySchema, boolean isDataTableSalted) throws IOException {
         IndexMaintainer maintainer = new 
IndexMaintainer(dataTableRowKeySchema, isDataTableSalted);
         maintainer.nIndexSaltBuckets = proto.getSaltBuckets();
@@ -1365,7 +1364,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         maintainer.rowKeyMetaData = newRowKeyMetaData(maintainer, 
dataTableRowKeySchema, maintainer.indexedExpressions.size(), isDataTableSalted, 
maintainer.isMultiTenant);
         try (ByteArrayInputStream stream = new 
ByteArrayInputStream(proto.getRowKeyMetadata().toByteArray())) {
             DataInput input = new DataInputStream(stream);
-            maintainer.rowKeyMetaData.readFields(input);   
+            maintainer.rowKeyMetaData.readFields(input);
         }
         maintainer.nDataCFs = proto.getNumDataTableColFamilies();
         maintainer.indexWALDisabled = proto.getIndexWalDisabled();
@@ -1380,7 +1379,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         maintainer.encodingScheme = 
PTable.QualifierEncodingScheme.fromSerializedValue((byte)proto.getEncodingScheme());
         maintainer.immutableStorageScheme = 
PTable.ImmutableStorageScheme.fromSerializedValue((byte)proto.getImmutableStorageScheme());
         maintainer.isLocalIndex = proto.getIsLocalIndex();
-        
+
         List<ServerCachingProtos.ColumnReference> 
dataTableColRefsForCoveredColumnsList = 
proto.getDataTableColRefForCoveredColumnsList();
         List<ServerCachingProtos.ColumnReference> 
indexTableColRefsForCoveredColumnsList = 
proto.getIndexTableColRefForCoveredColumnsList();
         maintainer.coveredColumnsMap = 
Maps.newHashMapWithExpectedSize(dataTableColRefsForCoveredColumnsList.size());
@@ -1390,7 +1389,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             ColumnReference dataTableColRef = new 
ColumnReference(colRefFromProto.getFamily().toByteArray(), 
colRefFromProto.getQualifier( ).toByteArray());
             ColumnReference indexTableColRef;
             if (encodedColumnNames) {
-                ServerCachingProtos.ColumnReference fromProto = 
indexTableColRefItr.next(); 
+                ServerCachingProtos.ColumnReference fromProto = 
indexTableColRefItr.next();
                 indexTableColRef = new 
ColumnReference(fromProto.getFamily().toByteArray(), fromProto.getQualifier( 
).toByteArray());
             } else {
                 byte[] cq = 
IndexUtil.getIndexColumnName(dataTableColRef.getFamily(), 
dataTableColRef.getQualifier());
@@ -1402,7 +1401,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         maintainer.initCachedState();
         return maintainer;
     }
-    
+
     @Deprecated // Only called by code older than our 4.10 release
     @Override
     public void write(DataOutput output) throws IOException {
@@ -1436,20 +1435,20 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         // when indexedColumnTypes is removed, remove this 
         WritableUtils.writeVInt(output,-emptyKeyValueCFPtr.getLength());
         output.write(emptyKeyValueCFPtr.get(),emptyKeyValueCFPtr.getOffset(), 
emptyKeyValueCFPtr.getLength());
-        
+
         WritableUtils.writeVInt(output, indexedExpressions.size());
         for (Expression expression : indexedExpressions) {
             WritableUtils.writeVInt(output, 
ExpressionType.valueOf(expression).ordinal());
             expression.write(output);
         }
-        
+
         rowKeyMetaData.write(output);
         // Encode indexWALDisabled in nDataCFs
         WritableUtils.writeVInt(output, (nDataCFs + 1) * (indexWALDisabled ? 
-1 : 1));
         // Encode estimatedIndexRowKeyBytes and immutableRows together.
         WritableUtils.writeVInt(output, estimatedIndexRowKeyBytes * 
(immutableRows ? -1 : 1));
     }
-    
+
     public static ServerCachingProtos.IndexMaintainer toProto(IndexMaintainer 
maintainer) throws IOException {
         ServerCachingProtos.IndexMaintainer.Builder builder = 
ServerCachingProtos.IndexMaintainer.newBuilder();
         builder.setSaltBuckets(maintainer.nIndexSaltBuckets);
@@ -1555,12 +1554,12 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         size += estimatedExpressionSize;
         return size;
     }
-    
+
     private int estimateIndexRowKeyByteSize(int indexColByteSize) {
         int estimatedIndexRowKeyBytes = indexColByteSize + 
dataRowKeySchema.getEstimatedValueLength() + (nIndexSaltBuckets == 0 || 
isLocalIndex || this.isDataTableSalted ? 0 : SaltingUtil.NUM_SALTING_BYTES);
         return estimatedIndexRowKeyBytes;
     }
-    
+
     /**
      * Init calculated state reading/creating
      */
@@ -1571,12 +1570,12 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         // columns that are required to evaluate all expressions in 
indexedExpressions (not including columns in data row key)
         this.indexedColumns = 
Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size());
         for (Expression expression : indexedExpressions) {
-               KeyValueExpressionVisitor visitor = new 
KeyValueExpressionVisitor() {
+            KeyValueExpressionVisitor visitor = new 
KeyValueExpressionVisitor() {
                 @Override
                 public Void visit(KeyValueColumnExpression expression) {
-                       if (indexedColumns.add(new 
ColumnReference(expression.getColumnFamily(), 
expression.getColumnQualifier()))) {
-                               
indexedColumnTypes.add(expression.getDataType());
-                       }
+                    if (indexedColumns.add(new 
ColumnReference(expression.getColumnFamily(), 
expression.getColumnQualifier()))) {
+                        indexedColumnTypes.add(expression.getDataType());
+                    }
                     return null;
                 }
             };
@@ -1590,7 +1589,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                 allColumns.add(new ColumnReference(colRef.getFamily(), 
QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES));
             }
         }
-        
+
         int dataPkOffset = (isDataTableSalted ? 1 : 0) + (isMultiTenant ? 1 : 
0);
         int nIndexPkColumns = getIndexPkColumnCount();
         dataPkPosition = new int[nIndexPkColumns];
@@ -1605,7 +1604,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                 numViewConstantColumns++;
             }
         }
-        
+
         // Calculate the max number of trailing nulls that we should get rid 
of after building the index row key.
         // We only get rid of nulls for variable length types, so we have to 
be careful to consider the type of the
         // index table, not the data type of the data table
@@ -1635,15 +1634,15 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     private int getIndexPkColumnCount() {
         return getIndexPkColumnCount(dataRowKeySchema, 
indexedExpressions.size(), isDataTableSalted, isMultiTenant);
     }
-    
+
     private static int getIndexPkColumnCount(RowKeySchema rowKeySchema, int 
numIndexExpressions, boolean isDataTableSalted, boolean isMultiTenant) {
         return rowKeySchema.getFieldCount() + numIndexExpressions - 
(isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0);
     }
-    
+
     private RowKeyMetaData newRowKeyMetaData() {
         return getIndexPkColumnCount() < 0xFF ? new ByteSizeRowKeyMetaData() : 
new IntSizedRowKeyMetaData();
     }
-    
+
     private static RowKeyMetaData newRowKeyMetaData(IndexMaintainer i, 
RowKeySchema rowKeySchema, int numIndexExpressions, boolean isDataTableSalted, 
boolean isMultiTenant) {
         int indexPkColumnCount = getIndexPkColumnCount(rowKeySchema, 
numIndexExpressions, isDataTableSalted, isMultiTenant);
         return indexPkColumnCount < 0xFF ? i.new ByteSizeRowKeyMetaData() : 
i.new IntSizedRowKeyMetaData();
@@ -1659,26 +1658,26 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             output.write(b);
         }
     }
-    
+
     private abstract class RowKeyMetaData implements Writable {
         private BitSet descIndexColumnBitSet;
         private BitSet viewConstantColumnBitSet;
-        
+
         private RowKeyMetaData() {
         }
-        
+
         private RowKeyMetaData(int nIndexedColumns) {
             descIndexColumnBitSet = BitSet.withCapacity(nIndexedColumns);
             viewConstantColumnBitSet = 
BitSet.withCapacity(dataRowKeySchema.getMaxFields()); // Size based on number 
of data PK columns
-      }
-        
+        }
+
         protected int getByteSize() {
             return BitSet.getByteSize(getIndexPkColumnCount()) * 3 + 
BitSet.getByteSize(dataRowKeySchema.getMaxFields());
         }
-        
+
         protected abstract int getIndexPkPosition(int dataPkPosition);
         protected abstract int setIndexPkPosition(int dataPkPosition, int 
indexPkPosition);
-        
+
         @Override
         public void readFields(DataInput input) throws IOException {
             int length = getIndexPkColumnCount();
@@ -1686,7 +1685,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             int vclength = dataRowKeySchema.getMaxFields();
             viewConstantColumnBitSet = BitSet.read(input, vclength);
         }
-        
+
         @Override
         public void write(DataOutput output) throws IOException {
             int length = getIndexPkColumnCount();
@@ -1703,12 +1702,12 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             return viewConstantColumnBitSet;
         }
     }
-    
+
     private static int BYTE_OFFSET = 127;
-    
+
     private class ByteSizeRowKeyMetaData extends RowKeyMetaData {
         private byte[] indexPkPosition;
-        
+
         private ByteSizeRowKeyMetaData() {
         }
 
@@ -1716,7 +1715,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             super(nIndexedColumns);
             this.indexPkPosition = new byte[nIndexedColumns];
         }
-        
+
         @Override
         protected int getIndexPkPosition(int dataPkPosition) {
             // Use offset for byte so that we can get full range of 0 - 255
@@ -1748,10 +1747,10 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             input.readFully(indexPkPosition);
         }
     }
-    
+
     private class IntSizedRowKeyMetaData extends RowKeyMetaData {
         private int[] indexPkPosition;
-        
+
         private IntSizedRowKeyMetaData() {
         }
 
@@ -1759,7 +1758,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             super(nIndexedColumns);
             this.indexPkPosition = new int[nIndexedColumns];
         }
-        
+
         @Override
         protected int getIndexPkPosition(int dataPkPosition) {
             return this.indexPkPosition[dataPkPosition];
@@ -1769,7 +1768,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         protected int setIndexPkPosition(int dataPkPosition, int 
indexPkPosition) {
             return this.indexPkPosition[dataPkPosition] = indexPkPosition;
         }
-        
+
         @Override
         public void write(DataOutput output) throws IOException {
             super.write(output);
@@ -1814,7 +1813,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             }
             @Override
             public byte[] getRowKey() {
-               return rowKey;
+                return rowKey;
             }
         };
     }
@@ -1822,15 +1821,15 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     public byte[] getDataEmptyKeyValueCF() {
         return dataEmptyKeyValueCF;
     }
-    
+
     public boolean isLocalIndex() {
         return isLocalIndex;
     }
-    
+
     public boolean isImmutableRows() {
         return immutableRows;
     }
-    
+
     public Set<ColumnReference> getIndexedColumns() {
         return indexedColumns;
     }
@@ -1849,22 +1848,22 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             }
             return super.visitEnter(node);
         }
-        
+
         public Map<String, UDFParseNode> getUdfParseNodes() {
             return udfParseNodes;
         }
     }
-    
+
     public byte[] getEmptyKeyValueQualifier() {
         return dataEmptyKeyValueRef.getQualifier();
     }
-    
+
     public Set<Pair<String, String>> getIndexedColumnInfo() {
         return indexedColumnsInfo;
     }
-    
+
     public ImmutableStorageScheme getIndexStorageScheme() {
         return immutableStorageScheme;
     }
-    
+
 }
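
A note on the buildUpdateMutation hunk above: the empty-column cell of an
index row is now written with the caller-supplied ts and
QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR rather than reusing the qualifier
bytes as the cell value, so a rebuilt index row can carry the timestamp of the
data row it was derived from. A hypothetical caller, with variable names
assumed rather than taken from this patch:

    // Rebuild an index row at the timestamp of its source data row instead
    // of the rebuild's wall-clock time.
    long dataRowTs = dataRowCell.getTimestamp();   // hypothetical source cell
    Put indexPut = indexMaintainer.buildUpdateMutation(kvBuilder, valueGetter,
            dataRowKeyPtr, dataRowTs, regionStartKey, regionEndKey);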
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 6093edd..11c412c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -71,7 +71,6 @@ public class PhoenixInputFormat<T extends DBWritable> extends 
InputFormat<NullWr
     @Override
     public RecordReader<NullWritable,T> createRecordReader(InputSplit split, 
TaskAttemptContext context)
             throws IOException, InterruptedException {
-        
         final Configuration configuration = context.getConfiguration();
         final QueryPlan queryPlan = getQueryPlan(context,configuration);
         @SuppressWarnings("unchecked")
@@ -163,7 +162,7 @@ public class PhoenixInputFormat<T extends DBWritable> 
extends InputFormat<NullWr
      * @throws IOException
      * @throws SQLException
      */
-    private QueryPlan getQueryPlan(final JobContext context, final 
Configuration configuration)
+    protected QueryPlan getQueryPlan(final JobContext context, final 
Configuration configuration)
             throws IOException {
         Preconditions.checkNotNull(context);
         try {
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index 58c048b..b7e1373 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -60,8 +60,8 @@ import com.google.common.collect.Lists;
 public class PhoenixRecordReader<T extends DBWritable> extends 
RecordReader<NullWritable,T> {
     
     private static final Log LOG = 
LogFactory.getLog(PhoenixRecordReader.class);
-    private final Configuration  configuration;
-    private final QueryPlan queryPlan;
+    protected final Configuration configuration;
+    protected final QueryPlan queryPlan;
     private NullWritable key =  NullWritable.get();
     private T value = null;
     private Class<T> inputClass;
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
new file mode 100644
index 0000000..f8ec393
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.ServerBuildIndexCompiler;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+
+import com.google.common.base.Preconditions;
+import static 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName;
+import static 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName;
+
+/**
+ * {@link InputFormat} implementation from Phoenix for building indexes.
+ */
+public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends 
PhoenixInputFormat<T> {
+    private QueryPlan queryPlan = null;
+
+    private static final Log LOG = 
LogFactory.getLog(PhoenixServerBuildIndexInputFormat.class);
+
+    /**
+     * Instantiated by the MapReduce framework.
+     */
+    public PhoenixServerBuildIndexInputFormat() {
+    }
+
+    @Override
+    protected QueryPlan getQueryPlan(final JobContext context, final 
Configuration configuration)
+            throws IOException {
+        Preconditions.checkNotNull(context);
+        if (queryPlan != null) {
+            return queryPlan;
+        }
+        final String txnScnValue = 
configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
+        final String currentScnValue = 
configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+        final String tenantId = 
configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+        final Properties overridingProps = new Properties();
+        if (txnScnValue == null && currentScnValue != null) {
+            overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
currentScnValue);
+            overridingProps.put(PhoenixRuntime.BUILD_INDEX_AT_ATTRIB, 
currentScnValue);
+        }
+        if (tenantId != null && 
configuration.get(PhoenixRuntime.TENANT_ID_ATTRIB) == null) {
+            overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        String dataTableFullName = getIndexToolDataTableName(configuration);
+        String indexTableFullName = getIndexToolIndexTableName(configuration);
+
+        try (final Connection connection = 
ConnectionUtil.getInputConnection(configuration, overridingProps)) {
+            PhoenixConnection phoenixConnection = 
connection.unwrap(PhoenixConnection.class);
+            Long scn = (currentScnValue != null) ? 
Long.valueOf(currentScnValue) : EnvironmentEdgeManager.currentTimeMillis();
+            PTable indexTable = 
PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName);
+            ServerBuildIndexCompiler compiler =
+                    new ServerBuildIndexCompiler(phoenixConnection, 
dataTableFullName);
+            MutationPlan plan = compiler.compile(indexTable);
+            Scan scan = plan.getContext().getScan();
+
+            try {
+                scan.setTimeRange(0, scn);
+            } catch (IOException e) {
+                throw new SQLException(e);
+            }
+            queryPlan = plan.getQueryPlan();
+            // Since we can't set an SCN on connections with transactions,
+            // set the TX_SCN attribute so that the max time range is set by
+            // BaseScannerRegionObserver.
+            if (txnScnValue != null) {
+                scan.setAttribute(BaseScannerRegionObserver.TX_SCN, 
Bytes.toBytes(Long.valueOf(txnScnValue)));
+            }
+
+            // Initialize the query plan so it sets up the parallel scans
+            queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
+            return queryPlan;
+        } catch (Exception exception) {
+            LOG.error(String.format("Failed to get the query plan with error 
[%s]",
+                    exception.getMessage()));
+            throw new RuntimeException(exception);
+        }
+    }
+}
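
For context, the IndexTool changes later in this patch wire this input format
into the job roughly as follows (the calls mirror
configureJobForServerBuildIndex below; the empty string stands in for the
select query, which this input format ignores in favor of the plan compiled by
ServerBuildIndexCompiler):

    PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable);
    PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
    PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class,
            PhoenixServerBuildIndexInputFormat.class, qDataTable, "");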
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 355cc53..a665a91 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -28,7 +28,10 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -74,6 +77,7 @@ import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
+import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
 import 
org.apache.phoenix.mapreduce.index.SourceTargetColumnNames.DataSourceColNames;
 import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
@@ -106,6 +110,22 @@ public class IndexTool extends Configured implements Tool {
 
     private static final Logger LOG = LoggerFactory.getLogger(IndexTool.class);
 
+    private String schemaName;
+    private String dataTable;
+    private String indexTable;
+    private boolean isPartialBuild;
+    private String qDataTable;
+    private String qIndexTable;
+    private boolean useDirectApi;
+    private boolean useSnapshot;
+    private boolean isLocalIndexBuild;
+    private PTable pIndexTable;
+    private PTable pDataTable;
+    private String tenantId;
+    private Job job;
+
     private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", 
true,
             "Phoenix schema name (optional)");
     private static final Option DATA_TABLE_OPTION = new Option("dt", 
"data-table", true,
@@ -247,18 +267,30 @@ public class IndexTool extends Configured implements Tool 
{
 
         }
 
-        public Job getJob(String schemaName, String indexTable, String 
dataTable, boolean useDirectApi, boolean isPartialBuild,
-            boolean useSnapshot, String tenantId) throws Exception {
+        public Job getJob() throws Exception {
             if (isPartialBuild) {
-                return configureJobForPartialBuild(schemaName, dataTable, 
tenantId);
+                return configureJobForPartialBuild();
             } else {
-                return configureJobForAsyncIndex(schemaName, indexTable, 
dataTable, useDirectApi, useSnapshot, tenantId);
+                long maxTimeRange = pIndexTable.getTimeStamp() + 1;
+                // This is set to ensure index tables remain consistent
+                // post-population.
+
+                if (pDataTable.isTransactional()) {
+                    configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE,
+                            
Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)));
+                }
+                configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
+                        Long.toString(maxTimeRange));
+                if (useSnapshot || !useDirectApi || (!isLocalIndexBuild && 
pDataTable.isTransactional())) {
+                    return configureJobForAsyncIndex();
+                } else {
+                    // Local and non-transactional global indexes are built
+                    // on the server side.
+                    return configureJobForServerBuildIndex();
+                }
             }
         }
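
To summarize the routing the rewritten getJob() performs (a restatement of the
branches above, not new behavior):

    // isPartialBuild                            -> configureJobForPartialBuild()
    // useSnapshot, !useDirectApi (bulk load),
    // or a transactional global index           -> configureJobForAsyncIndex()
    //                                              (client-side UPSERT SELECT)
    // otherwise, i.e. a local index or a global
    // index on a non-transactional table        -> configureJobForServerBuildIndex()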
-        
-        private Job configureJobForPartialBuild(String schemaName, String 
dataTable, String tenantId) throws Exception {
-            final String qDataTable = 
SchemaUtil.getQualifiedTableName(schemaName, dataTable);
-            final PTable pdataTable = PhoenixRuntime.getTable(connection, 
qDataTable);
+
+        private Job configureJobForPartialBuild() throws Exception {
             connection = ConnectionUtil.getInputConnection(configuration);
             long minDisableTimestamp = HConstants.LATEST_TIMESTAMP;
             PTable indexWithMinDisableTimestamp = null;
@@ -266,7 +298,7 @@ public class IndexTool extends Configured implements Tool {
             //Get Indexes in building state, minDisabledTimestamp 
             List<String> disableIndexes = new ArrayList<String>();
             List<PTable> disabledPIndexes = new ArrayList<PTable>();
-            for (PTable index : pdataTable.getIndexes()) {
+            for (PTable index : pDataTable.getIndexes()) {
                 if (index.getIndexState().equals(PIndexState.BUILDING)) {
                     disableIndexes.add(index.getTableName().getString());
                     disabledPIndexes.add(index);
@@ -299,10 +331,10 @@ public class IndexTool extends Configured implements Tool 
{
             // Serialize index maintainers in the job conf with Base64.
             // TODO: find a better way to serialize them in the conf.
             List<IndexMaintainer> maintainers = 
Lists.newArrayListWithExpectedSize(disabledPIndexes.size());
             for (PTable index : disabledPIndexes) {
-                maintainers.add(index.getIndexMaintainer(pdataTable, 
connection.unwrap(PhoenixConnection.class)));
+                maintainers.add(index.getIndexMaintainer(pDataTable, 
connection.unwrap(PhoenixConnection.class)));
             }
             ImmutableBytesWritable indexMetaDataPtr = new 
ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
-            IndexMaintainer.serializeAdditional(pdataTable, indexMetaDataPtr, 
disabledPIndexes, connection.unwrap(PhoenixConnection.class));
+            IndexMaintainer.serializeAdditional(pDataTable, indexMetaDataPtr, 
disabledPIndexes, connection.unwrap(PhoenixConnection.class));
             PhoenixConfigurationUtil.setIndexMaintainers(configuration, 
indexMetaDataPtr);
             if (tenantId != null) {
                 PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
@@ -313,15 +345,15 @@ public class IndexTool extends Configured implements Tool 
{
             scan.setTimeRange(minDisableTimestamp - 1, maxTimestamp);
             scan.setRaw(true);
             scan.setCacheBlocks(false);
-            if (pdataTable.isTransactional()) {
-                long maxTimeRange = pdataTable.getTimeStamp() + 1;
+            if (pDataTable.isTransactional()) {
+                long maxTimeRange = pDataTable.getTimeStamp() + 1;
                 scan.setAttribute(BaseScannerRegionObserver.TX_SCN,
                         
Bytes.toBytes(Long.valueOf(Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)))));
             }
             
           
-            String physicalTableName=pdataTable.getPhysicalName().getString();
-            final String jobName = String.format("Phoenix Indexes build for " 
+ pdataTable.getName().toString());
+            String physicalTableName = pDataTable.getPhysicalName().getString();
+            final String jobName = String.format("Phoenix Indexes build for %s",
+                    pDataTable.getName().toString());
             
             PhoenixConfigurationUtil.setInputTableName(configuration, 
qDataTable);
             PhoenixConfigurationUtil.setPhysicalTableName(configuration, 
physicalTableName);
@@ -338,7 +370,7 @@ public class IndexTool extends Configured implements Tool {
                     null, job);
             TableMapReduceUtil.initCredentials(job);
             TableInputFormat.configureSplitTable(job, 
TableName.valueOf(physicalTableName));
-            return configureSubmittableJobUsingDirectApi(job, true);
+            return configureSubmittableJobUsingDirectApi(job);
         }
         
         private long getMaxRebuildAsyncDate(String schemaName, List<String> 
disableIndexes) throws SQLException {
@@ -368,39 +400,15 @@ public class IndexTool extends Configured implements Tool 
{
             
         }
 
-        private Job configureJobForAsyncIndex(String schemaName, String 
indexTable, String dataTable, boolean useDirectApi, boolean useSnapshot, String 
tenantId)
-                throws Exception {
-            final String qDataTable = 
SchemaUtil.getQualifiedTableName(schemaName, dataTable);
-            final String qIndexTable;
-            if (schemaName != null && !schemaName.isEmpty()) {
-                qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, 
indexTable);
-            } else {
-                qIndexTable = indexTable;
-            }
-            final PTable pdataTable = PhoenixRuntime.getTable(connection, 
qDataTable);
-            
-            final PTable pindexTable = PhoenixRuntime.getTable(connection, 
qIndexTable);
-            
-            long maxTimeRange = pindexTable.getTimeStamp() + 1;
-            // this is set to ensure index tables remains consistent post 
population.
+        private Job configureJobForAsyncIndex()
 
-            if (pdataTable.isTransactional()) {
-                configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE,
-                    
Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)));
-            }
-            configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
-                Long.toString(maxTimeRange));
-            
-            // check if the index type is LOCAL, if so, derive and set the 
physicalIndexName that is
-            // computed from the qDataTable name.
-            String physicalIndexTable = 
pindexTable.getPhysicalName().getString();
-            
+                throws Exception {
 
+            String physicalIndexTable = 
pIndexTable.getPhysicalName().getString();
             final PhoenixConnection pConnection = 
connection.unwrap(PhoenixConnection.class);
             final PostIndexDDLCompiler ddlCompiler =
-                    new PostIndexDDLCompiler(pConnection, new 
TableRef(pdataTable));
-            ddlCompiler.compile(pindexTable);
-
+                    new PostIndexDDLCompiler(pConnection, new 
TableRef(pDataTable));
+            ddlCompiler.compile(pIndexTable);
             final List<String> indexColumns = 
ddlCompiler.getIndexColumnNames();
             final String selectQuery = ddlCompiler.getSelectQuery();
             final String upsertQuery =
@@ -409,6 +417,7 @@ public class IndexTool extends Configured implements Tool {
             configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, 
upsertQuery);
             PhoenixConfigurationUtil.setPhysicalTableName(configuration, 
physicalIndexTable);
             PhoenixConfigurationUtil.setDisableIndexes(configuration, 
indexTable);
+
             PhoenixConfigurationUtil.setUpsertColumnNames(configuration,
                 indexColumns.toArray(new String[indexColumns.size()]));
             if (tenantId != null) {
@@ -418,25 +427,22 @@ public class IndexTool extends Configured implements Tool 
{
                     PhoenixRuntime.generateColumnInfo(connection, qIndexTable, 
indexColumns);
             ColumnInfoToStringEncoderDecoder.encode(configuration, 
columnMetadataList);
 
+            fs = outputPath.getFileSystem(configuration);
+            fs.delete(outputPath, true);
             final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, 
schemaName, dataTable, indexTable);
             final Job job = Job.getInstance(configuration, jobName);
             job.setJarByClass(IndexTool.class);
             job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-            if (outputPath != null) {
-                fs = outputPath.getFileSystem(configuration);
-                fs.delete(outputPath, true);
-                FileOutputFormat.setOutputPath(job, outputPath);
-            }
+            FileOutputFormat.setOutputPath(job, outputPath);
 
             if (!useSnapshot) {
-                PhoenixMapReduceUtil.setInput(job, 
PhoenixIndexDBWritable.class, qDataTable,
-                    selectQuery);
+                PhoenixMapReduceUtil.setInput(job, 
PhoenixIndexDBWritable.class, qDataTable, selectQuery);
             } else {
                 HBaseAdmin admin = null;
                 String snapshotName;
                 try {
                     admin = pConnection.getQueryServices().getAdmin();
-                    String pdataTableName = pdataTable.getName().getString();
+                    String pdataTableName = pDataTable.getName().getString();
                     snapshotName = new 
StringBuilder(pdataTableName).append("-Snapshot").toString();
                     admin.snapshot(snapshotName, 
TableName.valueOf(pdataTableName));
                 } finally {
@@ -451,17 +457,47 @@ public class IndexTool extends Configured implements Tool 
{
 
                 // set input for map reduce job using hbase snapshots
                 PhoenixMapReduceUtil
-                    .setInput(job, PhoenixIndexDBWritable.class, snapshotName, 
qDataTable, restoreDir, selectQuery);
+                            .setInput(job, PhoenixIndexDBWritable.class, 
snapshotName, qDataTable, restoreDir, selectQuery);
             }
             TableMapReduceUtil.initCredentials(job);
             
             if (useDirectApi) {
-                return configureSubmittableJobUsingDirectApi(job, false);
+                job.setMapperClass(PhoenixIndexImportDirectMapper.class);
+                return configureSubmittableJobUsingDirectApi(job);
             } else {
                 return configureRunnableJobUsingBulkLoad(job, outputPath);
-                
             }
-            
+        }
+
+        private Job configureJobForServerBuildIndex()
+                throws Exception {
+
+            PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, 
qDataTable);
+            PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, 
qIndexTable);
+
+            String physicalIndexTable = 
pIndexTable.getPhysicalName().getString();
+
+            PhoenixConfigurationUtil.setPhysicalTableName(configuration, 
physicalIndexTable);
+            PhoenixConfigurationUtil.setDisableIndexes(configuration, 
indexTable);
+            if (tenantId != null) {
+                PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+            }
+
+            fs = outputPath.getFileSystem(configuration);
+            fs.delete(outputPath, true);
+
+            final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, 
schemaName, dataTable, indexTable);
+            final Job job = Job.getInstance(configuration, jobName);
+            job.setJarByClass(IndexTool.class);
+            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+            FileOutputFormat.setOutputPath(job, outputPath);
+
+            PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, 
PhoenixServerBuildIndexInputFormat.class,
+                            qDataTable, "");
+
+            TableMapReduceUtil.initCredentials(job);
+            job.setMapperClass(PhoenixServerBuildIndexMapper.class);
+            return configureSubmittableJobUsingDirectApi(job);
         }
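
The server-build path sets no UPSERT statement and passes an empty select
query: the scan compiled by ServerBuildIndexCompiler ships the serialized
IndexMaintainer to the region servers, which generate the index mutations
themselves, so PhoenixServerBuildIndexMapper (added at the end of this patch)
can be little more than a pass-through. A hedged sketch of such a mapper's
map(), assuming the PhoenixJobCounters enum that the existing import mappers
use:

    @Override
    protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
            throws IOException, InterruptedException {
        // Rows are indexed server-side as they are scanned; the mapper only
        // accounts for them in the job counters.
        context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
    }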
 
         /**
@@ -495,12 +531,9 @@ public class IndexTool extends Configured implements Tool {
          * @return
          * @throws Exception
          */
-        private Job configureSubmittableJobUsingDirectApi(Job job, boolean 
isPartialRebuild)
+        private Job configureSubmittableJobUsingDirectApi(Job job)
                 throws Exception {
-            if (!isPartialRebuild) {
-                //Don't configure mapper for partial build as it is configured 
already
-                job.setMapperClass(PhoenixIndexImportDirectMapper.class);
-            }
+
             job.setReducerClass(PhoenixIndexImportDirectReducer.class);
             Configuration conf = job.getConfiguration();
             HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
@@ -519,6 +552,10 @@ public class IndexTool extends Configured implements Tool {
         
     }
 
+    public Job getJob() {
+        return job;
+    }
+
     @Override
     public int run(String[] args) throws Exception {
         Connection connection = null;
@@ -531,64 +568,75 @@ public class IndexTool extends Configured implements Tool 
{
                 printHelpAndExit(e.getMessage(), getOptions());
             }
             final Configuration configuration = 
HBaseConfiguration.addHbaseResources(getConf());
-            final String schemaName = 
cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
-            final String dataTable = 
cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
-            final String indexTable = 
cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
-            final boolean isPartialBuild = 
cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
-            final String qDataTable = 
SchemaUtil.getQualifiedTableName(schemaName, dataTable);
-            boolean useDirectApi = 
cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
-            String 
basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
-            boolean isForeground = 
cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
-            boolean useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
             boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt());
-            byte[][] splitKeysBeforeJob = null;
-            boolean isLocalIndexBuild = false;
-            PTable pindexTable = null;
-            String tenantId = null;
+            tenantId = null;
             if (useTenantId) {
                 tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
                 configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
             }
             connection = ConnectionUtil.getInputConnection(configuration);
+            schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+            dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
+            indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+            isPartialBuild = 
cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
+            qDataTable = SchemaUtil.getQualifiedTableName(schemaName, 
dataTable);
+            pDataTable = PhoenixRuntime.getTableNoCache(connection, 
qDataTable);
+            useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
+            String 
basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
+            boolean isForeground = 
cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+            useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
+
+            byte[][] splitKeysBeforeJob = null;
+            isLocalIndexBuild = false;
+            pIndexTable = null;
+
             if (indexTable != null) {
                 if (!isValidIndexTable(connection, qDataTable,indexTable, 
tenantId)) {
                     throw new IllegalArgumentException(String.format(
                         " %s is not an index table for %s for this 
connection", indexTable, qDataTable));
                 }
-                pindexTable = PhoenixRuntime.getTable(connection, schemaName 
!= null && !schemaName.isEmpty()
+                pIndexTable = PhoenixRuntime.getTable(connection, schemaName 
!= null && !schemaName.isEmpty()
                         ? SchemaUtil.getQualifiedTableName(schemaName, 
indexTable) : indexTable);
+
+                if (schemaName != null && !schemaName.isEmpty()) {
+                    qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, 
indexTable);
+                } else {
+                    qIndexTable = indexTable;
+                }
                 htable = 
(HTable)connection.unwrap(PhoenixConnection.class).getQueryServices()
-                        .getTable(pindexTable.getPhysicalName().getBytes());
-                if (IndexType.LOCAL.equals(pindexTable.getIndexType())) {
+                        .getTable(pIndexTable.getPhysicalName().getBytes());
+
+                if (IndexType.LOCAL.equals(pIndexTable.getIndexType())) {
                     isLocalIndexBuild = true;
                     splitKeysBeforeJob = 
htable.getRegionLocator().getStartKeys();
                 }
                 // presplit the index table
                 boolean autosplit = 
cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt());
-                boolean isSalted = pindexTable.getBucketNum() != null; // no 
need to split salted tables
-                if (!isSalted && 
IndexType.GLOBAL.equals(pindexTable.getIndexType())
+                boolean isSalted = pIndexTable.getBucketNum() != null; // no 
need to split salted tables
+                if (!isSalted && 
IndexType.GLOBAL.equals(pIndexTable.getIndexType())
                         && (autosplit || 
cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt()))) {
                     String nOpt = 
cmdLine.getOptionValue(AUTO_SPLIT_INDEX_OPTION.getOpt());
                     int autosplitNumRegions = nOpt == null ? 
DEFAULT_AUTOSPLIT_NUM_REGIONS : Integer.parseInt(nOpt);
                     String rateOpt = 
cmdLine.getOptionValue(SPLIT_INDEX_OPTION.getOpt());
                     double samplingRate = rateOpt == null ? 
DEFAULT_SPLIT_SAMPLING_RATE : Double.parseDouble(rateOpt);
                     LOG.info(String.format("Will split index %s , autosplit=%s 
, autoSplitNumRegions=%s , samplingRate=%s", indexTable, autosplit, 
autosplitNumRegions, samplingRate));
-                    
splitIndexTable(connection.unwrap(PhoenixConnection.class), qDataTable, 
pindexTable, autosplit, autosplitNumRegions, samplingRate);
+
+                    
splitIndexTable(connection.unwrap(PhoenixConnection.class), autosplit, 
autosplitNumRegions, samplingRate, configuration);
                 }
             }
-            
-            PTable pdataTable = PhoenixRuntime.getTableNoCache(connection, 
qDataTable);
                        Path outputPath = null;
                        FileSystem fs = null;
                        if (basePath != null) {
-                               outputPath = 
CsvBulkImportUtil.getOutputPath(new Path(basePath), pindexTable == null
-                                               ? 
pdataTable.getPhysicalName().getString() : 
pindexTable.getPhysicalName().getString());
+                               outputPath = 
CsvBulkImportUtil.getOutputPath(new Path(basePath), pIndexTable == null
+                                               ? 
pDataTable.getPhysicalName().getString() : 
pIndexTable.getPhysicalName().getString());
                                fs = outputPath.getFileSystem(configuration);
                                fs.delete(outputPath, true);
                        }
-            
-            Job job = new JobFactory(connection, configuration, 
outputPath).getJob(schemaName, indexTable, dataTable,
-                    useDirectApi, isPartialBuild, useSnapshot, tenantId);
+
+                       job = new JobFactory(connection, configuration, 
outputPath).getJob();
+
             if (!isForeground && useDirectApi) {
                 LOG.info("Running Index Build in Background - Submit async and exit");
                 job.submit();
@@ -634,32 +682,29 @@ public class IndexTool extends Configured implements Tool {
         }
     }
 
-    
 
-    private void splitIndexTable(PhoenixConnection pConnection, String qDataTable,
-            PTable pindexTable, boolean autosplit, int autosplitNumRegions, double samplingRate)
+    private void splitIndexTable(PhoenixConnection pConnection, boolean autosplit, int autosplitNumRegions, double samplingRate, Configuration configuration)
             throws SQLException, IOException, IllegalArgumentException, InterruptedException {
-        final PTable pdataTable = PhoenixRuntime.getTable(pConnection, qDataTable);
         int numRegions;
         try (HTable hDataTable =
                 (HTable) pConnection.getQueryServices()
-                        .getTable(pdataTable.getPhysicalName().getBytes())) {
+                        .getTable(pDataTable.getPhysicalName().getBytes())) {
             numRegions = hDataTable.getRegionLocator().getStartKeys().length;
             if (autosplit && !(numRegions > autosplitNumRegions)) {
                 LOG.info(String.format(
                     "Will not split index %s because the data table only has %s regions, autoSplitNumRegions=%s",
-                    pindexTable.getPhysicalName(), numRegions, autosplitNumRegions));
+                    pIndexTable.getPhysicalName(), numRegions, autosplitNumRegions));
                 return; // do nothing if # of regions is too low
             }
         }
         // build a tablesample query to fetch index column values from the data table
-        DataSourceColNames colNames = new DataSourceColNames(pdataTable, pindexTable);
+        DataSourceColNames colNames = new DataSourceColNames(pDataTable, pIndexTable);
         String qTableSample = String.format(qDataTable + " TABLESAMPLE(%.2f)", samplingRate);
         List<String> dataColNames = colNames.getDataColNames();
         final String dataSampleQuery =
                 QueryUtil.constructSelectStatement(qTableSample, dataColNames, null,
                     Hint.NO_INDEX, true);
-        IndexMaintainer maintainer = IndexMaintainer.create(pdataTable, pindexTable, pConnection);
+        IndexMaintainer maintainer = IndexMaintainer.create(pDataTable, pIndexTable, pConnection);
         ImmutableBytesWritable dataRowKeyPtr = new ImmutableBytesWritable();
         try (final PhoenixResultSet rs =
                 pConnection.createStatement().executeQuery(dataSampleQuery)
@@ -683,7 +728,7 @@ public class IndexTool extends Configured implements Tool {
                 splitPoints[splitIdx++] = b.getRightBoundExclusive();
             }
             // drop table and recreate with appropriate splits
-            TableName indexTN = TableName.valueOf(pindexTable.getPhysicalName().getBytes());
+            TableName indexTN = TableName.valueOf(pIndexTable.getPhysicalName().getBytes());
             HTableDescriptor descriptor = admin.getTableDescriptor(indexTN);
             admin.disableTable(indexTN);
             admin.deleteTable(indexTN);
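
For context, splitIndexTable() above now derives index split points by sampling
the data table with a TABLESAMPLE query. A minimal sketch of the query it
builds, assuming a hypothetical data table MY_SCHEMA.MY_TABLE with indexed
columns ID and VAL and a 0.50 sampling rate:

    // Sketch only; the real code goes through QueryUtil.constructSelectStatement(...)
    // with Hint.NO_INDEX so the scan runs over the data table, not the still-empty index.
    String qDataTable = "MY_SCHEMA.MY_TABLE";
    double samplingRate = 0.50;
    String qTableSample = String.format(qDataTable + " TABLESAMPLE(%.2f)", samplingRate);
    // Resulting statement, roughly:
    //   SELECT /*+ NO_INDEX */ ID, VAL FROM MY_SCHEMA.MY_TABLE TABLESAMPLE(0.50)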
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java
new file mode 100644
index 0000000..34bcc9b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.PhoenixJobCounters;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mapper that does very little, as region servers actually build the index from the data table regions directly.
+ */
+public class PhoenixServerBuildIndexMapper extends
+        Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, IntWritable> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PhoenixServerBuildIndexMapper.class);
+
+    @Override
+    protected void setup(final Context context) throws IOException, InterruptedException {
+        super.setup(context);
+    }
+
+    @Override
+    protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
+            throws IOException, InterruptedException {
+        context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
+        // Make sure progress is reported to Application Master.
+        context.progress();
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()), new IntWritable(0));
+        super.cleanup(context);
+    }
+}
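
A note on the shape of this class: map() intentionally writes nothing, since the
region servers perform the actual index writes, while cleanup() emits one
sentinel record per mapper (a random UUID key with a zero count). A rough sketch
of how a driver could wire it up through the new PhoenixMapReduceUtil.setInput()
overload further below; the table and job names here are hypothetical, and the
real wiring lives in IndexTool's JobFactory:

    // Sketch only, under the assumptions above.
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "phoenix-server-build-index");
    PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class,
            PhoenixServerBuildIndexInputFormat.class, "MY_SCHEMA.MY_TABLE", "");
    job.setMapperClass(PhoenixServerBuildIndexMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(IntWritable.class);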
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 301f18b..6788e5f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -120,6 +120,10 @@ public final class PhoenixConfigurationUtil {
 
     public static final String SCRUTINY_INDEX_TABLE_NAME = "phoenix.mr.scrutiny.index.table.name";
 
+    public static final String INDEX_TOOL_DATA_TABLE_NAME = "phoenix.mr.index_tool.data.table.name";
+
+    public static final String INDEX_TOOL_INDEX_TABLE_NAME = "phoenix.mr.index_tool.index.table.name";
+
     public static final String SCRUTINY_SOURCE_TABLE = "phoenix.mr.scrutiny.source.table";
 
     public static final String SCRUTINY_BATCH_SIZE = "phoenix.mr.scrutiny.batch.size";
@@ -543,6 +547,16 @@ public final class PhoenixConfigurationUtil {
         Preconditions.checkNotNull(configuration);
         return configuration.get(SCRUTINY_INDEX_TABLE_NAME);
     }
+    public static void setIndexToolDataTableName(Configuration configuration, String qDataTableName) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(qDataTableName);
+        configuration.set(INDEX_TOOL_DATA_TABLE_NAME, qDataTableName);
+    }
+
+    public static String getIndexToolDataTableName(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return configuration.get(INDEX_TOOL_DATA_TABLE_NAME);
+    }
 
     public static void setScrutinyIndexTable(Configuration configuration, String qIndexTableName) {
         Preconditions.checkNotNull(configuration);
@@ -555,6 +569,17 @@ public final class PhoenixConfigurationUtil {
         return SourceTable.valueOf(configuration.get(SCRUTINY_SOURCE_TABLE));
     }
 
+    public static void setIndexToolIndexTableName(Configuration configuration, String qIndexTableName) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(qIndexTableName);
+        configuration.set(INDEX_TOOL_INDEX_TABLE_NAME, qIndexTableName);
+    }
+
+    public static String getIndexToolIndexTableName(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return configuration.get(INDEX_TOOL_INDEX_TABLE_NAME);
+    }
+
     public static void setScrutinySourceTable(Configuration configuration,
             SourceTable sourceTable) {
         Preconditions.checkNotNull(configuration);
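
The two new index-tool properties mirror the scrutiny ones above. A short sketch
of how they pair up across the job; the table names are hypothetical, the calls
are the ones added here:

    // Job-submission side, e.g. in IndexTool:
    Configuration conf = job.getConfiguration();
    PhoenixConfigurationUtil.setIndexToolDataTableName(conf, "MY_SCHEMA.MY_TABLE");
    PhoenixConfigurationUtil.setIndexToolIndexTableName(conf, "MY_SCHEMA.MY_INDEX");

    // Task side, e.g. in PhoenixServerBuildIndexInputFormat:
    String qDataTable = PhoenixConfigurationUtil.getIndexToolDataTableName(conf);
    String qIndexTable = PhoenixConfigurationUtil.getIndexToolIndexTableName(conf);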
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index 3462177..bab6cee 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.mapreduce.util;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.phoenix.mapreduce.PhoenixInputFormat;
@@ -70,6 +71,23 @@ public final class PhoenixMapReduceUtil {
     /**
      *
      * @param job
+     * @param inputClass        DBWritable class
+     * @param inputFormatClass  InputFormat class
+     * @param tableName         Input table name
+     * @param inputQuery        Select query
+     */
+
+    public static void setInput(final Job job, final Class<? extends DBWritable> inputClass,
+                                final Class<? extends InputFormat> inputFormatClass,
+                                final String tableName, final String inputQuery) {
+        final Configuration configuration = setInput(job, inputClass, inputFormatClass, tableName);
+        PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
+        PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
+    }
+
+    /**
+     *
+     * @param job
      * @param inputClass DBWritable class
      * @param snapshotName The name of a snapshot (of a table) to read from
      * @param tableName Input table name
@@ -135,6 +153,15 @@ public final class PhoenixMapReduceUtil {
         return configuration;
     }
 
+    private static Configuration setInput(final Job job, final Class<? extends DBWritable> inputClass,
+                                          final Class<? extends InputFormat> inputFormatClass, final String tableName){
+        job.setInputFormatClass(inputFormatClass);
+        final Configuration configuration = job.getConfiguration();
+        PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+        PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
+        return configuration;
+    }
+
     /**
      * A method to override which HBase cluster for {@link PhoenixInputFormat} to read from
      * @param job MapReduce Job
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 765cedd..48bbf67 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -152,6 +152,7 @@ import org.apache.phoenix.compile.PostDDLCompiler;
 import org.apache.phoenix.compile.PostIndexDDLCompiler;
 import org.apache.phoenix.compile.PostLocalIndexDDLCompiler;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.ServerBuildIndexCompiler;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.StatementNormalizer;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -1352,16 +1353,17 @@ public class MetaDataClient {
     }
     
     private MutationPlan getMutationPlanForBuildingIndex(PTable index, TableRef dataTableRef) throws SQLException {
-        MutationPlan mutationPlan;
         if (index.getIndexType() == IndexType.LOCAL) {
             PostLocalIndexDDLCompiler compiler =
                     new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef));
-            mutationPlan = compiler.compile(index);
-        } else {
+            return compiler.compile(index);
+        } else if (dataTableRef.getTable().isTransactional()){
             PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
-            mutationPlan = compiler.compile(index);
+            return compiler.compile(index);
+        } else {
+            ServerBuildIndexCompiler compiler = new ServerBuildIndexCompiler(connection, getFullTableName(dataTableRef));
+            return compiler.compile(index);
         }
-        return mutationPlan;
     }
 
     private MutationState buildIndex(PTable index, TableRef dataTableRef) throws SQLException {
@@ -1697,6 +1699,10 @@ public class MetaDataClient {
         if (connection.getSCN() != null) {
             return buildIndexAtTimeStamp(table, statement.getTable());
         }
+
+        String dataTableFullName = SchemaUtil.getTableName(
+                tableRef.getTable().getSchemaName().getString(),
+                tableRef.getTable().getTableName().getString());
         return buildIndex(table, tableRef);
     }
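
In short, the revised getMutationPlanForBuildingIndex() now selects the build
strategy as follows (a summary sketch, not the literal code):

    // Plan selection after this patch:
    //   LOCAL index              -> PostLocalIndexDDLCompiler
    //   transactional data table -> PostIndexDDLCompiler (client-driven UPSERT SELECT)
    //   everything else          -> ServerBuildIndexCompiler, so region servers build
    //                               index rows with the data rows' original timestamps
    //                               (the PHOENIX-5018 fix)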
 
