http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
new file mode 100644
index 0000000..5a5b355
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.tuple;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
+import static 
org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
+
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
+
+/**
+ * List implementation that provides indexed based look up when the cell 
column qualifiers are positive numbers. 
+ * These qualifiers are generated by using one of the column qualifier 
encoding schemes specified in {@link ImmutableStorageScheme}. 
+ * The api methods in this list assume that the caller wants to see
+ * and add only non null elements in the list. 
+ * <p>
+ * Please note that this implementation doesn't implement all the optional 
methods of the 
+ * {@link List} interface. Such unsupported methods could violate the basic 
invariance of the list that every cell with
+ * an encoded column qualifier has a fixed position in the list.
+ * </p>
+ * <p>
+ * An important performance characteristic of this list is that doing look up 
on the basis of index via {@link #get(int)}
+ * is an O(n) operation. This makes iterating through the list using {@link 
#get(int)} an O(n^2) operation.
+ * Instead, for iterating through the list, one should use the iterators 
created through {@link #iterator()} or 
+ * {@link #listIterator()}. Do note that getting an element using {@link 
#getCellForColumnQualifier(int)} is an O(1) operation
+ * and should generally be the way for accessing elements in the list.
+ * </p> 
+ */
+@NotThreadSafe
+public class EncodedColumnQualiferCellsList implements List<Cell> {
+
+    private int minQualifier;
+    private int maxQualifier;
+    private int nonReservedRangeOffset;
+    private final Cell[] array;
+    private int numNonNullElements;
+    private int firstNonNullElementIdx = -1;
+    private static final int RESERVED_RANGE_SIZE = 
ENCODED_CQ_COUNTER_INITIAL_VALUE - ENCODED_EMPTY_COLUMN_NAME;
+    // Used by iterators to figure out if the list was structurally modified.
+    private int modCount = 0;
+    private final QualifierEncodingScheme encodingScheme;
+
+    public EncodedColumnQualiferCellsList(int minQ, int maxQ, 
QualifierEncodingScheme encodingScheme) {
+        checkArgument(minQ <= maxQ, "Invalid arguments. Min: " + minQ
+                + ". Max: " + maxQ);
+        this.minQualifier = minQ;
+        this.maxQualifier = maxQ;
+        int size = 0;
+        if (maxQ < ENCODED_CQ_COUNTER_INITIAL_VALUE) {
+            size = RESERVED_RANGE_SIZE;
+        } else if (minQ < ENCODED_CQ_COUNTER_INITIAL_VALUE) {
+            size = (maxQ - minQ + 1);
+        } else {
+            size = RESERVED_RANGE_SIZE + (maxQ - minQ + 1);
+        }
+        this.array = new Cell[size];
+        this.nonReservedRangeOffset = minQ > ENCODED_CQ_COUNTER_INITIAL_VALUE 
? minQ  - ENCODED_CQ_COUNTER_INITIAL_VALUE : 0;
+        this.encodingScheme = encodingScheme;
+    }
+
+    @Override
+    public int size() {
+        return numNonNullElements;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return numNonNullElements == 0;
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        return indexOf(o) >= 0;
+    }
+
+    @Override
+    public Object[] toArray() {
+        Object[] toReturn = new Object[numNonNullElements];
+        int counter = 0;
+        if (numNonNullElements > 0) {
+            for (int i = 0; i < array.length; i++) {
+                if (array[i] != null) {
+                    toReturn[counter++] = array[i];
+                }
+            }
+        }
+        return toReturn;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T[] toArray(T[] a) {
+        T[] toReturn =
+                (T[]) 
java.lang.reflect.Array.newInstance(a.getClass().getComponentType(),
+                    numNonNullElements);
+        int counter = 0;
+        for (int i = 0; i < array.length; i++) {
+            if (array[i] != null) {
+                toReturn[counter++] = (T) array[i];
+            }
+        }
+        return toReturn;
+    }
+
+    @Override
+    public boolean add(Cell e) {
+        if (e == null) {
+            throw new NullPointerException();
+        }
+        int columnQualifier = encodingScheme.decode(e.getQualifierArray(), 
e.getQualifierOffset(), e.getQualifierLength());
+                
+        checkQualifierRange(columnQualifier);
+        int idx = getArrayIndex(columnQualifier);
+        if (array[idx] == null) {
+            numNonNullElements++;
+        }
+        array[idx] = e;
+        if (firstNonNullElementIdx == -1) {
+            firstNonNullElementIdx = idx;
+        } else if (idx < firstNonNullElementIdx) {
+            firstNonNullElementIdx = idx;
+        }
+        modCount++;
+        /*
+         * Note that we don't care about equality of the element being added 
with the element
+         * already present at the index.
+         */
+        return true;
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        if (o == null) {
+            return false;
+        }
+        Cell e = (Cell) o;
+        int i = 0;
+        while (i < array.length) {
+            if (array[i] != null && array[i].equals(e)) {
+                array[i] = null;
+                numNonNullElements--;
+                if (numNonNullElements == 0) {
+                    firstNonNullElementIdx = -1;
+                } else if (firstNonNullElementIdx == i) {
+                    // the element being removed was the first non-null 
element we knew
+                    while (i < array.length && (array[i]) == null) {
+                        i++;
+                    }
+                    if (i < array.length) {
+                        firstNonNullElementIdx = i;
+                    } else {
+                        firstNonNullElementIdx = -1;
+                    }
+                }
+                modCount++;
+                return true;
+            }
+            i++;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        boolean containsAll = true;
+        Iterator<?> itr = c.iterator();
+        while (itr.hasNext()) {
+            containsAll &= (indexOf(itr.next()) >= 0);
+        }
+        return containsAll;
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends Cell> c) {
+        boolean changed = false;
+        for (Cell cell : c) {
+            if (c == null) {
+                throw new NullPointerException();
+            }
+            changed |= add(cell);
+        }
+        return changed;
+    }
+
+    @Override
+    public boolean addAll(int index, Collection<? extends Cell> c) {
+        throwGenericUnsupportedOperationException();
+        return false;
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        Iterator<?> itr = c.iterator();
+        boolean changed = false;
+        while (itr.hasNext()) {
+            changed |= remove(itr.next());
+        }
+        return changed;
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> collection) {
+        boolean changed = false;
+        // Optimize if the passed collection is an instance of 
EncodedColumnQualiferCellsList
+        if (collection instanceof EncodedColumnQualiferCellsList) {
+            EncodedColumnQualiferCellsList list = 
(EncodedColumnQualiferCellsList) collection;
+            ListIterator<Cell> listItr = this.listIterator();
+            while (listItr.hasNext()) {
+                Cell cellInThis = listItr.next();
+                int qualifier = 
encodingScheme.decode(cellInThis.getQualifierArray(),
+                            cellInThis.getQualifierOffset(), 
cellInThis.getQualifierLength());
+                try {
+                    Cell cellInParam = 
list.getCellForColumnQualifier(qualifier);
+                    if (cellInParam != null && cellInParam.equals(cellInThis)) 
{
+                        continue;
+                    }
+                    listItr.remove();
+                    changed = true;
+                } catch (IndexOutOfBoundsException expected) {
+                    // this could happen when the qualifier of cellInParam 
lies out of
+                    // the range of this list.
+                    listItr.remove();
+                    changed = true;
+                }
+            }
+        } else {
+            throw new UnsupportedOperationException(
+                    "Operation only supported for collections of type 
EncodedColumnQualiferCellsList");
+        }
+        return changed;
+    }
+
+    @Override
+    public void clear() {
+        for (int i = 0; i < array.length; i++) {
+            array[i] = null;
+        }
+        firstNonNullElementIdx = -1;
+        numNonNullElements = 0;
+        modCount++;
+    }
+
+    @Override
+    public Cell get(int index) {
+        rangeCheck(index);
+        int numNonNullElementsFound = 0;
+        for (int i = firstNonNullElementIdx; i < array.length; i++) {
+            if (array[i] != null) {
+                numNonNullElementsFound++;
+                if (numNonNullElementsFound == index + 1) {
+                    return array[i];
+                }
+            }
+        }
+        throw new IllegalStateException("There was no element present in the 
list at index "
+                + index + " even though number of elements in the list are " + 
size());
+    }
+
+    @Override
+    public Cell set(int index, Cell e) {
+        throwGenericUnsupportedOperationException();
+        return null;
+    }
+
+    @Override
+    public void add(int index, Cell element) {
+        throwGenericUnsupportedOperationException();
+    }
+
+    @Override
+    public Cell remove(int index) {
+        throwGenericUnsupportedOperationException();
+        return null;
+    }
+
+    @Override
+    public int indexOf(Object o) {
+        if (o == null || isEmpty()) {
+            return -1;
+        } else {
+            int numNonNull = -1;
+            for (int i = 0; i < array.length; i++) {
+                if (array[i] != null) {
+                    numNonNull++;
+                }
+                if (o.equals(array[i])) {
+                    return numNonNull;
+                }
+            }
+        }
+        return -1;
+    }
+
+    @Override
+    public int lastIndexOf(Object o) {
+        if (o == null || isEmpty()) {
+            return -1;
+        }
+        int lastIndex = numNonNullElements;
+        for (int i = array.length - 1; i >= 0; i--) {
+            if (array[i] != null) {
+                lastIndex--;
+            }
+            if (o.equals(array[i])) {
+                return lastIndex;
+            }
+        }
+        return -1;
+    }
+
+    @Override
+    public ListIterator<Cell> listIterator() {
+        return new ListItr();
+    }
+
+    @Override
+    public ListIterator<Cell> listIterator(int index) {
+        throwGenericUnsupportedOperationException();
+        return null;
+    }
+
+    @Override
+    public List<Cell> subList(int fromIndex, int toIndex) {
+        throwGenericUnsupportedOperationException();
+        return null;
+    }
+
+    @Override
+    public Iterator<Cell> iterator() {
+        return new Itr();
+    }
+
+    public Cell getCellForColumnQualifier(byte[] qualifierBytes) {
+        int columnQualifier = encodingScheme.decode(qualifierBytes);
+        return getCellForColumnQualifier(columnQualifier);
+    }
+    
+    public Cell getCellForColumnQualifier(byte[] qualifierBytes, int offset, 
int length) {
+        int columnQualifier = encodingScheme.decode(qualifierBytes, offset, 
length);
+        return getCellForColumnQualifier(columnQualifier);
+    }
+    
+    private Cell getCellForColumnQualifier(int columnQualifier) {
+        checkQualifierRange(columnQualifier);
+        int idx = getArrayIndex(columnQualifier);
+        Cell c = array[idx];
+        return c;
+    }
+
+    public Cell getFirstCell() {
+        if (firstNonNullElementIdx == -1) {
+            throw new NoSuchElementException("No elements present in the 
list");
+        }
+        return array[firstNonNullElementIdx];
+    }
+
+    private void checkQualifierRange(int qualifier) {
+        if (qualifier < ENCODED_CQ_COUNTER_INITIAL_VALUE) {
+            return; // space in the array for reserved range is always 
allocated. 
+        }
+        if (qualifier < minQualifier || qualifier > maxQualifier) {
+            throw new IndexOutOfBoundsException("Qualifier " + qualifier
+                    + " is out of the valid range - (" + minQualifier + ", " + 
maxQualifier + ")");
+        }
+    }
+
+    private void rangeCheck(int index) {
+        if (index < 0 || index >= size()) {
+            throw new IndexOutOfBoundsException();
+        }
+    }
+
+    private int getArrayIndex(int columnQualifier) {
+        checkArgument(columnQualifier >= ENCODED_EMPTY_COLUMN_NAME);
+        if (columnQualifier < ENCODED_CQ_COUNTER_INITIAL_VALUE) {
+            return columnQualifier;
+        }
+        return columnQualifier - nonReservedRangeOffset;
+    }
+
+    private void throwGenericUnsupportedOperationException() {
+        throw new UnsupportedOperationException(
+                "Operation cannot be supported because it potentially violates 
the invariance contract of this list implementation");
+    }
+
+    private class Itr implements Iterator<Cell> {
+        protected int nextIndex = 0;
+        protected int lastRet = -1;
+        protected int expectedModCount = modCount;
+        
+        private Itr() {
+            moveForward(true);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return nextIndex != -1;
+        }
+
+        @Override
+        public Cell next() {
+            checkForCoModification();
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            Cell next = array[nextIndex];
+            lastRet = nextIndex;
+            moveForward(false);
+            modCount++;
+            expectedModCount = modCount;
+            return next;
+        }
+
+        @Override
+        public void remove() {
+            if (lastRet < 0) {
+                throw new IllegalStateException();
+            }
+            checkForCoModification();
+            array[lastRet] = null;
+            lastRet = -1;
+            numNonNullElements--;
+            modCount++;
+            expectedModCount = modCount;
+        }
+
+        protected void moveForward(boolean init) {
+            int i = init ? 0 : nextIndex + 1;
+            while (i < array.length && (array[i]) == null) {
+                i++;
+            }
+            if (i < array.length) {
+                nextIndex = i;
+            } else {
+                nextIndex = -1;
+            }
+        }
+        
+        protected void checkForCoModification() {
+            if (modCount != expectedModCount) {
+                throw new ConcurrentModificationException();
+            }
+        }
+
+    }
+
+    private class ListItr extends Itr implements ListIterator<Cell> {
+        private int previousIndex = -1;
+        
+        private ListItr() {
+            moveForward(true);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return nextIndex != -1;
+        }
+
+        @Override
+        public boolean hasPrevious() {
+            return previousIndex != -1;
+        }
+
+        @Override
+        public Cell previous() {
+            if (previousIndex == -1) {
+                throw new NoSuchElementException();
+            }
+            checkForCoModification();
+            lastRet = previousIndex;
+            movePointersBackward();
+            return array[lastRet];
+        }
+
+        @Override
+        public int nextIndex() {
+            return nextIndex;
+        }
+
+        @Override
+        public int previousIndex() {
+            return previousIndex;
+        }
+
+        @Override
+        public void remove() {
+            if (lastRet == nextIndex) {
+                moveNextPointer(nextIndex);
+            }
+            super.remove();
+            expectedModCount = modCount;
+        }
+
+        @Override
+        public void set(Cell e) {
+            if (lastRet == -1) {
+                throw new IllegalStateException();
+            }
+            int columnQualifier = encodingScheme.decode(e.getQualifierArray(), 
e.getQualifierOffset(), e.getQualifierLength());                    
+            int idx = getArrayIndex(columnQualifier);
+            if (idx != lastRet) {
+                throw new IllegalArgumentException("Cell " + e + " with column 
qualifier "
+                        + columnQualifier + " belongs at index " + idx
+                        + ". It cannot be added at the position " + lastRet
+                        + " to which the previous next() or previous() was 
pointing to.");
+            }
+            EncodedColumnQualiferCellsList.this.add(e);
+            expectedModCount = modCount;
+        }
+
+        @Override
+        public void add(Cell e) {
+            throwGenericUnsupportedOperationException();
+        }
+        
+        @Override
+        protected void moveForward(boolean init) {
+            if (!init) {
+                previousIndex = nextIndex;
+            }
+            int i = init ? 0 : nextIndex + 1; 
+            moveNextPointer(i);
+        }
+
+        private void moveNextPointer(int i) {
+            while (i < array.length && (array[i]) == null) {
+                i++;
+            }
+            if (i < array.length) {
+                nextIndex = i;
+            } else {
+                nextIndex = -1;
+            }
+        }
+
+        private void movePointersBackward() {
+            nextIndex = previousIndex;
+            int i = previousIndex - 1;
+            movePreviousPointer(i);
+        }
+
+        private void movePreviousPointer(int i) {
+            for (; i >= 0; i--) {
+                if (array[i] != null) {
+                    previousIndex = i;
+                    break;
+                }
+            }
+            if (i < 0) {
+                previousIndex = -1;
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
index 53f155b..d946870 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
@@ -36,6 +36,7 @@ public class MultiKeyValueTuple extends BaseTuple {
     }
 
     /** Caller must not modify the list that is passed here */
+    @Override
     public void setKeyValues(List<Cell> values) {
         this.values = values;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
new file mode 100644
index 0000000..01a5e4d
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.tuple;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+/**
+ * Tuple that uses the 
+ */
+public class PositionBasedMultiKeyValueTuple extends BaseTuple {
+    private EncodedColumnQualiferCellsList values;
+
+    public PositionBasedMultiKeyValueTuple() {}
+    
+    public PositionBasedMultiKeyValueTuple(List<Cell> values) {
+        checkArgument(values instanceof EncodedColumnQualiferCellsList, 
"PositionBasedMultiKeyValueTuple only works with lists of type 
EncodedColumnQualiferCellsList");
+        this.values = (EncodedColumnQualiferCellsList)values;
+    }
+    
+    /** Caller must not modify the list that is passed here */
+    @Override
+    public void setKeyValues(List<Cell> values) {
+        checkArgument(values instanceof EncodedColumnQualiferCellsList, 
"PositionBasedMultiKeyValueTuple only works with lists of type 
EncodedColumnQualiferCellsList");
+        this.values = (EncodedColumnQualiferCellsList)values;
+    }
+
+    @Override
+    public void getKey(ImmutableBytesWritable ptr) {
+        Cell value = values.getFirstCell();
+        ptr.set(value.getRowArray(), value.getRowOffset(), 
value.getRowLength());
+    }
+
+    @Override
+    public boolean isImmutable() {
+        return true;
+    }
+
+    @Override
+    public Cell getValue(byte[] family, byte[] qualifier) {
+        return values.getCellForColumnQualifier(qualifier);
+    }
+
+    @Override
+    public String toString() {
+        return values.toString();
+    }
+
+    @Override
+    public int size() {
+        return values.size();
+    }
+
+    @Override
+    public Cell getValue(int index) {
+        return values.get(index);
+    }
+
+    @Override
+    public boolean getValue(byte[] family, byte[] qualifier,
+            ImmutableBytesWritable ptr) {
+        Cell kv = getValue(family, qualifier);
+        if (kv == null)
+            return false;
+        ptr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+        return true;
+    }}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java
new file mode 100644
index 0000000..63ba101
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.tuple;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+
+public class PositionBasedResultTuple extends BaseTuple {
+    private final EncodedColumnQualiferCellsList cells;
+    
+    public PositionBasedResultTuple(List<Cell> list) {
+        checkArgument(list instanceof EncodedColumnQualiferCellsList, "Invalid 
list type");
+        this.cells = (EncodedColumnQualiferCellsList)list;
+    }
+    
+    @Override
+    public void getKey(ImmutableBytesWritable ptr) {
+        Cell value = cells.getFirstCell();
+        ptr.set(value.getRowArray(), value.getRowOffset(), 
value.getRowLength());
+    }
+
+    @Override
+    public boolean isImmutable() {
+        return true;
+    }
+
+    @Override
+    public KeyValue getValue(byte[] family, byte[] qualifier) {
+        return 
org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(cells.getCellForColumnQualifier(qualifier));
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("keyvalues=");
+      if(this.cells == null || this.cells.isEmpty()) {
+        sb.append("NONE");
+        return sb.toString();
+      }
+      sb.append("{");
+      boolean moreThanOne = false;
+      for(Cell kv : this.cells) {
+        if(moreThanOne) {
+          sb.append(", \n");
+        } else {
+          moreThanOne = true;
+        }
+        sb.append(kv.toString()+"/value="+Bytes.toString(kv.getValueArray(), 
+          kv.getValueOffset(), kv.getValueLength()));
+      }
+      sb.append("}\n");
+      return sb.toString();
+    }
+
+    @Override
+    public int size() {
+        return cells.size();
+    }
+
+    @Override
+    public KeyValue getValue(int index) {
+        return org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(index == 0 
? cells.getFirstCell() : cells.get(index));
+    }
+
+    @Override
+    public boolean getValue(byte[] family, byte[] qualifier,
+            ImmutableBytesWritable ptr) {
+        KeyValue kv = getValue(family, qualifier);
+        if (kv == null)
+            return false;
+        ptr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+        return true;
+    }
+    
+    public Iterator<Cell> getTupleIterator() {
+        return new TupleIterator(cells.iterator());
+    }
+    
+    private static class TupleIterator implements Iterator<Cell> {
+        
+        private final Iterator<Cell> delegate;
+        private TupleIterator(Iterator<Cell> delegate) {
+            this.delegate = delegate;
+        }
+        
+        @Override
+        public boolean hasNext() {
+            return delegate.hasNext();
+        }
+
+        @Override
+        public Cell next() {
+            return delegate.next();
+        }
+
+        @Override
+        public void remove() {
+            delegate.remove();
+        }
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
index c28a2bf..3774837 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.schema.tuple;
 
+import java.util.Collections;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Result;
@@ -25,25 +27,23 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.util.KeyValueUtil;
 
-
+/**
+ * 
+ * Wrapper around {@link Result} that implements Phoenix's {@link Tuple} 
interface.
+ *
+ */
 public class ResultTuple extends BaseTuple {
-    private Result result;
+    private final Result result;
+    public static final ResultTuple EMPTY_TUPLE = new 
ResultTuple(Result.create(Collections.<Cell>emptyList()));
     
     public ResultTuple(Result result) {
         this.result = result;
     }
     
-    public ResultTuple() {
-    }
-    
     public Result getResult() {
         return this.result;
     }
 
-    public void setResult(Result result) {
-        this.result = result;
-    }
-    
     @Override
     public void getKey(ImmutableBytesWritable ptr) {
         ptr.set(result.getRow());
@@ -104,4 +104,4 @@ public class ResultTuple extends BaseTuple {
         ptr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
         return true;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java
index 61b2a4f..e4a887b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.schema.tuple;
 
+import java.util.List;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 
@@ -87,4 +89,6 @@ public interface Tuple {
      * @return the current or next sequence value
      */
     public long getSequenceValue(int index);
+    
+    public void setKeyValues(List<Cell> values);
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
index 1d2cfb2..f31f272 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
@@ -22,19 +22,15 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.sql.Types;
 import java.text.Format;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ConstraintViolationException;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.ValueSchema;
-import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
@@ -74,8 +70,11 @@ public abstract class PArrayDataType<T> extends PDataType<T> 
{
           this, actualModifer, desiredModifier, true);
     }
 
-    public static final byte ARRAY_SERIALIZATION_VERSION = 1;
-
+    // array serialization format where bytes can be used as part of the row 
key
+    public static final byte SORTABLE_SERIALIZATION_VERSION = 1;
+    // array serialization format where bytes are immutable (does not support 
prepend/append or sorting)
+    public static final byte IMMUTABLE_SERIALIZATION_VERSION = 2;
+    
     protected PArrayDataType(String sqlTypeName, int sqlType, Class clazz, 
PDataCodec codec, int ordinal) {
         super(sqlTypeName, sqlType, clazz, codec, ordinal);
     }
@@ -186,9 +185,17 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
         oStream.write(sepByte);
     }
 
-    public static boolean useShortForOffsetArray(int maxOffset) {
-        // If the max offset is less than Short.MAX_VALUE then offset array 
can use short
-        if (maxOffset <= (2 * Short.MAX_VALUE)) { return true; }
+    // this method is only for append/prepend/concat operations which are only 
supported for the SORTABLE_SERIALIZATION_VERSION
+    public static boolean useShortForOffsetArray(int maxoffset) {
+       return useShortForOffsetArray(maxoffset, 
SORTABLE_SERIALIZATION_VERSION);
+    }
+    
+    public static boolean useShortForOffsetArray(int maxoffset, byte 
serializationVersion) {
+       if (serializationVersion == IMMUTABLE_SERIALIZATION_VERSION) {
+                return (maxoffset <= Short.MAX_VALUE && maxoffset >= 
Short.MIN_VALUE );
+       }
+       // If the max offset is less than Short.MAX_VALUE then offset array can 
use short
+       else if (maxoffset <= (2 * Short.MAX_VALUE)) { return true; }
         // else offset array can use Int
         return false;
     }
@@ -342,126 +349,20 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
         return createPhoenixArray(bytes, offset, length, sortOrder, baseType, 
maxLength, desiredDataType);
     }
 
-    public static boolean positionAtArrayElement(Tuple tuple, 
ImmutableBytesWritable ptr, int index,
-            Expression arrayExpr, PDataType pDataType, Integer maxLen) {
-        if (!arrayExpr.evaluate(tuple, ptr)) {
-            return false;
-        } else if (ptr.getLength() == 0) { return true; }
-
-        // Given a ptr to the entire array, set ptr to point to a particular 
element within that array
-        // given the type of an array element (see comments in 
PDataTypeForArray)
-        positionAtArrayElement(ptr, index - 1, pDataType, maxLen);
-        return true;
-    }
-
-    public static void positionAtArrayElement(ImmutableBytesWritable ptr, int 
arrayIndex, PDataType baseDataType,
-            Integer byteSize) {
-        byte[] bytes = ptr.get();
-        int initPos = ptr.getOffset();
-        if (!baseDataType.isFixedWidth()) {
-            int noOfElements = Bytes.toInt(bytes,
-                    (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + 
Bytes.SIZEOF_INT)), Bytes.SIZEOF_INT);
-            boolean useShort = true;
-            if (noOfElements < 0) {
-                noOfElements = -noOfElements;
-                useShort = false;
-            }
-            if (arrayIndex >= noOfElements) {
-                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
-                return;
-            }
-
-            int indexOffset = Bytes.toInt(bytes,
-                    (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + 
2 * Bytes.SIZEOF_INT))) + ptr.getOffset();
-            if (arrayIndex >= noOfElements) {
-                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
-            } else {
-                // Skip those many offsets as given in the arrayIndex
-                // If suppose there are 5 elements in the array and the 
arrayIndex = 3
-                // This means we need to read the 4th element of the array
-                // So inorder to know the length of the 4th element we will 
read the offset of 4th element and the
-                // offset of 5th element.
-                // Subtracting the offset of 5th element and 4th element will 
give the length of 4th element
-                // So we could just skip reading the other elements.
-                int currOffset = getOffset(bytes, arrayIndex, useShort, 
indexOffset);
-                int elementLength = 0;
-                if (arrayIndex == (noOfElements - 1)) {
-                    elementLength = (bytes[currOffset + initPos] == 
QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == 
QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : indexOffset
-                            - (currOffset + initPos) - 3;
-                } else {
-                    elementLength = (bytes[currOffset + initPos] == 
QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == 
QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : getOffset(bytes,
-                            arrayIndex + 1, useShort, indexOffset) - 
currOffset - 1;
-                }
-                ptr.set(bytes, currOffset + initPos, elementLength);
-            }
-        } else {
-            int elemByteSize = (byteSize == null ? baseDataType.getByteSize() 
: byteSize);
-            int offset = arrayIndex * elemByteSize;
-            if (offset >= ptr.getLength()) {
-                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
-            } else {
-                ptr.set(bytes, ptr.getOffset() + offset, elemByteSize);
-            }
-        }
-    }
-
-    public static void positionAtArrayElement(ImmutableBytesWritable ptr, int 
arrayIndex, PDataType baseDataType,
-            Integer byteSize, int offset, int length, int noOfElements, 
boolean first) {
-        byte[] bytes = ptr.get();
-        if (!baseDataType.isFixedWidth()) {
-            int indexOffset = Bytes.toInt(bytes, (offset + length - 
(Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT)))
-                    + offset;
-            boolean useShort = true;
-            if (first) {
-                int count = Bytes.toInt(bytes,
-                        (ptr.getOffset() + ptr.getLength() - 
(Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT)), Bytes.SIZEOF_INT);
-                if (count < 0) {
-                    count = -count;
-                    useShort = false;
-                }
-            }
-            if (arrayIndex >= noOfElements) {
-                return;
-            } else {
-                // Skip those many offsets as given in the arrayIndex
-                // If suppose there are 5 elements in the array and the 
arrayIndex = 3
-                // This means we need to read the 4th element of the array
-                // So inorder to know the length of the 4th element we will 
read the offset of 4th element and the
-                // offset of 5th element.
-                // Subtracting the offset of 5th element and 4th element will 
give the length of 4th element
-                // So we could just skip reading the other elements.
-                int currOffset = getOffset(bytes, arrayIndex, useShort, 
indexOffset);
-                int elementLength = 0;
-                if (arrayIndex == (noOfElements - 1)) {
-                    elementLength = (bytes[currOffset + offset] == 
QueryConstants.SEPARATOR_BYTE || bytes[currOffset + offset] == 
QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : indexOffset
-                            - (currOffset + offset) - 3;
-                } else {
-                    elementLength = (bytes[currOffset + offset] == 
QueryConstants.SEPARATOR_BYTE || bytes[currOffset + offset] == 
QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : getOffset(bytes,
-                            arrayIndex + 1, useShort, indexOffset) - 
currOffset - 1;
-                }
-                ptr.set(bytes, currOffset + offset, elementLength);
-            }
-        } else {
-            int elemByteSize = (byteSize == null ? baseDataType.getByteSize() 
: byteSize);
-            offset += arrayIndex * elemByteSize;
-            if (offset >= offset + length) {
-                return;
-            } else {
-                ptr.set(bytes, offset, elemByteSize);
-            }
-        }
+    static int getOffset(byte[] bytes, int arrayIndex, boolean useShort, int 
indexOffset, byte serializationVersion) {
+        return Math.abs(getSerializedOffset(bytes, arrayIndex, useShort, 
indexOffset, serializationVersion));
     }
 
-    private static int getOffset(byte[] bytes, int arrayIndex, boolean 
useShort, int indexOffset) {
-        int offset;
+       static int getSerializedOffset(byte[] bytes, int arrayIndex, boolean 
useShort, int indexOffset, byte serializationVersion) {
+               int offset;
         if (useShort) {
             offset = indexOffset + (Bytes.SIZEOF_SHORT * arrayIndex);
-            return Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT) + 
Short.MAX_VALUE;
+            return Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT) + 
(serializationVersion == PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION ? 0 : 
Short.MAX_VALUE);
         } else {
             offset = indexOffset + (Bytes.SIZEOF_INT * arrayIndex);
             return Bytes.toInt(bytes, offset, Bytes.SIZEOF_INT);
         }
-    }
+       }
 
     private static int getOffset(ByteBuffer indexBuffer, int arrayIndex, 
boolean useShort, int indexOffset) {
         int offset;
@@ -484,58 +385,18 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
     }
 
     /**
-     * creates array bytes
+     * creates array bytes using the SORTABLE_SERIALIZATION_VERSION format
      * @param rowKeyOrderOptimizable TODO
      */
     private byte[] createArrayBytes(TrustedByteArrayOutputStream byteStream, 
DataOutputStream oStream,
             PhoenixArray array, int noOfElements, PDataType baseType, 
SortOrder sortOrder, boolean rowKeyOrderOptimizable) {
-        try {
-            if (!baseType.isFixedWidth()) {
-                int[] offsetPos = new int[noOfElements];
-                int nulls = 0;
-                for (int i = 0; i < noOfElements; i++) {
-                    byte[] bytes = array.toBytes(i);
-                    if (bytes.length == 0) {
-                        offsetPos[i] = byteStream.size();
-                        nulls++;
-                    } else {
-                        nulls = serializeNulls(oStream, nulls);
-                        offsetPos[i] = byteStream.size();
-                        if (sortOrder == SortOrder.DESC) {
-                            SortOrder.invert(bytes, 0, bytes, 0, bytes.length);
-                        }
-                        oStream.write(bytes, 0, bytes.length);
-                        oStream.write(getSeparatorByte(rowKeyOrderOptimizable, 
sortOrder));
-                    }
-                }
-                // Double seperator byte to show end of the non null array
-                writeEndSeperatorForVarLengthArray(oStream, sortOrder, 
rowKeyOrderOptimizable);
-                noOfElements = 
PArrayDataType.serailizeOffsetArrayIntoStream(oStream, byteStream, noOfElements,
-                        offsetPos[offsetPos.length - 1], offsetPos);
-                serializeHeaderInfoIntoStream(oStream, noOfElements);
-            } else {
-                for (int i = 0; i < noOfElements; i++) {
-                    byte[] bytes = array.toBytes(i);
-                    int length = bytes.length;
-                    if (sortOrder == SortOrder.DESC) {
-                        SortOrder.invert(bytes, 0, bytes, 0, bytes.length);
-                    }
-                    oStream.write(bytes, 0, length);
-                }
-            }
-            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-            ptr.set(byteStream.getBuffer(), 0, byteStream.size());
-            return ByteUtil.copyKeyBytesIfNecessary(ptr);
-        } catch (IOException e) {
-            try {
-                byteStream.close();
-                oStream.close();
-            } catch (IOException ioe) {
-
-            }
+        PArrayDataTypeEncoder builder =
+                new PArrayDataTypeEncoder(byteStream, oStream, noOfElements, 
baseType, sortOrder, rowKeyOrderOptimizable);
+        for (int i = 0; i < noOfElements; i++) {
+            byte[] bytes = array.toBytes(i);
+            builder.appendValue(bytes);
         }
-        // This should not happen
-        return null;
+        return builder.encode();
     }
 
     public static boolean appendItemToArray(ImmutableBytesWritable ptr, int 
length, int offset, byte[] arrayBytes,
@@ -557,7 +418,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
 
         byte[] newArray;
         if (!baseType.isFixedWidth()) {
-
+               byte serializationVersion = arrayBytes[offset + length - 
Bytes.SIZEOF_BYTE];
             int offsetArrayPosition = Bytes.toInt(arrayBytes, offset + length 
- Bytes.SIZEOF_INT - Bytes.SIZEOF_INT
                     - Bytes.SIZEOF_BYTE, Bytes.SIZEOF_INT);
             int offsetArrayLength = length - offsetArrayPosition - 
Bytes.SIZEOF_INT - Bytes.SIZEOF_INT
@@ -612,7 +473,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                     int off = newOffsetArrayPosition;
                     for (int arrayIndex = 0; arrayIndex < 
Math.abs(arrayLength) - 1; arrayIndex++) {
                         Bytes.putInt(newArray, off,
-                                getOffset(arrayBytes, arrayIndex, true, 
offsetArrayPosition + offset));
+                                getOffset(arrayBytes, arrayIndex, true, 
offsetArrayPosition + offset, serializationVersion));
                         off += Bytes.SIZEOF_INT;
                     }
 
@@ -659,6 +520,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
 
         byte[] newArray;
         if (!baseType.isFixedWidth()) {
+               byte serializationVersion = arrayBytes[offset + length - 
Bytes.SIZEOF_BYTE];
             int offsetArrayPosition = Bytes.toInt(arrayBytes, offset + length 
- Bytes.SIZEOF_INT - Bytes.SIZEOF_INT
                     - Bytes.SIZEOF_BYTE, Bytes.SIZEOF_INT);
             int offsetArrayLength = length - offsetArrayPosition - 
Bytes.SIZEOF_INT - Bytes.SIZEOF_INT
@@ -668,7 +530,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
             // checks whether offset array consists of shorts or integers
             boolean useInt = offsetArrayLength / arrayLength == 
Bytes.SIZEOF_INT;
             boolean convertToInt = false;
-            int endElementPosition = getOffset(arrayBytes, arrayLength - 1, 
!useInt, offsetArrayPosition + offset)
+            int endElementPosition = getOffset(arrayBytes, arrayLength - 1, 
!useInt, offsetArrayPosition + offset, serializationVersion)
                     + elementLength + Bytes.SIZEOF_BYTE;
             int newOffsetArrayPosition;
             int lengthIncrease;
@@ -679,7 +541,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                 int nulls = 1;
                 // counts the number of nulls which are already at the 
beginning of the array
                 for (int index = 0; index < arrayLength; index++) {
-                    int currOffset = getOffset(arrayBytes, index, !useInt, 
offsetArrayPosition + offset);
+                    int currOffset = getOffset(arrayBytes, index, !useInt, 
offsetArrayPosition + offset, serializationVersion);
                     if (arrayBytes[offset + currOffset] == 
QueryConstants.SEPARATOR_BYTE) {
                         nulls++;
                     } else {
@@ -709,7 +571,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                 // ex: initial array - 0 45(inverted) 65 0 66 0 0 0 after 
prepending null - 0 46(inverted) 65 0 66 0 0 0
                 lengthIncrease = nRemainingNulls == 1 ? (nMultiplesOver255 == 
0 ? 2 * Bytes.SIZEOF_BYTE
                         : Bytes.SIZEOF_BYTE) : 0;
-                endElementPosition = getOffset(arrayBytes, arrayLength - 1, 
!useInt, offsetArrayPosition + offset)
+                endElementPosition = getOffset(arrayBytes, arrayLength - 1, 
!useInt, offsetArrayPosition + offset, serializationVersion)
                         + lengthIncrease;
                 if (!useInt) {
                     if 
(PArrayDataType.useShortForOffsetArray(endElementPosition)) {
@@ -785,8 +647,9 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
 
         currentPosition += offsetArrayElementSize;
         boolean nullsAtBeginning = true;
+        byte serializationVersion = arrayBytes[offset + length - 
Bytes.SIZEOF_BYTE];
         for (int arrayIndex = 0; arrayIndex < arrayLength - 1; arrayIndex++) {
-            int oldOffset = getOffset(arrayBytes, arrayIndex, 
useShortPrevious, offsetArrayPosition + offset);
+            int oldOffset = getOffset(arrayBytes, arrayIndex, 
useShortPrevious, offsetArrayPosition + offset, serializationVersion);
             if (arrayBytes[offset + oldOffset] == 
QueryConstants.SEPARATOR_BYTE && nullsAtBeginning) {
                 if (useShortNew) {
                     Bytes.putShort(newArray, currentPosition, 
(short)(oldOffset - Short.MAX_VALUE));
@@ -820,6 +683,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
         byte[] newArray;
 
         if (!baseType.isFixedWidth()) {
+               byte serializationVersion1 = array1Bytes[array1BytesOffset + 
array1BytesLength - Bytes.SIZEOF_BYTE];
             int offsetArrayPositionArray1 = Bytes.toInt(array1Bytes, 
array1BytesOffset + array1BytesLength
                     - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT - Bytes.SIZEOF_BYTE, 
Bytes.SIZEOF_INT);
             int offsetArrayPositionArray2 = Bytes.toInt(array2Bytes, 
array2BytesOffset + array2BytesLength
@@ -837,7 +701,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
             boolean useIntNewArray = false;
             // count nulls at the end of array 1
             for (int index = actualLengthOfArray1 - 1; index > -1; index--) {
-                int offset = getOffset(array1Bytes, index, !useIntArray1, 
array1BytesOffset + offsetArrayPositionArray1);
+                int offset = getOffset(array1Bytes, index, !useIntArray1, 
array1BytesOffset + offsetArrayPositionArray1, serializationVersion1);
                 if (array1Bytes[array1BytesOffset + offset] == 
QueryConstants.SEPARATOR_BYTE || array1Bytes[array1BytesOffset + offset] == 
QueryConstants.DESC_SEPARATOR_BYTE) {
                     nullsAtTheEndOfArray1++;
                 } else {
@@ -847,8 +711,9 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
             // count nulls at the beginning of the array 2
             int array2FirstNonNullElementOffset = 0;
             int array2FirstNonNullIndex = 0;
+            byte serializationVersion2 = array2Bytes[array2BytesOffset + 
array2BytesLength - Bytes.SIZEOF_BYTE];
             for (int index = 0; index < actualLengthOfArray2; index++) {
-                int offset = getOffset(array2Bytes, index, !useIntArray2, 
array2BytesOffset + offsetArrayPositionArray2);
+                int offset = getOffset(array2Bytes, index, !useIntArray2, 
array2BytesOffset + offsetArrayPositionArray2, serializationVersion2);
                 if (array2Bytes[array2BytesOffset + offset] == 
QueryConstants.SEPARATOR_BYTE) {
                     nullsAtTheBeginningOfArray2++;
                 } else {
@@ -870,7 +735,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
             int newOffsetArrayPosition = offsetArrayPositionArray1 + 
offsetArrayPositionArray2 + lengthIncreaseForNulls
                     - 2 * Bytes.SIZEOF_BYTE;
             int endElementPositionOfArray2 = getOffset(array2Bytes, 
actualLengthOfArray2 - 1, !useIntArray2,
-                    array2BytesOffset + offsetArrayPositionArray2);
+                    array2BytesOffset + offsetArrayPositionArray2, 
serializationVersion2);
             int newEndElementPosition = lengthIncreaseForNulls + 
endElementPositionOfArray2 + offsetArrayPositionArray1
                     - 2 * Bytes.SIZEOF_BYTE;
             // Creates a byte array to store the concatenated array
@@ -902,14 +767,14 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                 // offsets for the elements from array 1. Simply copied.
                 for (int index = 0; index < actualLengthOfArray1; index++) {
                     int offset = getOffset(array1Bytes, index, !useIntArray1, 
array1BytesOffset
-                            + offsetArrayPositionArray1);
+                            + offsetArrayPositionArray1, 
serializationVersion1);
                     Bytes.putInt(newArray, currentPosition, offset);
                     currentPosition += Bytes.SIZEOF_INT;
                 }
                 // offsets for nulls in the middle
                 for (int index = 0; index < array2FirstNonNullIndex; index++) {
                     int offset = getOffset(array2Bytes, index, !useIntArray2, 
array2BytesOffset
-                            + offsetArrayPositionArray2);
+                            + offsetArrayPositionArray2, 
serializationVersion2);
                     Bytes.putInt(newArray, currentPosition, offset + 
array2StartingPosition);
                     currentPosition += Bytes.SIZEOF_INT;
                 }
@@ -918,7 +783,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                         + (bytesForNullsAfter == 0 ? 0 : Bytes.SIZEOF_BYTE);
                 for (int index = array2FirstNonNullIndex; index < 
actualLengthOfArray2; index++) {
                     int offset = getOffset(array2Bytes, index, !useIntArray2, 
array2BytesOffset
-                            + offsetArrayPositionArray2);
+                            + offsetArrayPositionArray2, 
serializationVersion2);
                     Bytes.putInt(newArray, currentPosition, offset - 
array2FirstNonNullElementOffset
                             + part2NonNullStartingPosition);
                     currentPosition += Bytes.SIZEOF_INT;
@@ -927,14 +792,14 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                 // offsets for the elements from array 1. Simply copied.
                 for (int index = 0; index < actualLengthOfArray1; index++) {
                     int offset = getOffset(array1Bytes, index, !useIntArray1, 
array1BytesOffset
-                            + offsetArrayPositionArray1);
+                            + offsetArrayPositionArray1, 
serializationVersion1);
                     Bytes.putShort(newArray, currentPosition, (short)(offset - 
Short.MAX_VALUE));
                     currentPosition += Bytes.SIZEOF_SHORT;
                 }
                 // offsets for nulls in the middle
                 for (int index = 0; index < array2FirstNonNullIndex; index++) {
                     int offset = getOffset(array2Bytes, index, !useIntArray2, 
array2BytesOffset
-                            + offsetArrayPositionArray2);
+                            + offsetArrayPositionArray2, 
serializationVersion2);
                     Bytes.putShort(newArray, currentPosition,
                             (short)(offset + array2StartingPosition - 
Short.MAX_VALUE));
                     currentPosition += Bytes.SIZEOF_SHORT;
@@ -944,7 +809,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                         + (bytesForNullsAfter == 0 ? 0 : Bytes.SIZEOF_BYTE);
                 for (int index = array2FirstNonNullIndex; index < 
actualLengthOfArray2; index++) {
                     int offset = getOffset(array2Bytes, index, !useIntArray2, 
array2BytesOffset
-                            + offsetArrayPositionArray2);
+                            + offsetArrayPositionArray2, 
serializationVersion2);
                     Bytes.putShort(newArray, currentPosition, (short)(offset - 
array2FirstNonNullElementOffset
                             + part2NonNullStartingPosition - Short.MAX_VALUE));
                     currentPosition += Bytes.SIZEOF_SHORT;
@@ -1013,13 +878,13 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
         ptr.set(PVarcharArray.INSTANCE.toBytes(phoenixArray, 
PVarchar.INSTANCE, sortOrder));
         return true;
     }
-
-    public static int serailizeOffsetArrayIntoStream(DataOutputStream oStream, 
TrustedByteArrayOutputStream byteStream,
-            int noOfElements, int maxOffset, int[] offsetPos) throws 
IOException {
+    
+    public static int serializeOffsetArrayIntoStream(DataOutputStream oStream, 
TrustedByteArrayOutputStream byteStream,
+            int noOfElements, int maxOffset, int[] offsetPos, byte 
serializationVersion) throws IOException {
         int offsetPosition = (byteStream.size());
         byte[] offsetArr = null;
         boolean useInt = true;
-        if (PArrayDataType.useShortForOffsetArray(maxOffset)) {
+        if (PArrayDataType.useShortForOffsetArray(maxOffset, 
serializationVersion)) {
             offsetArr = new byte[PArrayDataType.initOffsetArray(noOfElements, 
Bytes.SIZEOF_SHORT)];
             useInt = false;
         } else {
@@ -1034,7 +899,8 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
             }
         } else {
             for (int pos : offsetPos) {
-                Bytes.putShort(offsetArr, off, (short)(pos - Short.MAX_VALUE));
+                short val = serializationVersion == 
PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION ? (short)pos : (short)(pos - 
Short.MAX_VALUE);
+                               Bytes.putShort(offsetArr, off, val);
                 off += Bytes.SIZEOF_SHORT;
             }
         }
@@ -1043,18 +909,11 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
         return noOfElements;
     }
 
-    public static void serializeHeaderInfoIntoBuffer(ByteBuffer buffer, int 
noOfElements) {
-        // No of elements
-        buffer.putInt(noOfElements);
-        // Version of the array
-        buffer.put(ARRAY_SERIALIZATION_VERSION);
-    }
-
-    public static void serializeHeaderInfoIntoStream(DataOutputStream oStream, 
int noOfElements) throws IOException {
+    public static void serializeHeaderInfoIntoStream(DataOutputStream oStream, 
int noOfElements, byte serializationVersion) throws IOException {
         // No of elements
         oStream.writeInt(noOfElements);
         // Version of the array
-        oStream.write(ARRAY_SERIALIZATION_VERSION);
+        oStream.write(serializationVersion);
     }
 
     public static int initOffsetArray(int noOfElements, int baseSize) {
@@ -1228,91 +1087,4 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
         buf.append(']');
         return buf.toString();
     }
-
-    // FIXME: remove this duplicate code
-    static public class PArrayDataTypeBytesArrayBuilder<T> {
-        static private final int BYTE_ARRAY_DEFAULT_SIZE = 128;
-
-        private PDataType baseType;
-        private SortOrder sortOrder;
-        private List<Integer> offsetPos;
-        private TrustedByteArrayOutputStream byteStream;
-        private DataOutputStream oStream;
-        private int nulls;
-
-        public PArrayDataTypeBytesArrayBuilder(PDataType baseType, SortOrder 
sortOrder) {
-            this.baseType = baseType;
-            this.sortOrder = sortOrder;
-            offsetPos = new LinkedList<Integer>();
-            byteStream = new 
TrustedByteArrayOutputStream(BYTE_ARRAY_DEFAULT_SIZE);
-            oStream = new DataOutputStream(byteStream);
-            nulls = 0;
-        }
-
-        private void close() {
-            try {
-                if (byteStream != null) byteStream.close();
-                if (oStream != null) oStream.close();
-                byteStream = null;
-                oStream = null;
-            } catch (IOException ioe) {}
-        }
-
-        public boolean appendElem(byte[] bytes) {
-            return appendElem(bytes, 0, bytes.length);
-        }
-
-        public boolean appendElem(byte[] bytes, int offset, int len) {
-            if (oStream == null || byteStream == null) return false;
-            try {
-                if (!baseType.isFixedWidth()) {
-                    if (len == 0) {
-                        offsetPos.add(byteStream.size());
-                        nulls++;
-                    } else {
-                        nulls = serializeNulls(oStream, nulls);
-                        offsetPos.add(byteStream.size());
-                        if (sortOrder == SortOrder.DESC) {
-                            SortOrder.invert(bytes, offset, bytes, offset, 
len);
-                            offset = 0;
-                        }
-                        oStream.write(bytes, offset, len);
-                        oStream.write(getSeparatorByte(true, sortOrder));
-                    }
-                } else {
-                    if (sortOrder == SortOrder.DESC) {
-                        SortOrder.invert(bytes, offset, bytes, offset, len);
-                        offset = 0;
-                    }
-                    oStream.write(bytes, offset, len);
-                }
-                return true;
-            } catch (IOException e) {}
-            return false;
-        }
-
-        public byte[] getBytesAndClose(SortOrder sortOrder) {
-            try {
-                if (!baseType.isFixedWidth()) {
-                    int noOfElements = offsetPos.size();
-                    int[] offsetPosArray = new int[noOfElements];
-                    int index = 0;
-                    for (Integer i : offsetPos) {
-                        offsetPosArray[index] = i;
-                        ++index;
-                    }
-                    PArrayDataType.writeEndSeperatorForVarLengthArray(oStream, 
sortOrder);
-                    noOfElements = 
PArrayDataType.serailizeOffsetArrayIntoStream(oStream, byteStream, noOfElements,
-                            offsetPosArray[offsetPosArray.length - 1], 
offsetPosArray);
-                    serializeHeaderInfoIntoStream(oStream, noOfElements);
-                }
-                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-                ptr.set(byteStream.getBuffer(), 0, byteStream.size());
-                return ByteUtil.copyKeyBytesIfNecessary(ptr);
-            } catch (IOException e) {} finally {
-                close();
-            }
-            return null;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java
new file mode 100644
index 0000000..7a6ea91
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.types;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnValueDecoder;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
+
+
+public class PArrayDataTypeDecoder implements ColumnValueDecoder {
+    
+    @Override
+    public boolean decode(ImmutableBytesWritable ptr, int index) {
+        return PArrayDataTypeDecoder.positionAtArrayElement(ptr, index, 
PVarbinary.INSTANCE, null);
+    }
+
+    public static boolean positionAtArrayElement(Tuple tuple, 
ImmutableBytesWritable ptr, int index,
+            Expression arrayExpr, PDataType pDataType, Integer maxLen) {
+        if (!arrayExpr.evaluate(tuple, ptr)) {
+            return false;
+        } else if (ptr.getLength() == 0) { return true; }
+    
+        // Given a ptr to the entire array, set ptr to point to a particular 
element within that array
+        // given the type of an array element (see comments in 
PDataTypeForArray)
+        return positionAtArrayElement(ptr, index - 1, pDataType, maxLen);
+    }
+
+    public static boolean positionAtArrayElement(ImmutableBytesWritable ptr, 
int arrayIndex, PDataType baseDataType,
+            Integer byteSize) {
+        byte[] bytes = ptr.get();
+        int initPos = ptr.getOffset();
+        if (!baseDataType.isFixedWidth()) {
+               byte serializationVersion = bytes[ptr.getOffset() + 
ptr.getLength() - Bytes.SIZEOF_BYTE];
+            int noOfElements = Bytes.toInt(bytes,
+                    (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + 
Bytes.SIZEOF_INT)), Bytes.SIZEOF_INT);
+            boolean useShort = true;
+            if (noOfElements < 0) {
+                noOfElements = -noOfElements;
+                useShort = false;
+            }
+            if (arrayIndex >= noOfElements) {
+                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+                return false;
+            }
+    
+            int indexOffset = Bytes.toInt(bytes,
+                    (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + 
2 * Bytes.SIZEOF_INT))) + ptr.getOffset();
+            // Skip those many offsets as given in the arrayIndex
+            // If suppose there are 5 elements in the array and the arrayIndex 
= 3
+            // This means we need to read the 4th element of the array
+            // So inorder to know the length of the 4th element we will read 
the offset of 4th element and the
+            // offset of 5th element.
+            // Subtracting the offset of 5th element and 4th element will give 
the length of 4th element
+            // So we could just skip reading the other elements.
+            int currOffset = PArrayDataType.getSerializedOffset(bytes, 
arrayIndex, useShort, indexOffset, serializationVersion);
+            if (currOffset<0) {
+               ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+                return false;
+            }
+            int elementLength = 0;
+            if (arrayIndex == (noOfElements - 1)) {
+                int separatorBytes =  serializationVersion == 
PArrayDataType.SORTABLE_SERIALIZATION_VERSION ? 3 : 0;
+                elementLength = (bytes[currOffset + initPos] == 
QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == 
QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : indexOffset
+                        - (currOffset + initPos) - separatorBytes;
+            } else {
+                int separatorByte =  serializationVersion == 
PArrayDataType.SORTABLE_SERIALIZATION_VERSION ? 1 : 0;
+                elementLength = (bytes[currOffset + initPos] == 
QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == 
QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : PArrayDataType.getOffset(bytes,
+                        arrayIndex + 1, useShort, indexOffset, 
serializationVersion) - currOffset - separatorByte;
+            }
+            ptr.set(bytes, currOffset + initPos, elementLength);
+        } else {
+            int elemByteSize = (byteSize == null ? baseDataType.getByteSize() 
: byteSize);
+            int offset = arrayIndex * elemByteSize;
+            if (offset >= ptr.getLength()) {
+                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+            } else {
+                ptr.set(bytes, ptr.getOffset() + offset, elemByteSize);
+            }
+        }
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
new file mode 100644
index 0000000..bb293bb
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.types;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.schema.ColumnValueEncoder;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+public class PArrayDataTypeEncoder implements ColumnValueEncoder {
+    static private final int BYTE_ARRAY_DEFAULT_SIZE = 128;
+
+    private PDataType baseType;
+    private SortOrder sortOrder;
+    private List<Integer> offsetPos;
+    private TrustedByteArrayOutputStream byteStream;
+    private DataOutputStream oStream;
+    private int nulls;
+    private byte serializationVersion;
+    private boolean rowKeyOrderOptimizable;
+
+    public PArrayDataTypeEncoder(PDataType baseType, SortOrder sortOrder) {
+        this(new TrustedByteArrayOutputStream(BYTE_ARRAY_DEFAULT_SIZE), new 
LinkedList<Integer>(), baseType, sortOrder, true);
+    }
+    
+    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, 
DataOutputStream oStream,
+            int numElements, PDataType baseType, SortOrder sortOrder, boolean 
rowKeyOrderOptimizable, byte serializationVersion) {
+        this(byteStream, oStream, new ArrayList<Integer>(numElements), 
baseType, sortOrder, rowKeyOrderOptimizable, serializationVersion);
+    }
+    
+    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, 
DataOutputStream oStream,
+            int numElements, PDataType baseType, SortOrder sortOrder, boolean 
rowKeyOrderOptimizable) {
+        this(byteStream, oStream, new ArrayList<Integer>(numElements), 
baseType, sortOrder, rowKeyOrderOptimizable, 
PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
+    }
+    
+    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, 
+            List<Integer> offsetPos, PDataType baseType, SortOrder sortOrder, 
boolean rowKeyOrderOptimizable) {
+        this(byteStream, new DataOutputStream(byteStream), offsetPos, 
baseType, sortOrder, rowKeyOrderOptimizable, 
PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
+    }
+    
+    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, 
DataOutputStream oStream,
+            List<Integer> offsetPos, PDataType baseType, SortOrder sortOrder, 
boolean rowKeyOrderOptimizable, byte serializationVersion) {
+        this.baseType = baseType;
+        this.sortOrder = sortOrder;
+        this.offsetPos = offsetPos;
+        this.byteStream = byteStream;
+        this.oStream = oStream;
+        this.nulls = 0;
+        this.serializationVersion = serializationVersion;
+        this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
+    }
+
+    private void close() {
+        try {
+            if (byteStream != null) byteStream.close();
+            if (oStream != null) oStream.close();
+            byteStream = null;
+            oStream = null;
+        } catch (IOException ioe) {}
+    }
+    
+    // used to represent the absence of a value 
+    @Override
+    public void appendAbsentValue() {
+        if (serializationVersion == 
PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION && !baseType.isFixedWidth()) {
+            offsetPos.add(-byteStream.size());
+            nulls++;
+        }
+        else {
+            throw new UnsupportedOperationException("Cannot represent an 
absent element");
+        }
+    }
+
+    public void appendValue(byte[] bytes) {
+        appendValue(bytes, 0, bytes.length);
+    }
+
+    @Override
+    public void appendValue(byte[] bytes, int offset, int len) {
+        try {
+            // track the offset position here from the size of the byteStream
+            if (!baseType.isFixedWidth()) {
+                // Any variable length array would follow the below order
+                // Every element would be seperated by a seperator byte '0'
+                // Null elements are counted and once a first non null element 
appears we
+                // write the count of the nulls prefixed with a seperator byte
+                // Trailing nulls are not taken into account
+                // The last non null element is followed by two seperator bytes
+                // For eg
+                // a, b, null, null, c, null would be 
+                // 65 0 66 0 0 2 67 0 0 0
+                // a null null null b c null d would be
+                // 65 0 0 3 66 0 67 0 0 1 68 0 0 0
+                if (len == 0) {
+                    offsetPos.add(byteStream.size());
+                    nulls++;
+                } else {
+                    nulls = PArrayDataType.serializeNulls(oStream, nulls);
+                    offsetPos.add(byteStream.size());
+                    if (sortOrder == SortOrder.DESC) {
+                        SortOrder.invert(bytes, offset, bytes, offset, len);
+                        offset = 0;
+                    }
+                    oStream.write(bytes, offset, len);
+                    if (serializationVersion == 
PArrayDataType.SORTABLE_SERIALIZATION_VERSION) {
+                        
oStream.write(PArrayDataType.getSeparatorByte(rowKeyOrderOptimizable, 
sortOrder));
+                    }
+                }
+            } else {
+                // No nulls for fixed length
+                if (sortOrder == SortOrder.DESC) {
+                    SortOrder.invert(bytes, offset, bytes, offset, len);
+                    offset = 0;
+                }
+                oStream.write(bytes, offset, len);
+            }
+        } catch (IOException e) {}
+    }
+
+    @Override
+    public byte[] encode() {
+        try {
+            if (!baseType.isFixedWidth()) {
+                int noOfElements = offsetPos.size();
+                int[] offsetPosArray = new int[noOfElements];
+                int index = 0;
+                for (Integer i : offsetPos) {
+                    offsetPosArray[index] = i;
+                    ++index;
+                }
+                if (serializationVersion == 
PArrayDataType.SORTABLE_SERIALIZATION_VERSION) {
+                    // Double seperator byte to show end of the non null array
+                    PArrayDataType.writeEndSeperatorForVarLengthArray(oStream, 
sortOrder, rowKeyOrderOptimizable);
+                }
+                noOfElements = 
PArrayDataType.serializeOffsetArrayIntoStream(oStream, byteStream, noOfElements,
+                        offsetPosArray[offsetPosArray.length - 1], 
offsetPosArray, serializationVersion);
+                PArrayDataType.serializeHeaderInfoIntoStream(oStream, 
noOfElements, serializationVersion);
+            }
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            ptr.set(byteStream.getBuffer(), 0, byteStream.size());
+            return ByteUtil.copyKeyBytesIfNecessary(ptr);
+        } catch (IOException e) {} finally {
+            close();
+        }
+        return null;
+    }
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
new file mode 100644
index 0000000..59e99fd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.expression.DelegateExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+public class EncodedColumnsUtil {
+
+    public static boolean usesEncodedColumnNames(PTable table) {
+        return usesEncodedColumnNames(table.getEncodingScheme());
+    }
+    
+    public static boolean usesEncodedColumnNames(QualifierEncodingScheme 
encodingScheme) {
+        return encodingScheme != null && encodingScheme != 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+    }
+    
+    public static void setColumns(PColumn column, PTable table, Scan scan) {
+       if (table.getImmutableStorageScheme() == 
ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+            // if a table storage scheme is COLUMNS_STORED_IN_SINGLE_CELL set 
then all columns of a column family are stored in a single cell 
+            // (with the qualifier name being same as the family name), just 
project the column family here
+            // so that we can calculate estimatedByteSize correctly in 
ProjectionCompiler 
+               scan.addFamily(column.getFamilyName().getBytes());
+       }
+        else {
+            if (column.getColumnQualifierBytes() != null) {
+                scan.addColumn(column.getFamilyName().getBytes(), 
column.getColumnQualifierBytes());
+            }
+        }
+    }
+    
+    public static QualifierEncodingScheme getQualifierEncodingScheme(Scan s) {
+        // null check for backward compatibility
+        return 
s.getAttribute(BaseScannerRegionObserver.QUALIFIER_ENCODING_SCHEME) == null ? 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : 
QualifierEncodingScheme.fromSerializedValue(s.getAttribute(BaseScannerRegionObserver.QUALIFIER_ENCODING_SCHEME)[0]);
+    }
+    
+    public static ImmutableStorageScheme getImmutableStorageScheme(Scan s) {
+        // null check for backward compatibility
+        return 
s.getAttribute(BaseScannerRegionObserver.IMMUTABLE_STORAGE_ENCODING_SCHEME) == 
null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : 
ImmutableStorageScheme.fromSerializedValue(s.getAttribute(BaseScannerRegionObserver.IMMUTABLE_STORAGE_ENCODING_SCHEME)[0]);
+    }
+
+    /**
+     * @return pair of byte arrays. The first part of the pair is the empty 
key value's column qualifier, and the second
+     *         part is the value to use for it.
+     */
+    public static Pair<byte[], byte[]> getEmptyKeyValueInfo(PTable table) {
+        return usesEncodedColumnNames(table) ? new 
Pair<>(QueryConstants.ENCODED_EMPTY_COLUMN_BYTES,
+                QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES) : new 
Pair<>(QueryConstants.EMPTY_COLUMN_BYTES,
+                QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+    }
+
+    /**
+     * @return pair of byte arrays. The first part of the pair is the empty 
key value's column qualifier, and the second
+     *         part is the value to use for it.
+     */
+    public static Pair<byte[], byte[]> getEmptyKeyValueInfo(boolean 
usesEncodedColumnNames) {
+        return usesEncodedColumnNames ? new 
Pair<>(QueryConstants.ENCODED_EMPTY_COLUMN_BYTES,
+                QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES) : new 
Pair<>(QueryConstants.EMPTY_COLUMN_BYTES,
+                QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+    }
+    
+    /**
+     * @return pair of byte arrays. The first part of the pair is the empty 
key value's column qualifier, and the second
+     *         part is the value to use for it.
+     */
+    public static Pair<byte[], byte[]> 
getEmptyKeyValueInfo(QualifierEncodingScheme encodingScheme) {
+        return usesEncodedColumnNames(encodingScheme) ? new 
Pair<>(QueryConstants.ENCODED_EMPTY_COLUMN_BYTES,
+                QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES) : new 
Pair<>(QueryConstants.EMPTY_COLUMN_BYTES,
+                QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+    }
+
+    public static Pair<Integer, Integer> getMinMaxQualifiersFromScan(Scan 
scan) {
+        Integer minQ = null, maxQ = null;
+        byte[] minQualifier = 
scan.getAttribute(BaseScannerRegionObserver.MIN_QUALIFIER);
+        if (minQualifier != null) {
+            minQ = Bytes.toInt(minQualifier);
+        }
+        byte[] maxQualifier = 
scan.getAttribute(BaseScannerRegionObserver.MAX_QUALIFIER);
+        if (maxQualifier != null) {
+            maxQ = Bytes.toInt(maxQualifier);
+        }
+        if (minQualifier == null) {
+            return null;
+        }
+        return new Pair<>(minQ, maxQ);
+    }
+
+    public static boolean setQualifierRanges(PTable table) {
+        return table.getImmutableStorageScheme() != null
+                && table.getImmutableStorageScheme() == 
ImmutableStorageScheme.ONE_CELL_PER_COLUMN
+                && usesEncodedColumnNames(table) && !table.isTransactional()
+                && !ScanUtil.hasDynamicColumns(table);
+    }
+
+    public static boolean useQualifierAsIndex(Pair<Integer, Integer> 
minMaxQualifiers) {
+        return minMaxQualifiers != null;
+    }
+
+    public static Map<String, Pair<Integer, Integer>> 
getFamilyQualifierRanges(PTable table) {
+        checkNotNull(table);
+        QualifierEncodingScheme encodingScheme = table.getEncodingScheme();
+        Preconditions.checkArgument(encodingScheme != NON_ENCODED_QUALIFIERS);
+        if (table.getEncodedCQCounter() != null) {
+            Map<String, Integer> values = table.getEncodedCQCounter().values();
+            Map<String, Pair<Integer, Integer>> toReturn = 
Maps.newHashMapWithExpectedSize(values.size());
+            for (Entry<String, Integer> e : values.entrySet()) {
+                Integer lowerBound = 
QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
+                Integer upperBound = e.getValue() - 1;
+                if (lowerBound > upperBound) {
+                    lowerBound = upperBound;
+                }
+                toReturn.put(e.getKey(), new Pair<>(lowerBound, upperBound));
+            }
+            return toReturn;
+        }
+        return Collections.emptyMap();
+    }
+    
+    public static byte[] getColumnQualifierBytes(String columnName, Integer 
numberBasedQualifier, PTable table, boolean isPk) {
+        QualifierEncodingScheme encodingScheme = table.getEncodingScheme();
+        return getColumnQualifierBytes(columnName, numberBasedQualifier, 
encodingScheme, isPk);
+    }
+    
+    public static byte[] getColumnQualifierBytes(String columnName, Integer 
numberBasedQualifier, QualifierEncodingScheme encodingScheme, boolean isPk) {
+        if (isPk) {
+            return null;
+        }
+        if (encodingScheme == null || encodingScheme == 
NON_ENCODED_QUALIFIERS) {
+            return Bytes.toBytes(columnName);
+        }
+        return encodingScheme.encode(numberBasedQualifier);
+    }
+    
+    public static Expression[] createColumnExpressionArray(int 
maxEncodedColumnQualifier) {
+        // reserve the first position and offset maxEncodedColumnQualifier by 
ENCODED_CQ_COUNTER_INITIAL_VALUE (which is the minimum encoded column qualifier)
+        int numElements = maxEncodedColumnQualifier - 
QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE + 2;
+        Expression[] colValues = new Expression[numElements];
+        Arrays.fill(colValues, new 
DelegateExpression(LiteralExpression.newConstant(null)) {
+                   @Override
+                   public boolean evaluate(Tuple tuple, ImmutableBytesWritable 
ptr) {
+                       return false;
+                   }
+               });
+        // 0 is a reserved position, set it to a non-null value so that we can 
represent absence of a value using a negative offset
+        
colValues[0]=LiteralExpression.newConstant(QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+        return colValues;
+    }
+
+    public static boolean isReservedColumnQualifier(int number) {
+        if (number < 0) {
+            throw new IllegalArgumentException("Negative column qualifier" + 
number + " not allowed ");
+        }
+        return number < QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
+    }
+}

Reply via email to