http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index 73554c9..15f60df 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -18,6 +18,15 @@
 package org.apache.phoenix.end2end;
 
 import static 
org.apache.hadoop.hbase.HColumnDescriptor.DEFAULT_REPLICATION_SCOPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
+import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
+import static 
org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.apache.phoenix.util.TestUtil.closeConnection;
 import static org.apache.phoenix.util.TestUtil.closeStatement;
@@ -35,6 +44,8 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -55,6 +66,8 @@ import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.EncodedCQCounter;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.util.IndexUtil;
@@ -62,6 +75,9 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  *
@@ -74,14 +90,28 @@ import org.junit.Test;
  * or at the end of test class.
  *
  */
+@RunWith(Parameterized.class)
 public class AlterTableIT extends ParallelStatsDisabledIT {
     private String schemaName;
     private String dataTableName;
     private String indexTableName;
     private String localIndexTableName;
+    private String viewName;
     private String dataTableFullName;
     private String indexTableFullName;
     private String localIndexTableFullName;
+    private String tableDDLOptions;
+    private final boolean columnEncoded;
+    
+    public AlterTableIT(boolean columnEncoded) { // one instance per value returned by data()
+        this.columnEncoded = columnEncoded;
+        this.tableDDLOptions = columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0"; // non-encoded runs add this option to the CREATE TABLE statements
+    }
+    
+    @Parameters(name="AlterTableIT_columnEncoded={0}") // name is used by failsafe as file name in reports
+    public static Collection<Boolean> data() {
+        return Arrays.asList( false, true); // each test runs once per value; the value is passed to the constructor as columnEncoded
+    }
 
     @Before
     public void setupTableNames() throws Exception {
@@ -92,6 +122,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
         dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
         indexTableFullName = SchemaUtil.getTableName(schemaName, 
indexTableName);
         localIndexTableFullName = SchemaUtil.getTableName(schemaName, 
localIndexTableName);
+        viewName = generateUniqueName();
     }
 
     @Test
@@ -103,7 +134,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
         try {
             String ddl = "CREATE TABLE  " + dataTableFullName +
                     "  (a_string varchar not null, a_binary varbinary not 
null, col1 integer" +
-                    "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
+                    "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary)) " + 
tableDDLOptions;
             createTestTable(getUrl(), ddl);
 
             ddl = "ALTER TABLE " + dataTableFullName + " ADD b_string VARCHAR 
NULL PRIMARY KEY";
@@ -144,7 +175,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
         try {
             String ddl = "CREATE TABLE " + dataTableFullName +
                     "  (a_string varchar not null, col1 integer" +
-                    "  CONSTRAINT pk PRIMARY KEY (a_string))\n";
+                    "  CONSTRAINT pk PRIMARY KEY (a_string)) " + 
tableDDLOptions;
             conn.createStatement().execute(ddl);
 
             String dml = "UPSERT INTO " + dataTableFullName + " VALUES(?)";
@@ -211,7 +242,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
         Connection conn = DriverManager.getConnection(getUrl(), props);
         String ddl = "CREATE TABLE  " + dataTableFullName +
                 "  (a_string varchar not null, col1 integer" +
-                "  CONSTRAINT pk PRIMARY KEY (a_string))\n";
+                "  CONSTRAINT pk PRIMARY KEY (a_string)) " + tableDDLOptions;
         try {
             conn.createStatement().execute(ddl);
             conn.createStatement().execute("ALTER TABLE " + dataTableFullName 
+ " ADD CF.col2 integer CF.IN_MEMORY=true");
@@ -233,8 +264,31 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
         assertEquals(exists, rs.next());
     }
 
+    
     @Test
-    public void testDropIndexedColumn() throws Exception {
+    public void testDropIndexedColumnImmutableIndex() throws Exception {
+        helpTestDropIndexedColumn(true); // immutable = true: the helper creates the table with IMMUTABLE_ROWS = true
+    }
+    
+    @Test
+    public void testDropIndexedColumnMutableIndex() throws Exception {
+        helpTestDropIndexedColumn(false); // immutable = false: the helper adds no IMMUTABLE_ROWS option to the CREATE TABLE
+    }
+    
+    // Builds the option list appended to the CREATE TABLE statements: the caller's options
+    // followed by tableDDLOptions, comma-separated, omitting whichever part is empty.
+    private String generateDDLOptions(String options) {
+        boolean hasOptions = !options.isEmpty();
+        boolean hasTableOptions = !tableDDLOptions.isEmpty();
+        if (hasOptions && hasTableOptions) {
+            // caller's options first, then the per-run table options
+            return options + "," + tableDDLOptions;
+        }
+        // at most one side is non-empty here; when neither is, the result is ""
+        return hasOptions ? options : tableDDLOptions;
+    }
+    
+    private void helpTestDropIndexedColumn(boolean immutable) throws Exception 
{
         String query;
         ResultSet rs;
         PreparedStatement stmt;
@@ -246,7 +300,9 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
         // make sure that the tables are empty, but reachable
         conn.createStatement().execute(
           "CREATE TABLE " + dataTableFullName
-              + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+              + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "
+              + generateDDLOptions(immutable ? "IMMUTABLE_ROWS = true" : "")
+              + (!columnEncoded ? ",IMMUTABLE_STORAGE_SCHEME=" + 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : ""));
         query = "SELECT * FROM " + dataTableFullName;
         rs = conn.createStatement().executeQuery(query);
         assertFalse(rs.next());
@@ -306,7 +362,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
         // make sure that the tables are empty, but reachable
         conn.createStatement().execute(
           "CREATE TABLE " + dataTableFullName
-              + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 
VARCHAR)");
+              + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 
VARCHAR) " +  tableDDLOptions);
         String dataTableQuery = "SELECT * FROM " + dataTableFullName;
         rs = conn.createStatement().executeQuery(dataTableQuery);
         assertFalse(rs.next());
@@ -421,7 +477,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
         // make sure that the tables are empty, but reachable
         conn.createStatement().execute(
           "CREATE TABLE " + dataTableFullName
-              + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+              + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + 
tableDDLOptions);
         query = "SELECT * FROM " + dataTableFullName;
         rs = conn.createStatement().executeQuery(query);
         assertFalse(rs.next());
@@ -517,7 +573,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
                        "        B.M_TEXT VARCHAR\n" +
                        "        CONSTRAINT ROWKEY PRIMARY KEY\n" +
                        "(SENDER_ID,RECIPIENT_ID,M_TIMESTAMP DESC,ROW_ID))\n" +
-                       "SALT_BUCKETS=4";
+                       generateDDLOptions("SALT_BUCKETS=4");
             conn.createStatement().execute(ddl);
 
             ddl = "ALTER TABLE " + dataTableFullName + " SET 
IMMUTABLE_ROWS=true";
@@ -551,7 +607,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
                     "        B.M_TEXT VARCHAR\n" +
                     "        CONSTRAINT ROWKEY PRIMARY KEY\n" +
                     "(SENDER_ID,RECIPIENT_ID,M_TIMESTAMP DESC,ROW_ID))\n" +
-                    "SALT_BUCKETS=4";
+                    generateDDLOptions("SALT_BUCKETS=4");
             conn.createStatement().execute(ddl);
 
             ddl = "ALTER TABLE " + dataTableFullName + " DROP COLUMN B.JSON";
@@ -574,7 +630,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
         try {
             String ddl = "CREATE TABLE " + dataTableFullName +
                     "  (a_string varchar not null, col1 integer" +
-                    "  CONSTRAINT pk PRIMARY KEY (a_string))\n";
+                    "  CONSTRAINT pk PRIMARY KEY (a_string)) " + 
tableDDLOptions;
             conn.createStatement().execute(ddl);
 
             String dml = "UPSERT INTO " + dataTableFullName + " VALUES(?)";
@@ -681,7 +737,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
         conn.setAutoCommit(false);
         try {
             String ddl = "CREATE TABLE " + dataTableFullName + " " + "  
(a_string varchar not null, col1 integer, cf1.col2 integer"
-                    + "  CONSTRAINT pk PRIMARY KEY (a_string))\n";
+                    + "  CONSTRAINT pk PRIMARY KEY (a_string)) " + 
tableDDLOptions;
             conn.createStatement().execute(ddl);
 
             ddl = "ALTER TABLE " + dataTableFullName + " DROP COLUMN col1";
@@ -704,7 +760,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
             conn.setAutoCommit(false);
             try {
                 String ddl = "CREATE TABLE " + dataTableFullName + " " + "  
(a_string varchar not null, col1 integer, cf1.col2 integer"
-                        + "  CONSTRAINT pk PRIMARY KEY (a_string))\n";
+                        + "  CONSTRAINT pk PRIMARY KEY (a_string)) " +  
tableDDLOptions;
                 stmt = conn.prepareStatement(ddl);
                 stmt.execute();
             } finally {
@@ -739,7 +795,9 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
                     .execute(
                             "CREATE TABLE " + dataTableFullName
                                     + "  (a_string varchar not null, col1 
integer, cf1.col2 integer, col3 integer , cf2.col4 integer "
-                                    + "  CONSTRAINT pk PRIMARY KEY (a_string)) 
immutable_rows=true, disable_wal=true ");
+                                    + "  CONSTRAINT pk PRIMARY KEY (a_string)) 
"
+                                    + generateDDLOptions("immutable_rows=true, 
disable_wal=true"
+                                    + (!columnEncoded ? 
",IMMUTABLE_STORAGE_SCHEME=" + 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : "")));
 
             Connection conn2 = DriverManager.getConnection(getUrl(), props);
             String query = "SELECT * FROM " + dataTableFullName;
@@ -771,7 +829,9 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
                     .execute(
                             "CREATE TABLE " + dataTableFullName
                                     + "  (a_string varchar not null, col1 
integer, cf1.col2 integer, col3 integer , cf2.col4 integer "
-                                    + "  CONSTRAINT pk PRIMARY KEY (a_string)) 
immutable_rows=true");
+                                    + "  CONSTRAINT pk PRIMARY KEY 
(a_string))" 
+                                    + generateDDLOptions("immutable_rows=true"
+                                    + (!columnEncoded ? 
",IMMUTABLE_STORAGE_SCHEME=" + 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : "")));
 
             Connection conn2 = DriverManager.getConnection(getUrl(), props);
             String query = "SELECT * FROM " + dataTableFullName;
@@ -804,7 +864,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
                     .execute(
                             "CREATE TABLE " + dataTableFullName
                                     + "  (a_string varchar not null, col1 
integer, cf1.col2 integer, col3 integer , cf2.col4 integer "
-                                    + "  CONSTRAINT pk PRIMARY KEY 
(a_string))");
+                                    + "  CONSTRAINT pk PRIMARY KEY (a_string)) 
" + tableDDLOptions);
 
             Connection conn2 = DriverManager.getConnection(getUrl(), props);
             String query = "SELECT * FROM " + dataTableFullName;
@@ -842,7 +902,9 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
                     .execute(
                             "CREATE TABLE " + dataTableFullName
                                     + "  (a_string varchar not null, col1 
integer, cf1.col2 integer, col3 integer , cf2.col4 integer "
-                                    + "  CONSTRAINT pk PRIMARY KEY (a_string)) 
immutable_rows=true , SALT_BUCKETS=3 ");
+                                    + "  CONSTRAINT pk PRIMARY KEY (a_string)) 
" 
+                                    + generateDDLOptions("immutable_rows=true 
, SALT_BUCKETS=3 "
+                                    + (!columnEncoded ? 
",IMMUTABLE_STORAGE_SCHEME=" + 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : "")));
 
             String query = "SELECT * FROM " + dataTableFullName;
             ResultSet rs = conn.createStatement().executeQuery(query);
@@ -931,7 +993,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
 
         // here we insert into the orig schema with one column
         Connection conn1 = DriverManager.getConnection(getUrl(), props);
-        conn1.createStatement().execute("create table " + dataTableFullName + 
"(id VARCHAR PRIMARY KEY, field1 BIGINT)");
+        conn1.createStatement().execute("create table " + dataTableFullName + 
"(id VARCHAR PRIMARY KEY, field1 BIGINT) " + tableDDLOptions);
         PreparedStatement stmtInsert1 = conn1.prepareStatement("upsert into " 
+ dataTableFullName + " (id, field1) values ( ?, ?)");
         stmtInsert1.setString(1, "key1");
         stmtInsert1.setLong(2, 1L);
@@ -963,7 +1025,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
                 +"CREATED_DATE DATE,\n"
                 +"CREATION_TIME BIGINT,\n"
                 +"LAST_USED DATE,\n"
-                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2))";
+                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) "  + tableDDLOptions;
         Connection conn1 = DriverManager.getConnection(getUrl(), props);
         conn1.createStatement().execute(ddl);
         ddl = "ALTER TABLE " + dataTableFullName + " ADD STRING VARCHAR, 
STRING_DATA_TYPES VARCHAR";
@@ -980,7 +1042,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         String ddl = "CREATE TABLE " + dataTableFullName + " (\n"
                 +"ID VARCHAR(15) PRIMARY KEY,\n"
-                +"COL1 BIGINT)";
+                +"COL1 BIGINT) " + tableDDLOptions;
         Connection conn1 = DriverManager.getConnection(getUrl(), props);
         conn1.createStatement().execute(ddl);
         conn1.createStatement().execute("CREATE INDEX " + indexTableName + " 
ON " + dataTableFullName + "(COL1)");
@@ -1037,7 +1099,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                 + "COL1 BIGINT,"
                 + "COL2 BIGINT,"
                 + "COL3 BIGINT,"
-                + "COL4 BIGINT)";
+                + "COL4 BIGINT) " + tableDDLOptions;
         Connection conn1 = DriverManager.getConnection(getUrl(), props);
         conn1.createStatement().execute(ddl);
         conn1.createStatement().execute("CREATE INDEX " + indexTableName + " 
ON " + dataTableFullName + "(COL1) INCLUDE (COL2,COL3,COL4)");
@@ -1093,7 +1155,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                 +"CREATED_DATE DATE,\n"
                 +"CREATION_TIME BIGINT,\n"
                 +"LAST_USED DATE,\n"
-                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) SALT_BUCKETS = 8";
+                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) " + 
generateDDLOptions("SALT_BUCKETS = 8");
         Connection conn1 = DriverManager.getConnection(getUrl(), props);
         conn1.createStatement().execute(ddl);
         ddl = "ALTER TABLE " + dataTableFullName + " SET REPLICATION_SCOPE=1";
@@ -1116,7 +1178,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                 +"CREATED_DATE DATE,\n"
                 +"CREATION_TIME BIGINT,\n"
                 +"LAST_USED DATE,\n"
-                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) SALT_BUCKETS = 8";
+                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) " + 
generateDDLOptions("SALT_BUCKETS = 8");
         Connection conn1 = DriverManager.getConnection(getUrl(), props);
         conn1.createStatement().execute(ddl);
         ddl = "ALTER TABLE " + dataTableFullName + " SET 
COMPACTION_ENABLED=FALSE";
@@ -1138,7 +1200,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                 +"CREATED_DATE DATE,\n"
                 +"CREATION_TIME BIGINT,\n"
                 +"LAST_USED DATE,\n"
-                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) SALT_BUCKETS = 8";
+                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) " + 
generateDDLOptions("SALT_BUCKETS = 8");
         Connection conn1 = DriverManager.getConnection(getUrl(), props);
         conn1.createStatement().execute(ddl);
         ddl = "ALTER TABLE " + dataTableFullName + " SET COMPACTION_ENABLED = 
FALSE, REPLICATION_SCOPE = 1";
@@ -1162,7 +1224,8 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                 +"CREATED_DATE DATE,\n"
                 +"CF1.CREATION_TIME BIGINT,\n"
                 +"CF2.LAST_USED DATE,\n"
-                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) IMMUTABLE_ROWS=true";
+                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) " + 
generateDDLOptions("IMMUTABLE_ROWS=true"
+                + (!columnEncoded ? ",IMMUTABLE_STORAGE_SCHEME=" + 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : ""));
         Connection conn = DriverManager.getConnection(getUrl(), props);
         conn.createStatement().execute(ddl);
         assertImmutableRows(conn, dataTableFullName, true);
