http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index aec4482..c119ebb 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -27,6 +27,8 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME_INDEX;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_BYTES;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE_BYTES;
@@ -34,6 +36,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYT
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL_BYTES;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FAMILY_NAME_INDEX;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES;
@@ -57,6 +60,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER_BYTES;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME_INDEX;
@@ -189,8 +193,11 @@ import org.apache.phoenix.schema.PMetaDataEntity;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.EncodedCQCounter;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.LinkType;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
@@ -209,10 +216,12 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PSmallint;
+import org.apache.phoenix.schema.types.PTinyint;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
@@ -283,6 +292,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
             TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES);
     private static final KeyValue AUTO_PARTITION_SEQ_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
AUTO_PARTITION_SEQ_BYTES);
     private static final KeyValue APPEND_ONLY_SCHEMA_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
APPEND_ONLY_SCHEMA_BYTES);
+    private static final KeyValue STORAGE_SCHEME_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
STORAGE_SCHEME_BYTES);
+    private static final KeyValue ENCODING_SCHEME_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
ENCODING_SCHEME_BYTES);
     
     private static final List<KeyValue> TABLE_KV_COLUMNS = 
Arrays.<KeyValue>asList(
             EMPTY_KEYVALUE_KV,
@@ -309,7 +320,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
             UPDATE_CACHE_FREQUENCY_KV,
             IS_NAMESPACE_MAPPED_KV,
             AUTO_PARTITION_SEQ_KV,
-            APPEND_ONLY_SCHEMA_KV
+            APPEND_ONLY_SCHEMA_KV,
+            STORAGE_SCHEME_KV,
+            ENCODING_SCHEME_KV
             );
     static {
         Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -339,6 +352,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
     private static final int IS_NAMESPACE_MAPPED_INDEX = 
TABLE_KV_COLUMNS.indexOf(IS_NAMESPACE_MAPPED_KV);
     private static final int AUTO_PARTITION_SEQ_INDEX = 
TABLE_KV_COLUMNS.indexOf(AUTO_PARTITION_SEQ_KV);
     private static final int APPEND_ONLY_SCHEMA_INDEX = 
TABLE_KV_COLUMNS.indexOf(APPEND_ONLY_SCHEMA_KV);
+    private static final int STORAGE_SCHEME_INDEX = 
TABLE_KV_COLUMNS.indexOf(STORAGE_SCHEME_KV);
+    private static final int QUALIFIER_ENCODING_SCHEME_INDEX = 
TABLE_KV_COLUMNS.indexOf(ENCODING_SCHEME_KV);
 
     // KeyValues for Column
     private static final KeyValue DECIMAL_DIGITS_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
DECIMAL_DIGITS_BYTES);
@@ -352,6 +367,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
     private static final KeyValue IS_VIEW_REFERENCED_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
IS_VIEW_REFERENCED_BYTES);
     private static final KeyValue COLUMN_DEF_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
COLUMN_DEF_BYTES);
     private static final KeyValue IS_ROW_TIMESTAMP_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
IS_ROW_TIMESTAMP_BYTES);
+    private static final KeyValue COLUMN_QUALIFIER_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
COLUMN_QUALIFIER_BYTES);
     private static final List<KeyValue> COLUMN_KV_COLUMNS = 
Arrays.<KeyValue>asList(
             DECIMAL_DIGITS_KV,
             COLUMN_SIZE_KV,
@@ -364,11 +380,13 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
             VIEW_CONSTANT_KV,
             IS_VIEW_REFERENCED_KV,
             COLUMN_DEF_KV,
-            IS_ROW_TIMESTAMP_KV
+            IS_ROW_TIMESTAMP_KV,
+            COLUMN_QUALIFIER_KV
             );
     static {
         Collections.sort(COLUMN_KV_COLUMNS, KeyValue.COMPARATOR);
     }
+    private static final KeyValue QUALIFIER_COUNTER_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
COLUMN_QUALIFIER_COUNTER_BYTES);
     private static final int DECIMAL_DIGITS_INDEX = 
COLUMN_KV_COLUMNS.indexOf(DECIMAL_DIGITS_KV);
     private static final int COLUMN_SIZE_INDEX = 
COLUMN_KV_COLUMNS.indexOf(COLUMN_SIZE_KV);
     private static final int NULLABLE_INDEX = 
COLUMN_KV_COLUMNS.indexOf(NULLABLE_KV);
@@ -380,9 +398,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
     private static final int IS_VIEW_REFERENCED_INDEX = 
COLUMN_KV_COLUMNS.indexOf(IS_VIEW_REFERENCED_KV);
     private static final int COLUMN_DEF_INDEX = 
COLUMN_KV_COLUMNS.indexOf(COLUMN_DEF_KV);
     private static final int IS_ROW_TIMESTAMP_INDEX = 
COLUMN_KV_COLUMNS.indexOf(IS_ROW_TIMESTAMP_KV);
+    private static final int COLUMN_QUALIFIER_INDEX = 
COLUMN_KV_COLUMNS.indexOf(COLUMN_QUALIFIER_KV);
     
     private static final int LINK_TYPE_INDEX = 0;
-
+    
     private static final KeyValue CLASS_NAME_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
CLASS_NAME_BYTES);
     private static final KeyValue JAR_PATH_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
JAR_PATH_BYTES);
     private static final KeyValue RETURN_TYPE_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
