http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 67b7663..ee1af19 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -47,6 +47,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
@@ -71,7 +72,6 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
@@ -93,9 +93,9 @@ import static 
org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_UPDATE_STATS_ASYNC;
 import static org.apache.phoenix.schema.PTable.EncodedCQCounter.NULL_COUNTER;
+import static 
org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
+import static 
org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
 import static 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
-import static 
org.apache.phoenix.schema.PTable.StorageScheme.ONE_CELL_PER_COLUMN_FAMILY;
-import static 
org.apache.phoenix.schema.PTable.StorageScheme.ONE_CELL_PER_KEYVALUE_COLUMN;
 import static org.apache.phoenix.schema.PTable.ViewType.MAPPED;
 import static org.apache.phoenix.schema.PTableType.TABLE;
 import static org.apache.phoenix.schema.PTableType.VIEW;
@@ -195,11 +195,11 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable.EncodedCQCounter;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.LinkType;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.QualifierOutOfRangeException;
-import org.apache.phoenix.schema.PTable.StorageScheme;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
 import org.apache.phoenix.schema.types.PDataType;
@@ -273,7 +273,7 @@ public class MetaDataClient {
                     AUTO_PARTITION_SEQ +  "," +
                     APPEND_ONLY_SCHEMA + "," +
                     GUIDE_POSTS_WIDTH + "," +
-                    STORAGE_SCHEME + "," +
+                    IMMUTABLE_STORAGE_SCHEME + "," +
                     ENCODING_SCHEME + 
                     ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
@@ -1695,7 +1695,8 @@ public class MetaDataClient {
                     ? SchemaUtil.isNamespaceMappingEnabled(tableType, 
connection.getQueryServices().getProps())
                     : parent.isNamespaceMapped();
             boolean isLocalIndex = indexType == IndexType.LOCAL;
-            QualifierEncodingScheme encodingScheme = 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+            QualifierEncodingScheme encodingScheme = NON_ENCODED_QUALIFIERS;
+            ImmutableStorageScheme immutableStorageScheme = 
ONE_CELL_PER_COLUMN;
             if (parent != null && tableType == PTableType.INDEX) {
                 timestamp = TransactionUtil.getTableTimestamp(connection, 
transactional);
                 storeNulls = parent.getStoreNulls();
@@ -2033,7 +2034,6 @@ public class MetaDataClient {
             }
             int pkPositionOffset = pkColumns.size();
             int position = positionOffset;
-            StorageScheme storageScheme = ONE_CELL_PER_KEYVALUE_COLUMN;
             EncodedCQCounter cqCounter = NULL_COUNTER;
             PTable viewPhysicalTable = null;
             if (tableType == PTableType.VIEW) {
@@ -2048,7 +2048,7 @@ public class MetaDataClient {
                      * encoded column qualifier.
                      */
                     viewPhysicalTable = PhoenixRuntime.getTable(connection, 
physicalNames.get(0).getString());
-                    storageScheme = viewPhysicalTable.getStorageScheme();
+                    immutableStorageScheme = 
viewPhysicalTable.getImmutableStorageScheme();
                     encodingScheme = viewPhysicalTable.getEncodingScheme();
                                        if 
(EncodedColumnsUtil.usesEncodedColumnNames(viewPhysicalTable)) {
                         cqCounter  = viewPhysicalTable.getEncodedCQCounter();
@@ -2088,21 +2088,29 @@ public class MetaDataClient {
                 }
                 if (tableExists) {
                     encodingScheme = NON_ENCODED_QUALIFIERS;
+                    immutableStorageScheme = ONE_CELL_PER_COLUMN;
                 } else if (parent != null) {
                     encodingScheme = parent.getEncodingScheme();
+                    immutableStorageScheme = 
parent.getImmutableStorageScheme();
                 } else {
                        Byte encodingSchemeSerializedByte = (Byte) 
TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
                     if (encodingSchemeSerializedByte == null) {
                        encodingSchemeSerializedByte = 
(byte)connection.getQueryServices().getProps().getInt(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB,
 QueryServicesOptions.DEFAULT_COLUMN_ENCODED_BYTES);
                     } 
                     encodingScheme =  
QualifierEncodingScheme.fromSerializedValue(encodingSchemeSerializedByte);
+                    if (isImmutableRows) {
+                        immutableStorageScheme = (ImmutableStorageScheme) 
TableProperty.IMMUTABLE_STORAGE_SCHEME.getValue(tableProps);
+                        if (immutableStorageScheme == null) {
+                            immutableStorageScheme = 
ImmutableStorageScheme.valueOf(connection.getQueryServices().getProps().get(QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
 QueryServicesOptions.DEFAULT_IMMUTABLE_STORAGE_SCHEME));
+                        } 
+                        if (immutableStorageScheme!=ONE_CELL_PER_COLUMN && 
encodingScheme == NON_ENCODED_QUALIFIERS) {
+                            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_AND_COLUMN_QUALIFIER_BYTES).setSchemaName(schemaName).setTableName(tableName).build().buildException();
+                        }
+                    } 
                 }
-                if (isImmutableRows && encodingScheme != 
NON_ENCODED_QUALIFIERS) {
-                    storageScheme = ONE_CELL_PER_COLUMN_FAMILY;
-                } 
                 cqCounter = encodingScheme != NON_ENCODED_QUALIFIERS ? new 
EncodedCQCounter() : NULL_COUNTER;
             }
-                        
+
             Map<String, Integer> changedCqCounters = new 
HashMap<>(colDefs.size());
             for (ColumnDef colDef : colDefs) {
                 rowTimeStampColumnAlreadyFound = 
checkAndValidateRowTimestampCol(colDef, pkConstraint, 
rowTimeStampColumnAlreadyFound, tableType);
@@ -2127,7 +2135,7 @@ public class MetaDataClient {
                 boolean isPkColumn = isPkColumn(pkConstraint, colDef, 
columnDefName);
                 String cqCounterFamily = null;
                 if (!isPkColumn) {
-                    if (storageScheme == ONE_CELL_PER_COLUMN_FAMILY && 
encodingScheme != NON_ENCODED_QUALIFIERS) {
+                    if (immutableStorageScheme == 
SINGLE_CELL_ARRAY_WITH_OFFSETS && encodingScheme != NON_ENCODED_QUALIFIERS) {
                         // For this scheme we track column qualifier counters 
at the column family level.
                         cqCounterFamily = colDefFamily != null ? colDefFamily 
: (defaultFamilyName != null ? defaultFamilyName : DEFAULT_COLUMN_FAMILY);
                     } else {
@@ -2272,7 +2280,7 @@ public class MetaDataClient {
                         Collections.<PTable>emptyList(), isImmutableRows,
                         Collections.<PName>emptyList(), defaultFamilyName == 
null ? null :
                                 PNameFactory.newName(defaultFamilyName), null,
-                        Boolean.TRUE.equals(disableWAL), false, false, null, 
null, indexType, true, false, 0, 0L, isNamespaceMapped, autoPartitionSeq, 
isAppendOnlySchema, ONE_CELL_PER_KEYVALUE_COLUMN, NON_ENCODED_QUALIFIERS, 
PTable.EncodedCQCounter.NULL_COUNTER);
+                        Boolean.TRUE.equals(disableWAL), false, false, null, 
null, indexType, true, false, 0, 0L, isNamespaceMapped, autoPartitionSeq, 
isAppendOnlySchema, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, 
PTable.EncodedCQCounter.NULL_COUNTER);
                 connection.addTable(table, 
MetaDataProtocol.MIN_TABLE_TIMESTAMP);
             }
             
@@ -2426,7 +2434,7 @@ public class MetaDataClient {
             } else {
                 tableUpsert.setLong(25, guidePostsWidth);
             }
-            tableUpsert.setByte(26, 
storageScheme.getSerializedMetadataValue()); //TODO: samarth should there be a 
null check here?
+            tableUpsert.setByte(26, 
immutableStorageScheme.getSerializedMetadataValue()); //TODO: samarth should 
there be a null check here?
             tableUpsert.setByte(27, 
encodingScheme.getSerializedMetadataValue());
             tableUpsert.execute();
 
@@ -2532,7 +2540,7 @@ public class MetaDataClient {
                         PTable.INITIAL_SEQ_NUM, pkName == null ? null : 
PNameFactory.newName(pkName), saltBucketNum, columns.values(),
                         parent == null ? null : parent.getSchemaName(), parent 
== null ? null : parent.getTableName(), Collections.<PTable>emptyList(), 
isImmutableRows,
                         physicalNames, defaultFamilyName == null ? null : 
PNameFactory.newName(defaultFamilyName), viewStatement, 
Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType,
-                        result.getViewIndexId(), indexType, 
rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L, 
isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, storageScheme, 
encodingScheme, cqCounterToBe);
+                        result.getViewIndexId(), indexType, 
rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L, 
isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, 
immutableStorageScheme, encodingScheme, cqCounterToBe);
                 result = new MetaDataMutationResult(code, 
result.getMutationTime(), table, true);
                 addTableToCache(result);
                 return table;
@@ -2716,7 +2724,7 @@ public class MetaDataClient {
                                 PTable viewIndexTable = new PTableImpl(null,
                                         
SchemaUtil.getSchemaNameFromFullName(viewIndexPhysicalName),
                                         
SchemaUtil.getTableNameFromFullName(viewIndexPhysicalName), ts,
-                                        
table.getColumnFamilies(),table.isNamespaceMapped(), table.getStorageScheme(), 
table.getEncodingScheme());
+                                        
table.getColumnFamilies(),table.isNamespaceMapped(), 
table.getImmutableStorageScheme(), table.getEncodingScheme());
                                 tableRefs.add(new TableRef(null, 
viewIndexTable, ts, false));
                             }
                         }
@@ -2837,12 +2845,12 @@ public class MetaDataClient {
     }
 
     private  long incrementTableSeqNum(PTable table, PTableType expectedType, 
int columnCountDelta, Boolean isTransactional, Long updateCacheFrequency) 
throws SQLException {
-        return incrementTableSeqNum(table, expectedType, columnCountDelta, 
isTransactional, updateCacheFrequency, null, null, null, null, -1L, null);
+        return incrementTableSeqNum(table, expectedType, columnCountDelta, 
isTransactional, updateCacheFrequency, null, null, null, null, -1L, null, null);
     }
 
     private long incrementTableSeqNum(PTable table, PTableType expectedType, 
int columnCountDelta,
             Boolean isTransactional, Long updateCacheFrequency, Boolean 
isImmutableRows, Boolean disableWAL,
-            Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, 
Boolean appendOnlySchema)
+            Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, 
Boolean appendOnlySchema, ImmutableStorageScheme immutableStorageScheme)
             throws SQLException {
         String schemaName = table.getSchemaName().getString();
         String tableName = table.getTableName().getString();
@@ -2886,6 +2894,10 @@ public class MetaDataClient {
         if (appendOnlySchema !=null) {
             mutateBooleanProperty(tenantId, schemaName, tableName, 
APPEND_ONLY_SCHEMA, appendOnlySchema);
         }
+        if (immutableStorageScheme !=null) {
+            mutateStringProperty(tenantId, schemaName, tableName, 
IMMUTABLE_STORAGE_SCHEME, immutableStorageScheme.name());
+        }
+        
         return seqNum;
     }
 
@@ -2926,6 +2938,23 @@ public class MetaDataClient {
             tableBoolUpsert.execute();
         }
     }
+    
+    private void mutateStringProperty(String tenantId, String schemaName, 
String tableName,
+            String propertyName, String propertyValue) throws SQLException {
+        String updatePropertySql = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + 
".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
+                TENANT_ID + "," +
+                TABLE_SCHEM + "," +
+                TABLE_NAME + "," +
+                propertyName +
+                ") VALUES (?, ?, ?, ?)";
+        try (PreparedStatement tableBoolUpsert = 
connection.prepareStatement(updatePropertySql)) {
+            tableBoolUpsert.setString(1, tenantId);
+            tableBoolUpsert.setString(2, schemaName);
+            tableBoolUpsert.setString(3, tableName);
+            tableBoolUpsert.setString(4, propertyValue);
+            tableBoolUpsert.execute();
+        }
+    }
 
     public MutationState addColumn(AddColumnStatement statement) throws 
SQLException {
         PTable table = FromCompiler.getResolver(statement, 
connection).getTables().get(0).getTable();
@@ -2951,6 +2980,7 @@ public class MetaDataClient {
             Long updateCacheFrequencyProp = null;
             Boolean appendOnlySchemaProp = null;
             Long guidePostWidth = -1L;
+            ImmutableStorageScheme immutableStorageSchemeProp = null;
 
             Map<String, List<Pair<String, Object>>> properties = new 
HashMap<>(stmtProperties.size());
             List<ColumnDef> columnDefs = null;
@@ -3013,6 +3043,8 @@ public class MetaDataClient {
                             guidePostWidth = (Long)value;
                         } else if (propName.equals(APPEND_ONLY_SCHEMA)) {
                             appendOnlySchemaProp = (Boolean) value;
+                        } else if 
(propName.equalsIgnoreCase(IMMUTABLE_STORAGE_SCHEME)) {
+                            immutableStorageSchemeProp = 
(ImmutableStorageScheme)value;
                         }
                     }
                     // if removeTableProps is true only add the property if it 
is not a HTable or Phoenix Table property
@@ -3055,7 +3087,7 @@ public class MetaDataClient {
                 Boolean isImmutableRows = null;
                 if (isImmutableRowsProp != null) {
                     if (isImmutableRowsProp.booleanValue() != 
table.isImmutableRows()) {
-                       if (table.getStorageScheme() != 
StorageScheme.ONE_CELL_PER_KEYVALUE_COLUMN) {
+                       if (table.getImmutableStorageScheme() != 
ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
                                throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_IMMUTABLE_ROWS_PROPERTY)
                                
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
                        }
@@ -3091,6 +3123,18 @@ public class MetaDataClient {
                         changingPhoenixTableProperty = true;
                     }
                 }
+                ImmutableStorageScheme immutableStorageScheme = null;
+                if (immutableStorageSchemeProp!=null) {
+                    if (table.getImmutableStorageScheme() == 
ONE_CELL_PER_COLUMN || 
+                            immutableStorageSchemeProp == ONE_CELL_PER_COLUMN) 
{
+                        throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_CHANGE)
+                        
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
+                    }
+                    else if (immutableStorageSchemeProp != 
table.getImmutableStorageScheme()) {
+                        immutableStorageScheme = immutableStorageSchemeProp;
+                        changingPhoenixTableProperty = true;
+                    }
+                }
             
                 if (guidePostWidth == null || guidePostWidth >= 0) {
                     changingPhoenixTableProperty = true;
@@ -3170,13 +3214,13 @@ public class MetaDataClient {
                             if (!colDef.isPK()) {
                                 String colDefFamily = 
colDef.getColumnDefName().getFamilyName();
                                 String familyName = null;
-                                StorageScheme storageScheme = 
table.getStorageScheme();
+                                ImmutableStorageScheme storageScheme = 
table.getImmutableStorageScheme();
                                 String defaultColumnFamily = 
tableForCQCounters.getDefaultFamilyName() != null && 
!Strings.isNullOrEmpty(tableForCQCounters.getDefaultFamilyName().getString()) ? 
                                         
tableForCQCounters.getDefaultFamilyName().getString() : DEFAULT_COLUMN_FAMILY;
                                     if (table.getType() == PTableType.INDEX && 
table.getIndexType() == IndexType.LOCAL) {
                                         defaultColumnFamily = 
QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX + defaultColumnFamily;
                                     }
-                                if (storageScheme == 
ONE_CELL_PER_COLUMN_FAMILY) {
+                                if (storageScheme == 
SINGLE_CELL_ARRAY_WITH_OFFSETS) {
                                     familyName = colDefFamily != null ? 
colDefFamily : defaultColumnFamily;
                                 } else {
                                     familyName = defaultColumnFamily;
@@ -3271,10 +3315,9 @@ public class MetaDataClient {
                     connection.rollback();
                 }
                 
-                long seqNum = table.getSequenceNumber();
                 if (changingPhoenixTableProperty || columnDefs.size() > 0) {
-                    seqNum = incrementTableSeqNum(table, tableType, 
columnDefs.size(), isTransactional, updateCacheFrequency, isImmutableRows,
-                            disableWAL, multiTenant, storeNulls, 
guidePostWidth, appendOnlySchema);
+                    incrementTableSeqNum(table, tableType, columnDefs.size(), 
isTransactional, updateCacheFrequency, isImmutableRows,
+                            disableWAL, multiTenant, storeNulls, 
guidePostWidth, appendOnlySchema, immutableStorageScheme);
                     
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                     connection.rollback();
                 }
@@ -3371,7 +3414,7 @@ public class MetaDataClient {
                             PTable viewIndexTable = new PTableImpl(null,
                                     
SchemaUtil.getSchemaNameFromFullName(viewIndexPhysicalName),
                                     
SchemaUtil.getTableNameFromFullName(viewIndexPhysicalName), ts,
-                                    table.getColumnFamilies(), 
table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodingScheme());
+                                    table.getColumnFamilies(), 
table.isNamespaceMapped(), table.getImmutableStorageScheme(), 
table.getEncodingScheme());
                             List<TableRef> tableRefs = 
Collections.singletonList(new TableRef(null, viewIndexTable, ts, false));
                             MutationPlan plan = new 
PostDDLCompiler(connection).compile(tableRefs, null, null,
                                     Collections.<PColumn> emptyList(), ts);
@@ -3637,7 +3680,7 @@ public class MetaDataClient {
                                         sharedTableState.getSchemaName(), 
sharedTableState.getTableName(), ts,
                                         table.getColumnFamilies(), 
sharedTableState.getColumns(),
                                         sharedTableState.getPhysicalNames(), 
sharedTableState.getViewIndexId(),
-                                        table.isMultiTenant(), 
table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodingScheme(), 
table.getEncodedCQCounter());
+                                        table.isMultiTenant(), 
table.isNamespaceMapped(), table.getImmutableStorageScheme(), 
table.getEncodingScheme(), table.getEncodedCQCounter());
                                 TableRef indexTableRef = new 
TableRef(viewIndexTable);
                                 PName indexTableTenantId = 
sharedTableState.getTenantId();
                                 if (indexTableTenantId==null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 5e9608b..4a02e54 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.schema;
 
 import static 
org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
 
+import java.io.DataOutputStream;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -32,6 +33,12 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.types.PArrayDataType;
+import org.apache.phoenix.schema.types.PArrayDataTypeDecoder;
+import org.apache.phoenix.schema.types.PArrayDataTypeEncoder;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -164,32 +171,57 @@ public interface PTable extends PMetaDataEntity {
         }
     }
     
-    public enum StorageScheme {
-        ONE_CELL_PER_KEYVALUE_COLUMN((byte)1),
-        ONE_CELL_PER_COLUMN_FAMILY((byte)2);
+    public enum ImmutableStorageScheme implements 
ColumnValueEncoderDecoderSupplier {
+        ONE_CELL_PER_COLUMN((byte)1) {
+            @Override
+            public ColumnValueEncoder getEncoder(int numElements) {
+                throw new UnsupportedOperationException();
+            }
+            
+            @Override
+            public ColumnValueDecoder getDecoder() {
+                throw new UnsupportedOperationException();
+            }
+        },
+        // stores a single cell per column family that contains all serialized 
column values
+        SINGLE_CELL_ARRAY_WITH_OFFSETS((byte)2) {
+            @Override
+            public ColumnValueEncoder getEncoder(int numElements) {
+                PDataType type = PVarbinary.INSTANCE;
+                int estimatedSize = PArrayDataType.estimateSize(numElements, 
type);
+                TrustedByteArrayOutputStream byteStream = new 
TrustedByteArrayOutputStream(estimatedSize);
+                DataOutputStream oStream = new DataOutputStream(byteStream);
+                return new PArrayDataTypeEncoder(byteStream, oStream, 
numElements, type, SortOrder.ASC, false, 
PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION);
+            }
+            
+            @Override
+            public ColumnValueDecoder getDecoder() {
+                return new PArrayDataTypeDecoder();
+            }
+        };
 
-        private final byte[] byteValue;
         private final byte serializedValue;
         
-        StorageScheme(byte serializedValue) {
+        private ImmutableStorageScheme(byte serializedValue) {
             this.serializedValue = serializedValue;
-            this.byteValue = Bytes.toBytes(this.name());
-        }
-
-        public byte[] getBytes() {
-            return byteValue;
         }
 
         public byte getSerializedMetadataValue() {
             return this.serializedValue;
         }
 
-        public static StorageScheme fromSerializedValue(byte serializedValue) {
-            if (serializedValue < 1 || serializedValue > 
StorageScheme.values().length) {
+        public static ImmutableStorageScheme fromSerializedValue(byte 
serializedValue) {
+            if (serializedValue < 1 || serializedValue > 
ImmutableStorageScheme.values().length) {
                 return null;
             }
-            return StorageScheme.values()[serializedValue-1];
+            return ImmutableStorageScheme.values()[serializedValue-1];
         }
+
+    }
+    
+    interface ColumnValueEncoderDecoderSupplier {
+        ColumnValueEncoder getEncoder(int numElements);
+        ColumnValueDecoder getDecoder();
     }
     
     public enum QualifierEncodingScheme implements QualifierEncoderDecoder {
@@ -393,7 +425,7 @@ public interface PTable extends PMetaDataEntity {
         int decode(byte[] bytes, int offset, int length);
         Integer getMaxQualifier();
     }
-
+    
     long getTimeStamp();
     long getSequenceNumber();
     long getIndexDisableTimestamp();
@@ -607,7 +639,7 @@ public interface PTable extends PMetaDataEntity {
      * you are also not allowed to delete the table  
      */
     boolean isAppendOnlySchema();
-    StorageScheme getStorageScheme();
+    ImmutableStorageScheme getImmutableStorageScheme();
     QualifierEncodingScheme getEncodingScheme();
     EncodedCQCounter getEncodedCQCounter();
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index e84e529..0816fea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -50,10 +50,9 @@ import org.apache.phoenix.compile.ExpressionCompiler;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
-import org.apache.phoenix.expression.ArrayConstructorExpression;
-import org.apache.phoenix.expression.DelegateExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.SingleCellConstructorExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -63,16 +62,12 @@ import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.PTable.EncodedCQCounter;
 import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.schema.types.PArrayDataType;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
-import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -153,7 +148,7 @@ public class PTableImpl implements PTable {
     private boolean isNamespaceMapped;
     private String autoPartitionSeqName;
     private boolean isAppendOnlySchema;
-    private StorageScheme storageScheme;
+    private ImmutableStorageScheme immutableStorageScheme;
     private QualifierEncodingScheme qualifierEncodingScheme;
     private EncodedCQCounter encodedCQCounter;
 
@@ -188,7 +183,7 @@ public class PTableImpl implements PTable {
         this.isNamespaceMapped = isNamespaceMapped;
     }
     
-    public PTableImpl(PName tenantId, String schemaName, String tableName, 
long timestamp, List<PColumnFamily> families, boolean isNamespaceMapped, 
StorageScheme storageScheme, QualifierEncodingScheme encodingScheme) { // For 
base table of mapped VIEW
+    public PTableImpl(PName tenantId, String schemaName, String tableName, 
long timestamp, List<PColumnFamily> families, boolean isNamespaceMapped, 
ImmutableStorageScheme storageScheme, QualifierEncodingScheme encodingScheme) { 
// For base table of mapped VIEW
         Preconditions.checkArgument(tenantId==null || 
tenantId.getBytes().length > 0); // tenantId should be null or not empty
         this.tenantId = tenantId;
         this.name = PNameFactory.newName(SchemaUtil.getTableName(schemaName, 
tableName));
@@ -210,13 +205,13 @@ public class PTableImpl implements PTable {
         this.families = families;
         this.physicalNames = Collections.emptyList();
         this.isNamespaceMapped = isNamespaceMapped;
-        this.storageScheme = storageScheme;
+        this.immutableStorageScheme = storageScheme;
         this.qualifierEncodingScheme = encodingScheme;
     }
     
     // For indexes stored in shared physical tables
     public PTableImpl(PName tenantId, PName schemaName, PName tableName, long 
timestamp, List<PColumnFamily> families, 
-            List<PColumn> columns, List<PName> physicalNames, Short 
viewIndexId, boolean multiTenant, boolean isNamespaceMpped, StorageScheme 
storageScheme, QualifierEncodingScheme qualifierEncodingScheme, 
+            List<PColumn> columns, List<PName> physicalNames, Short 
viewIndexId, boolean multiTenant, boolean isNamespaceMpped, 
ImmutableStorageScheme storageScheme, QualifierEncodingScheme 
qualifierEncodingScheme, 
             EncodedCQCounter encodedCQCounter) throws SQLException {
         this.pkColumns = this.allColumns = Collections.emptyList();
         this.rowKeySchema = RowKeySchema.EMPTY_SCHEMA;
@@ -275,7 +270,7 @@ public class PTableImpl implements PTable {
                     indexes, table.isImmutableRows(), physicalNames, 
table.getDefaultFamilyName(), viewStatement,
                     table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), 
table.getIndexType(),
                     table.getBaseColumnCount(), 
table.rowKeyOrderOptimizable(), table.isTransactional(), updateCacheFrequency,
-                    table.getIndexDisableTimestamp(), 
table.isNamespaceMapped(), table.getAutoPartitionSeqName(), 
table.isAppendOnlySchema(), table.getStorageScheme(), 
table.getEncodingScheme(), table.getEncodedCQCounter());
+                    table.getIndexDisableTimestamp(), 
table.isNamespaceMapped(), table.getAutoPartitionSeqName(), 
table.isAppendOnlySchema(), table.getImmutableStorageScheme(), 
table.getEncodingScheme(), table.getEncodedCQCounter());
         }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, 
List<PTable> indexes, PName parentSchemaName, String viewStatement) throws 
SQLException {
@@ -285,7 +280,7 @@ public class PTableImpl implements PTable {
                 indexes, table.isImmutableRows(), table.getPhysicalNames(), 
table.getDefaultFamilyName(), viewStatement,
                 table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), 
table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), 
table.isTransactional(), table.getUpdateCacheFrequency(),
-                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), 
table.getStorageScheme(), table.getEncodingScheme(), 
table.getEncodedCQCounter());
+                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), 
table.getImmutableStorageScheme(), table.getEncodingScheme(), 
table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, Collection<PColumn> 
columns) throws SQLException {
@@ -295,7 +290,7 @@ public class PTableImpl implements PTable {
                 table.getIndexes(), table.isImmutableRows(), 
table.getPhysicalNames(), table.getDefaultFamilyName(), 
table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), 
table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), 
table.isTransactional(), table.getUpdateCacheFrequency(),
-                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), 
table.getStorageScheme(), table.getEncodingScheme(), 
table.getEncodedCQCounter());
+                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), 
table.getImmutableStorageScheme(), table.getEncodingScheme(), 
table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long 
sequenceNumber, Collection<PColumn> columns) throws SQLException {
@@ -305,7 +300,7 @@ public class PTableImpl implements PTable {
                 table.isImmutableRows(), table.getPhysicalNames(), 
table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
                 table.isMultiTenant(), table.getStoreNulls(), 
table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), 
table.isTransactional(), table.getUpdateCacheFrequency(), 
table.getIndexDisableTimestamp(), 
-                table.isNamespaceMapped(), table.getAutoPartitionSeqName(), 
table.isAppendOnlySchema(), table.getStorageScheme(), 
table.getEncodingScheme(), table.getEncodedCQCounter());
+                table.isNamespaceMapped(), table.getAutoPartitionSeqName(), 
table.isAppendOnlySchema(), table.getImmutableStorageScheme(), 
table.getEncodingScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long 
sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows) throws 
SQLException {
@@ -315,7 +310,7 @@ public class PTableImpl implements PTable {
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), 
table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
                 table.getIndexType(), table.getBaseColumnCount(), 
table.rowKeyOrderOptimizable(), table.isTransactional(),
-                table.getUpdateCacheFrequency(), 
table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), 
table.getStorageScheme(), table.getEncodingScheme(), 
table.getEncodedCQCounter());
+                table.getUpdateCacheFrequency(), 
table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), 
table.getImmutableStorageScheme(), table.getEncodingScheme(), 
table.getEncodedCQCounter());
     }
     
     public static PTableImpl makePTable(PTable table, long timeStamp, long 
sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows, boolean 
isWalDisabled,
@@ -326,7 +321,7 @@ public class PTableImpl implements PTable {
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), 
table.getDefaultFamilyName(), table.getViewStatement(),
                 isWalDisabled, isMultitenant, storeNulls, table.getViewType(), 
table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), 
isTransactional, updateCacheFrequency, table.getIndexDisableTimestamp(), 
-                isNamespaceMapped, table.getAutoPartitionSeqName(), 
table.isAppendOnlySchema(), table.getStorageScheme(), 
table.getEncodingScheme(), table.getEncodedCQCounter());
+                isNamespaceMapped, table.getAutoPartitionSeqName(), 
table.isAppendOnlySchema(), table.getImmutableStorageScheme(), 
table.getEncodingScheme(), table.getEncodedCQCounter());
     }
     
     public static PTableImpl makePTable(PTable table, PIndexState state) 
