[ https://issues.apache.org/jira/browse/PHOENIX-6622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482773#comment-17482773 ]

ASF GitHub Bot commented on PHOENIX-6622:
-----------------------------------------

gjacoby126 commented on a change in pull request #1373:
URL: https://github.com/apache/phoenix/pull/1373#discussion_r793116217



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
##########
@@ -56,16 +59,45 @@
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.sql.Types;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
+import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_TABLE_TIMESTAMP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_STATUS;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
+import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.schema.ColumnMetaDataOps.addColumnMutation;
 import static org.apache.phoenix.schema.MetaDataClient.CREATE_LINK;
+import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.schema.PTableType.VIEW;
 
 public class Transform {
     private static final Logger LOGGER = LoggerFactory.getLogger(Transform.class);
+    private static String transformSelect = "SELECT " +

Review comment:
       nit: static constant should be TRANSFORM_SELECT
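       A possible shape for that rename (illustrative only, not code from the PR; the query body is
       truncated in this diff, so the placeholder below stands in for it, and making the field final
       is my assumption):

           // Hypothetical sketch of the suggested naming; the actual SELECT text is as in the PR.
           private static final String TRANSFORM_SELECT = "SELECT " +
                   /* ... remainder of the query as in the PR ... */ "";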

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/TransformMonitorTask.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor.tasks;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.transform.TransformTool;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_TRANSFORM_MONITOR_ENABLED;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_TRANSFORM_RETRY_COUNT;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.TRANSFORM_MONITOR_ENABLED;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.TRANSFORM_RETRY_COUNT_VALUE;
+
+/**
+ * Task that runs periodically to monitor and orchestrate ongoing transforms in the SYSTEM.TRANSFORM table.
+ *
+ */
+public class TransformMonitorTask extends BaseTask  {
+    public static final String DEFAULT = "IndexName";
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(TransformMonitorTask.class);
+
+    private static boolean isDisabled = false;
+
+    // Called from testing
+    @VisibleForTesting
+    public static void disableTransformMonitorTask(boolean disabled) {
+        isDisabled = disabled;
+    }
+
+    @Override
+    public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
+        Configuration conf = HBaseConfiguration.create(env.getConfiguration());
+        Configuration configuration = HBaseConfiguration.addHbaseResources(conf);
+        boolean transformMonitorEnabled = configuration.getBoolean(TRANSFORM_MONITOR_ENABLED, DEFAULT_TRANSFORM_MONITOR_ENABLED);
+        if (!transformMonitorEnabled || isDisabled) {
+            return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "TransformMonitor is disabled");
+        }
+
+        try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(conf).unwrap(PhoenixConnection.class)){
+            SystemTransformRecord systemTransformRecord = Transform.getTransformRecord(taskRecord.getSchemaName(),
+                    taskRecord.getTableName(), null, taskRecord.getTenantId(), conn);
+            if (systemTransformRecord == null) {
+                return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL,
+                        "No transform record is found");
+            }
+            String tableName = SchemaUtil.getTableName(systemTransformRecord.getSchemaName(),
+                    systemTransformRecord.getLogicalTableName());
+
+            if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.CREATED.name())) {
+                LOGGER.info("Transform is created, starting the TransformTool ", tableName);
+                // Kick a TransformTool run, it will already update transform record status and job id
+                TransformTool transformTool = TransformTool.runTransformTool(systemTransformRecord, conf, false, null, null, false, false);
+                if (transformTool == null) {
+                    // This is not a map/reduce error. There must be some unexpected issue. So, retrying will not solve the underlying issue.
+                    return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "TransformTool run failed. Check the parameters.");
+                }
+            } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.COMPLETED.name())) {
+                LOGGER.info("Transform is completed, TransformMonitor is done ", tableName);
+                return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, "");
+            } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PENDING_CUTOVER.name())
+                    && !PTable.TransformType.isPartialTransform(systemTransformRecord.getTransformType())) {
+                LOGGER.info("Transform is pending cutover ", tableName);
+                Transform.doCutover(conn, systemTransformRecord);
+
+                PTable.TransformType partialTransform = PTable.TransformType.getPartialTransform(systemTransformRecord.getTransformType());
+                if (partialTransform != null) {
+                    // Update transform to be partial
+                    SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(systemTransformRecord);
+                    builder.setTransformType(partialTransform);
+                    // Decrement retry count since TransformTool will increment it. Should we set it to 0?
+                    builder.setTransformRetryCount(systemTransformRecord.getTransformRetryCount()-1);
+                    Transform.upsertTransform(builder.build(), conn);
+
+                    // Fix unverified rows. Running partial transform will make the transform status go back to started
+                    long startFromTs = 0;
+                    if (systemTransformRecord.getTransformLastStateTs() != null) {
+                        startFromTs = systemTransformRecord.getTransformLastStateTs().getTime()-1;
+                    }
+                    TransformTool.runTransformTool(systemTransformRecord, conf, true, startFromTs, null, true, false);
+
+                    // In the future, if we are changing the PK structure, we need to run indextools as well
+                } else {
+                    // No partial transform needed so, we update state of the transform
+                    LOGGER.warn("No partial type of the transform is found. Completing the transform ", tableName);
+                    Transform.updateTransformRecord(conn, systemTransformRecord, PTable.TransformStatus.COMPLETED);
+                }
+            } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.STARTED.name()) ||
+                    (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PENDING_CUTOVER.name())
+                            && PTable.TransformType.isPartialTransform(systemTransformRecord.getTransformType()))) {
+                LOGGER.info(systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.STARTED.name()) ?
+                        "Transform is started, we will monitor ": "Partial transform is going on, we will monitor" , tableName);
+                // Monitor the job of transform tool and decide to retry
+                String jobId = systemTransformRecord.getTransformJobId();
+                if (jobId != null) {
+                    Cluster cluster = new Cluster(configuration);
+
+                    Job job = cluster.getJob(org.apache.hadoop.mapreduce.JobID.forName(jobId));
+                    if (job == null) {
+                        LOGGER.warn(String.format("Transform job with Id=%s is not found", jobId));
+                        return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SKIPPED, "The job cannot be found");
+                    }
+                    if (job != null && job.isComplete()) {
+                        if (job.isSuccessful()) {
+                            LOGGER.warn("TransformTool job is successful. Transform should have been in a COMPLETED state "
+                                    + taskRecord.getTableName());
+                        } else {
+                            // Retry TransformTool run
+                            int maxRetryCount = configuration.getInt(TRANSFORM_RETRY_COUNT_VALUE, DEFAULT_TRANSFORM_RETRY_COUNT);
+                            if (systemTransformRecord.getTransformRetryCount() < maxRetryCount) {
+                                // Retry count will be incremented in TransformTool
+                                TransformTool.runTransformTool(systemTransformRecord, conf, false, null, null, false, true);
+                            }
+                        }
+                    }
+                }
+            } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.FAILED.name())) {
+                String str = "Transform is marked as failed because either TransformTool is run on the foreground and failed " +
+                        "or it is run as async but there is something wrong with the TransformTool parameters";
+                LOGGER.error(str);
+                return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, str);
+            } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PAUSED.name())) {
+                return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS,
+                        "Transform is paused. No need to monitor");
+            } else {
+                String str = "Transform status is not known " + systemTransformRecord.getString();
+                LOGGER.error(str);
+                return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, str);
+            }
+
+            // Update task status to RETRY so that it is retried
+            Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                    .setConn(conn)
+                    .setTaskType(taskRecord.getTaskType())
+                    .setTenantId(taskRecord.getTenantId())
+                    .setSchemaName(taskRecord.getSchemaName())
+                    .setTableName(taskRecord.getTableName())
+                    .setTaskStatus(PTable.TaskStatus.RETRY.toString())
+                    .setData(taskRecord.getData())
+                    .setPriority(taskRecord.getPriority())
+                    .setStartTs(taskRecord.getTimeStamp())
+                    .setEndTs(null)
+                    .setAccessCheckEnabled(true)
+                    .build());
+            return null;
+        }
+        catch (Throwable t) {
+            LOGGER.warn("Exception while running transform monitor task. " +
+                    "It will be retried in the next system task table scan : " 
+
+                    taskRecord.getSchemaName() + "." + 
taskRecord.getTableName() +
+                    " with tenant id " + (taskRecord.getTenantId() == null ? " 
IS NULL" : taskRecord.getTenantId()) +
+                    " and data " + taskRecord.getData(), t);
+            return new 
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, 
t.toString());
+        }
+    }
+
+    public static void addTransformMonitorTask(PhoenixConnection connection, Configuration configuration, SystemTransformRecord systemTransformRecord,
+                                               PTable.TaskStatus taskStatus, Timestamp startTimestamp, Timestamp endTimestamp) throws IOException {
+        boolean transformMonitorEnabled = configuration.getBoolean(TRANSFORM_MONITOR_ENABLED, DEFAULT_TRANSFORM_MONITOR_ENABLED);
+        if (!transformMonitorEnabled) {
+            LOGGER.warn("TransformMonitor is not enabled. Monitoring/retrying TransformTool and doing cutover will not be done automatically");
+            return;
+        }
+
+        Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                .setConn(connection)
+                .setTaskType(PTable.TaskType.TRANSFORM_MONITOR)
+                .setTenantId(systemTransformRecord.getTenantId())
+                .setSchemaName(systemTransformRecord.getSchemaName())
+                .setTableName(systemTransformRecord.getLogicalTableName())
+                .setTaskStatus(taskStatus.toString())
+                .setStartTs(startTimestamp)
+                .setEndTs(endTimestamp)
+                .setAccessCheckEnabled(true)
+                .build());
+    }
+
+    @Override
+    public TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord taskRecord)
+            throws Exception {
+        return null;

Review comment:
       why is this returning null? I think we need a comment here at least, please.
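       One way to address this (illustrative sketch only, not code from the PR; the rationale in the
       comment is my assumption about the intent and should be corrected by the author):

           @Override
           public TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord taskRecord)
                   throws Exception {
               // Hypothetical explanatory comment: run() re-reads the SYSTEM.TRANSFORM record and
               // decides what to do on every invocation, so there is no cached "current result" to
               // short-circuit on; returning null lets the task framework fall through to run() again.
               return null;
           }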

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java
##########
@@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.transform;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.coprocessor.tasks.TransformMonitorTask;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.phoenix.end2end.IndexRebuildTaskIT.waitForTaskState;
+import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_CREATE_TENANT_SPECIFIC_TABLE;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.getRawRowCount;
+import static org.apache.phoenix.util.TestUtil.getRowCount;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TransformMonitorIT extends ParallelStatsDisabledIT {
+    private static RegionCoprocessorEnvironment TaskRegionEnvironment;
+
+    private Properties testProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+    public TransformMonitorIT() throws IOException, InterruptedException {
+        testProps.put(QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB, "ONE_CELL_PER_COLUMN");
+        testProps.put(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, "0");
+        testProps.put(QueryServices.PHOENIX_ACLS_ENABLED, "true");
+
+        TaskRegionEnvironment = (RegionCoprocessorEnvironment) getUtility()
+                .getRSForFirstRegionInTable(
+                        PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+                .getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+                .get(0).getCoprocessorHost()
+                .findCoprocessorEnvironment(TaskRegionObserver.class.getName());
+    }
+
+    @Before
+    public void setupTest() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            conn.createStatement().execute("DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME);
+            conn.createStatement().execute("DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
+        }
+    }
+
+    private void testTransformTable(boolean createIndex, boolean createView, 
boolean isImmutable) throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = "TBL_" + generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, 
dataTableName);
+        String newTableName = dataTableName + "_1";
+        String newTableFullName = SchemaUtil.getTableName(schemaName, 
newTableName);
+        String indexName = "IDX_" + generateUniqueName();
+        String indexName2 = "IDX_" + generateUniqueName();
+        String viewName = "VW_" + generateUniqueName();
+        String viewName2 = "VW2_" + generateUniqueName();
+        String viewIdxName = "VW_IDX_" + generateUniqueName();
+        String viewIdxName2 = "VW_IDX_" + generateUniqueName();
+        String view2IdxName1 = "VW2_IDX_" + generateUniqueName();
+        String indexFullName = SchemaUtil.getTableName(schemaName, indexName);
+        String createIndexStmt = "CREATE INDEX %s ON " + dataTableFullName + " 
(NAME) INCLUDE (ZIP) ";
+        String createViewStmt = "CREATE VIEW %s ( VIEW_COL1 INTEGER, VIEW_COL2 
VARCHAR ) AS SELECT * FROM " + dataTableFullName;
+        String createViewIdxSql = "CREATE INDEX  %s ON " + viewName + " 
(VIEW_COL1) include (VIEW_COL2) ";
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
testProps)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 10;
+            TransformToolIT.createTableAndUpsertRows(conn, dataTableFullName, 
numOfRows, isImmutable? " IMMUTABLE_ROWS=true" : "");
+            if (createIndex) {
+                conn.createStatement().execute(String.format(createIndexStmt, 
indexName));
+            }
+
+            if (createView) {
+                conn.createStatement().execute(String.format(createViewStmt, 
viewName));
+                conn.createStatement().execute(String.format(createViewIdxSql, 
viewIdxName));
+                conn.createStatement().execute("UPSERT INTO " + viewName + 
"(ID, NAME, VIEW_COL1, VIEW_COL2) VALUES (1, 'uname11', 100, 'viewCol2')");
+            }
+            assertMetadata(conn, 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, 
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                    " SET 
IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, 
COLUMN_ENCODED_BYTES=2");
+
+            SystemTransformRecord record = 
Transform.getTransformRecord(schemaName, dataTableName, null, null, 
conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+
+            List<Task.TaskRecord> taskRecordList = Task.queryTaskTable(conn, 
null);
+            assertEquals(1, taskRecordList.size());
+            assertEquals(PTable.TaskType.TRANSFORM_MONITOR, 
taskRecordList.get(0).getTaskType());
+            assertEquals(schemaName, taskRecordList.get(0).getSchemaName());
+            assertEquals(dataTableName, taskRecordList.get(0).getTableName());
+
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), 
record, PTable.TransformStatus.COMPLETED);
+            // Test that the PhysicalTableName is updated.
+            PTable oldTable = PhoenixRuntime.getTableNoCache(conn, 
dataTableFullName);
+            assertEquals(newTableName, 
oldTable.getPhysicalName(true).getString());
+
+            assertMetadata(conn, 
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, 
PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, 
record.getNewPhysicalTableName());
+            assertMetadata(conn, 
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, 
PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, dataTableFullName);
+
+            ConnectionQueryServices cqs = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
+            long newRowCount = countRows(conn, newTableFullName);
+            
assertEquals(getRawRowCount(cqs.getTable(Bytes.toBytes(dataTableFullName))), 
newRowCount);
+
+            if (createIndex) {
+                assertEquals(newRowCount, countRows(conn, indexFullName));
+                int additionalRows = 2;
+                // Upsert new rows to new table. Note that after transform is 
complete, we are using the new table
+                TransformToolIT.upsertRows(conn, dataTableFullName, 
(int)newRowCount+1, additionalRows);
+                assertEquals(newRowCount+additionalRows, countRows(conn, 
indexFullName));
+                assertEquals(newRowCount, 
getRawRowCount(cqs.getTable(Bytes.toBytes(dataTableFullName))));
+
+                // Create another index on the new table and count
+                Admin admin = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+                TableName hTableName = TableName.valueOf(dataTableFullName);
+                admin.disableTable(hTableName);
+                admin.deleteTable(hTableName);
+                conn.createStatement().execute(String.format(createIndexStmt, 
indexName2));
+                assertEquals(newRowCount+additionalRows, countRows(conn, 
dataTableFullName));
+                assertEquals(newRowCount+additionalRows, countRows(conn, 
SchemaUtil.getTableName(schemaName, indexName2)));
+            } else if (createView) {
+                assertEquals(numOfRows, countRows(conn, viewName));
+                assertEquals(numOfRows, countRowsForViewIndex(conn, 
dataTableFullName));
+                assertMetadata(conn, 
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, 
PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, viewName);
+                
conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+
+                int additionalRows = 2;
+                // Upsert new rows to new table. Note that after transform is 
complete, we are using the new table
+                TransformToolIT.upsertRows(conn, viewName, (int)newRowCount+1, 
additionalRows);
+                assertEquals(newRowCount+additionalRows, getRowCount(conn, 
viewName));
+                assertEquals(newRowCount+additionalRows, 
countRowsForViewIndex(conn, dataTableFullName));
+
+                // Drop view index and create another on the new table and 
count
+                conn.createStatement().execute("DROP INDEX " + viewIdxName + " 
ON " + viewName);
+                Admin admin = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+                TableName hTableName = TableName.valueOf(dataTableFullName);
+                admin.disableTable(hTableName);
+                admin.deleteTable(hTableName);
+                conn.createStatement().execute(String.format(createViewIdxSql, 
viewIdxName2));
+                assertEquals(newRowCount+additionalRows, 
countRowsForViewIndex(conn, dataTableFullName));
+
+                // Create another view and have a new index on top
+                conn.createStatement().execute(String.format(createViewStmt, 
viewName2));
+                conn.createStatement().execute(String.format(createViewIdxSql, 
view2IdxName1));
+                assertEquals((newRowCount+additionalRows)*2, 
countRowsForViewIndex(conn, dataTableFullName));
+
+                conn.createStatement().execute("UPSERT INTO " + viewName2 + 
"(ID, NAME, VIEW_COL1, VIEW_COL2) VALUES (100, 'uname100', 1000, 
'viewCol100')");
+                ResultSet rs = conn.createStatement().executeQuery("SELECT 
VIEW_COL2, NAME FROM " + viewName2 + " WHERE VIEW_COL1=1000");
+                assertTrue(rs.next());
+                assertEquals("viewCol100", rs.getString(1));
+                assertEquals("uname100", rs.getString(2));
+                assertFalse(rs.next());
+            }
+
+        }
+    }
+
+    public static int countRows(Connection conn, String tableFullName) throws 
SQLException {
+        ResultSet count = conn.createStatement().executeQuery("select  /*+ 
NO_INDEX*/ count(*) from " + tableFullName);
+        count.next();
+        int numRows = count.getInt(1);
+        return numRows;
+    }
+
+    protected int countRowsForViewIndex(Connection conn, String baseTable) 
throws IOException, SQLException {
+        String viewIndexTableName = 
MetaDataUtil.getViewIndexPhysicalName(baseTable);
+        ConnectionQueryServices queryServices = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
+
+        Table indexHTable = 
queryServices.getTable(Bytes.toBytes(viewIndexTableName));
+        return getUtility().countRows(indexHTable);
+    }
+
+    @Test
+    public void testTransformMonitor_mutableTableWithoutIndex() throws 
Exception {
+        testTransformTable(false, false, false);
+    }
+
+    @Test
+    public void testTransformMonitor_immutableTableWithoutIndex() throws 
Exception {
+        testTransformTable(false, false, true);
+    }
+
+    @Test
+    public void testTransformMonitor_immutableTableWithIndex() throws 
Exception {
+        testTransformTable(true, false, true);
+    }
+
+    @Test
+    public void testTransformMonitor_pausedTransform() throws Exception {
+        testTransformMonitor_checkStates(PTable.TransformStatus.PAUSED, 
PTable.TaskStatus.COMPLETED);
+    }
+
+    @Test
+    public void testTransformMonitor_completedTransform() throws Exception {
+        testTransformMonitor_checkStates(PTable.TransformStatus.COMPLETED, 
PTable.TaskStatus.COMPLETED);
+    }
+
+    @Test
+    public void testTransformMonitor_failedTransform() throws Exception {
+        testTransformMonitor_checkStates(PTable.TransformStatus.FAILED, 
PTable.TaskStatus.FAILED);
+    }
+
+    private void testTransformMonitor_checkStates(PTable.TransformStatus 
transformStatus, PTable.TaskStatus taskStatus) throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
testProps)) {
+            conn.setAutoCommit(true);
+            SystemTransformRecord.SystemTransformBuilder transformBuilder = 
new SystemTransformRecord.SystemTransformBuilder();
+            String logicalTableName = generateUniqueName();
+            transformBuilder.setLogicalTableName(logicalTableName);
+            transformBuilder.setTransformStatus(transformStatus.name());
+            transformBuilder.setNewPhysicalTableName(logicalTableName + "_1");
+            Transform.upsertTransform(transformBuilder.build(), 
conn.unwrap(PhoenixConnection.class));
+
+            TaskRegionObserver.SelfHealingTask task =
+                    new TaskRegionObserver.SelfHealingTask(
+                            TaskRegionEnvironment, 
QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+
+            Timestamp startTs = new 
Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+            Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                    .setConn(conn.unwrap(PhoenixConnection.class))
+                    .setTaskType(PTable.TaskType.TRANSFORM_MONITOR)
+                    .setTenantId(null)
+                    .setSchemaName(null)
+                    .setTableName(logicalTableName)
+                    .setTaskStatus(PTable.TaskStatus.CREATED.toString())
+                    .setData(null)
+                    .setPriority(null)
+                    .setStartTs(startTs)
+                    .setEndTs(null)
+                    .build());
+            task.run();
+
+            waitForTaskState(conn, PTable.TaskType.TRANSFORM_MONITOR, 
logicalTableName, taskStatus);
+        }
+    }
+
+    @Test
+    public void testTransformMonitor_pauseAndResumeTransform() throws 
Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
testProps)) {
+            conn.setAutoCommit(true);
+            TransformToolIT.pauseTableTransform(schemaName, dataTableName, 
conn, "");
+
+            List<String> args = TransformToolIT.getArgList(schemaName, 
dataTableName, null,
+                    null, null, null, false, false, true, false, false);
+
+            // This run resumes transform and TransformMonitor task runs and 
completes it
+            TransformToolIT.runTransformTool(args.toArray(new String[0]), 0);
+            SystemTransformRecord record = 
Transform.getTransformRecord(schemaName, dataTableName, null, null, 
conn.unwrap(PhoenixConnection.class));
+            List<Task.TaskRecord> taskRecordList = Task.queryTaskTable(conn, 
null);
+            assertEquals(1, taskRecordList.size());
+            assertEquals(PTable.TaskType.TRANSFORM_MONITOR, 
taskRecordList.get(0).getTaskType());
+            assertEquals(schemaName, taskRecordList.get(0).getSchemaName());
+            assertEquals(dataTableName, taskRecordList.get(0).getTableName());
+
+            waitForTaskState(conn, PTable.TaskType.TRANSFORM_MONITOR, 
dataTableName, PTable.TaskStatus.COMPLETED);
+        }
+    }
+
+    @Test
+    public void testTransformMonitor_mutableTableWithIndex() throws Exception {
+        testTransformTable(true, false, false);
+    }
+
+    @Test
+    public void testTransformMonitor_tableWithViews() throws Exception {
+        testTransformTable(false, true, false);
+    }
+
+    @Test
+    public void testTransformMonitor_index() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = "TBL_" + generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, 
dataTableName);
+        String indexName = "IDX_" + generateUniqueName();
+        String indexFullName = SchemaUtil.getTableName(schemaName, indexName);
+        String newTableFullName = indexFullName + "_1";
+        String createIndexStmt = "CREATE INDEX " + indexName + " ON " + 
dataTableFullName + " (ZIP) INCLUDE (NAME) ";
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
testProps)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 10;
+            TransformToolIT.createTableAndUpsertRows(conn, dataTableFullName, 
numOfRows, "");
+            conn.createStatement().execute(createIndexStmt);
+            assertMetadata(conn, 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, 
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexFullName);
+
+            conn.createStatement().execute("ALTER INDEX " + indexName + " ON " 
+ dataTableFullName +
+                    " ACTIVE 
IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, 
COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = 
Transform.getTransformRecord(schemaName, indexName, dataTableFullName, null, 
conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+
+            List<Task.TaskRecord> taskRecordList = Task.queryTaskTable(conn, 
null);
+            assertEquals(1, taskRecordList.size());
+            assertEquals(PTable.TaskType.TRANSFORM_MONITOR, 
taskRecordList.get(0).getTaskType());
+            assertEquals(schemaName, taskRecordList.get(0).getSchemaName());
+            assertEquals(indexName, taskRecordList.get(0).getTableName());
+
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), 
record, PTable.TransformStatus.COMPLETED);
+            // Test that the PhysicalTableName is updated.
+            PTable oldTable = PhoenixRuntime.getTableNoCache(conn, 
indexFullName);
+            assertEquals(indexName+"_1", 
oldTable.getPhysicalName(true).getString());
+            assertMetadata(conn, 
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, 
PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, newTableFullName);
+            ConnectionQueryServices cqs = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
+            long newRowCount = countRows(conn, newTableFullName);
+            
assertEquals(getRawRowCount(cqs.getTable(Bytes.toBytes(indexFullName))), 
newRowCount);
+        }
+    }
+
+    @Test
+    public void testTransformTableWithTenantViews() throws Exception {
+        String tenantId = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String viewTenantName = "TENANTVW_" + generateUniqueName();
+        String createTblStr = "CREATE TABLE %s (TENANT_ID VARCHAR(15) NOT 
NULL,ID INTEGER NOT NULL"
+                + ", NAME VARCHAR, CONSTRAINT PK_1 PRIMARY KEY (TENANT_ID, 
ID)) MULTI_TENANT=true";
+        String createViewStr = "CREATE VIEW %s  (VIEW_COL1 VARCHAR) AS SELECT 
* FROM %s";
+
+        String upsertQueryStr = "UPSERT INTO %s (TENANT_ID, ID, NAME, 
VIEW_COL1) VALUES('%s' , %d, '%s', '%s')";
+
+        Properties props = PropertiesUtil.deepCopy(testProps);
+        Connection connGlobal = null;
+        Connection connTenant = null;
+        try {
+            connGlobal = DriverManager.getConnection(getUrl(), props);
+            props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+            connTenant = DriverManager.getConnection(getUrl(), props);
+            connTenant.setAutoCommit(true);
+            String tableStmtGlobal = String.format(createTblStr, 
dataTableName);
+            connGlobal.createStatement().execute(tableStmtGlobal);
+            assertMetadata(connGlobal, 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, 
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableName);
+
+            String viewStmtTenant = String.format(createViewStr, 
viewTenantName, dataTableName);
+            connTenant.createStatement().execute(viewStmtTenant);
+
+            // TODO: Fix this as part of implementing TransformTool so that 
the tenant view rows could be read from the tool
+//            connTenant.createStatement()
+//                    .execute(String.format(upsertQueryStr, viewTenantName, 
tenantId, 1, "x", "xx"));
+            try {
+                connTenant.createStatement().execute("ALTER TABLE " + 
dataTableName
+                        + " SET 
IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, 
COLUMN_ENCODED_BYTES=2");
+                fail("Tenant connection cannot do alter");
+            } catch (SQLException e) {
+                
assertEquals(CANNOT_CREATE_TENANT_SPECIFIC_TABLE.getErrorCode(), 
e.getErrorCode());
+            }
+            connGlobal.createStatement().execute("ALTER TABLE " + dataTableName
+                    + " SET 
IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, 
COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord tableRecord = 
Transform.getTransformRecord(null, dataTableName, null, null, 
connGlobal.unwrap(PhoenixConnection.class));
+            assertNotNull(tableRecord);
+
+            
waitForTransformToGetToState(connGlobal.unwrap(PhoenixConnection.class), 
tableRecord, PTable.TransformStatus.COMPLETED);
+
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, viewTenantName, 
tenantId, 2, "y", "yy"));
+
+            ResultSet rs = connTenant.createStatement().executeQuery("SELECT 
/*+ NO_INDEX */ VIEW_COL1 FROM " + viewTenantName);
+            assertTrue(rs.next());
+//            assertEquals("xx", rs.getString(1));
+//            assertTrue(rs.next());
+            assertEquals("yy", rs.getString(1));
+            assertFalse(rs.next());
+        } finally {
+            if (connGlobal != null) {
+                connGlobal.close();
+            }
+            if (connTenant != null) {
+                connTenant.close();
+            }
+        }
+    }
+
+    @Test
+    public void testTransformAlreadyTransformedIndex() throws Exception {
+        String dataTableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String createIndexStmt = "CREATE INDEX %s ON " + dataTableName + " 
(NAME) INCLUDE (ZIP) ";
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
testProps)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 1;
+            TransformToolIT.createTableAndUpsertRows(conn, dataTableName, 
numOfRows, "");
+            conn.createStatement().execute(String.format(createIndexStmt, 
indexName));
+            assertEquals(numOfRows, countRows(conn, indexName));
+
+            assertMetadata(conn, 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, 
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexName);
+
+            conn.createStatement().execute("ALTER INDEX " + indexName + " ON " 
+ dataTableName +
+                    " ACTIVE 
IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, 
COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(null, 
indexName, dataTableName, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), 
record, PTable.TransformStatus.COMPLETED);
+            assertMetadata(conn, 
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, 
PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, 
record.getNewPhysicalTableName());
+
+            TransformToolIT.upsertRows(conn, dataTableName, 2, 1);
+
+            conn.createStatement().execute("ALTER INDEX " + indexName + " ON " 
+ dataTableName +
+                    " ACTIVE SET IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, 
COLUMN_ENCODED_BYTES=0");
+            record = Transform.getTransformRecord(null, indexName, 
dataTableName, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), 
record, PTable.TransformStatus.COMPLETED);
+            TransformToolIT.upsertRows(conn, dataTableName, 3, 1);
+            assertEquals(numOfRows + 2, countRows(conn, indexName));
+            assertMetadata(conn, 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, 
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, 
record.getNewPhysicalTableName());
+            ResultSet rs = conn.createStatement().executeQuery("SELECT 
\":ID\", \"0:ZIP\" FROM " + indexName);
+            assertTrue(rs.next());
+            assertEquals("1", rs.getString(1));
+            assertEquals( 95051, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("2", rs.getString(1));
+            assertEquals( 95052, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("3", rs.getString(1));
+            assertEquals( 95053, rs.getInt(2));
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
+    public void testTransformAlreadyTransformedTable() throws Exception {
+        String dataTableName = "TBL_" + generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
testProps)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 1;
+            String stmString1 =
+                    "CREATE TABLE IF NOT EXISTS " + dataTableName
+                            + " (ID INTEGER NOT NULL, CITY_PK VARCHAR NOT 
NULL, NAME_PK VARCHAR NOT NULL,NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY 
KEY(ID, CITY_PK, NAME_PK)) ";
+            conn.createStatement().execute(stmString1);
+
+            String upsertQuery = "UPSERT INTO %s VALUES(%d, '%s', '%s', '%s', 
%d)";
+
+            // insert rows
+            conn.createStatement().execute(String.format(upsertQuery, 
dataTableName, 1, "city1", "name1", "uname1", 95051));
+
+            assertMetadata(conn, 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, 
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableName);
+
+            conn.createStatement().execute("ALTER TABLE " + dataTableName +
+                    " SET 
IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, 
COLUMN_ENCODED_BYTES=2");
+
+            SystemTransformRecord record = Transform.getTransformRecord(null, 
dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), 
record, PTable.TransformStatus.COMPLETED);
+            assertMetadata(conn, 
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, 
PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, 
record.getNewPhysicalTableName());
+            conn.createStatement().execute(String.format(upsertQuery, 
dataTableName, 2, "city2", "name2", "uname2", 95052));
+
+            assertEquals(numOfRows+1, countRows(conn, dataTableName));
+
+            // Make sure that we are not accessing the original table. We are supposed to read from the new table above
+            Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+            TableName hTableName = TableName.valueOf(dataTableName);
+            admin.disableTable(hTableName);
+            admin.deleteTable(hTableName);
+
+            conn.createStatement().execute("ALTER TABLE " + dataTableName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=0");

Review comment:
       Ditto

##########
File path: 
phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java
##########
@@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.transform;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.coprocessor.tasks.TransformMonitorTask;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.phoenix.end2end.IndexRebuildTaskIT.waitForTaskState;
+import static 
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_CREATE_TENANT_SPECIFIC_TABLE;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.getRawRowCount;
+import static org.apache.phoenix.util.TestUtil.getRowCount;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TransformMonitorIT extends ParallelStatsDisabledIT {
+    private static RegionCoprocessorEnvironment TaskRegionEnvironment;
+
+    private Properties testProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+    public TransformMonitorIT() throws IOException, InterruptedException {
+        testProps.put(QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB, 
"ONE_CELL_PER_COLUMN");
+        testProps.put(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, "0");
+        testProps.put(QueryServices.PHOENIX_ACLS_ENABLED, "true");
+
+        TaskRegionEnvironment = (RegionCoprocessorEnvironment) getUtility()
+                .getRSForFirstRegionInTable(
+                        PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+                
.getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+                .get(0).getCoprocessorHost()
+                
.findCoprocessorEnvironment(TaskRegionObserver.class.getName());
+    }
+
+    @Before
+    public void setupTest() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
testProps)) {
+            conn.setAutoCommit(true);
+            conn.createStatement().execute("DELETE FROM " + 
PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME);
+            conn.createStatement().execute("DELETE FROM " + 
PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
+        }
+    }
+
+    private void testTransformTable(boolean createIndex, boolean createView, 
boolean isImmutable) throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = "TBL_" + generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, 
dataTableName);
+        String newTableName = dataTableName + "_1";
+        String newTableFullName = SchemaUtil.getTableName(schemaName, 
newTableName);
+        String indexName = "IDX_" + generateUniqueName();
+        String indexName2 = "IDX_" + generateUniqueName();
+        String viewName = "VW_" + generateUniqueName();
+        String viewName2 = "VW2_" + generateUniqueName();
+        String viewIdxName = "VW_IDX_" + generateUniqueName();
+        String viewIdxName2 = "VW_IDX_" + generateUniqueName();
+        String view2IdxName1 = "VW2_IDX_" + generateUniqueName();
+        String indexFullName = SchemaUtil.getTableName(schemaName, indexName);
+        String createIndexStmt = "CREATE INDEX %s ON " + dataTableFullName + " 
(NAME) INCLUDE (ZIP) ";
+        String createViewStmt = "CREATE VIEW %s ( VIEW_COL1 INTEGER, VIEW_COL2 
VARCHAR ) AS SELECT * FROM " + dataTableFullName;
+        String createViewIdxSql = "CREATE INDEX  %s ON " + viewName + " 
(VIEW_COL1) include (VIEW_COL2) ";
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
testProps)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 10;
+            TransformToolIT.createTableAndUpsertRows(conn, dataTableFullName, 
numOfRows, isImmutable? " IMMUTABLE_ROWS=true" : "");
+            if (createIndex) {
+                conn.createStatement().execute(String.format(createIndexStmt, 
indexName));
+            }
+
+            if (createView) {
+                conn.createStatement().execute(String.format(createViewStmt, 
viewName));
+                conn.createStatement().execute(String.format(createViewIdxSql, 
viewIdxName));
+                conn.createStatement().execute("UPSERT INTO " + viewName + 
"(ID, NAME, VIEW_COL1, VIEW_COL2) VALUES (1, 'uname11', 100, 'viewCol2')");
+            }
+            assertMetadata(conn, 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, 
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                    " SET 
IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, 
COLUMN_ENCODED_BYTES=2");
+
+            SystemTransformRecord record = 
Transform.getTransformRecord(schemaName, dataTableName, null, null, 
conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+
+            List<Task.TaskRecord> taskRecordList = Task.queryTaskTable(conn, 
null);
+            assertEquals(1, taskRecordList.size());
+            assertEquals(PTable.TaskType.TRANSFORM_MONITOR, 
taskRecordList.get(0).getTaskType());
+            assertEquals(schemaName, taskRecordList.get(0).getSchemaName());
+            assertEquals(dataTableName, taskRecordList.get(0).getTableName());
+
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), 
record, PTable.TransformStatus.COMPLETED);
+            // Test that the PhysicalTableName is updated.
+            PTable oldTable = PhoenixRuntime.getTableNoCache(conn, 
dataTableFullName);
+            assertEquals(newTableName, 
oldTable.getPhysicalName(true).getString());
+
+            assertMetadata(conn, 
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, 
PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, 
record.getNewPhysicalTableName());
+            assertMetadata(conn, 
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, 
PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, dataTableFullName);
+
+            ConnectionQueryServices cqs = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
+            long newRowCount = countRows(conn, newTableFullName);
+            
assertEquals(getRawRowCount(cqs.getTable(Bytes.toBytes(dataTableFullName))), 
newRowCount);
+
+            if (createIndex) {
+                assertEquals(newRowCount, countRows(conn, indexFullName));
+                int additionalRows = 2;
+                // Upsert new rows to new table. Note that after transform is 
complete, we are using the new table
+                TransformToolIT.upsertRows(conn, dataTableFullName, 
(int)newRowCount+1, additionalRows);
+                assertEquals(newRowCount+additionalRows, countRows(conn, 
indexFullName));
+                assertEquals(newRowCount, 
getRawRowCount(cqs.getTable(Bytes.toBytes(dataTableFullName))));
+
+                // Create another index on the new table and count
+                Admin admin = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+                TableName hTableName = TableName.valueOf(dataTableFullName);
+                admin.disableTable(hTableName);
+                admin.deleteTable(hTableName);
+                conn.createStatement().execute(String.format(createIndexStmt, 
indexName2));
+                assertEquals(newRowCount+additionalRows, countRows(conn, 
dataTableFullName));
+                assertEquals(newRowCount+additionalRows, countRows(conn, 
SchemaUtil.getTableName(schemaName, indexName2)));
+            } else if (createView) {
+                assertEquals(numOfRows, countRows(conn, viewName));
+                assertEquals(numOfRows, countRowsForViewIndex(conn, dataTableFullName));
+                assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, viewName);
+                conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+
+                int additionalRows = 2;
+                // Upsert new rows to new table. Note that after transform is complete, we are using the new table
+                TransformToolIT.upsertRows(conn, viewName, (int)newRowCount+1, additionalRows);
+                assertEquals(newRowCount+additionalRows, getRowCount(conn, viewName));
+                assertEquals(newRowCount+additionalRows, countRowsForViewIndex(conn, dataTableFullName));
+
+                // Drop view index and create another on the new table and count
+                conn.createStatement().execute("DROP INDEX " + viewIdxName + " ON " + viewName);
+                Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+                TableName hTableName = TableName.valueOf(dataTableFullName);
+                admin.disableTable(hTableName);
+                admin.deleteTable(hTableName);
+                conn.createStatement().execute(String.format(createViewIdxSql, viewIdxName2));
+                assertEquals(newRowCount+additionalRows, countRowsForViewIndex(conn, dataTableFullName));
+
+                // Create another view and have a new index on top
+                conn.createStatement().execute(String.format(createViewStmt, viewName2));
+                conn.createStatement().execute(String.format(createViewIdxSql, view2IdxName1));
+                assertEquals((newRowCount+additionalRows)*2, countRowsForViewIndex(conn, dataTableFullName));
+
+                conn.createStatement().execute("UPSERT INTO " + viewName2 + "(ID, NAME, VIEW_COL1, VIEW_COL2) VALUES (100, 'uname100', 1000, 'viewCol100')");
+                ResultSet rs = conn.createStatement().executeQuery("SELECT VIEW_COL2, NAME FROM " + viewName2 + " WHERE VIEW_COL1=1000");
+                assertTrue(rs.next());
+                assertEquals("viewCol100", rs.getString(1));
+                assertEquals("uname100", rs.getString(2));
+                assertFalse(rs.next());
+            }
+
+        }
+    }
+
+    public static int countRows(Connection conn, String tableFullName) throws SQLException {
+        ResultSet count = conn.createStatement().executeQuery("select  /*+ NO_INDEX*/ count(*) from " + tableFullName);
+        count.next();
+        int numRows = count.getInt(1);
+        return numRows;
+    }
+
+    protected int countRowsForViewIndex(Connection conn, String baseTable) throws IOException, SQLException {
+        String viewIndexTableName = MetaDataUtil.getViewIndexPhysicalName(baseTable);
+        ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
+
+        Table indexHTable = queryServices.getTable(Bytes.toBytes(viewIndexTableName));
+        return getUtility().countRows(indexHTable);
+    }
+
+    @Test
+    public void testTransformMonitor_mutableTableWithoutIndex() throws Exception {
+        testTransformTable(false, false, false);
+    }
+
+    @Test
+    public void testTransformMonitor_immutableTableWithoutIndex() throws Exception {
+        testTransformTable(false, false, true);
+    }
+
+    @Test
+    public void testTransformMonitor_immutableTableWithIndex() throws Exception {
+        testTransformTable(true, false, true);
+    }
+
+    @Test
+    public void testTransformMonitor_pausedTransform() throws Exception {
+        testTransformMonitor_checkStates(PTable.TransformStatus.PAUSED, PTable.TaskStatus.COMPLETED);
+    }
+
+    @Test
+    public void testTransformMonitor_completedTransform() throws Exception {
+        testTransformMonitor_checkStates(PTable.TransformStatus.COMPLETED, PTable.TaskStatus.COMPLETED);
+    }
+
+    @Test
+    public void testTransformMonitor_failedTransform() throws Exception {
+        testTransformMonitor_checkStates(PTable.TransformStatus.FAILED, PTable.TaskStatus.FAILED);
+    }
+
+    private void testTransformMonitor_checkStates(PTable.TransformStatus transformStatus, PTable.TaskStatus taskStatus) throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            SystemTransformRecord.SystemTransformBuilder transformBuilder = new SystemTransformRecord.SystemTransformBuilder();
+            String logicalTableName = generateUniqueName();
+            transformBuilder.setLogicalTableName(logicalTableName);
+            transformBuilder.setTransformStatus(transformStatus.name());
+            transformBuilder.setNewPhysicalTableName(logicalTableName + "_1");
+            Transform.upsertTransform(transformBuilder.build(), conn.unwrap(PhoenixConnection.class));
+
+            TaskRegionObserver.SelfHealingTask task =
+                    new TaskRegionObserver.SelfHealingTask(
+                            TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+
+            Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+            Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                    .setConn(conn.unwrap(PhoenixConnection.class))
+                    .setTaskType(PTable.TaskType.TRANSFORM_MONITOR)
+                    .setTenantId(null)
+                    .setSchemaName(null)
+                    .setTableName(logicalTableName)
+                    .setTaskStatus(PTable.TaskStatus.CREATED.toString())
+                    .setData(null)
+                    .setPriority(null)
+                    .setStartTs(startTs)
+                    .setEndTs(null)
+                    .build());
+            task.run();
+
+            waitForTaskState(conn, PTable.TaskType.TRANSFORM_MONITOR, logicalTableName, taskStatus);
+        }
+    }
+
+    @Test
+    public void testTransformMonitor_pauseAndResumeTransform() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            TransformToolIT.pauseTableTransform(schemaName, dataTableName, conn, "");
+
+            List<String> args = TransformToolIT.getArgList(schemaName, dataTableName, null,
+                    null, null, null, false, false, true, false, false);
+
+            // This run resumes transform and TransformMonitor task runs and completes it
+            TransformToolIT.runTransformTool(args.toArray(new String[0]), 0);
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            List<Task.TaskRecord> taskRecordList = Task.queryTaskTable(conn, null);
+            assertEquals(1, taskRecordList.size());
+            assertEquals(PTable.TaskType.TRANSFORM_MONITOR, taskRecordList.get(0).getTaskType());
+            assertEquals(schemaName, taskRecordList.get(0).getSchemaName());
+            assertEquals(dataTableName, taskRecordList.get(0).getTableName());
+
+            waitForTaskState(conn, PTable.TaskType.TRANSFORM_MONITOR, dataTableName, PTable.TaskStatus.COMPLETED);
+        }
+    }
+
+    @Test
+    public void testTransformMonitor_mutableTableWithIndex() throws Exception {
+        testTransformTable(true, false, false);
+    }
+
+    @Test
+    public void testTransformMonitor_tableWithViews() throws Exception {
+        testTransformTable(false, true, false);
+    }
+
+    @Test
+    public void testTransformMonitor_index() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = "TBL_" + generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexName = "IDX_" + generateUniqueName();
+        String indexFullName = SchemaUtil.getTableName(schemaName, indexName);
+        String newTableFullName = indexFullName + "_1";
+        String createIndexStmt = "CREATE INDEX " + indexName + " ON " + dataTableFullName + " (ZIP) INCLUDE (NAME) ";
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 10;
+            TransformToolIT.createTableAndUpsertRows(conn, dataTableFullName, numOfRows, "");
+            conn.createStatement().execute(createIndexStmt);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexFullName);
+
+            conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + dataTableFullName +
+                    " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, indexName, dataTableFullName, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+
+            List<Task.TaskRecord> taskRecordList = Task.queryTaskTable(conn, null);
+            assertEquals(1, taskRecordList.size());
+            assertEquals(PTable.TaskType.TRANSFORM_MONITOR, taskRecordList.get(0).getTaskType());
+            assertEquals(schemaName, taskRecordList.get(0).getSchemaName());
+            assertEquals(indexName, taskRecordList.get(0).getTableName());
+
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+            // Test that the PhysicalTableName is updated.
+            PTable oldTable = PhoenixRuntime.getTableNoCache(conn, indexFullName);
+            assertEquals(indexName+"_1", oldTable.getPhysicalName(true).getString());
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, newTableFullName);
+            ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+            long newRowCount = countRows(conn, newTableFullName);
+            assertEquals(getRawRowCount(cqs.getTable(Bytes.toBytes(indexFullName))), newRowCount);
+        }
+    }
+
+    @Test
+    public void testTransformTableWithTenantViews() throws Exception {
+        String tenantId = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String viewTenantName = "TENANTVW_" + generateUniqueName();
+        String createTblStr = "CREATE TABLE %s (TENANT_ID VARCHAR(15) NOT NULL,ID INTEGER NOT NULL"
+                + ", NAME VARCHAR, CONSTRAINT PK_1 PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true";
+        String createViewStr = "CREATE VIEW %s  (VIEW_COL1 VARCHAR) AS SELECT * FROM %s";
+
+        String upsertQueryStr = "UPSERT INTO %s (TENANT_ID, ID, NAME, VIEW_COL1) VALUES('%s' , %d, '%s', '%s')";
+
+        Properties props = PropertiesUtil.deepCopy(testProps);
+        Connection connGlobal = null;
+        Connection connTenant = null;
+        try {
+            connGlobal = DriverManager.getConnection(getUrl(), props);
+            props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+            connTenant = DriverManager.getConnection(getUrl(), props);
+            connTenant.setAutoCommit(true);
+            String tableStmtGlobal = String.format(createTblStr, dataTableName);
+            connGlobal.createStatement().execute(tableStmtGlobal);
+            assertMetadata(connGlobal, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableName);
+
+            String viewStmtTenant = String.format(createViewStr, viewTenantName, dataTableName);
+            connTenant.createStatement().execute(viewStmtTenant);
+
+            // TODO: Fix this as part of implementing TransformTool so that the tenant view rows could be read from the tool
+//            connTenant.createStatement()
+//                    .execute(String.format(upsertQueryStr, viewTenantName, tenantId, 1, "x", "xx"));
+            try {
+                connTenant.createStatement().execute("ALTER TABLE " + dataTableName
+                        + " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+                fail("Tenant connection cannot do alter");
+            } catch (SQLException e) {
+                assertEquals(CANNOT_CREATE_TENANT_SPECIFIC_TABLE.getErrorCode(), e.getErrorCode());
+            }
+            connGlobal.createStatement().execute("ALTER TABLE " + dataTableName
+                    + " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord tableRecord = Transform.getTransformRecord(null, dataTableName, null, null, connGlobal.unwrap(PhoenixConnection.class));
+            assertNotNull(tableRecord);
+
+            waitForTransformToGetToState(connGlobal.unwrap(PhoenixConnection.class), tableRecord, PTable.TransformStatus.COMPLETED);
+
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, viewTenantName, tenantId, 2, "y", "yy"));
+
+            ResultSet rs = connTenant.createStatement().executeQuery("SELECT /*+ NO_INDEX */ VIEW_COL1 FROM " + viewTenantName);
+            assertTrue(rs.next());
+//            assertEquals("xx", rs.getString(1));
+//            assertTrue(rs.next());
+            assertEquals("yy", rs.getString(1));
+            assertFalse(rs.next());
+        } finally {
+            if (connGlobal != null) {
+                connGlobal.close();
+            }
+            if (connTenant != null) {
+                connTenant.close();
+            }
+        }
+    }
+
+    @Test
+    public void testTransformAlreadyTransformedIndex() throws Exception {
+        String dataTableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String createIndexStmt = "CREATE INDEX %s ON " + dataTableName + " (NAME) INCLUDE (ZIP) ";
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 1;
+            TransformToolIT.createTableAndUpsertRows(conn, dataTableName, numOfRows, "");
+            conn.createStatement().execute(String.format(createIndexStmt, indexName));
+            assertEquals(numOfRows, countRows(conn, indexName));
+
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexName);
+
+            conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + dataTableName +
+                    " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(null, indexName, dataTableName, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+            TransformToolIT.upsertRows(conn, dataTableName, 2, 1);
+
+            conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + dataTableName +

Review comment:
       I think this comment is still waiting to be implemented. 






> TransformMonitor should orchestrate transform and do retries
> ------------------------------------------------------------
>
>                 Key: PHOENIX-6622
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6622
>             Project: Phoenix
>          Issue Type: Sub-task
>            Reporter: Gokcen Iskender
>            Priority: Major
>