RETURN_TYPE_BYTES);
@@ -718,8 +737,16 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
                 isRowTimestampKV == null ? false : 
Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
                         isRowTimestampKV.getValueArray(), 
isRowTimestampKV.getValueOffset(),
                         isRowTimestampKV.getValueLength()));
-
-        PColumn column = new PColumnImpl(colName, famName, dataType, 
maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, 
isViewReferenced, expressionStr, isRowTimestamp, false);
+        
+        boolean isPkColumn = famName == null || famName.getString() == null;
+        Cell columnQualifierKV = colKeyValues[COLUMN_QUALIFIER_INDEX];
+        // Older tables won't have column qualifier metadata present. To make 
things simpler, just set the
+        // column qualifier bytes by using the column name.
+        byte[] columnQualifierBytes = columnQualifierKV != null ? 
+                Arrays.copyOfRange(columnQualifierKV.getValueArray(),
+                    columnQualifierKV.getValueOffset(), 
columnQualifierKV.getValueOffset()
+                            + columnQualifierKV.getValueLength()) : 
(isPkColumn ? null : colName.getBytes());
+        PColumn column = new PColumnImpl(colName, famName, dataType, 
maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, 
isViewReferenced, expressionStr, isRowTimestamp, false, columnQualifierBytes);
         columns.add(column);
     }
     
@@ -927,37 +954,55 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
         boolean isAppendOnlySchema = isAppendOnlySchemaKv == null ? false
                 : 
Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isAppendOnlySchemaKv.getValueArray(),
                     isAppendOnlySchemaKv.getValueOffset(), 
isAppendOnlySchemaKv.getValueLength()));
-        
+        Cell storageSchemeKv = tableKeyValues[STORAGE_SCHEME_INDEX];
+        //TODO: change this once we start having other values for storage 
schemes
+        ImmutableStorageScheme storageScheme = storageSchemeKv == null ? 
ImmutableStorageScheme.ONE_CELL_PER_COLUMN : ImmutableStorageScheme
+                
.fromSerializedValue((byte)PTinyint.INSTANCE.toObject(storageSchemeKv.getValueArray(),
+                        storageSchemeKv.getValueOffset(), 
storageSchemeKv.getValueLength()));
+        Cell encodingSchemeKv = 
tableKeyValues[QUALIFIER_ENCODING_SCHEME_INDEX];
+        QualifierEncodingScheme encodingScheme = encodingSchemeKv == null ? 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : QualifierEncodingScheme
+                
.fromSerializedValue((byte)PTinyint.INSTANCE.toObject(encodingSchemeKv.getValueArray(),
+                    encodingSchemeKv.getValueOffset(), 
encodingSchemeKv.getValueLength()));
         
         List<PColumn> columns = 
Lists.newArrayListWithExpectedSize(columnCount);
         List<PTable> indexes = Lists.newArrayList();
         List<PName> physicalTables = Lists.newArrayList();
         PName parentTableName = tableType == INDEX ? dataTableName : null;
         PName parentSchemaName = tableType == INDEX ? schemaName : null;
+        EncodedCQCounter cqCounter =
+                (!EncodedColumnsUtil.usesEncodedColumnNames(encodingScheme) || 
tableType == PTableType.VIEW) ? PTable.EncodedCQCounter.NULL_COUNTER
+                        : new EncodedCQCounter();
         while (true) {
-          results.clear();
-          scanner.next(results);
-          if (results.isEmpty()) {
-              break;
-          }
-          Cell colKv = results.get(LINK_TYPE_INDEX);
-          int colKeyLength = colKv.getRowLength();
-          PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + 
offset, colKeyLength-offset);
-          int colKeyOffset = offset + colName.getBytes().length + 1;
-          PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + 
colKeyOffset, colKeyLength-colKeyOffset);
-          if (colName.getString().isEmpty() && famName != null) {
-              LinkType linkType = 
LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
-              if (linkType == LinkType.INDEX_TABLE) {
-                  addIndexToTable(tenantId, schemaName, famName, tableName, 
clientTimeStamp, indexes);
-              } else if (linkType == LinkType.PHYSICAL_TABLE) {
-                  physicalTables.add(famName);
-              } else if (linkType == LinkType.PARENT_TABLE) {
-                  parentTableName = 
PNameFactory.newName(SchemaUtil.getTableNameFromFullName(famName.getBytes()));
-                  parentSchemaName = 
PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName(famName.getBytes()));
-              }
-          } else {
-              addColumnToTable(results, colName, famName, colKeyValues, 
columns, saltBucketNum != null);
-          }
+            results.clear();
+            scanner.next(results);
+            if (results.isEmpty()) {
+                break;
+            }
+            Cell colKv = results.get(LINK_TYPE_INDEX);
+            if (colKv != null) {
+                int colKeyLength = colKv.getRowLength();
+                PName colName = newPName(colKv.getRowArray(), 
colKv.getRowOffset() + offset, colKeyLength-offset);
+                int colKeyOffset = offset + colName.getBytes().length + 1;
+                PName famName = newPName(colKv.getRowArray(), 
colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
+                if (isQualifierCounterKV(colKv)) {
+                    Integer value = 
PInteger.INSTANCE.getCodec().decodeInt(colKv.getValueArray(), 
colKv.getValueOffset(), SortOrder.ASC);
+                    cqCounter.setValue(famName.getString(), value);
+                } else {
+                    if (colName.getString().isEmpty() && famName != null) {
+                        LinkType linkType = 
LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
+                        if (linkType == LinkType.INDEX_TABLE) {
+                            addIndexToTable(tenantId, schemaName, famName, 
tableName, clientTimeStamp, indexes);
+                        } else if (linkType == LinkType.PHYSICAL_TABLE) {
+                            physicalTables.add(famName);
+                        } else if (linkType == LinkType.PARENT_TABLE) {
+                            parentTableName = 
PNameFactory.newName(SchemaUtil.getTableNameFromFullName(famName.getBytes()));
+                            parentSchemaName = 
PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName(famName.getBytes()));
+                        }
+                    } else {
+                        addColumnToTable(results, colName, famName, 
colKeyValues, columns, saltBucketNum != null);
+                    }
+                } 
+            }
         }
         // Avoid querying the stats table because we're holding the rowLock 
