http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index cc2b5b9..c8179e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -35,6 +35,8 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
@@ -42,9 +44,11 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
@@ -85,9 +89,14 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
 import static 
org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
+import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_UPDATE_STATS_ASYNC;
+import static org.apache.phoenix.schema.PTable.EncodedCQCounter.NULL_COUNTER;
+import static 
org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
+import static 
org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
+import static 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
 import static org.apache.phoenix.schema.PTable.ViewType.MAPPED;
 import static org.apache.phoenix.schema.PTableType.TABLE;
 import static org.apache.phoenix.schema.PTableType.VIEW;
@@ -187,8 +196,12 @@ import 
org.apache.phoenix.query.ConnectionQueryServices.Feature;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable.EncodedCQCounter;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.LinkType;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
+import 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.QualifierOutOfRangeException;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
 import org.apache.phoenix.schema.types.PDataType;
@@ -200,6 +213,7 @@ import org.apache.phoenix.schema.types.PUnsignedLong;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.MetaDataUtil;
@@ -215,6 +229,7 @@ import org.apache.tephra.TxConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
@@ -259,8 +274,10 @@ public class MetaDataClient {
                     IS_NAMESPACE_MAPPED + "," +
                     AUTO_PARTITION_SEQ +  "," +
                     APPEND_ONLY_SCHEMA + "," +
-                    GUIDE_POSTS_WIDTH + 
-                    ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?)";
+                    GUIDE_POSTS_WIDTH + "," +
+                    IMMUTABLE_STORAGE_SCHEME + "," +
+                    ENCODING_SCHEME + 
+                    ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
     private static final String CREATE_SCHEMA = "UPSERT INTO " + 
SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE
             + "\"( " + TABLE_SCHEM + "," + TABLE_NAME + ") VALUES (?,?)";
@@ -275,6 +292,7 @@ public class MetaDataClient {
                     TABLE_SEQ_NUM +","+ // this is actually set to the parent 
table's sequence number
                     TABLE_TYPE +
                     ") VALUES (?, ?, ?, ?, ?, ?, ?)";
+    
     private static final String CREATE_VIEW_LINK =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_CATALOG_TABLE + "\"( " +
                     TENANT_ID + "," +
@@ -284,14 +302,24 @@ public class MetaDataClient {
                     LINK_TYPE + "," +
                     PARENT_TENANT_ID + " " + 
PVarchar.INSTANCE.getSqlTypeName() + // Dynamic column for now to prevent 
schema change
                     ") VALUES (?, ?, ?, ?, ?, ?)";
-    private static final String INCREMENT_SEQ_NUM =
+    
+    public static final String UPDATE_ENCODED_COLUMN_COUNTER = 
+            "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_CATALOG_TABLE + "\"( " +
+            TENANT_ID + ", " + 
+            TABLE_SCHEM + "," +
+            TABLE_NAME + "," +
+            COLUMN_FAMILY + "," +
+            COLUMN_QUALIFIER_COUNTER + 
+            ") VALUES (?, ?, ?, ?, ?)";
+
+    public static final String INCREMENT_SEQ_NUM =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_CATALOG_TABLE + "\"( " +
                     TENANT_ID + "," +
                     TABLE_SCHEM + "," +
                     TABLE_NAME + "," +
                     TABLE_SEQ_NUM  +
                     ") VALUES (?, ?, ?, ?)";
-    private static final String MUTATE_TABLE =
+    public static final String MUTATE_TABLE =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_CATALOG_TABLE + "\"( " +
                     TENANT_ID + "," +
                     TABLE_SCHEM + "," +
@@ -348,8 +376,10 @@ public class MetaDataClient {
                     PK_NAME + "," +  // write this both in the column and 
table rows for access by metadata APIs
                     KEY_SEQ + "," +
                     COLUMN_DEF + "," +
+                    COLUMN_QUALIFIER + ", " +
                     IS_ROW_TIMESTAMP +
-                    ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?)";
+                    ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?)";
+
     private static final String INSERT_COLUMN_ALTER_TABLE =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_CATALOG_TABLE + "\"( " +
                     TENANT_ID + "," +
@@ -369,8 +399,38 @@ public class MetaDataClient {
                     IS_VIEW_REFERENCED + "," +
                     PK_NAME + "," +  // write this both in the column and 
table rows for access by metadata APIs
                     KEY_SEQ + "," +
+                    COLUMN_DEF + "," +
+                    COLUMN_QUALIFIER +
+                    ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?)";
+
+    /*
+     * Custom sql to add a column to SYSTEM.CATALOG table during upgrade.
+     * We can't use the regular INSERT_COLUMN_ALTER_TABLE sql because the 
COLUMN_QUALIFIER column
+     * was added in 4.10. And so if upgrading from let's say 4.7, we won't be 
able to
+     * find the COLUMN_QUALIFIER column which the INSERT_COLUMN_ALTER_TABLE 
sql expects.
+     */
+    private static final String ALTER_SYSCATALOG_TABLE_UPGRADE =
+            "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_CATALOG_TABLE + "\"( " +
+                    TENANT_ID + "," +
+                    TABLE_SCHEM + "," +
+                    TABLE_NAME + "," +
+                    COLUMN_NAME + "," +
+                    COLUMN_FAMILY + "," +
+                    DATA_TYPE + "," +
+                    NULLABLE + "," +
+                    COLUMN_SIZE + "," +
+                    DECIMAL_DIGITS + "," +
+                    ORDINAL_POSITION + "," +
+                    SORT_ORDER + "," +
+                    DATA_TABLE_NAME + "," + // write this both in the column 
and table rows for access by metadata APIs
+                    ARRAY_SIZE + "," +
+                    VIEW_CONSTANT + "," +
+                    IS_VIEW_REFERENCED + "," +
+                    PK_NAME + "," +  // write this both in the column and 
table rows for access by metadata APIs
+                    KEY_SEQ + "," +
                     COLUMN_DEF +
                     ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?)";
+
     private static final String UPDATE_COLUMN_POSITION =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_CATALOG_TABLE + "\" ( " +
                     TENANT_ID + "," +