@@ -1204,7 +1267,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                 +"CREATED_DATE DATE,\n"
                 +"CREATION_TIME BIGINT,\n"
                 +"LAST_USED DATE,\n"
-                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) SALT_BUCKETS = 8";
+                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) " + 
generateDDLOptions("SALT_BUCKETS = 8");
         Connection conn1 = DriverManager.getConnection(getUrl(), props);
         conn1.createStatement().execute(ddl);
         ddl = "ALTER TABLE " + dataTableFullName + " SET CF.COMPACTION_ENABLED 
= FALSE";
@@ -1225,7 +1288,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                 +"CREATED_DATE DATE,\n"
                 +"CREATION_TIME BIGINT,\n"
                 +"LAST_USED DATE,\n"
-                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) SALT_BUCKETS = 8";
+                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) " + 
generateDDLOptions("SALT_BUCKETS = 8");
         Connection conn1 = DriverManager.getConnection(getUrl(), props);
         conn1.createStatement().execute(ddl);
         ddl = "ALTER TABLE " + dataTableFullName + " SET CF.DISABLE_WAL = 
TRUE";
@@ -1246,7 +1309,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                 +"CREATED_DATE DATE,\n"
                 +"CREATION_TIME BIGINT,\n"
                 +"CF.LAST_USED DATE,\n"
-                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) SALT_BUCKETS = 8";
+                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) " + 
generateDDLOptions("SALT_BUCKETS = 8");
         Connection conn1 = DriverManager.getConnection(getUrl(), props);
         conn1.createStatement().execute(ddl);
         ddl = "ALTER TABLE " + dataTableFullName + " SET CF.TTL = 86400";
@@ -1267,7 +1330,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                 +"CREATED_DATE DATE,\n"
                 +"CREATION_TIME BIGINT,\n"
                 +"LAST_USED DATE,\n"
-                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) SALT_BUCKETS = 8";
+                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) " + 
generateDDLOptions("SALT_BUCKETS = 8");
         Connection conn1 = DriverManager.getConnection(getUrl(), props);
         conn1.createStatement().execute(ddl);
         ddl = "ALTER TABLE " + dataTableFullName + " SET CF.REPLICATION_SCOPE 
= 1";
@@ -1288,7 +1351,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                 +"CREATED_DATE DATE,\n"
                 +"CREATION_TIME BIGINT,\n"
                 +"LAST_USED DATE,\n"
-                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) SALT_BUCKETS = 8";
+                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) " + 
generateDDLOptions(" SALT_BUCKETS = 8");
         Connection conn1 = DriverManager.getConnection(getUrl(), props);
         conn1.createStatement().execute(ddl);
         ddl = "ALTER TABLE " + dataTableFullName + " SET DEFAULT_COLUMN_FAMILY 
= 'A'";
@@ -1309,19 +1372,19 @@ public class AlterTableIT extends 
ParallelStatsDisabledIT {
                 +"CREATED_DATE DATE,\n"
                 +"CREATION_TIME BIGINT,\n"
                 +"LAST_USED DATE,\n"
-                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) SALT_BUCKETS = 8";
+                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) " + 
generateDDLOptions("SALT_BUCKETS = 8");
         Connection conn1 = DriverManager.getConnection(getUrl(), props);
         conn1.createStatement().execute(ddl);
-        ddl = "CREATE VIEW v AS SELECT * FROM " + dataTableFullName + " WHERE 
CREATION_TIME = 1";
+        ddl = "CREATE VIEW " + viewName + "  AS SELECT * FROM " + 
dataTableFullName + " WHERE CREATION_TIME = 1";
         conn1.createStatement().execute(ddl);
-        ddl = "ALTER VIEW v SET REPLICATION_SCOPE = 1";
+        ddl = "ALTER VIEW " + viewName + " SET REPLICATION_SCOPE = 1";
         try {
             conn1.createStatement().execute(ddl);
             fail();
         } catch (SQLException e) {
             assertEquals(SQLExceptionCode.VIEW_WITH_PROPERTIES.getErrorCode(), 
e.getErrorCode());
         }
-        ddl = "ALTER VIEW v SET COMPACTION_ENABLED = FALSE";
+        ddl = "ALTER VIEW " + viewName + " SET COMPACTION_ENABLED = FALSE";
         try {
             conn1.createStatement().execute(ddl);
             fail();
@@ -1339,7 +1402,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                 +"CREATED_DATE DATE,\n"
                 +"CREATION_TIME BIGINT,\n"
                 +"LAST_USED DATE,\n"
-                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) SALT_BUCKETS = 8";
+                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) " + 
generateDDLOptions("SALT_BUCKETS = 8");
         Connection conn1 = DriverManager.getConnection(getUrl(), props);
         conn1.createStatement().execute(ddl);
         String viewFullName = SchemaUtil.getTableName(schemaName, 
generateUniqueName());
@@ -1374,7 +1437,8 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                 +"CREATED_DATE DATE,\n"
                 +"CREATION_TIME BIGINT,\n"
                 +"CF.LAST_USED DATE,\n"
-                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) IMMUTABLE_ROWS=true, 
DEFAULT_COLUMN_FAMILY = 'XYZ'";
+                +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) " + 
generateDDLOptions("IMMUTABLE_ROWS=true, DEFAULT_COLUMN_FAMILY = 'XYZ'"
+                + (!columnEncoded ? ",IMMUTABLE_STORAGE_SCHEME=" + 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : ""));
         Connection conn = DriverManager.getConnection(getUrl(), props);
         conn.createStatement().execute(ddl);
         assertImmutableRows(conn, dataTableFullName, true);
@@ -1408,7 +1472,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
         String ddl = "CREATE TABLE " + dataTableFullName
                 +
                 "  (a_string varchar not null, col1 integer, CF.col2 integer" +
