This is an automated email from the ASF dual-hosted git repository.

pboado pushed a commit to branch 5.x-cdh6
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 8cedf240d4fcf3188bc43ef19511d58aaca42119
Author: Gokcen Iskender <gisken...@salesforce.com>
AuthorDate: Wed Mar 6 17:58:21 2019 +0000

    PHOENIX-5190 Implement TaskRegionObserver for Index rebuild
    
    Signed-off-by: Geoffrey Jacoby <gjac...@apache.org>
---
 .../phoenix/end2end/DropTableWithViewsIT.java      |  37 ++-
 .../apache/phoenix/end2end/IndexRebuildTaskIT.java | 176 ++++++++++
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |   3 +-
 .../phoenix/coprocessor/TaskRegionObserver.java    | 293 ++++++++--------
 .../apache/phoenix/coprocessor/tasks/BaseTask.java |  17 +
 .../coprocessor/tasks/DropChildViewsTask.java      |  81 +++++
 .../coprocessor/tasks/IndexRebuildTask.java        | 151 +++++++++
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java      |   5 +
 .../apache/phoenix/mapreduce/index/IndexTool.java  |  58 +++-
 .../index/PhoenixIndexImportDirectReducer.java     |  43 +++
 .../phoenix/query/ConnectionQueryServicesImpl.java |  30 +-
 .../org/apache/phoenix/query/QueryConstants.java   |   6 +
 .../java/org/apache/phoenix/schema/PTable.java     |  26 +-
 .../java/org/apache/phoenix/schema/task/Task.java  | 369 +++++++++++++++++++++
 14 files changed, 1115 insertions(+), 180 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java
index a4cd354..6e1f8aa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java
@@ -18,12 +18,16 @@
 package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.Collection;
 
@@ -103,6 +107,9 @@ public class DropTableWithViewsIT extends SplitSystemCatalogIT {
         try (Connection conn = DriverManager.getConnection(getUrl());
                 Connection viewConn =
                        isMultiTenant ? DriverManager.getConnection(TENANT_SPECIFIC_URL1) : conn) {
+            // Empty the task table first.
+            conn.createStatement().execute("DELETE " + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
+
             String ddlFormat =
                     "CREATE TABLE IF NOT EXISTS " + baseTable + "  ("
                             + " %s PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 
VARCHAR "
@@ -126,16 +133,14 @@ public class DropTableWithViewsIT extends SplitSystemCatalogIT {
             // Run DropChildViewsTask to complete the tasks for dropping child views. The depth of the view tree is 2,
             // so we expect that this will be done in two task handling runs as each non-root level will be processed
             // in one run
-            TaskRegionObserver.DropChildViewsTask task =
-                    new TaskRegionObserver.DropChildViewsTask(
+            TaskRegionObserver.SelfHealingTask task =
+                    new TaskRegionObserver.SelfHealingTask(
                             TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
             task.run();
             task.run();
-            ResultSet rs = conn.createStatement().executeQuery("SELECT * " +
-                    " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
-                    " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = " +
-                    PTable.TaskType.DROP_CHILD_VIEWS.getSerializedValue());
-            assertFalse(rs.next());
+
+            assertTaskColumns(conn, PTable.TaskStatus.COMPLETED.toString(), PTable.TaskType.DROP_CHILD_VIEWS, null);
+
             // Views should be dropped by now
             TableName linkTable = TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES);
             TableViewFinderResult childViewsResult = new TableViewFinderResult();
@@ -147,9 +152,25 @@ public class DropTableWithViewsIT extends SplitSystemCatalogIT {
                     childViewsResult);
             assertTrue(childViewsResult.getLinks().size() == 0);
             // There should not be any orphan views
-            rs = conn.createStatement().executeQuery("SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME +
+            ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME +
                     " WHERE " + PhoenixDatabaseMetaData.TABLE_SCHEM + " = '" + SCHEMA2 +"'");
             assertFalse(rs.next());
         }
     }
+
+    public static void assertTaskColumns(Connection conn, String expectedStatus, PTable.TaskType taskType, String expectedData)
+            throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * " +
+                " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
+                " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = " +
+                taskType.getSerializedValue());
+        assertTrue(rs.next());
+        String taskStatus = rs.getString(PhoenixDatabaseMetaData.TASK_STATUS);
+        assertEquals(expectedStatus, taskStatus);
+
+        if (expectedData != null) {
+            String data = rs.getString(PhoenixDatabaseMetaData.TASK_DATA);
+            assertEquals(expectedData, data);
+        }
+    }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
new file mode 100644
index 0000000..b1a6ba3
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
+    protected static String TENANT1 = "tenant1";
+    private static RegionCoprocessorEnvironment TaskRegionEnvironment;
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        HashMap<String, String> props = new HashMap<>();
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        TaskRegionEnvironment =
+                getUtility()
+                        .getRSForFirstRegionInTable(
+                                PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+                        .getRegions(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+                        .get(0).getCoprocessorHost()
+                        .findCoprocessorEnvironment(TaskRegionObserver.class.getName());
+    }
+
+    private String generateDDL(String format) {
+        StringBuilder optionsBuilder = new StringBuilder();
+
+        if (optionsBuilder.length() != 0) optionsBuilder.append(",");
+        optionsBuilder.append("MULTI_TENANT=true");
+
+        return String.format(format, "TENANT_ID VARCHAR NOT NULL, ", "TENANT_ID, ", optionsBuilder.toString());
+    }
+
+    @Test
+    public void testIndexRebuildTask() throws Throwable {
+        String baseTable = generateUniqueName();
+        Connection conn = null;
+        Connection viewConn = null;
+        try {
+            conn = DriverManager.getConnection(getUrl());
+            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, TENANT1);
+
+            viewConn = DriverManager.getConnection(getUrl(), props);
+            String ddlFormat =
+                    "CREATE TABLE IF NOT EXISTS " + baseTable + "  ("
+                            + " %s PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 
VARCHAR "
+                            + " CONSTRAINT NAME_PK PRIMARY KEY (%s PK2)" + " ) 
%s";
+            conn.createStatement().execute(generateDDL(ddlFormat));
+            conn.commit();
+            // Create a view
+            String viewName = generateUniqueName();
+            String viewDDL = "CREATE VIEW " + viewName + " AS SELECT * FROM " 
+ baseTable;
+            viewConn.createStatement().execute(viewDDL);
+
+            // Create index
+            String indexName = generateUniqueName();
+            String idxSDDL = String.format("CREATE INDEX %s ON %s (V1)", indexName, viewName);
+
+            viewConn.createStatement().execute(idxSDDL);
+
+            // Insert rows
+            int numOfValues = 1;
+            for (int i=0; i < numOfValues; i++){
+                viewConn.createStatement().execute(
+                        String.format("UPSERT INTO %s VALUES('%s', '%s', 
'%s')", viewName, String.valueOf(i), "y",
+                                "z"));
+            }
+            viewConn.commit();
+
+            String data = "{IndexName:" + indexName + "}";
+            // Run IndexRebuildTask
+            TaskRegionObserver.SelfHealingTask task =
+                    new TaskRegionObserver.SelfHealingTask(
+                            TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+
+            Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+            // Add a task to System.Task to build indexes
+            Task.addTask(conn.unwrap(PhoenixConnection.class), PTable.TaskType.INDEX_REBUILD,
+                    TENANT1, null, viewName,
+                    PTable.TaskStatus.CREATED.toString(), data, null, startTs, null, true);
+
+
+            task.run();
+
+            String viewIndexTableName = MetaDataUtil.getViewIndexPhysicalName(baseTable);
+            ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
+            int count = getUtility().countRows(queryServices.getTable(Bytes.toBytes(viewIndexTableName)));
+            assertTrue(count == numOfValues);
+
+
+            // Remove index contents and try again
+            Admin admin = queryServices.getAdmin();
+            TableName tableName = TableName.valueOf(viewIndexTableName);
+            admin.disableTable(tableName);
+            admin.truncateTable(tableName, false);
+
+            data = "{IndexName:" + indexName + ", DisableBefore:true}";
+
+            // Add a new task (update status to created) to System.Task to rebuild indexes
+            Task.addTask(conn.unwrap(PhoenixConnection.class), PTable.TaskType.INDEX_REBUILD,
+                    TENANT1, null, viewName,
+                    PTable.TaskStatus.CREATED.toString(), data, null, startTs, null, true);
+            task.run();
+
+            Thread.sleep(15000);
+
+            Table systemHTable = queryServices.getTable(Bytes.toBytes("SYSTEM." + PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE));
+            count = getUtility().countRows(systemHTable);
+            assertEquals(1, count);
+
+            // Check task status and other column values.
+            DropTableWithViewsIT.assertTaskColumns(conn, PTable.TaskStatus.COMPLETED.toString(), PTable.TaskType.INDEX_REBUILD,
+                    null);
+
+            // See that index is rebuilt and confirm index has rows
+            Table htable = queryServices.getTable(Bytes.toBytes(viewIndexTableName));
+            count = getUtility().countRows(htable);
+            assertEquals(numOfValues, count);
+        } finally {
+            conn.createStatement().execute("DELETE " + " FROM " + 
PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
+            conn.commit();
+            if (conn != null) {
+                conn.close();
+            }
+            if (viewConn != null) {
+                viewConn.close();
+            }
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 192d004..45712a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -227,6 +227,7 @@ import org.apache.phoenix.schema.SequenceNotFoundException;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.task.Task;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
@@ -2837,7 +2838,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
                         }
                         try {
                             PhoenixConnection conn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
-                            TaskRegionObserver.addTask(conn, PTable.TaskType.DROP_CHILD_VIEWS, Bytes.toString(tenantId),
+                            Task.addTask(conn, PTable.TaskType.DROP_CHILD_VIEWS, Bytes.toString(tenantId),
                                 Bytes.toString(schemaName), Bytes.toString(tableName), this.accessCheckEnabled);
                         } catch (Throwable t) {
                             logger.error("Adding a task to drop child views 
failed!", t);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
index 2d94aa6..a6c5328 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
@@ -18,21 +18,24 @@
 package org.apache.phoenix.coprocessor;
 
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
+import java.lang.reflect.Method;
 import java.sql.SQLException;
 import java.sql.Timestamp;
-import java.sql.Types;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.TimerTask;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import javax.annotation.concurrent.GuardedBy;
 
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -41,20 +44,16 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.ipc.RpcCall;
-import org.apache.hadoop.hbase.ipc.RpcUtil;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.TaskType;
 
-import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.QueryUtil;
 
 
@@ -65,12 +64,51 @@ import org.apache.phoenix.util.QueryUtil;
 
 public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
     public static final Log LOG = LogFactory.getLog(TaskRegionObserver.class);
+
     protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(TaskType.values().length);
     private long timeInterval = QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL_MS;
     private long timeMaxInterval = QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS;
     @GuardedBy("TaskRegionObserver.class")
     private long initialDelay = QueryServicesOptions.DEFAULT_TASK_HANDLING_INITIAL_DELAY_MS;
 
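+    // Maps each task type to the fully qualified class name of the BaseTask
+    // implementation that knows how to execute it; resolved reflectively in run().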
+    private static Map<TaskType, String> classMap = ImmutableMap.<TaskType, String>builder()
+            .put(TaskType.DROP_CHILD_VIEWS, "org.apache.phoenix.coprocessor.tasks.DropChildViewsTask")
+            .put(TaskType.INDEX_REBUILD, "org.apache.phoenix.coprocessor.tasks.IndexRebuildTask")
+            .build();
+
+    public enum TaskResultCode {
+        SUCCESS,
+        FAIL,
+        SKIPPED,
+    }
+
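+    /**
+     * Outcome of one task execution: a result code plus optional human-readable details.
+     */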
+    public static class TaskResult {
+        private TaskResultCode resultCode;
+        private String details;
+
+        public TaskResult(TaskResultCode resultCode, String details) {
+            this.resultCode = resultCode;
+            this.details = details;
+        }
+
+        public TaskResultCode getResultCode() {
+            return resultCode;
+        }
+
+        public String getDetails() {
+            return details;
+        }
+
+        @Override
+        public String toString() {
+            String result = resultCode.name();
+            if (!Strings.isNullOrEmpty(details)) {
+                result = result + " - " + details;
+            }
+            return result;
+        }
+    }
+
     @Override
     public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
             boolean abortRequested) {
@@ -109,102 +147,16 @@ public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
             deprecationLogger.setLevel(Level.WARN);
         }
 
-        DropChildViewsTask task = new DropChildViewsTask(e.getEnvironment(), timeMaxInterval);
+        SelfHealingTask task = new SelfHealingTask(e.getEnvironment(), timeMaxInterval);
         executor.scheduleWithFixedDelay(task, initialDelay, timeInterval, TimeUnit.MILLISECONDS);
     }
 
-    private static void mutateSystemTaskTable(PhoenixConnection conn, PreparedStatement stmt, boolean accessCheckEnabled)
-            throws IOException {
-        // we need to mutate SYSTEM.TASK with HBase/login user if access is enabled.
-        if (accessCheckEnabled) {
-            User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
-                @Override
-                public Void run() throws Exception {
-                    final RpcCall rpcContext = RpcUtil.getRpcContext();
-                    // setting RPC context as null so that user can be reset
-                    try {
-                        RpcUtil.setRpcContext(null);
-                        stmt.execute();
-                        conn.commit();
-                    } catch (SQLException e) {
-                        throw new IOException(e);
-                    } finally {
-                      // setting RPC context back to original context of the RPC
-                      RpcUtil.setRpcContext(rpcContext);
-                    }
-                    return null;
-                }
-            });
-        }
-        else {
-            try {
-                stmt.execute();
-                conn.commit();
-            } catch (SQLException e) {
-                throw new IOException(e);
-            }
-        }
-    }
-
-    public static void addTask(PhoenixConnection conn, TaskType taskType, String tenantId, String schemaName,
-                               String tableName, boolean accessCheckEnabled)
-            throws IOException {
-        PreparedStatement stmt = null;
-        try {
-            stmt = conn.prepareStatement("UPSERT INTO " +
-                    PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " +
-                    PhoenixDatabaseMetaData.TASK_TYPE + ", " +
-                    PhoenixDatabaseMetaData.TENANT_ID + ", " +
-                    PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
-                    PhoenixDatabaseMetaData.TABLE_NAME + " ) VALUES(?,?,?,?)");
-            stmt.setByte(1, taskType.getSerializedValue());
-            if (tenantId != null) {
-                stmt.setString(2, tenantId);
-            } else {
-                stmt.setNull(2, Types.VARCHAR);
-            }
-            if (schemaName != null) {
-                stmt.setString(3, schemaName);
-            } else {
-                stmt.setNull(3, Types.VARCHAR);
-            }
-            stmt.setString(4, tableName);
-        } catch (SQLException e) {
-            throw new IOException(e);
-        }
-        mutateSystemTaskTable(conn, stmt, accessCheckEnabled);
-    }
-
-    public static void deleteTask(PhoenixConnection conn, TaskType taskType, Timestamp ts, String tenantId,
-                                  String schemaName, String tableName, boolean accessCheckEnabled) throws IOException {
-        PreparedStatement stmt = null;
-        try {
-            stmt = conn.prepareStatement("DELETE FROM " +
-                    PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
-                    " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = ? AND 
" +
-                    PhoenixDatabaseMetaData.TASK_TS + " = ? AND " +
-                    PhoenixDatabaseMetaData.TENANT_ID + (tenantId == null ? " 
IS NULL " : " = '" + tenantId + "'") + " AND " +
-                    PhoenixDatabaseMetaData.TABLE_SCHEM + (schemaName == null 
? " IS NULL " : " = '" + schemaName + "'") + " AND " +
-                    PhoenixDatabaseMetaData.TABLE_NAME + " = ?");
-            stmt.setByte(1, taskType.getSerializedValue());
-            stmt.setTimestamp(2, ts);
-            stmt.setString(3, tableName);
-        } catch (SQLException e) {
-            throw new IOException(e);
-        }
-        mutateSystemTaskTable(conn, stmt, accessCheckEnabled);
-    }
-
-    /**
-     * Task runs periodically to clean up task of child views whose parent is dropped
-     *
-     */
-    public static class DropChildViewsTask extends TimerTask {
-        private RegionCoprocessorEnvironment env;
-        private long timeMaxInterval;
-        private boolean accessCheckEnabled;
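+    /**
+     * Timer task that periodically scans SYSTEM.TASK and dispatches each pending
+     * record to the BaseTask implementation registered for its task type.
+     */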
+    public static class SelfHealingTask extends TimerTask {
+        protected RegionCoprocessorEnvironment env;
+        protected long timeMaxInterval;
+        protected boolean accessCheckEnabled;
 
-        public DropChildViewsTask(RegionCoprocessorEnvironment env, long timeMaxInterval) {
+        public SelfHealingTask(RegionCoprocessorEnvironment env, long timeMaxInterval) {
             this.env = env;
             this.accessCheckEnabled = env.getConfiguration().getBoolean(QueryServices.PHOENIX_ACLS_ENABLED,
                     QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED);
@@ -214,88 +166,105 @@ public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
         @Override
         public void run() {
             PhoenixConnection connForTask = null;
-            Timestamp timestamp = null;
-            String tenantId = null;
-            byte[] tenantIdBytes;
-            String schemaName= null;
-            byte[] schemaNameBytes;
-            String tableName = null;
-            byte[] tableNameBytes;
-            PhoenixConnection pconn;
             try {
-                String taskQuery = "SELECT " +
-                        PhoenixDatabaseMetaData.TASK_TS + ", " +
-                        PhoenixDatabaseMetaData.TENANT_ID + ", " +
-                        PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
-                        PhoenixDatabaseMetaData.TABLE_NAME +
-                        " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
-                        " WHERE "+ PhoenixDatabaseMetaData.TASK_TYPE + " = " + 
PTable.TaskType.DROP_CHILD_VIEWS.getSerializedValue();
-
                 connForTask = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
-                PreparedStatement taskStatement = connForTask.prepareStatement(taskQuery);
-                ResultSet rs = taskStatement.executeQuery();
-                while (rs.next()) {
+                String[] excludeStates = new String[] { PTable.TaskStatus.FAILED.toString(),
+                        PTable.TaskStatus.COMPLETED.toString() };
+                List<Task.TaskRecord> taskRecords = Task.queryTaskTable(connForTask, excludeStates);
+                for (Task.TaskRecord taskRecord : taskRecords){
                     try {
-                        // delete child views only if the parent table is deleted from the system catalog
-                        timestamp = rs.getTimestamp(1);
-                        tenantId = rs.getString(2);
-                        tenantIdBytes= rs.getBytes(2);
-                        schemaName= rs.getString(3);
-                        schemaNameBytes = rs.getBytes(3);
-                        tableName= rs.getString(4);
-                        tableNameBytes = rs.getBytes(4);
-
-                        if (tenantId != null) {
-                            Properties tenantProps = new Properties();
-                            tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
-                            pconn = QueryUtil.getConnectionOnServer(tenantProps, env.getConfiguration()).unwrap(PhoenixConnection.class);
-
+                        TaskType taskType = taskRecord.getTaskType();
+                        if (!classMap.containsKey(taskType)) {
+                            LOG.warn("Don't know how to execute task type: " + 
taskType.name());
+                            continue;
                         }
-                        else {
-                            pconn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
+
+                        String className = classMap.get(taskType);
+
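+                        // Instantiate the task class reflectively and wire in the
+                        // region environment via BaseTask.init() before invoking it.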
+                        Class<?> concreteClass = Class.forName(className);
+
+                        Object obj = concreteClass.newInstance();
+                        Method runMethod = concreteClass.getDeclaredMethod("run",
+                                Task.TaskRecord.class);
+                        Method checkCurrentResult = concreteClass.getDeclaredMethod("checkCurrentResult", Task.TaskRecord.class);
+                        Method initMethod = concreteClass.getSuperclass().getDeclaredMethod("init",
+                                RegionCoprocessorEnvironment.class, Long.class);
+                        initMethod.invoke(obj, env, timeMaxInterval);
+
+                        // if current status is already Started, check if we need to re-run.
+                        // Task can be async and already Started before.
+                        TaskResult result = null;
+                        if (taskRecord.getStatus() != null && taskRecord.getStatus().equals(PTable.TaskStatus.STARTED.toString())) {
+                            result = (TaskResult) checkCurrentResult.invoke(obj, taskRecord);
                         }
 
-                        MetaDataProtocol.MetaDataMutationResult result = new MetaDataClient(pconn).updateCache(pconn.getTenantId(),
-                                schemaName, tableName, true);
-                        if (result.getMutationCode() != MetaDataProtocol.MutationCode.TABLE_ALREADY_EXISTS) {
-                            MetaDataEndpointImpl.dropChildViews(env, tenantIdBytes, schemaNameBytes, tableNameBytes);
-                        } else if (System.currentTimeMillis() < timeMaxInterval + timestamp.getTime()) {
-                            // skip this task as it has not been expired and its parent table has not been dropped yet
-                            LOG.info("Skipping a child view drop task. The parent table has not been dropped yet : " +
-                                    schemaName + "." + tableName +
-                                    " with tenant id " + (tenantId == null ? " IS NULL" : tenantId) +
-                                    " and timestamp " + timestamp.toString());
-                            continue;
+                        if (result == null) {
+                            // reread task record. There might be async setting of task status
+                            taskRecord = Task.queryTaskTable(connForTask, taskRecord.getSchemaName(), taskRecord.getTableName(),
+                                    taskType, taskRecord.getTenantId(), null).get(0);
+                            if (taskRecord.getStatus() != null && Arrays.stream(excludeStates).anyMatch(taskRecord.getStatus()::equals)) {
+                                continue;
+                            }
+                            // Change task status to STARTED
+                            Task.addTask(connForTask, taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(),
+                                    taskRecord.getTableName(), PTable.TaskStatus.STARTED.toString(),
+                                    taskRecord.getData(), taskRecord.getPriority(), taskRecord.getTimeStamp(), null, true);
+
+                            // invokes the method at runtime
+                            result = (TaskResult) runMethod.invoke(obj, taskRecord);
                         }
-                        else {
-                            LOG.warn(" A drop child view task has expired and 
will be removed from the system task table : " +
-                                    schemaName + "." + tableName +
-                                    " with tenant id " + (tenantId == null ? " 
IS NULL" : tenantId) +
-                                    " and timestamp " + timestamp.toString());
+
+                        if (result != null) {
+                            String taskStatus = PTable.TaskStatus.FAILED.toString();
+                            if (result.getResultCode() == TaskResultCode.SUCCESS) {
+                                taskStatus = PTable.TaskStatus.COMPLETED.toString();
+                            } else if (result.getResultCode() == TaskResultCode.SKIPPED) {
+                                // We will pick up this task again
+                                continue;
+                            }
+
+                            setEndTaskStatus(connForTask, taskRecord, taskStatus);
                         }
 
-                        deleteTask(connForTask, PTable.TaskType.DROP_CHILD_VIEWS, timestamp, tenantId, schemaName,
-                                tableName, this.accessCheckEnabled);
                     }
                     catch (Throwable t) {
-                        LOG.warn("Exception while dropping a child view task. 
" +
+                        LOG.warn("Exception while running self healingtask. " +
                                 "It will be retried in the next system task 
table scan : " +
-                                schemaName + "." + tableName +
-                                " with tenant id " + (tenantId == null ? " IS 
NULL" : tenantId) +
-                                " and timestamp " + timestamp.toString(), t);
+                                " taskType : " + 
taskRecord.getTaskType().name() +
+                                taskRecord.getSchemaName()  + "." + 
taskRecord.getTableName() +
+                                " with tenant id " + (taskRecord.getTenantId() 
== null ? " IS NULL" : taskRecord.getTenantId()) +
+                                " and timestamp " + 
taskRecord.getTimeStamp().toString(), t);
                     }
                 }
             } catch (Throwable t) {
-                LOG.error("DropChildViewsTask failed!", t);
+                LOG.error("SelfHealingTask failed!", t);
             } finally {
                 if (connForTask != null) {
                     try {
                         connForTask.close();
                     } catch (SQLException ignored) {
-                        LOG.debug("DropChildViewsTask can't close connection", 
ignored);
+                        LOG.debug("SelfHealingTask can't close connection", 
ignored);
                     }
                 }
             }
         }
+
+        public static void setEndTaskStatus(PhoenixConnection connForTask, Task.TaskRecord taskRecord, String taskStatus)
+                throws IOException {
+            // update data with details.
+            String data = taskRecord.getData();
+            if (Strings.isNullOrEmpty(data)) {
+                data = "{}";
+            }
+            JsonParser jsonParser = new JsonParser();
+            JsonObject jsonObject = jsonParser.parse(data).getAsJsonObject();
+            jsonObject.addProperty("TaskDetails", taskStatus);
+            data = jsonObject.toString();
+
+            Timestamp endTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+            Task.addTask(connForTask, taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(),
+                    taskRecord.getTableName(), taskStatus, data, taskRecord.getPriority(),
+                    taskRecord.getTimeStamp(), endTs, true);
+        }
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/BaseTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/BaseTask.java
new file mode 100644
index 0000000..5c9a5c4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/BaseTask.java
@@ -0,0 +1,17 @@
+package org.apache.phoenix.coprocessor.tasks;
+
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.schema.task.Task;
+
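+/**
+ * Base class for the self-healing tasks dispatched by TaskRegionObserver: subclasses
+ * implement run() to execute a task record and checkCurrentResult() to poll an
+ * already-started (possibly asynchronous) task.
+ */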
+public abstract class BaseTask {
+    protected long timeMaxInterval;
+    protected RegionCoprocessorEnvironment env;
+    public void init(RegionCoprocessorEnvironment env, Long interval) {
+        this.env = env;
+        this.timeMaxInterval = interval;
+    }
+    public abstract TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord);
+
+    public abstract TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord taskRecord) throws Exception;
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/DropChildViewsTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/DropChildViewsTask.java
new file mode 100644
index 0000000..f00e1f6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/DropChildViewsTask.java
@@ -0,0 +1,81 @@
+package org.apache.phoenix.coprocessor.tasks;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Properties;
+
+/**
+ * Task runs periodically to clean up tasks of child views whose parent is dropped
+ *
+ */
+public class DropChildViewsTask extends BaseTask {
+    public static final Log LOG = LogFactory.getLog(DropChildViewsTask.class);
+
+    public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
+        PhoenixConnection pconn = null;
+        Timestamp timestamp = taskRecord.getTimeStamp();
+        try {
+            String tenantId = taskRecord.getTenantId();
+            if (tenantId != null) {
+                Properties tenantProps = new Properties();
+                tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+                pconn = QueryUtil.getConnectionOnServer(tenantProps, env.getConfiguration()).unwrap(PhoenixConnection.class);
+            }
+            else {
+                pconn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
+            }
+
+            MetaDataProtocol.MetaDataMutationResult result = new MetaDataClient(pconn).updateCache(pconn.getTenantId(),
+                    taskRecord.getSchemaName(), taskRecord.getTableName(), true);
+            if (result.getMutationCode() != MetaDataProtocol.MutationCode.TABLE_ALREADY_EXISTS) {
+                MetaDataEndpointImpl
+                        .dropChildViews(env, taskRecord.getTenantIdBytes(), taskRecord.getSchemaNameBytes(), taskRecord.getTableNameBytes());
+                return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, "");
+            } else if (System.currentTimeMillis() < timeMaxInterval + timestamp.getTime()) {
+                // skip this task as it has not expired and its parent table has not been dropped yet
+                LOG.info("Skipping a child view drop task. The parent table has not been dropped yet : " +
+                        taskRecord.getSchemaName() + "." + taskRecord.getTableName() +
+                        " with tenant id " + (tenantId == null ? " IS NULL" : tenantId) +
+                        " and timestamp " + timestamp.toString());
+                return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SKIPPED, "");
+            }
+            else {
+                LOG.warn(" A drop child view task has expired and will be marked as failed : " +
+                        taskRecord.getSchemaName() + "." + taskRecord.getTableName() +
+                        " with tenant id " + (tenantId == null ? " IS NULL" : tenantId) +
+                        " and timestamp " + timestamp.toString());
+                return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "Expired");
+            }
+        }
+        catch (Throwable t) {
+            LOG.warn("Exception while dropping a child view task. " +
+                    taskRecord.getSchemaName() + "." + taskRecord.getTableName() +
+                    " with tenant id " + (taskRecord.getTenantId() == null ? " IS NULL" : taskRecord.getTenantId()) +
+                    " and timestamp " + timestamp.toString(), t);
+            return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, t.toString());
+        } finally {
+            if (pconn != null) {
+                try {
+                    pconn.close();
+                } catch (SQLException ignored) {
+                    LOG.debug("DropChildViewsTask can't close connection", 
ignored);
+                }
+            }
+        }
+    }
+
+    public TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord taskRecord) throws Exception {
+        return null;
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
new file mode 100644
index 0000000..c2bdf51
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
@@ -0,0 +1,151 @@
+package org.apache.phoenix.coprocessor.tasks;
+
+import com.google.common.base.Strings;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.util.QueryUtil;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Map;
+
+/**
+ * Task runs periodically to rebuild indexes for System.Task entries.
+ *
+ */
+public class IndexRebuildTask extends BaseTask  {
+    public static final String IndexName = "IndexName";
+    public static final String JobID = "JobID";
+    public static final Log LOG = LogFactory.getLog(IndexRebuildTask.class);
+
+    @Override
+    public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
+        Connection conn = null;
+
+        try {
+            // We have to clone the configuration because env.getConfiguration is readonly.
+            Configuration conf = HBaseConfiguration.create(env.getConfiguration());
+            conn = QueryUtil.getConnectionOnServer(env.getConfiguration());
+
+            conf.set(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
+
+            String data = taskRecord.getData();
+            if (Strings.isNullOrEmpty(taskRecord.getData())) {
+                data = "{}";
+            }
+            JsonParser jsonParser = new JsonParser();
+            JsonObject jsonObject = jsonParser.parse(data).getAsJsonObject();
+            String indexName = getIndexName(jsonObject);
+            if (Strings.isNullOrEmpty(indexName)) {
+                String str = "Index name is not found. Index rebuild cannot 
continue " +
+                        "Data : " + data;
+                LOG.warn(str);
+                return new 
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, str);
+            }
+
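+            // An optional DisableBefore flag in the task data asks IndexTool to
+            // disable the index before rebuilding it.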
+            boolean shouldDisable = false;
+            if (jsonObject.has("DisableBefore")) {
+                String disableBefore = jsonObject.get("DisableBefore").toString();
+                if (!Strings.isNullOrEmpty(disableBefore)) {
+                    shouldDisable = Boolean.valueOf(disableBefore);
+                }
+            }
+
+            // Run index tool async.
+            boolean runForeground = false;
+            Map.Entry<Integer, Job> indexToolRes = IndexTool
+                    .run(conf, taskRecord.getSchemaName(), taskRecord.getTableName(), indexName, true,
+                            false, taskRecord.getTenantId(), shouldDisable, runForeground);
+            int status = indexToolRes.getKey();
+            if (status != 0) {
+                return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "Index tool returned : " + status);
+            }
+
+            Job job = indexToolRes.getValue();
+
+            jsonObject.addProperty(JobID, job.getJobID().toString());
+            Task.addTask(conn.unwrap(PhoenixConnection.class), taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(),
+                    taskRecord.getTableName(), PTable.TaskStatus.STARTED.toString(), jsonObject.toString(), taskRecord.getPriority(),
+                    taskRecord.getTimeStamp(), null, true);
+            // It will take some time to finish, so we will check the status in a separate task.
+            return null;
+        }
+        catch (Throwable t) {
+            LOG.warn("Exception while running index rebuild task. " +
+                    "It will be retried in the next system task table scan : " 
+
+                    taskRecord.getSchemaName() + "." + 
taskRecord.getTableName() +
+                    " with tenant id " + (taskRecord.getTenantId() == null ? " 
IS NULL" : taskRecord.getTenantId()) +
+                    " and data " + taskRecord.getData(), t);
+            return new 
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, 
t.toString());
+        } finally {
+            if (conn != null) {
+                try {
+                    conn.close();
+                } catch (SQLException e) {
+                    LOG.debug("IndexRebuildTask can't close connection");
+                }
+            }
+        }
+
+    }
+
+    private String getIndexName(JsonObject jsonObject) {
+        String indexName = null;
+        // Get index name from data column.
+        if (jsonObject.has(IndexName)) {
+            indexName = jsonObject.get(IndexName).toString().replaceAll("\"", "");
+        }
+        return indexName;
+    }
+
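+    // Extracts the MapReduce job id that run() stored in the task data JSON.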
+    private String getJobID(String data) {
+        if (Strings.isNullOrEmpty(data)) {
+            data = "{}";
+        }
+        JsonParser jsonParser = new JsonParser();
+        JsonObject jsonObject = jsonParser.parse(data).getAsJsonObject();
+        String jobId = null;
+        if (jsonObject.has(JobID)) {
+            jobId = jsonObject.get(JobID).toString().replaceAll("\"", "");
+        }
+        return jobId;
+    }
+
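+    // Polls the MapReduce job recorded in the task data: returns SUCCESS or FAIL once
+    // the job has completed, or null while there is nothing conclusive to report.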
+    @Override
+    public TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord taskRecord)
+            throws Exception {
+
+        String jobID = getJobID(taskRecord.getData());
+        if (jobID != null) {
+            Configuration conf = HBaseConfiguration.create(env.getConfiguration());
+            Configuration configuration = HBaseConfiguration.addHbaseResources(conf);
+            Cluster cluster = new Cluster(configuration);
+
+            Job job = cluster.getJob(org.apache.hadoop.mapreduce.JobID.forName(jobID));
+
+            if (job != null && job.isComplete()) {
+                if (job.isSuccessful()) {
+                    LOG.warn("IndexRebuildTask checkCurrentResult job is 
successful " + taskRecord.getTableName());
+                    return new 
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, "");
+                } else {
+                    return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL,
+                            "Index is DISABLED");
+                }
+            }
+
+        }
+        return null;
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 5427b5f..d56eaa4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -219,6 +219,11 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final byte[] TASK_TYPE_BYTES = Bytes.toBytes(TASK_TYPE);
     public static final String TASK_TS = "TASK_TS";
     public static final byte[] TASK_TS_BYTES = Bytes.toBytes(TASK_TS);
+    public static final String TASK_STATUS = "TASK_STATUS";
+    public static final String TASK_END_TS = "TASK_END_TS";
+    public static final String TASK_PRIORITY = "TASK_PRIORITY";
+    public static final String TASK_DATA = "TASK_DATA";
+    public static final String TASK_TABLE_TTL = "864000";
     public static final String ARRAY_SIZE = "ARRAY_SIZE";
     public static final byte[] ARRAY_SIZE_BYTES = Bytes.toBytes(ARRAY_SIZE);
     public static final String VIEW_CONSTANT = "VIEW_CONSTANT";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index bd1a310..6fefe4e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -28,10 +28,12 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import com.google.common.base.Strings;
 import org.apache.commons.cli.CommandLine;
@@ -493,14 +495,17 @@ public class IndexTool extends Configured implements Tool {
                 PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
             }
 
-            fs = outputPath.getFileSystem(configuration);
-            fs.delete(outputPath, true);
-
+            if (outputPath != null) {
+                fs = outputPath.getFileSystem(configuration);
+                fs.delete(outputPath, true);
+            }
             final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable);
             final Job job = Job.getInstance(configuration, jobName);
             job.setJarByClass(IndexTool.class);
             job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-            FileOutputFormat.setOutputPath(job, outputPath);
+            if (outputPath != null) {
+                FileOutputFormat.setOutputPath(job, outputPath);
+            }
 
             PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, PhoenixServerBuildIndexInputFormat.class,
                             qDataTable, "");
@@ -590,6 +595,7 @@ public class IndexTool extends Configured implements Tool {
                 tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
                 configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
             }
+
             schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
             dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
             indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
@@ -608,7 +614,7 @@ public class IndexTool extends Configured implements Tool {
             pIndexTable = null;
 
             connection = ConnectionUtil.getInputConnection(configuration);
-
+            
             if (indexTable != null) {
                 if (!isValidIndexTable(connection, qDataTable,indexTable, tenantId)) {
                     throw new IllegalArgumentException(String.format(
@@ -875,6 +881,48 @@ public class IndexTool extends Configured implements Tool {
         return false;
     }
 
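+    /**
+     * Programmatic entry point used by IndexRebuildTask: builds the equivalent
+     * command line arguments, runs the tool, and returns the exit status together
+     * with the submitted MapReduce job.
+     */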
+    public static Map.Entry<Integer, Job> run(Configuration conf, String schemaName, String dataTable, String indexTable,
+            boolean directApi, boolean useSnapshot, String tenantId, boolean disableBefore, boolean runForeground) throws Exception {
+        final List<String> args = Lists.newArrayList();
+        if (schemaName != null) {
+            args.add("-s");
+            args.add(schemaName);
+        }
+        args.add("-dt");
+        args.add(dataTable);
+        args.add("-it");
+        args.add(indexTable);
+        if (directApi) {
+            args.add("-direct");
+        }
+
+        if (runForeground) {
+            args.add("-runfg");
+        }
+
+        if (useSnapshot) {
+            args.add("-snap");
+        }
+
+        if (tenantId != null) {
+            args.add("-tenant");
+            args.add(tenantId);
+        }
+
+        args.add("-op");
+        args.add("/tmp/" + UUID.randomUUID().toString());
+
+        if (disableBefore) {
+            PhoenixConfigurationUtil.setDisableIndexes(conf, indexTable);
+        }
+
+        IndexTool indexingTool = new IndexTool();
+        indexingTool.setConf(conf);
+        int status = indexingTool.run(args.toArray(new String[0]));
+        Job job = indexingTool.getJob();
+        return new AbstractMap.SimpleEntry<Integer, Job>(status, job);
+    }
+
     public static void main(final String[] args) throws Exception {
         int result = ToolRunner.run(new IndexTool(), args);
         System.exit(result);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index 0786b9b..57688fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -18,17 +18,31 @@
 package org.apache.phoenix.mapreduce.index;
 
 import java.io.IOException;
+import java.sql.Connection;
 import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
+
 /**
  * Reducer class that does only one task and that is to update the index state of the table.
  */
@@ -41,9 +55,38 @@ public class PhoenixIndexImportDirectReducer extends
     protected void cleanup(Context context) throws IOException, InterruptedException{
         try {
             IndexToolUtil.updateIndexState(context.getConfiguration(), PIndexState.ACTIVE);
+
+            updateTasksTable(context);
         } catch (SQLException e) {
             LOG.error(" Failed to update the status to Active");
             throw new RuntimeException(e.getMessage());
         }
     }
+
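+    // Marks the matching INDEX_REBUILD rows in SYSTEM.TASK as COMPLETED once the
+    // index state has been set to ACTIVE.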
+    private void updateTasksTable(Context context) throws SQLException, IOException {
+        final Properties overrideProps = new Properties();
+        final Connection
+                connection = ConnectionUtil
+                .getOutputConnection(context.getConfiguration(), overrideProps);
+        try {
+            String fullTableName = PhoenixConfigurationUtil.getInputTableName(context.getConfiguration());
+            String tenantId = context.getConfiguration().get(MAPREDUCE_TENANT_ID, null);
+            String schemaName = SchemaUtil.getSchemaNameFromFullName(fullTableName);
+            String tableName = SchemaUtil.getTableNameFromFullName(fullTableName);
+            String indexName = PhoenixConfigurationUtil.getDisableIndexes(context.getConfiguration());
+            List<Task.TaskRecord> taskRecords = Task.queryTaskTable(connection, schemaName, tableName,
+                    PTable.TaskType.INDEX_REBUILD, tenantId, indexName);
+            if (taskRecords != null && taskRecords.size() > 0) {
+                for (Task.TaskRecord taskRecord : taskRecords) {
+                    TaskRegionObserver.SelfHealingTask.setEndTaskStatus(
+                            connection.unwrap(PhoenixConnection.class), 
taskRecords.get(0),
+                            PTable.TaskStatus.COMPLETED.toString());
+                }
+            }
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
 }
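
To observe the effect of this cleanup step, the SYSTEM.TASK rows it updates can be
read back directly over JDBC. A minimal sketch, assuming a local quorum URL;
TASK_TYPE = 2 corresponds to the INDEX_REBUILD value added to PTable below:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class TaskStatusCheck {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement();
                 // List all index-rebuild tasks and their current status
                 ResultSet rs = stmt.executeQuery(
                         "SELECT TABLE_NAME, TASK_STATUS, TASK_DATA FROM SYSTEM.\"TASK\" WHERE TASK_TYPE = 2")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1) + " -> " + rs.getString(2)
                            + " (" + rs.getString(3) + ")");
                }
            }
        }
    }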
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 1f5cd48..9dcdd0f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -47,8 +47,10 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAM
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TASK_TABLE_TTL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
@@ -241,6 +243,7 @@ import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PTimestamp;
 import org.apache.phoenix.schema.types.PTinyint;
 import org.apache.phoenix.schema.types.PUnsignedTinyint;
 import org.apache.phoenix.schema.types.PVarbinary;
@@ -3521,6 +3524,30 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     clearCache();
                 }
             }
+
+            try {
+                metaConnection.createStatement().executeUpdate(getTaskDDL());
+            } catch (NewerTableAlreadyExistsException e) {
+                // SYSTEM.TASK already exists at a newer timestamp; nothing to do
+            } catch (TableAlreadyExistsException e) {
+                long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
+                if (currentServerSideTableTimeStamp <= MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0) {
+                    String columnsToAdd =
+                            PhoenixDatabaseMetaData.TASK_STATUS + " " + PVarchar.INSTANCE.getSqlTypeName() + ", "
+                                    + PhoenixDatabaseMetaData.TASK_END_TS + " " + PTimestamp.INSTANCE.getSqlTypeName() + ", "
+                                    + PhoenixDatabaseMetaData.TASK_PRIORITY + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName() + ", "
+                                    + PhoenixDatabaseMetaData.TASK_DATA + " " + PVarchar.INSTANCE.getSqlTypeName();
+                    String taskTableFullName = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_TASK_TABLE);
+                    metaConnection = addColumnsIfNotExists(metaConnection, taskTableFullName,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd);
+                    metaConnection.createStatement().executeUpdate(
+                            "ALTER TABLE " + taskTableFullName + " SET " + TTL + "=" + TASK_TABLE_TTL);
+                    clearCache();
+                }
+            }
+
             try {
                 metaConnection.createStatement().executeUpdate(getFunctionTableDDL());
             } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {}
@@ -3533,9 +3560,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             try {
                 metaConnection.createStatement().executeUpdate(getMutexDDL());
             } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {}
-            try {
-                metaConnection.createStatement().executeUpdate(getTaskDDL());
-            } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {}
 
             // In case namespace mapping is enabled and system table to system namespace mapping is also enabled,
             // create an entry for the SYSTEM namespace in the SYSCAT table, so that GRANT/REVOKE commands can work
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 028adfd..a8f332c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -384,9 +384,15 @@ public interface QueryConstants {
             TENANT_ID + " VARCHAR NULL," +
             TABLE_SCHEM + " VARCHAR NULL," +
             TABLE_NAME + " VARCHAR NOT NULL,\n" +
+            // Non-PK columns
+            TASK_STATUS + " VARCHAR NULL," +
+            TASK_END_TS + " TIMESTAMP NULL," +
+            TASK_PRIORITY + " UNSIGNED_TINYINT NULL," +
+            TASK_DATA + " VARCHAR NULL,\n" +
             "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + 
TASK_TYPE + "," + TASK_TS + " ROW_TIMESTAMP," + TENANT_ID + "," + TABLE_SCHEM + 
"," +
             TABLE_NAME + "))\n" +
             HConstants.VERSIONS + "=%s,\n" +
             ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
+            ColumnFamilyDescriptorBuilder.TTL + "=" + TASK_TABLE_TTL + ",\n" + 
    // 10 days
             PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
 }
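
For easier review of the flattened template above, this is roughly the DDL it
expands to. The TASK_TYPE/TASK_TS definitions precede this hunk and are inferred
here, the VERSIONS/KEEP_DELETED_CELLS placeholders are filled in at runtime, and
the concrete TTL value is an assumption (the inline comment pegs TASK_TABLE_TTL
at 10 days, presumably 864000 seconds):

    CREATE TABLE SYSTEM."TASK" (
        TASK_TYPE UNSIGNED_TINYINT NOT NULL,   -- inferred from the serialized byte value
        TASK_TS TIMESTAMP NOT NULL,            -- inferred; declared ROW_TIMESTAMP in the PK
        TENANT_ID VARCHAR NULL,
        TABLE_SCHEM VARCHAR NULL,
        TABLE_NAME VARCHAR NOT NULL,
        TASK_STATUS VARCHAR NULL,
        TASK_END_TS TIMESTAMP NULL,
        TASK_PRIORITY UNSIGNED_TINYINT NULL,
        TASK_DATA VARCHAR NULL,
        CONSTRAINT PK PRIMARY KEY (TASK_TYPE, TASK_TS ROW_TIMESTAMP, TENANT_ID, TABLE_SCHEM, TABLE_NAME)
    ) VERSIONS=%s, KEEP_DELETED_CELLS=%s, TTL=864000, TRANSACTIONAL=false;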
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 3e22225..6c4d3a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -194,7 +194,8 @@ public interface PTable extends PMetaDataEntity {
     }
 
     public enum TaskType {
-        DROP_CHILD_VIEWS((byte)1);
+        DROP_CHILD_VIEWS((byte)1),
+        INDEX_REBUILD((byte)2);
 
         private final byte[] byteValue;
         private final byte serializedValue;
@@ -222,6 +223,29 @@ public interface PTable extends PMetaDataEntity {
         }
     }
 
+    public enum TaskStatus {
+        CREATED {
+            public String toString() {
+                return "CREATED";
+            }
+        },
+        STARTED {
+            public String toString() {
+                return "STARTED";
+            }
+        },
+        COMPLETED {
+            public String toString() {
+                return "COMPLETED";
+            }
+        },
+        FAILED {
+            public String toString() {
+                return "FAILED";
+            }
+        }
+    }
+
     public enum ImmutableStorageScheme implements ColumnValueEncoderDecoderSupplier {
         ONE_CELL_PER_COLUMN((byte)1) {
             @Override
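
A quick sketch of how the new enum values are meant to round-trip, using the
accessors this commit relies on elsewhere (fromSerializedValue is exercised in
Task.parseResult below; the printed values follow the declarations above):

    import org.apache.phoenix.schema.PTable;

    public class TaskEnumDemo {
        public static void main(String[] args) {
            // Round-trip the serialized byte of the new task type
            byte serialized = PTable.TaskType.INDEX_REBUILD.getSerializedValue(); // (byte) 2
            System.out.println(PTable.TaskType.fromSerializedValue(serialized));  // INDEX_REBUILD
            // TaskStatus values are persisted as their string forms in TASK_STATUS
            System.out.println(PTable.TaskStatus.COMPLETED.toString());           // "COMPLETED"
        }
    }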
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
new file mode 100644
index 0000000..0f79634
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
@@ -0,0 +1,369 @@
+package org.apache.phoenix.schema.task;
+
+import com.google.common.base.Strings;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+public class Task {
+    private static void mutateSystemTaskTable(PhoenixConnection conn, PreparedStatement stmt, boolean accessCheckEnabled)
+            throws IOException {
+        // We need to mutate SYSTEM.TASK as the HBase/login user if access checks are enabled.
+        if (accessCheckEnabled) {
+            User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                    final RpcCall rpcContext = RpcUtil.getRpcContext();
+                    // Set the RPC context to null so that the user can be reset
+                    try {
+                        RpcUtil.setRpcContext(null);
+                        stmt.execute();
+                        conn.commit();
+                    } catch (SQLException e) {
+                        throw new IOException(e);
+                    } finally {
+                        // Restore the original RPC context
+                        RpcUtil.setRpcContext(rpcContext);
+                    }
+                    return null;
+                }
+            });
+        } else {
+            try {
+                stmt.execute();
+                conn.commit();
+            } catch (SQLException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    private static PreparedStatement setValuesToAddTaskPS(PreparedStatement stmt, PTable.TaskType taskType,
+            String tenantId, String schemaName, String tableName) throws SQLException {
+        stmt.setByte(1, taskType.getSerializedValue());
+        if (tenantId != null) {
+            stmt.setString(2, tenantId);
+        } else {
+            stmt.setNull(2, Types.VARCHAR);
+        }
+        if (schemaName != null) {
+            stmt.setString(3, schemaName);
+        } else {
+            stmt.setNull(3, Types.VARCHAR);
+        }
+        stmt.setString(4, tableName);
+        return stmt;
+    }
+
+    private static PreparedStatement setValuesToAddTaskPS(PreparedStatement stmt, PTable.TaskType taskType,
+            String tenantId, String schemaName, String tableName, String taskStatus, String data,
+            Integer priority, Timestamp startTs, Timestamp endTs) throws SQLException {
+        stmt = setValuesToAddTaskPS(stmt, taskType, tenantId, schemaName, tableName);
+        if (taskStatus != null) {
+            stmt.setString(5, taskStatus);
+        } else {
+            stmt.setString(5, PTable.TaskStatus.CREATED.toString());
+        }
+        if (priority != null) {
+            stmt.setInt(6, priority);
+        } else {
+            byte defaultPri = 4;
+            stmt.setInt(6, defaultPri);
+        }
+        if (startTs == null) {
+            startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+        }
+        stmt.setTimestamp(7, startTs);
+        if (endTs != null) {
+            stmt.setTimestamp(8, endTs);
+        } else {
+            if (taskStatus != null && taskStatus.equals(PTable.TaskStatus.COMPLETED.toString())) {
+                endTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+                stmt.setTimestamp(8, endTs);
+            } else {
+                stmt.setNull(8, Types.TIMESTAMP);
+            }
+        }
+        if (data != null) {
+            stmt.setString(9, data);
+        } else {
+            stmt.setNull(9, Types.VARCHAR);
+        }
+        return stmt;
+    }
+
+    public static void addTask(PhoenixConnection conn, PTable.TaskType taskType, String tenantId, String schemaName,
+            String tableName, boolean accessCheckEnabled) throws IOException {
+        PreparedStatement stmt = null;
+        try {
+            stmt = conn.prepareStatement("UPSERT INTO " +
+                    PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " +
+                    PhoenixDatabaseMetaData.TASK_TYPE + ", " +
+                    PhoenixDatabaseMetaData.TENANT_ID + ", " +
+                    PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+                    PhoenixDatabaseMetaData.TABLE_NAME + " ) VALUES(?,?,?,?)");
+            stmt = setValuesToAddTaskPS(stmt, taskType, tenantId, schemaName, tableName);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+        mutateSystemTaskTable(conn, stmt, accessCheckEnabled);
+    }
+
+    public static void addTask(PhoenixConnection conn, PTable.TaskType taskType, String tenantId, String schemaName,
+            String tableName, String taskStatus, String data, Integer priority, Timestamp startTs, Timestamp endTs,
+            boolean accessCheckEnabled) throws IOException {
+        PreparedStatement stmt = null;
+        try {
+            stmt = conn.prepareStatement("UPSERT INTO " +
+                    PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " +
+                    PhoenixDatabaseMetaData.TASK_TYPE + ", " +
+                    PhoenixDatabaseMetaData.TENANT_ID + ", " +
+                    PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+                    PhoenixDatabaseMetaData.TABLE_NAME + ", " +
+                    PhoenixDatabaseMetaData.TASK_STATUS + ", " +
+                    PhoenixDatabaseMetaData.TASK_PRIORITY + ", " +
+                    PhoenixDatabaseMetaData.TASK_TS + ", " +
+                    PhoenixDatabaseMetaData.TASK_END_TS + ", " +
+                    PhoenixDatabaseMetaData.TASK_DATA +
+                    " ) VALUES(?,?,?,?,?,?,?,?,?)");
+            stmt = setValuesToAddTaskPS(stmt, taskType, tenantId, schemaName, tableName, taskStatus, data, priority, startTs, endTs);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+        mutateSystemTaskTable(conn, stmt, accessCheckEnabled);
+    }
+
+    public static void deleteTask(PhoenixConnection conn, PTable.TaskType taskType, Timestamp ts, String tenantId,
+            String schemaName, String tableName, boolean accessCheckEnabled) throws IOException {
+        PreparedStatement stmt = null;
+        try {
+            stmt = conn.prepareStatement("DELETE FROM " +
+                    PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
+                    " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = ? AND " +
+                    PhoenixDatabaseMetaData.TASK_TS + " = ? AND " +
+                    PhoenixDatabaseMetaData.TENANT_ID + (tenantId == null ? " IS NULL " : " = '" + tenantId + "'") + " AND " +
+                    PhoenixDatabaseMetaData.TABLE_SCHEM + (schemaName == null ? " IS NULL " : " = '" + schemaName + "'") + " AND " +
+                    PhoenixDatabaseMetaData.TABLE_NAME + " = ?");
+            stmt.setByte(1, taskType.getSerializedValue());
+            stmt.setTimestamp(2, ts);
+            stmt.setString(3, tableName);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+        mutateSystemTaskTable(conn, stmt, accessCheckEnabled);
+    }
+
+    private static List<TaskRecord> populateTasks(Connection connection, String taskQuery)
+            throws SQLException {
+        PreparedStatement taskStatement = connection.prepareStatement(taskQuery);
+        ResultSet rs = taskStatement.executeQuery();
+
+        List<TaskRecord> result = new ArrayList<>();
+        while (rs.next()) {
+            // Map each row of the task table into a TaskRecord
+            TaskRecord taskRecord = parseResult(rs);
+            result.add(taskRecord);
+        }
+        return result;
+    }
+
+    public static List<TaskRecord> queryTaskTable(Connection connection, String schema, String tableName,
+            PTable.TaskType taskType, String tenantId, String indexName)
+            throws SQLException {
+        String taskQuery = "SELECT " +
+                PhoenixDatabaseMetaData.TASK_TS + ", " +
+                PhoenixDatabaseMetaData.TENANT_ID + ", " +
+                PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+                PhoenixDatabaseMetaData.TABLE_NAME + ", " +
+                PhoenixDatabaseMetaData.TASK_STATUS + ", " +
+                PhoenixDatabaseMetaData.TASK_TYPE + ", " +
+                PhoenixDatabaseMetaData.TASK_PRIORITY + ", " +
+                PhoenixDatabaseMetaData.TASK_DATA +
+                " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME;
+        taskQuery += " WHERE " +
+                PhoenixDatabaseMetaData.TABLE_NAME + " ='" + tableName + "' AND " +
+                PhoenixDatabaseMetaData.TASK_TYPE + "=" + taskType.getSerializedValue();
+        if (!Strings.isNullOrEmpty(tenantId)) {
+            taskQuery += " AND " + PhoenixDatabaseMetaData.TENANT_ID + "='" + tenantId + "' ";
+        }
+        if (!Strings.isNullOrEmpty(schema)) {
+            taskQuery += " AND " + PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schema + "' ";
+        }
+        if (!Strings.isNullOrEmpty(indexName)) {
+            taskQuery += " AND " + PhoenixDatabaseMetaData.TASK_DATA + " LIKE '%" + indexName + "%'";
+        }
+        return populateTasks(connection, taskQuery);
+    }
+
+    public static List<TaskRecord> queryTaskTable(Connection connection, String[] excludedTaskStatus)
+            throws SQLException {
+        String taskQuery = "SELECT " +
+                PhoenixDatabaseMetaData.TASK_TS + ", " +
+                PhoenixDatabaseMetaData.TENANT_ID + ", " +
+                PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+                PhoenixDatabaseMetaData.TABLE_NAME + ", " +
+                PhoenixDatabaseMetaData.TASK_STATUS + ", " +
+                PhoenixDatabaseMetaData.TASK_TYPE + ", " +
+                PhoenixDatabaseMetaData.TASK_PRIORITY + ", " +
+                PhoenixDatabaseMetaData.TASK_DATA +
+                " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME;
+        if (excludedTaskStatus != null && excludedTaskStatus.length > 0) {
+            taskQuery += " WHERE " + PhoenixDatabaseMetaData.TASK_STATUS + " IS NULL OR " +
+                    PhoenixDatabaseMetaData.TASK_STATUS + " NOT IN (";
+            String[] values = new String[excludedTaskStatus.length];
+            for (int i = 0; i < excludedTaskStatus.length; i++) {
+                values[i] = String.format("'%s'", excludedTaskStatus[i].trim());
+            }
+            // Delimit with commas
+            taskQuery += String.join(",", values);
+            taskQuery += ")";
+        }
+
+        return populateTasks(connection, taskQuery);
+    }
+
+    public static TaskRecord parseResult(ResultSet rs) throws SQLException {
+        TaskRecord taskRecord = new TaskRecord();
+        taskRecord.setTimeStamp(rs.getTimestamp(PhoenixDatabaseMetaData.TASK_TS));
+        taskRecord.setTenantId(rs.getString(PhoenixDatabaseMetaData.TENANT_ID));
+        taskRecord.setTenantIdBytes(rs.getBytes(PhoenixDatabaseMetaData.TENANT_ID));
+        taskRecord.setSchemaName(rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM));
+        taskRecord.setSchemaNameBytes(rs.getBytes(PhoenixDatabaseMetaData.TABLE_SCHEM));
+        taskRecord.setTableName(rs.getString(PhoenixDatabaseMetaData.TABLE_NAME));
+        taskRecord.setTableNameBytes(rs.getBytes(PhoenixDatabaseMetaData.TABLE_NAME));
+        taskRecord.setStatus(rs.getString(PhoenixDatabaseMetaData.TASK_STATUS));
+        taskRecord.setTaskType(PTable.TaskType.fromSerializedValue(rs.getByte(PhoenixDatabaseMetaData.TASK_TYPE)));
+        taskRecord.setPriority(rs.getInt(PhoenixDatabaseMetaData.TASK_PRIORITY));
+        taskRecord.setData(rs.getString(PhoenixDatabaseMetaData.TASK_DATA));
+        return taskRecord;
+    }
+
+    public static class TaskRecord {
+        private String tenantId;
+        private Timestamp timeStamp;
+        private byte[] tenantIdBytes;
+        private String schemaName = null;
+        private byte[] schemaNameBytes;
+        private String tableName = null;
+        private byte[] tableNameBytes;
+
+        private PTable.TaskType taskType;
+        private String status;
+        private int priority;
+        private String data;
+
+        public String getTenantId() {
+            return tenantId;
+        }
+
+        public void setTenantId(String tenantId) {
+            this.tenantId = tenantId;
+        }
+
+        public Timestamp getTimeStamp() {
+            return timeStamp;
+        }
+
+        public void setTimeStamp(Timestamp timeStamp) {
+            this.timeStamp = timeStamp;
+        }
+
+        public byte[] getTenantIdBytes() {
+            return tenantIdBytes;
+        }
+
+        public void setTenantIdBytes(byte[] tenantIdBytes) {
+            this.tenantIdBytes = tenantIdBytes;
+        }
+
+        public String getSchemaName() {
+            return schemaName;
+        }
+
+        public void setSchemaName(String schemaName) {
+            this.schemaName = schemaName;
+        }
+
+        public byte[] getSchemaNameBytes() {
+            return schemaNameBytes;
+        }
+
+        public void setSchemaNameBytes(byte[] schemaNameBytes) {
+            this.schemaNameBytes = schemaNameBytes;
+        }
+
+        public String getTableName() {
+            return tableName;
+        }
+
+        public void setTableName(String tableName) {
+            this.tableName = tableName;
+        }
+
+        public byte[] getTableNameBytes() {
+            return tableNameBytes;
+        }
+
+        public void setTableNameBytes(byte[] tableNameBytes) {
+            this.tableNameBytes = tableNameBytes;
+        }
+
+        public String getData() {
+            if (data == null) {
+                return "";
+            }
+            return data;
+        }
+
+        public int getPriority() {
+            return priority;
+        }
+
+        public void setPriority(int priority) {
+            this.priority = priority;
+        }
+
+        public void setData(String data) {
+            this.data = data;
+        }
+
+        public String getStatus() {
+            return status;
+        }
+
+        public void setStatus(String status) {
+            this.status = status;
+        }
+
+        public PTable.TaskType getTaskType() {
+            return taskType;
+        }
+
+        public void setTaskType(PTable.TaskType taskType) {
+            this.taskType = taskType;
+        }
+
+    }
+}
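
Tying the new pieces together, a hedged sketch of how a caller might drive this
API end to end; the connection URL and all schema/table/index names are
illustrative only, and access checks are disabled for brevity:

    import java.sql.DriverManager;
    import java.sql.Timestamp;
    import java.util.List;
    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.schema.task.Task;
    import org.apache.phoenix.util.EnvironmentEdgeManager;

    public class TaskApiSketch {
        public static void main(String[] args) throws Exception {
            try (PhoenixConnection conn = DriverManager
                    .getConnection("jdbc:phoenix:localhost")
                    .unwrap(PhoenixConnection.class)) {
                // Enqueue an INDEX_REBUILD task; TASK_DATA carries the index name
                Task.addTask(conn, PTable.TaskType.INDEX_REBUILD, null, "MY_SCHEMA", "MY_TABLE",
                        PTable.TaskStatus.CREATED.toString(), "MY_INDEX", null,
                        new Timestamp(EnvironmentEdgeManager.currentTimeMillis()), null, false);
                // Find it again, filtering on the index name stored in TASK_DATA
                List<Task.TaskRecord> records = Task.queryTaskTable(conn, "MY_SCHEMA", "MY_TABLE",
                        PTable.TaskType.INDEX_REBUILD, null, "MY_INDEX");
                for (Task.TaskRecord r : records) {
                    System.out.println(r.getTableName() + " -> " + r.getStatus());
                }
            }
        }
    }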