throws SQLException {
@@ -337,7 +332,7 @@ public class PTableImpl implements PTable {
                 table.isImmutableRows(), table.getPhysicalNames(), 
table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), 
table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), 
table.isTransactional(), table.getUpdateCacheFrequency(),
-                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), 
table.getStorageScheme(), table.getEncodingScheme(), 
table.getEncodedCQCounter());
+                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), 
table.getImmutableStorageScheme(), table.getEncodingScheme(), 
table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, boolean 
rowKeyOrderOptimizable) throws SQLException {
@@ -348,7 +343,7 @@ public class PTableImpl implements PTable {
                 table.isImmutableRows(), table.getPhysicalNames(), 
table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), 
table.getIndexType(),
                 table.getBaseColumnCount(), rowKeyOrderOptimizable, 
table.isTransactional(), table.getUpdateCacheFrequency(), 
table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
-                table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), 
table.getStorageScheme(), table.getEncodingScheme(), 
table.getEncodedCQCounter());
+                table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), 
table.getImmutableStorageScheme(), table.getEncodingScheme(), 
table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table) throws SQLException {
@@ -359,7 +354,7 @@ public class PTableImpl implements PTable {
                 table.isImmutableRows(), table.getPhysicalNames(), 
table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), 
table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), 
table.isTransactional(), table.getUpdateCacheFrequency(), 
table.getIndexDisableTimestamp(), 
-                table.isNamespaceMapped(), table.getAutoPartitionSeqName(), 
table.isAppendOnlySchema(), table.getStorageScheme(), 
table.getEncodingScheme(), table.getEncodedCQCounter());
+                table.isNamespaceMapped(), table.getAutoPartitionSeqName(), 
table.isAppendOnlySchema(), table.getImmutableStorageScheme(), 
table.getEncodingScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PName tenantId, PName schemaName, 
PName tableName, PTableType type,
@@ -368,7 +363,7 @@ public class PTableImpl implements PTable {
             boolean isImmutableRows, List<PName> physicalNames, PName 
defaultFamilyName, String viewExpression,
             boolean disableWAL, boolean multiTenant, boolean storeNulls, 
ViewType viewType, Short viewIndexId,
             IndexType indexType, boolean rowKeyOrderOptimizable, boolean 
isTransactional, long updateCacheFrequency,
-            long indexDisableTimestamp, boolean isNamespaceMapped, String 
autoPartitionSeqName, boolean isAppendOnlySchema, StorageScheme storageScheme, 
QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter 
encodedCQCounter) throws SQLException {
+            long indexDisableTimestamp, boolean isNamespaceMapped, String 
autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme 
storageScheme, QualifierEncodingScheme qualifierEncodingScheme, 
EncodedCQCounter encodedCQCounter) throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, 
timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName,
                 dataTableName, indexes, isImmutableRows, physicalNames, 