-                "  CONSTRAINT pk PRIMARY KEY (a_string))\n";
+                "  CONSTRAINT pk PRIMARY KEY (a_string)) " + tableDDLOptions;
         try {
             conn.createStatement().execute(ddl);
             conn.createStatement().execute(
@@ -1434,7 +1498,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
         String ddl = "CREATE TABLE " + dataTableFullName + " "
                 +
                 "  (a_string varchar not null, col1 integer, CF1.col2 integer" 
+
-                "  CONSTRAINT pk PRIMARY KEY (a_string))\n";
+                "  CONSTRAINT pk PRIMARY KEY (a_string)) " + tableDDLOptions;
         try {
             conn.createStatement().execute(ddl);
             conn.createStatement()
@@ -1468,7 +1532,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
         String ddl = "CREATE TABLE " + dataTableFullName + " "
                 +
                 "  (a_string varchar not null, col1 integer, CF1.col2 integer" 
+
-                "  CONSTRAINT pk PRIMARY KEY (a_string)) DEFAULT_COLUMN_FAMILY 
= 'XYZ'\n";
+                "  CONSTRAINT pk PRIMARY KEY (a_string)) " + 
generateDDLOptions("DEFAULT_COLUMN_FAMILY = 'XYZ'");
         try {
             conn.createStatement().execute(ddl);
             conn.createStatement()
@@ -1502,7 +1566,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
         String ddl = "CREATE TABLE " + dataTableFullName + " "
                 +
                        "  (a_string varchar not null, col1 integer, CF1.col2 
integer" +
-                       "  CONSTRAINT pk PRIMARY KEY (a_string)) 
DEFAULT_COLUMN_FAMILY = 'XYZ'\n";
+                       "  CONSTRAINT pk PRIMARY KEY (a_string)) "+ 
generateDDLOptions("DEFAULT_COLUMN_FAMILY = 'XYZ'");
        try {
                conn.createStatement().execute(ddl);
                try {
@@ -1525,7 +1589,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
         String ddl = "CREATE TABLE " + dataTableFullName
                 +
                 "  (a_string varchar not null, col1 integer, CF1.col2 integer, 
CF2.col3 integer" +
-                "  CONSTRAINT pk PRIMARY KEY (a_string)) DEFAULT_COLUMN_FAMILY 
= 'XYZ'\n";
+                "  CONSTRAINT pk PRIMARY KEY (a_string)) " + 
generateDDLOptions("DEFAULT_COLUMN_FAMILY = 'XYZ' ");
         try {
             conn.createStatement().execute(ddl);
             conn.createStatement()
@@ -1562,7 +1626,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
         String ddl = "CREATE TABLE " + dataTableFullName
                 +
                        "  (a_string varchar not null, col1 integer, CF1.col2 
integer" +
-                       "  CONSTRAINT pk PRIMARY KEY (a_string)) 
DEFAULT_COLUMN_FAMILY = 'XYZ'\n";
+                       "  CONSTRAINT pk PRIMARY KEY (a_string)) " + 
generateDDLOptions("DEFAULT_COLUMN_FAMILY = 'XYZ'");
        try {
             conn.createStatement().execute(ddl);
             conn.createStatement().execute(
@@ -1592,7 +1656,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
        conn.setAutoCommit(false);
         String ddl = "CREATE TABLE " + dataTableFullName +
                        "  (a_string varchar not null, col1 integer" +
-                       "  CONSTRAINT pk PRIMARY KEY (a_string))\n";
+                       "  CONSTRAINT pk PRIMARY KEY (a_string)) " + 
tableDDLOptions;
        try {
                conn.createStatement().execute(ddl);
             conn.createStatement().execute("ALTER TABLE " + dataTableFullName 
+ " ADD col2 integer IN_MEMORY=true");
@@ -1620,7 +1684,8 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                             "CREATE TABLE "
                                     + dataTableFullName
                                                + "  (a_string varchar not 
null, col1 integer, cf1.col2 integer, col3 integer , cf2.col4 integer "
-                                               + "  CONSTRAINT pk PRIMARY KEY 
(a_string)) immutable_rows=true , SALT_BUCKETS=3 ");
+                                               + "  CONSTRAINT pk PRIMARY KEY 
(a_string)) " + generateDDLOptions("immutable_rows=true , SALT_BUCKETS=3 "
+                                               + (!columnEncoded ? 
",IMMUTABLE_STORAGE_SCHEME=" + 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : ""))); 
 
             String ddl = "Alter table " + dataTableFullName + " add cf3.col5 
integer, cf4.col6 integer in_memory=true";
                conn.createStatement().execute(ddl);
@@ -1658,7 +1723,8 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                             "CREATE TABLE "
                                     + dataTableFullName
                                                + "  (a_string varchar not 
null, col1 integer, cf1.col2 integer, col3 integer , cf2.col4 integer "
-                                               + "  CONSTRAINT pk PRIMARY KEY 
(a_string)) immutable_rows=true , SALT_BUCKETS=3 ");
+                                               + "  CONSTRAINT pk PRIMARY KEY 
(a_string)) " + generateDDLOptions("immutable_rows=true , SALT_BUCKETS=3 "
+                                               + (!columnEncoded ? 
",IMMUTABLE_STORAGE_SCHEME=" + 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : "")));    
 
             String ddl = "Alter table " + dataTableFullName + " add cf1.col5 
integer in_memory=true";
                conn.createStatement().execute(ddl);
@@ -1688,7 +1754,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
 
        try {
             String ddl = "CREATE TABLE " + dataTableFullName
-                    + " (pk char(2) not null primary key, col1 integer, b.col1 
integer) SPLIT ON ('EA','EZ')";
+                    + " (pk char(2) not null primary key, col1 integer, b.col1 
integer) " + tableDDLOptions + " SPLIT ON ('EA','EZ') ";
                conn.createStatement().execute(ddl);
             ddl = "ALTER TABLE " + dataTableFullName + " add b.col2 varchar 
ttl=30";
                conn.createStatement().execute(ddl);
@@ -1708,7 +1774,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
 
        try {
             String ddl = "CREATE TABLE " + dataTableFullName
-                    + " (pk char(2) not null primary key) TTL=100 SPLIT ON 
('EA','EZ')";
+                    + " (pk char(2) not null primary key) " + 
generateDDLOptions("TTL=100") + " SPLIT ON ('EA','EZ')";
                conn.createStatement().execute(ddl);
             ddl = "ALTER TABLE " + dataTableFullName + " add col1 varchar 
ttl=30";
                conn.createStatement().execute(ddl);
@@ -1738,7 +1804,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                            + " col1 integer NOT NULL,"
                            + " col2 bigint NOT NULL,"
                            + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)"
-                           + " ) TTL=86400, SALT_BUCKETS = 4, 
DEFAULT_COLUMN_FAMILY='XYZ'";
+                           + " ) " + generateDDLOptions("TTL=86400, 
SALT_BUCKETS = 4, DEFAULT_COLUMN_FAMILY='XYZ'");
             conn.createStatement().execute(ddl);
             try (HBaseAdmin admin = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
                 HTableDescriptor tableDesc = 
admin.getTableDescriptor(Bytes.toBytes(dataTableFullName));
@@ -1773,7 +1839,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                            + " col1 integer NOT NULL,"
                            + " col2 bigint NOT NULL,"
                            + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)"
-                           + " ) TTL=86400, SALT_BUCKETS = 4, 
DEFAULT_COLUMN_FAMILY='XYZ'";
+                           + " ) " + generateDDLOptions("TTL=86400, 
SALT_BUCKETS = 4, DEFAULT_COLUMN_FAMILY='XYZ'");
             conn.createStatement().execute(ddl);
             ddl = "ALTER TABLE " + dataTableFullName + " SET IN_MEMORY=true";
                conn.createStatement().execute(ddl);
@@ -1801,7 +1867,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                            + " col1 integer NOT NULL,"
                            + " col2 bigint NOT NULL,"
                            + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)"
-                           + " ) TTL=86400, SALT_BUCKETS = 4";
+                           + " ) " + generateDDLOptions("TTL=86400, 
SALT_BUCKETS = 4");
             conn.createStatement().execute(ddl);
             ddl = "ALTER TABLE " + dataTableFullName + " SET IN_MEMORY=true";
                conn.createStatement().execute(ddl);
@@ -1829,7 +1895,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                            + " col1 integer NOT NULL,"
                            + " col2 bigint NOT NULL,"
                            + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)"
-                           + " ) TTL=86400, SALT_BUCKETS = 4, 
DEFAULT_COLUMN_FAMILY='XYZ'";
+                           + " ) " + generateDDLOptions("TTL=86400, 
SALT_BUCKETS = 4, DEFAULT_COLUMN_FAMILY='XYZ'");
             conn.createStatement().execute(ddl);
             ddl = "ALTER TABLE " + dataTableFullName + " ADD COL3 INTEGER 
IN_MEMORY=true";
                conn.createStatement().execute(ddl);
@@ -1857,7 +1923,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                            + " col1 integer NOT NULL,"
                            + " col2 bigint NOT NULL,"
                            + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)"
-                           + " ) TTL=86400, SALT_BUCKETS = 4, 
DEFAULT_COLUMN_FAMILY='XYZ'";
+                           + " ) " + generateDDLOptions("TTL=86400, 
SALT_BUCKETS = 4, DEFAULT_COLUMN_FAMILY='XYZ'");
             conn.createStatement().execute(ddl);
             ddl = "ALTER TABLE " + dataTableFullName + " ADD NEWCF.COL3 
INTEGER IN_MEMORY=true";
                conn.createStatement().execute(ddl);
@@ -1887,7 +1953,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                            + " col1 integer NOT NULL,"
                            + " col2 bigint NOT NULL,"
                            + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)"
-                           + " ) TTL=86400, SALT_BUCKETS = 4, 
DEFAULT_COLUMN_FAMILY='XYZ'";
+                           + " ) " + generateDDLOptions("TTL=86400, 
SALT_BUCKETS = 4, DEFAULT_COLUMN_FAMILY='XYZ'");
             conn.createStatement().execute(ddl);
             ddl = "ALTER TABLE " + dataTableFullName + " ADD NEWCF.COL3 
INTEGER IN_MEMORY=true";
                conn.createStatement().execute(ddl);
@@ -1951,7 +2017,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                                + " col1 integer NOT NULL,"
                                + " col2 bigint NOT NULL,"
                                + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, 
col2)"
-                               + " )";
+                               + " ) " +  tableDDLOptions;
                conn.createStatement().execute(ddl);
             ddl = "ALTER TABLE " + dataTableFullName + " ADD NEWCF.COL3 
INTEGER NEWCF.UNKNOWN_PROP='ABC'";
                try {
@@ -1977,7 +2043,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
         Connection conn = DriverManager.getConnection(getUrl(), props);
 
         Statement stmt = conn.createStatement();
-        stmt.execute("CREATE TABLE " + dataTableFullName + " (id SMALLINT 
PRIMARY KEY, name VARCHAR)");
+        stmt.execute("CREATE TABLE " + dataTableFullName + " (id SMALLINT 
PRIMARY KEY, name VARCHAR) "+tableDDLOptions);
 
         ResultSet rs = stmt.executeQuery("SELECT STORE_NULLS FROM 
\"SYSTEM\".\"CATALOG\" " +
  "WHERE table_name = '"
@@ -2009,7 +2075,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                     + " k2 integer NOT NULL,"
                     + " col1 bigint,"
                     + " CONSTRAINT NAME_PK PRIMARY KEY (k1, k2)"
-                    + " )";
+                    + " ) "+tableDDLOptions;
             conn.createStatement().execute(ddl);
 
             // set HTableProperty when adding a pk column should fail
@@ -2083,7 +2149,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                     + " col1 integer NOT NULL,"
                     + " col2 bigint NOT NULL,"
                     + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)"
-                    + " )";
+                    + " ) "+tableDDLOptions;
             conn.createStatement().execute(ddl);
             asssertIsWALDisabled(conn, dataTableFullName, false);
             
@@ -2120,12 +2186,12 @@ public class AlterTableIT extends 
ParallelStatsDisabledIT {
     @Test
     public void testDeclaringColumnAsRowTimestamp() throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().execute("CREATE TABLE " + dataTableFullName 
+ " (PK1 DATE NOT NULL, PK2 VARCHAR NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY 
KEY(PK1 ROW_TIMESTAMP, PK2)) ");
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName 
+ " (PK1 DATE NOT NULL, PK2 VARCHAR NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY 
KEY(PK1 ROW_TIMESTAMP, PK2)) " + tableDDLOptions);
             PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class); 
             PTable table = phxConn.getTable(new 
PTableKey(phxConn.getTenantId(), dataTableFullName));
             // Assert that the column shows up as row time stamp in the cache.
-            assertTrue(table.getColumn("PK1").isRowTimestamp());
-            assertFalse(table.getColumn("PK2").isRowTimestamp());
+            assertTrue(table.getColumnForColumnName("PK1").isRowTimestamp());
+            assertFalse(table.getColumnForColumnName("PK2").isRowTimestamp());
             assertIsRowTimestampSet(schemaName, dataTableName, "PK1");
             
             String dataTableName2 = BaseTest.generateUniqueName();
@@ -2133,18 +2199,17 @@ public class AlterTableIT extends 
ParallelStatsDisabledIT {
             conn.createStatement().execute("CREATE TABLE " + 
dataTableFullName2 + " (PK1 VARCHAR, PK2 DATE PRIMARY KEY ROW_TIMESTAMP, KV1 
VARCHAR, KV2 INTEGER)");
             table = phxConn.getTable(new PTableKey(phxConn.getTenantId(), 
dataTableFullName2));
             // Assert that the column shows up as row time stamp in the cache.
-            assertFalse(table.getColumn("PK1").isRowTimestamp());
-            assertTrue(table.getColumn("PK2").isRowTimestamp());
+            assertFalse(table.getColumnForColumnName("PK1").isRowTimestamp());
+            assertTrue(table.getColumnForColumnName("PK2").isRowTimestamp());
             assertIsRowTimestampSet(schemaName, dataTableName2, "PK2");
             
             // Create an index on a table has a row time stamp pk column. The 
column should show up as a row time stamp column for the index too. 
             conn.createStatement().execute("CREATE INDEX " + indexTableName + 
"  ON " + dataTableFullName2 + " (KV1) include (KV2)");
             PTable indexTable = phxConn.getTable(new 
PTableKey(phxConn.getTenantId(), indexTableFullName));
-            String indexColName = 
IndexUtil.getIndexColumnName(table.getColumn("PK2"));
+            String indexColName = 
IndexUtil.getIndexColumnName(table.getColumnForColumnName("PK2"));
             // Assert that the column shows up as row time stamp in the cache.
-            assertTrue(indexTable.getColumn(indexColName).isRowTimestamp());
+            
assertTrue(indexTable.getColumnForColumnName(indexColName).isRowTimestamp());
             assertIsRowTimestampSet(schemaName, indexTableName, indexColName);
-            
             String viewTableName2 = dataTableName2 + "_VIEW";
             String viewTableFullName2 = SchemaUtil.getTableName(schemaName, 
viewTableName2);
             // Creating a view with a row_timestamp column in its pk 
constraint is not allowed
@@ -2179,7 +2244,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
     @Test
     public void testAddingRowTimestampColumnNotAllowedViaAlterTable() throws 
Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().execute("CREATE TABLE " + dataTableFullName 
+ " (PK1 VARCHAR NOT NULL, PK2 VARCHAR NOT NULL, KV1 VARCHAR CONSTRAINT PK 
PRIMARY KEY(PK1, PK2)) ");
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName 
+ " (PK1 VARCHAR NOT NULL, PK2 VARCHAR NOT NULL, KV1 VARCHAR CONSTRAINT PK 
PRIMARY KEY(PK1, PK2)) " + tableDDLOptions);
             // adding a new pk column that is also row_timestamp is not allowed
             try {
                 conn.createStatement().execute("ALTER TABLE " + 
dataTableFullName + " ADD PK3 DATE PRIMARY KEY ROW_TIMESTAMP");
@@ -2197,7 +2262,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
                try (Connection conn = DriverManager.getConnection(getUrl(), 
props)) {
                        // creating a transactional table should fail if 
transactions are disabled
                        try {
-                               conn.createStatement().execute("CREATE TABLE " 
+ dataTableFullName + "(k INTEGER PRIMARY KEY, v VARCHAR) TRANSACTIONAL=true");
+                               conn.createStatement().execute("CREATE TABLE " 
+ dataTableFullName + "(k INTEGER PRIMARY KEY, v VARCHAR) " + 
generateDDLOptions("TRANSACTIONAL=true"));
                                fail();
                        } catch (SQLException e) {
                                
assertEquals(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_IF_TXNS_DISABLED.getErrorCode(),
 e.getErrorCode());
@@ -2219,7 +2284,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT 
{
             String tableName = generateUniqueName();
             conn.createStatement().execute(
                     "CREATE TABLE " + tableName
-                    + " (k1 VARCHAR NOT NULL, k2 VARCHAR, CONSTRAINT PK 
PRIMARY KEY(K1,K2)) ");
+                    + " (k1 VARCHAR NOT NULL, k2 VARCHAR, CONSTRAINT PK 
PRIMARY KEY(K1,K2)) " + tableDDLOptions);
             try (HBaseAdmin admin = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
                 admin.disableTable(Bytes.toBytes(tableName));
             }
@@ -2235,5 +2300,299 @@ public class AlterTableIT extends 
ParallelStatsDisabledIT {
         }
     }
 
+       /**
+        * Creates a table declared with IMMUTABLE_ROWS = true and then a view on top of it.
+        * After each step it checks the encoded column qualifier counters, the qualifiers
+        * recorded for individual columns and the table sequence numbers.
+        */
+       @Test
+       public void testMetadataForImmutableTable() throws Exception {
+           final String schemaName = "XYZ";
+           final String baseTableName = generateUniqueName();
+           final String viewName = generateUniqueName();
+           final String fullTableName = schemaName + "." + baseTableName;
+           final String fullViewName = schemaName + "." + viewName;
+           try (Connection conn = DriverManager.getConnection(getUrl())) {
+               PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+
+               // The storage scheme is only spelled out when column encoding is off.
+               String storageSchemeOption = "";
+               if (!columnEncoded) {
+                   storageSchemeOption = ",IMMUTABLE_STORAGE_SCHEME="
+                           + PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
+               }
+               String createTableDDL = "CREATE TABLE IF NOT EXISTS " + fullTableName + " ("
+                       + " ID char(1) NOT NULL,"
+                       + " COL1 integer NOT NULL,"
+                       + " COL2 bigint NOT NULL,"
+                       + " KV1 VARCHAR"
+                       + " CONSTRAINT NAME_PK PRIMARY KEY (ID, COL1, COL2)"
+                       + " ) " + generateDDLOptions("IMMUTABLE_ROWS = true" + storageSchemeOption);
+               conn.createStatement().execute(createTableDDL);
+
+               PTable cachedBaseTable =
+                       phoenixConn.getTable(new PTableKey(phoenixConn.getTenantId(), fullTableName));
+               final long startingSeqNum = cachedBaseTable.getSequenceNumber();
+
+               // Client side cache: one qualifier (KV1) has been handed out so far.
+               Integer expectedNext = null;
+               if (columnEncoded) {
+                   expectedNext = ENCODED_CQ_COUNTER_INITIAL_VALUE + 1;
+               }
+               assertEquals(expectedNext,
+                       cachedBaseTable.getEncodedCQCounter().getNextQualifier(DEFAULT_COLUMN_FAMILY));
+
+               // Server side metadata for the base table.
+               assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName,
+                       ENCODED_CQ_COUNTER_INITIAL_VALUE + 1);
+               assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "KV1", schemaName, baseTableName,
+                       ENCODED_CQ_COUNTER_INITIAL_VALUE);
+               assertSequenceNumber(schemaName, baseTableName, startingSeqNum);
+
+               // Create a view that adds one column per column family, then re-read both tables.
+               String createViewDDL = "CREATE VIEW " + fullViewName
+                       + " ( VIEW_COL1 INTEGER, A.VIEW_COL2 VARCHAR ) AS SELECT * FROM " + fullTableName;
+               conn.createStatement().execute(createViewDDL);
+               cachedBaseTable =
+                       phoenixConn.getTable(new PTableKey(phoenixConn.getTenantId(), fullTableName));
+               PTable cachedView =
+                       phoenixConn.getTable(new PTableKey(phoenixConn.getTenantId(), fullViewName));
+
+               // Client side cache: the base table's counters move, the view's counter stays null.
+               Integer expectedDefaultCfNext = null;
+               Integer expectedCfANext = null;
+               if (columnEncoded) {
+                   expectedDefaultCfNext = ENCODED_CQ_COUNTER_INITIAL_VALUE + 2;
+                   expectedCfANext = ENCODED_CQ_COUNTER_INITIAL_VALUE + 1;
+               }
+               assertEquals(expectedDefaultCfNext,
+                       cachedBaseTable.getEncodedCQCounter().getNextQualifier(DEFAULT_COLUMN_FAMILY));
+               assertEquals(expectedCfANext,
+                       cachedBaseTable.getEncodedCQCounter().getNextQualifier("A"));
+               assertNull("A view should always have the null cq counter",
+                       cachedView.getEncodedCQCounter().getNextQualifier(DEFAULT_COLUMN_FAMILY));
+
+               // Server side metadata for the base table and the view.
+               assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName,
+                       ENCODED_CQ_COUNTER_INITIAL_VALUE + 2);
+               assertEncodedCQCounter("A", schemaName, baseTableName,
+                       ENCODED_CQ_COUNTER_INITIAL_VALUE + 1);
+               assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "VIEW_COL1", schemaName, viewName,
+                       ENCODED_CQ_COUNTER_INITIAL_VALUE + 1);
+               assertEncodedCQValue("A", "VIEW_COL2", schemaName, viewName,
+                       ENCODED_CQ_COUNTER_INITIAL_VALUE);
+               long expectedBaseSeqNum = columnEncoded ? startingSeqNum + 1 : startingSeqNum;
+               assertSequenceNumber(schemaName, baseTableName, expectedBaseSeqNum);
+               assertSequenceNumber(schemaName, viewName, PTable.INITIAL_SEQ_NUM);
+           }
+       }
+       
+       /**
+        * Creates a table using the fixture's tableDDLOptions and then a view on top of it.
+        * After each step it checks the encoded column qualifier counter of the default column
+        * family, the qualifiers recorded for individual columns and the table sequence numbers.
+        */
+       @Test
+       public void testMetadataForMutableTable() throws Exception {
+           final String schemaName = "XYZ";
+           final String baseTableName = generateUniqueName();
+           final String viewName = generateUniqueName();
+           final String fullTableName = schemaName + "." + baseTableName;
+           final String fullViewName = schemaName + "." + viewName;
+           try (Connection conn = DriverManager.getConnection(getUrl())) {
+               PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+               String createTableDDL = "CREATE TABLE IF NOT EXISTS " + fullTableName + " ("
+                       + " ID char(1) NOT NULL,"
+                       + " COL1 integer NOT NULL,"
+                       + " COL2 bigint NOT NULL,"
+                       + " KV1 VARCHAR"
+                       + " CONSTRAINT NAME_PK PRIMARY KEY (ID, COL1, COL2)"
+                       + " ) " + tableDDLOptions;
+               conn.createStatement().execute(createTableDDL);
+
+               PTable cachedBaseTable =
+                       phoenixConn.getTable(new PTableKey(phoenixConn.getTenantId(), fullTableName));
+               final long startingSeqNum = cachedBaseTable.getSequenceNumber();
+
+               // Client side cache: one qualifier (KV1) has been handed out so far.
+               Integer expectedNext = null;
+               if (columnEncoded) {
+                   expectedNext = ENCODED_CQ_COUNTER_INITIAL_VALUE + 1;
+               }
+               assertEquals(expectedNext,
+                       cachedBaseTable.getEncodedCQCounter().getNextQualifier(DEFAULT_COLUMN_FAMILY));
+
+               // Server side metadata for the base table.
+               assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName,
+                       ENCODED_CQ_COUNTER_INITIAL_VALUE + 1);
+               assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "KV1", schemaName, baseTableName,
+                       ENCODED_CQ_COUNTER_INITIAL_VALUE);
+               assertSequenceNumber(schemaName, baseTableName, startingSeqNum);
+
+               // Create a view that adds two columns, then re-read both tables from the cache.
+               String createViewDDL = "CREATE VIEW " + fullViewName
+                       + " ( VIEW_COL1 INTEGER, A.VIEW_COL2 VARCHAR ) AS SELECT * FROM " + fullTableName;
+               conn.createStatement().execute(createViewDDL);
+               cachedBaseTable =
+                       phoenixConn.getTable(new PTableKey(phoenixConn.getTenantId(), fullTableName));
+               PTable cachedView =
+                       phoenixConn.getTable(new PTableKey(phoenixConn.getTenantId(), fullViewName));
+
+               // Client side cache: the base table's counter moves, the view's counter stays null.
+               Integer expectedNextAfterView = null;
+               if (columnEncoded) {
+                   expectedNextAfterView = ENCODED_CQ_COUNTER_INITIAL_VALUE + 3;
+               }
+               assertEquals(expectedNextAfterView,
+                       cachedBaseTable.getEncodedCQCounter().getNextQualifier(DEFAULT_COLUMN_FAMILY));
+               assertNull("A view should always have the null cq counter",
+                       cachedView.getEncodedCQCounter().getNextQualifier(DEFAULT_COLUMN_FAMILY));
+
+               // Server side metadata for the base table and the view.
+               assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName,
+                       ENCODED_CQ_COUNTER_INITIAL_VALUE + 3);
+               assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "VIEW_COL1", schemaName, viewName,
+                       ENCODED_CQ_COUNTER_INITIAL_VALUE + 1);
+               assertEncodedCQValue("A", "VIEW_COL2", schemaName, viewName,
+                       ENCODED_CQ_COUNTER_INITIAL_VALUE + 2);
+               long expectedBaseSeqNum = columnEncoded ? startingSeqNum + 1 : startingSeqNum;
+               assertSequenceNumber(schemaName, baseTableName, expectedBaseSeqNum);
+               assertSequenceNumber(schemaName, viewName, PTable.INITIAL_SEQ_NUM);
+           }
+       }
+       
+    /**
+     * Walks a base table and a view through a series of column additions on a connection
+     * with namespace mapping enabled. After each DDL statement it checks the encoded column
+     * qualifier counter of the default column family, the qualifiers recorded for the new
+     * columns and the sequence numbers of the base table and the view.
+     */
+    @Test
+    public void testAddingColumnsToTablesAndViews() throws Exception {
+        String schemaName = generateUniqueName();
+        String baseTableName = generateUniqueName();
+        String viewName = generateUniqueName();
+        String fullTableName = schemaName + "." + baseTableName;
+        String fullViewName = schemaName + "." + viewName;
+        Properties props = new Properties();
+        props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(true));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE SCHEMA " + schemaName);
+            PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + " ("
+                    + " ID char(1) NOT NULL,"
+                    + " COL1 integer NOT NULL,"
+                    + " COL2 bigint NOT NULL,"
+                    + " CONSTRAINT NAME_PK PRIMARY KEY (ID, COL1, COL2)"
+                    + " ) " + tableDDLOptions);
+            PTable baseTable = phxConn.getTable(new PTableKey(phxConn.getTenantId(), fullTableName));
+            long initBaseTableSeqNumber = baseTable.getSequenceNumber();
+
+            // Add columns to the base table and see if the client and server metadata is updated correctly
+            String alterDDL = "ALTER TABLE " + fullTableName
+                    + " ADD COL3 VARCHAR PRIMARY KEY, COL4 INTEGER, COL5 VARCHAR, B.COL6 DECIMAL (10, 2)";
+            conn.createStatement().execute(alterDDL);
+
+            // assert that the client side cache is updated.
+            baseTable = phxConn.getTable(new PTableKey(phxConn.getTenantId(), fullTableName));
+            EncodedCQCounter encodedCqCounter = baseTable.getEncodedCQCounter();
+            assertEquals(columnEncoded ? (Integer) (ENCODED_CQ_COUNTER_INITIAL_VALUE + 3) : null,
+                    encodedCqCounter.getNextQualifier(DEFAULT_COLUMN_FAMILY));
+
+            // assert that the server side metadata is updated correctly.
+            assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName,
+                    ENCODED_CQ_COUNTER_INITIAL_VALUE + 3);
+
+            // assert that the server side metadata for columns is updated correctly.
+            assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "COL4", schemaName, baseTableName,
+                    ENCODED_CQ_COUNTER_INITIAL_VALUE);
+            assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "COL5", schemaName, baseTableName,
+                    ENCODED_CQ_COUNTER_INITIAL_VALUE + 1);
+            assertEncodedCQValue("B", "COL6", schemaName, baseTableName,
+                    ENCODED_CQ_COUNTER_INITIAL_VALUE + 2);
+            long baseTableSeqNumBeforeAddingChildCols = initBaseTableSeqNumber + 1;
+            assertSequenceNumber(schemaName, baseTableName, baseTableSeqNumBeforeAddingChildCols);
+
+            // Create a view
+            String viewDDL = "CREATE VIEW " + fullViewName
+                    + " ( VIEW_COL1 INTEGER, A.VIEW_COL2 VARCHAR ) AS SELECT * FROM " + fullTableName;
+            conn.createStatement().execute(viewDDL);
+
+            // assert that the server side metadata is updated correctly.
+            assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName,
+                    ENCODED_CQ_COUNTER_INITIAL_VALUE + 5);
+
+            // assert that the server side metadata for columns is updated correctly.
+            assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "VIEW_COL1", schemaName, viewName,
+                    ENCODED_CQ_COUNTER_INITIAL_VALUE + 3);
+            assertEncodedCQValue("A", "VIEW_COL2", schemaName, viewName,
+                    ENCODED_CQ_COUNTER_INITIAL_VALUE + 4);
+            // for encoded columns creating a view that adds its own columns should increment the
+            // base table's sequence number too.
+            assertSequenceNumber(schemaName, baseTableName,
+                    columnEncoded ? initBaseTableSeqNumber + 2 : baseTableSeqNumBeforeAddingChildCols);
+
+            // Add columns to the view
+            viewDDL = "ALTER VIEW " + fullViewName
+                    + " ADD VIEW_COL3 DECIMAL(10, 2), A.VIEW_COL4 VARCHAR, B.VIEW_COL5 INTEGER";
+            conn.createStatement().execute(viewDDL);
+
+            // assert that the client cache for the base table is updated
+            baseTable = phxConn.getTable(new PTableKey(phxConn.getTenantId(), fullTableName));
+            encodedCqCounter = baseTable.getEncodedCQCounter();
+            assertEquals(columnEncoded ? (Integer) (ENCODED_CQ_COUNTER_INITIAL_VALUE + 8) : null,
+                    encodedCqCounter.getNextQualifier(DEFAULT_COLUMN_FAMILY));
+
+            // assert client cache for view
+            PTable view = phxConn.getTable(new PTableKey(phxConn.getTenantId(), fullViewName));
+            assertNull("A view should always have the column qualifier counter as null",
+                    view.getEncodedCQCounter().getNextQualifier(DEFAULT_COLUMN_FAMILY));
+
+            // assert that the server side metadata for the base table and the view is also updated correctly.
+            assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName,
+                    ENCODED_CQ_COUNTER_INITIAL_VALUE + 8);
+            assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "VIEW_COL1", schemaName, viewName,
+                    ENCODED_CQ_COUNTER_INITIAL_VALUE + 3);
+            assertEncodedCQValue("A", "VIEW_COL2", schemaName, viewName,
+                    ENCODED_CQ_COUNTER_INITIAL_VALUE + 4);
+            assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "VIEW_COL3", schemaName, viewName,
+                    ENCODED_CQ_COUNTER_INITIAL_VALUE + 5);
+            assertEncodedCQValue("A", "VIEW_COL4", schemaName, viewName,
+                    ENCODED_CQ_COUNTER_INITIAL_VALUE + 6);
+            assertEncodedCQValue("B", "VIEW_COL5", schemaName, viewName,
+                    ENCODED_CQ_COUNTER_INITIAL_VALUE + 7);
+            // adding a column to the view should increment the base table's sequence number too
+            // since we update the cq counters for column families.
+            assertSequenceNumber(schemaName, baseTableName,
+                    columnEncoded ? initBaseTableSeqNumber + 3 : baseTableSeqNumBeforeAddingChildCols);
+            assertSequenceNumber(schemaName, viewName, PTable.INITIAL_SEQ_NUM + 1);
+
+            // Add columns to the base table which don't already exist in the view.
+            alterDDL = "ALTER TABLE " + fullTableName + " ADD COL10 VARCHAR, A.COL11 INTEGER";
+            conn.createStatement().execute(alterDDL);
+            baseTable = phxConn.getTable(new PTableKey(phxConn.getTenantId(), fullTableName));
+
+            // assert that the client cache for the base table is updated
+            encodedCqCounter = baseTable.getEncodedCQCounter();
+            assertEquals(columnEncoded ? (Integer) (ENCODED_CQ_COUNTER_INITIAL_VALUE + 10) : null,
+                    encodedCqCounter.getNextQualifier(DEFAULT_COLUMN_FAMILY));
+
+            // assert client cache for view
+            view = phxConn.getTable(new PTableKey(phxConn.getTenantId(), fullViewName));
+            assertNull("A view should always have the column qualifier counter as null",
+                    view.getEncodedCQCounter().getNextQualifier(DEFAULT_COLUMN_FAMILY));
+
+            // assert that the server side metadata for the base table and the view is also updated correctly.
+            assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName,
+                    ENCODED_CQ_COUNTER_INITIAL_VALUE + 10);
+            assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "COL10", schemaName, viewName,
+                    ENCODED_CQ_COUNTER_INITIAL_VALUE + 8);
+            assertEncodedCQValue("A", "COL11", schemaName, viewName,
+                    ENCODED_CQ_COUNTER_INITIAL_VALUE + 9);
+            assertSequenceNumber(schemaName, baseTableName,
+                    columnEncoded ? initBaseTableSeqNumber + 4 : initBaseTableSeqNumber + 2);
+            assertSequenceNumber(schemaName, viewName, PTable.INITIAL_SEQ_NUM + 2);
+        }
+    }
+       
+    /**
+     * Asserts that SYSTEM.CATALOG holds exactly one row with a non-null column qualifier for
+     * the given schema, table, column family and column, and that the stored qualifier is the
+     * expected one.
+     *
+     * @param columnFamily  column family of the column being checked
+     * @param columnName    name of the column being checked
+     * @param schemaName    schema of the table or view owning the column
+     * @param tableName     name of the table or view owning the column
+     * @param expectedValue qualifier number expected when column encoding is on; it is compared
+     *                      after encoding with TWO_BYTE_QUALIFIERS. Ignored when encoding is
+     *                      off, in which case the qualifier must equal the column name bytes.
+     */
+    private void assertEncodedCQValue(String columnFamily, String columnName, String schemaName,
+            String tableName, int expectedValue) throws Exception {
+        String query = "SELECT " + COLUMN_QUALIFIER + " FROM SYSTEM.CATALOG WHERE " + TABLE_SCHEM
+                + " = ? AND " + TABLE_NAME + " = ? " + " AND " + COLUMN_FAMILY + " = ?" + " AND "
+                + COLUMN_NAME + " = ?" + " AND " + COLUMN_QUALIFIER + " IS NOT NULL";
+        // Statement and result set are closed explicitly rather than relying on Connection.close().
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                PreparedStatement stmt = conn.prepareStatement(query)) {
+            stmt.setString(1, schemaName);
+            stmt.setString(2, tableName);
+            stmt.setString(3, columnFamily);
+            stmt.setString(4, columnName);
+            try (ResultSet rs = stmt.executeQuery()) {
+                assertTrue(rs.next());
+                // Bytes.toBytes avoids the platform-default charset used by String.getBytes().
+                byte[] expectedQualifier = columnEncoded
+                        ? QualifierEncodingScheme.TWO_BYTE_QUALIFIERS.encode(expectedValue)
+                        : Bytes.toBytes(columnName);
+                assertTrue(Bytes.equals(expectedQualifier, rs.getBytes(1)));
+                assertFalse(rs.next());
+            }
+        }
+    }
+    
+    /**
+     * Checks the column qualifier counter recorded in SYSTEM.CATALOG for a column family of
+     * the given table. With column encoding on, exactly one non-null counter row must exist
+     * and its value must equal {@code expectedValue}; with encoding off, no such row may exist.
+     *
+     * @param columnFamily  column family whose counter is checked
+     * @param schemaName    schema of the table
+     * @param tableName     name of the table
+     * @param expectedValue counter value expected when column encoding is on
+     */
+    private void assertEncodedCQCounter(String columnFamily, String schemaName, String tableName,
+            int expectedValue) throws Exception {
+        String query = "SELECT " + COLUMN_QUALIFIER_COUNTER + " FROM SYSTEM.CATALOG WHERE "
+                + TABLE_SCHEM + " = ? AND " + TABLE_NAME + " = ? " + " AND " + COLUMN_FAMILY
+                + " = ? AND " + COLUMN_QUALIFIER_COUNTER + " IS NOT NULL";
+        // Statement and result set are closed explicitly rather than relying on Connection.close().
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                PreparedStatement stmt = conn.prepareStatement(query)) {
+            stmt.setString(1, schemaName);
+            stmt.setString(2, tableName);
+            stmt.setString(3, columnFamily);
+            try (ResultSet rs = stmt.executeQuery()) {
+                if (columnEncoded) {
+                    assertTrue(rs.next());
+                    assertEquals(expectedValue, rs.getInt(1));
+                    assertFalse(rs.next());
+                } else {
+                    assertFalse(rs.next());
+                }
+            }
+        }
+    }
+    
+    /**
+     * Asserts that SYSTEM.CATALOG holds exactly one header row (column name and column family
+     * both null) with a non-null sequence number for the given table, and that the stored
+     * sequence number equals {@code expectedSequenceNumber}.
+     *
+     * @param schemaName             schema of the table or view
+     * @param tableName              name of the table or view
+     * @param expectedSequenceNumber sequence number the row is expected to carry
+     */
+    private void assertSequenceNumber(String schemaName, String tableName,
+            long expectedSequenceNumber) throws Exception {
+        String query = "SELECT " + TABLE_SEQ_NUM + " FROM SYSTEM.CATALOG WHERE " + TABLE_SCHEM
+                + " = ? AND " + TABLE_NAME + " = ? AND " + TABLE_SEQ_NUM + " IS NOT NULL AND "
+                + COLUMN_NAME + " IS NULL AND " + COLUMN_FAMILY + " IS NULL ";
+        // Statement and result set are closed explicitly rather than relying on Connection.close().
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                PreparedStatement stmt = conn.prepareStatement(query)) {
+            stmt.setString(1, schemaName);
+            stmt.setString(2, tableName);
+            try (ResultSet rs = stmt.executeQuery()) {
+                assertTrue(rs.next());
+                // The expectation is a long, so read the column as a long instead of narrowing to int.
+                assertEquals(expectedSequenceNumber, rs.getLong(1));
+                assertFalse(rs.next());
+            }
+        }
+    }
+    
+    /**
+     * Creates a table with the fixture's tableDDLOptions, asserts via assertImmutableRows that
+     * it is not immutable, then sets IMMUTABLE_ROWS = true through ALTER TABLE and asserts that
+     * the table is now reported as immutable.
+     */
+    @Test
+    public void testAlterImmutableRowsPropertyForOneCellPerKeyValueColumnStorageScheme()
+            throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String ddl = "CREATE TABLE " + dataTableFullName + " (\n"
+                + "ID VARCHAR(15) NOT NULL,\n"
+                + "CREATED_DATE DATE,\n"
+                + "CREATION_TIME BIGINT,\n"
+                + "CONSTRAINT PK PRIMARY KEY (ID)) " + tableDDLOptions;
+        // try-with-resources so the connection is released even when an assertion fails.
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute(ddl);
+            assertImmutableRows(conn, dataTableFullName, false);
+            ddl = "ALTER TABLE " + dataTableFullName + " SET IMMUTABLE_ROWS = true";
+            conn.createStatement().execute(ddl);
+            assertImmutableRows(conn, dataTableFullName, true);
+        }
+    }
+    
+    /**
+     * Creates a table with IMMUTABLE_ROWS=true (and, when column encoding is off, an explicit
+     * ONE_CELL_PER_COLUMN storage scheme) and then tries to set IMMUTABLE_ROWS = false.
+     * With column encoding on, the ALTER must fail; any SQLException raised must carry the
+     * CANNOT_ALTER_IMMUTABLE_ROWS_PROPERTY error code. Afterwards the table is expected to be
+     * immutable exactly when column encoding is on.
+     */
+    @Test
+    public void testAlterImmutableRowsPropertyForOneCellPerColumnFamilyStorageScheme()
+            throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String ddl = "CREATE TABLE " + dataTableFullName + " (\n"
+                + "ID VARCHAR(15) NOT NULL,\n"
+                + "CREATED_DATE DATE,\n"
+                + "CREATION_TIME BIGINT,\n"
+                + "CONSTRAINT PK PRIMARY KEY (ID)) "
+                + generateDDLOptions("COLUMN_ENCODED_BYTES=4, IMMUTABLE_ROWS=true"
+                + (!columnEncoded ? ",IMMUTABLE_STORAGE_SCHEME="
+                        + PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : ""));
+        // try-with-resources so the connection is released even when an assertion fails.
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute(ddl);
+            assertImmutableRows(conn, dataTableFullName, true);
+            try {
+                ddl = "ALTER TABLE " + dataTableFullName + " SET IMMUTABLE_ROWS = false";
+                conn.createStatement().execute(ddl);
+                if (columnEncoded) {
+                    fail();
+                }
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.CANNOT_ALTER_IMMUTABLE_ROWS_PROPERTY.getErrorCode(),
+                        e.getErrorCode());
+            }
+            assertImmutableRows(conn, dataTableFullName, columnEncoded);
+        }
+    }
+    
 }
  

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
index 211145e..6b57148 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
@@ -54,22 +54,41 @@ import org.junit.runners.Parameterized.Parameters;
 public class AlterTableWithViewsIT extends ParallelStatsDisabledIT {
     
     private final boolean isMultiTenant;
+    private final boolean columnEncoded;
     
     private final String TENANT_SPECIFIC_URL1 = getUrl() + ';' + 
TENANT_ID_ATTRIB + "=tenant1";
     private final String TENANT_SPECIFIC_URL2 = getUrl() + ';' + 
TENANT_ID_ATTRIB + "=tenant2";
     
-    public AlterTableWithViewsIT(boolean isMultiTenant) {
+    public AlterTableWithViewsIT(boolean isMultiTenant, boolean columnEncoded) 
{
         this.isMultiTenant = isMultiTenant;
+        this.columnEncoded = columnEncoded;
     }
     
-    @Parameters(name="AlterTableWithViewsIT_multiTenant={0}") // name is used 
by failsafe as file name in reports
-    public static Collection<Boolean> data() {
-        return Arrays.asList(false, true);
+    @Parameters(name="AlterTableWithViewsIT_multiTenant={0}, 
columnEncoded={1}") // name is used by failsafe as file name in reports
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] { 
+                { false, false }, { false, true },
+                { true, false }, { true, true } });
     }
-       
+    
     private String generateDDL(String format) {
+        return generateDDL("", format);
+    }
+    
+    private String generateDDL(String options, String format) {
+        StringBuilder optionsBuilder = new StringBuilder(options);
+        if (!columnEncoded) {
+            if (optionsBuilder.length()!=0)
+                optionsBuilder.append(",");
+            optionsBuilder.append("COLUMN_ENCODED_BYTES=0");
+        }
+        if (isMultiTenant) {
+            if (optionsBuilder.length()!=0)
+                optionsBuilder.append(",");
+            optionsBuilder.append("MULTI_TENANT=true");
+        }
         return String.format(format, isMultiTenant ? "TENANT_ID VARCHAR NOT 
NULL, " : "",
-            isMultiTenant ? "TENANT_ID, " : "", isMultiTenant ? 
"MULTI_TENANT=true" : "");
+            isMultiTenant ? "TENANT_ID, " : "", optionsBuilder.toString());
     }
     
     @Test