@@ -715,22 +775,22 @@ public class MetaDataClient {
             // since view columns may be removed.
             IndexMaintainer indexMaintainer = 
index.getIndexMaintainer(parentTable, connection);
             // Check that the columns required for the index pk are present in 
the view
-            Set<ColumnReference> indexColRefs = 
indexMaintainer.getIndexedColumns();
-            for (ColumnReference colRef : indexColRefs) {
+            Set<Pair<String, String>> indexedColInfos = 
indexMaintainer.getIndexedColumnInfo();
+            for (Pair<String, String> colInfo : indexedColInfos) {
                 try {
-                    byte[] cf= colRef.getFamily();
-                    byte[] cq= colRef.getQualifier();
-                    if (cf!=null) {
-                        view.getColumnFamily(cf).getColumn(cq);
-                    }
-                    else {
-                        view.getColumn( Bytes.toString(cq));
+                    String colFamily = colInfo.getFirst();
+                    String colName = colInfo.getSecond();
+                    if (colFamily == null) {
+                        view.getColumnForColumnName(colName);
+                    } else {
+                        
view.getColumnFamily(colFamily).getPColumnForColumnName(colName);
                     }
-                } catch (ColumnNotFoundException e) { // Ignore this index and 
continue with others
+                } catch (ColumnNotFoundException e) {
                     containsAllReqdCols = false;
                     break;
                 }
             }
+            
             // Ensure that constant columns (i.e. columns matched in the view 
WHERE clause)
             // all exist in the index on the parent table.
             for (PColumn col : view.getColumns()) {
@@ -740,17 +800,17 @@ public class MetaDataClient {
                         // but the WHERE clause for the view statement (which 
is added to the index below)
                         // would fail to compile.
                         String indexColumnName = 
IndexUtil.getIndexColumnName(col);
-                        index.getColumn(indexColumnName);
+                        index.getColumnForColumnName(indexColumnName);
                     } catch (ColumnNotFoundException e1) {
                         PColumn indexCol = null;
                         try {
                             String cf = col.getFamilyName()!=null ? 
col.getFamilyName().getString() : null;
-                            String cq = col.getName().getString();
-                            if (cf!=null) {
-                                indexCol = 
parentTable.getColumnFamily(cf).getColumn(cq);
+                            String colName = col.getName().getString();
+                            if (cf != null) {
+                                indexCol = 
parentTable.getColumnFamily(cf).getPColumnForColumnName(colName);
                             }
                             else {
-                                indexCol = parentTable.getColumn(cq);
+                                indexCol = 
parentTable.getColumnForColumnName(colName);
                             }
                         } catch (ColumnNotFoundException e2) { // Ignore this 
index and continue with others
                             containsAllReqdCols = false;
@@ -818,7 +878,14 @@ public class MetaDataClient {
             colUpsert.setString(18, column.getExpressionStr());
         }
         if (colUpsert.getParameterMetaData().getParameterCount() > 18) {
-            colUpsert.setBoolean(19, column.isRowTimestamp());
+            if (column.getColumnQualifierBytes() == null) {
+                colUpsert.setNull(19, Types.VARBINARY);
+            } else {
+                colUpsert.setBytes(19, column.getColumnQualifierBytes());
+            }
+        }
+        if (colUpsert.getParameterMetaData().getParameterCount() > 19) {
+            colUpsert.setBoolean(20, column.isRowTimestamp());
         }
         colUpsert.execute();
     }
@@ -837,7 +904,7 @@ public class MetaDataClient {
         argUpsert.execute();
     }
 
-    private PColumn newColumn(int position, ColumnDef def, 
PrimaryKeyConstraint pkConstraint, String defaultColumnFamily, boolean 
addingToPK) throws SQLException {
+    private PColumn newColumn(int position, ColumnDef def, 
PrimaryKeyConstraint pkConstraint, String defaultColumnFamily, boolean 
addingToPK, byte[] columnQualifierBytes) throws SQLException {
         try {
             ColumnName columnDefName = def.getColumnDefName();
             SortOrder sortOrder = def.getSortOrder();
@@ -885,15 +952,14 @@ public class MetaDataClient {
                 }
                 isNull = false;
             }
-
             PColumn column = new PColumnImpl(PNameFactory.newName(columnName), 
familyName, def.getDataType(),
-                    def.getMaxLength(), def.getScale(), isNull, position, 
sortOrder, def.getArraySize(), null, false, def.getExpression(), 
isRowTimestamp, false);
+                    def.getMaxLength(), def.getScale(), isNull, position, 
sortOrder, def.getArraySize(), null, false, def.getExpression(), 
isRowTimestamp, false, columnQualifierBytes);
             return column;
         } catch (IllegalArgumentException e) { // Based on precondition check 
in constructor
             throw new SQLException(e);
         }
     }
-
+    
     public MutationState createTable(CreateTableStatement statement, byte[][] 
splits, PTable parent, String viewStatement, ViewType viewType, byte[][] 
viewColumnConstants, BitSet isViewColumnReferenced) throws SQLException {
         TableName tableName = statement.getTableName();
         Map<String,Object> tableProps = 
Maps.newHashMapWithExpectedSize(statement.getProps().size());
@@ -956,7 +1022,7 @@ public class MetaDataClient {
         }
         table = createTableInternal(statement, splits, parent, viewStatement, 
viewType, viewColumnConstants, isViewColumnReferenced, false, null, null, 
tableProps, commonFamilyProps);
 
-        if (table == null || table.getType() == PTableType.VIEW || 
table.isTransactional()) {
+        if (table == null || table.getType() == PTableType.VIEW /*|| 
table.isTransactional()*/) {
             return new MutationState(0,connection);
         }
         // Hack to get around the case when an SCN is specified on the 
connection.
@@ -1675,6 +1741,8 @@ public class MetaDataClient {
                     ? SchemaUtil.isNamespaceMappingEnabled(tableType, 
connection.getQueryServices().getProps())
                     : parent.isNamespaceMapped();
             boolean isLocalIndex = indexType == IndexType.LOCAL;
+            QualifierEncodingScheme encodingScheme = NON_ENCODED_QUALIFIERS;
+            ImmutableStorageScheme immutableStorageScheme = 
ONE_CELL_PER_COLUMN;
             if (parent != null && tableType == PTableType.INDEX) {
                 timestamp = TransactionUtil.getTableTimestamp(connection, 
transactional);
                 storeNulls = parent.getStoreNulls();
@@ -1797,7 +1865,6 @@ public class MetaDataClient {
             }
             String autoPartitionSeq = (String) 
TableProperty.AUTO_PARTITION_SEQ.getValue(tableProps);
             Long guidePostsWidth = (Long) 
TableProperty.GUIDE_POSTS_WIDTH.getValue(tableProps);
-
             Boolean storeNullsProp = (Boolean) 
TableProperty.STORE_NULLS.getValue(tableProps);
             if (storeNullsProp == null) {
                 if (parent == null) {
@@ -1819,7 +1886,7 @@ public class MetaDataClient {
                 if (transactionalProp == null) {
                     transactional = 
connection.getQueryServices().getProps().getBoolean(
                                     
QueryServices.DEFAULT_TABLE_ISTRANSACTIONAL_ATTRIB,
-                                    
QueryServicesOptions.DEFAULT_TRANSACTIONAL);
+                                    
QueryServicesOptions.DEFAULT_TABLE_ISTRANSACTIONAL);
                 } else {
                     transactional = transactionalProp;
                 }
@@ -1834,7 +1901,7 @@ public class MetaDataClient {
                 .build().buildException();
             }
             // can't create a transactional table if it has a row timestamp 
column
-            if (pkConstraint.getNumColumnsWithRowTimestamp()>0 && 
transactional) {
+            if (pkConstraint.getNumColumnsWithRowTimestamp() > 0 && 
transactional) {
                 throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP)
                 .setSchemaName(schemaName).setTableName(tableName)
                 .build().buildException();
@@ -1969,7 +2036,7 @@ public class MetaDataClient {
                 columns = new LinkedHashMap<PColumn,PColumn>(colDefs.size());
                 pkColumns = newLinkedHashSetWithExpectedSize(colDefs.size() + 
1); // in case salted
             }
-
+            
             // Don't add link for mapped view, as it just points back to 
itself and causes the drop to
             // fail because it looks like there's always a view associated 
with it.
             if (!physicalNames.isEmpty()) {
@@ -2013,7 +2080,105 @@ public class MetaDataClient {
             }
             int pkPositionOffset = pkColumns.size();
             int position = positionOffset;
+            EncodedCQCounter cqCounter = NULL_COUNTER;
+            PTable viewPhysicalTable = null;
+            if (tableType == PTableType.VIEW) {
+                /*
+                 * We can't control what column qualifiers are used in HTable 
mapped to Phoenix views. So we are not
+                 * able to encode column names.
+                 */  
+                if (viewType != MAPPED) {
+                    /*
+                     * For regular phoenix views, use the storage scheme of 
the physical table since they all share the
+                     * the same HTable. Views always use the base table's 
column qualifier counter for doling out
+                     * encoded column qualifier.
+                     */
+                    viewPhysicalTable = PhoenixRuntime.getTable(connection, 
physicalNames.get(0).getString());
+                    immutableStorageScheme = 
viewPhysicalTable.getImmutableStorageScheme();
+                    encodingScheme = viewPhysicalTable.getEncodingScheme();
+                                       if 
(EncodedColumnsUtil.usesEncodedColumnNames(viewPhysicalTable)) {
+                        cqCounter  = viewPhysicalTable.getEncodedCQCounter();
+                    }
+                }
+            }
+            // System tables have hard-coded column qualifiers. So we can't 
use column encoding for them.
+            else if 
(!SchemaUtil.isSystemTable(Bytes.toBytes(SchemaUtil.getTableName(schemaName, 
tableName)))) {
+                /*
+                 * Indexes inherit the storage scheme of the parent data 
tables. Otherwise, we always attempt to 
+                 * create tables with encoded column names. 
+                 * 
+                 * Also of note is the case with shared indexes i.e. local 
indexes and view indexes. In these cases, 
+                 * column qualifiers for covered columns don't have to be 
unique because rows of the logical indexes are 
+                 * partitioned by the virtue of indexId present in the row 
key. As such, different shared indexes can use
+                 * potentially overlapping column qualifiers.
+                 * 
+                 * If the hbase table already exists, then possibly encoded or 
non-encoded column qualifiers were used. 
+                 * In this case we pursue ahead with non-encoded column 
qualifier scheme. If the phoenix metadata for this table already exists 
+                 * then we rely on the PTable, with appropriate storage 
scheme, returned in the MetadataMutationResult to be updated 
+                 * in the client cache. If the phoenix table metadata already 
doesn't exist then the non-encoded column qualifier scheme works
+                 * because we cannot control the column qualifiers that were 
used when populating the hbase table.
+                 */
+                
+                byte[] tableNameBytes = 
SchemaUtil.getTableNameAsBytes(schemaName, tableName);
+                boolean tableExists = true;
+                try {
+                    HTableDescriptor tableDescriptor = 
connection.getQueryServices().getTableDescriptor(tableNameBytes);
+                    if (tableDescriptor == null) { // for connectionless
+                        tableExists = false;
+                    }
+                } catch (org.apache.phoenix.schema.TableNotFoundException e) {
+                    tableExists = false;
+                }
+                if (tableExists) {
+                    encodingScheme = NON_ENCODED_QUALIFIERS;
+                    immutableStorageScheme = ONE_CELL_PER_COLUMN;
+                } else if (parent != null) {
+                    encodingScheme = parent.getEncodingScheme();
+                    immutableStorageScheme = 
parent.getImmutableStorageScheme();
+                } else {
+                       Byte encodingSchemeSerializedByte = (Byte) 
TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
+                    if (encodingSchemeSerializedByte == null) {
+                       encodingSchemeSerializedByte = 
(byte)connection.getQueryServices().getProps().getInt(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB,
 QueryServicesOptions.DEFAULT_COLUMN_ENCODED_BYTES);
+                    } 
+                    encodingScheme =  
QualifierEncodingScheme.fromSerializedValue(encodingSchemeSerializedByte);
+                    if (isImmutableRows) {
+                        immutableStorageScheme =
+                                (ImmutableStorageScheme) 
TableProperty.IMMUTABLE_STORAGE_SCHEME
+                                        .getValue(tableProps);
+                        if (immutableStorageScheme == null) {
+                            if (multiTenant) {
+                                immutableStorageScheme =
+                                        ImmutableStorageScheme
+                                                .valueOf(connection
+                                                        .getQueryServices()
+                                                        .getProps()
+                                                        .get(
+                                                            
QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
+                                                            
QueryServicesOptions.DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME));
+                            } else {
+                                immutableStorageScheme =
+                                        ImmutableStorageScheme
+                                                .valueOf(connection
+                                                        .getQueryServices()
+                                                        .getProps()
+                                                        .get(
+                                                            
QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
+                                                            
QueryServicesOptions.DEFAULT_IMMUTABLE_STORAGE_SCHEME));
+                            }
+                        }
+                        if (immutableStorageScheme != ONE_CELL_PER_COLUMN
+                                && encodingScheme == NON_ENCODED_QUALIFIERS) {
+                            throw new SQLExceptionInfo.Builder(
+                                    
SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_AND_COLUMN_QUALIFIER_BYTES)
+                                    
.setSchemaName(schemaName).setTableName(tableName).build()
+                                    .buildException();
+                        }
+                    } 
+                }
+                cqCounter = encodingScheme != NON_ENCODED_QUALIFIERS ? new 
EncodedCQCounter() : NULL_COUNTER;
+            }
 
+            Map<String, Integer> changedCqCounters = new 
HashMap<>(colDefs.size());
             for (ColumnDef colDef : colDefs) {
                 rowTimeStampColumnAlreadyFound = 
checkAndValidateRowTimestampCol(colDef, pkConstraint, 
rowTimeStampColumnAlreadyFound, tableType);
                 if (colDef.isPK()) { // i.e. the column is declared as CREATE 
TABLE COLNAME DATATYPE PRIMARY KEY...
@@ -2032,7 +2197,33 @@ public class MetaDataClient {
                                 
.setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
                     }
                 }
-                PColumn column = newColumn(position++, colDef, pkConstraint, 
defaultFamilyName, false);
+                ColumnName columnDefName = colDef.getColumnDefName();
+                String colDefFamily = columnDefName.getFamilyName();
+                boolean isPkColumn = isPkColumn(pkConstraint, colDef, 
columnDefName);
+                String cqCounterFamily = null;
+                if (!isPkColumn) {
+                    if (immutableStorageScheme == 
SINGLE_CELL_ARRAY_WITH_OFFSETS && encodingScheme != NON_ENCODED_QUALIFIERS) {
+                        // For this scheme we track column qualifier counters 
at the column family level.
+                        cqCounterFamily = colDefFamily != null ? colDefFamily 
: (defaultFamilyName != null ? defaultFamilyName : DEFAULT_COLUMN_FAMILY);
+                    } else {
+                        // For other schemes, column qualifier counters are 
tracked using the default column family.
+                        cqCounterFamily = defaultFamilyName != null ? 
defaultFamilyName : DEFAULT_COLUMN_FAMILY;
+                    }
+                }
+                Integer encodedCQ =  isPkColumn ? null : 
cqCounter.getNextQualifier(cqCounterFamily);
+                byte[] columnQualifierBytes = null;
+                try {
+                    columnQualifierBytes = 
EncodedColumnsUtil.getColumnQualifierBytes(columnDefName.getColumnName(), 
encodedCQ, encodingScheme, isPkColumn);
+                }
+                catch (QualifierOutOfRangeException e) {
+                    throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.MAX_COLUMNS_EXCEEDED)
+                    .setSchemaName(schemaName)
+                    .setTableName(tableName).build().buildException();
+                }
+                PColumn column = newColumn(position++, colDef, pkConstraint, 
defaultFamilyName, false, columnQualifierBytes);
+                if (cqCounter.increment(cqCounterFamily)) {
+                    changedCqCounters.put(cqCounterFamily, 
cqCounter.getNextQualifier(cqCounterFamily));
+                }
                 if (SchemaUtil.isPKColumn(column)) {
                     // TODO: remove this constraint?
                     if (pkColumnsIterator.hasNext() && 
!column.getName().getString().equals(pkColumnsIterator.next().getFirst().getColumnName()))
 {
@@ -2067,6 +2258,7 @@ public class MetaDataClient {
                         column.getFamilyName());
                 }
             }
+            
             // We need a PK definition for a TABLE or mapped VIEW
             if (!isPK && pkColumnsNames.isEmpty() && tableType != 
PTableType.VIEW && viewType != ViewType.MAPPED) {
                 throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_MISSING)
@@ -2148,15 +2340,52 @@ public class MetaDataClient {
             if (SchemaUtil.isMetaTable(schemaName,tableName)) {
                 // TODO: what about stats for system catalog?
                 PName newSchemaName = PNameFactory.newName(schemaName);
+                // Column names and qualifiers and hardcoded for system tables.
                 PTable table = PTableImpl.makePTable(tenantId,newSchemaName, 
PNameFactory.newName(tableName), tableType,
                         null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, 
PTable.INITIAL_SEQ_NUM,
                         
PNameFactory.newName(QueryConstants.SYSTEM_TABLE_PK_NAME), null, 
columns.values(), null, null,
                         Collections.<PTable>emptyList(), isImmutableRows,
                         Collections.<PName>emptyList(), defaultFamilyName == 
null ? null :
                                 PNameFactory.newName(defaultFamilyName), null,
-                        Boolean.TRUE.equals(disableWAL), false, false, null, 
null, indexType, true, false, 0, 0L, isNamespaceMapped, autoPartitionSeq, 
isAppendOnlySchema);
+                        Boolean.TRUE.equals(disableWAL), false, false, null, 
null, indexType, true, false, 0, 0L, isNamespaceMapped, autoPartitionSeq, 
isAppendOnlySchema, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, 
PTable.EncodedCQCounter.NULL_COUNTER);
                 connection.addTable(table, 
MetaDataProtocol.MIN_TABLE_TIMESTAMP);
             }
+            
+            // Update column qualifier counters
+            if (EncodedColumnsUtil.usesEncodedColumnNames(encodingScheme)) {
+                // Store the encoded column counter for phoenix entities that 
have their own hbase
+                // tables i.e. base tables and indexes.
+                String schemaNameToUse = tableType == VIEW ? 
viewPhysicalTable.getSchemaName().getString() : schemaName;
+                String tableNameToUse = tableType == VIEW ? 
viewPhysicalTable.getTableName().getString() : tableName;
+                boolean sharedIndex = tableType == PTableType.INDEX && 
(indexType == IndexType.LOCAL || parent.getType() == PTableType.VIEW);
+                // For local indexes and indexes on views, pass on the the 
tenant id since all their meta-data rows have
+                // tenant ids in there.
+                String tenantIdToUse = connection.getTenantId() != null && 
sharedIndex ? connection.getTenantId().getString() : null;
+                // When a view adds its own columns, then we need to increase 
the sequence number of the base table
+                // too since we want clients to get the latest PTable of the 
base table.
+                for (Entry<String, Integer> entry : 
changedCqCounters.entrySet()) {
+                    try (PreparedStatement linkStatement = 
connection.prepareStatement(UPDATE_ENCODED_COLUMN_COUNTER)) {
+                        linkStatement.setString(1, tenantIdToUse);
+                        linkStatement.setString(2, schemaNameToUse);
+                        linkStatement.setString(3, tableNameToUse);
+                        linkStatement.setString(4, entry.getKey());
+                        linkStatement.setInt(5, entry.getValue());
+                        linkStatement.execute();
+                    }
+                }
+                if (tableType == VIEW && !changedCqCounters.isEmpty()) {
+                    PreparedStatement incrementStatement = 
connection.prepareStatement(INCREMENT_SEQ_NUM);
+                    incrementStatement.setString(1, null);
+                    incrementStatement.setString(2, 
viewPhysicalTable.getSchemaName().getString());
+                    incrementStatement.setString(3, 
viewPhysicalTable.getTableName().getString());
+                    incrementStatement.setLong(4, 
viewPhysicalTable.getSequenceNumber() + 1);
+                    incrementStatement.execute();
+                }
+                if 
(connection.getMutationState().toMutations(timestamp).hasNext()) {
+                    
tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
+                    connection.rollback();
+                }
+            }
 
             short nextKeySeq = 0;
 
@@ -2182,14 +2411,14 @@ public class MetaDataClient {
                                 return true;
                             }
                         });
-                    }
-                    else if (isViewColumnReferenced != null) {
+                    } else if (isViewColumnReferenced != null) {
                         if (viewColumnConstants != null && columnPosition < 
viewColumnConstants.length) {
                             entry.setValue(column = new DelegateColumn(column) 
{
                                 @Override
                                 public byte[] getViewConstant() {
                                     return viewColumnConstants[columnPosition];
                                 }
+                                
                                 @Override
                                 public boolean isViewReferenced() {
                                     return 
isViewColumnReferenced.get(columnPosition);
@@ -2272,6 +2501,8 @@ public class MetaDataClient {
             } else {
                 tableUpsert.setLong(25, guidePostsWidth);
             }
+            tableUpsert.setByte(26, 
immutableStorageScheme.getSerializedMetadataValue());
+            tableUpsert.setByte(27, 
encodingScheme.getSerializedMetadataValue());
             tableUpsert.execute();
 
             if (asyncCreatedDate != null) {
@@ -2292,7 +2523,7 @@ public class MetaDataClient {
              * 3) parent table header row
              */
             Collections.reverse(tableMetaData);
-
+            
                        if (indexType != IndexType.LOCAL) {
                 splits = SchemaUtil.processSplits(splits, pkColumns, 
saltBucketNum, connection.getQueryServices().getProps().getBoolean(
                         QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, 
QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER));
@@ -2365,12 +2596,18 @@ public class MetaDataClient {
                     }
                 }
                 PName newSchemaName = PNameFactory.newName(schemaName);
+                /*
+                 * It doesn't hurt for the PTable of views to have the 
cqCounter. However, views always rely on the
+                 * parent table's counter to dole out encoded column 
qualifiers. So setting the counter as NULL_COUNTER
+                 * for extra safety.
+                 */
+                EncodedCQCounter cqCounterToBe = tableType == PTableType.VIEW 
? NULL_COUNTER : cqCounter;
                 PTable table =  PTableImpl.makePTable(
                         tenantId, newSchemaName, 
PNameFactory.newName(tableName), tableType, indexState, timestamp!=null ? 
timestamp : result.getMutationTime(),
                         PTable.INITIAL_SEQ_NUM, pkName == null ? null : 
PNameFactory.newName(pkName), saltBucketNum, columns.values(),
                         parent == null ? null : parent.getSchemaName(), parent 
== null ? null : parent.getTableName(), Collections.<PTable>emptyList(), 
isImmutableRows,
                         physicalNames, defaultFamilyName == null ? null : 
PNameFactory.newName(defaultFamilyName), viewStatement, 
Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType,
-                        result.getViewIndexId(), indexType, 
rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L, 
isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema);
+                        result.getViewIndexId(), indexType, 
rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L, 
isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, 
immutableStorageScheme, encodingScheme, cqCounterToBe);
                 result = new MetaDataMutationResult(code, 
result.getMutationTime(), table, true);
                 addTableToCache(result);
                 return table;
@@ -2380,6 +2617,10 @@ public class MetaDataClient {
         }
     }
 
+    private static boolean isPkColumn(PrimaryKeyConstraint pkConstraint, 
ColumnDef colDef, ColumnName columnDefName) {
+        return colDef.isPK() || (pkConstraint != null && 
pkConstraint.getColumnWithSortOrder(columnDefName) != null);
+    }
+    
     /**
      * A table can be a parent table to tenant-specific tables if all of the 
following conditions are true:
      * <p>
@@ -2550,7 +2791,7 @@ public class MetaDataClient {
                                 PTable viewIndexTable = new PTableImpl(null,
                                         
SchemaUtil.getSchemaNameFromFullName(viewIndexPhysicalName),
                                         
SchemaUtil.getTableNameFromFullName(viewIndexPhysicalName), ts,
-                                        
table.getColumnFamilies(),table.isNamespaceMapped());
+                                        
table.getColumnFamilies(),table.isNamespaceMapped(), 
table.getImmutableStorageScheme(), table.getEncodingScheme());
                                 tableRefs.add(new TableRef(null, 
viewIndexTable, ts, false));
                             }
                         }
@@ -2671,12 +2912,12 @@ public class MetaDataClient {
     }
 
     private  long incrementTableSeqNum(PTable table, PTableType expectedType, 
int columnCountDelta, Boolean isTransactional, Long updateCacheFrequency) 
throws SQLException {
-        return incrementTableSeqNum(table, expectedType, columnCountDelta, 
isTransactional, updateCacheFrequency, null, null, null, null, -1L, null);
+        return incrementTableSeqNum(table, expectedType, columnCountDelta, 
isTransactional, updateCacheFrequency, null, null, null, null, -1L, null, null);
     }
 
     private long incrementTableSeqNum(PTable table, PTableType expectedType, 
int columnCountDelta,
             Boolean isTransactional, Long updateCacheFrequency, Boolean 
isImmutableRows, Boolean disableWAL,
-            Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, 
Boolean appendOnlySchema)
+            Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, 
Boolean appendOnlySchema, ImmutableStorageScheme immutableStorageScheme)
             throws SQLException {
         String schemaName = table.getSchemaName().getString();
         String tableName = table.getTableName().getString();
@@ -2720,6 +2961,10 @@ public class MetaDataClient {
         if (appendOnlySchema !=null) {
             mutateBooleanProperty(tenantId, schemaName, tableName, 
APPEND_ONLY_SCHEMA, appendOnlySchema);
         }
+        if (immutableStorageScheme !=null) {
+            mutateStringProperty(tenantId, schemaName, tableName, 
IMMUTABLE_STORAGE_SCHEME, immutableStorageScheme.name());
+        }
+        
         return seqNum;
     }
 
@@ -2760,6 +3005,23 @@ public class MetaDataClient {
             tableBoolUpsert.execute();
         }
     }
+    
+    private void mutateStringProperty(String tenantId, String schemaName, 
String tableName,
+            String propertyName, String propertyValue) throws SQLException {
+        String updatePropertySql = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + 
".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
+                TENANT_ID + "," +
+                TABLE_SCHEM + "," +
+                TABLE_NAME + "," +
+                propertyName +
+                ") VALUES (?, ?, ?, ?)";
+        try (PreparedStatement tableBoolUpsert = 
connection.prepareStatement(updatePropertySql)) {
+            tableBoolUpsert.setString(1, tenantId);
+            tableBoolUpsert.setString(2, schemaName);
+            tableBoolUpsert.setString(3, tableName);
+            tableBoolUpsert.setString(4, propertyValue);
+            tableBoolUpsert.execute();
+        }
+    }
 
     public MutationState addColumn(AddColumnStatement statement) throws 
SQLException {
         PTable table = FromCompiler.getResolver(statement, 
connection).getTables().get(0).getTable();
@@ -2777,7 +3039,6 @@ public class MetaDataClient {
             PName tenantId = connection.getTenantId();
             String schemaName = table.getSchemaName().getString();
             String tableName = table.getTableName().getString();
-
             Boolean isImmutableRowsProp = null;
             Boolean multiTenantProp = null;
             Boolean disableWALProp = null;
@@ -2786,6 +3047,7 @@ public class MetaDataClient {
             Long updateCacheFrequencyProp = null;
             Boolean appendOnlySchemaProp = null;
             Long guidePostWidth = -1L;
+            ImmutableStorageScheme immutableStorageSchemeProp = null;
 
             Map<String, List<Pair<String, Object>>> properties = new 
HashMap<>(stmtProperties.size());
             List<ColumnDef> columnDefs = null;
@@ -2798,7 +3060,7 @@ public class MetaDataClient {
                     if (familyName!=null) {
                         try {
                             PColumnFamily columnFamily = 
table.getColumnFamily(familyName);
-                            columnFamily.getColumn(columnName);
+                            columnFamily.getPColumnForColumnName(columnName);
                             if (!ifNotExists) {
                                 throw new 
ColumnAlreadyExistsException(schemaName, tableName, columnName);
                             }
@@ -2809,7 +3071,7 @@ public class MetaDataClient {
                     }
                     else {
                         try {
-                            table.getColumn(columnName);
+                            table.getColumnForColumnName(columnName);
                             if (!ifNotExists) {
                                 throw new 
ColumnAlreadyExistsException(schemaName, tableName, columnName);
                             }
@@ -2848,6 +3110,8 @@ public class MetaDataClient {
                             guidePostWidth = (Long)value;
                         } else if (propName.equals(APPEND_ONLY_SCHEMA)) {
                             appendOnlySchemaProp = (Boolean) value;
+                        } else if 
(propName.equalsIgnoreCase(IMMUTABLE_STORAGE_SCHEME)) {
+                            immutableStorageSchemeProp = 
(ImmutableStorageScheme)value;
                         }
                     }
                     // if removeTableProps is true only add the property if it 
is not a HTable or Phoenix Table property
@@ -2864,7 +3128,8 @@ public class MetaDataClient {
                 ColumnResolver resolver = 
FromCompiler.getResolver(namedTableNode, connection);
                 table = resolver.getTables().get(0).getTable();
                 int nIndexes = table.getIndexes().size();
-                int nNewColumns = columnDefs.size();
+                int numCols = columnDefs.size();
+                int nNewColumns = numCols;
                 List<Mutation> tableMetaData = 
Lists.newArrayListWithExpectedSize((1 + nNewColumns) * (nIndexes + 1));
                 List<Mutation> columnMetaData = 
Lists.newArrayListWithExpectedSize(nNewColumns * (nIndexes + 1));
                 if (logger.isDebugEnabled()) {
@@ -2889,6 +3154,10 @@ public class MetaDataClient {
                 Boolean isImmutableRows = null;
                 if (isImmutableRowsProp != null) {
                     if (isImmutableRowsProp.booleanValue() != 
table.isImmutableRows()) {
+                       if (table.getImmutableStorageScheme() != 
ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+                               throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_IMMUTABLE_ROWS_PROPERTY)
+                               
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
+                       }
                         isImmutableRows = isImmutableRowsProp;
                         changingPhoenixTableProperty = true;
                     }
@@ -2921,6 +3190,18 @@ public class MetaDataClient {
                         changingPhoenixTableProperty = true;
                     }
                 }
+                ImmutableStorageScheme immutableStorageScheme = null;
+                if (immutableStorageSchemeProp!=null) {
+                    if (table.getImmutableStorageScheme() == 
ONE_CELL_PER_COLUMN || 
+                            immutableStorageSchemeProp == ONE_CELL_PER_COLUMN) 
{
+                        throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_CHANGE)
+                        
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
+                    }
+                    else if (immutableStorageSchemeProp != 
table.getImmutableStorageScheme()) {
+                        immutableStorageScheme = immutableStorageSchemeProp;
+                        changingPhoenixTableProperty = true;
+                    }
+                }
             
                 if (guidePostWidth == null || guidePostWidth >= 0) {
                     changingPhoenixTableProperty = true;
@@ -2965,12 +3246,19 @@ public class MetaDataClient {
                 Long timeStamp = TransactionUtil.getTableTimestamp(connection, 
table.isTransactional() || nonTxToTx);
 
                 int numPkColumnsAdded = 0;
-                List<PColumn> columns = 
Lists.newArrayListWithExpectedSize(columnDefs.size());
+                List<PColumn> columns = 
Lists.newArrayListWithExpectedSize(numCols);
                 Set<String> colFamiliesForPColumnsToBeAdded = new 
LinkedHashSet<>();
                 Set<String> families = new LinkedHashSet<>();
-                if (columnDefs.size() > 0 ) {
+                PTable tableForCQCounters = tableType == PTableType.VIEW ? 
PhoenixRuntime.getTable(connection, table.getPhysicalName().getString()) : 
table;;
+                EncodedCQCounter cqCounterToUse = 
tableForCQCounters.getEncodedCQCounter();
+                Map<String, Integer> changedCqCounters = new 
HashMap<>(numCols);
+                if (numCols > 0 ) {
                     StatementContext context = new StatementContext(new 
PhoenixStatement(connection), resolver);
-                    try (PreparedStatement colUpsert = 
connection.prepareStatement(INSERT_COLUMN_ALTER_TABLE)) {
+                    String addColumnSqlToUse = connection.isRunningUpgrade()
+                            && 
tableName.equals(PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE)
+                            && 
schemaName.equals(PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA) ? 
ALTER_SYSCATALOG_TABLE_UPGRADE
+                            : INSERT_COLUMN_ALTER_TABLE;
+                    try (PreparedStatement colUpsert = 
connection.prepareStatement(addColumnSqlToUse)) {
                         short nextKeySeq = SchemaUtil.getMaxKeySeq(table);
                         for( ColumnDef colDef : columnDefs) {
                             if (colDef != null && !colDef.isNull()) {
@@ -2992,11 +3280,41 @@ public class MetaDataClient {
                             if (!colDef.validateDefault(context, null)) {
                                 colDef = new ColumnDef(colDef, null); // 
Remove DEFAULT as it's not necessary
                             }
-                            PColumn column = newColumn(position++, colDef, 
PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : 
table.getDefaultFamilyName().getString(), true);
+                            Integer encodedCQ = null;
+                            if (!colDef.isPK()) {
+                                String colDefFamily = 
colDef.getColumnDefName().getFamilyName();
+                                String familyName = null;
+                                ImmutableStorageScheme storageScheme = 
table.getImmutableStorageScheme();
+                                String defaultColumnFamily = 
tableForCQCounters.getDefaultFamilyName() != null && 
!Strings.isNullOrEmpty(tableForCQCounters.getDefaultFamilyName().getString()) ? 
+                                        
tableForCQCounters.getDefaultFamilyName().getString() : DEFAULT_COLUMN_FAMILY;
+                                    if (table.getType() == PTableType.INDEX && 
table.getIndexType() == IndexType.LOCAL) {
+                                        defaultColumnFamily = 
QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX + defaultColumnFamily;
+                                    }
+                                if (storageScheme == 
SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+                                    familyName = colDefFamily != null ? 
colDefFamily : defaultColumnFamily;
+                                } else {
+                                    familyName = defaultColumnFamily;
+                                }
+                                encodedCQ = 
cqCounterToUse.getNextQualifier(familyName);
+                                if (cqCounterToUse.increment(familyName)) {
+                                    changedCqCounters.put(familyName,
+                                        
cqCounterToUse.getNextQualifier(familyName));
+                                }
+                            }
+                            byte[] columnQualifierBytes = null;
+                            try {
+                                columnQualifierBytes = 
EncodedColumnsUtil.getColumnQualifierBytes(colDef.getColumnDefName().getColumnName(),
 encodedCQ, table, colDef.isPK());
+                            }
+                            catch (QualifierOutOfRangeException e) {
+                                throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.MAX_COLUMNS_EXCEEDED)
+                                .setSchemaName(schemaName)
+                                
.setTableName(tableName).build().buildException();
+                            }
+                            PColumn column = newColumn(position++, colDef, 
PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : 
table.getDefaultFamilyName().getString(), true, columnQualifierBytes);
                             columns.add(column);
                             String pkName = null;
                             Short keySeq = null;
-
+                            
                             // TODO: support setting properties on other 
families?
                             if (column.getFamilyName() == null) {
                                 ++numPkColumnsAdded;
@@ -3008,13 +3326,13 @@ public class MetaDataClient {
                             
colFamiliesForPColumnsToBeAdded.add(column.getFamilyName() == null ? null : 
column.getFamilyName().getString());
                             addColumnMutation(schemaName, tableName, column, 
colUpsert, null, pkName, keySeq, table.getBucketNum() != null);
                         }
-
+                        
                         // Add any new PK columns to end of index PK
-                        if (numPkColumnsAdded>0) {
+                        if (numPkColumnsAdded > 0) {
                             // create PK column list that includes the newly 
created columns
                             List<PColumn> pkColumns = 
Lists.newArrayListWithExpectedSize(table.getPKColumns().size()+numPkColumnsAdded);
                             pkColumns.addAll(table.getPKColumns());
-                            for (int i=0; i<columnDefs.size(); ++i) {
+                            for (int i=0; i<numCols; ++i) {
                                 if (columnDefs.get(i).isPK()) {
                                     pkColumns.add(columns.get(i));
                                 }
@@ -3023,14 +3341,14 @@ public class MetaDataClient {
                             for (PTable index : table.getIndexes()) {
                                 short nextIndexKeySeq = 
SchemaUtil.getMaxKeySeq(index);
                                 int indexPosition = index.getColumns().size();
-                                for (int i=0; i<columnDefs.size(); ++i) {
+                                for (int i=0; i<numCols; ++i) {
                                     ColumnDef colDef = columnDefs.get(i);
                                     if (colDef.isPK()) {
                                         PDataType indexColDataType = 
IndexUtil.getIndexColumnDataType(colDef.isNull(), colDef.getDataType());
                                         ColumnName indexColName = 
ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(null, 
colDef.getColumnDefName().getColumnName()));
                                         Expression expression = new 
RowKeyColumnExpression(columns.get(i), new RowKeyValueAccessor(pkColumns, 
++pkSlotPosition));
                                         ColumnDef indexColDef = 
FACTORY.columnDef(indexColName, indexColDataType.getSqlTypeName(), 
colDef.isNull(), colDef.getMaxLength(), colDef.getScale(), true, 
colDef.getSortOrder(), expression.toString(), colDef.isRowTimestamp());
-                                        PColumn indexColumn = 
newColumn(indexPosition++, indexColDef, PrimaryKeyConstraint.EMPTY, null, true);
+                                        PColumn indexColumn = 
newColumn(indexPosition++, indexColDef, PrimaryKeyConstraint.EMPTY, null, true, 
null);
                                         addColumnMutation(schemaName, 
index.getTableName().getString(), indexColumn, colUpsert, 
index.getParentTableName().getString(), index.getPKName() == null ? null : 
index.getPKName().getString(), ++nextIndexKeySeq, index.getBucketNum() != null);
                                     }
                                 }
@@ -3066,10 +3384,10 @@ public class MetaDataClient {
                     
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                     connection.rollback();
                 }
-                long seqNum = table.getSequenceNumber();
+                
                 if (changingPhoenixTableProperty || columnDefs.size() > 0) {
-                    seqNum = incrementTableSeqNum(table, tableType, 
columnDefs.size(), isTransactional, updateCacheFrequency, isImmutableRows,
-                            disableWAL, multiTenant, storeNulls, 
guidePostWidth, appendOnlySchema);
+                    incrementTableSeqNum(table, tableType, columnDefs.size(), 
isTransactional, updateCacheFrequency, isImmutableRows,
+                            disableWAL, multiTenant, storeNulls, 
guidePostWidth, appendOnlySchema, immutableStorageScheme);
                     
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                     connection.rollback();
                 }
@@ -3078,6 +3396,33 @@ public class MetaDataClient {
                 Collections.reverse(tableMetaData);
                 // Add column metadata afterwards, maintaining the order so 
columns have more predictable ordinal position
                 tableMetaData.addAll(columnMetaData);
+                boolean sharedIndex = tableType == PTableType.INDEX && 
(table.getIndexType() == IndexType.LOCAL || table.getViewIndexId() != null);
+                String tenantIdToUse = connection.getTenantId() != null && 
sharedIndex ? connection.getTenantId().getString() : null;
+                if (!changedCqCounters.isEmpty()) {
+                    PreparedStatement linkStatement;
+                        linkStatement = 
connection.prepareStatement(UPDATE_ENCODED_COLUMN_COUNTER);
+                        for (Entry<String, Integer> entry : 
changedCqCounters.entrySet()) {    
+                            linkStatement.setString(1, tenantIdToUse);
+                            linkStatement.setString(2, 
tableForCQCounters.getSchemaName().getString());
+                            linkStatement.setString(3, 
tableForCQCounters.getTableName().getString());
+                            linkStatement.setString(4, entry.getKey());
+                            linkStatement.setInt(5, entry.getValue());
+                            linkStatement.execute();
+                        }
+
+                    // When a view adds its own columns, then we need to 
increase the sequence number of the base table
+                    // too since we want clients to get the latest PTable of 
the base table.
+                    if (tableType == VIEW) {
+                        PreparedStatement incrementStatement = 
connection.prepareStatement(INCREMENT_SEQ_NUM);
+                        incrementStatement.setString(1, null);
+                        incrementStatement.setString(2, 
tableForCQCounters.getSchemaName().getString());
+                        incrementStatement.setString(3, 
tableForCQCounters.getTableName().getString());
+                        incrementStatement.setLong(4, 
tableForCQCounters.getSequenceNumber() + 1);
+                        incrementStatement.execute();
+                    }
+                    
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
+                    connection.rollback();
+                }
 
                 byte[] family = families.size() > 0 ? 
families.iterator().next().getBytes() : null;
 
@@ -3107,7 +3452,6 @@ public class MetaDataClient {
                         }
                         return new MutationState(0,connection);
                     }
-
                     // Only update client side cache if we aren't adding a PK 
column to a table with indexes or
                     // transitioning a table from non transactional to 
transactional.
                     // We could update the cache manually then too, it'd just 
be a pain.
@@ -3115,6 +3459,7 @@ public class MetaDataClient {
                     long resolvedTimeStamp = 
TransactionUtil.getResolvedTime(connection, result);
                     if (table.getIndexes().isEmpty() || (numPkColumnsAdded==0 
&& !nonTxToTx)) {
                         connection.addTable(result.getTable(), 
resolvedTimeStamp);
+                        table = result.getTable();
                     } else if (updateCacheFrequency != null) {
                         // Force removal from cache as the update cache 
frequency has changed
                         // Note that clients outside this JVM won't be 
affected.
@@ -3139,7 +3484,7 @@ public class MetaDataClient {
                             PTable viewIndexTable = new PTableImpl(null,
                                     
SchemaUtil.getSchemaNameFromFullName(viewIndexPhysicalName),
                                     
SchemaUtil.getTableNameFromFullName(viewIndexPhysicalName), ts,
-                                    table.getColumnFamilies(), 
table.isNamespaceMapped());
+                                    table.getColumnFamilies(), 
table.isNamespaceMapped(), table.getImmutableStorageScheme(), 
table.getEncodingScheme());
                             List<TableRef> tableRefs = 
Collections.singletonList(new TableRef(null, viewIndexTable, ts, false));
                             MutationPlan plan = new 
PostDDLCompiler(connection).compile(tableRefs, null, null,
                                     Collections.<PColumn> emptyList(), ts);
@@ -3291,23 +3636,24 @@ public class MetaDataClient {
                 Long timeStamp = table.isTransactional() ? 
tableRef.getTimeStamp() : null;
                 for (PTable index : table.getIndexes()) {
                     IndexMaintainer indexMaintainer = 
index.getIndexMaintainer(table, connection);
-                    // get the columns required for the index pk
-                    Set<ColumnReference> indexColumns = 
indexMaintainer.getIndexedColumns();
-                    // get the covered columns
-                    Set<ColumnReference> coveredColumns = 
indexMaintainer.getCoveredColumns();
+                    // get the covered columns 
                     List<PColumn> indexColumnsToDrop = 
Lists.newArrayListWithExpectedSize(columnRefs.size());
+                    Set<Pair<String, String>> indexedColsInfo = 
indexMaintainer.getIndexedColumnInfo();
+                    Set<ColumnReference> coveredCols = 
indexMaintainer.getCoveredColumns();
                     for(PColumn columnToDrop : tableColumnsToDrop) {
-                        ColumnReference columnToDropRef = new 
ColumnReference(columnToDrop.getFamilyName().getBytes(), 
columnToDrop.getName().getBytes());
-                        // if the columns being dropped is indexed and the 
physical index table is not shared
-                        if (indexColumns.contains(columnToDropRef)) {
-                            if (index.getViewIndexId()==null)
+                        Pair<String, String> columnToDropInfo = new 
Pair<>(columnToDrop.getFamilyName().getString(), 
columnToDrop.getName().getString());
+                        ColumnReference colDropRef = new 
ColumnReference(columnToDrop.getFamilyName() == null ? null
+                                : columnToDrop.getFamilyName().getBytes(), 
columnToDrop.getColumnQualifierBytes());
+                        boolean isColumnIndexed = 
indexedColsInfo.contains(columnToDropInfo);
+                        if (isColumnIndexed) {
+                            if (index.getViewIndexId() == null) { 
                                 indexesToDrop.add(new TableRef(index));
+                            }
                             connection.removeTable(tenantId, 
SchemaUtil.getTableName(schemaName, index.getName().getString()), 
index.getParentName() == null ? null : index.getParentName().getString(), 
index.getTimeStamp());
                             removedIndexTableOrColumn = true;
-                        }
-                        else if (coveredColumns.contains(columnToDropRef)) {
+                        } else if (coveredCols.contains(colDropRef)) {
                             String indexColumnName = 
IndexUtil.getIndexColumnName(columnToDrop);
-                            PColumn indexColumn = 
index.getColumn(indexColumnName);
+                            PColumn indexColumn = 
index.getColumnForColumnName(indexColumnName);
                             indexColumnsToDrop.add(indexColumn);
                             // add the index column to be dropped so that we 
actually delete the column values
                             columnsToDrop.add(new ColumnRef(new 
TableRef(index), indexColumn.getPosition()));
@@ -3397,13 +3743,13 @@ public class MetaDataClient {
                         // so we need to issue deletes markers for all the 
rows of the index
                         final List<TableRef> tableRefsToDrop = 
Lists.newArrayList();
                         Map<String, List<TableRef>> tenantIdTableRefMap = 
Maps.newHashMap();
-                        if (result.getSharedTablesToDelete()!=null) {
+                        if (result.getSharedTablesToDelete() != null) {
                             for (SharedTableState sharedTableState : 
result.getSharedTablesToDelete()) {
                                 PTableImpl viewIndexTable = new 
PTableImpl(sharedTableState.getTenantId(),
                                         sharedTableState.getSchemaName(), 
sharedTableState.getTableName(), ts,
                                         table.getColumnFamilies(), 
sharedTableState.getColumns(),
                                         sharedTableState.getPhysicalNames(), 
sharedTableState.getViewIndexId(),
-                                        table.isMultiTenant(), 
table.isNamespaceMapped());
+                                        table.isMultiTenant(), 
table.isNamespaceMapped(), table.getImmutableStorageScheme(), 
table.getEncodingScheme(), table.getEncodedCQCounter());
                                 TableRef indexTableRef = new 
TableRef(viewIndexTable);
                                 PName indexTableTenantId = 
sharedTableState.getTenantId();
                                 if (indexTableTenantId==null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
index 0f5fa44..9e26227 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
@@ -27,7 +27,7 @@ package org.apache.phoenix.schema;
 public interface PColumn extends PDatum {
 
     /**
-     * @return the name of the column qualifier
+     * @return the name of the column
      */
     PName getName();
 
@@ -60,4 +60,6 @@ public interface PColumn extends PDatum {
     boolean isRowTimestamp();
     
     boolean isDynamic();
+    
+    byte[] getColumnQualifierBytes();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
index 24da14d..c4c383e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
@@ -39,16 +39,22 @@ public interface PColumnFamily {
     Collection<PColumn> getColumns();
     
     /**
-     * @return The PColumn for the specified column qualifier.
+     * @return The PColumn for the specified column name.
      * @throws ColumnNotFoundException if the column cannot be found
      */
-    PColumn getColumn(byte[] qualifier) throws ColumnNotFoundException;
+    PColumn getPColumnForColumnNameBytes(byte[] columnNameBytes) throws 
ColumnNotFoundException;
     
     /**
-     * @return The PColumn for the specified column qualifier.
+     * @return The PColumn for the specified column name.
      * @throws ColumnNotFoundException if the column cannot be found
      */
-    PColumn getColumn(String name) throws ColumnNotFoundException;
+    PColumn getPColumnForColumnName(String columnName) throws 
ColumnNotFoundException;
     
     int getEstimatedSize();
+    
+    /**
+     * @return The PColumn for the specified column qualifier.
+     * @throws ColumnNotFoundException if the column cannot be found
+     */
+    PColumn getPColumnForColumnQualifier(byte[] cq) throws 
ColumnNotFoundException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
index 2e29656..453e33b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
@@ -17,10 +17,14 @@
  */
 package org.apache.phoenix.schema;
 
+import static 
org.apache.phoenix.util.EncodedColumnsUtil.usesEncodedColumnNames;
+
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SizedUtil;
 
 import com.google.common.base.Preconditions;
@@ -31,8 +35,9 @@ import com.google.common.collect.ImmutableSortedMap;
 public class PColumnFamilyImpl implements PColumnFamily {
     private final PName name;
     private final List<PColumn> columns;
-    private final Map<String, PColumn> columnByString;
-    private final Map<byte[], PColumn> columnByBytes;
+    private final Map<String, PColumn> columnNamesByStrings;
+    private final Map<byte[], PColumn> columnNamesByBytes;
+    private final Map<byte[], PColumn> columnsByQualifiers;
     private final int estimatedSize;
 
     @Override
@@ -47,15 +52,23 @@ public class PColumnFamilyImpl implements PColumnFamily {
                 SizedUtil.sizeOfMap(columns.size()) * 2 + 
SizedUtil.sizeOfArrayList(columns.size());
         this.name = name;
         this.columns = ImmutableList.copyOf(columns);
-        ImmutableMap.Builder<String, PColumn> columnByStringBuilder = 
ImmutableMap.builder();
-        ImmutableSortedMap.Builder<byte[], PColumn> columnByBytesBuilder = 
ImmutableSortedMap.orderedBy(Bytes.BYTES_COMPARATOR);
+        ImmutableMap.Builder<String, PColumn> columnNamesByStringBuilder = 
ImmutableMap.builder();
+        ImmutableSortedMap.Builder<byte[], PColumn> columnNamesByBytesBuilder 
= ImmutableSortedMap.orderedBy(Bytes.BYTES_COMPARATOR);
+        ImmutableSortedMap.Builder<byte[], PColumn> columnsByQualifiersBuilder 
= ImmutableSortedMap.orderedBy(Bytes.BYTES_COMPARATOR);
         for (PColumn column : columns) {
             estimatedSize += column.getEstimatedSize();
-            columnByBytesBuilder.put(column.getName().getBytes(), column);
-            columnByStringBuilder.put(column.getName().getString(), column);
+            columnNamesByBytesBuilder.put(column.getName().getBytes(), column);
+            columnNamesByStringBuilder.put(column.getName().getString(), 
column);
+            // In certain cases like JOIN, PK columns are assigned a column 
family. So they
+            // are not evaluated as a PK column. However, their column 
qualifier bytes are
+            // still null.
+            if (!SchemaUtil.isPKColumn(column) && 
column.getColumnQualifierBytes() != null) {
+                
columnsByQualifiersBuilder.put(column.getColumnQualifierBytes(), column);
+            }
         }
-        this.columnByBytes = columnByBytesBuilder.build();
-        this.columnByString = columnByStringBuilder.build();
+        this.columnNamesByBytes = columnNamesByBytesBuilder.build();
+        this.columnNamesByStrings = columnNamesByStringBuilder.build();
+        this.columnsByQualifiers =  columnsByQualifiersBuilder.build();
         this.estimatedSize = (int)estimatedSize;
     }
     
@@ -70,20 +83,28 @@ public class PColumnFamilyImpl implements PColumnFamily {
     }
 
     @Override
-    public PColumn getColumn(byte[] qualifier) throws ColumnNotFoundException  
{
-        PColumn column = columnByBytes.get(qualifier);
+    public PColumn getPColumnForColumnNameBytes(byte[] columnNameBytes) throws 
ColumnNotFoundException  {
+        PColumn column = columnNamesByBytes.get(columnNameBytes);
         if (column == null) {
-            throw new ColumnNotFoundException(Bytes.toString(qualifier));
+            throw new ColumnNotFoundException(Bytes.toString(columnNameBytes));
         }
         return column;
     }
     
     @Override
-    public PColumn getColumn(String name) throws ColumnNotFoundException  {
-        PColumn column = columnByString.get(name);
+    public PColumn getPColumnForColumnName(String columnName) throws 
ColumnNotFoundException  {
+        PColumn column = columnNamesByStrings.get(columnName);
         if (column == null) {
-            throw new ColumnNotFoundException(name);
+            throw new ColumnNotFoundException(columnName);
         }
         return column;
     }
+    
+    
+    //TODO: samarth think about backward compatibility here
+    @Override
+    public PColumn getPColumnForColumnQualifier(byte[] cq) throws 
ColumnNotFoundException {
+        Preconditions.checkNotNull(cq);
+        return columnsByQualifiers.get(cq);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
index ca827d8..78baa4c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SizedUtil;
 
 import com.google.common.base.Preconditions;
@@ -40,6 +41,7 @@ public class PColumnImpl implements PColumn {
     private String expressionStr;
     private boolean isRowTimestamp;
     private boolean isDynamic;
+    private byte[] columnQualifierBytes;
     
     public PColumnImpl() {
     }
@@ -51,13 +53,13 @@ public class PColumnImpl implements PColumn {
                        Integer scale,
                        boolean nullable,
                        int position,
-                       SortOrder sortOrder, Integer arrSize, byte[] 
viewConstant, boolean isViewReferenced, String expressionStr, boolean 
isRowTimestamp, boolean isDynamic) {
-        init(name, familyName, dataType, maxLength, scale, nullable, position, 
sortOrder, arrSize, viewConstant, isViewReferenced, expressionStr, 
isRowTimestamp, isDynamic);
+                       SortOrder sortOrder, Integer arrSize, byte[] 
viewConstant, boolean isViewReferenced, String expressionStr, boolean 
isRowTimestamp, boolean isDynamic, byte[] columnQualifierBytes) {
+        init(name, familyName, dataType, maxLength, scale, nullable, position, 
sortOrder, arrSize, viewConstant, isViewReferenced, expressionStr, 
isRowTimestamp, isDynamic, columnQualifierBytes);
     }
 
     public PColumnImpl(PColumn column, int position) {
         this(column.getName(), column.getFamilyName(), column.getDataType(), 
column.getMaxLength(),
-                column.getScale(), column.isNullable(), position, 
column.getSortOrder(), column.getArraySize(), column.getViewConstant(), 
column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), 
column.isDynamic());
+                column.getScale(), column.isNullable(), position, 
column.getSortOrder(), column.getArraySize(), column.getViewConstant(), 
column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), 
column.isDynamic(), column.getColumnQualifierBytes());
     }
 
     private void init(PName name,
@@ -69,7 +71,7 @@ public class PColumnImpl implements PColumn {
             int position,
             SortOrder sortOrder,
             Integer arrSize,
-            byte[] viewConstant, boolean isViewReferenced, String 
expressionStr, boolean isRowTimestamp, boolean isDynamic) {
+            byte[] viewConstant, boolean isViewReferenced, String 
expressionStr, boolean isRowTimestamp, boolean isDynamic, byte[] 
columnQualifierBytes) {
        Preconditions.checkNotNull(sortOrder);
         this.dataType = dataType;
         if (familyName == null) {
@@ -94,6 +96,7 @@ public class PColumnImpl implements PColumn {
         this.expressionStr = expressionStr;
         this.isRowTimestamp = isRowTimestamp;
         this.isDynamic = isDynamic;
+        this.columnQualifierBytes = columnQualifierBytes;
     }
 
     @Override
@@ -205,6 +208,15 @@ public class PColumnImpl implements PColumn {
     public boolean isDynamic() {
         return isDynamic;
     }
+    
+    @Override
+    public byte[] getColumnQualifierBytes() {
+        // Needed for backward compatibility
+        if (!SchemaUtil.isPKColumn(this) && columnQualifierBytes == null) {
+            return this.name.getBytes();
+        }
+        return columnQualifierBytes;
+    }
 
     /**
      * Create a PColumn instance from PBed PColumn instance
@@ -251,8 +263,12 @@ public class PColumnImpl implements PColumn {
         if (column.hasIsDynamic()) {
                isDynamic = column.getIsDynamic();
         }
+        byte[] columnQualifierBytes = null;
+        if (column.hasColumnQualifierBytes()) {
+            columnQualifierBytes = 
column.getColumnQualifierBytes().toByteArray();
+        }
         return new PColumnImpl(columnName, familyName, dataType, maxLength, 
scale, nullable, position, sortOrder,
-                arraySize, viewConstant, isViewReferenced, expressionStr, 
isRowTimestamp, isDynamic);
+                arraySize, viewConstant, isViewReferenced, expressionStr, 
isRowTimestamp, isDynamic, columnQualifierBytes);
     }
 
     public static PTableProtos.PColumn toProto(PColumn column) {
@@ -283,6 +299,9 @@ public class PColumnImpl implements PColumn {
             builder.setExpression(column.getExpressionStr());
         }
         builder.setIsRowTimestamp(column.isRowTimestamp());
+        if (column.getColumnQualifierBytes() != null) {
+            
builder.setColumnQualifierBytes(ByteStringer.wrap(column.getColumnQualifierBytes()));
+        }
         return builder.build();
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
index 6ca38cc..169e78d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -199,7 +199,7 @@ public class PMetaDataImpl implements PMetaData {
             if (familyName == null) {
                 column = 
table.getPKColumn(columnToRemove.getName().getString());
             } else {
-                column = 
table.getColumnFamily(familyName).getColumn(columnToRemove.getName().getString());
+                column = 
table.getColumnFamily(familyName).getPColumnForColumnName(columnToRemove.getName().getString());
             }
             int positionOffset = 0;
             int position = column.getPosition();
@@ -214,7 +214,7 @@ public class PMetaDataImpl implements PMetaData {
             // Update position of columns that follow removed column
             for (int i = position+1; i < oldColumns.size(); i++) {
                 PColumn oldColumn = oldColumns.get(i);
-                PColumn newColumn = new PColumnImpl(oldColumn.getName(), 
oldColumn.getFamilyName(), oldColumn.getDataType(), oldColumn.getMaxLength(), 
oldColumn.getScale(), oldColumn.isNullable(), i-1+positionOffset, 
oldColumn.getSortOrder(), oldColumn.getArraySize(), 
oldColumn.getViewConstant(), oldColumn.isViewReferenced(), 
oldColumn.getExpressionStr(), oldColumn.isRowTimestamp(), 
oldColumn.isDynamic());
+                PColumn newColumn = new PColumnImpl(oldColumn.getName(), 
oldColumn.getFamilyName(), oldColumn.getDataType(), oldColumn.getMaxLength(), 
oldColumn.getScale(), oldColumn.isNullable(), i-1+positionOffset, 
oldColumn.getSortOrder(), oldColumn.getArraySize(), 
oldColumn.getViewConstant(), oldColumn.isViewReferenced(), 
oldColumn.getExpressionStr(), oldColumn.isRowTimestamp(), 
oldColumn.isDynamic(), oldColumn.getColumnQualifierBytes());
                 columns.add(newColumn);
             }
             

Reply via email to