defaultFamilyName,
                 viewExpression, disableWAL, multiTenant, storeNulls, viewType, 
viewIndexId,
@@ -383,7 +378,7 @@ public class PTableImpl implements PTable {
             boolean disableWAL, boolean multiTenant, boolean storeNulls, 
ViewType viewType, Short viewIndexId,
             IndexType indexType, boolean rowKeyOrderOptimizable, boolean 
isTransactional, long updateCacheFrequency,
             int baseColumnCount, long indexDisableTimestamp, boolean 
isNamespaceMapped,
-            String autoPartitionSeqName, boolean isAppendOnlySchema, 
StorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, 
EncodedCQCounter encodedCQCounter)
+            String autoPartitionSeqName, boolean isAppendOnlySchema, 
ImmutableStorageScheme storageScheme, QualifierEncodingScheme 
qualifierEncodingScheme, EncodedCQCounter encodedCQCounter)
             throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, 
timeStamp, sequenceNumber, pkName,
                 bucketNum, columns, dataSchemaName, dataTableName, indexes, 
isImmutableRows, physicalNames,
@@ -398,7 +393,7 @@ public class PTableImpl implements PTable {
             List<PName> physicalNames, PName defaultFamilyName, String 
viewExpression, boolean disableWAL, boolean multiTenant,
             boolean storeNulls, ViewType viewType, Short viewIndexId, 
IndexType indexType,
             int baseColumnCount, boolean rowKeyOrderOptimizable, boolean 
isTransactional, long updateCacheFrequency,
-            long indexDisableTimestamp, boolean isNamespaceMapped, String 
autoPartitionSeqName, boolean isAppendOnlySchema, StorageScheme storageScheme, 
+            long indexDisableTimestamp, boolean isNamespaceMapped, String 
autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme 
storageScheme, 
             QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter 
encodedCQCounter) throws SQLException {
         init(tenantId, schemaName, tableName, type, state, timeStamp, 
sequenceNumber, pkName, bucketNum, columns,
                 parentSchemaName, parentTableName, indexes, isImmutableRows, 
physicalNames, defaultFamilyName,
@@ -438,7 +433,7 @@ public class PTableImpl implements PTable {
             List<PTable> indexes, boolean isImmutableRows, List<PName> 
physicalNames, PName defaultFamilyName, String viewExpression, boolean 
disableWAL,
             boolean multiTenant, boolean storeNulls, ViewType viewType, Short 
viewIndexId,
             IndexType indexType , int baseColumnCount, boolean 
rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, 
long indexDisableTimestamp, 
-            boolean isNamespaceMapped, String autoPartitionSeqName, boolean 
isAppendOnlySchema, StorageScheme storageScheme, QualifierEncodingScheme 
qualifierEncodingScheme, 
+            boolean isNamespaceMapped, String autoPartitionSeqName, boolean 
isAppendOnlySchema, ImmutableStorageScheme storageScheme, 
QualifierEncodingScheme qualifierEncodingScheme, 
             EncodedCQCounter encodedCQCounter) throws SQLException {
         Preconditions.checkNotNull(schemaName);
         Preconditions.checkArgument(tenantId==null || 
tenantId.getBytes().length > 0); // tenantId should be null or not empty
@@ -475,7 +470,7 @@ public class PTableImpl implements PTable {
         this.isNamespaceMapped = isNamespaceMapped;
         this.autoPartitionSeqName = autoPartitionSeqName;
         this.isAppendOnlySchema = isAppendOnlySchema;
-        this.storageScheme = storageScheme;
+        this.immutableStorageScheme = storageScheme;
         this.qualifierEncodingScheme = qualifierEncodingScheme;
         List<PColumn> pkColumns;
         PColumn[] allColumns;
@@ -913,7 +908,7 @@ public class PTableImpl implements PTable {
                 mutations.add(deleteRow);
             } else {
                 // store all columns for a given column family in a single 
cell instead of one column per cell in order to improve write performance
-                if (storageScheme == StorageScheme.ONE_CELL_PER_COLUMN_FAMILY) 
{
+                if (immutableStorageScheme != 
ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
                     Put put = new Put(this.key);
                     if (isWALDisabled()) {
                         put.setDurability(Durability.SKIP_WAL);
@@ -927,28 +922,19 @@ public class PTableImpl implements PTable {
                             int qualifier = 
qualifierEncodingScheme.decode(column.getColumnQualifierBytes());
                             maxEncodedColumnQualifier = 
Math.max(maxEncodedColumnQualifier, qualifier);
                         }
-                        Expression[] colValues = new 
Expression[maxEncodedColumnQualifier+1];
-                        Arrays.fill(colValues, new 
DelegateExpression(LiteralExpression.newConstant(null)) {
-                                               @Override
-                                           public boolean evaluate(Tuple 
tuple, ImmutableBytesWritable ptr) {
-                                               return false;
-                                           }
-                                       });
-                        // 0 is a reserved position, set it to a non-null 
value so that we can represent absence of a value using a negative offset
-                        
colValues[0]=LiteralExpression.newConstant(QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+                        Expression[] colValues = 
EncodedColumnsUtil.createColumnExpressionArray(maxEncodedColumnQualifier);
                         for (PColumn column : columns) {
                                if (columnToValueMap.containsKey(column)) {
-                                   int qualifier = 
qualifierEncodingScheme.decode(column.getColumnQualifierBytes());
-                                       colValues[qualifier] = new 
LiteralExpression(columnToValueMap.get(column));
+                                   int colIndex = 
qualifierEncodingScheme.decode(column.getColumnQualifierBytes())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1;
+                                   colValues[colIndex] = new 
LiteralExpression(columnToValueMap.get(column));
                                }
                         }
                         
                         List<Expression> children = Arrays.asList(colValues);
-                        // we use ArrayConstructorExpression to serialize 
multiple columns into a single byte[]
-                        // construct the ArrayConstructorExpression with a 
variable length data type since columns can be of fixed or variable length 
-                        ArrayConstructorExpression arrayExpression = new 
ArrayConstructorExpression(children, PVarbinary.INSTANCE, 
rowKeyOrderOptimizable, PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION);
+                        // we use SingleCellConstructorExpression to serialize 
all the columns into a single byte[]
+                        SingleCellConstructorExpression 
singleCellConstructorExpression = new 
SingleCellConstructorExpression(immutableStorageScheme, children);
                         ImmutableBytesWritable ptr = new 
ImmutableBytesWritable();
-                        arrayExpression.evaluate(null, ptr);
+                        singleCellConstructorExpression.evaluate(null, ptr);
                         ImmutableBytesPtr colFamilyPtr = new 
ImmutableBytesPtr(columnFamily);
                         addQuietly(put, kvBuilder, kvBuilder.buildPut(keyPtr,
                             colFamilyPtr, 
QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR, ts, ptr));
@@ -1027,7 +1013,7 @@ public class PTableImpl implements PTable {
                 removeIfPresent(unsetValues, family, qualifier);
                 // store all columns for a given column family in a single 
cell instead of one column per cell in order to improve write performance
                 // we don't need to do anything with unsetValues as it is only 
used when storeNulls is false, storeNulls is always true when 
storeColsInSingleCell is true
-                if (storageScheme == StorageScheme.ONE_CELL_PER_COLUMN_FAMILY) 
{
+                if (immutableStorageScheme == 
ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
                     columnToValueMap.put(column, ptr.get());
                 }
                 else {
@@ -1326,9 +1312,9 @@ public class PTableImpl implements PTable {
         if (table.hasIsAppendOnlySchema()) {
             isAppendOnlySchema = table.getIsAppendOnlySchema();
         }
-        StorageScheme storageScheme = null;
+        ImmutableStorageScheme storageScheme = null;
         if (table.hasStorageScheme()) {
-            storageScheme = 
StorageScheme.fromSerializedValue(table.getStorageScheme().toByteArray()[0]);
+            storageScheme = 
ImmutableStorageScheme.fromSerializedValue(table.getStorageScheme().toByteArray()[0]);
         }
         QualifierEncodingScheme qualifierEncodingScheme = null;
         if (table.hasEncodingScheme()) {
@@ -1440,8 +1426,8 @@ public class PTableImpl implements PTable {
           builder.setAutoParititonSeqName(table.getAutoPartitionSeqName());
       }
       builder.setIsAppendOnlySchema(table.isAppendOnlySchema());
-      if (table.getStorageScheme() != null) {
-          builder.setStorageScheme(ByteStringer.wrap(new 
byte[]{table.getStorageScheme().getSerializedMetadataValue()}));
+      if (table.getImmutableStorageScheme() != null) {
+          builder.setStorageScheme(ByteStringer.wrap(new 
byte[]{table.getImmutableStorageScheme().getSerializedMetadataValue()}));
       }
       if (table.getEncodedCQCounter() != null) {
           Map<String, Integer> values = table.getEncodedCQCounter().values();
@@ -1526,8 +1512,8 @@ public class PTableImpl implements PTable {
     }
     
     @Override
-    public StorageScheme getStorageScheme() {
-        return storageScheme;
+    public ImmutableStorageScheme getImmutableStorageScheme() {
+        return immutableStorageScheme;
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 36df961..67ff1a5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.util.SchemaUtil;
 
 public enum TableProperty {
@@ -165,7 +166,27 @@ public enum TableProperty {
                        return table.getEncodingScheme();
                }       
            
-       }
+       },
+    
+    IMMUTABLE_STORAGE_SCHEME(PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME, 
COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false, false) {
+        @Override
+        public ImmutableStorageScheme getValue(Object value) {
+            if (value == null) {
+                return null;
+            } else if (value instanceof String) {
+                String strValue = (String) value;
+                return ImmutableStorageScheme.valueOf(strValue.toUpperCase());
+            } else {
+                throw new IllegalArgumentException("Immutable storage scheme 
table property must be a string");
+            }
+        }
+
+        @Override
+        public Object getPTableValue(PTable table) {
+            return table.getImmutableStorageScheme();
+        }   
+        
+    }
     ;
        
        private final String propertyName;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
index d3065a7..e99003f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
@@ -31,11 +31,12 @@ import java.util.NoSuchElementException;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 
 /**
  * List implementation that provides indexed based look up when the cell 
column qualifiers are positive numbers. 
- * These qualifiers are generated by using one of the column qualifier 
encoding schemes specified in {@link QualifierEncodingScheme}. 
+ * These qualifiers are generated by using one of the column qualifier 
encoding schemes specified in {@link ImmutableStorageScheme}. 
  * The api methods in this list assume that the caller wants to see
  * and add only non null elements in the list. 
  * <p>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
index fede7d8..f31f272 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
@@ -22,20 +22,15 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.sql.Types;
 import java.text.Format;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ConstraintViolationException;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.ValueSchema;
-import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
@@ -354,130 +349,11 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
         return createPhoenixArray(bytes, offset, length, sortOrder, baseType, 
maxLength, desiredDataType);
     }
 
-    public static boolean positionAtArrayElement(Tuple tuple, 
ImmutableBytesWritable ptr, int index,
-            Expression arrayExpr, PDataType pDataType, Integer maxLen) {
-        if (!arrayExpr.evaluate(tuple, ptr)) {
-            return false;
-        } else if (ptr.getLength() == 0) { return true; }
-
-        // Given a ptr to the entire array, set ptr to point to a particular 
element within that array
-        // given the type of an array element (see comments in 
PDataTypeForArray)
-        return positionAtArrayElement(ptr, index - 1, pDataType, maxLen);
-    }
-
-    public static boolean positionAtArrayElement(ImmutableBytesWritable ptr, 
int arrayIndex, PDataType baseDataType,
-            Integer byteSize) {
-        byte[] bytes = ptr.get();
-        int initPos = ptr.getOffset();
-        if (!baseDataType.isFixedWidth()) {
-               byte serializationVersion = bytes[ptr.getOffset() + 
ptr.getLength() - Bytes.SIZEOF_BYTE];
-            int noOfElements = Bytes.toInt(bytes,
-                    (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + 
Bytes.SIZEOF_INT)), Bytes.SIZEOF_INT);
-            boolean useShort = true;
-            if (noOfElements < 0) {
-                noOfElements = -noOfElements;
-                useShort = false;
-            }
-            if (arrayIndex >= noOfElements) {
-                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
-                return false;
-            }
-
-            int indexOffset = Bytes.toInt(bytes,
-                    (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + 
2 * Bytes.SIZEOF_INT))) + ptr.getOffset();
-            if (arrayIndex >= noOfElements) {
-                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
-                return false;
-            } else {
-                // Skip those many offsets as given in the arrayIndex
-                // If suppose there are 5 elements in the array and the 
arrayIndex = 3
-                // This means we need to read the 4th element of the array
-                // So inorder to know the length of the 4th element we will 
read the offset of 4th element and the
-                // offset of 5th element.
-                // Subtracting the offset of 5th element and 4th element will 
give the length of 4th element
-                // So we could just skip reading the other elements.
-                int currOffset = getSerializedOffset(bytes, arrayIndex, 
useShort, indexOffset, serializationVersion);
-                if (currOffset<0) {
-                       ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
-                    return false;
-                }
-                int elementLength = 0;
-                if (arrayIndex == (noOfElements - 1)) {
-                    int separatorBytes =  serializationVersion == 
SORTABLE_SERIALIZATION_VERSION ? 3 : 0;
-                    elementLength = (bytes[currOffset + initPos] == 
QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == 
QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : indexOffset
-                            - (currOffset + initPos) - separatorBytes;
-                } else {
-                    int separatorByte =  serializationVersion == 
SORTABLE_SERIALIZATION_VERSION ? 1 : 0;
-                    elementLength = (bytes[currOffset + initPos] == 
QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == 
QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : getOffset(bytes,
-                            arrayIndex + 1, useShort, indexOffset, 
serializationVersion) - currOffset - separatorByte;
-                }
-                ptr.set(bytes, currOffset + initPos, elementLength);
-            }
-        } else {
-            int elemByteSize = (byteSize == null ? baseDataType.getByteSize() 
: byteSize);
-            int offset = arrayIndex * elemByteSize;
-            if (offset >= ptr.getLength()) {
-                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
-            } else {
-                ptr.set(bytes, ptr.getOffset() + offset, elemByteSize);
-            }
-        }
-        return true;
-    }
-
-    public static void positionAtArrayElement(ImmutableBytesWritable ptr, int 
arrayIndex, PDataType baseDataType,
-            Integer byteSize, int offset, int length, int noOfElements, 
boolean first) {
-        byte[] bytes = ptr.get();
-        if (!baseDataType.isFixedWidth()) {
-               byte serializationVersion = bytes[ptr.getOffset() + 
ptr.getLength() - Bytes.SIZEOF_BYTE];
-            int indexOffset = Bytes.toInt(bytes, (offset + length - 
(Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT)))
-                    + offset;
-            boolean useShort = true;
-            if (first) {
-                int count = Bytes.toInt(bytes,
-                        (ptr.getOffset() + ptr.getLength() - 
(Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT)), Bytes.SIZEOF_INT);
-                if (count < 0) {
-                    count = -count;
-                    useShort = false;
-                }
-            }
-            if (arrayIndex >= noOfElements) {
-                return;
-            } else {
-                // Skip those many offsets as given in the arrayIndex
-                // If suppose there are 5 elements in the array and the 
arrayIndex = 3
-                // This means we need to read the 4th element of the array
-                // So inorder to know the length of the 4th element we will 
read the offset of 4th element and the
-                // offset of 5th element.
-                // Subtracting the offset of 5th element and 4th element will 
give the length of 4th element
-                // So we could just skip reading the other elements.
-                int currOffset = getOffset(bytes, arrayIndex, useShort, 
indexOffset, serializationVersion);
-                int elementLength = 0;
-                if (arrayIndex == (noOfElements - 1)) {
-                    elementLength = (bytes[currOffset + offset] == 
QueryConstants.SEPARATOR_BYTE || bytes[currOffset + offset] == 
QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : indexOffset
-                            - (currOffset + offset) - 3;
-                } else {
-                    elementLength = (bytes[currOffset + offset] == 
QueryConstants.SEPARATOR_BYTE || bytes[currOffset + offset] == 
QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : getOffset(bytes,
-                            arrayIndex + 1, useShort, indexOffset, 
serializationVersion) - currOffset - 1;
-                }
-                ptr.set(bytes, currOffset + offset, elementLength);
-            }
-        } else {
-            int elemByteSize = (byteSize == null ? baseDataType.getByteSize() 
: byteSize);
-            offset += arrayIndex * elemByteSize;
-            if (offset >= offset + length) {
-                return;
-            } else {
-                ptr.set(bytes, offset, elemByteSize);
-            }
-        }
-    }
-
-    private static int getOffset(byte[] bytes, int arrayIndex, boolean 
useShort, int indexOffset, byte serializationVersion) {
+    static int getOffset(byte[] bytes, int arrayIndex, boolean useShort, int 
indexOffset, byte serializationVersion) {
         return Math.abs(getSerializedOffset(bytes, arrayIndex, useShort, 
indexOffset, serializationVersion));
     }
 
-       private static int getSerializedOffset(byte[] bytes, int arrayIndex, 
boolean useShort, int indexOffset, byte serializationVersion) {
+       static int getSerializedOffset(byte[] bytes, int arrayIndex, boolean 
useShort, int indexOffset, byte serializationVersion) {
                int offset;
         if (useShort) {
             offset = indexOffset + (Bytes.SIZEOF_SHORT * arrayIndex);
@@ -514,13 +390,13 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
      */
     private byte[] createArrayBytes(TrustedByteArrayOutputStream byteStream, 
DataOutputStream oStream,
             PhoenixArray array, int noOfElements, PDataType baseType, 
SortOrder sortOrder, boolean rowKeyOrderOptimizable) {
-        PArrayDataTypeBytesArrayBuilder builder =
-                new PArrayDataTypeBytesArrayBuilder(byteStream, oStream, 
noOfElements, baseType, sortOrder, rowKeyOrderOptimizable);
+        PArrayDataTypeEncoder builder =
+                new PArrayDataTypeEncoder(byteStream, oStream, noOfElements, 
baseType, sortOrder, rowKeyOrderOptimizable);
         for (int i = 0; i < noOfElements; i++) {
             byte[] bytes = array.toBytes(i);
-            builder.appendElem(bytes);
+            builder.appendValue(bytes);
         }
-        return builder.getBytesAndClose();
+        return builder.encode();
     }
 
     public static boolean appendItemToArray(ImmutableBytesWritable ptr, int 
length, int offset, byte[] arrayBytes,
@@ -1211,140 +1087,4 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
         buf.append(']');
         return buf.toString();
     }
-
-    static public class PArrayDataTypeBytesArrayBuilder {
-        static private final int BYTE_ARRAY_DEFAULT_SIZE = 128;
-
-        private PDataType baseType;
-        private SortOrder sortOrder;
-        private List<Integer> offsetPos;
-        private TrustedByteArrayOutputStream byteStream;
-        private DataOutputStream oStream;
-        private int nulls;
-        private byte serializationVersion;
-        private boolean rowKeyOrderOptimizable;
-
-        public PArrayDataTypeBytesArrayBuilder(PDataType baseType, SortOrder 
sortOrder) {
-            this(new TrustedByteArrayOutputStream(BYTE_ARRAY_DEFAULT_SIZE), 
new LinkedList<Integer>(), baseType, sortOrder, true);
-        }
-        
-        public PArrayDataTypeBytesArrayBuilder(TrustedByteArrayOutputStream 
byteStream, DataOutputStream oStream,
-                int numElements, PDataType baseType, SortOrder sortOrder, 
boolean rowKeyOrderOptimizable, byte serializationVersion) {
-            this(byteStream, oStream, new ArrayList<Integer>(numElements), 
baseType, sortOrder, rowKeyOrderOptimizable, serializationVersion);
-        }
-        
-        public PArrayDataTypeBytesArrayBuilder(TrustedByteArrayOutputStream 
byteStream, DataOutputStream oStream,
-                int numElements, PDataType baseType, SortOrder sortOrder, 
boolean rowKeyOrderOptimizable) {
-            this(byteStream, oStream, new ArrayList<Integer>(numElements), 
baseType, sortOrder, rowKeyOrderOptimizable, 
PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
-        }
-        
-        public PArrayDataTypeBytesArrayBuilder(TrustedByteArrayOutputStream 
byteStream, 
-                List<Integer> offsetPos, PDataType baseType, SortOrder 
sortOrder, boolean rowKeyOrderOptimizable) {
-            this(byteStream, new DataOutputStream(byteStream), offsetPos, 
baseType, sortOrder, rowKeyOrderOptimizable, 
PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
-        }
-        
-        public PArrayDataTypeBytesArrayBuilder(TrustedByteArrayOutputStream 
byteStream, DataOutputStream oStream,
-                List<Integer> offsetPos, PDataType baseType, SortOrder 
sortOrder, boolean rowKeyOrderOptimizable, byte serializationVersion) {
-            this.baseType = baseType;
-            this.sortOrder = sortOrder;
-            this.offsetPos = offsetPos;
-            this.byteStream = byteStream;
-            this.oStream = oStream;
-            this.nulls = 0;
-            this.serializationVersion = serializationVersion;
-            this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
-        }
-
-        private void close() {
-            try {
-                if (byteStream != null) byteStream.close();
-                if (oStream != null) oStream.close();
-                byteStream = null;
-                oStream = null;
-            } catch (IOException ioe) {}
-        }
-        
-        // used to represent the absence of a value 
-        public void appendMissingElement() {
-            if (serializationVersion == 
PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION && !baseType.isFixedWidth()) {
-                offsetPos.add(-byteStream.size());
-                nulls++;
-            }
-        }
-
-        public boolean appendElem(byte[] bytes) {
-            return appendElem(bytes, 0, bytes.length);
-        }
-
-        public boolean appendElem(byte[] bytes, int offset, int len) {
-            if (oStream == null || byteStream == null) return false;
-            try {
-                // track the offset position here from the size of the 
byteStream
-                if (!baseType.isFixedWidth()) {
-                    // Any variable length array would follow the below order
-                    // Every element would be seperated by a seperator byte '0'
-                    // Null elements are counted and once a first non null 
element appears we
-                    // write the count of the nulls prefixed with a seperator 
byte
-                    // Trailing nulls are not taken into account
-                    // The last non null element is followed by two seperator 
bytes
-                    // For eg
-                    // a, b, null, null, c, null would be 
-                    // 65 0 66 0 0 2 67 0 0 0
-                    // a null null null b c null d would be
-                    // 65 0 0 3 66 0 67 0 0 1 68 0 0 0
-                    if (len == 0) {
-                        offsetPos.add(byteStream.size());
-                        nulls++;
-                    } else {
-                        nulls = serializeNulls(oStream, nulls);
-                        offsetPos.add(byteStream.size());
-                        if (sortOrder == SortOrder.DESC) {
-                            SortOrder.invert(bytes, offset, bytes, offset, 
len);
-                            offset = 0;
-                        }
-                        oStream.write(bytes, offset, len);
-                        if (serializationVersion == 
SORTABLE_SERIALIZATION_VERSION) {
-                            
oStream.write(getSeparatorByte(rowKeyOrderOptimizable, sortOrder));
-                        }
-                    }
-                } else {
-                    // No nulls for fixed length
-                    if (sortOrder == SortOrder.DESC) {
-                        SortOrder.invert(bytes, offset, bytes, offset, len);
-                        offset = 0;
-                    }
-                    oStream.write(bytes, offset, len);
-                }
-                return true;
-            } catch (IOException e) {}
-            return false;
-        }
-
-        public byte[] getBytesAndClose() {
-            try {
-                if (!baseType.isFixedWidth()) {
-                    int noOfElements = offsetPos.size();
-                    int[] offsetPosArray = new int[noOfElements];
-                    int index = 0;
-                    for (Integer i : offsetPos) {
-                        offsetPosArray[index] = i;
-                        ++index;
-                    }
-                    if (serializationVersion == 
SORTABLE_SERIALIZATION_VERSION) {
-                        // Double seperator byte to show end of the non null 
array
-                        writeEndSeperatorForVarLengthArray(oStream, sortOrder, 
rowKeyOrderOptimizable);
-                    }
-                    noOfElements = 
PArrayDataType.serializeOffsetArrayIntoStream(oStream, byteStream, noOfElements,
-                            offsetPosArray[offsetPosArray.length - 1], 
offsetPosArray, serializationVersion);
-                    serializeHeaderInfoIntoStream(oStream, noOfElements, 
serializationVersion);
-                }
-                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-                ptr.set(byteStream.getBuffer(), 0, byteStream.size());
-                return ByteUtil.copyKeyBytesIfNecessary(ptr);
-            } catch (IOException e) {} finally {
-                close();
-            }
-            return null;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java
new file mode 100644
index 0000000..7a6ea91
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.types;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnValueDecoder;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
+
+
+public class PArrayDataTypeDecoder implements ColumnValueDecoder {
+    
+    @Override
+    public boolean decode(ImmutableBytesWritable ptr, int index) {
+        return PArrayDataTypeDecoder.positionAtArrayElement(ptr, index, 
PVarbinary.INSTANCE, null);
+    }
+
+    public static boolean positionAtArrayElement(Tuple tuple, 
ImmutableBytesWritable ptr, int index,
+            Expression arrayExpr, PDataType pDataType, Integer maxLen) {
+        if (!arrayExpr.evaluate(tuple, ptr)) {
+            return false;
+        } else if (ptr.getLength() == 0) { return true; }
+    
+        // Given a ptr to the entire array, set ptr to point to a particular 
element within that array
+        // given the type of an array element (see comments in 
PDataTypeForArray)
+        return positionAtArrayElement(ptr, index - 1, pDataType, maxLen);
+    }
+
+    public static boolean positionAtArrayElement(ImmutableBytesWritable ptr, 
int arrayIndex, PDataType baseDataType,
+            Integer byteSize) {
+        byte[] bytes = ptr.get();
+        int initPos = ptr.getOffset();
+        if (!baseDataType.isFixedWidth()) {
+               byte serializationVersion = bytes[ptr.getOffset() + 
ptr.getLength() - Bytes.SIZEOF_BYTE];
+            int noOfElements = Bytes.toInt(bytes,
+                    (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + 
Bytes.SIZEOF_INT)), Bytes.SIZEOF_INT);
+            boolean useShort = true;
+            if (noOfElements < 0) {
+                noOfElements = -noOfElements;
+                useShort = false;
+            }
+            if (arrayIndex >= noOfElements) {
+                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+                return false;
+            }
+    
+            int indexOffset = Bytes.toInt(bytes,
+                    (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + 
2 * Bytes.SIZEOF_INT))) + ptr.getOffset();
+            // Skip those many offsets as given in the arrayIndex
+            // If suppose there are 5 elements in the array and the arrayIndex 
= 3
+            // This means we need to read the 4th element of the array
+            // So inorder to know the length of the 4th element we will read 
the offset of 4th element and the
+            // offset of 5th element.
+            // Subtracting the offset of 5th element and 4th element will give 
the length of 4th element
+            // So we could just skip reading the other elements.
+            int currOffset = PArrayDataType.getSerializedOffset(bytes, 
arrayIndex, useShort, indexOffset, serializationVersion);
+            if (currOffset<0) {
+               ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+                return false;
+            }
+            int elementLength = 0;
+            if (arrayIndex == (noOfElements - 1)) {
+                int separatorBytes =  serializationVersion == 
PArrayDataType.SORTABLE_SERIALIZATION_VERSION ? 3 : 0;
+                elementLength = (bytes[currOffset + initPos] == 
QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == 
QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : indexOffset
+                        - (currOffset + initPos) - separatorBytes;
+            } else {
+                int separatorByte =  serializationVersion == 
PArrayDataType.SORTABLE_SERIALIZATION_VERSION ? 1 : 0;
+                elementLength = (bytes[currOffset + initPos] == 
QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == 
QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : PArrayDataType.getOffset(bytes,
+                        arrayIndex + 1, useShort, indexOffset, 
serializationVersion) - currOffset - separatorByte;
+            }
+            ptr.set(bytes, currOffset + initPos, elementLength);
+        } else {
+            int elemByteSize = (byteSize == null ? baseDataType.getByteSize() 
: byteSize);
+            int offset = arrayIndex * elemByteSize;
+            if (offset >= ptr.getLength()) {
+                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+            } else {
+                ptr.set(bytes, ptr.getOffset() + offset, elemByteSize);
+            }
+        }
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
new file mode 100644
index 0000000..bb293bb
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.types;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.schema.ColumnValueEncoder;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+public class PArrayDataTypeEncoder implements ColumnValueEncoder {
+    static private final int BYTE_ARRAY_DEFAULT_SIZE = 128;
+
+    private PDataType baseType;
+    private SortOrder sortOrder;
+    private List<Integer> offsetPos;
+    private TrustedByteArrayOutputStream byteStream;
+    private DataOutputStream oStream;
+    private int nulls;
+    private byte serializationVersion;
+    private boolean rowKeyOrderOptimizable;
+
+    public PArrayDataTypeEncoder(PDataType baseType, SortOrder sortOrder) {
+        this(new TrustedByteArrayOutputStream(BYTE_ARRAY_DEFAULT_SIZE), new 
LinkedList<Integer>(), baseType, sortOrder, true);
+    }
+    
+    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, 
DataOutputStream oStream,
+            int numElements, PDataType baseType, SortOrder sortOrder, boolean 
rowKeyOrderOptimizable, byte serializationVersion) {
+        this(byteStream, oStream, new ArrayList<Integer>(numElements), 
baseType, sortOrder, rowKeyOrderOptimizable, serializationVersion);
+    }
+    
+    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, 
DataOutputStream oStream,
+            int numElements, PDataType baseType, SortOrder sortOrder, boolean 
rowKeyOrderOptimizable) {
+        this(byteStream, oStream, new ArrayList<Integer>(numElements), 
baseType, sortOrder, rowKeyOrderOptimizable, 
PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
+    }
+    
+    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, 
+            List<Integer> offsetPos, PDataType baseType, SortOrder sortOrder, 
boolean rowKeyOrderOptimizable) {
+        this(byteStream, new DataOutputStream(byteStream), offsetPos, 
baseType, sortOrder, rowKeyOrderOptimizable, 
PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
+    }
+    
+    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, 
DataOutputStream oStream,
+            List<Integer> offsetPos, PDataType baseType, SortOrder sortOrder, 
boolean rowKeyOrderOptimizable, byte serializationVersion) {
+        this.baseType = baseType;
+        this.sortOrder = sortOrder;
+        this.offsetPos = offsetPos;
+        this.byteStream = byteStream;
+        this.oStream = oStream;
+        this.nulls = 0;
+        this.serializationVersion = serializationVersion;
+        this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
+    }
+
+    private void close() {
+        try {
+            if (byteStream != null) byteStream.close();
+            if (oStream != null) oStream.close();
+            byteStream = null;
+            oStream = null;
+        } catch (IOException ioe) {}
+    }
+    
+    // used to represent the absence of a value 
+    @Override
+    public void appendAbsentValue() {
+        if (serializationVersion == 
PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION && !baseType.isFixedWidth()) {
+            offsetPos.add(-byteStream.size());
+            nulls++;
+        }
+        else {
+            throw new UnsupportedOperationException("Cannot represent an 
absent element");
+        }
+    }
+
+    public void appendValue(byte[] bytes) {
+        appendValue(bytes, 0, bytes.length);
+    }
+
+    @Override
+    public void appendValue(byte[] bytes, int offset, int len) {
+        try {
+            // track the offset position here from the size of the byteStream
+            if (!baseType.isFixedWidth()) {
+                // Any variable length array would follow the below order
+                // Every element would be seperated by a seperator byte '0'
+                // Null elements are counted and once a first non null element 
appears we
+                // write the count of the nulls prefixed with a seperator byte
+                // Trailing nulls are not taken into account
+                // The last non null element is followed by two seperator bytes
+                // For eg
+                // a, b, null, null, c, null would be 
+                // 65 0 66 0 0 2 67 0 0 0
+                // a null null null b c null d would be
+                // 65 0 0 3 66 0 67 0 0 1 68 0 0 0
+                if (len == 0) {
+                    offsetPos.add(byteStream.size());
+                    nulls++;
+                } else {
+                    nulls = PArrayDataType.serializeNulls(oStream, nulls);
+                    offsetPos.add(byteStream.size());
+                    if (sortOrder == SortOrder.DESC) {
+                        SortOrder.invert(bytes, offset, bytes, offset, len);
+                        offset = 0;
+                    }
+                    oStream.write(bytes, offset, len);
+                    if (serializationVersion == 
PArrayDataType.SORTABLE_SERIALIZATION_VERSION) {
+                        
oStream.write(PArrayDataType.getSeparatorByte(rowKeyOrderOptimizable, 
sortOrder));
+                    }
+                }
+            } else {
+                // No nulls for fixed length
+                if (sortOrder == SortOrder.DESC) {
+                    SortOrder.invert(bytes, offset, bytes, offset, len);
+                    offset = 0;
+                }
+                oStream.write(bytes, offset, len);
+            }
+        } catch (IOException e) {}
+    }
+
+    @Override
+    public byte[] encode() {
+        try {
+            if (!baseType.isFixedWidth()) {
+                int noOfElements = offsetPos.size();
+                int[] offsetPosArray = new int[noOfElements];
+                int index = 0;
+                for (Integer i : offsetPos) {
+                    offsetPosArray[index] = i;
+                    ++index;
+                }
+                if (serializationVersion == 
PArrayDataType.SORTABLE_SERIALIZATION_VERSION) {
+                    // Double seperator byte to show end of the non null array
+                    PArrayDataType.writeEndSeperatorForVarLengthArray(oStream, 
sortOrder, rowKeyOrderOptimizable);
+                }
+                noOfElements = 
PArrayDataType.serializeOffsetArrayIntoStream(oStream, byteStream, noOfElements,
+                        offsetPosArray[offsetPosArray.length - 1], 
offsetPosArray, serializationVersion);
+                PArrayDataType.serializeHeaderInfoIntoStream(oStream, 
noOfElements, serializationVersion);
+            }
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            ptr.set(byteStream.getBuffer(), 0, byteStream.size());
+            return ByteUtil.copyKeyBytesIfNecessary(ptr);
+        } catch (IOException e) {} finally {
+            close();
+        }
+        return null;
+    }
+    
+}
\ No newline at end of file

Reply via email to