here. Issuing an RPC to a remote
         // server while holding this lock is a bad idea and likely to cause 
contention.
@@ -965,9 +1010,17 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 pkName, saltBucketNum, columns, parentSchemaName, 
parentTableName, indexes, isImmutableRows, physicalTables, defaultFamilyName,
                 viewStatement, disableWAL, multiTenant, storeNulls, viewType, 
viewIndexId, indexType,
                 rowKeyOrderOptimizable, transactional, updateCacheFrequency, 
baseColumnCount,
-                indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, 
isAppendOnlySchema);
+                indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, 
isAppendOnlySchema, storageScheme, encodingScheme, cqCounter);
     }
-
+    
+    private boolean isQualifierCounterKV(Cell kv) {
+        // True when kv's qualifier is byte-for-byte the qualifier of QUALIFIER_COUNTER_KV (COLUMN_QUALIFIER_COUNTER_BYTES).
+        final KeyValue counterKv = QUALIFIER_COUNTER_KV;
+        return 0 == Bytes.compareTo(
+                kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
+                counterKv.getQualifierArray(), counterKv.getQualifierOffset(), counterKv.getQualifierLength());
+    }
+    
     private PSchema getSchema(RegionScanner scanner, long clientTimeStamp) 
throws IOException, SQLException {
         List<Cell> results = Lists.newArrayList();
         scanner.next(results);
@@ -1419,7 +1472,6 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 // tableMetadata and set the view statement and partition 
column correctly
                 if (parentTable!=null && 
parentTable.getAutoPartitionSeqName()!=null) {
                     long autoPartitionNum = 1;
-                    final Properties props = new Properties();
                     try (PhoenixConnection connection = 
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
                         Statement stmt = connection.createStatement()) {
                         String seqName = parentTable.getAutoPartitionSeqName();
@@ -1487,46 +1539,46 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 Short indexId = null;
                 if (request.hasAllocateIndexId() && 
request.getAllocateIndexId()) {
                     String tenantIdStr = tenantIdBytes.length == 0 ? null : 
Bytes.toString(tenantIdBytes);
-                    try (PhoenixConnection connection = 
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)){
-                    PName physicalName = parentTable.getPhysicalName();
-                    int nSequenceSaltBuckets = 
connection.getQueryServices().getSequenceSaltBuckets();
-                    SequenceKey key = 
MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName,
+                    try (PhoenixConnection connection = 
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class))
 {
+                        PName physicalName = parentTable.getPhysicalName();
+                        int nSequenceSaltBuckets = 
connection.getQueryServices().getSequenceSaltBuckets();
+                        SequenceKey key = 
MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName,
                             nSequenceSaltBuckets, 
parentTable.isNamespaceMapped() );
                         // TODO Review Earlier sequence was created at 
(SCN-1/LATEST_TIMESTAMP) and incremented at the client 
max(SCN,dataTable.getTimestamp), but it seems we should
                         // use always LATEST_TIMESTAMP to avoid seeing wrong 
sequence values by different connection having SCN
                         // or not. 
-                    long sequenceTimestamp = HConstants.LATEST_TIMESTAMP;
-                    try {
-                        
connection.getQueryServices().createSequence(key.getTenantId(), 
key.getSchemaName(), key.getSequenceName(),
+                        long sequenceTimestamp = HConstants.LATEST_TIMESTAMP;
+                        try {
+                            
connection.getQueryServices().createSequence(key.getTenantId(), 
key.getSchemaName(), key.getSequenceName(),
                                 Short.MIN_VALUE, 1, 1, Long.MIN_VALUE, 
Long.MAX_VALUE, false, sequenceTimestamp);
-                    } catch (SequenceAlreadyExistsException e) {
-                    }
-                    long[] seqValues = new long[1];
-                    SQLException[] sqlExceptions = new SQLException[1];
-                    
connection.getQueryServices().incrementSequences(Collections.singletonList(new 
SequenceAllocation(key, 1)),
+                        } catch (SequenceAlreadyExistsException e) {
+                        }
+                        long[] seqValues = new long[1];
+                        SQLException[] sqlExceptions = new SQLException[1];
+                        
connection.getQueryServices().incrementSequences(Collections.singletonList(new 
SequenceAllocation(key, 1)),
                             HConstants.LATEST_TIMESTAMP, seqValues, 
sqlExceptions);
-                    if (sqlExceptions[0] != null) {
-                        throw sqlExceptions[0];
-                    }
-                    long seqValue = seqValues[0];
-                    if (seqValue > Short.MAX_VALUE) {
-                        
builder.setReturnCode(MetaDataProtos.MutationCode.TOO_MANY_INDEXES);
-                        
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
-                        done.run(builder.build());
-                        return;
-                    }
-                    Put tableHeaderPut = 
MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
-                    NavigableMap<byte[], List<Cell>> familyCellMap = 
tableHeaderPut.getFamilyCellMap();
-                    List<Cell> cells = familyCellMap.get(TABLE_FAMILY_BYTES);
-                    Cell cell = cells.get(0);
-                    PDataType dataType = MetaDataUtil.getViewIndexIdDataType();
-                    Object val = dataType.toObject(seqValue, PLong.INSTANCE);
-                    byte[] bytes = new byte [dataType.getByteSize() + 1];
-                    dataType.toBytes(val, bytes, 0);
-                    Cell indexIdCell = new KeyValue(cell.getRow(), 
cell.getFamily(), VIEW_INDEX_ID_BYTES,
+                        if (sqlExceptions[0] != null) {
+                            throw sqlExceptions[0];
+                        }
+                        long seqValue = seqValues[0];
+                        if (seqValue > Short.MAX_VALUE) {
+                            
builder.setReturnCode(MetaDataProtos.MutationCode.TOO_MANY_INDEXES);
+                            
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                            done.run(builder.build());
+                            return;
+                        }
+                        Put tableHeaderPut = 
MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
+                        NavigableMap<byte[], List<Cell>> familyCellMap = 
tableHeaderPut.getFamilyCellMap();
+                        List<Cell> cells = 
familyCellMap.get(TABLE_FAMILY_BYTES);
+                        Cell cell = cells.get(0);
+                        PDataType dataType = 
MetaDataUtil.getViewIndexIdDataType();
+                        Object val = dataType.toObject(seqValue, 
PLong.INSTANCE);
+                        byte[] bytes = new byte [dataType.getByteSize() + 1];
+                        dataType.toBytes(val, bytes, 0);
+                        Cell indexIdCell = new KeyValue(cell.getRow(), 
cell.getFamily(), VIEW_INDEX_ID_BYTES,
                             cell.getTimestamp(), 
Type.codeToType(cell.getTypeByte()), bytes);
-                    cells.add(indexIdCell);
-                    indexId = (short) seqValue;
+                        cells.add(indexIdCell);
+                        indexId = (short) seqValue;
                     }
                 }
                 
@@ -1537,7 +1589,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 // indexing on the system table. This is an issue because of 
the way we manage batch mutation
                 // in the Indexer.
                 region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> 
emptySet());
-
+                
                 // Invalidate the cache - the next getTable call will add it
                 // TODO: consider loading the table that was just created 
here, patching up the parent table, and updating the cache
                 Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
@@ -1989,7 +2041,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                     return result;
                 }
                 region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> 
emptySet());
-                // Invalidate from cache
+                // Invalidate from cache.
                 for (ImmutableBytesPtr invalidateKey : invalidateList) {
                     metaDataCache.invalidate(invalidateKey);
                 }
@@ -2169,6 +2221,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 int pkCount = getVarChars(m.getRow(), rkmd);
                 // check if this put is for adding a column
                 if (pkCount > COLUMN_NAME_INDEX
+                        && rkmd[COLUMN_NAME_INDEX] != null && 
rkmd[COLUMN_NAME_INDEX].length > 0
                         && Bytes.compareTo(schemaName, 
rkmd[SCHEMA_NAME_INDEX]) == 0
                         && Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) 
== 0) {
                     columnPutsForBaseTable.add(new 
PutWithOrdinalPosition((Put)m, getInteger((Put)m, TABLE_FAMILY_BYTES, 
ORDINAL_POSITION_BYTES)));
@@ -2221,8 +2274,8 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 String columnName = Bytes.toString(rkmd[COLUMN_NAME_INDEX]);
                 String columnFamily = rkmd[FAMILY_NAME_INDEX] == null ? null : 
Bytes.toString(rkmd[FAMILY_NAME_INDEX]);
                 try {
-                    existingViewColumn = columnFamily == null ? 
view.getColumn(columnName) : view.getColumnFamily(
-                            columnFamily).getColumn(columnName);
+                    existingViewColumn = columnFamily == null ? 
view.getColumnForColumnName(columnName) : view.getColumnFamily(
+                            columnFamily).getPColumnForColumnName(columnName);
                 } catch (ColumnFamilyNotFoundException e) {
                     // ignore since it means that the column family is not 
present for the column to be added.
                 } catch (ColumnNotFoundException e) {
@@ -2551,8 +2604,8 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 byte[] columnKey = getColumnKey(viewKey, columnName, 
columnFamily);
                 try {
                     existingViewColumn =
-                            columnFamily == null ? view.getColumn(columnName) 
: view
-                                    
.getColumnFamily(columnFamily).getColumn(columnName);
+                            columnFamily == null ? 
view.getColumnForColumnName(columnName) : view
+                                    
.getColumnFamily(columnFamily).getPColumnForColumnName(columnName);
                 } catch (ColumnFamilyNotFoundException e) {
                     // ignore since it means that the column family is not 
present for the column to
                     // be added.
@@ -2618,7 +2671,25 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
     
     private MetaDataMutationResult validateColumnForAddToBaseTable(PColumn 
existingViewColumn, Put columnToBeAdded, PTable basePhysicalTable, boolean 
isColumnToBeAddPkCol, PTable view) {
         if (existingViewColumn != null) {
-            
+            if (EncodedColumnsUtil.usesEncodedColumnNames(basePhysicalTable) 
&& !SchemaUtil.isPKColumn(existingViewColumn)) {
+                /*
+                 * If the column already exists in a view, then we cannot add 
the column to the base
+                 * table. The reason is subtle and is as follows: consider the 
case where a table
+                 * has two views where both the views have the same key value 
column KV. Now, we
+                 * dole out encoded column qualifiers for key value columns in 
views by using the
+                 * counters stored in the base physical table. So the KV 
column can have different
+                 * column qualifiers for the two views. For example, 11 for 
VIEW1 and 12 for VIEW2.
+                 * This naturally extends to rows being inserted using the two 
views having
+                 * different column qualifiers for the column named KV. Now, 
when an attempt is made
+                 * to add column KV to the base table, we cannot decide which 
column qualifier
+                 * should that column be assigned. It cannot be a number 
different than 11 or 12
+                 * since a query like SELECT KV FROM BASETABLE would return 
null for KV which is
+                 * incorrect since column KV is present in rows inserted from 
the two views. We
+                 * cannot use 11 or 12 either because we will then incorrectly 
return value of KV
+                 * column inserted using only one view.
+                 */
+                return new 
MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, 
EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
+            }
             // Validate data type is same
             int baseColumnDataType = getInteger(columnToBeAdded, 
TABLE_FAMILY_BYTES, DATA_TYPE_BYTES);
             if (baseColumnDataType != 
existingViewColumn.getDataType().getSqlType()) {
@@ -2848,6 +2919,16 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                                        return mutationResult;
                             } 
                         }
+                    } else if (type == PTableType.VIEW
+                            && 
EncodedColumnsUtil.usesEncodedColumnNames(table)) {
+                        /*
+                         * When adding a column to a view that uses encoded 
column name scheme, we
+                         * need to modify the CQ counters stored in the view's 
physical table. So to
+                         * make sure clients get the latest PTable, we need to 
invalidate the cache
+                         * entry.
+                         */
+                        invalidateList.add(new ImmutableBytesPtr(MetaDataUtil
+                                .getPhysicalTableRowForView(table)));
                     }
                     for (Mutation m : tableMetaData) {
                         byte[] key = m.getRow();
@@ -2861,7 +2942,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                                         && 
rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
                                     PColumnFamily family =
                                             
table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
-                                    
family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
+                                    
family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
                                 } else if (pkCount > COLUMN_NAME_INDEX
                                         && 
rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
                                     addingPKColumn = true;
@@ -3114,7 +3195,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                                         PColumnFamily family =
                                                 
table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
                                         columnToDelete =
-                                                
family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
+                                                
family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
                                     } else if (pkCount > COLUMN_NAME_INDEX
                                             && 
rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
                                         deletePKColumn = true;
@@ -3203,10 +3284,12 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
             byte[] indexKey =
                     SchemaUtil.getTableKey(tenantId, 
index.getSchemaName().getBytes(), index
                             .getTableName().getBytes());
+            Pair<String, String> columnToDeleteInfo = new 
Pair<>(columnToDelete.getFamilyName().getString(), 
columnToDelete.getName().getString());
+            ColumnReference colDropRef = new 
ColumnReference(columnToDelete.getFamilyName().getBytes(), 
columnToDelete.getColumnQualifierBytes());
+            boolean isColumnIndexed = 
indexMaintainer.getIndexedColumnInfo().contains(columnToDeleteInfo);
+            boolean isCoveredColumn = 
indexMaintainer.getCoveredColumns().contains(colDropRef);
             // If index requires this column for its pk, then drop it
-            if (indexMaintainer.getIndexedColumns().contains(
-                new ColumnReference(columnToDelete.getFamilyName().getBytes(), 
columnToDelete
-                        .getName().getBytes()))) {
+            if (isColumnIndexed) {
                 // Since we're dropping the index, lock it to ensure
                 // that a change in index state doesn't
                 // occur while we're dropping it.
@@ -3227,9 +3310,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 invalidateList.add(new ImmutableBytesPtr(indexKey));
             }
             // If the dropped column is a covered index column, invalidate the 
index
-            else if (indexMaintainer.getCoveredColumns().contains(
-                new ColumnReference(columnToDelete.getFamilyName().getBytes(), 
columnToDelete
-                        .getName().getBytes()))) {
+            else if (isCoveredColumn){
                 invalidateList.add(new ImmutableBytesPtr(indexKey));
             }
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 83290db..dd445ce 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -85,8 +85,9 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 = 
MIN_TABLE_TIMESTAMP + 18;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_8_1 = 
MIN_TABLE_TIMESTAMP + 18;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0 = 
MIN_TABLE_TIMESTAMP + 20;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 = 
MIN_TABLE_TIMESTAMP + 25;
     // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the 
MIN_SYSTEM_TABLE_TIMESTAMP_* constants
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = 
MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = 
MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0;
     
     // ALWAYS update this map whenever rolling out a new release (major, minor 
or patch release). 
     // Key is the SYSTEM.CATALOG timestamp for the version and value is the 
version string.
@@ -101,6 +102,7 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
         TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, "4.7.x");
         TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0, "4.8.x");
         TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0, "4.9.x");
+        TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0, "4.10.x");
     }
     
     public static final String CURRENT_CLIENT_VERSION = PHOENIX_MAJOR_VERSION 