@@ -92,7 +111,7 @@ public class AlterTableWithViewsIT extends 
ParallelStatsDisabledIT {
             
             // adding a new pk column and a new regular column
             conn.createStatement().execute("ALTER TABLE " + tableName + " ADD 
COL3 varchar(10) PRIMARY KEY, COL4 integer");
-            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 1, 
5, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", "COL1", "COL2", "COL3", 
"COL4");
+            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 
columnEncoded ? 2 : 1, 5, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", 
"COL1", "COL2", "COL3", "COL4");
             assertTableDefinition(conn, viewOfTable, PTableType.VIEW, 
tableName, 1, 7, 5, "ID", "COL1", "COL2", "COL3", "COL4", "VIEW_COL1", 
"VIEW_COL2");
         } 
     }
@@ -109,28 +128,27 @@ public class AlterTableWithViewsIT extends 
ParallelStatsDisabledIT {
                             + " COL1 integer NOT NULL,"
                             + " COL2 bigint NOT NULL,"
                             + " CONSTRAINT NAME_PK PRIMARY KEY (%s ID, COL1, 
COL2)"
-                            + " ) UPDATE_CACHE_FREQUENCY=15 "
-                            + (isMultiTenant ? ",%s" : "%s");
-            conn.createStatement().execute(generateDDL(ddlFormat));
+                            + " ) %s ";
+            
conn.createStatement().execute(generateDDL("UPDATE_CACHE_FREQUENCY=2", 
ddlFormat));
             viewConn.createStatement().execute("CREATE VIEW " + viewOfTable1 + 
" ( VIEW_COL1 DECIMAL(10,2), VIEW_COL2 VARCHAR ) AS SELECT * FROM " + 
tableName);
             viewConn.createStatement().execute("CREATE VIEW " + viewOfTable2 + 
" ( VIEW_COL1 DECIMAL(10,2), VIEW_COL2 VARCHAR ) AS SELECT * FROM " + 
tableName);
-            viewConn.createStatement().execute("ALTER VIEW " + viewOfTable2 + 
" SET UPDATE_CACHE_FREQUENCY = 5");
+            viewConn.createStatement().execute("ALTER VIEW " + viewOfTable2 + 
" SET UPDATE_CACHE_FREQUENCY = 1");
             
             PhoenixConnection phoenixConn = 
conn.unwrap(PhoenixConnection.class);
             PTable table = phoenixConn.getTable(new PTableKey(null, 
tableName));
             PName tenantId = isMultiTenant ? PNameFactory.newName("tenant1") : 
null;
             assertFalse(table.isImmutableRows());
-            assertEquals(15, table.getUpdateCacheFrequency());
+            assertEquals(2, table.getUpdateCacheFrequency());
             PTable viewTable1 = 
viewConn.unwrap(PhoenixConnection.class).getTable(new PTableKey(tenantId, 
viewOfTable1));
             assertFalse(viewTable1.isImmutableRows());
-            assertEquals(15, viewTable1.getUpdateCacheFrequency());
+            assertEquals(2, viewTable1.getUpdateCacheFrequency());
             // query the view to force the table cache to be updated
             viewConn.createStatement().execute("SELECT * FROM "+viewOfTable2);
             PTable viewTable2 = 
viewConn.unwrap(PhoenixConnection.class).getTable(new PTableKey(tenantId, 
viewOfTable2));
             assertFalse(viewTable2.isImmutableRows());
-            assertEquals(5, viewTable2.getUpdateCacheFrequency());
+            assertEquals(1, viewTable2.getUpdateCacheFrequency());
             
-            conn.createStatement().execute("ALTER TABLE " + tableName + " SET 
IMMUTABLE_ROWS=true, UPDATE_CACHE_FREQUENCY=10");
+            conn.createStatement().execute("ALTER TABLE " + tableName + " SET 
IMMUTABLE_ROWS=true, UPDATE_CACHE_FREQUENCY=3");
             // query the views to force the table cache to be updated
             viewConn.createStatement().execute("SELECT * FROM "+viewOfTable1);
             viewConn.createStatement().execute("SELECT * FROM "+viewOfTable2);
@@ -138,16 +156,16 @@ public class AlterTableWithViewsIT extends 
ParallelStatsDisabledIT {
             phoenixConn = conn.unwrap(PhoenixConnection.class);
             table = phoenixConn.getTable(new PTableKey(null, tableName));
             assertTrue(table.isImmutableRows());
-            assertEquals(10, table.getUpdateCacheFrequency());
+            assertEquals(3, table.getUpdateCacheFrequency());
             
             viewTable1 = viewConn.unwrap(PhoenixConnection.class).getTable(new 
PTableKey(tenantId, viewOfTable1));
             assertTrue(viewTable1.isImmutableRows());
-            assertEquals(10, viewTable1.getUpdateCacheFrequency());
+            assertEquals(3, viewTable1.getUpdateCacheFrequency());
             
             viewTable2 = viewConn.unwrap(PhoenixConnection.class).getTable(new 
PTableKey(tenantId, viewOfTable2));
             assertTrue(viewTable2.isImmutableRows());
             // update cache frequency is not propagated to the view since it 
was altered on the view
-            assertEquals(5, viewTable2.getUpdateCacheFrequency());
+            assertEquals(1, viewTable2.getUpdateCacheFrequency());
         } 
     }
     
@@ -174,7 +192,7 @@ public class AlterTableWithViewsIT extends 
ParallelStatsDisabledIT {
 
             // drop two columns from the base table
             conn.createStatement().execute("ALTER TABLE " + tableName + " DROP 
COLUMN COL3, COL5");
-            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 1, 
4,
+            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 
columnEncoded ? 2 : 1, 4,
                 QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", "COL1", 
"COL2", "COL4");
             assertTableDefinition(conn, viewOfTable, PTableType.VIEW, 
tableName, 1, 6, 4,
                 "ID", "COL1", "COL2", "COL4", "VIEW_COL1", "VIEW_COL2");
@@ -253,38 +271,49 @@ public class AlterTableWithViewsIT extends 
ParallelStatsDisabledIT {
                 assertEquals("Unexpected exception", 
CANNOT_MUTATE_TABLE.getErrorCode(), e.getErrorCode());
             }
             
-            // validate that there were no columns added to the table or view
-            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 0, 
3, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", "COL1", "COL2");
+            // validate that no columns were added to the table or view; if 
the table is column encoded, its sequence number still changes because the 
cq counter is incremented
+            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 
columnEncoded ? 1 : 0, 3, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", 
"COL1", "COL2");
             assertTableDefinition(conn, viewOfTable, PTableType.VIEW, 
tableName, 0, 9, 3, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2", 
"VIEW_COL3", "VIEW_COL4", "VIEW_COL5", "VIEW_COL6");
             
-            // should succeed 
-            conn.createStatement().execute("ALTER TABLE " + tableName + " ADD 
VIEW_COL4 DECIMAL, VIEW_COL2 VARCHAR(256)");
-            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 1, 
5, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", "COL1", "COL2", 
"VIEW_COL4", "VIEW_COL2");
-            assertTableDefinition(conn, viewOfTable, PTableType.VIEW, 
tableName, 1, 9, 5, "ID", "COL1", "COL2", "VIEW_COL4", "VIEW_COL2", 
"VIEW_COL1", "VIEW_COL3", "VIEW_COL5", "VIEW_COL6");
-            
-            // query table
-            ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
-            assertTrue(rs.next());
-            assertEquals("view1", rs.getString("ID"));
-            assertEquals(12, rs.getInt("COL1"));
-            assertEquals(13, rs.getInt("COL2"));
-            assertEquals("view5", rs.getString("VIEW_COL2"));
-            assertEquals(17, rs.getInt("VIEW_COL4"));
-            assertFalse(rs.next());
-
-            // query view
-            rs = stmt.executeQuery("SELECT * FROM " + viewOfTable);
-            assertTrue(rs.next());
-            assertEquals("view1", rs.getString("ID"));
-            assertEquals(12, rs.getInt("COL1"));
-            assertEquals(13, rs.getInt("COL2"));
-            assertEquals(14, rs.getInt("VIEW_COL1"));
-            assertEquals("view5", rs.getString("VIEW_COL2"));
-            assertEquals("view6", rs.getString("VIEW_COL3"));
-            assertEquals(17, rs.getInt("VIEW_COL4"));
-            assertEquals(18, rs.getInt("VIEW_COL5"));
-            assertEquals("view9", rs.getString("VIEW_COL6"));
-            assertFalse(rs.next());
+            if (columnEncoded) {
+                try {
+                    // adding a key value column to the base table that 
already exists in the view is not allowed
+                    conn.createStatement().execute("ALTER TABLE " + tableName 
+ " ADD VIEW_COL4 DECIMAL, VIEW_COL2 VARCHAR(256)");
+                    fail();
+                } catch (SQLException e) {
+                    assertEquals("Unexpected exception", 
CANNOT_MUTATE_TABLE.getErrorCode(), e.getErrorCode());
+                }
+            }
+            else {
+                // should succeed 
+                conn.createStatement().execute("ALTER TABLE " + tableName + " 
ADD VIEW_COL4 DECIMAL, VIEW_COL2 VARCHAR(256)");
+                assertTableDefinition(conn, tableName, PTableType.TABLE, null, 
columnEncoded ? 2 : 1, 5, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", 
"COL1", "COL2", "VIEW_COL4", "VIEW_COL2");
+                assertTableDefinition(conn, viewOfTable, PTableType.VIEW, 
tableName, 1, 9, 5, "ID", "COL1", "COL2", "VIEW_COL4", "VIEW_COL2", 
"VIEW_COL1", "VIEW_COL3", "VIEW_COL5", "VIEW_COL6");
+            
+                // query table
+                ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
+                assertTrue(rs.next());
+                assertEquals("view1", rs.getString("ID"));
+                assertEquals(12, rs.getInt("COL1"));
+                assertEquals(13, rs.getInt("COL2"));
+                assertEquals("view5", rs.getString("VIEW_COL2"));
+                assertEquals(17, rs.getInt("VIEW_COL4"));
+                assertFalse(rs.next());
+    
+                // query view
+                rs = stmt.executeQuery("SELECT * FROM " + viewOfTable);
+                assertTrue(rs.next());
+                assertEquals("view1", rs.getString("ID"));
+                assertEquals(12, rs.getInt("COL1"));
+                assertEquals(13, rs.getInt("COL2"));
+                assertEquals(14, rs.getInt("VIEW_COL1"));
+                assertEquals("view5", rs.getString("VIEW_COL2"));
+                assertEquals("view6", rs.getString("VIEW_COL3"));
+                assertEquals(17, rs.getInt("VIEW_COL4"));
+                assertEquals(18, rs.getInt("VIEW_COL5"));
+                assertEquals("view9", rs.getString("VIEW_COL6"));
+                assertFalse(rs.next());
+            }
         } 
     }
     