+ "." + PHOENIX_MINOR_VERSION + "." + PHOENIX_PATCH_NUMBER; 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index d5e5542..1fb8221 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -386,7 +386,7 @@ public class MetaDataRegionObserver extends 
BaseRegionObserver {
                             IndexMaintainer.serializeAdditional(dataPTable, 
indexMetaDataPtr, indexesToPartiallyRebuild,
                                     conn);
                             byte[] attribValue = 
ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
-                            
dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                            
dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
                             MutationState mutationState = plan.execute();
                             long rowCount = mutationState.getUpdateCount();
                             LOG.info(rowCount + " rows of index which are 
rebuild");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 3cfe790..6ff6de9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.coprocessor;
 
+import static 
org.apache.phoenix.util.EncodedColumnsUtil.getMinMaxQualifiersFromScan;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -44,6 +46,7 @@ import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.expression.SingleCellColumnExpression;
 import org.apache.phoenix.expression.function.ArrayIndexFunction;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -57,10 +60,13 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
@@ -107,7 +113,7 @@ public class ScanRegionObserver extends 
BaseScannerRegionObserver {
         }
     }
 
-    public static OrderedResultIterator deserializeFromScan(Scan scan, 
RegionScanner s) {
+    private static OrderedResultIterator deserializeFromScan(Scan scan, 
RegionScanner s) {
         byte[] topN = scan.getAttribute(BaseScannerRegionObserver.TOPN);
         if (topN == null) {
             return null;
@@ -125,7 +131,8 @@ public class ScanRegionObserver extends 
BaseScannerRegionObserver {
                 orderByExpression.readFields(input);
                 orderByExpressions.add(orderByExpression);
             }
-            ResultIterator inner = new RegionScannerResultIterator(s);
+            QualifierEncodingScheme encodingScheme = 
EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+            ResultIterator inner = new RegionScannerResultIterator(s, 
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme);
             return new OrderedResultIterator(inner, orderByExpressions, 
thresholdBytes, limit >= 0 ? limit : null, null,
                     estimatedRowSize);
         } catch (IOException e) {
@@ -151,7 +158,9 @@ public class ScanRegionObserver extends 
BaseScannerRegionObserver {
             DataInputStream input = new DataInputStream(stream);
             int arrayKVRefSize = WritableUtils.readVInt(input);
             for (int i = 0; i < arrayKVRefSize; i++) {
-                KeyValueColumnExpression kvExp = new 
KeyValueColumnExpression();
+                ImmutableStorageScheme scheme = 
EncodedColumnsUtil.getImmutableStorageScheme(scan);
+                KeyValueColumnExpression kvExp = scheme != 
ImmutableStorageScheme.ONE_CELL_PER_COLUMN ? new SingleCellColumnExpression()
+                        : new KeyValueColumnExpression();
                 kvExp.readFields(input);
                 arrayKVRefs.add(kvExp);
             }
@@ -208,8 +217,13 @@ public class ScanRegionObserver extends 
BaseScannerRegionObserver {
         if (dataColumns != null) {
             tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
             dataRegion = c.getEnvironment().getRegion();
-            byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
-            List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? 
null : IndexMaintainer.deserialize(localIndexBytes);
+            boolean useProto = false;
+            byte[] localIndexBytes = 
scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
+            useProto = localIndexBytes != null;
+            if (localIndexBytes == null) {
+                localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+            }
+            List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? 
null : IndexMaintainer.deserialize(localIndexBytes, useProto);
             indexMaintainer = indexMaintainers.get(0);
             viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
             byte[] txState = 
scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
@@ -218,21 +232,22 @@ public class ScanRegionObserver extends 
BaseScannerRegionObserver {
         
         final TupleProjector p = 
TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+        boolean useQualifierAsIndex = 
EncodedColumnsUtil.useQualifierAsIndex(getMinMaxQualifiersFromScan(scan)) && 
scan.getAttribute(BaseScannerRegionObserver.TOPN) != null;
         innerScanner =
                 getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, 
offset, scan,
                     dataColumns, tupleProjector, dataRegion, indexMaintainer, 
tx,
-                    viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : 
null, ptr);
+                    viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : 
null, ptr, useQualifierAsIndex);
 
         final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan);
         if (j != null) {
-            innerScanner = new HashJoinRegionScanner(innerScanner, p, j, 
tenantId, c.getEnvironment());
+            innerScanner = new HashJoinRegionScanner(innerScanner, p, j, 
tenantId, c.getEnvironment(), useQualifierAsIndex, useNewValueColumnQualifier);
         }
         if (scanOffset != null) {
             innerScanner = getOffsetScanner(c, innerScanner,
-                    new OffsetResultIterator(new 
RegionScannerResultIterator(innerScanner), scanOffset),
+                    new OffsetResultIterator(new 
RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), 
encodingScheme), scanOffset),
                     scan.getAttribute(QueryConstants.LAST_SCAN) != null);
         }
-        final OrderedResultIterator iterator = 
deserializeFromScan(scan,innerScanner);
+        final OrderedResultIterator iterator = deserializeFromScan(scan, 
innerScanner);
         if (iterator == null) {
             return innerScanner;
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
index bf889d5..98f57ad 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -72,7 +72,7 @@ public class ServerCachingEndpointImpl extends 
ServerCachingService implements C
           (Class<ServerCacheFactory>) 
Class.forName(request.getCacheFactory().getClassName());
       ServerCacheFactory cacheFactory = serverCacheFactoryClass.newInstance();
       tenantCache.addServerCache(new 
ImmutableBytesPtr(request.getCacheId().toByteArray()),
-        cachePtr, txState, cacheFactory);
+        cachePtr, txState, cacheFactory, 
request.hasHasProtoBufIndexMaintainer() && 
request.getHasProtoBufIndexMaintainer());
     } catch (Throwable e) {
       ProtobufUtil.setControllerException(controller, new IOException(e));
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
index b201c8e..139a69c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
@@ -36,7 +36,7 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
  */
 public interface ServerCachingProtocol {
     public static interface ServerCacheFactory extends Writable {
-        public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] 
txState, MemoryChunk chunk) throws SQLException;
+        public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] 
txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws 
SQLException;
     }
     /**
      * Add the cache to the region server cache.  

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index d94c715..f075371 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -101,7 +101,10 @@ import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
 import org.apache.phoenix.schema.stats.StatisticsCollector;
 import org.apache.phoenix.schema.stats.StatisticsCollectorFactory;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
@@ -109,6 +112,7 @@ import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.LogUtil;
@@ -181,9 +185,9 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
     }
 
     private void commitBatch(HRegion region, List<Mutation> mutations, byte[] 
indexUUID, long blockingMemstoreSize,
-            byte[] indexMaintainersPtr, byte[] txState) throws IOException {
+            byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) 
throws IOException {
         if (indexMaintainersPtr != null) {
-            mutations.get(0).setAttribute(PhoenixIndexCodec.INDEX_MD, 
indexMaintainersPtr);
+            mutations.get(0).setAttribute(useIndexProto ? 
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, 
indexMaintainersPtr);
         }
 
         if (txState != null) {
@@ -212,13 +216,13 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
     }
     
     private void commitBatchWithHTable(HTable table, HRegion region, 
List<Mutation> mutations, byte[] indexUUID,
-            long blockingMemstoreSize, byte[] indexMaintainersPtr, byte[] 
txState) throws IOException {
+            long blockingMemstoreSize, byte[] indexMaintainersPtr, byte[] 
txState, boolean useIndexProto) throws IOException {
 
         if (indexUUID != null) {
             // Need to add indexMaintainers for each mutation as table.batch 
can be distributed across servers
             for (Mutation m : mutations) {
                 if (indexMaintainersPtr != null) {
-                    m.setAttribute(PhoenixIndexCodec.INDEX_MD, 
indexMaintainersPtr);
+                    m.setAttribute(useIndexProto ? 
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, 
indexMaintainersPtr);
                 }
                 if (txState != null) {
                     m.setAttribute(BaseScannerRegionObserver.TX_STATE, 
txState);
@@ -327,8 +331,13 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
             }
             values = new byte[projectedTable.getPKColumns().size()][];
         }
-        byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
-        List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? 
null : IndexMaintainer.deserialize(localIndexBytes);
+        boolean useProto = false;
+        byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
+        useProto = localIndexBytes != null;
+        if (localIndexBytes == null) {
+            localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+        }
+        List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? 
null : IndexMaintainer.deserialize(localIndexBytes, useProto);
         List<Mutation> indexMutations = localIndexBytes == null ? 
Collections.<Mutation>emptyList() : 
Lists.<Mutation>newArrayListWithExpectedSize(1024);
         
         RegionScanner theScanner = s;
@@ -369,6 +378,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
         ColumnReference[] dataColumns = 
IndexUtil.deserializeDataTableColumnsToJoin(scan);
         final TupleProjector p = 
TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+        boolean useQualifierAsIndex = 
EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
         if ((localIndexScan && !isDelete && !isDescRowKeyOrderUpgrade) || (j 
== null && p != null)) {
             if (dataColumns != null) {
                 tupleProjector = IndexUtil.getTupleProjector(scan, 
dataColumns);
@@ -377,11 +387,11 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
             ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
             theScanner =
                     getWrappedScanner(c, theScanner, offset, scan, 
dataColumns, tupleProjector, 
-                            c.getEnvironment().getRegion(), indexMaintainers 
== null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
+                            c.getEnvironment().getRegion(), indexMaintainers 
== null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, 
useQualifierAsIndex);
         } 
         
         if (j != null)  {
-            theScanner = new HashJoinRegionScanner(theScanner, p, j, 
ScanUtil.getTenantId(scan), env);
+            theScanner = new HashJoinRegionScanner(theScanner, p, j, 
ScanUtil.getTenantId(scan), env, useQualifierAsIndex, 
useNewValueColumnQualifier);
         }
         
         int batchSize = 0;
@@ -420,13 +430,20 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
         Aggregator[] rowAggregators = aggregators.getAggregators();
         boolean hasMore;
         boolean hasAny = false;
-        MultiKeyValueTuple result = new MultiKeyValueTuple();
+        Pair<Integer, Integer> minMaxQualifiers = 
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
+        Tuple result = useQualifierAsIndex ? new 
PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
         if (logger.isDebugEnabled()) {
             logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped 
coprocessor scan " + scan + " "+region.getRegionInfo(), 
ScanUtil.getCustomAnnotations(scan)));
         }
         long rowCount = 0;
         final RegionScanner innerScanner = theScanner;
-        byte[] indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+        boolean useIndexProto = true;
+        byte[] indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+        // for backward compatibility fall back to look up by the old attribute
+        if (indexMaintainersPtr == null) {
+            indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+            useIndexProto = false;
+        }
         boolean acquiredLock = false;
         try {
             if(needToWrite) {
@@ -438,7 +455,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
             acquiredLock = true;
             synchronized (innerScanner) {
                 do {
-                    List<Cell> results = new ArrayList<Cell>();
+                    List<Cell> results = useQualifierAsIndex ? new 
EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), 
minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
                     // Results are potentially returned even when the return 
value of s.next is false
                     // since this is an indication of whether or not there are 
more values after the
                     // ones returned
@@ -653,7 +670,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                                 MutationState.getMutationBatchList(batchSize, 
batchSizeBytes, mutations);
                             for (List<Mutation> batchMutations : 
batchMutationList) {
                                 commit(region, batchMutations, indexUUID, 
blockingMemStoreSize, indexMaintainersPtr,
-                                        txState, areMutationInSameRegion, 
targetHTable);
+                                        txState, areMutationInSameRegion, 
targetHTable, useIndexProto);
                                 batchMutations.clear();
                             }
                             mutations.clear();
@@ -661,7 +678,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                             List<List<Mutation>> batchIndexMutationList =
                                 MutationState.getMutationBatchList(batchSize, 
batchSizeBytes, indexMutations);
                             for (List<Mutation> batchIndexMutations : 
batchIndexMutationList) {
-                                commitBatch(region, batchIndexMutations, null, 
blockingMemStoreSize, null, txState);
+                                commitBatch(region, batchIndexMutations, null, 
blockingMemStoreSize, null, txState, useIndexProto);
                                 batchIndexMutations.clear();
                             }
                             indexMutations.clear();
@@ -672,12 +689,12 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                 } while (hasMore);
                 if (!mutations.isEmpty()) {
                     commit(region, mutations, indexUUID, blockingMemStoreSize, 
indexMaintainersPtr, txState,
-                            areMutationInSameRegion, targetHTable);
+                            areMutationInSameRegion, targetHTable, 
useIndexProto);
                     mutations.clear();
                 }
 
                 if (!indexMutations.isEmpty()) {
-                    commitBatch(region, indexMutations, null, 
blockingMemStoreSize, indexMaintainersPtr, txState);
+                    commitBatch(region, indexMutations, null, 
blockingMemStoreSize, indexMaintainersPtr, txState, useIndexProto);
                     indexMutations.clear();
                 }
             }
@@ -734,14 +751,14 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
     }
 
     private void commit(HRegion region, List<Mutation> mutations, byte[] 
indexUUID, long blockingMemstoreSize,
-            byte[] indexMaintainersPtr, byte[] txState, boolean 
areMutationsForSameRegion, HTable hTable)
+            byte[] indexMaintainersPtr, byte[] txState, boolean 
areMutationsForSameRegion, HTable hTable, boolean useIndexProto)
             throws IOException {
         if (!areMutationsForSameRegion) {
             assert hTable != null;// table cannot be null
             commitBatchWithHTable(hTable, region, mutations, indexUUID, 
blockingMemstoreSize, indexMaintainersPtr,
-                    txState);
+                    txState, useIndexProto);
         } else {
-            commitBatch(region, mutations, indexUUID, blockingMemstoreSize, 
indexMaintainersPtr, txState);
+            commitBatch(region, mutations, indexUUID, blockingMemstoreSize, 
indexMaintainersPtr, txState, useIndexProto);
         }
     }
 
@@ -808,7 +825,13 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
     
     private RegionScanner rebuildIndices(final RegionScanner innerScanner, 
final HRegion region, final Scan scan,
             Configuration config) throws IOException {
-        byte[] indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+        byte[] indexMetaData = 
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+        boolean useProto = true;
+        // for backward compatibility fall back to look up by the old attribute
+        if (indexMetaData == null) {
+            useProto = false;
+            indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+        }
         boolean hasMore;
         long rowCount = 0;
         try {
@@ -828,7 +851,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                             if (KeyValue.Type.codeToType(cell.getTypeByte()) 
== KeyValue.Type.Put) {
                                 if (put == null) {
                                     put = new Put(CellUtil.cloneRow(cell));
-                                    
put.setAttribute(PhoenixIndexCodec.INDEX_MD, indexMetaData);
+                                    put.setAttribute(useProto ? 
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
                                     
put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                                     
put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
                                             PDataType.TRUE_BYTES);
@@ -838,7 +861,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                             } else {
                                 if (del == null) {
                                     del = new Delete(CellUtil.cloneRow(cell));
-                                    
del.setAttribute(PhoenixIndexCodec.INDEX_MD, indexMetaData);
+                                    del.setAttribute(useProto ? 
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
                                     
del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                                     
del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
                                             PDataType.TRUE_BYTES);

Reply via email to