@@ -603,9 +632,9 @@ public class AlterTableWithViewsIT extends 
ParallelStatsDisabledIT {
     
     @Test
     public void testAlteringViewThatHasChildViews() throws Exception {
-        String baseTable = "testAlteringViewThatHasChildViews";
-        String childView = "childView";
-        String grandChildView = "grandChildView";
+        String baseTable = generateUniqueName();
+        String childView = baseTable + "cildView";
+        String grandChildView = baseTable + "grandChildView";
         try (Connection conn = DriverManager.getConnection(getUrl());
                 Connection viewConn = isMultiTenant ? 
DriverManager.getConnection(TENANT_SPECIFIC_URL1) : conn ) {
             String ddlFormat = "CREATE TABLE IF NOT EXISTS " + baseTable + "  
("

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArrayIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArrayIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArrayIT.java
index 4a21864..77af84c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArrayIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArrayIT.java
@@ -2777,4 +2777,32 @@ public class ArrayIT extends BaseClientManagedTimeIT {
         assertTrue(rs.next());
         assertEquals(conn.createArrayOf("CHAR", new String[]{"aaa", "bbb", 
"ccc"}), rs.getArray(1));
     }
+
+    @Test
+    public void testArrayIndexFunctionForImmutableTable() throws Exception {
+        String tableName = 
"testArrayIndexFunctionForImmutableTable".toUpperCase();
+        long ts = nextTimestamp();
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts 
+ 10));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String ddl = "CREATE IMMUTABLE TABLE " + tableName + " 
(region_name VARCHAR PRIMARY KEY, ZIP VARCHAR ARRAY[10])";
+            conn.createStatement().execute(ddl);
+        }
+        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts 
+ 20));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().executeUpdate("UPSERT INTO " + tableName + 
" (region_name,zip) VALUES('SF Bay Area',ARRAY['94115','94030','94125'])");
+            conn.commit();
+        }
+        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts 
+ 30));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String sql = "SELECT ZIP[2] FROM " + tableName;
+            try (ResultSet rs = conn.createStatement().executeQuery(sql)) {
+                assertTrue(rs.next());
+                assertEquals("94030", rs.getString(1));
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java
index f1c1808..152bdf0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java
@@ -60,14 +60,14 @@ public abstract class BaseJoinIT extends 
ParallelStatsDisabledIT {
                 "    \"item_id\" varchar(10), " +
                 "    price integer, " +
                 "    quantity integer, " +
-                "    date timestamp)");
+                "    date timestamp) IMMUTABLE_ROWS=true");
         builder.put(JOIN_CUSTOMER_TABLE_FULL_NAME, "create table " + 
JOIN_CUSTOMER_TABLE_FULL_NAME +
                 "   (\"customer_id\" varchar(10) not null primary key, " +
                 "    name varchar, " +
                 "    phone varchar(12), " +
                 "    address varchar, " +
                 "    loc_id varchar(5), " +
-                "    date date)");
+                "    date date) IMMUTABLE_ROWS=true");
         builder.put(JOIN_ITEM_TABLE_FULL_NAME, "create table " + 
JOIN_ITEM_TABLE_FULL_NAME +
                 "   (\"item_id\" varchar(10) not null primary key, " +
                 "    name varchar, " +

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
index 9ad12e5..e82daf9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
@@ -18,7 +18,6 @@
 package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.apache.phoenix.util.TestUtil.ATABLE_NAME;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -30,14 +29,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.runner.RunWith;
@@ -45,6 +48,7 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 
 
@@ -61,6 +65,21 @@ public abstract class BaseQueryIT extends 
BaseClientManagedTimeIT {
     protected static final String tenantId = getOrganizationId();
     protected static final String ATABLE_INDEX_NAME = "ATABLE_IDX";
     protected static final long BATCH_SIZE = 3;
+    protected static final String[] INDEX_DDLS = new String[] {
+            "CREATE INDEX %s ON %s (a_integer DESC) INCLUDE ("
+                    + "    A_STRING, " + "    B_STRING, " + "    A_DATE)"};
+//    ,
+//            "CREATE INDEX %s ON %s (a_integer, a_string) INCLUDE ("
+//                    + "    B_STRING, " + "    A_DATE)",
+//            "CREATE INDEX %s ON %s (a_integer) INCLUDE ("
+//                    + "    A_STRING, " + "    B_STRING, " + "    A_DATE)",
+//            "CREATE LOCAL INDEX %s ON %s (a_integer DESC) INCLUDE ("
+//                    + "    A_STRING, " + "    B_STRING, " + "    A_DATE)",
+//            "CREATE LOCAL INDEX %s ON %s (a_integer, a_string) INCLUDE (" + 
"    B_STRING, "
+//                    + "    A_DATE)",
+//            "CREATE LOCAL INDEX %s ON %s (a_integer) INCLUDE ("
+//                    + "    A_STRING, " + "    B_STRING, " + "    A_DATE)", 
+//            "" };
 
     @BeforeClass
     @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
@@ -79,42 +98,69 @@ public abstract class BaseQueryIT extends 
BaseClientManagedTimeIT {
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
     
+    private static Map<Pair<String, String>, Pair<String, String>> 
namesByParams = Maps.newHashMapWithExpectedSize(10);
+    
     protected long ts;
     protected Date date;
     private String indexDDL;
+    private String tableDDLOptions;
+    protected String tableName;
+    protected String indexName;
     
-    public BaseQueryIT(String indexDDL) {
-        this.indexDDL = indexDDL;
+    public BaseQueryIT(String idxDdl, boolean mutable, boolean columnEncoded) {
+        StringBuilder optionBuilder = new StringBuilder();
+        if (!columnEncoded) {
+            optionBuilder.append("COLUMN_ENCODED_BYTES=0");
+        }
+        if (!mutable) {
+            if (optionBuilder.length()>0)
+                optionBuilder.append(",");
+            optionBuilder.append("IMMUTABLE_ROWS=true");
+            if (!columnEncoded) {
+                
optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
+            }
+        }
+        this.tableDDLOptions = optionBuilder.toString();
+        try {
+            this.ts = nextTimestamp();
+            Pair<String, String> runParam = new Pair<>(idxDdl, 
tableDDLOptions);
+            Pair<String, String> tableIndexNames = namesByParams.get(runParam);
+            if (tableIndexNames == null) {
+                this.tableName = initATableValues(null, tenantId, 
getDefaultSplits(tenantId), date=new Date(System.currentTimeMillis()), ts, 
getUrl(), tableDDLOptions);
+                this.indexName = generateUniqueName();
+                namesByParams.put(runParam, new Pair<>(tableName, indexName));
+                if (idxDdl.length() > 0) {
+                    this.indexDDL = String.format(idxDdl, indexName, 
tableName);
+                    Properties props = 
PropertiesUtil.deepCopy(TEST_PROPERTIES);
+                    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+                    Connection conn = DriverManager.getConnection(getUrl(), 
props);
+                    conn.createStatement().execute(this.indexDDL);
+                }
+            } else {
+                this.tableName = tableIndexNames.getFirst();
+                this.indexName = tableIndexNames.getSecond();
+                initATableValues(this.tableName, tenantId, 
getDefaultSplits(tenantId), date=new Date(System.currentTimeMillis()), ts, 
getUrl(), tableDDLOptions);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
     
     @Before
-    public void initTable() throws Exception {
-         ts = nextTimestamp();
-        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), 
date=new Date(System.currentTimeMillis()), ts, getUrl());
-        if (indexDDL != null && indexDDL.length() > 0) {
-            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-            props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
-            Connection conn = DriverManager.getConnection(getUrl(), props);
-            conn.createStatement().execute(indexDDL);
-        }
+    public void init() throws Exception {
+        this.ts = nextTimestamp();
     }
     
-    @Parameters(name="{0}")
+    @Parameters(name="indexDDL={0},mutable={1},columnEncoded={2}")
     public static Collection<Object> data() {
         List<Object> testCases = Lists.newArrayList();
-        testCases.add(new String[] { "CREATE INDEX " + ATABLE_INDEX_NAME + " 
ON aTable (a_integer DESC) INCLUDE ("
-                + "    A_STRING, " + "    B_STRING, " + "    A_DATE)" });
-        testCases.add(new String[] { "CREATE INDEX " + ATABLE_INDEX_NAME + " 
ON aTable (a_integer, a_string) INCLUDE ("
-                + "    B_STRING, " + "    A_DATE)" });
-        testCases.add(new String[] { "CREATE INDEX " + ATABLE_INDEX_NAME + " 
ON aTable (a_integer) INCLUDE ("
-                + "    A_STRING, " + "    B_STRING, " + "    A_DATE)" });
-        testCases.add(new String[] { "CREATE LOCAL INDEX " + ATABLE_INDEX_NAME 
+ " ON aTable (a_integer DESC) INCLUDE ("
-                + "    A_STRING, " + "    B_STRING, " + "    A_DATE)" });
-        testCases.add(new String[] { "CREATE LOCAL INDEX " + ATABLE_INDEX_NAME 
+ " ON aTable (a_integer, a_string) INCLUDE ("
-                + "    B_STRING, " + "    A_DATE)" });
-        testCases.add(new String[] { "CREATE LOCAL INDEX " + ATABLE_INDEX_NAME 
+ " ON aTable (a_integer) INCLUDE ("
-                + "    A_STRING, " + "    B_STRING, " + "    A_DATE)" });
-        testCases.add(new String[] { "" });
+        for (String indexDDL : INDEX_DDLS) {
+            for (boolean mutable : new boolean[]{false}) {
+                for (boolean columnEncoded : new boolean[]{false}) {
+                    testCases.add(new Object[] { indexDDL, mutable, 
columnEncoded });
+                }
+            }
+        }
         return testCases;
     }
     
@@ -132,4 +178,5 @@ public abstract class BaseQueryIT extends 
BaseClientManagedTimeIT {
     protected static int nextRunCount() {
         return runCount.getAndAdd(1);
     }
+
 }

Reply via email to